RabbitMQ(八)【高级 - 过期时间 TTL】

发布于:2023-01-04 ⋅ 阅读:(389) ⋅ 点赞:(0)

八、RabbitMQ高级 - 过期时间 TTL


上一篇文章SpringBoot案例

概述

过期时间 TTL 表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对 消息和队列 设置 TTL,目前有两种方法可以设置

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
  • 第二种方法是对消息进行单独设置,每条消息 TTL 可以不同

如果上述两种方式同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息队列的生存时间一旦超过设置的 TLL 值,就称为 dead message 被投递到死信队列,消费者将无法再收到该消息

8.1 队列属性设置过期时间

队列过期时间 - 代码测试

  1. springboot-order-rabbitmq-producer工程下的config包下,新建TTLRabbitMQConfiguration.java
package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {


    // 1.声明注册direct模式交换机
    @Bean
    public DirectExchange ttl_directExchange(){
        return new DirectExchange("ttl_order_exchange", true, false);
    }


    // 2.声明队列
    @Bean
    public Queue ttl_accountQueue(){
        // 设置过期时间
        Map<String, Object> args = new HashMap<>();
        // 根据参数说明设置对应参数
        args.put("x-message-ttl",5000);
        return new Queue("account.ttl.queue", true, false, false, args);
    } 
    // 3.完成绑定关系(队列)
    @Bean
    public Binding ttl_accountBinding(){
        return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
    }

}
  1. service包下的OrderServiceImpl.java新增 ttl 函数
package com.vinjcent.rabbitmq.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式
    public void createOrderFanout(String userId, String productId, int num){
        //......
    }

    // direct模式
    public void createOrderDirect(String userId, String productId, int num){
        //......
    }

    // topic
    public void createOrderTopic(String userId, String productId, int num){
        //......
    }

    // ttl
    public void createOrderTtl(String userId, String productId, int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "ttl_order_exchange";
        // 路由给三个消息队列推送消息
        String routingKey = "ttl";

        /*
         *  #.account.#
         *  *.express.#
         *  sms.#
         */
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);

    }
}

在这里插入图片描述

  1. 测试用例
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

    @Test
    void testTopic() {
        orderService.createOrderTopic("1","1",12);
    }

    // 选择该测试用例
    @Test
    void testTtl() {
        orderService.createOrderTtl("1","1",12);
    }
}
  1. 查看web界面中的Queues队列

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

8.2 队列里的消息设置过期时间

队列消息过期时间 - 代码测试

  1. springboot-order-rabbitmq-producer工程下的config包下,在TTLRabbitMQConfiguration.java类添加以下内容
package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {


    // 1.声明注册fanout模式交换机
    @Bean
    public DirectExchange ttl_directExchange(){
        return new DirectExchange("ttl_order_exchange", true, false);
    }


    // 2.声明队列
    @Bean
    public Queue ttl_accountQueue(){
        // 设置过期时间
        Map<String, Object> args = new HashMap<>();
        // 根据参数说明设置对应参数
        args.put("x-message-ttl",5000);
        return new Queue("account.ttl.queue", true, false, false, args);
    }

    @Bean
    public Queue ttlMessage_accountQueue(){
        return new Queue("account.ttl.message.queue", true, false, false);
    }

    @Bean
    public Binding ttl_accountBinding(){
		// ...
    }

	// 绑定操作
    @Bean
    public Binding ttlMessage_accountBinding(){
        return BindingBuilder.bind(ttlMessage_accountQueue()).to(ttl_directExchange()).with("ttlmessage");
    }

}
  1. service包下的OrderServiceImpl.java新增 ttlMessage 函数
package com.vinjcent.rabbitmq.service;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式
    public void createOrderFanout(String userId, String productId, int num){
        //......
    }

    // direct模式
    public void createOrderDirect(String userId, String productId, int num){
        //......
    }

    // topic
    public void createOrderTopic(String userId, String productId, int num){
        //......
    }

    // ttl
    public void createOrderTtl(String userId, String productId, int num){
        //......
    }

    // ttlmessage
    public void createOrderTtlMessage(String userId, String productId, int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "ttl_order_exchange";
        // 路由给三个消息队列推送消息
        String routingKey = "ttlmessage";

        // 给消息设置过期时间
        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 参数类型是字符串
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, postProcessor);
    }
}
  1. 测试用例
package com.vinjcent.rabbitmq;

