RabbitMQ 可靠传输性(包括消息确认, 持久性和发送方确认)

发布于:2025-09-09 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

1. 消息确认

1.1 简介

1.2 手动确认方法

1.2.1 basiAck (肯定确认)

1.2.2 basicNack (否定确认)

1.2.3 basicReject (否定确认)

1.3 代码示例 (这里主要在 spring 中测试)

1.3.1 none

1.3.2 auto

1.3.3 manual

2. 持久性

2.1 简介

2.2 交换机的持久化

2.3 队列持久化

2.4 消息持久化

3. 发送方确认

3.1 简介

3.2 confirm 确认模式

3.3 return 返回模式


RabbitMQ 作为消息中间件来说, 最重要的就是收发消息了, 但是我们在收发消息的时候, 可能会因为一系列特殊情况导致消息丢失 : 

  • 生产者问题 : 为应用程序故障, 网络抖动等各种原因, 生产者没有成功向 Broker 发送消息.
  • 消息中间件自身问题 : 生产者成功发送给了 Broker, 但是 Broker 没有把消息保存好, 导致消息丢失.
  • 消费者问题 : Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致 Broker 将消费失败的消息从队列中删除了.

 RabbitMQ 对上面消息丢失的情况进行考虑, 做出了不同的应对措施 : 

  • 针对生产者的问题, RabbitMQ 推出了发送方确认机制(发布确认模式)
  • 针对消息中间件自身的问题, RabbitMQ 推出了持久化机制
  • 针对消费者的问题, RabbitMQ 推出了消息确认机制

1. 消息确认

1.1 简介

RabbitMQ 向消费者发送消息后, 就会删除这条消息. 但是, 如果消费者处理消息异常, 就会造成消息丢失. 为了解决消息在 Broker 和 消费者之间问题 RabbitMQ 推出了消息确认.

消费者在订阅队列时, 可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制可以分为两种 : 

  • 手动确认 : 当 autoAck 等于 false 时, RabbitMQ 会等待消费者显示地调用 Basic.Ack 命令, 恢复确认信号后才能从内存 (或者磁盘中) 删除消息. 这种模式适合于消息可靠性要求较高的场景.
  • 自动确认 : 当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息设置为确认, 然后从内存 (或者硬盘中) 删除消, 而不管消费者是否真正消费了这条消息. 这种模式适合于消息可靠性要求不高的场景.
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

目前仅支持中英翻译

目前仅支持中英翻译

将 autoAck 设置为 false 时, 队列中的消息就被分成了两个部分 : 

  • 等待投递给消费者的消息.
  • 已经投递给消费者, 但是还没有收到消费者确认信号的消息.

如果 RabbitMQ 一直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则  RabbitMQ 会安排该消息重新进入队列, 等待投递给下一个消费, 当然也有可能还是原来的那个消费者.

1.2 手动确认方法

消费者在收到消息之后, 可以选择确认, 也可以选择直接拒绝或者跳过. RabbitMQ 也提供了不同的确认应答方式, 消费者可以调用与其相对用的 channel 相关方法 : 

1.2.1 basiAck (肯定确认)
void basiAck(long delivertTag, boolean multiple)

deliveryTag :是 broker 给 consumer 发送消息的唯一标识, 在一个 channel 中 deliveryTag 是唯一的. 他确保了消息传递的可靠性和顺序性.

mulitple : 是否批量确认

1.2.2 basicNack (否定确认)
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws I0Exception;

requeue : 是否重新入队列

使用这个方法, 就相当于 broker 发送 nack. 这条消息没有被正常消费

requeue 表示拒绝这条消息后如何处理 : 

  • true : 将这条消息重新入队列, 继续给 consumer 消费
  • false : broker 删除这条消息
1.2.3 basicReject (否定确认)
void basicReject(long deliveryTag, boolean requeue) throws IoException;

这个就是没了批量选项

1.3 代码示例 (这里主要在 spring 中测试)

在 spring 中, AMQP 对于消息的确认机制提供了三种策略 (acknowledge-mode) : 

  • none : 消息一旦投递给消费者, 不管消费者是否处理成功该消息, RabbitMQ 都会自动确认, 从队列中移除该消息. 如果消费者处理消息失败, 消息可能会丢失.
  • auto(默认) : 表示消息投递给消费者, 如果处理过程中抛出了异常, 则不会确认该消息. 但是如果没有发生异常, 该消息就会自动确认.
  • manual : 手动确认模式, 消费者必须在成功处理消息之后调用 basicAck 来确认消息. 如果消息未被确认, RabbitMQ 会认为消息未处理成功, 并且会在消费者可用时重新投递该消息, 这种模式提高了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, 而是可以被重新处理.

Constans 文件 : 

public class Constants {
    public static final String ACK_EXCHANGE = "ack.exchange";
    public static final String ACK_QUEUE = "ack.queue";
}

配置文件 : 

spring:
  application:
    name: rabbitmq-extensions-demo
  rabbitmq:
    addresses: amqp://study:study@192.168.100.10:5672/extension
    listener:
      simple:
