rabbitmq动态创建交换机、队列、动态绑定,销毁

发布于:2025-07-02 ⋅ 阅读:(49) ⋅ 点赞:(0)
 // 缓存已创建的绑定,避免重复声明
    private final Map<String, Date> createdBindings = new ConcurrentHashMap<>(); 
public void createAndBindQueueToExchange(String type,String clinetId,  String routingKey) {
        String   queueName = routingKey;
        log.info("初始化类型:{}",type);
        QueueInformation queueInformation = rabbitAdmin.getQueueInfo(queueName);
        if(queueInformation == null) {
            //队列不存在则创建
            Map<String, Object> args = new HashMap<>();
            args.put("x-max-priority", maxPriority); // 设置优先级范围
            Queue queue = new Queue(queueName, true, false, false, args);
            rabbitAdmin.declareQueue(queue);
            log.info("创建队列: {} ", queueName);
        }else{
            log.info("队列已存在: {} ", queueName);
        }
        String containerKey = queueName + ":" + mainDirectExchange + ":" + routingKey + ":"+clinetId;
        //还要判断监听容器是否存在
        if (createdBindings.containsKey(containerKey) && registry.getListenerContainerIds().contains(containerKey)) {
            log.info("绑定已存在缓存中,容器中也存在 queue: {} to exchange: {} with routing key: {},time={}",
                    queueName, mainDirectExchange, routingKey,createdBindings.get(containerKey));
            createdBindings.put(containerKey,new Date());
        }else{
            //
            stopContainerListenerAndCleanCash(containerKey,"缓存 无Key(有无监听容器)或(有缓存Key无监听容器)");
            // 2. 声明绑定到已存在的交换机
            Binding binding = new Binding(
                    queueName,
                    Binding.DestinationType.QUEUE,
                    mainDirectExchange,
                    routingKey,
                    null
            );
            rabbitAdmin.declareBinding(binding);
            // 添加到缓存
            createdBindings.put(containerKey,new Date());
            log.info("成功创建绑定 for queue: {} to exchange: {} with routing key: {}",
                    queueName, mainDirectExchange, routingKey);
        }

        // 3. 注册监听器
        if (!registry.getListenerContainerIds().contains(containerKey)) {
            SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
            endpoint.setId(containerKey);
            endpoint.setQueueNames(queueName);

            //endpoint.setAutoStartup(true);
            // 使用手动ACK的消息监听器
            endpoint.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    //处理消息完成后,不退回为true
                    boolean okMessage = false;
                    String consumerTag = "";
                    Exception exception = null;
                    FlowInstanceNode flowInstanceNode = null;
                    try {
                        consumerTag = message.getMessageProperties().getConsumerTag();
                        String clientId = consumerTag.split(":")[3];
                        String messageBody = new String(message.getBody());
                        flowInstanceNode = processMessage(clientId,messageBody);
                        if("0".equalsIgnoreCase(flowInstanceNode.getExceptionType())) {
                            //消费正常
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                            okMessage = true;
                        }

                    } catch (Exception e) {
                        log.error("指令包消息处理失败: {}", truncateString(new String(message.getBody())), e);
                        exception = e;
                    }
                    finally {

                        // 根据异常类型决定是否重新入队
                        // 网络异常的情况都应返回队列重新消费
                        boolean exceptionFlag = shouldRequeue(exception,flowInstanceNode);
                        log.info("指令执行包异常标志exceptionFlag: {}",exceptionFlag);
                        if (exceptionFlag) {
                            okMessage = false;
                            // 消息重新入队
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                            log.warn("指令包消息处理失败,已重新入队: {}", truncateString(new String(message.getBody())));
                        } else {

                            // 消息丢弃(不重新入队)
                            if(!okMessage){
                                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                                log.error("指令包消息处理失败,已丢弃: {}", truncateString(new String(message.getBody())));
                            }

                            okMessage = true;
                        }
                        if (okMessage) {
                            // 消费消息序列ID写入缓存
                            Long sequence = flowInstanceNode.getSequence();
                            String professionSoftwareCode = flowInstanceNode.getProfessionalSoftwareType();
                            String sequenceKey = professionSoftwareCode+":message:finishedSequence";
                            cacheService.set(sequenceKey, sequence);
                        }
                        String exceptionType =  flowInstanceNode.getExceptionType();
                        //网络异常
                        if(exceptionFlag){
                            //调用专业软件API网络异常
                            //停止监听
                            stopContainerListenerAndCleanCash(consumerTag,"网络异常");
                            //数据返回队列
                        }
                        //判断心跳是否超时
                        stopContainer(consumerTag,"每次检查心跳超时");
                        log.info("本次消费执行完毕");
                    }

                }
            });
            registry.registerListenerContainer(endpoint, factory, false);
            SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer)registry.getListenerContainer(containerKey);
            simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return containerKey;
                }
            });
            simpleMessageListenerContainer.start();
            // 设置监听器容器

            log.info("设置监听器容器并启动监听 queue: {},containerKey :{}" , queueName,containerKey);
        }else {
            // 启动已存在的监听器
            if (!registry.getListenerContainer(containerKey).isRunning()) {
                //启动监听
                registry.getListenerContainer(containerKey).start();
                log.info("启动监听  queue: {},containerKey:{}" , queueName,containerKey);
            } else {
                log.info("监听已运行 queue: {},containerKey:{}" , queueName,containerKey);
            }
        }
    }

