一、前言
本篇主要围绕 Spring Boot 3.x 与 RabbitMQ 的动态配置集成展开,例如动态新增 RabbitMQ 交换机、队列等操作。
二、默认RabbitMQ中的exchange、queue动态新增及监听
1、新增RabbitMQ配置
RabbitMQConfig.java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ infrastructure beans used for declaring exchanges, queues and bindings at runtime.
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    /**
     * Creates the {@link RabbitTemplate} backed by the application's connection factory.
     *
     * <p>The template must be given a connection factory: a template built with the no-arg
     * constructor has none, and {@link RabbitAdmin} rejects such a template.
     *
     * @param connectionFactory the connection factory supplied by the application context
     * @return a template bound to {@code connectionFactory}
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Creates the {@link RabbitAdmin} used to declare and delete broker entities.
     *
     * @param rabbitTemplate the template whose connection factory the admin reuses
     * @return the admin instance
     */
    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
        return new RabbitAdmin(rabbitTemplate);
    }
}
2、新增RabbitMQ动态操作组件
RabbitDynamicConfigService.java
RabbitDynamicConfigService.java 封装了不同类型 Exchange 的创建与删除、Queue 的创建与删除,以及 Queue 与 Exchange 的绑定。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
 * Declares, binds and deletes RabbitMQ exchanges and queues at runtime through {@link RabbitAdmin}.
 */
@Slf4j
@Service
public class RabbitDynamicConfigService {

    private final RabbitAdmin rabbitAdmin;
    private final RabbitListenerService rabbitListenerService;

    @Autowired
    public RabbitDynamicConfigService(RabbitAdmin rabbitAdmin,
                                      RabbitListenerService rabbitListenerService) {
        this.rabbitAdmin = rabbitAdmin;
        this.rabbitListenerService = rabbitListenerService;
    }

    /**
     * Declares a durable queue.
     *
     * @param queueName name of the queue to declare
     */
    public void createQueue(String queueName) {
        rabbitAdmin.declareQueue(new Queue(queueName, true));
        log.info("队列创建成功: {}", queueName);
    }

    /**
     * Declares a durable queue and optionally starts a listener on it.
     *
     * @param queueName  name of the queue to declare
     * @param isListener {@code true} to start a listener; {@code null} is treated as {@code false}
     */
    public void createQueue(String queueName, Boolean isListener) {
        createQueue(queueName);
        // Boolean.TRUE.equals avoids the NullPointerException that unboxing a null Boolean would cause.
        if (Boolean.TRUE.equals(isListener)) {
            rabbitListenerService.createListener(queueName);
        }
    }

