Celery+RabbitMQ+Redis

发布于:2025-08-15 ⋅ 阅读:(13) ⋅ 点赞:(0)

FastApi异步任务Demo应用构建:Celery+RabbitMQ+Redis

以下是基于 FastAPI 和 Celery 构建分布式系统的核心技术方案及实现要点,结合高性能、可扩展性和生产级实践:


⚙️ 一、技术栈核心优势

  1. FastAPI

    • 异步支持(ASGI):非阻塞 I/O 处理高并发请求。
    • 自动 API 文档:集成 Swagger UI,简化接口调试。
    • 数据验证:通过 Pydantic 模型确保请求/响应数据完整性。
  2. Celery

    • 分布式任务队列:将耗时任务(如支付处理、数据分析)卸载到后台 Worker。
    • 定时任务:通过 celery beat 实现周期任务(如订单超时取消)。
    • 多节点扩展:支持横向扩展 Worker 应对高负载。
  3. Redis

    • 消息代理(Broker):传递任务消息到 Celery Worker。
    • 结果存储(Backend):持久化任务状态和执行结果。

🏗️ 二、分布式架构设计

推送任务
分发任务
分发任务
存储结果
存储结果
查询状态
FastAPI
Redis Broker
Celery Worker 1
Celery Worker 2
Redis Backend
  • 请求流程

    1. 用户请求 FastAPI 接口(如创建订单)。
    2. FastAPI 调用 task.delay() 将任务发送至 Redis 队列。
    3. Celery Worker 监听队列并执行任务(如库存扣减)。
    4. 结果存入 Redis,FastAPI 通过任务 ID 查询状态。
  • 关键组件

    • Celery Worker:执行异步任务,支持多进程并发(--concurrency=4)。
    • Celery Beat:调度定时任务(如每 30 分钟检查未支付订单)。
    • Flower:监控任务队列和 Worker 状态(可视化界面)。

📂 三、项目结构示例

project/
├── app.py                # FastAPI 主应用
├── celery_app.py         # Celery 实例化配置
├── tasks.py              # Celery 任务定义
├── models.py             # Pydantic 数据模型
├── utils.py              # 辅助函数(如任务状态查询)
└── docker-compose.yml    # 容器化部署

🧩 四、关键代码实现

  1. Celery 初始化(celery_app.py)
from celery import Celery
celery = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
celery.conf.update(timezone="Asia/Shanghai", task_serializer="json")
  1. FastAPI 集成 Celery(app.py)
from fastapi import FastAPI
from tasks import process_order

app = FastAPI()

@app.post("/order")
async def create_order(order_data: dict):
    task = process_order.delay(order_data)  # 异步触发任务
    return {"task_id": task.id, "status": "pending"}
  1. 任务状态查询接口(app.py)
from celery.result import AsyncResult

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id)
    return {"status": result.status, "result": result.result}
  1. Celery 任务定义(tasks.py)
@celery.task
def process_order(order_data: dict):
    # 模拟耗时操作(如支付校验)
    time.sleep(5)
    update_order_status(order_data["id"], "completed")
    return {"status": "success"}

🚀 五、分布式部署方案

Docker Compose 配置(docker-compose.yml)

services:
  fastapi:
    image: fastapi-app
    ports: ["8000:8000"]
  redis:
    image: redis:latest
  celery-worker:
    image: celery-app
    command: celery -A tasks worker --loglevel=info --concurrency=4
  celery-beat:
    image: celery-app
    command: celery -A tasks beat --loglevel=info
  flower:
    image: celery-app
    command: celery -A tasks flower --port=5555
    ports: ["5555:5555"]

⚡️ 六、性能优化策略

  1. 任务重试机制
@celery.task(bind=True, max_retries=3)
def process_order(self, order_data):
    try:
        # 业务逻辑
    except Exception as e:
        self.retry(exc=e, countdown=30)  # 30秒后重试
  1. 动态任务路由
    • 为高优先级任务分配独立队列:
celery.conf.task_routes = {"critical_task": {"queue": "high_priority"}}
  1. 结果过期设置

    • 减少 Redis 存储压力:result_expires=3600(1小时自动清理)。
  2. 监控告警

    • 使用 Flower 实时监控任务失败率与队列积压。

💎 总结

FastAPI + Celery + Redis 的组合提供了从开发到部署的全栈分布式解决方案,尤其适用于订单处理、数据分析等异步场景。关键优势在于:

  • 解耦业务逻辑:API 响应与后台任务分离,避免阻塞。
  • 弹性扩展:通过增加 Celery Worker 应对流量高峰。
  • 生产就绪:定时任务、错误重试、监控等机制保障系统鲁棒性。

网站公告

今日签到

点亮在社区的每一天
去签到