11高可用与容错

发布于:2025-05-30 ⋅ 阅读:(24) ⋅ 点赞:(0)

一、Broker 高可用架构设计

1.1 RabbitMQ 镜像集群方案

集群搭建步骤
# 节点1初始化
rabbitmq-server -detached
rabbitmq-plugins enable rabbitmq_management

# 节点2加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# 创建镜像策略
rabbitmqctl set_policy ha-all "^celery\." '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Celery 客户端配置
app.conf.broker_url = 'amqp://user:pass@node1:5672,node2:5672,node3:5672/vhost'
app.conf.broker_failover_strategy = 'shuffle'
app.conf.broker_connection_retry_on_startup = True
app.conf.broker_heartbeat = 300  # 适当延长心跳间隔

故障转移测试场景:

import socket
from kombu import Connection

def test_failover():
    with Connection('amqp://node1:5672') as conn:
        try:
            conn.connection  # 强制建立连接
            socket.create_connection(('node1', 5672), timeout=1).close()
        except ConnectionError:
            assert conn.connection.connected  # 验证自动切换

1.2 Redis Sentinel 方案

app.conf.broker_url = 'sentinel://:mypassword@sentinel1:26379,sentinel2:26379/0'
app.conf.broker_transport_options = {
    'master_name': 'mymaster',
    'sentinel_kwargs': {'password': 'sentinel_pass'},
    'socket_timeout': 0.5,
    'retry_on_timeout': True
}

二、Worker 容错机制实现

2.1 智能重试策略

@app.task(
    autoretry_for=(TimeoutError, IOError),
    retry_backoff=30,
    retry_backoff_max=600,
    retry_jitter=True,
    max_retries=5,
    acks_late=True
)
def process_payment(order_id):
    if db.is_connection_lost():
        raise self.retry(exc=ConnectionLostError())

重试参数矩阵:

参数 推荐值 作用说明
autoretry_for (Exception,) 自动重试的异常类型
retry_backoff 30 初始退避时间(秒)
retry_backoff_max 600 最大退避时间(秒)
retry_jitter True 添加随机抖动避免惊群效应
max_retries 3-5 最大重试次数

2.2 死信队列(DLX)配置

from kombu import Exchange, Queue

dead_letter_exchange = Exchange('dlx', type='direct')
dead_letter_queue = Queue('dead_letters', 
                         exchange=dead_letter_exchange,
                         routing_key='dead_letter')

app.conf.task_queues = [
    Queue('orders',
          exchange=Exchange('orders'),
          routing_key='order.process',
          queue_arguments={
              'x-dead-letter-exchange': 'dlx',
              'x-dead-letter-routing-key': 'dead_letter'
          }),
    dead_letter_queue
]

@app.task(queue='dead_letters')
def handle_failed_task(task_id, exc):
    logger.error(f"任务 {task_id} 最终失败: {exc}")
    send_alert_to_ops(task_id, exc)

三、任务幂等性设计

3.1 幂等性保障方案

from celery import Task
from django.core.cache import caches

cache = caches['db']

class IdempotentTask(Task):
    def __call__(self, *args, **kwargs):
        task_id = self.request.id
        lock_key = f'task_lock:{task_id}'
        
        # 分布式锁实现
        if cache.add(lock_key, '1', timeout=3600):
            try:
                return self.run(*args, **kwargs)
            finally:
                cache.delete(lock_key)
        else:
            return cache.get(f'task_result:{task_id}')

@app.task(base=IdempotentTask)
def process_order(order_id):
    result = _execute_order(order_id)
    cache.set(f'task_result:{order_id}', result, 86400)
    return result

3.2 幂等性检查清单

  1. 数据库唯一约束
  2. 版本号控制机制
  3. 请求去重令牌
  4. 状态机校验
  5. 业务层面的幂等校验

四、高可用架构验证方案

4.1 混沌工程测试

import random
from unittest.mock import patch

def test_broker_failover():
    with patch('kombu.transport.pyamqp.Transport.establish_connection') as mock:
        mock.side_effect = ConnectionError
        result = process_order.delay(123)
        assert result.get(timeout=30)  # 验证任务最终成功

4.2 监控指标验证

# 重试率告警规则
alert: HighTaskRetryRate
expr: rate(celery_task_retries_total[5m]) > 0.1
for: 10m

# 死信队列监控
alert: DeadLetterQueueGrowth
expr: increase(celery_dead_letters_total[1h]) > 10

五、生产环境最佳实践

5.1 容错架构检查表

  • Broker 集群健康检查
  • Worker 节点跨AZ部署
  • 任务超时时间合理设置
  • 结果后端独立冗余部署
  • 定期执行故障演练

5.2 灾难恢复方案

# 紧急消息转移脚本
celery -A proj purge -Q orders  # 清空问题队列
celery -A proj control cancel_consumer orders  # 停止消费
celery -A proj control add_consumer orders -d backup_worker@node4  # 定向恢复

六、典型场景案例分析

6.1 金融交易系统

class TransactionTask(Task):
    acks_late = True
    reject_on_worker_lost = True
    priority = 9
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        rollback_transaction(args[0])
        super().on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=TransactionTask)
def execute_transfer(source, target, amount):
    if Transfer.objects.filter(txid=self.request.id).exists():
        return  # 幂等性检查
    _perform_transfer(source, target, amount)

6.2 物联网数据处理

@app.task(
    rate_limit='100/s',
    autoretry_for=(DeviceOfflineError,),
    retry_kwargs={'max_retries': 3, 'countdown': 5},
    queue='iot_high'
)
def process_sensor_data(device_id, readings):
    if cache.get(f'device_{device_id}_status') == 'offline':
        raise DeviceOfflineError()
    _store_readings(device_id, readings)

总结与演进路线

高可用架构成熟度模型:

基础冗余
自动故障转移
区域容灾
混沌工程验证

推荐技术组合:

  • Broker 层:RabbitMQ 镜像队列 + Keepalived VIP
  • 计算层:Kubernetes Worker 自动伸缩
  • 存储层:Redis Cluster + 持久化
  • 监控层:Prometheus + Alertmanager + Grafana

扩展能力建设:

  1. 实现跨区域双活架构
  2. 开发自动化容灾演练平台
  3. 集成AI驱动的异常预测
  4. 构建声明式任务编排系统

通过本文的架构设计和实践方案,可使Celery集群达到:

  • 99.99%的可用性 SLA
  • 秒级故障检测与恢复
  • 日均亿级任务处理能力
  • 全年计划外停机时间 < 5分钟

建议结合业务特点进行定制化设计,并建立持续改进机制,定期进行架构评审和压力测试,确保系统随业务发展持续演进。