Python 自动化运维与DevOps实践
https://www.python.org/static/community_logos/python-logo-master-v3-TM.png
基础设施即代码(IaC)
使用Fabric执行远程命令
python
复制
下载
from fabric import Connection


def deploy_app():
    """Deploy the application on the remote web server.

    Pulls the latest code, installs the Python dependencies, restarts the
    ``myapp`` systemd unit and prints the unit status.

    Returns:
        bool: ``True`` when ``systemctl status myapp`` exited with code 0,
        ``False`` otherwise.
    """
    # Connect to the remote server using key-based authentication.
    with Connection(
        'web-server.example.com',
        user='deploy',
        connect_kwargs={"key_filename": "/path/to/key.pem"},
    ) as c:
        # Update the code.
        c.run('cd /var/www/myapp && git pull origin master')
        # Install dependencies.
        c.run('cd /var/www/myapp && pip install -r requirements.txt')
        # Restart the service.
        c.sudo('systemctl restart myapp', pty=True)
        # Check the status. `systemctl status` exits non-zero when the unit
        # is not active; warn=True keeps that from raising so the status
        # output is still printed in exactly the case where it matters.
        result = c.run('systemctl status myapp', hide=True, warn=True)
        print(f"服务状态:\n{result.stdout}")
        return result.ok


if __name__ == '__main__':
    deploy_app()
Ansible Python API
python
复制
下载
import os
import subprocess

from ansible.module_utils.basic import AnsibleModule