import com.vinjcent.rabbitmq.service.OrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

    @Test
    void testTopic() {
        orderService.createOrderTopic("1","1",12);
    }

    @Test
    void testTtl() {
        orderService.createOrderTtl("1","1",12);
    }

    // 使用该测试用例
    @Test
    void testTtlMessage() {
        orderService.createOrderTtlMessage("1","1",12);
    }
}

在这里插入图片描述

在这里插入图片描述

两者的区别在于,过期的消息队列,不会立即删除掉队列里的消息,而是将该队列中的消息放在死信队列中;而过期消息的消息队列则会立马删除掉该指定的消息

当两者同时存在时,谁的有效期时间短,就以该较短的时间作为标准

8.3 死信队列

概述

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为私信邮箱。当消息在一个队列中变成了死信(Dead Message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列

消息变成死信,有如下原因

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq 就会自动将这个消息重新发布到设置的 DLX 上去,进而被路由到另外一个队列,即死信队列

想要使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可

在这里插入图片描述

  1. springboot-order-rabbitmq-producer工程下的config包下,添加DeadRabbitMQConfiguration.java类(定义死信交换机和死信队列
package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DeadRabbitMQConfiguration {


    // 1.声明注册fanout模式交换机
    @Bean
    public DirectExchange deadDirect(){
        return new DirectExchange("ttl_dead_exchange", true, false);
    }

    // 2.声明队列
    @Bean
    public Queue deadQueue(){
        return new Queue("direct.dead.queue", true);
    }

    // 3.完成绑定关系(队列)
    // 路由
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }

}
  1. 在该工程下修改service包下的TTLRabbitMQConfiguration.java类(为该队列和交换机绑定死信参数

package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {


    // 1.声明注册fanout模式交换机
    @Bean
    public DirectExchange ttl_directExchange(){
        return new DirectExchange("ttl_order_exchange",true,false);
    }


    // 2.声明队列
    @Bean
    public Queue ttl_accountQueue(){
        // 设置过期时间
        Map<String, Object> args = new HashMap<>();
        // 根据参数说明设置对应参数
        args.put("x-message-ttl",5000);
        // 配置死信交换机
        args.put("x-dead-letter-exchange","ttl_dead_exchange");
        // 配置死信队列的路由key
        args.put("x-dead-letter-routing-key","dead");   // direct模式需要配置路由key,而fanout模式不需要
        return new Queue("account.ttl.queue",true,false,false,args);
    }

    // 3.完成绑定关系(队列)
    // direct模式比fanout模式多了一个路由key
    @Bean
    public Binding ttl_accountBinding(){
        return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
    }


}


  1. 在该工程下修改service包下的OrderServiceImpl.java类下的createOrderTtl函数
package com.vinjcent.rabbitmq.service;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式
    public void createOrderFanout(String userId, String productId, int num){
        //......
    }

    // direct模式
    public void createOrderDirect(String userId, String productId, int num){
        //......
    }

    // topic
    public void createOrderTopic(String userId, String productId, int num){
        //......
    }

    // ttl
    public void createOrderTtl(String userId, String productId, int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "ttl_order_exchange";
        // 路由给三个消息队列推送消息
        String routingKey = "ttl";

        /*
         *  #.account.#
         *  *.express.#
         *  sms.#
         */
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);

    }

    // ttlmessage
    public void createOrderTtlMessage(String userId, String productId,int num){
        //......
    }




}

配置的死信队列参数可在web界面查看

在这里插入图片描述

在这里插入图片描述

  1. 运行测试用例
package com.vinjcent.rabbitmq;

import com.vinjcent.rabbitmq.service.OrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

    @Test
    void testTopic() {
        orderService.createOrderTopic("1","1",12);
    }

    // 使用该测试用例
    @Test
    void testTtl() {
        orderService.createOrderTtl("1","1",12);
    }

    @Test
    void testTtlMessage() {
        orderService.createOrderTtlMessage("1","1",12);
    }
}

在这里插入图片描述

报错原因:由于已创建的队列,在原来的基础上进行代码的修改,这个队列不会被更改或者覆盖,只能通过删除之后重新运行

在这里插入图片描述

在实际开发中,并不推荐直接从线上直接删除,这样做的风险比较大。可以通过重新创建一个队列,将要删除的队列的死信队列与之绑定,将死信队列里的信息用生产者发送到新的队列当中,实现转换/迁移的过程

在这里插入图片描述

在这里插入图片描述

对于参数的设置,可以在web界面查看异常队列规则,从而限制队列消息的推入
在这里插入图片描述

下一篇文章内存磁盘的监控

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到