RabbitMQ使用延迟插件实现延迟消息

发布于:2025-07-06 ⋅ 阅读:(15) ⋅ 点赞:(0)

插件地址:

https://github.com/rabbitmq/rabbitmq-lvc-exchange/releases
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases?page=1

可根据上述地址找到自己安装的Rabbitmq对应的版本
安装插件​​:
将下载的 rabbitmq_delayed_message_exchange-3.8.0.ez 文件放到 RabbitMQ 的插件目录中(通常是 /usr/lib/rabbitmq/lib/rabbitmq_server-/plugins/)
然后使用 RabbitMQ 的命令行工具启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

若出现,说明安装成功:
[root@MiWiFi-RA72-srv plugins]# sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@MiWiFi-RA72-srv:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@MiWiFi-RA72-srv...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.

添加依赖​:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在 application.yml 或 application.properties 中配置 RabbitMQ 连接:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
//RabbitMq公共配置类
public class RabbitMqConfig {
    //延时交换机
    public static final String DIRECT_DELAYED_EXCHANGE = "direct_delayed_exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
    public static final String DIRECT_DELAYED_QUEUE = "direct_delayed_queue";
}
@Configuration
public class DirectRabbitConfig implements BeanPostProcessor {

    @Resource
    private RabbitAdmin rabbitAdmin;


    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //只有设置为true,spring才会加载RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    //--------------------------------延迟交换机与队列-------------------------------------
    @Bean
    public DirectExchange delayExchange() {
        return ExchangeBuilder.directExchange(RabbitMqConfig.DIRECT_DELAYED_EXCHANGE )
                .delayed()          //设置delay的属性为true
                .durable(true)      //持久化
                .build();
    }

    @Bean
    public Queue delayQueue() {
        return new Queue(RabbitMqConfig.DIRECT_DELAYED_QUEUE , true, false, false);
    }

    @Bean
    public Binding bindDelayQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RabbitMqConfig.DELAYED_ROUTING_KEY );
    }



    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //创建交换机
        rabbitAdmin.declareExchange(delayExchange());
        rabbitAdmin.declareQueue(delayQueue());
        return null;
    }

}

生产者发送延迟消息:

@Component
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayedMessage(String message) {
        //发送消息
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_DELAYED_EXCHANGE , RabbitMqConfig.DELAYED_ROUTING_KEY,
                    message, m-> {
                        m.getMessageProperties().setDelay(10000);//延时10s
                        return m;
                    });
    }
}

消费者接收延迟消息​:

@Component
public class DelayedMessageConsumer {

    @RabbitListener(queues = RabbitMqConfig.DIRECT_DELAYED_QUEUE)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}