RabbitMQ--进阶篇

发布于:2025-05-12 ⋅ 阅读:(14) ⋅ 点赞:(0)

RabbitMQ


客户端整合Spring Boot

添加相关的依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写配置文件,配置RabbitMQ的服务信息

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
logging:
  level:
    com.atguigu.mq.listener.MyMessageListener: info

监听消息与发送消息(这里如果想要演示的明白的话需要创建两个Boot工程分别作为生产端与服务端来进行演示操作)

//=================================监听消息使用注解的方式来完成=============================
@Component
@Slf4j
public class MyMessageListener {
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";  
    public static final String QUEUE_NAME  = "queue.order";  
  
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT),
            key = {ROUTING_KEY}
    ))
    public void processMessage(String dateString,
                               Message message,
                               Channel channel) {
        log.info(dateString);
    }
  
}
​
//=================================发送消息使用组件的方式来完成=============================
@SpringBootTest  
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test  
    public void testSendMessage() {  
        rabbitTemplate.convertAndSend(  
                EXCHANGE_DIRECT,   
                ROUTING_KEY,   
                "Hello atguigu");  
    }  
  
}

注解

示例1:

@RabbitListener(bindings = @QueueBinding(
         value = @Queue(value = QUEUE_NAME, durable = "true"),
         exchange = @Exchange(value = EXCHANGE_DIRECT),
         key = {ROUTING_KEY}
))
  • @RabbitListener注解

    • 作用:自动绑定指定队列进行消息监听,支持直接队列名绑定表达式占位符配置等方式,自动反序列化消息体到方法参数,支持多种参数类型(POJO/Message/byte[]),管理消息确认模式(自动/手动ACK),配置并发消费者线程池,实现消息重试和死信处理

    • binding属性

      • 作用:指定交换机和队列之间的绑定关系,指定当前方法要监听的队列如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

      • @QueueBinding注解:

        • 作用:声明需要绑定的队列与交换机的信息

        • value属性:使用@Queue注解来指定队列信息

        • exchange属性:使用@Exchange注解来指定交换机信息

        • key属性:使用{}来指定路由键信息

示例2:

@RabbitListener(queues = {QUEUE_ATGUIGU})

直接使用queues这个属性值,通过给这个属性进行赋值来直接指定需要监听的队列,优点是编写模式简洁精短,缺点是不能像示例1所写的那样在未创建交换机与队列以及设置其绑定关系时可以系统帮忙设置以防发生报错。需要在RabbitMQ的可视化界面自己手动进行创建与绑定的过程

消息可靠性投递

故障及其解决方案

  • 故障情况1:消息没有发送到消息队列上面

  • 解决方案:

  • 故障情况2:消息成功发送到了消息队列,但是消息队列服务宕机了。原本保存在内存里面的消息也丢失了

  • 解决方案:将消息持久化到硬盘上面,哪怕服务器重启也不会导致消息丢失

  • 故障情况3:消息成功存入了消息队列,但是消费端出现了问题(宕机,抛出异常等)

  • 解决方案:

业务代码实现

生产者故障
  • 导入依赖,编写配置文件
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认
logging:
  level:
    com.atguigu.mq.config.MQProducerAckConfig: info
  • 创建配置类

在创建完配置类之后,重写相应的可提升消息可靠性的方法后,我们还需要对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

方法名 方法功能 所属接口 接口所属类
confirm() 确认消息是否发送到交换机 ConfirmCallback RabbitTemplate
returnedMessage() 确认消息是否发送到队列 ReturnsCallback RabbitTemplate
设置组件调用的方法 所需对象类型
setConfirmCallback() ConfirmCallback接口类型
setReturnCallback() ReturnCallback接口类型
//================================配置类的逻辑,确保消息到达交换机、到达队列=====================================
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        //初始化RabbitMQ模板,将重写的方法重新加入到rabbitTemplate中
    }
​
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送到交换机成功!数据:" + correlationData);
        } else {
            log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
        }
    }
​
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息主体: " + new String(returned.getMessage().getBody()));
        log.info("应答码: " + returned.getReplyCode());
        log.info("描述:" + returned.getReplyText());
        log.info("消息使用的交换器 exchange : " + returned.getExchange());
        log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
    }
}

注解说明:

  • 发送消息

@SpringBootTest  
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test  
    public void testSendMessage() {  
        rabbitTemplate.convertAndSend(  
                EXCHANGE_DIRECT,   
                ROUTING_KEY,   
                "Hello atguigu");  
    }  
  
}

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确

  • 交换机正确、路由键不正确,无法发送到队列

  • 交换机不正确,无法发送到交换机