def run_ansible_playbook(playbook_path):
    """Run ``ansible-playbook`` on *playbook_path* and print its output.

    On success the captured stdout is printed; on a non-zero exit the
    captured stderr is printed instead. Nothing is returned.
    """
    try:
        result = subprocess.run(
            ['ansible-playbook', playbook_path],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print("Playbook执行失败:")
        print(e.stderr)
    else:
        print("Playbook执行成功:")
        print(result.stdout)


def _read_text_or_none(path):
    """Return the content of *path*, or ``None`` when it does not exist."""
    try:
        with open(path, 'r') as f:
            return f.read()
    except FileNotFoundError:
        return None


def _restore_site(available, enabled, previous, was_enabled):
    """Put the site files back the way they were before a failed update."""
    if previous is None:
        if os.path.lexists(available):
            os.remove(available)
    else:
        with open(available, 'w') as f:
            f.write(previous)
    if not was_enabled and os.path.lexists(enabled):
        os.remove(enabled)


# Example of a custom Ansible module body.
def configure_nginx(module):
    """Write, enable and reload an Nginx site described by the module params.

    Reads ``server_name`` and ``root_path`` from ``module.params``. Exits
    through ``module.exit_json`` (``changed=False`` when the site is already
    configured identically and enabled, ``changed=True`` after an update) or
    ``module.fail_json`` when writing, validating or reloading fails. On
    failure the previous site file and symlink state are restored so a
    rejected configuration is not left enabled.
    """
    # Fetch parameters.
    server_name = module.params['server_name']
    root_path = module.params['root_path']

    # Render the Nginx configuration.
    config = f"""server {{
    listen 80;
    server_name {server_name};
    root {root_path};

    location / {{
        try_files $uri $uri/ =404;
    }}
}}
"""
    available = f'/etc/nginx/sites-available/{server_name}'
    enabled = f'/etc/nginx/sites-enabled/{server_name}'

    previous = _read_text_or_none(available)
    was_enabled = os.path.lexists(enabled)

    # Idempotence: report "unchanged" instead of rewriting and reloading.
    if previous == config and was_enabled:
        module.exit_json(changed=False, msg="Nginx配置无需更新")

    try:
        # Write the configuration file.
        with open(available, 'w') as f:
            f.write(config)
        # Create the symlink that enables the site.
        subprocess.run(['ln', '-sf', available, enabled], check=True)
        # Validate, then reload Nginx.
        subprocess.run(['nginx', '-t'], check=True)
        subprocess.run(['systemctl', 'reload', 'nginx'], check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        try:
            _restore_site(available, enabled, previous, was_enabled)
        except OSError as restore_error:
            module.fail_json(msg=f"配置失败: {str(e)}; 回滚失败: {restore_error}")
        module.fail_json(msg=f"配置失败: {str(e)}")

    module.exit_json(changed=True, msg="Nginx配置更新成功")


if __name__ == '__main__':
    run_ansible_playbook('deploy.yml')
监控与告警系统
Prometheus自定义指标
python
复制
下载
import time

import psutil
from prometheus_client import Gauge, start_http_server

# Exported gauges.
CPU_USAGE = Gauge('system_cpu_percent', 'CPU使用百分比')
MEMORY_USAGE = Gauge('system_memory_percent', '内存使用百分比')
DISK_USAGE = Gauge('system_disk_percent', '磁盘使用百分比')


def collect_metrics(interval=5):
    """Update the CPU, memory and disk gauges forever.

    Args:
        interval: Seconds to sleep between samples (default 5).

    This function never returns.
    """
    # The first non-blocking cpu_percent() call has no previous sample to
    # compare against and returns a meaningless 0.0; prime it once so every
    # exported value covers a real measurement window.
    psutil.cpu_percent(interval=None)
    while True:
        time.sleep(interval)
        # CPU utilisation since the previous call.
        CPU_USAGE.set(psutil.cpu_percent(interval=None))
        # Memory utilisation.
        MEMORY_USAGE.set(psutil.virtual_memory().percent)
        # Disk utilisation of the root filesystem.
        DISK_USAGE.set(psutil.disk_usage('/').percent)


if __name__ == '__main__':
    # Start the metrics HTTP endpoint.
    start_http_server(8000)
    print("Prometheus指标服务器已启动,端口8000")
    collect_metrics()
告警规则与通知
python
复制
下载
import smtplib
from datetime import datetime
from email.mime.text import MIMEText


class AlertManager:
    """Threshold-based alerting with per-alert rate limiting and e-mail delivery."""

    # (metric key, alert-history key, message prefix) for every check.
    _CHECKS = (
        ('cpu', 'high_cpu', 'CPU使用率过高'),
        ('memory', 'high_memory', '内存使用率过高'),
        ('disk', 'high_disk', '磁盘使用率过高'),
    )

    def __init__(self, thresholds, cooldown_seconds=300):
        """Store the thresholds and start with an empty alert history.

        Args:
            thresholds: Mapping with ``'cpu'``, ``'memory'`` and ``'disk'``
                limits; a metric strictly above its limit triggers an alert.
            cooldown_seconds: Minimum time between two alerts for the same
                problem (default 300, i.e. five minutes).
        """
        self.thresholds = thresholds
        self.cooldown_seconds = cooldown_seconds
        self.alert_history = {}

    def check_metrics(self, metrics):
        """Return alert messages for every metric above its threshold.

        Args:
            metrics: Mapping with ``'cpu'``, ``'memory'`` and ``'disk'``
                values.

        Returns:
            list[str]: One message per exceeded metric that is not currently
            inside its cooldown window.
        """
        current_time = datetime.now()
        alerts = []
        for metric, alert_key, label in self._CHECKS:
            value = metrics[metric]
            if value > self.thresholds[metric] and self._should_alert(alert_key, current_time):
                alerts.append(f"{label}: {value}%")
        return alerts

    def _should_alert(self, alert_key, current_time):
        """Rate-limit alerts: suppress repeats inside the cooldown window.

        Records *current_time* as the last alert time whenever it returns
        ``True``.
        """
        last_alert = self.alert_history.get(alert_key)
        if last_alert is not None:
            # total_seconds() covers the whole delta; ``.seconds`` is only
            # the sub-day component and would wrongly suppress an alert
            # raised e.g. one day and ten seconds after the previous one.
            elapsed = (current_time - last_alert).total_seconds()
            if elapsed < self.cooldown_seconds:
                return False
        self.alert_history[alert_key] = current_time
        return True

    def send_email_alert(self, to_addr, subject, body):
        """Send *body* as a plain-text e-mail and report the outcome on stdout.

        Returns:
            bool: ``True`` when the message was handed to the SMTP server,
            ``False`` when connecting, authenticating or sending failed.
        """
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = 'alert@example.com'
        msg['To'] = to_addr

        try:
            # The timeout keeps an unreachable mail server from blocking
            # the caller indefinitely.
            with smtplib.SMTP('smtp.example.com', 587, timeout=10) as server:
                server.starttls()
                server.login('user', 'password')
                server.send_message(msg)
        except (smtplib.SMTPException, OSError) as e:
            print(f"发送告警邮件失败: {str(e)}")
            return False
        print("告警邮件发送成功")
        return True


if __name__ == '__main__':
    # Usage example.
    thresholds = {'cpu': 90, 'memory': 85, 'disk': 90}
    alert_manager = AlertManager(thresholds)

    metrics = {'cpu': 95, 'memory': 80, 'disk': 92}
    alerts = alert_manager.check_metrics(metrics)

    if alerts:
        alert_manager.send_email_alert(
            'admin@example.com',
            '系统告警通知',
            '\n'.join(alerts),
        )
日志管理与分析
ELK日志处理管道
python
复制
下载
import logging
import os
from logging.handlers import RotatingFileHandler

import logstash
from pythonjsonlogger import jsonlogger

LOG_FILE = '/var/log/app/app.log'


def setup_logging():
    """Configure and return the ``app`` logger.

    Attaches a rotating JSON file handler, a Logstash handler and a JSON
    console handler. Calling it again returns the already configured logger
    instead of attaching a second set of handlers, which would emit every
    record twice.
    """
    # Create the logger.
    logger = logging.getLogger('app')
    logger.setLevel(logging.INFO)
    if logger.handlers:
        return logger

    # JSON format.
    formatter = jsonlogger.JsonFormatter('%(asctime)s %(levelname)s %(name)s %(message)s')

    # File handler; the directory must exist before the handler opens the file.
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
    file_handler = RotatingFileHandler(
        LOG_FILE,
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5,
        encoding='utf-8',
    )
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Logstash handler.
    logstash_handler = logstash.LogstashHandler('logstash.example.com', 5044, version=1)
    logger.addHandler(logstash_handler)

    # Console handler.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    return logger


if __name__ == '__main__':
    # Usage example.
    logger = setup_logging()
    # 'module' is a built-in LogRecord attribute; passing it through `extra`
    # raises KeyError ("Attempt to overwrite 'module' in LogRecord"), so the
    # field is named 'component' instead.
    logger.info("应用启动", extra={'user': 'admin', 'component': 'startup'})
    try:
        1 / 0
    except ZeroDivisionError:
        logger.error("发生错误", exc_info=True, extra={'context': 'division'})
日志分析脚本
python
复制
下载
import json
import re
from collections import Counter

import pandas as pd


def analyze_logs(log_file):
    """Print summary statistics for a JSON-lines log file.

    Each line of *log_file* is parsed as JSON; lines that are not valid JSON
    objects are skipped. Prints the level distribution, the five most common
    ERROR messages, the distribution of HTTP status codes found in messages
    of the form ``status code NNN`` and the number of records per hour.
    Sections whose columns are absent from the data are skipped.
    """
    # Read the log file.
    records = []
    with open(log_file, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(record, dict):
                records.append(record)

    # Convert to a DataFrame.
    df = pd.DataFrame(records)
    if df.empty:
        print("未找到有效的日志记录")
        return

    columns = set(df.columns)

    # Level distribution.
    if 'levelname' in columns:
        print("\n错误级别分布:")
        print(df['levelname'].value_counts())

    # Most common error messages.
    if {'levelname', 'message'} <= columns:
        error_messages = df[df['levelname'] == 'ERROR']['message']
        print("\n最常见错误消息:")
        print(error_messages.value_counts().head(5))

    # HTTP status codes; expand=False yields a Series for the single group.
    if 'message' in columns:
        df['status_code'] = df['message'].astype(str).str.extract(
            r'status code (\d{3})', expand=False
        )
        if df['status_code'].notna().any():
            print("\nHTTP状态码分布:")
            print(df['status_code'].value_counts())

    # Time pattern.
    if 'asctime' in columns:
        df['hour'] = pd.to_datetime(df['asctime']).dt.hour
        print("\n按小时分布的日志量:")
        print(df['hour'].value_counts().sort_index())


if __name__ == '__main__':
    analyze_logs('/var/log/app/app.log')
配置管理
使用Python管理配置文件
python
复制
下载
import configparser
import json
from typing import Any, Dict

import yaml


class ConfigManager:
    """Load, query, update and save INI, YAML and JSON configuration files.

    Loaded data is kept in ``self.configs`` under the keys ``'ini'``,
    ``'yaml'`` and ``'json'``.
    """

    def __init__(self):
        self.configs = {}

    def load_ini(self, filepath: str) -> Dict[str, Any]:
        """Load an INI file as ``{section: {option: value}}``.

        ``ConfigParser.read`` ignores files it cannot open, so a missing
        file results in an empty mapping rather than an exception.
        """
        config = configparser.ConfigParser()
        config.read(filepath, encoding='utf-8')
        self.configs['ini'] = {s: dict(config.items(s)) for s in config.sections()}
        return self.configs['ini']

    def load_yaml(self, filepath: str) -> Dict[str, Any]:
        """Load a YAML file; an empty document is stored as ``{}``."""
        with open(filepath, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file, which would break
            # get_value/update_config later on.
            self.configs['yaml'] = yaml.safe_load(f) or {}
        return self.configs['yaml']

    def load_json(self, filepath: str) -> Dict[str, Any]:
        """Load a JSON file."""
        with open(filepath, 'r', encoding='utf-8') as f:
            self.configs['json'] = json.load(f)
        return self.configs['json']

    def get_value(self, config_type: str, *keys) -> Any:
        """Return the nested value at *keys*, or ``None`` if any key is missing."""
        current = self.configs.get(config_type, {})
        for key in keys:
            if not (isinstance(current, dict) and key in current):
                return None
            current = current[key]
        return current

    @staticmethod
    def _deep_merge(target: Dict[str, Any], updates: Dict[str, Any]) -> None:
        """Merge *updates* into *target* in place, recursing into nested dicts."""
        for key, value in updates.items():
            if isinstance(value, dict) and isinstance(target.get(key), dict):
                ConfigManager._deep_merge(target[key], value)
            else:
                target[key] = value

    def update_config(self, config_type: str, updates: Dict[str, Any]):
        """Merge *updates* into a loaded configuration.

        Nested dictionaries are merged key by key, so updating
        ``{'database': {'host': ...}}`` keeps the other ``database`` keys
        (a plain ``dict.update`` would replace the whole sub-dictionary).
        Does nothing when *config_type* has not been loaded.
        """
        if config_type in self.configs:
            self._deep_merge(self.configs[config_type], updates)

    def save_config(self, config_type: str, filepath: str):
        """Write a loaded configuration to *filepath*.

        Raises:
            ValueError: if *config_type* has not been loaded.
        """
        if config_type not in self.configs:
            raise ValueError(f"未加载的配置类型: {config_type}")

        if config_type == 'ini':
            config = configparser.ConfigParser()
            for section, options in self.configs['ini'].items():
                config[section] = options
            with open(filepath, 'w', encoding='utf-8') as f:
                config.write(f)
        elif config_type == 'yaml':
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(self.configs['yaml'], f, allow_unicode=True)
        elif config_type == 'json':
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(self.configs['json'], f, indent=2, ensure_ascii=False)


if __name__ == '__main__':
    # Usage example.
    config_manager = ConfigManager()
    config_manager.load_yaml('config.yaml')
    db_host = config_manager.get_value('yaml', 'database', 'host')
    print(f"数据库主机: {db_host}")

    # Update the configuration.
    config_manager.update_config('yaml', {'database': {'host': 'new.db.example.com'}})
    config_manager.save_config('yaml', 'config_updated.yaml')
容器化与编排
Docker SDK for Python
python
复制
下载
import docker
from docker.errors import DockerException


class DockerManager:
    """Thin convenience wrapper around the Docker SDK client."""

    def __init__(self):
        """Connect using the environment's Docker settings.

        Raises:
            DockerException: re-raised after printing when the connection
                cannot be established.
        """
        try:
            self.client = docker.from_env()
            print("Docker连接成功")
        except DockerException as e:
            print(f"连接Docker失败: {str(e)}")
            raise

    def list_containers(self, all=False, filters=None):
        """List containers.

        Args:
            all: Include stopped containers when true (name kept for
                backward compatibility although it shadows the builtin).
            filters: Optional filter mapping passed to the SDK, e.g.
                ``{'status': 'exited'}``.
        """
        return self.client.containers.list(all=all, filters=filters)

    def run_container(self, image, command=None, detach=True, **kwargs):
        """Run a container; extra keyword arguments go to ``containers.run``."""
        return self.client.containers.run(
            image,
            command=command,
            detach=detach,
            **kwargs,
        )

    def build_image(self, path, tag, dockerfile='Dockerfile'):
        """Build an image from *path* and tag it *tag*."""
        return self.client.images.build(path=path, tag=tag, dockerfile=dockerfile)

    def cleanup_containers(self):
        """Remove every container whose status is ``exited``."""
        # list_containers previously had no `filters` parameter, so this
        # call raised TypeError before removing anything.
        stopped_containers = self.list_containers(all=True, filters={'status': 'exited'})
        for container in stopped_containers:
            print(f"删除容器: {container.id}")
            container.remove()

    def cleanup_images(self, dangling=True):
        """Remove images; only dangling ones unless *dangling* is false."""
        filters = {'dangling': True} if dangling else None
        unused_images = self.client.images.list(filters=filters)
        for image in unused_images:
            print(f"删除镜像: {image.tags}")
            self.client.images.remove(image.id)


if __name__ == '__main__':
    # Usage example.
    docker_manager = DockerManager()
    print("运行中的容器:")
    for container in docker_manager.list_containers():
        print(f" - {container.name}: {container.status}")

    # Run a new container.
    nginx = docker_manager.run_container(
        'nginx:latest',
        ports={'80/tcp': 8080},
        name='web-server',
    )
    print(f"启动容器: {nginx.name}")
Kubernetes Python客户端
python
复制
下载
from kubernetes import client, config


class KubernetesManager:
    """Helper around the Kubernetes CoreV1 and AppsV1 APIs."""

    def __init__(self, in_cluster=False):
        """Load cluster credentials and create the API clients.

        Args:
            in_cluster: Use the in-cluster service-account configuration
                when true, otherwise the local kubeconfig.
        """
        if in_cluster:
            config.load_incluster_config()
        else:
            config.load_kube_config()

        self.core_v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def list_pods(self, namespace='default'):
        """List the pods in *namespace*."""
        return self.core_v1.list_namespaced_pod(namespace)

    def create_deployment(self, name, image, replicas=1, namespace='default'):
        """Create a Deployment whose pods carry the label ``app=<name>``."""
        labels = {'app': name}
        deployment = client.V1Deployment(
            api_version='apps/v1',
            kind='Deployment',
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1DeploymentSpec(
                replicas=replicas,
                selector=client.V1LabelSelector(match_labels=labels),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels=labels),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(name=name, image=image)]
                    ),
                ),
            ),
        )
        return self.apps_v1.create_namespaced_deployment(namespace=namespace, body=deployment)

    def create_service(self, name, port, target_port, namespace='default',
                       service_type='LoadBalancer', app_label=None):
        """Create a Service that routes to pods labelled ``app=<app_label>``.

        Args:
            name: Service name.
            port: Port exposed by the Service.
            target_port: Port on the selected pods.
            namespace: Target namespace.
            service_type: Kubernetes Service type.
            app_label: Value of the ``app`` label to select. Defaults to
                *name*, which only matches when the Service and the
                Deployment share the same name.
        """
        selected_app = name if app_label is None else app_label
        service = client.V1Service(
            api_version='v1',
            kind='Service',
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1ServiceSpec(
                selector={'app': selected_app},
                ports=[client.V1ServicePort(port=port, target_port=target_port)],
                type=service_type,
            ),
        )
        return self.core_v1.create_namespaced_service(namespace=namespace, body=service)


