RabbitMQ 消费幂等性与消息重放实现

发布于:2025-06-28 ⋅ 阅读:(13) ⋅ 点赞:(0)

一、幂等性实现

1.1 什么是幂等性?

幂等性是指同一条消息无论被消费多少次,业务结果都只生效一次,防止重复扣款、重复发货等问题。

RabbitMQ 的投递模式是“至少一次交付”(at-least-once delivery),如果消费者处理失败或者没有及时确认,消息会被多次投递。如果业务本身不具备幂等性,就可能导致重复扣款、重复发货等严重后果。

1.2 实现思路

RabbitMQ 只负责消息的可靠投递,而不会记录每条消息是否已经被成功消费。因此,需要由消费者端维护消费状态,常见做法是借助 Redis 实现去重逻辑。

消息在生产阶段应携带全局唯一的 message_id(例如订单号:order:10010)。在消费逻辑中,先通过 Redis 的原子命令 SETNX 尝试写入该 message_id:①如果 SETNX返回1,表示第一次消费,可以处理;②如果返回0,表示已消费,直接忽略

 二、消息重放实现

在RabbitMQ中,ack和nack机制是保证可靠投递、实现重放的关键。

2.1 ack和nack

如果你的消费逻辑里既没有调用ack,也没有调用nack,消息状态会一直unacked。只要没确认,就永远不会删除消息。

(1) ack

确认消息已被消费成功。当消费者调用:

ch.basic_ack(delivery_tag=method.delivery_tag)

RabbitMQ就会把消息从队列里永久删除。只要你ack了,这条消息就不可能再来了。

(2) nack

告诉RabbitMQ“我没处理好”。有两种方式:

# 发送nack并重入队列
# RabbitMQ会立刻把消息放回队列,再投递给其他消费者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


# 发送nack不重入队列
# 消息就会被丢弃(或者,如果绑定了死信队列,就转入死信队列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

2.2 实现代码

下方代码实现了以下关键功能:

1. 消息通过 SETNX + EXPIRE 在 Redis 中写入幂等标记,确保同一消息只会被一个消费者处理。
2. 如果标记已存在,判断是“已完成”还是“正在处理”,分别选择直接确认或稍后重试。
3. 业务处理成功后将标记更新为 done 并延长过期,表示消费已完成。
4. 如果处理失败,删除标记以便下次重新消费,并根据重试次数决定是否放弃或重试。

生产者代码
import pika
import uuid

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)

message_id = str(uuid.uuid4())
body = "test message"    # 可以通过推送body = "fail message" 模拟消费异常


properties = pika.BasicProperties(
    delivery_mode=2,
    message_id=message_id
)

channel.basic_publish(
    exchange='',
    routing_key='test_queue',
    body=body,
    properties=properties
)

print(f"[x] Sent '{body}' with message_id {message_id}")

connection.close()
 消费者代码
import pika
import redis
import time

# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)

# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)

MAX_RETRY = 5

def callback(ch, method, properties, body):
    message_id = properties.message_id
    if not message_id:
        import hashlib
        message_id = hashlib.md5(body).hexdigest()

    redis_key = f"msg:{message_id}"
    retry_key = f"retry:{message_id}"

    # 尝试用SETNX写入幂等标记
    result = r.setnx(redis_key, "processing")
    if not result:
        status = r.get(redis_key)
        if status and status.decode() == "done":
            # 已经处理过
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print(f"[!] Duplicate message detected: {message_id}")
        else:
            # 正在处理,稍后重试
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
            print(f"[!] Message {message_id} is being processed by another consumer.")
        return

    # SETNX成功,要设置过期时间,防止永久占用
    r.expire(redis_key, 300)  # 300秒

    try:
        # 获取重试次数
        retry_count = r.get(retry_key)
        if retry_count is None:
            retry_count = 0
        else:
            retry_count = int(retry_count)

        print(f"[x] Processing message: {body.decode()} (retry: {retry_count})")

        # 模拟失败
        if "fail" in body.decode():
            raise Exception("Simulated failure")

        # 业务逻辑
        # ...

        # 处理成功,改为done并延长过期
        r.set(redis_key, "done")
        r.expire(redis_key, 24*60*60)

        r.delete(retry_key)

        ch.basic_ack(delivery_tag=method.delivery_tag)
        print("[+] Message processed successfully")

    except Exception as e:
        retry_count += 1
        r.set(retry_key, retry_count)
        r.expire(retry_key, 24*60*60)
        print(f"[!] Error processing message (retry {retry_count}): {e}")

        # 失败时删除幂等标记,下次可以继续处理
        r.delete(redis_key)

        if retry_count >= MAX_RETRY:
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print("[!] Max retries reached, moving message to dead letter log.")
        else:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='test_queue',
    on_message_callback=callback,
    auto_ack=False
)

print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()


网站公告

今日签到

点亮在社区的每一天
去签到