RabbitMQ的高级特性

发布于:2025-05-30 ⋅ 阅读:(22) ⋅ 点赞:(0)

1、消息确认

消息确认机制作用于队列和消费者之间,当消费者接收到消息,会执行回调函数handleDelivery,发送一个回调信息给到队列,告诉队列它已经正确接收到了消息,以此保证消息的可靠传递。

原生API

RabbitMQ支持自动确认和手动确认(autoAck)。

  • 自动确认:
    autoAck=true,发送完该消息,该消息就会从队列中移除,不论消费者是否真正消费。适合可靠性要求底的场景。
  • 手动确认:
    autoAck=false,消息发送后,需要消费者发送确认信号,才能从队列中移除该消息,否则一直存在于队列中。
String basicConsume(String queue, boolean autoAck, Consumer callback) throws
IOException;

callback用于指定回调方式:

DefaultConsumer consumer = new DefaultConsumer(channel) {
//回调函数
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
	//如果autoAck=false,需要指定消息确认方式
 System.out.println("接收到消息: " + new String(body));
 
 }
};

//消费消息
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

RabbitMQ原生API中消息确认方式有三种:

  1. 肯定确认: Channel.basicAck(long deliveryTag, boolean multiple)
    通过信道告知队列消费者已经处理了该消息,队列会主动把该消息从磁盘中删除
    参数说明:
    1)deliveryTag是每个信道独自维护的自增唯一ID用于区分不同消息。
    2)multiple用于标识是否批量确认接收到的消息。

  2. 否定确认: Channel.basicReject(long deliveryTag, boolean requeue)

  3. 否定确认Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

💡TIPS

  1. requeue的含义是如果否定确认是否让队列重新发送消息。
  2. basicReject和basicReject都是用于否定确认的,唯一的区别在于basicReject一次性只能确认一条消息,basicNack可以批量否定(multiple)

Spring.AMQP

Spring 框架中的消息确认机制(通常通过 Spring AMQP 提供)是对 RabbitMQ 原生 API 的封装。Spring AMQP 提供了一种更高层次的抽象,简化了与 RabbitMQ 的交互,底层使用的还是RabbitMQ原生API。
Spring提供了三种确认模式:

spring:
  application:
    name:
      extensions
  rabbitmq:
    addresses: amqp://admin:password@ip:port/virtualhost_name
    listener:
      simple:
      #	acknowledge-mode: none  # 不做处理
      	acknowledge-mode: auto  # 自动模式 spring默认配置
      # acknowledge-mode: manual  # 手动模式 
  • none

    • 队列发送完消息,不论消费者是否正确处理,RabbitMQ自动ACK消息,从队列移除该消息因此如果消费者没有正确处理消息,消息可能丢失
  • auto

    • 消息正确处理,自动确认消息,移除队列;如果处理失败,消息不会被移除,RabbitMQ会一直重发该消息直到处理成功(每次重试DeliveryTag会自增);如果一直处理失败,消息就会一直处于UnAck状态,导致消息积压。
  • manual

    • 消息发送后,不会自动确认,需要消费端使用channel.basicAck(deliveryTag, multiple) channel.basicNack(deliveryTag, multiple,requeue),又或者channel.basicReject(deliveryTag, requeue)进行确认。如果不进行确认,消息会处于UnAck状态,requeuetrue表示重发消息,反之不会。

💡
basicNack和basicReject的唯一区别是前者可以批量否定确认,后者只能单个消息否定确认。

代码演示(manual模式为例):

spring:
  application:
    name:
      extensions
  rabbitmq:
    addresses: amqp://账号:密码@113.44.150.39:5672/extension
    listener:
      simple:
        acknowledge-mode: manual  # 消息接收确认模式

//绑定关系
@Configuration
public class RabbitMQConfig {

    @Bean("ackQueue")
    public Queue ackQueue(){
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }
    @Bean("ackExchange")
    public DirectExchange ackExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
    }
    @Bean("ackBind")
    public Binding ackBind(@Qualifier("ackExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue ackQueue) {
        return BindingBuilder.bind(ackQueue).to(directExchange).with("ack");
    }
}
    @RequestMapping("ack")
    public String ack() {

        //默认情况下,发送持久化消息
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消息发送了,用的Direct类型交换机,绑定的ack key");
        return "消息发送成功";
    }