if __name__ == '__main__':
    # Usage example.
    k8s_manager = KubernetesManager()

    print("集群中的Pod:")
    pods = k8s_manager.list_pods()
    for pod in pods.items:
        print(f" - {pod.metadata.name}: {pod.status.phase}")

    # Deploy the application.
    deployment = k8s_manager.create_deployment(
        'myapp',
        'myapp:1.0',
        replicas=3,
    )
    # The Service is named differently from the Deployment, so the pod
    # label to select must be given explicitly.
    service = k8s_manager.create_service(
        'myapp-service',
        port=80,
        target_port=8080,
        app_label='myapp',
    )
    print(f"已部署: {deployment.metadata.name}")
    print(f"已创建服务: {service.metadata.name}")
CI/CD流水线
GitLab CI Python脚本
python
复制
下载
import time

import gitlab
import requests


class GitLabCICD:
    """Trigger and monitor GitLab CI/CD pipelines for one project."""

    # Pipeline states after which the status no longer changes.
    _FINISHED_STATES = ('success', 'failed', 'canceled', 'skipped')

    def __init__(self, gitlab_url, private_token, project_id):
        self.gl = gitlab.Gitlab(gitlab_url, private_token=private_token)
        self.project = self.gl.projects.get(project_id)

    @staticmethod
    def _as_variable_list(variables):
        """Convert ``{'K': 'v'}`` into GitLab's ``[{'key': 'K', 'value': 'v'}]``.

        A value that is already a list is passed through; ``None`` or an
        empty mapping becomes an empty list.
        """
        if not variables:
            return []
        if isinstance(variables, dict):
            return [{'key': key, 'value': str(value)} for key, value in variables.items()]
        return list(variables)

    def trigger_pipeline(self, branch='master', variables=None):
        """Trigger a pipeline on *branch*.

        Args:
            branch: Git ref to run the pipeline on.
            variables: Pipeline variables, either as a plain mapping or as
                a list of ``{'key': ..., 'value': ...}`` dictionaries.

        Returns:
            The created pipeline object.
        """
        pipeline = self.project.pipelines.create({
            'ref': branch,
            # The pipelines API expects a list of key/value objects, not a
            # plain mapping.
            'variables': self._as_variable_list(variables),
        })
        print(f"已触发流水线: {pipeline.id}")
        return pipeline

    def wait_for_pipeline(self, pipeline, timeout=1800, interval=30):
        """Poll *pipeline* until it finishes.

        Returns:
            The refreshed pipeline once its status is success, failed,
            canceled or skipped.

        Raises:
            TimeoutError: if it has not finished within *timeout* seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            pipeline.refresh()
            if pipeline.status in self._FINISHED_STATES:
                print(f"流水线状态: {pipeline.status}")
                return pipeline
            print(f"等待中... 当前状态: {pipeline.status}")
            time.sleep(interval)
        raise TimeoutError("等待流水线超时")

    def get_job_logs(self, pipeline, job_name):
        """Return the trace of the job named *job_name*, or ``None`` if absent."""
        for pipeline_job in pipeline.jobs.list():
            if pipeline_job.name == job_name:
                # Objects listed through a pipeline do not expose trace();
                # fetch the job through the project to get that method.
                job = self.project.jobs.get(pipeline_job.id)
                return job.trace().decode('utf-8')
        return None

    def deploy_to_environment(self, environment, version):
        """Run the deployment pipeline and report whether it succeeded.

        Returns:
            bool: ``True`` on pipeline success; ``False`` otherwise, after
            printing the log of the ``deploy`` job.
        """
        # Trigger the deployment pipeline.
        pipeline = self.trigger_pipeline(
            branch='master',
            variables={
                'DEPLOY_ENV': environment,
                'APP_VERSION': version,
            },
        )

        # Wait for the deployment to finish.
        pipeline = self.wait_for_pipeline(pipeline)

        if pipeline.status == 'success':
            print(f"成功部署 {version} 到 {environment}")
            return True

        logs = self.get_job_logs(pipeline, 'deploy')
        print(f"部署失败. 作业日志:\n{logs}")
        return False


if __name__ == '__main__':
    # Usage example.
    ci_cd = GitLabCICD(
        gitlab_url='https://gitlab.example.com',
        private_token='your-private-token',
        project_id=12345,
    )

    # Deploy a new version.
    ci_cd.deploy_to_environment('production', '1.2.0')
GitHub Actions Python SDK
python
复制
下载
import os
import re
import urllib.request
from base64 import b64encode

from github import Github, UnknownObjectException


class GitHubActionsManager:
    """Manage GitHub Actions workflows of a single repository."""

    def __init__(self, token, repo_name):
        self.gh = Github(token)
        self.repo = self.gh.get_repo(repo_name)

    def create_workflow(self, workflow_name, workflow_content):
        """Create ``.github/workflows/<workflow_name>.yml`` or update it.

        Only a "not found" response leads to creation; other API errors
        (authentication, permissions, rate limits) propagate instead of
        being mistaken for a missing file.
        """
        workflow_path = f".github/workflows/{workflow_name}.yml"
        try:
            # Check whether the workflow already exists.
            contents = self.repo.get_contents(workflow_path)
        except UnknownObjectException:
            # Create a new workflow.
            self.repo.create_file(
                workflow_path,
                f"Create {workflow_name}",
                workflow_content,
            )
            print(f"已创建工作流: {workflow_name}")
        else:
            # Update the existing workflow.
            self.repo.update_file(
                workflow_path,
                f"Update {workflow_name}",
                workflow_content,
                contents.sha,
            )
            print(f"已更新工作流: {workflow_name}")

    def trigger_workflow_dispatch(self, workflow_name, ref='master', inputs=None):
        """Manually trigger a workflow.

        The workflow file must declare the ``workflow_dispatch`` trigger
        and any inputs that are passed here.
        """
        workflow = self.repo.get_workflow(f"{workflow_name}.yml")
        workflow.create_dispatch(ref, inputs=inputs or {})
        print(f"已触发工作流: {workflow_name}")

    def get_workflow_runs(self, workflow_name, branch=None):
        """Return the workflow's runs as a list, optionally for one branch."""
        workflow = self.repo.get_workflow(f"{workflow_name}.yml")
        runs = workflow.get_runs(branch=branch)
        return list(runs)

    def download_workflow_logs(self, run_id, output_dir='logs'):
        """Save the log of every job of run *run_id* into *output_dir*."""
        run = self.repo.get_workflow_run(run_id)
        os.makedirs(output_dir, exist_ok=True)

        for job in run.jobs():
            # Job names may contain characters such as '/' or spaces that
            # are unsuitable for file names.
            safe_name = re.sub(r'[^\w.-]+', '_', job.name)
            log_file = os.path.join(output_dir, f"{run_id}_{safe_name}.log")
            # WorkflowJob exposes the download location via logs_url();
            # it has no logs() method.
            with urllib.request.urlopen(job.logs_url()) as response, \
                    open(log_file, 'wb') as f:
                f.write(response.read())
            print(f"已保存日志: {log_file}")


