Python 后端开发进阶知识全解2

发布于:2025-04-16 ⋅ 阅读:(29) ⋅ 点赞:(0)

21. 中间件开发
from flask import request

@app.before_request
def log_request_info():
    app.logger.debug(f"Request Headers: {request.headers}")
    app.logger.debug(f"Request Body: {request.get_data()}")

@app.after_request
def add_custom_header(response):
    response.headers["X-Powered-By"] = "My API Server"
    return response

解析

  • @app.before_request 在请求处理前执行(日志记录/权限校验)
  • @app.after_request 在响应返回前修改响应头
  • 中间件适用于全局逻辑处理(如跨域/限流)

22. 微服务通信(gRPC)
# protobuf 定义(user.proto)
service UserService {
    rpc GetUser (UserRequest) returns (UserResponse) {}
}

# 服务端实现
class UserServicer(user_pb2_grpc.UserServiceServicer):
    def GetUser(self, request, context):
        return user_pb2.UserResponse(id=1, name="Alice")

server = grpc.server(futures.ThreadPoolExecutor())
user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()

解析

  • Protocol Buffers 定义接口契约
  • 高性能二进制通信(比 REST 快 5-7 倍)
  • 支持双向流式通信

23. 性能压测(Locust)
from locust import HttpUser, task

class ApiUser(HttpUser):
    @task
    def get_users(self):
        self.client.get("/api/users")

    @task(3)  # 权重3倍
    def create_user(self):
        self.client.post("/api/users", json={"name": "test"})

解析

  • 模拟高并发用户行为(可视化压测报告)
  • 定义任务权重控制请求比例
  • 找出系统瓶颈(数据库/网络/CPU)

24. 配置热更新
import configparser
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ConfigHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.src_path.endswith("config.ini"):
            reload_config()

def reload_config():
    config = configparser.ConfigParser()
    config.read('config.ini')
    app.config.update(config['DEFAULT'])

解析

  • 使用 watchdog 监听配置文件变化
  • 避免重启服务实现配置更新
  • 适用场景:功能开关/日志级别调整

25. 分布式锁(Redis)
import redis
from redis.lock import Lock

r = redis.Redis()
lock = Lock(r, "order_lock", timeout=10)

def process_order():
    if lock.acquire(blocking_timeout=5):
        try:
            # 关键业务逻辑
        finally:
            lock.release()

解析

  • 防止并发操作导致数据不一致
  • timeout 防止死锁(自动释放锁)
  • 需处理网络分区带来的脑裂问题

26. 定时任务(APScheduler)
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(clean_expired_tokens, 'cron', hour=2)
scheduler.start()

@app.teardown_appcontext
def shutdown_scheduler(exception=None):
    scheduler.shutdown()

解析

  • 后台线程执行定时任务
  • 支持 cron 表达式/间隔触发
  • 集群环境下需配合分布式锁

27. 服务健康检查
@app.route('/health')
def health_check():
    try:
        db.session.execute('SELECT 1')
        return "OK", 200
    except Exception as e:
        return str(e), 500

解析

  • Kubernetes/Docker 依赖此接口做存活检测
  • 检查核心依赖状态(数据库/缓存/第三方服务)
  • 返回详细错误信息便于诊断

28. 请求限流
from flask_limiter import Limiter

limiter = Limiter(app=app, key_func=get_remote_address)
limiter.limit("100/hour")(app.route('/api/data'))

@limiter.request_filter
def ip_whitelist():
    return request.remote_addr == "192.168.1.1"

解析

  • 基于 IP/用户ID 的速率限制
  • 白名单机制排除特定请求
  • 防止 API 被滥用/拒绝服务攻击

29. 数据分页优化
@app.route('/articles')
def get_articles():
    page = request.args.get('page', 1, type=int)
    per_page = 20
    pagination = Article.query.paginate(
        page=page, 
        per_page=per_page,
        error_out=False
    )
    return {
        "data": [article.to_dict() for article in pagination.items],
        "total": pagination.total
    }

解析

  • paginate() 方法自动计算分页参数
  • error_out=False 无效页码返回空而非404
  • 前端需处理分页元数据展示

30. 对象存储集成
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3', 
    aws_access_key_id=os.getenv('AWS_KEY'),
    aws_secret_access_key=os.getenv('AWS_SECRET')
)

def upload_file(file_stream, bucket, object_name):
    try:
        s3.upload_fileobj(file_stream, bucket, object_name)
        return True
    except ClientError as e:
        logger.error(e)
        return False

解析

  • 使用 AWS S3 协议兼容的存储服务
  • upload_fileobj 支持流式上传大文件
  • 生产环境需配置 CDN 加速访问

31. 服务监控(Prometheus)
from prometheus_client import Counter, generate_latest

REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP Requests')

@app.route('/metrics')
def metrics():
    REQUEST_COUNT.inc()
    return generate_latest()

@app.route('/api/data')
def get_data():
    REQUEST_COUNT.inc()
    return "Data"

解析

  • 暴露 /metrics 端点供 Prometheus 抓取
  • 定义业务指标(请求数/耗时/错误率)
  • Grafana 可视化监控仪表盘

32. 消息队列(RabbitMQ)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print(f"Received: {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

解析

  • durable=True 持久化队列防止消息丢失
  • basic_ack 手动确认消息处理完成
  • 实现解耦和流量削峰

33. 数据库迁移(Alembic)
# 初始化迁移仓库
alembic init migrations

# 生成迁移脚本
alembic revision --autogenerate -m "create user table"

# 执行迁移
alembic upgrade head

解析

  • 版本化数据库 schema 变更
  • 自动对比模型与数据库差异
  • 支持回滚到历史版本

34. 文档生成(Swagger)
from flasgger import Swagger

swagger = Swagger(app)

@app.route('/api/user/<int:user_id>')
def get_user(user_id):
    """
    获取用户信息
    ---
    parameters:
      - name: user_id
        in: path
        type: integer
        required: true
    responses:
      200:
        description: 用户对象
    """
    user = User.query.get(user_id)
    return jsonify(user.to_dict())

解析

  • 自动生成交互式 API 文档
  • 通过代码注释定义接口规范
  • 支持在线测试接口

35. 国际化和本地化
from flask_babel import Babel, _

babel = Babel(app)

@babel.localeselector
def get_locale():
    return request.accept_languages.best_match(['zh', 'en'])

@app.route('/greeting')
def greeting():
    return _("Hello World")

解析

  • _() 函数实现文本翻译
  • 根据请求头自动选择语言
  • 需要维护多语言翻译文件

36. 全链路追踪(Jaeger)
from jaeger_client import Config

config = Config(config={'sampler': {'type': 'const', 'param': 1}}, service_name='user-service')
tracer = config.initialize_tracer()

@app.route('/order')
def create_order():
    with tracer.start_span('order-creation') as span:
        span.log_kv({'event': 'start processing'})
        # 业务逻辑
    return "Order created"

解析

  • 追踪跨服务调用链路
  • 分析系统性能瓶颈
  • 结合 OpenTelemetry 标准

37. 服务熔断(Hystrix)
from pyhystrix import Command

class PaymentCommand(Command):
    def run(self):
        return call_payment_service()  # 可能失败的操作
    
    def fallback(self):
        return {"status": "payment system busy"}

result = PaymentCommand().execute()

解析

  • 当失败率超过阈值时触发熔断
  • fallback 提供降级响应
  • 防止雪崩效应扩散

38. 服务发现(Consul)
import consul

c = consul.Consul()

# 注册服务
c.agent.service.register(
    'user-service',
    service_id='user-service-1',
    address='10.0.0.1',
    port=5000,
    check={
        'HTTP': 'http://10.0.0.1:5000/health',
        'Interval': '10s'
    }
)

# 发现服务
_, services = c.catalog.service('user-service')

解析

  • 服务实例自动注册/注销
  • 健康检查机制剔除故障节点
  • 客户端负载均衡基础

39. 特征开关(Feature Flag)
from flask import request
from UnleashClient import UnleashClient

unleash = UnleashClient(
    url="http://unleash:4242/api",
    app_name="my-api",
    environment="prod"
)

@app.route('/new-feature')
def new_feature():
    if unleash.is_enabled("new-ui"):
        return render_template('new-ui.html')
    else:
        return render_template('old-ui.html')

解析

  • 灰度发布新功能
  • 动态控制功能可见性
  • 支持用户分群测试

40. 实时日志分析
import logging
from logging.handlers import HTTPHandler

http_handler = HTTPHandler(
    host='logstash:8080',
    url='/logs',
    method='POST'
)
logger.addHandler(http_handler)

logger.error("Payment failed", extra={
    "user_id": 123,
    "order_id": "ABC-123"
})

解析

  • 将日志发送到 ELK 等分析系统
  • 结构化日志便于检索分析
  • 快速定位生产环境问题

架构设计原则总结

  1. 解耦原则:服务间通过明确定义的接口通信
  2. 弹性设计:具备容错、限流、熔断能力
  3. 可观测性:日志/监控/追踪三位一体
  4. 自动化优先:测试/部署/扩缩容自动化
  5. 持续演进:拥抱云原生技术栈

每个知识点对应实际生产场景中的关键需求,理解原理后可根据业务特点灵活组合使用。
声明:本文通过AI及自己整理,如有雷同纯属巧合


网站公告

今日签到

点亮在社区的每一天
去签到