@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handerMessage(Message message, Channel channel) throws Exception {
        try {
            System.out.println("消费者接收到消息:"
                    + new String(message.getBody(), "UTF-8") +
                    message.getMessageProperties().getDeliveryTag());
//        //业务逻辑处理,此处一定会抛出异常,进行reject
            int num = 3 / 0;
            System.out.println("业务处理完成");
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }
}

此时我们发送消息给RabbitMQ服务器,就会不断重试(requeue=true):
在这里插入图片描述


2、发送方确认(Spring)

发送方确认作用与生产者和RabbitMQ服务器之间,也是为了保证数据传递的可靠性。发送方确认分为两个模式:Confirm确认和Retrurn回退。

Confirm确认模式

  1. 配置:
spring:
 rabbitmq:
   addresses: amqp://账号:密码@110.41.51.65:15673/bite
   listener:
     simple:
 	   acknowledge-mode: manual #消息接收确认
 	 publisher-confirm-type: correlated #消息发送确认
  1. 定义消息确认的回调逻辑,并发送消息
    配置yml之后,Spring会自动创建connectionFactory
  @Bean("confirmRabbitTemplate")
  public RabbitTemplate confirmRabbitTemplate(ConnectionFactory
                                                      connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
          @Override
          public void confirm(CorrelationData correlationData, boolean ack,
                              String cause) {
              System.out.printf("");
              if (ack) {
                  System.out.printf("消息接收成功, id:%s \n",
                          correlationData.getId());//这个id用来区分不同的消息
              } else {
                  System.out.printf("消息接收失败, id:%s, cause: %s",
                          correlationData.getId(), cause);
              }
          }
      });
      return rabbitTemplate;
  }

    @Resource(name = "confirmRabbitTemplate")
    private RabbitTemplate confirmRabbitTemplate;

    @RequestMapping("/confirm")
    public String confirm() throws InterruptedException {

        CorrelationData correlationData1 = new CorrelationData("1");
        confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,
                "confirm", "confirm test...", correlationData1);
        return "确认成功";
    }

说明:

	@FunctionalInterface
	public interface ConfirmCallback {

		/**
		 * Confirmation callback.
		 * @param correlationData 发送消息时的附加消息,识别特定消息
		 * @param ack 消息被exchange确认,为true,否则false
		 * @param cause 消息确认失败时,存储的出错原因
		 */
		void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

	}

💡RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别

  1. 前者作用域生产者和exchange之间,后者作用与队列和消费者之间
  2. RabbitTemplate.ConfirmCallback是Spring.AMQP实现,只有一个需要重写的方法confirm()用于确认回调。
  3. ConfirmListener来自于RabbitMQ原生API,内含handleAckhandleNack, ⽤于处理消息确认和否定确认。

Retrurn回退:

confirm确认保证的是生产者和exchange之间的可靠性,Return回退保证的是exchange能够正确路由到指定队列,如果消息没有被路由到任何队列,把消息回退给生产者。

  1. Spring的yml配置和confirm确认一致。
  2. 定义回退逻辑并发送消息
    @Bean
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
             
        //消息被退回时,回调下面的方法
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退回:" + returned);
            }
        });
        return rabbitTemplate;
    }
  @RequestMapping("/returns")
    public String returns() {
        CorrelationData correlationData = new CorrelationData("5");
        //回退模式必须加上这个设置
        confirmRabbitTemplate.setMandatory(true);

        confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE
                , "confirm***"
                , "return test..."
                , correlationData);
        return "回退模式,消息发送了";
    }

💡TIPS

  1. setMandatory
    设置为true告诉RabbitMQ,如果exchange没有把该消息成功给到任何队列,执行回退,因此setMandatory必须加
  2. ReturnedMessage参数中的信息
public class ReturnedMessage {
    //返回的消息对象,包含了消息体和消息属性
    private final Message message;
    //由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义 .
    private final int replyCode;
    //⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
    private final String replyText;
    //消息被发送到的交换机名称
    private final String exchange;
    //消息的路由键,即发送消息时指定的键
    private final String routingKey;
}