# CI workflow definition. `workflow_dispatch` with the `environment` input is
# required for the manual trigger in the usage example below.
ci_workflow = """
name: Python CI

on:
  push:
  pull_request:
  workflow_dispatch:
    inputs:
      environment:
        description: Target environment
        required: false
        default: staging

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Run tests
        run: |
          pytest
"""


if __name__ == '__main__':
    # Usage example.
    actions_mgr = GitHubActionsManager(
        token=os.getenv('GITHUB_TOKEN'),
        repo_name='your-username/your-repo',
    )

    # Create or update the workflow.
    actions_mgr.create_workflow("python-ci", ci_workflow)

    # Trigger the workflow manually.
    actions_mgr.trigger_workflow_dispatch(
        "python-ci",
        inputs={"environment": "staging"},
    )
安全自动化
漏洞扫描集成
python
复制
下载
import html
import json
import os
import subprocess
from datetime import datetime


class SecurityScanner:
    """Run dependency and static-analysis scans and build an HTML report."""

    def __init__(self, output_dir='reports'):
        """Remember *output_dir* and create it when missing."""
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    def _report_path(self, prefix):
        """Return a timestamped JSON report path inside the output directory."""
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return os.path.join(self.output_dir, f"{prefix}_{stamp}.json")

    def run_dependency_scan(self, project_path):
        """Scan the project's dependencies with ``safety``.

        Returns:
            bool: ``True`` when safety exits with code 0, ``False`` when
            vulnerabilities are reported or the scan itself fails.
        """
        report_file = self._report_path('dependency_scan')
        try:
            # NOTE(review): the JSON is taken from stdout and saved here
            # because the meaning of safety's --output option differs
            # between releases (file path vs. format name) — confirm
            # against the installed version.
            result = subprocess.run(
                ['safety', 'check', '--json'],
                cwd=project_path,
                capture_output=True,
                text=True,
            )
            try:
                report = json.loads(result.stdout)
            except json.JSONDecodeError:
                report = None
            else:
                with open(report_file, 'w', encoding='utf-8') as f:
                    json.dump(report, f, indent=2)

            if result.returncode == 0:
                print("未发现漏洞依赖")
                return True
            if report is None:
                raise ValueError(result.stderr.strip() or "safety produced no JSON output")

            # Accept both a top-level list and an object holding the list
            # under 'vulnerabilities'.
            if isinstance(report, dict):
                vulnerabilities = report.get('vulnerabilities', [])
            else:
                vulnerabilities = report
            print(f"发现 {len(vulnerabilities)} 个漏洞:")
            for vuln in vulnerabilities:
                print(f" - {vuln['package_name']} {vuln['analyzed_version']}: {vuln['advisory']}")
            return False
        except Exception as e:
            print(f"依赖扫描失败: {str(e)}")
            return False

    def run_code_scan(self, project_path):
        """Run ``bandit`` over the project.

        Returns:
            bool: ``True`` when bandit reports no issues, ``False`` when it
            reports issues or the scan itself fails.
        """
        # bandit runs with cwd=project_path, so the report path must be
        # absolute to land in the scanner's output directory.
        report_file = os.path.abspath(self._report_path('code_scan'))
        try:
            subprocess.run(
                ['bandit', '-r', '.', '-f', 'json', '-o', report_file],
                cwd=project_path,
                capture_output=True,
                text=True,
            )
            with open(report_file, 'r', encoding='utf-8') as f:
                report = json.load(f)

            # Count the reported findings directly; the metrics totals are
            # keyed by severity/confidence rather than by an 'issues' entry.
            issues = report['results']
            if not issues:
                print("未发现安全问题")
                return True
            print(f"发现 {len(issues)} 个安全问题:")
            for issue in issues:
                print(f" - {issue['issue_text']} (严重性: {issue['issue_severity']})")
            return False
        except Exception as e:
            print(f"代码扫描失败: {str(e)}")
            return False

    def generate_report(self):
        """Combine every JSON report in the output directory into one HTML file.

        Files that are not valid JSON are skipped. Report content is
        HTML-escaped so scanned code snippets cannot break or inject markup.

        Returns:
            str: Path of the generated ``security_report.html``.
        """
        reports = []
        for filename in sorted(os.listdir(self.output_dir)):
            if not filename.endswith('.json'):
                continue
            try:
                with open(os.path.join(self.output_dir, filename), 'r', encoding='utf-8') as f:
                    reports.append({'filename': filename, 'content': json.load(f)})
            except json.JSONDecodeError:
                continue

        generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        parts = [
            '<html><head><meta charset="utf-8"><title>安全扫描报告</title></head>'
            f"<body><h1>安全扫描报告</h1><p>生成时间: {generated_at}</p>"
        ]
        for report in reports:
            parts.append(f"<h2>{html.escape(report['filename'], quote=False)}</h2>")
            body = json.dumps(report['content'], indent=2)
            parts.append("<pre>" + html.escape(body, quote=False) + "</pre>")
        parts.append("</body></html>")

        report_path = os.path.join(self.output_dir, 'security_report.html')
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("".join(parts))

        print(f"报告已生成: {report_path}")
        return report_path