#        acknowledge-mode: none
#        acknowledge-mode: auto
#        acknowledge-mode: manual

生产者 : 

@RequestMapping("/producer")
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/ack")
    public String ack() {
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "consumer ack mode test...");
        return "消息发送成功";
    }
}

消费者 : 

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        //消费者逻辑
        System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),StandardCharsets.UTF_8),
                message.getMessageProperties().getDeliveryTag());

        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        int num = 3/0;
        System.out.println("业务处理完成");
    }
}

分别看下这三种策略出现异常时会怎样? 

1.3.1 none

虽然处理过程失败了, 但消息却是正常删除了.

1.3.2 auto

这里处理过程依然失败, 但是这次的消息却没有删除, 异常消息没有被确认, idea 也会一直报错

1.3.3 manual

消费者代码需要改一下. 这里我们看下异常消息在 不同的 requeue 下的情况

@Component
public class AckListener {
//    @RabbitListener(queues = Constants.ACK_QUEUE)
//    public void handMessage(Message message, Channel channel) throws Exception {
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//
//        //消费者逻辑
//        System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),StandardCharsets.UTF_8),
//                message.getMessageProperties().getDeliveryTag());
//
//        //进行业务逻辑处理
//        System.out.println("业务逻辑处理");
//        int num = 3/0;
//        System.out.println("业务处理完成");
//    }

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //消费者逻辑
            System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(), StandardCharsets.UTF_8),
                    message.getMessageProperties().getDeliveryTag());

            //进行业务逻辑处理
            System.out.println("业务逻辑处理");
            int num = 3/0;
            System.out.println("业务处理完成");
            //肯定确认
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            //否定确认
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
  • requeue : true

这里消息没有被正常消费, 会一直重发给我们重新处理. 因为 delieryTag 是唯一的, 所以他会一直递增, 而队列中的消息始终只有一个.

  • requeue : false

异常消息被删除.

2. 持久性

2.1 简介

RabbitMQ 的持久化是表示的是资源储存在硬盘中, 当RAbbitMQ 服务器重启后, 设置为持久化的资源不会被释放, 除非手动删除, 不然不会丢失.

持久化是 RabbitMQ 的可靠保证机制之一, 它保证的是 RabbitMQ 内部的可靠性.

持久化分为三个部分 : 交换机的持久化, 队列的持久化, 消息的持久化

2.2 交换机的持久化