3、持久化

如果RabbitMQ服务器宕机,某些重要数据遗失可能造成严重后果,RabbbitMQ提供持久化功能,保证消息不会丢失。
RabbitMQ的持久化分为三个部分:

  1. 交换器的持久化
  2. 队列的持久化
  3. 消息的持久化

交换机持久化

当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在。如果是一个需要长期使用的交换机,建议进行持久化。

    @Bean("ackExchange")
    public DirectExchange ackExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
    }

上面这种方式创建交换机,默认是持久化的,如果想要设置非持久化,这样设置:

   @Bean("ackExchange")
    public DirectExchange ackExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(false).build();
    }

队列持久化

如果队列不设置持久化,RabbitMQ重启,队列也会跟着消息,队列中的消息也会丢失。
队列设置持久化方式和Exchange有所不同:

    @Bean("ackQueue")
    public Queue ackQueue(){
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }

设置非持久化:

  @Bean("ackQueue")
    public Queue ackQueue(){
        return QueueBuilder.nonDurable(Constants.ACK_EXCHANGE).build();
    }

消息持久化

如果队列是持久化的,消息不是持久化的,RabbitMQ重启,消息也会丢失。换言之,队列和消息共同持久化才能保证消息在重启之后不会丢失

设置消息持久化需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.PERSISTENT

public enum MessageDeliveryMode {
 		NON_PERSISTENT,//⾮持久化
 		PERSISTENT;//持久化
 }

RabbitMQ原生API设置持久化:
这里

💡TIP
PERSISTENT_TEXT_PLAIN中设置了deliveryMode =2,进而实现的持久化:在这里插入图片描述

Spring.AMQP设置持久化:

在这里插入图片描述


4、重试机制

在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息
处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的。启用下方配置重试机制将会生效;
配置:

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://账号:密码@47.108.157.13:5672/extension
    listener:
      simple:
         acknowledge-mode: auto
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 初始失败等待时长为5秒
          max-attempts: 5 # 最大重试次数

这里我们故意设置一个异常,看看运行结果:

@Component
public class RetryListener {
    @RabbitListener(queues=Constants.RETRY_QUEUE)
    public void retryQueue(Message message) throws UnsupportedEncodingException {
        System.out.println("重试队列消费者auto模式 接收到消息 deliveryTag是:"+message.getMessageProperties().getDeliveryTag());
        int a=10/0;
        System.out.println("业务处理完成");

    }
}

运行结果:
在这里插入图片描述

💡 1、重试机制只在manual模式下生效!

  1. none模式消息一旦发送rabbitmq就会把消息从队列移除。
  2. 而manual模式下不进行手动确认,只会导致消息积压
  3. manual模式进行手动确认且requeue=false,那么消息只会被发送一次,因为设置了不重新入队
  4. manual模式进行手动确认且requeue=true,会一直进行重试,因为每次发送确认消息给rabbitmq,都会告诉他重新入队,发送消息过来。

💡2、为什么deliveryTag一直是1,而不像第二点中auto模式自动重发消息deliveryTag一直递增?

  • 重试机制,retry=enable,每次重试deliveryTag相同,是因为这个重试是在消费者内部执行的
  • 而auto模式下,出现异常,不断重试则是RabbitMQ服务器重发消息给到消费者,所以deliveryTag自然是递增的。

5、TTL(过期时间)

当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除。目前设置TTL方式有两种:

  1. 设置队列的TTL, 队列中所有消息都有相同的过期时间.
  2. 对消息本⾝进⾏单独设置, 每条消息的TTL可以不同.

如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.

消息TTL

  @RequestMapping("/ttl")
    public String ttl() {
        System.out.println("ttl....");

  rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test...");

        //也可以这样设置过期时间
        rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...", message -> {
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
        return "ttl测试";
    }