    /**
     * Declares a durable, non-auto-delete direct exchange.
     *
     * @param exchangeName name of the exchange to declare
     */
    public void createExchange(String exchangeName) {
        rabbitAdmin.declareExchange(new DirectExchange(exchangeName, true, false));
        log.info("交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete direct exchange.
     *
     * @param exchangeName name of the exchange to declare
     */
    public void createDirectExchange(String exchangeName) {
        DirectExchange directExchange = new DirectExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(directExchange);
        log.info("Direct 交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete fanout exchange.
     *
     * @param exchangeName name of the exchange to declare
     */
    public void createFanoutExchange(String exchangeName) {
        FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(fanoutExchange);
        log.info("Fanout 交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete topic exchange.
     *
     * @param exchangeName name of the exchange to declare
     */
    public void createTopicExchange(String exchangeName) {
        TopicExchange topicExchange = new TopicExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(topicExchange);
        log.info("Topic 交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete headers exchange.
     *
     * @param exchangeName name of the exchange to declare
     */
    public void createHeadersExchange(String exchangeName) {
        HeadersExchange headersExchange = new HeadersExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(headersExchange);
        log.info("Headers 交换机创建成功: {}", exchangeName);
    }

    /**
     * Binds a queue to a direct exchange with the given routing key.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the direct exchange
     * @param routingKey   routing key of the binding
     */
    public void bindQueueToExchange(String queueName, String exchangeName, String routingKey) {
        Binding binding = BindingBuilder.bind(new Queue(queueName))
                .to(new DirectExchange(exchangeName))
                .with(routingKey);
        rabbitAdmin.declareBinding(binding);
        log.info("绑定创建成功: {} -> {} 使用路由键: {}", queueName, exchangeName, routingKey);
    }

    /**
     * Binds a queue to an exchange, choosing the binding style from the exchange type.
     *
     * @param queueName    name of the queue
     * @param exchangeType one of {@code fanout}, {@code direct}, {@code topic}, {@code headers}
     * @param exchangeName name of the exchange
     * @param routingKey   routing key; used for {@code direct} and {@code topic} only
     * @param headers      header match rules; used for {@code headers} only
     * @throws IllegalArgumentException if {@code exchangeType} is {@code null} or not supported
     */
    public void moreExchangeTypeBindQueueToExchange(String queueName, String exchangeType, String exchangeName, String routingKey, Map<String, Object> headers) {
        if (exchangeType == null) {
            // Switching on a null String would throw NullPointerException; report it like any other bad type.
            throw new IllegalArgumentException("不支持的交换机类型: " + exchangeType);
        }
        switch (exchangeType) {
            // Fanout bindings carry no routing key, so they go through the fanout-specific method.
            case "fanout" -> bindQueueToFanoutExchange(queueName, exchangeName);
            case "direct" -> bindQueueToDirectExchange(queueName, exchangeName, routingKey);
            case "topic" -> bindQueueToTopicExchange(queueName, exchangeName, routingKey);
            case "headers" -> bindQueueToHeadersExchange(queueName, exchangeName, headers);
            default -> throw new IllegalArgumentException("不支持的交换机类型: " + exchangeType);
        }
    }

    /**
     * Binds a queue to a fanout exchange.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the fanout exchange
     */
    public void bindQueueToFanoutExchange(String queueName, String exchangeName) {
        Binding binding = BindingBuilder.bind(new Queue(queueName))
                .to(new FanoutExchange(exchangeName));
        rabbitAdmin.declareBinding(binding);
        log.info("绑定创建成功: {} -> {}", queueName, exchangeName);
    }

    /**
     * Binds a queue to a direct exchange with the given routing key.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the direct exchange
     * @param routingKey   routing key of the binding
     */
    public void bindQueueToDirectExchange(String queueName, String exchangeName, String routingKey) {
        Binding binding = BindingBuilder.bind(new Queue(queueName))
                .to(new DirectExchange(exchangeName))
                .with(routingKey);
        rabbitAdmin.declareBinding(binding);
        log.info("绑定创建成功: {} -> {} 使用路由键: {}", queueName, exchangeName, routingKey);
    }

    /**
     * Binds a queue to a topic exchange with the given routing key pattern.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the topic exchange
     * @param routingKey   routing key (pattern) of the binding
     */
    public void bindQueueToTopicExchange(String queueName, String exchangeName, String routingKey) {
        Binding binding = BindingBuilder.bind(new Queue(queueName))
                .to(new TopicExchange(exchangeName))
                .with(routingKey);
        rabbitAdmin.declareBinding(binding);
        log.info("绑定创建成功: {} -> {} 使用路由键: {}", queueName, exchangeName, routingKey);
    }

    /**
     * Binds a queue to a headers exchange; a message must match all given headers.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the headers exchange
     * @param headers      header names and values that must all match
     */
    public void bindQueueToHeadersExchange(String queueName, String exchangeName, Map<String, Object> headers) {
        Binding binding = BindingBuilder.bind(new Queue(queueName))
                .to(new HeadersExchange(exchangeName))
                .whereAll(headers)
                .match();
        rabbitAdmin.declareBinding(binding);
        log.info("队列 {} 已绑定到 Headers 交换机 {},使用头部匹配规则: {}", queueName, exchangeName, headers);
    }

    /**
     * Deletes a queue by name.
     *
     * @param queueName name of the queue to delete
     */
    public void deleteQueue(String queueName) {
        rabbitAdmin.deleteQueue(queueName);
        log.info("队列删除成功: {}", queueName);
    }

    /**
     * Deletes an exchange by name.
     *
     * @param exchangeName name of the exchange to delete
     */
    public void deleteExchange(String exchangeName) {
        rabbitAdmin.deleteExchange(exchangeName);
        log.info("交换机删除成功: {}", exchangeName);
    }
}
3、RabbitMQ中队列的动态监听
RabbitListenerService.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Starts message listener containers for queues at runtime.
 *
 * <p>Containers are tracked per queue name so that repeated requests for the same queue
 * do not pile up duplicate consumers.
 */
@Slf4j
@Service
public class RabbitListenerService {

    private final SimpleRabbitListenerContainerFactory listenerContainerFactory;
    private final ConnectionFactory connectionFactory;

    /** Containers started by this service, keyed by the queue they consume from. */
    private final java.util.concurrent.ConcurrentMap<String, SimpleMessageListenerContainer> containers =
            new java.util.concurrent.ConcurrentHashMap<>();

    @Autowired
    public RabbitListenerService(
            SimpleRabbitListenerContainerFactory listenerContainerFactory,
            ConnectionFactory connectionFactory) {
        this.listenerContainerFactory = listenerContainerFactory;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Starts a listener on the given queue unless one started by this service is already running.
     *
     * <p>A tracked container that is no longer running is replaced by a fresh one.
     *
     * @param queueName name of the queue to consume from
     */
    public void createListener(String queueName) {
        containers.compute(queueName, (name, existing) -> {
            if (existing != null && existing.isRunning()) {
                log.info("RabbitMQ队列监听器已存在,跳过重复创建:{}", name);
                return existing;
            }
            return startContainer(name);
        });
    }

    /** Builds, configures and starts a container that logs every message received from the queue. */
    private SimpleMessageListenerContainer startContainer(String queueName) {
        SimpleMessageListenerContainer container = listenerContainerFactory.createListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        // MessageListenerAdapter dispatches to the delegate's handleMessage method.
        container.setMessageListener(new MessageListenerAdapter(new Object() {
            public void handleMessage(String message) {
                log.info("收到来自RabbitMQ中队列:{} 队列的消息:{}", queueName, message);
            }
        }));
        container.start();
        log.info("RabbitMQ队列监听器已启动:{}", queueName);
        return container;
    }
}
4、RabbitMQ中的Exchange、Queue动态操作接口
RabbitDynamicChannelController.java
import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * REST endpoints for creating, binding and deleting RabbitMQ exchanges and queues at runtime.
 */
@RestController
@RequestMapping("/rabbit/dynamic/channel")
public class RabbitDynamicChannelController {

    @Resource
    private RabbitDynamicConfigService rabbitDynamicConfigService;

    /**
     * Creates a queue.
     *
     * @param queueName name of the queue to create
     * @return confirmation message
     */
    @GetMapping("/createQueue")
    public String createQueue(@RequestParam("queueName") String queueName) {
        rabbitDynamicConfigService.createQueue(queueName);
        return "队列已创建: " + queueName;
    }

    /**
     * Creates an exchange through the service's default {@code createExchange} operation.
     *
     * @param exchangeName name of the exchange to create
     * @return confirmation message
     */
    @GetMapping("/createExchange")
    public String createExchange(@RequestParam("exchangeName") String exchangeName) {
        rabbitDynamicConfigService.createExchange(exchangeName);
        return "交换机已创建: " + exchangeName;
    }

    /**
     * Binds a queue to an exchange with a routing key.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the exchange
     * @param routingKey   routing key of the binding
     * @return confirmation message
     */
    @GetMapping("/bindQueue")
    public String bindQueueToExchange(@RequestParam("queueName") String queueName,
                                      @RequestParam("exchangeName") String exchangeName,
                                      @RequestParam("routingKey") String routingKey) {
        rabbitDynamicConfigService.bindQueueToExchange(queueName, exchangeName, routingKey);
        return "队列和交换机已绑定: " + queueName + " -> " + exchangeName;
    }

    /**
     * Deletes a queue.
     *
     * @param queueName name of the queue to delete
     * @return confirmation message
     */
    @GetMapping("/deleteQueue")
    public String deleteQueue(@RequestParam("queueName") String queueName) {
        rabbitDynamicConfigService.deleteQueue(queueName);
        return "队列已删除: " + queueName;
    }

    /**
     * Deletes an exchange.
     *
     * @param exchangeName name of the exchange to delete
     * @return confirmation message
     */
    @GetMapping("/deleteExchange")
    public String deleteExchange(@RequestParam("exchangeName") String exchangeName) {
        rabbitDynamicConfigService.deleteExchange(exchangeName);
        return "交换机已删除: " + exchangeName;
    }

    /**
     * Creates a direct exchange and binds the queue to it with the routing key.
     *
     * @param exchangeName name of the direct exchange
     * @param queueName    name of the queue to bind
     * @param routingKey   routing key of the binding
     * @return confirmation message
     */
    @GetMapping("/createDirectExchange")
    public String createDirectExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {
        rabbitDynamicConfigService.createDirectExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToDirectExchange(queueName, exchangeName, routingKey);
        return "Direct Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;
    }

    /**
     * Creates a fanout exchange and binds the queue to it.
     *
     * @param exchangeName name of the fanout exchange
     * @param queueName    name of the queue to bind
     * @return confirmation message
     */
    @GetMapping("/createFanoutExchange")
    public String createFanoutExchange(@RequestParam String exchangeName, @RequestParam String queueName) {
        rabbitDynamicConfigService.createFanoutExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToFanoutExchange(queueName, exchangeName);
        return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName;
    }

    /**
     * Creates a topic exchange and binds the queue to it with the routing key.
     *
     * @param exchangeName name of the topic exchange
     * @param queueName    name of the queue to bind
     * @param routingKey   routing key (pattern) of the binding
     * @return confirmation message
     */
    @GetMapping("/createTopicExchange")
    public String createTopicExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {
        rabbitDynamicConfigService.createTopicExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToTopicExchange(queueName, exchangeName, routingKey);
        return "Topic Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;
    }

    /**
     * Creates a headers exchange and binds the queue to it using the remaining query
     * parameters as header match rules.
     *
     * @param exchangeName name of the headers exchange
     * @param queueName    name of the queue to bind
     * @param headersMap   all query parameters of the request; {@code exchangeName} and
     *                     {@code queueName} are stripped before building the match rules
     * @return confirmation message
     */
    @GetMapping("/createHeadersExchange")
    public String createHeadersExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam Map<String, String> headersMap) {
        Map<String, Object> headers = new HashMap<>(headersMap);
        // A Map-typed @RequestParam receives every query parameter, including the two
        // addressing parameters; they are not header rules and would make the binding unmatchable.
        headers.remove("exchangeName");
        headers.remove("queueName");
        rabbitDynamicConfigService.createHeadersExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToHeadersExchange(queueName, exchangeName, headers);
        return "Headers Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with headers: " + headers;
    }
}
5、RabbitMQ中的Queue消息监听动态操作接口
RabbitChannelListenerController.java
import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint for starting a message listener on a RabbitMQ queue at runtime.
 */
@RestController
@RequestMapping("/rabbit/channel/listener")
public class RabbitChannelListenerController {

    /** Service used to declare the queue and request a listener for it. */
    @Resource
    private RabbitDynamicConfigService rabbitDynamicConfigService;

    /**
     * Declares the queue through the service with the listener flag set, then reports back.
     *
     * @param queueName name of the queue to declare and listen on
     * @return confirmation message containing the queue name
     */
    @GetMapping("/queue")
    public String listenQueue(@RequestParam("queueName") String queueName) {
        final boolean startListener = true;
        rabbitDynamicConfigService.createQueue(queueName, startListener);

        final String reply = "开始监听队列:" + queueName;
        return reply;
    }
}
三、动态exchange、queue的测试
1、测试Exchange、Queue的动态创建和删除
2、测试Exchange和Queue的动态绑定
3、发送、接收消息测试动态创建Exchange、Queue
4、测试Queue的动态监听接口
下一篇:7、Spring Boot 3.x集成RabbitMQ动态实例等操作