if __name__ == '__main__':
    # Usage example.
    scanner = SecurityScanner()
    project_path = '/path/to/your/project'

    # Run the scans.
    dep_scan_ok = scanner.run_dependency_scan(project_path)
    code_scan_ok = scanner.run_code_scan(project_path)

    # Generate the report.
    if not dep_scan_ok or not code_scan_ok:
        report_path = scanner.generate_report()
        print("发现安全问题,请查看报告")
自动化安全加固
python
复制
下载
import os
import platform
import re
import shlex
import subprocess


class SystemHardener:
    """Collect hardening actions for file permissions and sshd settings.

    The ``check_*`` methods append shell-style action strings to
    ``self.hardening_actions``; ``apply_hardening`` prints and executes them.
    """

    # sshd directives and the values they are required to have.
    SSH_CHECKS = {
        'PermitRootLogin': 'no',
        'PasswordAuthentication': 'no',
        'X11Forwarding': 'no',
    }

    def __init__(self):
        self.system = platform.system().lower()
        self.hardening_actions = []

    @staticmethod
    def _parse_mode(mode):
        """Return *mode* (``0o644``, ``'0o644'`` or ``'644'``) as permission bits."""
        if isinstance(mode, int):
            return mode & 0o777
        text = str(mode).strip().lower()
        if text.startswith('0o'):
            text = text[2:]
        return int(text, 8) & 0o777

    def check_permissions(self, path, recommended_mode):
        """Compare the permission bits of *path* with *recommended_mode*.

        Queues a ``chmod`` action when they differ. The mode is written in
        plain octal (``644``) because ``chmod`` does not accept Python's
        ``0o644`` notation.

        Returns:
            bool: ``True`` when the permissions already match.
        """
        wanted = self._parse_mode(recommended_mode)
        current = os.stat(path).st_mode & 0o777
        if current == wanted:
            return True
        self.hardening_actions.append(f"chmod {wanted:o} {shlex.quote(path)}")
        return False

    @staticmethod
    def _parse_sshd_settings(content):
        """Return ``{keyword_lowercase: value}`` for the global sshd directives.

        Comments are ignored, the first occurrence of a keyword wins and
        parsing stops at the first ``Match`` block, whose directives are
        conditional.
        """
        settings = {}
        for raw_line in content.splitlines():
            line = raw_line.split('#', 1)[0].strip()
            if not line:
                continue
            parts = re.split(r'[\s=]+', line, maxsplit=1)
            keyword = parts[0].lower()
            if keyword == 'match':
                break
            if len(parts) == 2:
                settings.setdefault(keyword, parts[1].strip().strip('"'))
        return settings

    def check_ssh_config(self, sshd_config='/etc/ssh/sshd_config'):
        """Queue actions for sshd directives that are missing or wrong.

        A missing directive is appended to the file. A directive present
        with another value is rewritten in place with ``sed``, because an
        appended line would come after the existing one and be ignored.

        NOTE(review): appending to a file that contains ``Match`` blocks
        places the new line inside the last block — confirm the target
        files have none.

        Returns:
            bool: ``True`` when the file is absent or already compliant.
        """
        if not os.path.exists(sshd_config):
            return True

        with open(sshd_config, 'r') as f:
            settings = self._parse_sshd_settings(f.read())

        quoted_path = shlex.quote(sshd_config)
        compliant = True
        for param, expected_value in self.SSH_CHECKS.items():
            current = settings.get(param.lower())
            if current == expected_value:
                continue
            compliant = False
            directive = f"{param} {expected_value}"
            if current is None:
                action = f"echo '{directive}' >> {quoted_path}"
            else:
                action = (
                    f"sed -i -E 's/^[[:space:]]*{param}[[:space:]=].*$/{directive}/I' "
                    f"{quoted_path}"
                )
            self.hardening_actions.append(action)
        return compliant

    @staticmethod
    def _run_action(action):
        """Execute one queued action without going through a shell."""
        tokens = shlex.split(action)
        if tokens and tokens[0] == 'echo':
            # "echo '<line>' >> <file>": append the unquoted line ourselves.
            _, line, _, target = tokens
            with open(target, 'a') as f:
                f.write(f"\n{line}\n")
        else:
            subprocess.run(tokens, check=True)

    def apply_hardening(self, dry_run=False, assume_yes=False):
        """Print the queued actions and, unless *dry_run*, execute them.

        Args:
            dry_run: Only list the actions.
            assume_yes: Skip the interactive confirmation prompt.
        """
        if not self.hardening_actions:
            print("系统已符合安全标准")
            return

        print("需要执行以下加固操作:")
        for action in self.hardening_actions:
            print(f" - {action}")

        if dry_run:
            return

        if not assume_yes:
            confirm = input("确认执行这些操作吗? (y/n): ")
            if confirm.lower() != 'y':
                print("操作已取消")
                return

        for action in self.hardening_actions:
            try:
                self._run_action(action)
            except (subprocess.CalledProcessError, OSError, ValueError) as e:
                print(f"执行失败: {action} - {str(e)}")
            else:
                print(f"执行成功: {action}")
        print("安全加固完成")


