// Cache of bindings that have already been declared, so they are not declared again.
// Keyed by the listener container key; the value is the time the entry was last written.
private final Map<String, Date> createdBindings = new ConcurrentHashMap<>();
/**
 * Ensures that the queue, its binding to {@code mainDirectExchange}, and a listener
 * container consuming from it all exist, and that the container is running.
 *
 * <p>The queue is named after {@code routingKey}. The listener container id, which is
 * also used as the consumer tag, is
 * {@code queueName:mainDirectExchange:routingKey:clinetId}.
 *
 * <p>NOTE(review): lookup and registration of the container are not atomic; concurrent
 * calls for the same key may race. Confirm how callers serialize this method.
 *
 * @param type       initialization type; only logged
 * @param clinetId   client identifier, used as the last segment of the container key
 * @param routingKey routing key; also used as the queue name
 */
public void createAndBindQueueToExchange(String type,String clinetId, String routingKey) {
    String queueName = routingKey;
    log.info("初始化类型:{}",type);
    declareQueueIfAbsent(queueName);

    String containerKey = queueName + ":" + mainDirectExchange + ":" + routingKey + ":"+clinetId;
    // The binding is treated as established only when both the cache entry and the
    // listener container exist; any other combination is torn down and rebuilt.
    if (createdBindings.containsKey(containerKey) && registry.getListenerContainerIds().contains(containerKey)) {
        log.info("绑定已存在缓存中,容器中也存在 queue: {} to exchange: {} with routing key: {},time={}",
                queueName, mainDirectExchange, routingKey,createdBindings.get(containerKey));
        // Refresh the timestamp of the cached entry.
        createdBindings.put(containerKey,new Date());
    } else {
        stopContainerListenerAndCleanCash(containerKey,"缓存 无Key(有无监听容器)或(有缓存Key无监听容器)");
        // Declare the binding against the already existing exchange.
        Binding binding = new Binding(
                queueName,
                Binding.DestinationType.QUEUE,
                mainDirectExchange,
                routingKey,
                null
        );
        rabbitAdmin.declareBinding(binding);
        createdBindings.put(containerKey,new Date());
        log.info("成功创建绑定 for queue: {} to exchange: {} with routing key: {}",
                queueName, mainDirectExchange, routingKey);
    }

    if (!registry.getListenerContainerIds().contains(containerKey)) {
        registerAndStartListener(queueName, containerKey);
    } else if (!registry.getListenerContainer(containerKey).isRunning()) {
        // A container is registered but stopped: start it again.
        registry.getListenerContainer(containerKey).start();
        log.info("启动监听 queue: {},containerKey:{}" , queueName,containerKey);
    } else {
        log.info("监听已运行 queue: {},containerKey:{}" , queueName,containerKey);
    }
}

/** Declares the durable priority queue {@code queueName} unless the broker already reports it. */
private void declareQueueIfAbsent(String queueName) {
    if (rabbitAdmin.getQueueInfo(queueName) != null) {
        log.info("队列已存在: {} ", queueName);
        return;
    }
    Map<String, Object> args = new HashMap<>();
    args.put("x-max-priority", maxPriority); // upper bound of the priority range
    rabbitAdmin.declareQueue(new Queue(queueName, true, false, false, args));
    log.info("创建队列: {} ", queueName);
}

/** Registers a listener container with id {@code containerKey} on {@code queueName} and starts it. */
private void registerAndStartListener(String queueName, String containerKey) {
    SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
    endpoint.setId(containerKey);
    endpoint.setQueueNames(queueName);
    // The listener acknowledges on the channel itself.
    // NOTE(review): the container's acknowledge mode is not set in this method; confirm the
    // factory is configured for manual acknowledgement elsewhere.
    endpoint.setMessageListener(new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            handleInstructionMessage(message, channel);
        }
    });
    // Register without starting so the consumer tag strategy can be installed first.
    registry.registerListenerContainer(endpoint, factory, false);
    SimpleMessageListenerContainer container =
            (SimpleMessageListenerContainer) registry.getListenerContainer(containerKey);
    // The consumer tag equals the container key; the listener parses it to recover the
    // client id and to address this container when stopping it.
    container.setConsumerTagStrategy(queue -> containerKey);
    container.start();
    log.info("设置监听器容器并启动监听 queue: {},containerKey :{}" , queueName,containerKey);
}

/**
 * Processes one delivery, then acks, requeues or discards it, records the finished
 * sequence, and finally runs the listener stop / heartbeat checks.
 */
