什么是死信队列?死信队列是如何导致的?

发布于:2025-05-09 ⋅ 阅读:(16) ⋅ 点赞:(0)

死信交换机(Dead Letter Exchange,DLX)

  • 定义:死信交换机是一种特殊的交换机,专门用于**接收从其他队列中因特定原因变成死信的消息**。它的本质还是交换机,遵循RabbitMQ中交换机的基本工作原理,如根据路由规则将消息发送到绑定的队列。
  • 作用:为死信提供一个集中处理的入口点。通过将死信发送到死信交换机,再由其路由到相应的死信队列,可以方便地对这些异常消息进行统一管理和处理,确保数据不丢失。

死信队列(Dead Letter Queue,DLQ)

  • 定义:死信队列用于存储那些无法在正常流程中被消费的消息,即死信。这些消息进入死信队列后,可以后续进行分析、重试或其他特殊处理。
  • 产生死信的原因
    • 消息被拒绝且不重新入队:消费者调用basic.rejectbasic.nack方法拒绝消息,并将requeue参数设置为false,表明该消息不再重新放回原队列等待消费,从而成为死信。
    • 消息过期:可以为消息或队列设置生存时间(TTL,Time-To-Live)。当消息在队列中的存活时间超过设定的TTL值时,消息就会过期成为死信。消息的TTL既可以在发送消息时针对单条消息设置,也可以在声明队列时对队列中的所有消息统一设置。
    • 队列达到最大长度:当为队列设置了最大长度(Max-Length),并且队列中的消息数量达到这个上限时,新进入的消息会被丢弃成为死信。

代码举例

下面将用代码举例,由于消息过期而进入死信队列

初始化RabbitMQ的连接配置、队列和交换机的声明

/**
 * RabbitMQ配置类
 * 负责管理RabbitMQ的连接配置、队列和交换机的声明
 */
@Slf4j
public class RabbitMQConfig {
    // 普通队列和死信队列的配置常量
    public static final String NORMAL_QUEUE = "normal.queue";      // 普通队列名称
    public static final String DLX_QUEUE = "dlx.queue";           // 死信队列名称
    public static final String NORMAL_EXCHANGE = "normal.exchange"; // 普通交换机名称
    public static final String DLX_EXCHANGE = "dlx.exchange";     // 死信交换机名称
    public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; // 普通路由键
    public static final String DLX_ROUTING_KEY = "dlx.routing.key";      // 死信路由键

    /**
     * 创建RabbitMQ连接
     *
     * @return Connection RabbitMQ连接对象
     * @throws Exception
     */
    public static Connection createConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxxx");    // 设置RabbitMQ服务器地址
        factory.setPort(5672);           // 设置RabbitMQ服务器端口
        factory.setUsername("xxxx");    // 设置用户名
        factory.setPassword("xxxx");    // 设置密码
        return factory.newConnection();  // 创建并返回新的连接
    }

    /**
     * 初始化RabbitMQ的队列和交换机
     * 包括:
     * 1. 删除已存在的队列和交换机
     * 2. 声明死信交换机和队列
     * 3. 声明普通交换机和队列
     * 4. 设置队列的死信参数
     * 5. 绑定队列和交换机
     *
     * @throws Exception
     */
    public static void init() throws Exception {
        try (Connection connection = createConnection();
             Channel channel = connection.createChannel()) {

            // 删除已存在的队列和交换机
            try {
                channel.queueDelete(NORMAL_QUEUE);
                channel.queueDelete(DLX_QUEUE);
                channel.exchangeDelete(NORMAL_EXCHANGE);
                channel.exchangeDelete(DLX_EXCHANGE);
            } catch (Exception e) {
                // 忽略删除不存在的队列或交换机时的错误
                log.warn("删除队列或交换机时出错(可能是首次创建): {}", e.getMessage());
            }

            // 声明死信交换机,类型为direct,持久化
            channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);

            // 声明死信队列,持久化
            channel.queueDeclare(DLX_QUEUE, true, false, false, null);

            // 将死信队列绑定到死信交换机,使用死信路由键
            channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);

            // 声明普通交换机,类型为direct,持久化
            channel.exchangeDeclare(NORMAL_EXCHANGE, "direct", true);

            // 设置普通队列的死信参数
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", DLX_EXCHANGE);     // 设置死信交换机
            args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键

            // 声明普通队列,并应用死信参数
            channel.queueDeclare(NORMAL_QUEUE, true, false, false, args);

            // 将普通队列绑定到普通交换机,使用普通路由键
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
        }
    }
} 