队列TTL

    public Queue ttlQueue() {
        return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build();

两者区别:

  1. 设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
  2. 设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定

💡为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可.⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.


6、死信队列

死信队列(DLQ)和死信交换机(DLX)与普通队列、普通交换机本质上没有任何区别。
只不过DLX绑定的是一个个队列,接收的消息都是普通队列丢弃的消息。
DLX和DLQ协同分工,把普通队列中的死信发送给指定消费者进行特殊处理。

在这里插入图片描述
示例代码:

    //普通队列
    @Bean("normalQueue")
    public Queue normalQueue() {

        /*
        * 重点代码:*/
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字)
                .deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列
                .build();
    }

    //普通交换机
    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
    }
    //交换机队列进行绑定
    @Bean("bindingNormal")
    public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1");
    }

    //死信队列
    @Bean("DLQueue")
    public Queue DLQueue() {
        return QueueBuilder.durable(Constants.DL_QUEUE).build();
    }
    //死信交换机
    @Bean("DLExchange")
    public DirectExchange DLExchange() {
        return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
    }
    //死信队列和死信交换机的绑定
    @Bean("DLBinding")
    public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){
        return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2");
    }

💡TIPS

  • 由于normalQueue设置了 deadLetterExchange deadLetterRoutingKey ,所以normalQueue中的死信会被路由到指定的DLQ(死信队列),给到指定消费者执行,(当然代码中并没有编写消费者代码)
  • 消息变成死信,主要有这几种情况:
    1. Basic.Reject/Basic.Nack,拒绝了消息,并且requeue=false
    2. 消息TTL过期
    3. 队列已经到达了最大长度

7、延迟队列

延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费。
比如只能家具的定时任务、会议时间提前提醒、会员注册一定时间后的用户满意度调查等。
坏消息是RabbitMQ不直接支持延迟队列。好消息是RabbitMQ可以通过其他方式实现延迟队列:

  1. TTL+DLQ
  2. 延迟队列插件

TTL+DLQ

我们可以设置一个TTL队列,队列中消息一旦过期,产生死信,把该死信路由到指定的死信队列,然后消费者直接去监听该死信队列即可实现一个延迟队列的功能。
我们在第6节死信队列代码基础上进行编写:

  • 交换机和队列相关配置
@Configuration
public class DLConfig {

    //普通队列
    @Bean("normalQueue")
    public Queue normalQueue() {

        /*
        * 重点代码:*/
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字)
                .deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列
                .build();
    }

    //普通交换机
    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
    }
    //交换机队列进行绑定
    @Bean("bindingNormal")
    public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1");
    }

    //死信队列
    @Bean("DLQueue")
    public Queue DLQueue() {
        return QueueBuilder.durable(Constants.DL_QUEUE).build();
    }
    //死信交换机
    @Bean("DLExchange")
    public DirectExchange DLExchange() {
        return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
    }
    //死信队列和死信交换机的绑定
    @Bean("DLBinding")
    public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){
        return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2");
    }
}
  • 监听死信队列
@Component
public class DelayListener {

    @RabbitListener(queues =Constants.DL_QUEUE)
    public void handleMessage(Message message){

        long end=System.currentTimeMillis();//获取接收到消息时的时间
        String msg=new String(message.getBody());
        String[] msgs=msg.split(":");
        long start=Long.parseLong(msgs[1]);
        System.out.println("写收到延迟消息,时间:"+(end-start)/1000);
    }
}
  • 发送延迟消息
    @RequestMapping("/delay")
    public String delay() {

        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "key1",("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),message -> {
            message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为30s
            return message;
        });
        return "延迟队列消息已经执行";
    }

运行结果:在这里插入图片描述
消息首先在normal.queue上等待10s,然后发送到dl.queue消费(我设置的auto模式,所以消息已经ack,没有显示任何消息):
在这里插入图片描述

💡TTL+DLQ有一个缺陷
如果如果在队列中:消息m1延迟时间大于消息m2的延迟时间,并且m1优先于m2出队列,这样种情况消息m2只能等待m1过期才能被发送,导致m2消息在发送的时候必定过期
因此rabbitmq提供了一个插件,来解决这个问题

延迟队列插件

1. 安装教程

注意:如果启动完插件之后没有生效,需要使用这个命令重启rabbitmq服务:

service rabbitmq-server restart

重启完之后,就会增加一个交换机类型:
在这里插入图片描述
这个交换机可以认为是一个“高级的延迟队列”,它可以临时存储延迟消息,所有存储在该交换机的消息,都会在延迟时间结束后发送给绑定了该交换机的队列,解决了TTL+DLQ中的缺陷。

