005 延时交换机

发布于:2024-04-28 ⋅ 阅读:(17) ⋅ 点赞:(0)


RabbitMQ中既有延时队列的概念,也有延时交换机的概念,但两者在实现机制上有所不同。以下是关于这两者的详细解释:

延时队列:

延时队列是RabbitMQ中的一种队列类型,其主要特点是消息在队列中会被延迟一段时间后再被消费。
延时队列的实现通常依赖于消息的TTL(Time To Live)和死信队列。当消息在队列中的存活时间超过了设定的TTL后,消息会变成死信,并被发送到预先配置好的死信队列中,从而实现消息的延时处理。
延时队列适用于需要在特定时间后处理消息的场景,如订单超时未支付自动取消等。
延时交换机:

延时交换机是RabbitMQ的一个扩展插件,它提供了一种新的消息交换类型,允许在发送消息时指定一个延迟时间。
当消息达到指定的延迟时间后,才会被放入队列供消费者消费。这种方式提供了更高的延时精度和灵活性。
使用延时交换机需要安装和配置相应的RabbitMQ插件。
综上所述,RabbitMQ中既支持延时队列也支持延时交换机,它们都可以实现消息的延迟处理,但具体实现机制和使用方式有所不同。在实际应用中,可以根据具体需求和场景选择合适的方式来实现消息的延时处理。

延时交换机插件的安装

下载地址:https://www.rabbitmq.com/community-plugins.html
把rabbitmq_delayed_message_exchange放到rabbitMQ server plugins中
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

PluginsDelayConfig


package com.example.delay;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class PluginsDelayConfig {


    //延时交换机
    @Bean
    public CustomExchange newDelayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, args);
    }

    //延时队列
    @Bean
    public Queue newDelayQueue() {

        return new Queue("delayed-queue", true);
    }

    //绑定
    @Bean
    public Binding bindingDelayedQueue() {
        return BindingBuilder.bind(newDelayQueue()).to(newDelayExchange()).with("key3").noargs();
    }
}



Producer.java


package com.example.delay;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


//消费顺序与发送顺序一致,即"消息1","消息3","消息2",很可能是因为这些消息都被发送到了同一个队列,并且该队列中的消息是按照它们到达的顺序进行排列的。RabbitMQ的队列通常遵循FIFO(先进先出)的原则,除非特别配置了其他策略,如优先级队列等。
//
//当调用/delayedMsg、/delayedMsg3、/delayedMsg2接口时,生产者会依次将消息发送到配置了延迟功能的交换机(假设已经安装并配置了RabbitMQ的延迟消息插件)。尽管每条消息都设置了不同的延迟时间,但这些延迟时间是在消息从交换机路由到队列之前应用的。一旦消息被路由到队列,它们就会按照到达队列的顺序排列。
//
//如果消费者是从这个队列中按照FIFO原则拉取消息,那么消费顺序就会与发送顺序一致,因为消息是按照它们被发送到队列的顺序被排列和消费的。
//
//简而言之,尽管为消息设置了延迟,但这些延迟是在消息被发送到队列之前应用的。一旦消息进入队列,它们就会按照到达的顺序被排列,并由消费者按照这个顺序进行消费。这就是为什么消费顺序与发送顺序一致的原因。



//setDelay 方法并不是用来设置TTL的,而是用来设置消息在交换机中的延迟时间,这是RabbitMQ的延迟消息插件提供的功能。这个延迟是指消息在被发送到队列之前,会在交换机中等待指定的时间。一旦延迟时间过去,消息才会被路由到相应的队列中。
//
//现在,关于消息发送顺序问题,实际上与消息的延迟时间无关。消息的发送顺序是由生产者发送消息的时间点决定的。当依次调用 /delayedMsg、/delayedMsg3、/delayedMsg2 接口时,生产者会按照这个顺序发送消息。因此,即使为每条消息设置了不同的延迟时间,消息的发送顺序仍然是 "消息1","消息3","消息2"。
//
//这里的关键是理解“发送顺序”和“消费顺序”的区别。发送顺序是生产者将消息发送到RabbitMQ的顺序,而消费顺序是消费者从队列中获取并处理消息的顺序。如果希望按照消息的延迟时间顺序来消费消息,那么需要在消费者端进行适当的处理,例如使用优先级队列或根据消息的延迟时间进行排序等。
//
//总结一下,虽然为每条消息设置了不同的延迟时间,但这并不影响消息的发送顺序。发送顺序仍然是由生产者发送消息的时间点决定的。而消息的延迟时间只是在消息被路由到队列之前,在交换机中的等待时间。因此,观察到的现象与RabbitMQ的延时队列TTL设置并不矛盾。