消息生产者

/**
 * 消息生产者类
 * 负责向RabbitMQ发送消息
 */
@Slf4j
public class MessageProducer {
    
    /**
     * 发送消息到普通队列
     * 该方法会:
     * 1. 创建RabbitMQ连接和通道
     * 2. 将消息发布到普通交换机
     * 3. 使用try-with-resources自动关闭连接和通道
     * 
     * @param message 要发送的消息内容
     * @throws Exception 
     */
    public void sendMessage(String message) throws Exception {
        // 使用try-with-resources自动管理连接和通道的关闭
        try (Connection connection = RabbitMQConfig.createConnection();
             Channel channel = connection.createChannel()) {
            
            // 打印发送的消息内容
            log.info("发送消息: {}", message);
            
            // 发布消息到普通交换机
            // 参数说明:
            // 1. 交换机名称
            // 2. 路由键
            // 3. 消息属性(这里为null表示使用默认属性)
            // 4. 消息内容(转换为字节数组)
            channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,
                               RabbitMQConfig.NORMAL_ROUTING_KEY,
                               null,
                               message.getBytes());
        }
    }

    /**
     * 发送带TTL的消息到普通队列
     * 该方法会:
     * 1. 创建RabbitMQ连接和通道
     * 2. 设置消息的TTL属性
     * 3. 将消息发布到普通交换机
     * 4. 使用try-with-resources自动关闭连接和通道
     * 
     * @param message 要发送的消息内容
     * @param ttl 消息的过期时间(毫秒)
     * @throws Exception 如果发送过程中出现错误则抛出异常
     */
    public void sendMessageWithTTL(String message, int ttl) throws Exception {
        // 使用try-with-resources自动管理连接和通道的关闭
        try (Connection connection = RabbitMQConfig.createConnection();
             Channel channel = connection.createChannel()) {
            
            // 设置消息属性,包括TTL
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration(String.valueOf(ttl))
                    .build();
            
            // 打印发送的消息内容
            log.info("发送消息: {}, TTL: {}ms", message, ttl);
            
            // 发布消息到普通交换机
            // 参数说明:
            // 1. 交换机名称
            // 2. 路由键
            // 3. 消息属性(包含TTL)
            // 4. 消息内容(转换为字节数组)
            channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,
                               RabbitMQConfig.NORMAL_ROUTING_KEY,
                               properties,
                               message.getBytes());
        }
    }
} 

消息消费者

/**
 * 消息消费者类
 * 负责从普通队列和死信队列中消费消息
 */
@Slf4j
public class MessageConsumer {