private void handleInstructionMessage(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    // True once basicAck has been sent; such a delivery must not be nacked afterwards.
    boolean acked = false;
    // True once the message is settled for good (acked or discarded).
    boolean okMessage = false;
    String consumerTag = "";
    Exception exception = null;
    FlowInstanceNode flowInstanceNode = null;
    try {
        consumerTag = message.getMessageProperties().getConsumerTag();
        // The tag has the form queue:exchange:routingKey:clientId.
        // NOTE(review): this breaks if any segment itself contains ':' — confirm naming rules.
        String clientId = consumerTag.split(":")[3];
        String messageBody = new String(message.getBody());
        flowInstanceNode = processMessage(clientId,messageBody);
        if ("0".equalsIgnoreCase(flowInstanceNode.getExceptionType())) {
            // Consumed normally.
            channel.basicAck(deliveryTag, false);
            acked = true;
            okMessage = true;
        }
    } catch (Exception e) {
        log.error("指令包消息处理失败: {}", truncateString(new String(message.getBody())), e);
        exception = e;
    } finally {
        // shouldRequeue decides from the exception / node whether the message goes back
        // to the queue.
        boolean exceptionFlag = shouldRequeue(exception,flowInstanceNode);
        log.info("指令执行包异常标志exceptionFlag: {}",exceptionFlag);
        if (exceptionFlag) {
            okMessage = false;
            if (!acked) {
                // Put the message back on the queue.
                channel.basicNack(deliveryTag, false, true);
                log.warn("指令包消息处理失败,已重新入队: {}", truncateString(new String(message.getBody())));
            }
        } else {
            if (!okMessage) {
                // Discard the message (no requeue).
                channel.basicNack(deliveryTag, false, false);
                log.error("指令包消息处理失败,已丢弃: {}", truncateString(new String(message.getBody())));
            }
            okMessage = true;
        }
        if (okMessage) {
            if (flowInstanceNode != null) {
                // Record the sequence id of the settled message in the cache.
                Long sequence = flowInstanceNode.getSequence();
                String professionSoftwareCode = flowInstanceNode.getProfessionalSoftwareType();
                String sequenceKey = professionSoftwareCode+":message:finishedSequence";
                cacheService.set(sequenceKey, sequence);
            } else {
                // Processing failed before a node was produced, so there is no sequence to
                // record. Skipping here keeps the checks below from being aborted by an NPE.
                log.warn("消息已处理但无流程节点信息,跳过写入消费序列");
            }
        }
        if (exceptionFlag) {
            // The message went back to the queue; stop this listener.
            // NOTE(review): this stops the container from its own consumer thread — confirm
            // this does not block on the container's shutdown timeout.
            stopContainerListenerAndCleanCash(consumerTag,"网络异常");
        }
        // Heartbeat timeout check, run after every delivery.
        stopContainer(consumerTag,"每次检查心跳超时");
        log.info("本次消费执行完毕");
    }
}
动态销毁:
/**
 * Stops and unregisters the listener container with the given id, if one is registered,
 * and removes the matching entry from {@code createdBindings}.
 *
 * <p>Best effort: any exception is logged and swallowed, so callers never see a failure.
 * The queue itself is not deleted.
 *
 * @param containerKey listener container id, which is also the binding-cache key
 * @param tip          short reason for the stop; only used in the log line
 */
public void stopContainerListenerAndCleanCash(String containerKey,String tip) {
    try {
        // tip is passed as an argument rather than concatenated into the format string,
        // so a "{}" inside it cannot consume the containerKey placeholder.
        log.info("{},清除缓存,停止监听容器:{}", tip, containerKey);
        if (registry.getListenerContainerIds().contains(containerKey)) {
            registry.getListenerContainer(containerKey).stop();
            registry.unregisterListenerContainer(containerKey);
        }
        // Drop the cached binding entry for this container.
        createdBindings.remove(containerKey);
    } catch (Exception e) {
        log.error("清除缓存,停止监听容器异常",e);
    }
}
容器事件监听:
/**
 * Reacts to consumer failures reported by listener containers: when the event source is a
 * {@link SimpleMessageListenerContainer}, the container is stopped and its cached binding
 * is removed via {@code RabbitMQService.stopContainerListenerAndCleanCash}. The failure is
 * always logged.
 */
@Component
@Slf4j
public class RabbitListenerContainerExceptionHandler implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
    @Autowired
    RabbitMQService rabbitMQService;

    /**
     * Cleans up the failing container and logs the failure.
     *
     * @param event the consumer-failed event published by a listener container
     */
    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        Throwable t = event.getThrowable();
        Object source = event.getSource();
        if (source instanceof SimpleMessageListenerContainer) {
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) source;
            // Only the listener id is needed. The queue names are deliberately not read:
            // indexing into an empty array would abort the cleanup and the log below.
            String listenerId = container.getListenerId();
            rabbitMQService.stopContainerListenerAndCleanCash(listenerId,"容器异常");
        }
        log.error("RabbitMQ监听容器异常", t);
        // Placeholder for handling specific failure types (queue deleted, broker lost, ...).
        if (t instanceof ShutdownSignalException) {
            // Intentionally empty.
        }
    }
}
/**
 * Runs once after construction: customizes the shared listener container factory and
 * registers a connection listener that stops all listener containers on every
 * connection event (create, close, shutdown).
 */
@PostConstruct
public void init() {
    // NOTE(review): manual acknowledge mode is not set on the factory here; presumably it
    // is configured elsewhere — confirm.
    // NOTE(review): "consumer_timeout" is sent as a consumer argument and needs broker
    // support; verify the broker honours it in this position.
    factory.setContainerCustomizer(container ->
            container.setConsumerArguments(Collections.singletonMap("consumer_timeout", 60000L)));

    // Exceptions raised while handling a message are only logged.
    factory.setErrorHandler(t -> log.error("RabbitMQ消息处理异常", t));

    ConnectionListener stopListenersOnConnectionEvent = new ConnectionListener() {
        @Override
        public void onCreate(Connection connection) {
            stopAllContainerListeners();
            log.info("RabbitMQ连接创建");
        }

        @Override
        public void onClose(Connection connection) {
            stopAllContainerListeners();
            log.warn("RabbitMQ连接关闭");
        }

        @Override
        public void onShutDown(ShutdownSignalException signal) {
            stopAllContainerListeners();
            log.error("RabbitMQ连接异常关闭", signal);
        }
    };
    connectionFactory.addConnectionListener(stopListenersOnConnectionEvent);
}