一、前言
本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成,对比上一篇文章,进一步集成RabbitMQ动态操作,比如动态新增 RabbitMQ 实例,以及动态实例中的交换机、队列等操作。
二、动态RabbitMQ实例,创建、删除
1、RabbitMQ动态实例配置
DynamicRabbitMQConfig.java
mport com.chain.air.rpp.exchange.properties.RabbitInstance;
import com.chain.air.rpp.exchange.properties.RabbitProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Configuration
public class DynamicRabbitMQConfig {
private final RabbitProperties rabbitProperties;
private Map<String, RabbitInstance> rabbitInstanceMap = new HashMap<>();
private Map<String, RabbitTemplate> rabbitTemplateMap = new HashMap<>();
private Map<String, RabbitAdmin> rabbitAdminMap = new HashMap<>();
private Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<>();
@Autowired
public DynamicRabbitMQConfig(RabbitProperties rabbitProperties) {
this.rabbitProperties = rabbitProperties;
}
@PostConstruct
public void init() {
rabbitProperties.getInstances().forEach(this::createRabbitInstance);
}
public void createRabbitInstance(RabbitInstance instance) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(instance.getHost());
connectionFactory.setPort(instance.getPort());
connectionFactory.setUsername(instance.getUsername());
connectionFactory.setPassword(instance.getPassword());
connectionFactory.setVirtualHost(instance.getVirtualHost());
connectionFactoryMap.put(instance.getName(), connectionFactory);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));
rabbitTemplateMap.put(instance.getName(), rabbitTemplate);
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
rabbitAdminMap.put(instance.getName(), rabbitAdmin);
rabbitInstanceMap.put(instance.getName(), instance);
}
public RabbitTemplate getRabbitTemplate(String name) {
return rabbitTemplateMap.get(name);
}
public RabbitAdmin getRabbitAdmin(String name) {
return rabbitAdminMap.get(name);
}
public ConnectionFactory getConnectionFactory(String name) {
return connectionFactoryMap.get(name);
}
public Map<String, Object> getRabbitInstance(String name) {
Map<String, Object> result = new HashMap<>();
result.put("instance", rabbitInstanceMap.get(name));
result.put("rabbitAdmin", rabbitAdminMap.get(name));
result.put("rabbitTemplate", rabbitTemplateMap.get(name));
result.put("connectionFactory", connectionFactoryMap.get(name));
return result;
}
public Boolean checkInstanceExist(String name) {
return rabbitInstanceMap.containsKey(name);
}
public List<String> getDynamicInstanceNames() {
return new ArrayList<>(rabbitInstanceMap.keySet());
}
public void removeRabbitInstance(String name) {
rabbitInstanceMap.remove(name);
rabbitAdminMap.remove(name);
rabbitTemplateMap.remove(name);
connectionFactoryMap.remove(name);
}
}
2、RabbitMQ动态实例操作Service组件
RabbitDynamicInstanceService.java
import com.chain.air.rpp.exchange.config.rabbit.DynamicRabbitMQConfig;
import com.chain.air.rpp.exchange.properties.RabbitInstance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
public class RabbitDynamicInstanceService {
private final DynamicRabbitMQConfig dynamicRabbitMQConfig;
@Autowired
public RabbitDynamicInstanceService(DynamicRabbitMQConfig dynamicRabbitMQConfig) {
this.dynamicRabbitMQConfig = dynamicRabbitMQConfig;
}
public Boolean createRabbitInstance(RabbitInstance rabbitInstance) {
try {
boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(rabbitInstance.getName());
if (instanceExist) {
log.warn("实例【{}】已存在,无需重复创建", rabbitInstance.getName());
return true;
}
dynamicRabbitMQConfig.createRabbitInstance(rabbitInstance);
return true;
} catch (Exception e) {
log.error("创建RabbitMQ实例失败,失败原因:{}", e.getMessage());
return false;
}
}
public Boolean removeRabbitInstance(String name) {
try {
boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(name);
if (!instanceExist) {
log.warn("实例【{}】不存在,无需删除", name);
return true;
}
dynamicRabbitMQConfig.removeRabbitInstance(name);
return true;
} catch (Exception e) {
log.error("删除RabbitMQ实例失败,失败原因:{}", e.getMessage());
return false;
}
}
public List<String> getDynamicInstanceNames() {
return dynamicRabbitMQConfig.getDynamicInstanceNames();
}
public Boolean appointRabbitWithExchangeSendMessage(String name, String exchange, String routingKey, String message) {
try {
Boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(name);
if (!instanceExist) {
log.warn("实例【{}】不存在,无法发送消息", name);
return false;
}
dynamicRabbitMQConfig.sendMessage(name, exchange, routingKey, message);
return true;
} catch (Exception e) {
log.error("指定RabbitMQ实例发送消息失败,失败原因:{}", e.getMessage());
return false;
}
}
public Boolean appointRabbitWithQueueSendMessage(String name, String queue, String message) {
try {
Boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(name);
if (!instanceExist) {
log.warn("实例【{}】不存在,无法发送消息", name);
return false;
}
dynamicRabbitMQConfig.sendMessage(name, queue, message);
return true;
} catch (Exception e) {
log.error("指定RabbitMQ实例发送消息失败,失败原因:{}", e.getMessage());
return false;
}
}
}
二、动态RabbitMQ实例中的exchange、queue动态新增及监听
1、DynamicRabbitMQConfig.java 新增操作代码
import com.chain.air.rpp.exchange.properties.RabbitInstance;
import com.chain.air.rpp.exchange.properties.RabbitProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Configuration
public class DynamicRabbitMQConfig {
private final RabbitProperties rabbitProperties;
private Map<String, RabbitInstance> rabbitInstanceMap = new HashMap<>();
private Map<String, RabbitTemplate> rabbitTemplateMap = new HashMap<>();
private Map<String, RabbitAdmin> rabbitAdminMap = new HashMap<>();
private Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<>();
@Autowired
public DynamicRabbitMQConfig(RabbitProperties rabbitProperties) {
this.rabbitProperties = rabbitProperties;
}
@PostConstruct
public void init() {
rabbitProperties.getInstances().forEach(this::createRabbitInstance);
}
public void createRabbitInstance(RabbitInstance instance) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(instance.getHost());
connectionFactory.setPort(instance.getPort());
connectionFactory.setUsername(instance.getUsername());
connectionFactory.setPassword(instance.getPassword());
connectionFactory.setVirtualHost(instance.getVirtualHost());
connectionFactoryMap.put(instance.getName(), connectionFactory);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));
rabbitTemplateMap.put(instance.getName(), rabbitTemplate);
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
rabbitAdminMap.put(instance.getName(), rabbitAdmin);
rabbitInstanceMap.put(instance.getName(), instance);
}
public RabbitTemplate getRabbitTemplate(String name) {
return rabbitTemplateMap.get(name);
}
public RabbitAdmin getRabbitAdmin(String name) {
return rabbitAdminMap.get(name);
}
public ConnectionFactory getConnectionFactory(String name) {
return connectionFactoryMap.get(name);
}
public Map<String, Object> getRabbitInstance(String name) {
Map<String, Object> result = new HashMap<>();
result.put("instance", rabbitInstanceMap.get(name));
result.put("rabbitAdmin", rabbitAdminMap.get(name));
result.put("rabbitTemplate", rabbitTemplateMap.get(name));
result.put("connectionFactory", connectionFactoryMap.get(name));
return result;
}
public Boolean checkInstanceExist(String name) {
return rabbitInstanceMap.containsKey(name);
}
public List<String> getDynamicInstanceNames() {
return new ArrayList<>(rabbitInstanceMap.keySet());
}
public void removeRabbitInstance(String name) {
rabbitInstanceMap.remove(name);
rabbitAdminMap.remove(name);
rabbitTemplateMap.remove(name);
connectionFactoryMap.remove(name);
}
public void sendMessage(String name, String exchange, String routingKey, String message) {
try {
RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(name);
rabbitTemplate.convertAndSend(exchange, routingKey, message);
} catch (Exception e) {
log.error("RabbitMQ实例对象:{},发送消息失败,失败原因:{}", name, e.getMessage());
log.error("异常堆栈信息:{}", e);
}
}
public void sendMessage(String name, String queue, String message) {
RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(name);
rabbitTemplate.convertAndSend(queue, message);
}
public void createDirectExchange(String name, String exchangeName) {
RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
DirectExchange directExchange = new DirectExchange(exchangeName, true, false);
rabbitAdmin.declareExchange(directExchange);
log.info("RabbitMQ实例对象:{},Direct 交换机创建成功: {}", name, exchangeName);
}
public void createFountExchange(String name, String exchangeName) {
RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
DirectExchange fanoutExchange = new DirectExchange(exchangeName, true, false);
rabbitAdmin.declareExchange(fanoutExchange);
log.info("RabbitMQ实例对象:{},Fanout 交换机创建成功: {}", name, exchangeName);
}
public void createTopicExchange(String name, String exchangeName) {
RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
TopicExchange topicExchange = new TopicExchange(exchangeName, true, false);
rabbitAdmin.declareExchange(topicExchange);
log.info("RabbitMQ实例对象:{},Topic 交换机创建成功: {}", name, exchangeName);
}
public void createHeadersExchange(String name, String exchangeName) {
RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
HeadersExchange headersExchange = new HeadersExchange(exchangeName, true, false);
rabbitAdmin.declareExchange(headersExchange);
log.info("RabbitMQ实例对象:{},Headers 交换机创建成功: {}", name, exchangeName);
}
public void createQueue(String name, String queueName) {
RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
Queue queue = new Queue(queueName, true, false, false);
rabbitAdmin.declareQueue(queue);
log.info("RabbitMQ实例对象:{},队列创建成功: {}", name, queueName);
}
public void deleteQueue(String name, String queueName) {
RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
rabbitAdmin.deleteQueue(queueName);
log.info("RabbitMQ实例对象:{},队列删除成功: {}", name, queueName);
}
public void createQueue(String name, String queueName, Boolean isListener) {
RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
ConnectionFactory connectionFactory = connectionFactoryMap.get(name);
Queue queue = new Queue(queueName, true, false, false);
rabbitAdmin.declareQueue(queue);
log.info("RabbitMQ实例对象:{},队列创建成功: {}", name, queueName);
if (isListener) {
createListener(connectionFactory, queueName);
}
}
public void createListener(ConnectionFactory connectionFactory, String queueName) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(new MessageListenerAdapter(new Object() {
public void handleMessage(String message) {
System.out.println("收到来自RabbitMQ中队列:" + queueName + " 队列的消息:" + message);
}
}));
container.start();
System.out.println("RabbitMQ队列监听器已启动:" + queueName);
}
}