RabbitMQ服务故障

我们直接重启docker容器就会发现,我们明明没有向队列中发送消息但是队列的消息速率图仍然显示有消息到达。这是因为刚才我们做实验时放入队列中的消息在docker重启完毕之后就从硬盘里面被移动到了MQ中。实际上RabbitMQ默认是配置了持久化的。表现在交换机是默认持久化的、队列是默认持久化的、消息是默认持久化的。在一些配置交换机与队列的注解里面就会看见底层是默认了持久化的。

消费者故障
  • 配置文件

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 把消息确认模式改为手动确认
  • 接收消息

@Component
@Slf4j
public class MyMessageListener {
​
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
    public static final String QUEUE_NAME  = "queue.order";
​
    // 修饰监听方法
    @RabbitListener(
            // 设置绑定关系
            bindings = @QueueBinding(
​
                // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
                value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
​
                // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
                exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
​
                // 配置路由键信息
                key = {ROUTING_KEY}
    ))
    public void processMessage(String dataString, Message message, Channel channel) throws IOException {
​
        // 1、获取当前消息的 deliveryTag 值备用
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
​
        try {
            // 2、正常业务操作
            log.info("消费端接收到消息内容:" + dataString);
            
            // System.out.println(10 / 0);
​
            // 3、给 RabbitMQ 服务器返回 ACK 确认信息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
​
            // 4、获取信息,看当前消息是否曾经被投递过
            Boolean redelivered = message.getMessageProperties().getRedelivered();
​
            if (!redelivered) {
                // 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
                channel.basicNack(deliveryTag, false, true);
            } else {
                // 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
                channel.basicReject(deliveryTag, false);
            }
​
        }
    }
​
}

ACK与NACK机制:ACK(成功)消费者成功处理消息后,显式或隐式地向RabbitMQ服务器发送确认信号,告知消息已被正确处理,服务器可以安全移除该消息。在没有手动的在配置文件中声明时就是默认返回ACK数据;NACK(失败)消费者明确告知服务器消息处理失败,服务器可根据策略重新投递或丢弃消息(如进入死信队列)。

  • ACK是消息处理成功的“绿灯”,确保可靠性。

  • NACK是处理失败的“黄灯”,提供容错和重试机制。

  • 两者结合使用时,需通过重试策略、死信队列和幂等性设计,构建高可靠的分布式消息系统。

API说明:

①basicAck()方法:

  • 作用:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了

  • 参数列表:

参数名称 含义
long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识
boolean multiple 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息

②basicNack()方法:

  • 作用:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值

  • 参数列表:

参数名称 含义
long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识
boolean multiple 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息
boolean requeue 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列

③basicReject()方法:

  • 作用:根据指定的deliveryTag,对该消息表示拒绝

  • 参数列表:

参数名称 含义
long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识
boolean requeue 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列

消费端限流

作用
  • 防止资源耗尽:避免消费者因处理速度跟不上消息到达速度,导致内存或线程资源耗尽。

  • 保护下游服务:防止突发流量压垮数据库等依赖服务。

  • 平衡负载:确保消费者处理能力与消息生产速率匹配。

实现

①yml文件配置,设置消费端的限流阈值为多少(prefetch参数)

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息
  • Prefetch Count(QoS设置)

    • 通过设置 prefetch count 参数,限制消费者未确认(unacknowledged)的最大消息数量。

    • 例如,若 prefetch=5,消费者最多同时处理5条消息,处理并确认后才会获取新消息。

    • 手动确认模式(Manual Acknowledgement)必须启用手动ACK(关闭自动确认),否则prefetch机制失效。

②消费与生产端的测试代码

//=====================生产端===============================
@Test  
public void testSendMessage() {
    for (int i = 0; i < 100; i++) {
        rabbitTemplate.convertAndSend(
                EXCHANGE_DIRECT,
                ROUTING_KEY,
                "Hello atguigu" + i);
    }
}

//=========================消费端===========================
// 2、正常业务操作
log.info("消费端接收到消息内容:" + dataString);
​
// System.out.println(10 / 0);
TimeUnit.SECONDS.sleep(1);
​
// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);

③结果分析

在未配置消费端限流时,RabbitMQ的监视界面显示消息涌入100条瞬间就被在监视的消费端一扫而空,总消息数为0,但是在ACK的确认机制下Raabbit服务还在慢慢处理消息的确认信息并反馈给监视界面