//如果/delayedMsg、/delayedMsg3和/delayedMsg2请求几乎同时到达交换机,并且分别为这些请求设置了不同的延迟时间,理论上期望的是消息会按照它们各自的延迟时间被路由到队列,进而被消费者按照延迟到期的顺序消费。然而,观察到的是"消息1","消息3","消息2"这样的顺序。
//
//这种情况可能是由于以下几个原因造成的:
//
//消息到达交换机的微小时间差:
//即使认为请求是几乎同时发出的,实际上在网络传输和RabbitMQ服务器处理过程中,仍然可能存在微小的时间差。这些微小的时间差可能导致消息到达交换机的顺序与您发送请求的顺序一致,因此交换机仍然会按照FIFO的原则处理这些消息,即使它们被设置了不同的延迟。
//
//延迟时间的精度:
//RabbitMQ的延迟消息插件在处理延迟时可能存在一定的精度误差。如果延迟时间相差不大,这些误差可能会影响消息的实际路由顺序。
//
//队列行为:
//一旦消息从交换机路由到队列,它们将遵循队列的FIFO原则。如果消息几乎同时到达队列(即在延迟时间结束后几乎同时被路由到队列),那么队列将按照它们到达的顺序来处理这些消息。
//
//系统负载和性能:
//在高负载情况下,RabbitMQ服务器的性能可能会影响消息的处理速度。这可能导致实际延迟时间与预期有所偏差。
//
//为了验证和调试这种情况,可以考虑以下步骤:
//
//增加日志记录:在生产者和消费者端记录消息发送和消费的时间戳,以便更准确地跟踪消息的生命周期。
//调整延迟时间:尝试设置更明显的延迟时间差,以观察消息是否按照预期的延迟顺序被消费。
//检查RabbitMQ配置:确保交换机和队列的配置正确,且延迟插件正常工作。
//性能监控:监控RabbitMQ服务器的性能指标,以确保系统资源不是瓶颈。
//最后,如果确实需要严格按照延迟时间的顺序处理消息,可能需要在消费者端实现额外的逻辑来确保这一点,例如通过检查消息中的时间戳或元数据来确定处理顺序。


@RestController
public class Producer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    //模拟下订单
    @GetMapping("delayedMsg")
    public String sendMsg1( ){
        String msg1 = "消息1";
//        String msg2 = "消息2";
//        String msg3 = "消息3";


        rabbitTemplate.convertAndSend("delayed-exchange","key3",msg1,message ->{
            message.getMessageProperties().setDelay(50000);
            return  message;
        });





//        rabbitTemplate.convertAndSend("delayed-exchange","key3",msg3,message ->{
//            message.getMessageProperties().setDelay(10000);
//            return  message;
//        });
//
//        rabbitTemplate.convertAndSend("delayed-exchange","key3",msg2,message ->{
//            message.getMessageProperties().setDelay(8000);
//            return  message;
//        });




        return "生产者发送消息成功";
    }






    @GetMapping("delayedMsg2")
    public String sendMsg2( ){

        String msg2 = "消息2";



        rabbitTemplate.convertAndSend("delayed-exchange","key3",msg2,message ->{
            message.getMessageProperties().setDelay(80000);
            return  message;
        });


        return "生产者发送消息成功";
    }
//
//
//
//
//
//
//
//
//
//
    @GetMapping("delayedMsg3")
    public String sendMsg3( ){

        String msg3 = "消息3";

        rabbitTemplate.convertAndSend("delayed-exchange","key3",msg3,message ->{
            message.getMessageProperties().setDelay(100000);
            return  message;
        });



        return "生产者发送消息成功";
    }




}


Consumer.java


package com.example.delay;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class Consumer {

    @RabbitListener(queues = "delayed-queue")
    public void getMsg(Message message, Channel channel){
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

            System.out.println("消费者收到的消息是:" + message);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

application.yaml


server:
  servlet:
    context-path: /app
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了
    publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)
    listener:
      simple:
        acknowledge-mode: manual # 手动消息确认
        concurrency: 1 #消费者数量
        max-concurrency: 1  #消费者最大数量
        prefetch: 1  #消费者每次从队列中取几个消息


网站公告

今日签到

点亮在社区的每一天
去签到