RabbitMQ--Springboot解决消息丢失

发布于:2025-07-28 ⋅ 阅读:(15) ⋅ 点赞:(0)

🐰 Spring Boot 实现 RabbitMQ 消息可靠性机制

🔧 环境前置配置(application.yml)

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated   # 开启 confirm 模式(推荐)
    publisher-returns: true              # 开启 return 回调(路由失败)
    listener:
      simple:
        acknowledge-mode: manual         # 手动 ACK

✅ 1. 持久化机制(队列 + 消息)

📌 功能:消息和队列都设置为持久化

🔧 配置类:Queue + Exchange + Binding

@Configuration
public class RabbitConfig {

    @Bean
    public Queue persistentQueue() {
        return QueueBuilder.durable("persistent_queue").build();
    }
}

✅ 生产者

@Service
public class PersistentProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        MessageProperties props = new MessageProperties();
        props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化
        Message message = new Message("Hello Persistent".getBytes(), props);
        rabbitTemplate.send("", "persistent_queue", message);
        System.out.println("✅ 消息已发送(持久化)");
    }
}

✅ 消费者

@Component
public class PersistentConsumer {

    @RabbitListener(queues = "persistent_queue")
    public void receive(String msg) {
        System.out.println("✅ 消费者收到:" + msg);
    }
}

✅ 2. 生产者确认机制(Confirm)

🔧 Confirm 回调配置

@Configuration
public class ConfirmCallbackConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // Confirm 回调(消息是否到达交换机)
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("✅ 交换机已收到消息");
            } else {
                System.err.println("❌ 消息未送达交换机:" + cause);
            }
        });

        // Return 回调(交换机→队列失败)
        rabbitTemplate.setReturnsCallback(returned -> {
            System.err.println("❌ 路由失败:" + new String(returned.getMessage().getBody()));
        });
    }
}

✅ 异步发送(Spring Boot 默认是异步 confirm)

@Service
public class ConfirmProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(
                "confirm_exchange", "confirm_key", "Hello Confirm", data);

        System.out.println("消息发送成功,等待 confirm 回调...");
    }
}

✅ 3. 消费者手动确认(ACK)

✅ 配置类

@Configuration
public class AckConfig {

    @Bean
    public Queue ackQueue() {
        return QueueBuilder.durable("ack_queue").build();
    }
}

✅ 生产者

@Service
public class AckProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        rabbitTemplate.convertAndSend("", "ack_queue", "Manual ACK 消息");
    }
}

✅ 消费者(手动 ACK)

@Component
public class AckConsumer {

    @RabbitListener(queues = "ack_queue", ackMode = "MANUAL")
    public void receive(Message message, Channel channel) throws Exception {
        try {
            String body = new String(message.getBody());
            System.out.println("处理消息:" + body);
            // 业务处理成功后手动 ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 异常处理,拒绝并重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

✅ 4. 死信队列(DLX)

🔧 队列配置

@Configuration
public class DLXConfig {

    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal_queue")
                .withArgument("x-dead-letter-exchange", "dlx-exchange") // DLX绑定
                .build();
    }

    @Bean
    public FanoutExchange dlxExchange() {
        return new FanoutExchange("dlx-exchange");
    }

    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable("dlx_queue").build();
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange());
    }
}

✅ 生产者

@Service
public class DLXProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        rabbitTemplate.convertAndSend("", "normal_queue", msg);
    }
}

✅ 消费者

@Component
public class DLXConsumer {

    @RabbitListener(queues = "normal_queue", ackMode = "MANUAL")
    public void consume(Message message, Channel channel) throws IOException {
        String body = new String(message.getBody());
        if (body.contains("error")) {
            // 拒绝并不重回队列 → 进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = "dlx_queue")
    public void handleDLX(String msg) {
        System.err.println("🧟 死信处理:" + msg);
    }
}

✅ 5. 事务模式(不推荐但支持)

@Service
public class TxProducer {

    @Autowired
    private ConnectionFactory connectionFactory;

    public void sendTx() throws Exception {
        try (Connection connection = connectionFactory.createConnection();
             Channel channel = connection.createChannel(false)) {

            channel.txSelect(); // 开启事务

            try {
                channel.queueDeclare("tx_queue", true, false, false, null);
                channel.basicPublish("", "tx_queue", null, "事务消息".getBytes());
                channel.txCommit();
                System.out.println("✅ 事务提交成功");
            } catch (Exception e) {
                channel.txRollback();
                System.out.println("❌ 事务回滚:" + e.getMessage());
            }
        }
    }
}

✅ 对比总结

机制 Spring Boot 配置 可靠性 性能 推荐
持久化 durable + deliveryMode ✅ 高 中等 ✅ 推荐
Confirm publisher-confirm-type ✅ 高 ✅ 推荐
手动 ACK acknowledge-mode=manual ✅ 高 中等 ✅ 推荐
死信队列 x-dead-letter-exchange ✅ 高 ✅ 推荐
事务模式 txSelect + txCommit ✅ 非常高 ❌ 低 🚫 不推荐

最后:在生产者的确认机制(Confirm)分两种

1. RabbitTemplate + ConfirmCallback(这种就是上面写的)(推荐)

  • 作用范围
    RabbitTemplate 是 Spring Boot 封装的模板工具类,ConfirmCallback 监听的是通过这个 RabbitTemplate 发送的所有消息
    一旦配置了 publisher-confirm-type: correlated该模板发送到任意交换机的消息都会触发回调。

  • 使用特点

    • 全局生效,适合大部分场景。

    • 不需要手动调用 waitForConfirms(),由 Spring 异步回调通知发送结果。

    • 需要在 application.yml 中开启 publisher-confirm-type,否则不生效。

    • 如果使用多个 RabbitTemplate,需要分别配置 ConfirmCallback。

  • 适用场景

    • 大部分 Spring Boot 项目中使用。

    • 需要简单统一的确认机制,而不是针对单一消息进行控制。

    • 适合与消息持久化、重试机制结合,统一日志记录发送结果。


2. Channel + waitForConfirms(使用的java原生的 ⚠️ 不推荐)

  • 作用范围
    这是 RabbitMQ 原生的 Channel 级别确认。它只对当前 Channel 发送的消息有效,并且可以精确控制每一条消息的确认状态。

  • 使用特点

    • 局部控制,只影响当前 Channel。

    • 不需要 Spring 配置,直接用 channel.confirmSelect() 开启。

    • 可以 单条确认waitForConfirms())或 批量确认waitForConfirmsOrDie())。

    • 可以与异步确认(addConfirmListener)结合,实现细粒度的消息追踪。

  • 适用场景

    • 需要对某一批消息单独控制确认(而不是整个系统都一个回调)。

    • 在一个服务中需要对不同交换机、不同消息类型采取不同的确认策略。

    • 对性能有较高要求,需要批量确认或者异步确认机制。

2.1 代码样例

@Component
public class ConfirmSyncBatchProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    public void sendBatchSyncConfirm() {
       //Spring Boot 不支持原生 channel.waitForConfirms() 的方式
       //但我们可用 ChannelCallback 手动实现
        rabbitTemplate.execute(channel -> {
            channel.confirmSelect(); // 启用 confirm 模式

            try {
                for (int i = 0; i < 5; i++) {
                    String msg = "Sync Batch " + i;
                    channel.basicPublish("",
                            "confirm-queue",
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            msg.getBytes());
                }

                if (!channel.waitForConfirms()) {
                    System.out.println("❌ 批量同步确认失败!");
                } else {
                    System.out.println("✅ 批量同步确认成功!");
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        });
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到