2. 代码实现

@Configuration
public class DLConfig {


    //声明队列方式和之前没有差别
    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
    }

    //与之前不同的是,这里需要用。delays声明使用延迟队列插件
    @Bean("delayExchange")
    public Exchange delayExchange() {
        return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
    }
    //和其他绑定没有区别
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();
    }
}
@Component
public class DelayListener {

    @RabbitListener(queues =Constants.DELAY_QUEUE)
    public void handleMessage(Message message){

        long end=System.currentTimeMillis();//获取接收到消息时的时间
        String msg=new String(message.getBody());
        String[] msgs=msg.split(":");
        long start=Long.parseLong(msgs[1]);
        System.out.println("写收到延迟消息,时间:"+(end-start)/1000);
    }
}
 @RequestMapping("/delayPlug")
    public String delayPlug() {

        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay"
                ,("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setDelayLong(10000L);  //单位: 毫秒 延迟时间
            return messagePostProcessor;
        });
        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay"
                ,("当前消息需要延迟5秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setDelayLong(5000L);  //单位: 毫秒 延迟时间
            return messagePostProcessor;
        });
        return "延迟队列消息已经执行";
    }

运行结果:
在这里插入图片描述
可以看到即使先发送10s延迟的数据,在发送5s延迟的数据,消息仍然按照正常顺序接收,解决的消息乱序问题。


8、事务

概念理解

RabbitMQ支持消息的发送和接收是原子性的,要么所有消息全部成功,要么全部失败。
例如这个场景,发送两个消息,由于过程中出现异常,正常情况下,队列会接收到第一条消息,第二条消息不会接收到:


    @RequestMapping("/trans")
    public String trans() {
        System.out.println("trans test...");
      rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
        int num = 5/0;
       rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
        return "消息发送成功";
    }

控制台只显示接收到一条消息:
在这里插入图片描述
如果我们希望,出现异常时,回滚所有发送的消息(出现错误一条消息也不发送),那么就可以启用事务。

代码实现

配置代码,为了简便使用默认交换机:

@Configuration
public class TransactionMQConfig {

    @Bean("transationQueue")
    public Queue transationQueue() {
        return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
    }

	//1、建立事务管理器
    @Bean
    public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
    }

	//2、开启事务通道
    @Bean("transRabbitTemplate")
    public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

}

发送消息:

   @Transactional//3、声明开启事务
    @RequestMapping("/trans")
    public String trans() {
        System.out.println("trans test...");
        transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
        int num = 5/0;
        transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
        return "消息发送成功";
    }

发送消息后运行结果:
程序抛出异常:
在这里插入图片描述
但是控制台,队列没有接收到任何消息:
在这里插入图片描述


9、消息分发

RabbitMQ中,消费者获取消息既可以使用推模式(队列把消息发送给消费者),也可以使用拉模式(消费者主要向队列获取)。虽然两者都支持,但是主要还是使用推模式。使用推模式,会出现一个问题,就是默认情况下RabbitMQ采用轮询方式,把消息发送给指定消费者,这种情况下如果消费者本身负载较重,继续接收消息,可能会出现严重后果。为此,RabbitMQ提供了消息分发这样一个机制。每个消费者都可以设置最大接收消息数prefetchCount,如果消费者目前获取消息数等于prefetchCount,RabbittMQ就不会推送消息给这个消费者,这是RabbitMQ中消息分发的核心概念。通过这样一个机制可以实现限流、负载均衡等功能。

代码实现

RabbitMQ 的SDK通过这个方法可以设置预取数量:
在这里插入图片描述
Spring.AMQP可以直接通过yml配置:

spring:
  application:
    name:
      applicationName
  rabbitmq:
    addresses: amqp://账号:密码@113.44.150.39:5672/extension
    listener:
      simple:
        acknowledge-mode: manual # 必须开启手动模式,prefetch才会生效
        prefetch: 5

💡TIPS

  1. prefectch设置为0表示没有获取上限。
  2. 拉模式中,设置channel.basicQos(prefetchCount)是无效的,因为拉模式,一次只能从队列拉去一个消息。