if __name__ == '__main__':
    # Usage example.
    hardener = SystemHardener()

    # Check permissions of key files.
    hardener.check_permissions('/etc/passwd', '0o644')
    hardener.check_permissions('/etc/shadow', '0o640')

    # Check the SSH configuration.
    hardener.check_ssh_config()

    # Apply hardening (dry run).
    hardener.apply_hardening(dry_run=True)

    # Actually apply hardening:
    # hardener.apply_hardening()
结语与学习路径
https://www.python.org/static/community_logos/python-powered-h-140x182.png
通过这十篇系列教程,你已经掌握了:
- 基础设施即代码(IaC)实践
- 监控告警系统构建
- 日志管理与分析技术
- 配置管理最佳实践
- 容器化与编排技术
- CI/CD流水线实现
- 安全自动化与加固
进阶学习方向:
- 云原生技术栈:
  - 深入Kubernetes Operator开发
  - 服务网格(Service Mesh)实现
  - 无服务器架构(Serverless)
- 性能优化:
  - 分布式系统性能调优
  - 大规模集群管理
  - 高可用架构设计
- 安全专业领域:
  - 渗透测试与红队技术
  - 零信任架构实现
  - 合规性自动化检查
- 认证体系:
  - AWS/Azure/GCP云认证
  - Certified Kubernetes Administrator
  - HashiCorp认证工程师
Python在自动化运维和DevOps领域的应用日益广泛,持续学习和实践将助你成为高效能的技术专家!