Plugin download pages:
https://github.com/rabbitmq/rabbitmq-lvc-exchange/releases
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases?page=1
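Use the release pages above to find the plugin version that matches your installed RabbitMQ version.
Install the plugin:
Copy the downloaded rabbitmq_delayed_message_exchange-3.8.0.ez file into RabbitMQ's plugin directory (usually /usr/lib/rabbitmq/lib/rabbitmq_server-/plugins/).
Then enable the plugin with RabbitMQ's command-line tool: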
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
If you see output like the following, the plugin was installed successfully:
[root@MiWiFi-RA72-srv plugins]# sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@MiWiFi-RA72-srv:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@MiWiFi-RA72-srv...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
Add the dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configure the RabbitMQ connection in application.yml (or the equivalent keys in application.properties):
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
// Shared RabbitMQ constants
public class RabbitMqConfig {
    // Names of the delayed exchange, its routing key and the bound queue
    public static final String DIRECT_DELAYED_EXCHANGE = "direct_delayed_exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
    public static final String DIRECT_DELAYED_QUEUE = "direct_delayed_queue";
}
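import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3.x

@Configuration
public class DirectRabbitConfig implements BeanPostProcessor {
    @Resource
    private RabbitAdmin rabbitAdmin;

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // With autoStartup enabled, RabbitAdmin declares the exchanges, queues
        // and bindings found in the context when a connection is opened
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // -------------------- Delayed exchange and queue --------------------
    @Bean
    public DirectExchange delayExchange() {
        return ExchangeBuilder.directExchange(RabbitMqConfig.DIRECT_DELAYED_EXCHANGE)
                .delayed()      // mark the exchange as delayed
                .durable(true)  // survive broker restarts
                .build();
    }

    @Bean
    public Queue delayQueue() {
        // durable = true, exclusive = false, autoDelete = false
        return new Queue(RabbitMqConfig.DIRECT_DELAYED_QUEUE, true, false, false);
    }

    @Bean
    public Binding bindDelayQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RabbitMqConfig.DELAYED_ROUTING_KEY);
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Declare the exchange and the queue explicitly
        rabbitAdmin.declareExchange(delayExchange());
        rabbitAdmin.declareQueue(delayQueue());
        // Return the bean itself; returning null would skip any remaining post-processors
        return bean;
    }
}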
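As a rough illustration of what the .delayed() flag amounts to: per the plugin's README, a delayed exchange is declared with the type x-delayed-message plus an x-delayed-type argument naming the routing behaviour to apply once the delay has elapsed. The sketch below only builds those two values in plain Java and does not talk to a broker. DelayedExchangeSpec and its members are hypothetical names used for illustration, not part of Spring AMQP or the RabbitMQ client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
final class DelayedExchangeSpec {
    // Exchange type registered by the delayed-message plugin.
    static final String EXCHANGE_TYPE = "x-delayed-message";

    // Builds the declaration arguments; routingType is the behaviour the
    // exchange should have after the delay, e.g. "direct" or "topic".
    static Map<String, Object> arguments(String routingType) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", routingType);
        return args;
    }

    public static void main(String[] unused) {
        System.out.println(EXCHANGE_TYPE + " " + arguments("direct"));
    }
}
```

With the plain RabbitMQ Java client, these values would be passed as the type and arguments parameters of Channel.exchangeDeclare; with Spring AMQP, the exchange bean above should make this unnecessary.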
Producer that sends a delayed message:
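import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayedMessage(String message) {
        // Publish to the delayed exchange; the post-processor sets the per-message delay
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_DELAYED_EXCHANGE, RabbitMqConfig.DELAYED_ROUTING_KEY,
                message, m -> {
                    m.getMessageProperties().setDelay(10000); // delay in milliseconds (10 s)
                    return m;
                });
    }
}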
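The value passed to setDelay is in milliseconds, so it is easy to pass seconds by mistake or to exceed the range of the Integer that the method accepts. A small guard along the following lines could catch that before publishing. This is only a sketch: DelayMillis.of is a hypothetical helper, and the upper bound used here is simply Integer.MAX_VALUE, not a limit taken from the plugin.

```java
import java.time.Duration;

// Hypothetical helper for illustration only.
final class DelayMillis {
    // Converts a Duration to the int millisecond value expected by setDelay,
    // rejecting negative delays and delays that do not fit in an int.
    static int of(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        long millis = delay.toMillis();
        if (millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("delay too long for an int: " + delay);
        }
        return (int) millis;
    }

    public static void main(String[] unused) {
        System.out.println(of(Duration.ofSeconds(10)));
    }
}
```

In the producer, the call would then read m.getMessageProperties().setDelay(DelayMillis.of(Duration.ofSeconds(10))).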
Consumer that receives the delayed message:
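import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageConsumer {
    // Invoked once the delay has elapsed and the message reaches the queue
    @RabbitListener(queues = RabbitMqConfig.DIRECT_DELAYED_QUEUE)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}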
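To check that the delay is really being applied, one option is to stamp each message with its send time and compute the elapsed time on receipt. The following is a minimal sketch of that idea in plain Java. DelayProbe and the "<epochMillis>|<payload>" text format are assumptions made for illustration, not something the plugin or Spring AMQP provides.

```java
import java.time.Instant;

// Hypothetical helper for illustration only.
final class DelayProbe {
    // Prefixes the payload with the send time in epoch milliseconds.
    static String stamp(String payload, Instant sentAt) {
        return sentAt.toEpochMilli() + "|" + payload;
    }

    // Returns the milliseconds between the stamped send time and receivedAt.
    static long elapsedMillis(String stamped, Instant receivedAt) {
        int sep = stamped.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("missing send timestamp: " + stamped);
        }
        long sentAt = Long.parseLong(stamped.substring(0, sep));
        return receivedAt.toEpochMilli() - sentAt;
    }

    public static void main(String[] unused) {
        String stamped = stamp("hello", Instant.now());
        System.out.println(stamped + " elapsed=" + elapsedMillis(stamped, Instant.now()));
    }
}
```

The producer would send DelayProbe.stamp(message, Instant.now()) and the listener would log DelayProbe.elapsedMillis(message, Instant.now()), which should come out close to the configured 10000 ms. If producer and consumer run on different hosts, the result is only as accurate as their clock synchronization.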