自动化脚本快速批量处理
引言
自动化脚本是现代IT工作中不可或缺的工具,它能够显著提高工作效率,减少重复性劳动。本文将详细介绍如何利用Python编写自动化脚本来实现快速批量处理任务,涵盖文件操作、数据处理、网络请求等多个方面,并提供完整的代码示例。
一、自动化脚本基础概念
自动化脚本是通过编写程序代码来替代人工执行重复性任务的技术。它不仅可以提高工作效率,还能减少人为错误。Python因其简洁的语法和丰富的库支持,成为编写自动化脚本的首选语言。
二、文件批量处理脚本
以下是使用Python实现文件批量重命名和格式转换的完整脚本:
import os import sys from pathlib import Path def batch_rename_files(directory, prefix, suffix=''): """ 批量重命名指定目录下的文件 :param directory: 目标目录路径 :param prefix: 新文件名前缀 :param suffix: 新文件名后缀 """ for filename in os.listdir(directory): if filename.startswith('.'): continue # 跳过隐藏文件 src = os.path.join(directory, filename) dst = os.path.join(directory, f'{prefix}_{filename}{suffix}') os.rename(src, dst) print(f'Renamed: {filename} -> {dst}') def convert_image_format(input_dir, output_dir, target_format='jpg'): """ 批量转换图片格式 :param input_dir: 输入目录 :param output_dir: 输出目录 :param target_format: 目标格式(jpg/png等) """ if not os.path.exists(output_dir): os.makedirs(output_dir) for filename in os.listdir(input_dir): if filename.startswith('.'): continue if '.' not in filename: continue ext = filename.split('.')[-1].lower() if ext not in ['jpg', 'jpeg', 'png', 'gif']: continue src = os.path.join(input_dir, filename) dst = os.path.join(output_dir, f'{Path(filename).stem}.{target_format}') try: from PIL import Image img = Image.open(src) img.save(dst) print(f'Converted: {filename} -> {dst}') except Exception as e: print(f'Error converting {filename}: {str(e)}') if __name__ == '__main__': # 示例用法 # batch_rename_files('/path/to/files', 'new_prefix') # convert_image_format('/path/to/images', '/path/to/output', 'png') pass
三、数据批量处理脚本
以下是使用Python进行CSV数据批量清洗和转换的脚本:
import pandas as pd import numpy as np import os from datetime import datetime def clean_csv_data(input_file, output_file): """ 清洗CSV数据 :param input_file: 输入文件路径 :param output_file: 输出文件路径 """ try: df = pd.read_csv(input_file) # 处理缺失值 df.fillna(method='ffill', inplace=True) df.fillna(method='bfill', inplace=True) df.fillna('Unknown', inplace=True) # 转换日期格式 if 'date' in df.columns: df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d') # 保存清洗后的数据 df.to_csv(output_file, index=False) print(f'Cleaned data saved to {output_file}') except Exception as e: print(f'Error cleaning data: {str(e)}') def batch_process_csv(input_dir, output_dir): """ 批量处理目录下的CSV文件 :param input_dir: 输入目录 :param output_dir: 输出目录 """ if not os.path.exists(output_dir): os.makedirs(output_dir) for filename in os.listdir(input_dir): if filename.endswith('.csv'): input_path = os.path.join(input_dir, filename) output_path = os.path.join(output_dir, f'cleaned_{filename}') clean_csv_data(input_path, output_path) print(f'Processed: {filename}') if __name__ == '__main__': # 示例用法 # clean_csv_data('input.csv', 'output.csv') # batch_process_csv('/path/to/csvs', '/path/to/output') pass
四、网络请求批量处理脚本
以下是使用Python进行API批量请求的脚本:
import requests import json import time from concurrent.futures import ThreadPoolExecutor def fetch_api_data(url, params=None): """ 发送API请求并返回数据 :param url: API端点 :param params: 查询参数 :return: 响应数据 """ try: response = requests.get(url, params=params) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f'Error fetching data from {url}: {str(e)}') return None def batch_api_requests(urls, max_workers=5): """ 批量发送API请求 :param urls: 请求URL列表 :param max_workers: 最大并发数 :return: 响应结果列表 """ results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_url = {executor.submit(fetch_api_data, url): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() if data is not None: results.append(data) except Exception as e: print(f'Error processing {url}: {str(e)}') return results def save_api_results(results, output_file): """ 保存API请求结果 :param results: 结果数据列表 :param output_file: 输出文件路径 """ try: with open(output_file, 'w') as f: json.dump(results, f, indent=4) print(f'Results saved to {output_file}') except Exception as e: print(f'Error saving results: {str(e)}') if __name__ == '__main__': # 示例用法 # urls = ['https://api.example.com/data1', 'https://api.example.com/data2'] # results = batch_api_requests(urls) # save_api_results(results, 'api_results.json') pass
五、日志批量处理脚本
以下是使用Python进行日志文件分析的脚本:
import re import pandas as pd from collections import defaultdict def parse_log_file(log_file): """ 解析日志文件并提取关键信息 :param log_file: 日志文件路径 :return: 包含日志数据的字典 """ log_pattern = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) - (\w+) - (.*)') log_data = defaultdict(list) with open(log_file, 'r') as f: for line in f: match = log_pattern.search(line.strip()) if match: timestamp, level, message = match.groups() log_data['timestamp'].append(timestamp) log_data['level'].append(level) log_data['message'].append(message) return dict(log_data) def analyze_logs(log_files, output_file): """ 分析多个日志文件并生成报告 :param log_files: 日志文件路径列表 :param output_file: 输出文件路径 """ all_data = {} for log_file in log_files: print(f'Processing {log_file}') data = parse_log_file(log_file) if data: df = pd.DataFrame(data) # 统计各级别日志数量 level_counts = df['level'].value_counts() all_data[log_file] = level_counts # 保存分析结果 if all_data: pd.DataFrame(all_data).to_csv(output_file, index=False) print(f'Log analysis saved to {output_file}') if __name__ == '__main__': # 示例用法 # log_files = ['app.log', 'error.log'] # analyze_logs(log_files, 'log_analysis.csv') pass
六、自动化脚本部署与维护
自动化脚本编写完成后,还需要考虑部署和维护的问题。以下是几个关键点:
版本控制:使用Git等工具管理脚本版本
日志记录:添加详细的日志记录功能
错误处理:完善错误处理机制
参数化配置:使用配置文件管理脚本参数
定时任务:使用cron(Linux)或任务计划程序(Windows)定时执行
结论
自动化脚本是提高工作效率的强大工具,通过本文介绍的Python脚本示例,您可以快速实现文件处理、数据处理、网络请求等批量任务。掌握这些技能将大大提升您的工作效率,减少重复劳动。