RabbitMQ可靠传输——持久性、发送方确认

发布于:2025-05-23 ⋅ 阅读:(11) ⋅ 点赞:(0)

一、持久性

前面学习消息确认机制时,是为了保证Broker到消费者直接的可靠传输的,但是如果是Broker出现问题(如停止服务),如何保证消息可靠性?对此,RabbitMQ提供了持久化功能:

持久化分为三种:1. 交换机持久化   2. 队列持久化   3.消息持久化

1.1 交换机持久化

一、交换机持久化方法

声明交换机时,将durable置为true即可,如果不指定,默认为true

二、交换机持久化的作用

避免了当Broker重启时,未重新执行交换机声明代码,而导致生产者消息无法路由


1.2 队列持久化

一、队列持久化方法

在声明队列时,使用durable方法声明的队列为持久化队列,使用nonDurable声明的队列为非持久化队列

对应管理界面:

二、队列持久化的作用

在RabbitMQ服务器重启时,未持久化的队列将丢失,持久化队列保留


1.3 消息持久化

 一、消息持久化方法

前面在发送消息时,都是直接指定一个字符串来发送消息,如:


我们先进入convertAndSend方法,观察其源码:

接下来再进入MessageProperties,观察其源码:

可以看到,不指定消息是否持久化,默认为持久化

也就是说,如果要指定消息为非持久化,就选哟给convertAndSend传入一个Message对象而不是仅传一个消息字符串,接下来学习如何设置消息非持久化:

1> 创建一个Message对象

 Message message = new Message("persistent test...".getBytes(),new MessageProperties());

2> 获取MessageProperties对象,通过setDeliveryMode方法设置消息非持久化


/*
*如果要设置为持久化,可以直接换一个String的消息,也可以将这里的
*MessageDeliveryMode.NON_PERSISTENT改为MessageDeliveryMode.PERSISTENT
*/
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

3> 将Message对象作为参数传递给converAndSend方法

rabbitTemplate.convertAndSend(Constants.NO_PERSISTENT_EXCHANGE,"",message);

整体代码:

二、消息持久化作用

RabbitMQ服务器重启时,未持久化的消息将丢失即使消息所在队列未持久化队列),持久化的消息将保留前提是消息所在的队列是持久化队列

但是,有了消息确认机制以及持久性就能保证消息传输的可靠性了吗?显然不是,因为消息确认机制保证的是Broker到消费者的可靠性 ,持久性保证的是Broker内部的可靠性,还有生产者到Broker的可靠性没有被保证,因此,RabbitMQ引入了publiser confirms(发送方确认)机制


二、发送方确认机制

前面已经学习了RabbitMQ核心机制之一——持久化,但是这就能保证消息传输的可靠性了吗?显然不是,如果发送方发送的消息没有到达Broker,又谈何持久化?因此,我们还需要了解RabbitMQ的发送方确认机制(通过事务也能解决,但是比较复杂,这里不谈)

publisher confirms 机制又包含两种模式

1> confirm确认模式

2> return退回模式

2.1 confirm确认模式

一、触发机制

Producer向Broker发送消息时,需要设置一个ConfirmCallback监听,这样消息无论是否到达exchange,这个监听都会触发,如果消息到达exchange,ACK为true,如果没有到达exchange,ACK为false


二、代码演示

1> 添加RabbitMQ配置

publisher-confirm-type: correlated #配置publisher confirm机制

2> 代码实现(队列、交换机随便声明一个就行,类型随意)

    @RequestMapping("/confirm")
    public String confirm(){
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行confirm方法");
                if(ack){//ack为true,消息到达exchange
                    System.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());
                }else {//ack为false,消息为到达exchange
                    System.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);
                }
            }
        });
        CorrelationData correlationData = new CorrelationData("1");
        //发送消息
        rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);
        return "消息发送成功";
    }

3>运行程序,测试接口

    1.正确发送消息(交换机名、routingKey存在)

消息发送成功,接下来查看控制台信息:

可以看到,交换机成功接收到消息

  2.错误发送消息(改为一个不存在的交换机名)

再次运行程序,访问接口,发送消息:

消息发送成功,查看控制台:

可以看到,消息并没有到达指定交换机,原因是不存在这个交换机。

  3.错误发送消息(改为一个不存在的routingKey)

运行程序,测试接口:

消息发送成功,查看控制台:

可以看到,在routingKey不存在的情况下,消息还是到达了交换机,但是这个消息一定是无法路由到队列的,因此就需要通过publsher confirm的 return退回模式 来解决

4> 上述代码编写存在的问题

  上面我们设置了ConfirmCallback监听经过测试,似乎并没有问题,但是仔细思考就会发现,我们在上面的代码中是通过rabbitTemplate这个对象来设置的,那岂不是前面所有使用rabbitTemplate的接口都被设置了监听?访问其它接口也一样会打印回调方法中的信息?

下面我们测试一下下面的方法:

运行程序,测试接口:

消息发送成功,查看控制台:

可以看到,同样会触发监听,为了避免这个问题,我们可以在config包中自己配置一个RabbitTemplate对象并注入进来:

@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        //消息到达exchange时的回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行confirm方法");
                if(ack){//ack为true,表示消息到达交换机
                    System.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());

                }else{//ack为false,表示消息未到达交换机
                    System.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);
                    //业务逻辑,如重发等
                }
            }
        });

        //消息退回时的回调方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退回: " + returned);
            }
        });
        return rabbitTemplate;
    }
}

2.2 return退回模式

 一、触发机制

当消息到达exchange后,需要路由到queue中,如果一条消息无法被任何queue消费(routingKey不存在或队列不存在),可以把消息退回给producer,退回时可以设置一个回调方法ReturnCallback,对消息进行处理


二、代码演示

   //消息退回时的回调方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退回: " + returned);
            }
        });

修啊routingKey为一个不存在的routingKey:

运行程序,测试接口:

查看控制台:

可以看到,消息被退回


2.3 总结

publisher confirms 机制可以保证消息从生产者到Broker的可靠性,其中confirm模式工作在生产者到exchange之间,return模式工作在exchange到queue之间 


网站公告

今日签到

点亮在社区的每一天
去签到