在声明交换机的时候可以通过 .durable(true) 来表示持久化的交换机, 或者直接不写 .durable(), 默认情况下也是持久化的交换机

    @Bean("directExchange")
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
    }

    @Bean("directExchange")
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();
    }

    将 .durable() 的参数设置成 false, 就可以声明不持久的交换机了.

    2.3 队列持久化

    同样在声明队列的时候用 .durable() 和 .nonDurable() 分别来表示持久化的队列和非持久化的队列

        @Bean("ackQueue")
        public Queue ackQueue(){
            return QueueBuilder.durable(Constants.ACK_QUEUE).build();
        }
    
        @Bean("ackQueue")
        public Queue ackQueue(){
            return QueueBuilder.nonDurable(Constants.ACK_QUEUE).build();
        }
    

    2.4 消息持久化

    在生产者发送消息时, 可以指定该消息持久化.

        @RequestMapping("/pres")
        public String pres() {
    
            Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
    
            //消息非持久化
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    
            //消息持久化
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            
            System.out.println(message);
            rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
            return "消息发送成功";
        }

    3. 发送方确认

    3.1 简介

    当消息从生产者发送给 Broker时, 消息可能会发生丢失, 那么我们后续的啥操作都没有用, 所以我们需要保证消息能成功的发送给 Broker. 因为 Broker 包括了 Exchange 和 Queue 两部分, 所以我们要保证消息成功到达 交换机 和 队列中.

    对于该问题, RabbitMQ 提出了两种解决方案 : 事务, 发送确认机制

    我们这里介绍 发送确认机制怎么解决该问题.

    RabbitMQ 分别提供了对应的模式来解决这两个问题 : 

    • confirm 确认模式
    • return 退回模式

    添加配置项 : 

    创建交换机, 队列和进行绑定 : 

    3.2 confirm 确认模式

    ConfirmCallback 为回调函数, 当生产者发送消息给 broker, 都会给 生产者发送一个 ack, 若 ack 为 true, 就表示消息到达了交换机. 若 ack 为 false, 就表示消息没有到达交换机. 我们可以根据 ack 来进行不同的业务操作. 

        @RequestMapping("/confirm")
        public String confirm() {
    
            // 设置回调方法
            // 若在此处设置回调函数,那么会影响到所有 rabbitTemplate
            // 并且只能发送一次消息,因为发送多次消息就相当于设置了多个回调函数,规定只能设置一次
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("执行了 confirm 方法");
                    if (ack) {
    
                        // 消息到达了交换机
                        System.out.printf("接收到信息, 消息 ID: %s \n",
                                correlationData == null ? null : correlationData.getId());
                    } else {
    
                        // 消息未到达交换机
                        System.out.printf("未接收到信息, 消息 ID: %s, cause: %s \n",
                                correlationData == null ? null : correlationData.getId(), cause);
                        // 响应业务的处理
                        System.out.println("相应的业务处理");
                    }
                }
            });
    
            String id = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(id);
            rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm",
                    "confirm test...", correlationData);
            return "消息发送成功";
        }

    这里我们在进行一次消息发送试试 : 

    日志中显示, 一个 RabbitMQTemplate 实例只能设置一次 ConfirmCallback. 但是我们连续两次调用这个接口, 就使 ConfirmCallback 被创建了两次, 所以发生了报错

    为了解决这个问题, 我们可以将 RabbitMQTemplate 提取出来, 自定义一个 RabbitMQTemplate类, 那它交给 Spring 容器保管, 后续每次都用容器中的, 就保证了它不会再次被创建.

    这里还有一个隐藏的问题, 就是接下来我们所有的消息都会走这个回调函数, 所以这个问题我们也需要解决.

    @Configuration
    public class RabbitTemplateConfig {
    
        // 这里要创建一个新的没有回调函数的 RabbitTemplate 给 Spring 容器保管
        // 因为 Bean 是先根据类型来寻找的, 我们创建了一个又回调函数的类, 默认没有
        // 回调函数的方法 Spring 已经不会帮我们创建了 
    
        @Bean("rabbitTemplate")
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean("confirmRabbitTemplate")
        public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    
            // 设置回调方法
            // 若在此处设置回调函数,那么会影响到所有 rabbitTemplate
            // 并且只能发送一次消息,因为发送多次消息就相当于设置了多个回调函数,规定只能设置一次
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("执行了 confirm 方法");
                    if (ack) {
    
                        // 消息到达了交换机
                        System.out.printf("接收到信息, 消息 ID: %s \n",
                                correlationData == null ? null : correlationData.getId());
                    } else {
    
                        // 消息未到达交换机
                        System.out.printf("未接收到信息, 消息 ID: %s, cause: %s \n",
                                correlationData == null ? null : correlationData.getId(), cause);
                        // 响应业务的处理
                        System.out.println("相应的业务处理");
                    }
                }
            });
            return rabbitTemplate;
        }
    }
        // 这里要指定使用的 Bean   
        @Resource(name = "rabbitTemplate")
        private RabbitTemplate rabbitTemplate;
    
        @Resource(name = "confirmRabbitTemplate")
        private RabbitTemplate confirmRabbitTemplate;
        
        @RequestMapping("/confirm")
        public String confirm() {
            String id = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(id);
            confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm",
                    "confirm test...", correlationData);
            return "消息发送成功";
        }

    访问 两次 confirm 和 一次 pres, 我们发现现在第一个问题解决了, 第二个问题也解决了

    • 使用未声明的交换机 : 

    这是正常的.

    • 使用错误的 BindingKey : 

    没有发生异常, 这是不正常的, 因为我们的消息没有传到指定的队列中, 却没有提示错误信息. 这里我们就需要用到 return 模式

    3.3 return 返回模式

        @RequestMapping("/returns")
        public String returns() {
            String id = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(id);
            confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm",
                    "returns test...", correlationData);
            return "消息发送成功";
        }
    @Configuration
    public class RabbitTemplateConfig {
    
        // 这里要创建一个新的没有回调函数的 RabbitTemplate 给 Spring 容器保管
        // 因为 Bean 是先根据类型来寻找的, 我们创建了一个又回调函数的类, 默认没有
        // 回调函数的方法 Spring 已经不会帮我们创建了
        @Bean("rabbitTemplate")
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean("confirmRabbitTemplate")
        public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
    
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    
            // 设置回调方法
            // 若在此处设置回调函数,那么会影响到所有 rabbitTemplate
            // 并且只能发送一次消息,因为发送多次消息就相当于设置了多个回调函数,规定只能设置一次
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("执行了 confirm 方法");
                    if (ack) {
    
                        // 消息到达了交换机
                        System.out.printf("接收到信息, 消息 ID: %s \n",
                                correlationData == null ? null : correlationData.getId());
                    } else {
    
                        // 消息未到达交换机
                        System.out.printf("未接收到信息, 消息 ID: %s, cause: %s \n",
                                correlationData == null ? null : correlationData.getId(), cause);
                        // 响应业务的处理
                        System.out.println("相应的业务处理");
                    }
                }
            });
    
            // 消息被退回时, 回调方法
            rabbitTemplate.setMandatory(true); // 启动强制路由检查
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returned) {
                    System.out.println("消息退回:"+returned);
                }
            });
            return rabbitTemplate;
        }
    }

    setMandatory : true :进行消息是否成功到达队列的判断 false : 即使编写了 return 模式的代码,也就不会生效.

    下面是 BindingKey 使用错误的日志.

    既然消息没有发送到指定队列, 那为什么还会弹出接收到消息的日志呢? 可以自行更改下代码.


    网站公告

    今日签到

    点亮在社区的每一天
    去签到