    /**
     * 消费普通队列中的消息
     * 该方法会:
     * 1. 创建RabbitMQ连接和通道
     * 2. 设置预取计数为1,确保公平分发
     * 3. 创建消费者回调处理消息
     * 4. 确认消息处理完成
     *
     * @throws Exception 异常
     */
    public void consumeNormalQueue() throws Exception {
        // 创建RabbitMQ连接和通道
        Connection connection = RabbitMQConfig.createConnection();
        Channel channel = connection.createChannel();

        // 设置预取计数为1,确保公平分发,避免某个消费者处理过多消息
        channel.basicQos(1);

        // 创建普通队列消费者回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 获取消息内容
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            log.info("收到普通队列消息: {}", message);

            // 模拟消息处理耗时
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            // 确认消息处理完成
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        // 开始消费普通队列
        // 参数说明:
        // 1. 队列名称
        // 2. 是否自动确认消息(false表示手动确认)
        // 3. 消息处理回调
        // 4. 消费者取消回调(这里为空实现)
        channel.basicConsume(RabbitMQConfig.NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
        });
    }

    /**
     * 消费死信队列中的消息
     * 该方法会:
     * 1. 创建RabbitMQ连接和通道
     * 2. 设置预取计数为1,确保公平分发
     * 3. 创建消费者回调处理消息
     * 4. 确认消息处理完成
     *
     * @throws Exception 异常
     */
    public void consumeDlxQueue() throws Exception {
        // 创建RabbitMQ连接和通道
        Connection connection = RabbitMQConfig.createConnection();
        Channel channel = connection.createChannel();

        // 设置预取计数为1,确保公平分发
        channel.basicQos(1);

        // 创建死信队列消费者回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 获取消息内容
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            log.info("收到死信队列消息: {}", message);

            // 确认消息处理完成
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        // 开始消费死信队列
        // 参数说明:
        // 1. 队列名称
        // 2. 是否自动确认消息(false表示手动确认)
        // 3. 消息处理回调
        // 4. 消费者取消回调(这里为空实现)
        channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {
        });
    }
} 

测试

@Slf4j
public class DLXTest {

    private static final int THREAD_COUNT = 1;  // 并发线程数
    private static final int MESSAGE_COUNT = 2; // 每个线程发送的消息数

    /**
     * 主方法,执行死信队列测试流程
     * 测试流程:
     * 1. 初始化RabbitMQ的队列和交换机
     * 2. 创建生产者和消费者实例
     * 3. 启动普通队列和死信队列的消费者线程
     * 4. 使用线程池发送测试消息
     * 5. 等待消息处理完成
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // 初始化RabbitMQ的队列和交换机
        RabbitMQConfig.init();

        // 创建生产者和消费者实例
        MessageProducer producer = new MessageProducer();
        MessageConsumer consumer = new MessageConsumer();

        // 启动普通队列消费者线程
        new Thread(() -> {
            try {
                consumer.consumeNormalQueue();
            } catch (Exception e) {
                log.error("普通队列消费者异常", e);
            }
        }).start();

        // 启动死信队列消费者线程
        new Thread(() -> {
            try {
                consumer.consumeDlxQueue();
            } catch (Exception e) {
                log.error("死信队列消费者异常", e);
            }
        }).start();

        // 创建线程池和计数器
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        // 提交任务到线程池
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadId = i;
            executorService.submit(() -> {
                try {
                    // 每个线程发送MESSAGE_COUNT条消息
                    for (int j = 0; j < MESSAGE_COUNT; j++) {
                        // 随机生成消息TTL(1-30秒)
                        int ttl = (int) (Math.random() * 30000) + 1000;
                        String message = String.format("消息-线程%d-第%d条 (消息TTL: %dms)", threadId + 1, j + 1, ttl);
                        producer.sendMessageWithTTL(message, ttl);
                        // 随机延迟0-100ms,模拟真实场景
                        Thread.sleep((long) (Math.random() * 100));
                    }
                } catch (Exception e) {
                    log.error("发送消息异常", e);
                } finally {
                    latch.countDown();
                }
            });
        }

        // 等待所有消息发送完成
        latch.await();
        log.info("所有消息已发送完成");

        // 关闭线程池
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);

        // 保持程序运行,等待消息处理完成
        Thread.sleep(60000);
        log.info("测试完成");
    }
} 

从结果可以看出,第一条消息 ttl 为 28301ms,被普通消费者进行消费,而产生的第二条消息得到 ttl 为 4332ms,由于第一条消息在消费时耗时较久,在此期间 第二条消息已经过期,不得不进入死信队列,由死信消费者进行处理,从前面的日志时间也可以看出,刚好间隔 4s 左右。

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到