🐰 Spring Boot 实现 RabbitMQ 消息可靠性机制
🔧 环境前置配置(application.yml)
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-confirm-type: correlated # 开启 confirm 模式(推荐) publisher-returns: true # 开启 return 回调(路由失败) listener: simple: acknowledge-mode: manual # 手动 ACK
✅ 1. 持久化机制(队列 + 消息)
📌 功能:消息和队列都设置为持久化
🔧 配置类:Queue + Exchange + Binding
@Configuration public class RabbitConfig { @Bean public Queue persistentQueue() { return QueueBuilder.durable("persistent_queue").build(); } }
✅ 生产者
@Service public class PersistentProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send() { MessageProperties props = new MessageProperties(); props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化 Message message = new Message("Hello Persistent".getBytes(), props); rabbitTemplate.send("", "persistent_queue", message); System.out.println("✅ 消息已发送(持久化)"); } }
✅ 消费者
@Component public class PersistentConsumer { @RabbitListener(queues = "persistent_queue") public void receive(String msg) { System.out.println("✅ 消费者收到:" + msg); } }
✅ 2. 生产者确认机制(Confirm)
🔧 Confirm 回调配置
@Configuration public class ConfirmCallbackConfig { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { // Confirm 回调(消息是否到达交换机) rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("✅ 交换机已收到消息"); } else { System.err.println("❌ 消息未送达交换机:" + cause); } }); // Return 回调(交换机→队列失败) rabbitTemplate.setReturnsCallback(returned -> { System.err.println("❌ 路由失败:" + new String(returned.getMessage().getBody())); }); } }
✅ 异步发送(Spring Boot 默认是异步 confirm)
@Service public class ConfirmProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send() { CorrelationData data = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend( "confirm_exchange", "confirm_key", "Hello Confirm", data); System.out.println("消息发送成功,等待 confirm 回调..."); } }
✅ 3. 消费者手动确认(ACK)
✅ 配置类
@Configuration public class AckConfig { @Bean public Queue ackQueue() { return QueueBuilder.durable("ack_queue").build(); } }
✅ 生产者
@Service public class AckProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send() { rabbitTemplate.convertAndSend("", "ack_queue", "Manual ACK 消息"); } }
✅ 消费者(手动 ACK)
@Component public class AckConsumer { @RabbitListener(queues = "ack_queue", ackMode = "MANUAL") public void receive(Message message, Channel channel) throws Exception { try { String body = new String(message.getBody()); System.out.println("处理消息:" + body); // 业务处理成功后手动 ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 异常处理,拒绝并重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
✅ 4. 死信队列(DLX)
🔧 队列配置
@Configuration public class DLXConfig { @Bean public Queue normalQueue() { return QueueBuilder.durable("normal_queue") .withArgument("x-dead-letter-exchange", "dlx-exchange") // DLX绑定 .build(); } @Bean public FanoutExchange dlxExchange() { return new FanoutExchange("dlx-exchange"); } @Bean public Queue dlxQueue() { return QueueBuilder.durable("dlx_queue").build(); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()); } }
✅ 生产者
@Service public class DLXProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String msg) { rabbitTemplate.convertAndSend("", "normal_queue", msg); } }
✅ 消费者
@Component public class DLXConsumer { @RabbitListener(queues = "normal_queue", ackMode = "MANUAL") public void consume(Message message, Channel channel) throws IOException { String body = new String(message.getBody()); if (body.contains("error")) { // 拒绝并不重回队列 → 进入死信队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } @RabbitListener(queues = "dlx_queue") public void handleDLX(String msg) { System.err.println("🧟 死信处理:" + msg); } }
✅ 5. 事务模式(不推荐但支持)
@Service public class TxProducer { @Autowired private ConnectionFactory connectionFactory; public void sendTx() throws Exception { try (Connection connection = connectionFactory.createConnection(); Channel channel = connection.createChannel(false)) { channel.txSelect(); // 开启事务 try { channel.queueDeclare("tx_queue", true, false, false, null); channel.basicPublish("", "tx_queue", null, "事务消息".getBytes()); channel.txCommit(); System.out.println("✅ 事务提交成功"); } catch (Exception e) { channel.txRollback(); System.out.println("❌ 事务回滚:" + e.getMessage()); } } } }
✅ 对比总结
机制 Spring Boot 配置 可靠性 性能 推荐 持久化 durable + deliveryMode ✅ 高 中等 ✅ 推荐 Confirm publisher-confirm-type ✅ 高 高 ✅ 推荐 手动 ACK acknowledge-mode=manual ✅ 高 中等 ✅ 推荐 死信队列 x-dead-letter-exchange ✅ 高 高 ✅ 推荐 事务模式 txSelect + txCommit ✅ 非常高 ❌ 低 🚫 不推荐 最后:在生产者的确认机制(Confirm)分两种
1. RabbitTemplate + ConfirmCallback(这种就是上面写的)(推荐)
作用范围:
RabbitTemplate 是 Spring Boot 封装的模板工具类,ConfirmCallback 监听的是通过这个 RabbitTemplate 发送的所有消息。
一旦配置了publisher-confirm-type: correlated
,该模板发送到任意交换机的消息都会触发回调。使用特点:
全局生效,适合大部分场景。
不需要手动调用
waitForConfirms()
,由 Spring 异步回调通知发送结果。需要在
application.yml
中开启publisher-confirm-type
,否则不生效。如果使用多个 RabbitTemplate,需要分别配置 ConfirmCallback。
适用场景:
大部分 Spring Boot 项目中使用。
需要简单统一的确认机制,而不是针对单一消息进行控制。
适合与消息持久化、重试机制结合,统一日志记录发送结果。
2. Channel + waitForConfirms(使用的java原生的 ⚠️ 不推荐)
作用范围:
这是 RabbitMQ 原生的 Channel 级别确认。它只对当前 Channel 发送的消息有效,并且可以精确控制每一条消息的确认状态。使用特点:
局部控制,只影响当前 Channel。
不需要 Spring 配置,直接用
channel.confirmSelect()
开启。可以 单条确认(
waitForConfirms()
)或 批量确认(waitForConfirmsOrDie()
)。可以与异步确认(
addConfirmListener
)结合,实现细粒度的消息追踪。适用场景:
需要对某一批消息单独控制确认(而不是整个系统都一个回调)。
在一个服务中需要对不同交换机、不同消息类型采取不同的确认策略。
对性能有较高要求,需要批量确认或者异步确认机制。
2.1 代码样例
@Component public class ConfirmSyncBatchProducer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private CachingConnectionFactory connectionFactory; public void sendBatchSyncConfirm() { //Spring Boot 不支持原生 channel.waitForConfirms() 的方式 //但我们可用 ChannelCallback 手动实现 rabbitTemplate.execute(channel -> { channel.confirmSelect(); // 启用 confirm 模式 try { for (int i = 0; i < 5; i++) { String msg = "Sync Batch " + i; channel.basicPublish("", "confirm-queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); } if (!channel.waitForConfirms()) { System.out.println("❌ 批量同步确认失败!"); } else { System.out.println("✅ 批量同步确认成功!"); } } catch (Exception e) { e.printStackTrace(); } return null; }); } }