动态销毁:

   public void stopContainerListenerAndCleanCash(String containerKey,String tip) {
        try {
            log.info(tip+",清除缓存,停止监听容器:{}", containerKey);
            if (registry.getListenerContainerIds().contains(containerKey)) {
                registry.getListenerContainer(containerKey).stop();
                registry.unregisterListenerContainer(containerKey);
            }
            // 删除队列
            // rabbitAdmin.deleteQueue(queueName);
            // 清理该队列相关的绑定缓存
            createdBindings.remove(containerKey);
        }catch (Exception e){
            log.error("清除缓存,停止监听容器异常",e);
        }
    }

容器事件监听:

//容器异常

@Component
@Slf4j
public class RabbitListenerContainerExceptionHandler implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
    @Autowired
    RabbitMQService rabbitMQService;
    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        Throwable t = event.getThrowable();
        Object object = event.getSource();
        if (object instanceof SimpleMessageListenerContainer){
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) object;
            String queueName = container.getQueueNames()[0];
            String listenerId = container.getListenerId();
            rabbitMQService.stopContainerListenerAndCleanCash(listenerId,"容器异常");
        }
        log.error("RabbitMQ监听容器异常", t);
        // 这里可以判断异常类型,比如队列不存在、连接断开等
        if (t instanceof ShutdownSignalException) {
            // 处理队列被删除、服务失联等
        }
    }
}

//

    @PostConstruct
    public void init() {
        //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //factory.setReceiveTimeout();
        //factory.setReceiveTimeout(1L);
        factory.setContainerCustomizer(container -> {
            // 设置消费者超时时间(需RabbitMQ服务端支持)
            container.setConsumerArguments(Collections.singletonMap("consumer_timeout", 60000L));
        });
        factory.setErrorHandler(t -> {
            // 这里可以捕获到消息处理时的异常
            log.error("RabbitMQ消息处理异常", t);
            // 可以根据异常类型做不同处理
        });
        connectionFactory.addConnectionListener(new ConnectionListener() {
            @Override
            public void onClose(Connection connection) {
                stopAllContainerListeners();
                log.warn("RabbitMQ连接关闭");
            }
            @Override
            public void onCreate(Connection connection) {
                stopAllContainerListeners();
                log.info("RabbitMQ连接创建");
            }
            @Override
            public void onShutDown(ShutdownSignalException signal) {
                stopAllContainerListeners();
                log.error("RabbitMQ连接异常关闭", signal);
            }
        });
    }


网站公告

今日签到

点亮在社区的每一天
去签到