目录
使用RabbitMQ的三大痛点问题🚀:
- 重复消费
- 顺序性丢失
- 消息消失
1.重复消费问题(消息幂等性)🔄
为了防止消息在消费者端丢失,会采用手动回复MQ的方式来解决,同时也引出了一个问题,消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,那么该条消息还会保存在MQ的消息队列,由于MQ的消息重发机制,会重新把该条消息发给和该队列绑定的消息者处理,这样就会导致消息重复消费。
解决方案:
✅ 数据库唯一键:利用数据库主键或唯一约束防止重复插入
✅ Redis Set操作:天然幂等性,自动去重
✅ 全局ID方案:为每条消息分配全局唯一ID;消费前检查Redis是否存在<id, message>
记录
1. 生产者 → 消息+全局ID → RabbitMQ
2. 消费者 ← 消息 → 检查Redis
- 已存在 → 直接确认
- 不存在 → 处理业务 → 写入Redis → 确认消息
2.顺序性丢失⚡
解决方案:
🔧 生产者层面:
单线程发送保证同一业务流消息顺序
📦 队列层面:
分区/队列划分:对于支持分区或多个队列的消息队列系统,可以将需要保证顺序的消息发送到同一个分区或队列中。
👨💻 消费者层面:
单线程消费特定队列:消费者使用给单线程来消费特定的分区合队列中的消息,可以确保消息按照消息队列顺序进行消费。
本地维护消息状态(记录已处理消息序号):消费者可以在本地存储已处理消息的状态,例如记录已处理的消息的序号或唯一标识。在消费下一个消息之前,检查该消息的序号是否符合预期顺序,如果不符合则等待或进行相应的处理
重试机制:当消费消息出现错误需要重试时,要确保重试的消息不会打乱顺序。可以将重试的消息放入一个单独的队列或延迟处理,等前面的消息都处理完成后再进行重试。
3. 消息消失🚨
- 生产者发送失败
- 消息队列存储失败
- 消费者未正确确认
解决方案:
(1) 生产者方面:RabbitMQ提供transaction和confirm模式来确保生产者不丢消息
。
⚖️transaction机制:发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。然而缺点就是吞吐量下降了。
⚡confirm模式:一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
(2)消息队列
处理消息队列丢数据的情况一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步:
①将queue的持久化标识durable设置为true,则代表是一个持久的队列(队列持久化)
②发送消息的时候将deliveryMode=2(消息持久化)
(3)消费者:消费者丢数据一般是因为采用了自动确认消息模式。MQ收到确认消息后会删除消息,如果这时消费者异常了,那消息就没了。
✋ 启用手动确认模式可以解决这个问题