在配置了消息限流之后,RabbitMQ的监视界面显示任然是消息涌入100条但是消息并非是一口气被消费端全部消费完成而是随着时间的推移慢慢的被消费取出,总消息数随着时间的变化而在变化,同样在ACK的机制下Rabbit服务还在慢慢处理消息的确认信息并反馈给监视界面

消息超时

RabbitMQ 的消息超时机制主要通过 TTL(Time-To-Live) 实现,允许为消息或队列设置存活时间,过期后消息会被自动删除或转移到死信队列。

TTL的两种设置方式

  • 队列级TTL
    • 定义为整个队列设置统一的过期时间,所有进入该队列的消息共享此 TTL(超时时间)。

    • 特点

      • 全局生效,优先级低于消息级 TTL。

      • 队列中消息的过期时间从入队时开始计算。

            // 设置队列参数:消息1分钟后过期
            Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 60000); // 单位:毫秒
            
            // 声明队列
            channel.queueDeclare("my_queue", true, false, false, args);
  • 消息级 TTL
    • 定义为单条消息设置独立的过期时间,优先级高于队列级 TTL。

    • 特点

      • 每条消息的 TTL 独立计算。

      • 过期时间从消息入队时开始计算(若消息被重新投递到其他队列,时间重新计算)。

    // 1、创建消息后置处理器对象  
    MessagePostProcessor messagePostProcessor = (Message message) -> {  
  
        // 设定 TTL 时间,以毫秒为单位
        message.getMessageProperties().setExpiration("5000");  
  
        return message;
    };
  
    // 2、发送消息  
    rabbitTemplate.convertAndSend(    
            EXCHANGE_DIRECT,     
            ROUTING_KEY,     
            "Hello atguigu", messagePostProcessor);   

消息超时的行为

  • 自动删除:当消息过期后,默认会直接从队列中删除。

  • 死信队列(Dead Letter Exchange): 若队列配置了死信交换机(DLX),过期消息会转移到 DLX 关联的队列,而非直接删除。

总结

  1. 支持队列级和消息级 TTL,后者优先级更高。

  2. 过期消息可自动删除或路由到死信队列。

  3. 实际应用中需注意过期检查机制的性能影响及潜在阻塞问题。 合理使用 TTL 可有效解决业务超时逻辑、资源释放等常见问题。

死信和死信队列

