Infrastructure as Code (IaC)
Running Remote Commands with Fabric
```python
from fabric import Connection

def deploy_app():
    # Connect to the remote server
    with Connection(
        'web-server.example.com',
        user='deploy',
        connect_kwargs={"key_filename": "/path/to/key.pem"}
    ) as c:
        # Update the code
        c.run('cd /var/www/myapp && git pull origin master')
        # Install dependencies
        c.run('cd /var/www/myapp && pip install -r requirements.txt')
        # Restart the service
        c.sudo('systemctl restart myapp', pty=True)
        # Check the service status
        result = c.run('systemctl status myapp', hide=True)
        print(f"Service status:\n{result.stdout}")

if __name__ == '__main__':
    deploy_app()
```
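The same commands can be fanned out to several servers with Fabric's group classes. A minimal sketch, assuming the host names and key path are placeholders:

```python
from fabric import ThreadingGroup

def deploy_all(hosts):
    # Run the update on every host in parallel; hosts and key path are example values
    group = ThreadingGroup(
        *hosts,
        user='deploy',
        connect_kwargs={"key_filename": "/path/to/key.pem"}
    )
    results = group.run('cd /var/www/myapp && git pull origin master', hide=True)
    for connection, result in results.items():
        print(f"{connection.host}: exited {result.exited}")

deploy_all(['web-1.example.com', 'web-2.example.com'])
```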
Ansible Python API
```python
import subprocess

from ansible.module_utils.basic import AnsibleModule

def run_ansible_playbook(playbook_path):
    try:
        result = subprocess.run(
            ['ansible-playbook', playbook_path],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        print("Playbook ran successfully:")
        print(result.stdout)
    except subprocess.CalledProcessError as e:
        print("Playbook failed:")
        print(e.stderr)

# Example of a custom Ansible module task: in a real module this function would
# live in its own file and receive an instantiated AnsibleModule
def configure_nginx(module):
    # Read module parameters
    server_name = module.params['server_name']
    root_path = module.params['root_path']

    # Render the Nginx server block
    config = f"""
server {{
    listen 80;
    server_name {server_name};
    root {root_path};

    location / {{
        try_files $uri $uri/ =404;
    }}
}}
"""
    try:
        # Write the configuration file
        with open(f'/etc/nginx/sites-available/{server_name}', 'w') as f:
            f.write(config)

        # Create the symlink that enables the site
        subprocess.run(
            ['ln', '-sf',
             f'/etc/nginx/sites-available/{server_name}',
             f'/etc/nginx/sites-enabled/{server_name}'],
            check=True
        )

        # Validate the configuration and reload Nginx
        subprocess.run(['nginx', '-t'], check=True)
        subprocess.run(['systemctl', 'reload', 'nginx'], check=True)

        module.exit_json(changed=True, msg="Nginx configuration updated")
    except Exception as e:
        module.fail_json(msg=f"Configuration failed: {str(e)}")

if __name__ == '__main__':
    run_ansible_playbook('deploy.yml')
```
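If you would rather not shell out to the ansible-playbook binary, the ansible-runner package offers a programmatic interface. A minimal sketch, assuming ansible-runner is installed and /tmp/ansible-demo is a placeholder project directory containing the playbook:

```python
import ansible_runner

# Run a playbook through ansible-runner instead of subprocess;
# private_data_dir and the playbook name are example values
run = ansible_runner.run(
    private_data_dir='/tmp/ansible-demo',
    playbook='deploy.yml'
)
print(f"Status: {run.status}, return code: {run.rc}")

# Iterate over per-task events emitted during the run
for event in run.events:
    if event.get('event') == 'runner_on_ok':
        print(event['event_data'].get('task'))
```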
Monitoring and Alerting Systems
Custom Prometheus Metrics
```python
import time

import psutil
from prometheus_client import start_http_server, Gauge

# Define the gauges exposed to Prometheus
CPU_USAGE = Gauge('system_cpu_percent', 'CPU usage percentage')
MEMORY_USAGE = Gauge('system_memory_percent', 'Memory usage percentage')
DISK_USAGE = Gauge('system_disk_percent', 'Disk usage percentage')

def collect_metrics():
    while True:
        # CPU usage
        CPU_USAGE.set(psutil.cpu_percent())
        # Memory usage
        MEMORY_USAGE.set(psutil.virtual_memory().percent)
        # Disk usage
        DISK_USAGE.set(psutil.disk_usage('/').percent)
        time.sleep(5)

if __name__ == '__main__':
    # Start the metrics endpoint
    start_http_server(8000)
    print("Prometheus metrics server started on port 8000")
    collect_metrics()
```
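Gauges cover point-in-time values; for application metrics such as request counts and latencies you would typically add a Counter and a Histogram as well. A minimal sketch, with the metric names, labels, and port chosen purely for illustration:

```python
import random
import time

from prometheus_client import Counter, Histogram, start_http_server

# Total requests, partitioned by HTTP method and status code
REQUESTS = Counter('app_requests_total', 'Total HTTP requests', ['method', 'status'])
# Request latency distribution in seconds
LATENCY = Histogram('app_request_latency_seconds', 'Request latency in seconds')

@LATENCY.time()  # observe how long each call takes
def handle_request():
    time.sleep(random.uniform(0.01, 0.2))  # simulated work
    return 200

if __name__ == '__main__':
    start_http_server(8001)
    while True:
        status = handle_request()
        REQUESTS.labels(method='GET', status=str(status)).inc()
```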
Alert Rules and Notifications
```python
import smtplib
from datetime import datetime
from email.mime.text import MIMEText

class AlertManager:
    def __init__(self, thresholds):
        self.thresholds = thresholds
        self.alert_history = {}

    def check_metrics(self, metrics):
        alerts = []
        current_time = datetime.now()

        # CPU check
        if metrics['cpu'] > self.thresholds['cpu']:
            if self._should_alert('high_cpu', current_time):
                alerts.append(f"High CPU usage: {metrics['cpu']}%")

        # Memory check
        if metrics['memory'] > self.thresholds['memory']:
            if self._should_alert('high_memory', current_time):
                alerts.append(f"High memory usage: {metrics['memory']}%")

        # Disk check
        if metrics['disk'] > self.thresholds['disk']:
            if self._should_alert('high_disk', current_time):
                alerts.append(f"High disk usage: {metrics['disk']}%")

        return alerts

    def _should_alert(self, alert_key, current_time):
        # Suppress alert storms: do not repeat the same alert within 5 minutes
        last_alert = self.alert_history.get(alert_key)
        if last_alert and (current_time - last_alert).total_seconds() < 300:
            return False
        self.alert_history[alert_key] = current_time
        return True

    def send_email_alert(self, to_addr, subject, body):
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = 'alert@example.com'
        msg['To'] = to_addr

        try:
            with smtplib.SMTP('smtp.example.com', 587) as server:
                server.starttls()
                server.login('user', 'password')
                server.send_message(msg)
            print("Alert email sent")
        except Exception as e:
            print(f"Failed to send alert email: {str(e)}")

# Usage example
thresholds = {'cpu': 90, 'memory': 85, 'disk': 90}
alert_manager = AlertManager(thresholds)

metrics = {'cpu': 95, 'memory': 80, 'disk': 92}
alerts = alert_manager.check_metrics(metrics)

if alerts:
    alert_manager.send_email_alert(
        'admin@example.com',
        'System alert notification',
        '\n'.join(alerts)
    )
```
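To turn this into a running monitor, the checks can be driven by the same psutil readings used in the Prometheus exporter above. A minimal sketch, assuming the AlertManager class defined above; the recipient address and polling interval are placeholders:

```python
import time

import psutil

def monitor_loop(alert_manager, interval=60):
    # Periodically sample system metrics and route any alerts to email
    while True:
        metrics = {
            'cpu': psutil.cpu_percent(),
            'memory': psutil.virtual_memory().percent,
            'disk': psutil.disk_usage('/').percent,
        }
        alerts = alert_manager.check_metrics(metrics)
        if alerts:
            alert_manager.send_email_alert(
                'admin@example.com',
                'System alert notification',
                '\n'.join(alerts)
            )
        time.sleep(interval)

# monitor_loop(AlertManager({'cpu': 90, 'memory': 85, 'disk': 90}))
```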
Log Management and Analysis
An ELK Log Processing Pipeline
```python
import logging
from logging.handlers import RotatingFileHandler

import logstash
from pythonjsonlogger import jsonlogger

def setup_logging():
    # Create the logger
    logger = logging.getLogger('app')
    logger.setLevel(logging.INFO)

    # JSON formatter
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(levelname)s %(name)s %(message)s'
    )

    # File handler with rotation
    file_handler = RotatingFileHandler(
        '/var/log/app/app.log',
        maxBytes=10 * 1024 * 1024,  # 10 MB
        backupCount=5
    )
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Logstash handler
    logstash_handler = logstash.LogstashHandler(
        'logstash.example.com', 5044, version=1
    )
    logger.addHandler(logstash_handler)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    return logger

# Usage example
logger = setup_logging()

# Note: 'module' is a reserved LogRecord attribute, so a different key is used here
logger.info("Application started", extra={'user': 'admin', 'component': 'startup'})

try:
    1 / 0
except Exception:
    logger.error("An error occurred", exc_info=True, extra={'context': 'division'})
```
A Log Analysis Script
```python
import json

import pandas as pd

def analyze_logs(log_file):
    # Read the JSON-lines log file
    logs = []
    with open(log_file, 'r') as f:
        for line in f:
            try:
                logs.append(json.loads(line))
            except json.JSONDecodeError:
                continue

    # Convert to a DataFrame
    df = pd.DataFrame(logs)

    # Distribution of log levels
    print("\nLog level distribution:")
    print(df['levelname'].value_counts())

    # Most common error messages
    error_messages = df[df['levelname'] == 'ERROR']['message']
    print("\nMost common error messages:")
    print(error_messages.value_counts().head(5))

    # Extract HTTP status codes from the messages
    df['status_code'] = df['message'].str.extract(r'status code (\d{3})', expand=False)
    print("\nHTTP status code distribution:")
    print(df['status_code'].value_counts())

    # Log volume by hour
    df['hour'] = pd.to_datetime(df['asctime']).dt.hour
    print("\nLog volume by hour:")
    print(df['hour'].value_counts().sort_index())

if __name__ == '__main__':
    analyze_logs('/var/log/app/app.log')
```
Configuration Management
Managing Configuration Files with Python
```python
import configparser
import json
from typing import Any, Dict

import yaml

class ConfigManager:
    def __init__(self):
        self.configs = {}

    def load_ini(self, filepath: str) -> Dict[str, Any]:
        """Load an INI configuration file."""
        config = configparser.ConfigParser()
        config.read(filepath)
        self.configs['ini'] = {s: dict(config.items(s)) for s in config.sections()}
        return self.configs['ini']

    def load_yaml(self, filepath: str) -> Dict[str, Any]:
        """Load a YAML configuration file."""
        with open(filepath, 'r') as f:
            self.configs['yaml'] = yaml.safe_load(f)
        return self.configs['yaml']

    def load_json(self, filepath: str) -> Dict[str, Any]:
        """Load a JSON configuration file."""
        with open(filepath, 'r') as f:
            self.configs['json'] = json.load(f)
        return self.configs['json']

    def get_value(self, config_type: str, *keys) -> Any:
        """Fetch a nested configuration value."""
        current = self.configs.get(config_type, {})
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current

    def update_config(self, config_type: str, updates: Dict[str, Any]):
        """Update configuration values (shallow merge)."""
        if config_type in self.configs:
            self.configs[config_type].update(updates)

    def save_config(self, config_type: str, filepath: str):
        """Persist a configuration back to disk."""
        if config_type not in self.configs:
            raise ValueError(f"Configuration type not loaded: {config_type}")

        if config_type == 'ini':
            config = configparser.ConfigParser()
            for section, options in self.configs['ini'].items():
                config[section] = options
            with open(filepath, 'w') as f:
                config.write(f)
        elif config_type == 'yaml':
            with open(filepath, 'w') as f:
                yaml.dump(self.configs['yaml'], f)
        elif config_type == 'json':
            with open(filepath, 'w') as f:
                json.dump(self.configs['json'], f, indent=2)

# Usage example
config_manager = ConfigManager()
config_manager.load_yaml('config.yaml')

db_host = config_manager.get_value('yaml', 'database', 'host')
print(f"Database host: {db_host}")

# Update and save the configuration
config_manager.update_config('yaml', {'database': {'host': 'new.db.example.com'}})
config_manager.save_config('yaml', 'config_updated.yaml')
```
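In practice, file-based configuration is often combined with environment-variable overrides so the same artifact can run unchanged in several environments. A minimal sketch layered on the ConfigManager above; the APP_ prefix and variable naming scheme (e.g. APP_DATABASE_HOST) are illustrative assumptions:

```python
import os

def apply_env_overrides(config: dict, prefix: str = 'APP_') -> dict:
    # Override nested keys from variables such as APP_DATABASE_HOST;
    # the prefix and underscore-to-nesting mapping are illustrative choices
    for name, value in os.environ.items():
        if not name.startswith(prefix):
            continue
        keys = name[len(prefix):].lower().split('_')
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return config

# Usage:
# os.environ['APP_DATABASE_HOST'] = 'override.db.example.com'
# config = apply_env_overrides(config_manager.configs['yaml'])
```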
Containerization and Orchestration
Docker SDK for Python
```python
import docker
from docker.errors import DockerException

class DockerManager:
    def __init__(self):
        try:
            self.client = docker.from_env()
            print("Connected to Docker")
        except DockerException as e:
            print(f"Failed to connect to Docker: {str(e)}")
            raise

    def list_containers(self, all=False, filters=None):
        """List containers, optionally filtered."""
        return self.client.containers.list(all=all, filters=filters)

    def run_container(self, image, command=None, detach=True, **kwargs):
        """Run a container."""
        return self.client.containers.run(
            image,
            command=command,
            detach=detach,
            **kwargs
        )

    def build_image(self, path, tag, dockerfile='Dockerfile'):
        """Build an image."""
        return self.client.images.build(
            path=path,
            tag=tag,
            dockerfile=dockerfile
        )

    def cleanup_containers(self):
        """Remove stopped containers."""
        stopped_containers = self.list_containers(all=True, filters={'status': 'exited'})
        for container in stopped_containers:
            print(f"Removing container: {container.id}")
            container.remove()

    def cleanup_images(self, dangling=True):
        """Remove unused images."""
        filters = {'dangling': True} if dangling else None
        unused_images = self.client.images.list(filters=filters)
        for image in unused_images:
            print(f"Removing image: {image.tags}")
            self.client.images.remove(image.id)

# Usage example
docker_manager = DockerManager()

print("Running containers:")
for container in docker_manager.list_containers():
    print(f" - {container.name}: {container.status}")

# Start a new container
nginx = docker_manager.run_container(
    'nginx:latest',
    ports={'80/tcp': 8080},
    name='web-server'
)
print(f"Started container: {nginx.name}")
```
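Once containers are running, the same SDK can be used for lightweight inspection. A minimal sketch, assuming the DockerManager above and the web-server container started in the usage example:

```python
# Inspect the container started above: tail its logs and read one stats sample
container = docker_manager.client.containers.get('web-server')

# Last 20 log lines (logs() returns bytes)
print(container.logs(tail=20).decode('utf-8', errors='replace'))

# One-shot resource statistics as a dict (stream=False disables the live stream)
stats = container.stats(stream=False)
print(f"Memory usage: {stats['memory_stats'].get('usage', 'n/a')} bytes")
```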
The Kubernetes Python Client
```python
from kubernetes import client, config

class KubernetesManager:
    def __init__(self, in_cluster=False):
        # Load credentials from the in-cluster service account or from ~/.kube/config
        if in_cluster:
            config.load_incluster_config()
        else:
            config.load_kube_config()

        self.core_v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def list_pods(self, namespace='default'):
        """List pods in a namespace."""
        return self.core_v1.list_namespaced_pod(namespace)

    def create_deployment(self, name, image, replicas=1, namespace='default'):
        """Create a Deployment."""
        deployment = client.V1Deployment(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1DeploymentSpec(
                replicas=replicas,
                selector=client.V1LabelSelector(match_labels={'app': name}),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={'app': name}),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(
                            name=name,
                            image=image
                        )]
                    )
                )
            )
        )
        return self.apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment
        )

    def create_service(self, name, port, target_port, namespace='default',
                       service_type='LoadBalancer', selector_app=None):
        """Create a Service; the selector must match the pod labels of the workload."""
        service = client.V1Service(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1ServiceSpec(
                selector={'app': selector_app or name},
                ports=[client.V1ServicePort(
                    port=port,
                    target_port=target_port
                )],
                type=service_type
            )
        )
        return self.core_v1.create_namespaced_service(
            namespace=namespace,
            body=service
        )

# Usage example
k8s_manager = KubernetesManager()

print("Pods in the cluster:")
pods = k8s_manager.list_pods()
for pod in pods.items:
    print(f" - {pod.metadata.name}: {pod.status.phase}")

# Deploy an application and expose it
deployment = k8s_manager.create_deployment('myapp', 'myapp:1.0', replicas=3)
service = k8s_manager.create_service('myapp-service', port=80, target_port=8080,
                                     selector_app='myapp')

print(f"Deployment created: {deployment.metadata.name}")
print(f"Service created: {service.metadata.name}")
```
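Day-two operations such as scaling can go through the same client. A minimal sketch, assuming the KubernetesManager above; the deployment name is the one created in the usage example:

```python
def scale_deployment(k8s_manager, name, replicas, namespace='default'):
    # Patch only the scale subresource rather than the whole Deployment object
    return k8s_manager.apps_v1.patch_namespaced_deployment_scale(
        name=name,
        namespace=namespace,
        body={'spec': {'replicas': replicas}}
    )

scale_deployment(k8s_manager, 'myapp', replicas=5)
```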
CI/CD Pipelines
Driving GitLab CI from Python
```python
import time

import gitlab

class GitLabCICD:
    def __init__(self, gitlab_url, private_token, project_id):
        self.gl = gitlab.Gitlab(gitlab_url, private_token=private_token)
        self.project = self.gl.projects.get(project_id)

    def trigger_pipeline(self, branch='master', variables=None):
        """Trigger a CI/CD pipeline."""
        payload = {'ref': branch}
        if variables:
            # The pipelines API expects variables as a list of {'key': ..., 'value': ...} entries
            payload['variables'] = [{'key': k, 'value': v} for k, v in variables.items()]
        pipeline = self.project.pipelines.create(payload)
        print(f"Pipeline triggered: {pipeline.id}")
        return pipeline

    def wait_for_pipeline(self, pipeline, timeout=1800, interval=30):
        """Wait for a pipeline to finish."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            pipeline.refresh()
            if pipeline.status in ['success', 'failed', 'canceled', 'skipped']:
                print(f"Pipeline status: {pipeline.status}")
                return pipeline
            print(f"Waiting... current status: {pipeline.status}")
            time.sleep(interval)
        raise TimeoutError("Timed out waiting for the pipeline")

    def get_job_logs(self, pipeline, job_name):
        """Fetch the log of a named job."""
        for pipeline_job in pipeline.jobs.list():
            if pipeline_job.name == job_name:
                # Pipeline job objects are read-only; fetch the full job to read its trace
                job = self.project.jobs.get(pipeline_job.id)
                return job.trace().decode('utf-8')
        return None

    def deploy_to_environment(self, environment, version):
        """Deploy to the given environment."""
        # Trigger the deployment pipeline
        pipeline = self.trigger_pipeline(
            branch='master',
            variables={
                'DEPLOY_ENV': environment,
                'APP_VERSION': version
            }
        )

        # Wait for the deployment to finish
        pipeline = self.wait_for_pipeline(pipeline)

        if pipeline.status == 'success':
            print(f"Successfully deployed {version} to {environment}")
            return True
        else:
            logs = self.get_job_logs(pipeline, 'deploy')
            print(f"Deployment failed. Job log:\n{logs}")
            return False

# Usage example
ci_cd = GitLabCICD(
    gitlab_url='https://gitlab.example.com',
    private_token='your-private-token',
    project_id=12345
)

# Deploy a new version
ci_cd.deploy_to_environment('production', '1.2.0')
```
Managing GitHub Actions with PyGithub
```python
import os

import requests
from github import Github, GithubException

class GitHubActionsManager:
    def __init__(self, token, repo_name):
        self.gh = Github(token)
        self.repo = self.gh.get_repo(repo_name)

    def create_workflow(self, workflow_name, workflow_content):
        """Create or update a workflow file."""
        workflow_path = f".github/workflows/{workflow_name}.yml"

        try:
            # Check whether the workflow already exists
            contents = self.repo.get_contents(workflow_path)
            # Update the existing workflow
            self.repo.update_file(
                workflow_path,
                f"Update {workflow_name}",
                workflow_content,
                contents.sha
            )
            print(f"Workflow updated: {workflow_name}")
        except GithubException:
            # Create a new workflow
            self.repo.create_file(
                workflow_path,
                f"Create {workflow_name}",
                workflow_content
            )
            print(f"Workflow created: {workflow_name}")

    def trigger_workflow_dispatch(self, workflow_name, ref='master', inputs=None):
        """Trigger a workflow manually."""
        workflow = self.repo.get_workflow(f"{workflow_name}.yml")
        workflow.create_dispatch(ref, inputs=inputs or {})
        print(f"Workflow triggered: {workflow_name}")

    def get_workflow_runs(self, workflow_name, branch=None):
        """List runs of a workflow."""
        workflow = self.repo.get_workflow(f"{workflow_name}.yml")
        runs = workflow.get_runs(branch=branch) if branch else workflow.get_runs()
        return list(runs)

    def download_workflow_logs(self, run_id, output_dir='logs'):
        """Download the logs of a workflow run."""
        run = self.repo.get_workflow_run(run_id)
        jobs = run.jobs()

        os.makedirs(output_dir, exist_ok=True)

        for job in jobs:
            # logs_url() resolves to a temporary download URL for the job log
            log = requests.get(job.logs_url()).text
            log_file = os.path.join(output_dir, f"{run_id}_{job.name}.log")
            with open(log_file, 'w') as f:
                f.write(log)
            print(f"Saved log: {log_file}")

# Usage example
actions_mgr = GitHubActionsManager(
    token=os.getenv('GITHUB_TOKEN'),
    repo_name='your-username/your-repo'
)

# Define a CI workflow; workflow_dispatch is included so it can be triggered manually
ci_workflow = """
name: Python CI

on:
  push:
  pull_request:
  workflow_dispatch:
    inputs:
      environment:
        description: 'Target environment'
        required: false
        default: 'staging'

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Run tests
      run: |
        pytest
"""

# Create or update the workflow
actions_mgr.create_workflow("python-ci", ci_workflow)

# Trigger the workflow manually
actions_mgr.trigger_workflow_dispatch(
    "python-ci",
    inputs={"environment": "staging"}
)
```
Security Automation
Vulnerability Scanning Integration
```python
import json
import os
import subprocess
from datetime import datetime

class SecurityScanner:
    def __init__(self, output_dir='reports'):
        self.output_dir = output_dir
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def run_dependency_scan(self, project_path):
        """Scan dependencies for known vulnerabilities with safety."""
        report_file = os.path.join(
            self.output_dir,
            f"dependency_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )

        try:
            # safety prints the JSON report to stdout and exits non-zero when issues are found
            result = subprocess.run(
                ['safety', 'check', '--json'],
                cwd=project_path,
                capture_output=True,
                text=True
            )
            with open(report_file, 'w') as f:
                f.write(result.stdout)

            if result.returncode == 0:
                print("No vulnerable dependencies found")
                return True

            # Note: the exact JSON layout and field names depend on the safety version
            vulnerabilities = json.loads(result.stdout)
            print(f"Found {len(vulnerabilities)} vulnerabilities:")
            for vuln in vulnerabilities:
                print(f" - {vuln['package_name']} {vuln['analyzed_version']}: {vuln['advisory']}")
            return False
        except Exception as e:
            print(f"Dependency scan failed: {str(e)}")
            return False

    def run_code_scan(self, project_path):
        """Run static security analysis with bandit."""
        report_file = os.path.join(
            self.output_dir,
            f"code_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )

        try:
            subprocess.run(
                ['bandit', '-r', '.', '-f', 'json', '-o', report_file],
                cwd=project_path,
                capture_output=True,
                text=True
            )

            with open(report_file, 'r') as f:
                report = json.load(f)

            issues = report['results']
            if not issues:
                print("No security issues found")
                return True

            print(f"Found {len(issues)} security issues:")
            for issue in issues:
                print(f" - {issue['issue_text']} (severity: {issue['issue_severity']})")
            return False
        except Exception as e:
            print(f"Code scan failed: {str(e)}")
            return False

    def generate_report(self):
        """Combine the JSON scan results into an HTML report."""
        reports = []
        for filename in os.listdir(self.output_dir):
            if filename.endswith('.json'):
                with open(os.path.join(self.output_dir, filename), 'r') as f:
                    reports.append({
                        'filename': filename,
                        'content': json.load(f)
                    })

        html_report = """
<html>
<head><title>Security Scan Report</title></head>
<body>
<h1>Security Scan Report</h1>
<p>Generated: {}</p>
""".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

        for report in reports:
            html_report += f"<h2>{report['filename']}</h2>"
            html_report += "<pre>" + json.dumps(report['content'], indent=2) + "</pre>"

        html_report += "</body></html>"

        report_path = os.path.join(self.output_dir, 'security_report.html')
        with open(report_path, 'w') as f:
            f.write(html_report)

        print(f"Report generated: {report_path}")
        return report_path

# Usage example
scanner = SecurityScanner()
project_path = '/path/to/your/project'

# Run the scans
dep_scan_ok = scanner.run_dependency_scan(project_path)
code_scan_ok = scanner.run_code_scan(project_path)

# Generate a report if anything was found
if not dep_scan_ok or not code_scan_ok:
    report_path = scanner.generate_report()
    print("Security issues found; see the report")
```
Automated Security Hardening
```python
import os
import platform
import subprocess

class SystemHardener:
    def __init__(self):
        self.system = platform.system().lower()
        self.hardening_actions = []

    def check_permissions(self, path, recommended_mode):
        """Check file permissions against a recommended octal mode (e.g. 0o644)."""
        current_mode = os.stat(path).st_mode & 0o777
        if current_mode != recommended_mode:
            self.hardening_actions.append(
                f"chmod {recommended_mode:o} {path}"
            )
            return False
        return True

    def check_ssh_config(self):
        """Check the SSH daemon configuration."""
        sshd_config = '/etc/ssh/sshd_config'
        if not os.path.exists(sshd_config):
            return True

        with open(sshd_config, 'r') as f:
            content = f.read()

        checks = {
            'PermitRootLogin': 'no',
            'PasswordAuthentication': 'no',
            'X11Forwarding': 'no'
        }

        for param, expected_value in checks.items():
            if f"{param} {expected_value}" not in content:
                self.hardening_actions.append(
                    f"echo '{param} {expected_value}' >> {sshd_config}"
                )

    def apply_hardening(self, dry_run=False):
        """Apply the collected hardening actions."""
        if not self.hardening_actions:
            print("System already meets the security baseline")
            return

        print("The following hardening actions are required:")
        for action in self.hardening_actions:
            print(f" - {action}")

        if dry_run:
            return

        confirm = input("Apply these actions? (y/n): ")
        if confirm.lower() != 'y':
            print("Operation cancelled")
            return

        for action in self.hardening_actions:
            try:
                if action.startswith('echo'):
                    # Handle "echo '<directive>' >> <file>" by appending directly in Python
                    cmd, target = action.split('>>')
                    directive = cmd.strip()[len('echo '):].strip().strip("'")
                    with open(target.strip(), 'a') as f:
                        f.write(f"\n{directive}\n")
                else:
                    subprocess.run(action, shell=True, check=True)
                print(f"Applied: {action}")
            except (OSError, subprocess.CalledProcessError) as e:
                print(f"Failed: {action} - {str(e)}")
        print("Hardening complete")

# Usage example
hardener = SystemHardener()

# Check permissions on sensitive files
hardener.check_permissions('/etc/passwd', 0o644)
hardener.check_permissions('/etc/shadow', 0o640)

# Check the SSH configuration
hardener.check_ssh_config()

# Dry run first
hardener.apply_hardening(dry_run=True)

# Then apply for real
# hardener.apply_hardening()
```
Conclusion and Learning Path
Over the ten parts of this series, you have learned:
Infrastructure as Code (IaC) in practice
Building monitoring and alerting systems
Log management and analysis techniques
Configuration management best practices
Containerization and orchestration
Implementing CI/CD pipelines
Security automation and hardening
Directions for further study:
Cloud-native stack:
Kubernetes Operator development
Service mesh implementations
Serverless architectures
Performance optimization:
Performance tuning for distributed systems
Managing large-scale clusters
High-availability architecture design
Security specializations:
Penetration testing and red teaming
Zero-trust architecture
Automated compliance checking
Certifications:
AWS/Azure/GCP cloud certifications
Certified Kubernetes Administrator (CKA)
HashiCorp certifications
Python's role in operations automation and DevOps keeps growing; continued learning and hands-on practice will help you become a highly effective engineer.