Python Automation Scripting: A Practical Toolkit for Boosting Productivity

Python is exceptionally well suited to automation scripting: its concise syntax and rich library ecosystem make it the language of choice for automating routine tasks. This article walks through a collection of practical techniques for building automation scripts.

Environment Setup and Core Libraries

Project Dependencies

# requirements.txt
requests==2.31.0
beautifulsoup4==4.12.2
selenium==4.11.2
pandas==2.0.3
openpyxl==3.1.2
python-docx==0.8.11
Pillow==10.0.0
schedule==1.2.0
watchdog==3.0.0
psutil==5.9.5
paramiko==3.3.1
# note: smtplib ships with the Python standard library and is not a pip package
click==8.1.7
rich==13.5.2
tqdm==4.66.1
python-dotenv==1.0.0
pyyaml==6.0.1
jinja2==3.1.2
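
Of the packages pinned above, python-dotenv is easy to overlook: it keeps credentials (for example, the SMTP password used with the standard library's smtplib) out of the scripts themselves. A minimal sketch, assuming a hypothetical SMTP_PASSWORD key in a local .env file:

# load_env.py - read secrets from .env (SMTP_PASSWORD is a hypothetical key)
import os
from dotenv import load_dotenv

load_dotenv()  # loads key=value pairs from .env into the process environment
smtp_password = os.getenv("SMTP_PASSWORD", "")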

A Base Automation Toolkit

# automation/core/base.py - base automation utilities
import json
import yaml
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass
from rich.console import Console
from rich.logging import RichHandler
import click

# Configure logging
console = Console()
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(console=console)]
)
logger = logging.getLogger(__name__)

@dataclass
class TaskResult:
    """任务执行结果"""
    success: bool
    message: str
    data: Optional[Any] = None
    execution_time: float = 0.0
    error: Optional[Exception] = None

class AutomationBase:
    """自动化脚本基类"""
    
    def __init__(self, config_path: Optional[str] = None):
        self.config = self.load_config(config_path) if config_path else {}
        self.start_time = None
        self.results = []
        
    def load_config(self, config_path: str) -> Dict:
        """加载配置文件"""
        config_file = Path(config_path)
        
        if not config_file.exists():
            logger.warning(f"配置文件不存在: {config_path}")
            return {}
        
        try:
            if config_file.suffix.lower() == '.json':
                with open(config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            elif config_file.suffix.lower() in ['.yml', '.yaml']:
                with open(config_file, 'r', encoding='utf-8') as f:
                    return yaml.safe_load(f)
            else:
                logger.error(f"不支持的配置文件格式: {config_file.suffix}")
                return {}
        except Exception as e:
            logger.error(f"加载配置文件失败: {e}")
            return {}
    
    def execute_task(self, task_func: Callable, *args, **kwargs) -> TaskResult:
        """执行任务并记录结果"""
        start_time = time.time()
        
        try:
            result = task_func(*args, **kwargs)
            execution_time = time.time() - start_time
            
            task_result = TaskResult(
                success=True,
                message="任务执行成功",
                data=result,
                execution_time=execution_time
            )
            
            logger.info(f"任务完成,耗时: {execution_time:.2f}秒")
            
        except Exception as e:
            execution_time = time.time() - start_time
            task_result = TaskResult(
                success=False,
                message=f"任务执行失败: {str(e)}",
                execution_time=execution_time,
                error=e
            )
            
            logger.error(f"任务失败: {e}")
        
        self.results.append(task_result)
        return task_result
    
    def run_with_retry(
        self,
        task_func: Callable,
        *args,
        max_retries: int = 3,
        delay: float = 1.0,
        **kwargs
    ) -> TaskResult:
        """Execute a task with retries and exponential backoff.
        
        max_retries and delay are keyword-only so that positional
        arguments pass through to task_func untouched.
        """
        result = None
        for attempt in range(max_retries + 1):
            if attempt > 0:
                logger.info(f"Retry attempt {attempt}...")
                time.sleep(delay * (2 ** (attempt - 1)))  # exponential backoff
            
            result = self.execute_task(task_func, *args, **kwargs)
            
            if result.success:
                return result
        
        logger.error(f"Task still failing after {max_retries} retries")
        return result
    
    def generate_report(self) -> Dict:
        """生成执行报告"""
        total_tasks = len(self.results)
        successful_tasks = sum(1 for r in self.results if r.success)
        failed_tasks = total_tasks - successful_tasks
        total_time = sum(r.execution_time for r in self.results)
        
        report = {
            'summary': {
                'total_tasks': total_tasks,
                'successful_tasks': successful_tasks,
                'failed_tasks': failed_tasks,
                'success_rate': successful_tasks / total_tasks if total_tasks > 0 else 0,
                'total_execution_time': total_time
            },
            'details': [
                {
                    'success': r.success,
                    'message': r.message,
                    'execution_time': r.execution_time,
                    'error': str(r.error) if r.error else None
                }
                for r in self.results
            ]
        }
        
        return report
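
# A minimal usage sketch for AutomationBase (flaky_fetch and config.yml are
# hypothetical; run this from your own script, not at import time):
#
#   def flaky_fetch():
#       ...  # any callable; exceptions are caught and recorded
#
#   base = AutomationBase("config.yml")  # the config path is optional
#   base.run_with_retry(flaky_fetch, max_retries=3, delay=2.0)
#   print(base.generate_report()["summary"])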

class FileAutomation(AutomationBase):
    """文件操作自动化"""
    
    def __init__(self, config_path: Optional[str] = None):
        super().__init__(config_path)
        
    def batch_rename_files(
        self, 
        directory: str, 
        pattern: str, 
        replacement: str,
        file_extension: Optional[str] = None
    ) -> List[str]:
        """批量重命名文件"""
        directory_path = Path(directory)
        
        if not directory_path.exists():
            raise FileNotFoundError(f"目录不存在: {directory}")
        
        renamed_files = []
        
        # Collect the file list
        if file_extension:
            files = list(directory_path.glob(f"*.{file_extension}"))
        else:
            files = [f for f in directory_path.iterdir() if f.is_file()]
        
        for file_path in files:
            old_name = file_path.name
            new_name = old_name.replace(pattern, replacement)
            
            if old_name != new_name:
                new_path = file_path.parent / new_name
                
                # Avoid file-name collisions
                counter = 1
                while new_path.exists():
                    name_parts = new_name.rsplit('.', 1)
                    if len(name_parts) == 2:
                        new_name = f"{name_parts[0]}_{counter}.{name_parts[1]}"
                    else:
                        new_name = f"{new_name}_{counter}"
                    new_path = file_path.parent / new_name
                    counter += 1
                
                file_path.rename(new_path)
                renamed_files.append(f"{old_name} -> {new_name}")
                logger.info(f"重命名: {old_name} -> {new_name}")
        
        return renamed_files
    
    def organize_files_by_type(self, source_dir: str, target_dir: str) -> Dict[str, List[str]]:
        """按文件类型整理文件"""
        source_path = Path(source_dir)
        target_path = Path(target_dir)
        
        if not source_path.exists():
            raise FileNotFoundError(f"源目录不存在: {source_dir}")
        
        target_path.mkdir(parents=True, exist_ok=True)
        
        # File-type mapping
        file_type_mapping = {
            'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'],
            'documents': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt'],
            'spreadsheets': ['.xls', '.xlsx', '.csv', '.ods'],
            'presentations': ['.ppt', '.pptx', '.odp'],
            'videos': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv'],
            'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
            'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
            'code': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.c']
        }
        
        organized_files = {category: [] for category in file_type_mapping.keys()}
        organized_files['others'] = []
        
        for file_path in source_path.iterdir():
            if file_path.is_file():
                file_extension = file_path.suffix.lower()
                moved = False
                
                for category, extensions in file_type_mapping.items():
                    if file_extension in extensions:
                        # Create the category directory
                        category_dir = target_path / category
                        category_dir.mkdir(exist_ok=True)
                        
                        # Move the file
                        target_file = category_dir / file_path.name
                        
                        # Handle file-name collisions
                        counter = 1
                        while target_file.exists():
                            name_parts = file_path.stem, file_path.suffix
                            target_file = category_dir / f"{name_parts[0]}_{counter}{name_parts[1]}"
                            counter += 1
                        
                        file_path.rename(target_file)
                        organized_files[category].append(file_path.name)
                        logger.info(f"移动文件: {file_path.name} -> {category}/")
                        moved = True
                        break
                
                if not moved:
                    # Move unclassified files into the others directory
                    others_dir = target_path / 'others'
                    others_dir.mkdir(exist_ok=True)
                    
                    target_file = others_dir / file_path.name
                    counter = 1
                    while target_file.exists():
                        name_parts = file_path.stem, file_path.suffix
                        target_file = others_dir / f"{name_parts[0]}_{counter}{name_parts[1]}"
                        counter += 1
                    
                    file_path.rename(target_file)
                    organized_files['others'].append(file_path.name)
                    logger.info(f"移动文件: {file_path.name} -> others/")
        
        return organized_files
    
    def duplicate_file_finder(self, directory: str) -> Dict[str, List[str]]:
        """查找重复文件"""
        import hashlib
        
        directory_path = Path(directory)
        if not directory_path.exists():
            raise FileNotFoundError(f"目录不存在: {directory}")
        
        file_hashes = {}
        duplicates = {}
        
        def get_file_hash(file_path: Path) -> str:
            """计算文件MD5哈希值"""
            hash_md5 = hashlib.md5()
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        
        # Walk the directory tree recursively
        for file_path in directory_path.rglob('*'):
            if file_path.is_file():
                try:
                    file_hash = get_file_hash(file_path)
                    
                    if file_hash in file_hashes:
                        # Duplicate found
                        if file_hash not in duplicates:
                            duplicates[file_hash] = [str(file_hashes[file_hash])]
                        duplicates[file_hash].append(str(file_path))
                    else:
                        file_hashes[file_hash] = file_path
                        
                except Exception as e:
                    logger.warning(f"无法处理文件 {file_path}: {e}")
        
        # Format the result
        result = {}
        for file_hash, file_list in duplicates.items():
            result[f"Group_{len(result)+1}"] = file_list
            logger.info(f"发现重复文件组: {len(file_list)} 个文件")
        
        return result
    
    def clean_empty_directories(self, directory: str) -> List[str]:
        """清理空目录"""
        directory_path = Path(directory)
        if not directory_path.exists():
            raise FileNotFoundError(f"目录不存在: {directory}")
        
        removed_dirs = []
        
        # Clean from the deepest level upward
        for dir_path in sorted(directory_path.rglob('*'), key=lambda p: len(p.parts), reverse=True):
            if dir_path.is_dir() and dir_path != directory_path:
                try:
                    # Check whether the directory is empty
                    if not any(dir_path.iterdir()):
                        dir_path.rmdir()
                        removed_dirs.append(str(dir_path))
                        logger.info(f"删除空目录: {dir_path}")
                except OSError as e:
                    logger.warning(f"无法删除目录 {dir_path}: {e}")
        
        return removed_dirs
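
# Example: tidy a downloads folder end to end (paths are hypothetical):
#
#   fa = FileAutomation()
#   fa.duplicate_file_finder("/data/downloads")   # report duplicate groups
#   fa.organize_files_by_type("/data/downloads", "/data/sorted")
#   fa.clean_empty_directories("/data/downloads")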

class WebAutomation(AutomationBase):
    """Web自动化工具"""
    
    def __init__(self, config_path: Optional[str] = None):
        super().__init__(config_path)
        
    def download_files_from_urls(
        self, 
        urls: List[str], 
        download_dir: str,
        headers: Optional[Dict[str, str]] = None
    ) -> List[str]:
        """从URL列表下载文件"""
        import requests
        from urllib.parse import urlparse
        
        download_path = Path(download_dir)
        download_path.mkdir(parents=True, exist_ok=True)
        
        downloaded_files = []
        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        
        if headers:
            default_headers.update(headers)
        
        for url in urls:
            try:
                response = requests.get(url, headers=default_headers, stream=True, timeout=60)
                response.raise_for_status()
                
                # Derive a file name from the URL
                parsed_url = urlparse(url)
                filename = Path(parsed_url.path).name
                
                if not filename:
                    filename = f"download_{len(downloaded_files)+1}"
                
                # Prefer the file name from the Content-Disposition header
                if 'Content-Disposition' in response.headers:
                    import re
                    cd = response.headers['Content-Disposition']
                    filename_match = re.findall('filename=(.+)', cd)
                    if filename_match:
                        filename = filename_match[0].strip('"')
                
                file_path = download_path / filename
                
                # Handle file-name collisions
                counter = 1
                while file_path.exists():
                    name_parts = Path(filename).stem, Path(filename).suffix
                    new_filename = f"{name_parts[0]}_{counter}{name_parts[1]}"
                    file_path = download_path / new_filename
                    counter += 1
                
                # Stream the file to disk
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                
                downloaded_files.append(str(file_path))
                logger.info(f"下载完成: {filename}")
                
            except Exception as e:
                logger.error(f"下载失败 {url}: {e}")
        
        return downloaded_files
    
    def scrape_website_data(
        self, 
        url: str, 
        selectors: Dict[str, str],
        output_format: str = 'json'
    ) -> Any:
        """网站数据抓取"""
        import requests
        from bs4 import BeautifulSoup
        import pandas as pd
        
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.content, 'html.parser')
            scraped_data = []
            
            # The first selector defines the repeating base elements
            base_elements = soup.select(list(selectors.values())[0])
            
            for element in base_elements:
                item_data = {}
                
                for field_name, selector in selectors.items():
                    try:
                        # Look inside the current element first
                        found_element = element.select_one(selector)
                        if found_element:
                            item_data[field_name] = found_element.get_text(strip=True)
                        else:
                            # Fall back to a page-wide lookup
                            found_element = soup.select_one(selector)
                            item_data[field_name] = found_element.get_text(strip=True) if found_element else ""
                    except Exception as e:
                        logger.warning(f"提取字段 {field_name} 失败: {e}")
                        item_data[field_name] = ""
                
                if any(item_data.values()):  # keep only non-empty records
                    scraped_data.append(item_data)
            
            # Return the data in the requested format
            if output_format.lower() == 'json':
                return scraped_data
            elif output_format.lower() == 'csv':
                df = pd.DataFrame(scraped_data)
                return df.to_csv(index=False)
            elif output_format.lower() == 'dataframe':
                return pd.DataFrame(scraped_data)
            else:
                return scraped_data
                
        except Exception as e:
            logger.error(f"网站抓取失败: {e}")
            raise
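
# Example: scrape a listing page (URL and CSS selectors are hypothetical;
# the first selector defines the repeating "row" element that the other
# fields are looked up within):
#
#   wa = WebAutomation()
#   rows = wa.scrape_website_data(
#       "https://example.com/list",
#       {"item": "div.item", "title": "h2", "price": "span.price"},
#       output_format="dataframe",
#   )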

class SystemAutomation(AutomationBase):
    """系统管理自动化"""
    
    def __init__(self, config_path: Optional[str] = None):
        super().__init__(config_path)
        
    def system_cleanup(self, cleanup_config: Dict[str, Any]) -> Dict[str, Any]:
        """系统清理"""
        import psutil
        import shutil
        
        cleanup_results = {
            'temp_files_cleaned': 0,
            'cache_cleared': 0,
            'disk_space_freed': 0,
            'processes_terminated': 0
        }
        
        # Clean temporary files
        if cleanup_config.get('clean_temp', True):
            temp_dirs = [
                Path.home() / 'AppData' / 'Local' / 'Temp',  # Windows
                Path('/tmp'),  # Linux/Mac
                Path('/var/tmp')  # Linux
            ]
            
            for temp_dir in temp_dirs:
                if temp_dir.exists():
                    try:
                        for item in temp_dir.iterdir():
                            if item.is_file():
                                size = item.stat().st_size
                                item.unlink()
                                cleanup_results['temp_files_cleaned'] += 1
                                cleanup_results['disk_space_freed'] += size
                            elif item.is_dir():
                                size = sum(f.stat().st_size for f in item.rglob('*') if f.is_file())
                                shutil.rmtree(item, ignore_errors=True)
                                cleanup_results['disk_space_freed'] += size
                    except Exception as e:
                        logger.warning(f"清理临时目录失败 {temp_dir}: {e}")
        
        # Terminate the specified processes
        if 'terminate_processes' in cleanup_config:
            process_names = cleanup_config['terminate_processes']
            
            for proc in psutil.process_iter(['pid', 'name']):
                try:
                    if proc.info['name'].lower() in [name.lower() for name in process_names]:
                        proc.terminate()
                        cleanup_results['processes_terminated'] += 1
                        logger.info(f"终止进程: {proc.info['name']} (PID: {proc.info['pid']})")
                except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                    logger.warning(f"无法终止进程: {e}")
        
        # Convert bytes to MB
        cleanup_results['disk_space_freed_mb'] = cleanup_results['disk_space_freed'] / (1024 * 1024)
        
        return cleanup_results
    
    def monitor_system_resources(self, duration: int = 60, interval: int = 5) -> List[Dict]:
        """监控系统资源"""
        import psutil
        
        monitoring_data = []
        start_time = time.time()
        
        logger.info(f"开始监控系统资源,持续 {duration} 秒...")
        
        while time.time() - start_time < duration:
            try:
                # Collect system metrics
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
                disk = psutil.disk_usage('/')
                
                data_point = {
                    'timestamp': datetime.now().isoformat(),
                    'cpu_percent': cpu_percent,
                    'memory_percent': memory.percent,
                    'memory_used_gb': memory.used / (1024**3),
                    'memory_total_gb': memory.total / (1024**3),
                    'disk_percent': disk.percent,
                    'disk_used_gb': disk.used / (1024**3),
                    'disk_total_gb': disk.total / (1024**3)
                }
                
                monitoring_data.append(data_point)
                
                logger.info(
                    f"CPU: {cpu_percent}%, "
                    f"内存: {memory.percent}%, "
                    f"磁盘: {disk.percent}%"
                )
                
                time.sleep(interval)
                
            except Exception as e:
                logger.error(f"监控数据收集失败: {e}")
        
        return monitoring_data
    
    def backup_directories(
        self, 
        source_dirs: List[str], 
        backup_dir: str,
        compression: bool = True
    ) -> List[str]:
        """备份目录"""
        import shutil
        import zipfile
        
        backup_path = Path(backup_dir)
        backup_path.mkdir(parents=True, exist_ok=True)
        
        backup_files = []
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        for source_dir in source_dirs:
            source_path = Path(source_dir)
            
            if not source_path.exists():
                logger.warning(f"源目录不存在: {source_dir}")
                continue
            
            backup_name = f"{source_path.name}_backup_{timestamp}"
            
            if compression:
                # Create a ZIP backup
                zip_path = backup_path / f"{backup_name}.zip"
                
                with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    for file_path in source_path.rglob('*'):
                        if file_path.is_file():
                            arcname = file_path.relative_to(source_path)
                            zipf.write(file_path, arcname)
                
                backup_files.append(str(zip_path))
                logger.info(f"备份完成: {source_dir} -> {zip_path}")
                
            else:
                # Copy the directory as-is
                target_path = backup_path / backup_name
                shutil.copytree(source_path, target_path)
                backup_files.append(str(target_path))
                logger.info(f"备份完成: {source_dir} -> {target_path}")
        
        return backup_files
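
# Example: sample resource usage for a minute, then persist it with pandas
# (output path is hypothetical; requires `import pandas as pd`):
#
#   sa = SystemAutomation()
#   samples = sa.monitor_system_resources(duration=60, interval=5)
#   pd.DataFrame(samples).to_csv("/tmp/metrics.csv", index=False)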

# CLI interface
@click.group()
def cli():
    """Python自动化工具集"""
    pass

@cli.command()
@click.option('--directory', '-d', required=True, help='Target directory')
@click.option('--pattern', '-p', required=True, help='Pattern to replace')
@click.option('--replacement', '-r', required=True, help='Replacement text')
@click.option('--extension', '-e', help='File extension filter')
def rename_files(directory, pattern, replacement, extension):
    """批量重命名文件"""
    file_automation = FileAutomation()
    
    result = file_automation.execute_task(
        file_automation.batch_rename_files,
        directory, pattern, replacement, extension
    )
    
    if result.success:
        console.print(f"[green]成功重命名 {len(result.data)} 个文件[/green]")
        for rename_info in result.data:
            console.print(f"  {rename_info}")
    else:
        console.print(f"[red]重命名失败: {result.message}[/red]")

@cli.command()
@click.option('--source', '-s', required=True, help='Source directory')
@click.option('--target', '-t', required=True, help='Target directory')
def organize_files(source, target):
    """按类型整理文件"""
    file_automation = FileAutomation()
    
    result = file_automation.execute_task(
        file_automation.organize_files_by_type,
        source, target
    )
    
    if result.success:
        console.print("[green]文件整理完成[/green]")
        for category, files in result.data.items():
            if files:
                console.print(f"  {category}: {len(files)} 个文件")
    else:
        console.print(f"[red]文件整理失败: {result.message}[/red]")

if __name__ == "__main__":
    cli()
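
Assuming the module is saved as base.py, the commands can be invoked as shown below (click converts underscores in function names to dashes by default; the paths are hypothetical):

python base.py rename-files -d ./photos -p "IMG_" -r "vacation_" -e jpg
python base.py organize-files -s ./downloads -t ./sorted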

Summary

Key takeaways for Python automation script development:

🎯 Automation Domains

  1. File operations: batch processing, organizing, cleanup
  2. Web automation: data scraping, file downloads
  3. System administration: resource monitoring, backups, cleanup
  4. Task scheduling: timed execution and batch jobs (see the sketch after this list)
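
For the scheduling item above, the pinned schedule library offers a readable in-process scheduler. A minimal sketch, assuming the toolkit module from this article plus hypothetical paths and times:

# scheduler.py - nightly backups via the schedule library (hypothetical wiring)
import time
import schedule
from automation.core.base import SystemAutomation

def nightly_backup():
    SystemAutomation().backup_directories(["/data/projects"], "/backups")

schedule.every().day.at("02:00").do(nightly_backup)  # run once a day at 02:00

while True:
    schedule.run_pending()
    time.sleep(30)  # polling granularity; this loop blocks the process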

✅ Best Practices

  • Modular design and code reuse
  • Error handling and retry mechanisms
  • Logging and progress display
  • Configuration files and parameterization

🚀 Advanced Capabilities

  • Multithreading and asynchronous processing
  • GUI interfaces and CLI tools
  • Task scheduling and monitoring (see the watchdog sketch after this list)
  • Report generation and notifications
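
For the monitoring bullet, the pinned watchdog package handles file-system monitoring. A minimal sketch that reacts to files appearing in a watched folder (the path is hypothetical):

# watcher.py - react to new files with watchdog (hypothetical watched path)
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class NewFileHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            print(f"New file: {event.src_path}")

observer = Observer()
observer.schedule(NewFileHandler(), "/data/inbox", recursive=True)
observer.start()
try:
    while True:
        time.sleep(1)
finally:
    observer.stop()
    observer.join()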

💡 Practical Tools

  • A reusable base automation framework
  • File-operation utilities
  • Web automation components
  • System-administration scripts

Master Python automation to boost your everyday productivity!


Automation is an essential part of modern workflows, and Python provides powerful, flexible tools for meeting virtually any automation need.