死信(Dead Letter)定义:在消息队列中无法被正常消费的消息称为死信(Dead Letter);通常产生的原因如下:

  1. 消息被拒绝(Rejected)且未设置重新入队(requeue=false

  2. 消息过期(TTL 超时)

  3. 队列达到最大长度(超过 x-max-length 限制时,头部或尾部消息被丢弃)

死信队列(Dead Letter Queue, DLQ)定义:专门用于接收死信的队列,需通过死信交换机(DLX) 路由。

核心作用

  • 收集异常消息,防止消息丢失

  • 提供消息审计和重试机制

  • 解耦主业务逻辑与错误处理逻辑

注意:一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机

延迟队列

延迟队列(Delayed Queue)是一种特殊类型的消息队列,允许消息在指定的延迟时间后才被消费者获取和处理。RabbitMQ 本身不直接支持延迟队列

1. 基于 TTL + 死信队列的延迟队列(传统方案)

实现原理
  1. 消息设置 TTL:为消息或队列设置存活时间(TTL)。

  2. 绑定死信交换机:当消息过期后,通过死信交换机(DLX)路由到目标队列。

  3. 消费者监听目标队列:实现延迟消费效果。

2. 使用插件 rabbitmq-delayed-message-exchange(推荐方案)

RabbitMQ 官方提供的插件支持精确延迟消息投递消息在交换机层暂存,到期后路由到目标队列。

下载与启用插件:

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
​
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
​
# 退出Docker容器
exit
​
# 重启Docker容器
docker restart rabbitmq
//=====================生产端代码============================
@Test
public void testSendDelayMessage() {
    rabbitTemplate.convertAndSend(
            EXCHANGE_DELAY,
            ROUTING_KEY_DELAY,
            "测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",
            messageProcessor -> {
​
                // 设置延迟时间:以毫秒为单位(只有安装好插件之后才可以使参数生效)
                messageProcessor.getMessageProperties().setHeader("x-delay", "10000");
​
                return messageProcessor;
            });
}

//====================消费端代码============================
@Component  
@Slf4j
public class MyDelayMessageListener {
    
    public static final String QUEUE_DELAY = "queue.delay.video";
    
    @RabbitListener(queues = {QUEUE_DELAY})
    public void process(String dataString, Message message, Channel channel) throws IOException {  
        log.info("[生产者]" + dataString);
        log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
​
}
插件特性
  • 精确延迟:消息按设定的延迟时间精确投递。

  • 灵活配置:每条消息可独立设置延迟时间。

  • 资源高效:消息存储在交换机层,不占用队列资源。

效果展示:

3. 两种方案对比

特性 TTL + 死信队列 插件方案
延迟精度 低(依赖队列处理速度) 高(精确到毫秒)
灵活性 需固定或逐条设置 TTL 每条消息独立设置延迟
资源占用 可能占用队列存储 交换机层存储,更高效
部署复杂度 无需额外插件 需安装插件
适用场景 短延迟(<1分钟)、简单场景 长延迟、高精度、复杂需求

事务消息

RabbitMQ 的事务消息机制用于确保一组消息操作的原子性——要么全部成功提交到 Broker,要么全部回滚

配置类

//========================事务队列的相关配置====================
@Configuration
@Data
public class RabbitConfig {
​
    @Bean//添加事务消息的组件
    public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
​
    @Bean//增强RabbitTemplate开启事务消息模式
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }
}
//====================消息生产端无事务版本==============================
@SpringBootTest
@Slf4j
public class RabbitMQTest {
​
    public static final String EXCHANGE_NAME = "exchange.tx.dragon";
    public static final String ROUTING_KEY = "routing.key.tx.dragon";
​
    @Resource
    private RabbitTemplate rabbitTemplate;
​
    @Test
    public void testSendMessageInTx() {
        // 1、发送第一条消息
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
​
        // 2、抛出异常
        log.info("do bad:" + 10 / 0);
​
        // 3、发送第二条消息
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");
    }
​
}
​
//=======================消息生产端有事务版本===============================
@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {
    // 1、发送第一条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");
​
    // 2、发送第二条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}

注意:请注意区分事务消息与消息可靠性投递这两者之间的关系。首先事务消息是不确保消息的可靠性投递的。他只限制相应的Java代码,在逻辑过程中不抛出异常就将消息发送到Broker上反之就会将消息回滚。

惰性队列

惰性队列(Lazy Queue)是 RabbitMQ 中一种特殊的队列类型,核心目标是通过优先将消息存储到磁盘而非内存,来优化系统资源使用,尤其适用于高吞吐、大消息量且允许一定延迟的场景。

特性 普通队列 惰性队列
消息存储优先级 内存优先(除非消息标记为持久化) 磁盘优先(无论消息是否持久化)
内存占用 高(消息积压时易引发 OOM) 低(消息直接写入磁盘)
消费延迟 低(内存中直接读取) 较高(需从磁盘加载到内存)
适用场景 实时性要求高、消息量小 消息量大、允许延迟、需避免内存耗尽

工作机制:

①消息到达队列时,直接持久化到磁盘(即使未显式标记为持久化消息),仅在需要传递给消费者时加载到内存。

默认仅缓存少量消息(如消费者预取值 prefetch 范围内的消息)。

消费者消费后,消息从磁盘删除。

优先级队列

优先级队列(Priority Queue)是 RabbitMQ 中一种特殊队列类型,允许消息按照优先级高低被消费,高优先级的消息会被优先处理。适用于需要差异化处理消息的场景(如 VIP 用户请求优先处理)。

特性 说明
优先级范围 支持 0-255 的优先级等级(实际受 Erlang 虚拟机限制,通常建议 0-10)
消费顺序 高优先级消息先被消费,同优先级按 FIFO 顺序处理
实现原理 队列内部维护多级子队列,按优先级排序消息
资源消耗 略高于普通队列(需维护优先级索引)

在创建队列的时候添加参数x-max-priority,用来设置消息参数的最大优先等级。在后面创建的消息在设置等级参数的时候不能超过这个参数值。

//=======================生产端产生消息并为它设置优先等级================================
@SpringBootTest
public class RabbitMQTest {
​
    public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
    public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
​
    @Resource
    private RabbitTemplate rabbitTemplate;
​
    @Test
    public void testSendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
            message.getMessageProperties().setPriority(1);
            return message;
        });
    }
​
}
​
//==========================消费端接收消息===============================
@Slf4j
@Component
public class MyMessageProcessor {
​
    public static final String QUEUE_PRIORITY = "queue.test.priority";
​
    @RabbitListener(queues = {QUEUE_PRIORITY})
    public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
        log.info(data);
​
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
​
}


网站公告

今日签到

点亮在社区的每一天
去签到