死信队列:springboot+RabbitMQ实现死信队列

发布于:2025-07-19 ⋅ 阅读:(14) ⋅ 点赞:(0)

死信队列:springboot+RabbitMQ实现死信队列

在 Spring Boot 中使用 RabbitMQ 实现 死信队列(DLQ, Dead Letter Queue) 是一种常见的消息可靠性处理机制。当消息在正常队列中消费失败(如超时、拒绝、TTL过期等),可以将其转发到一个“死信队列”中,便于后续分析和重试。


✅ 一、实现目标

  • 消息在正常队列中消费失败后自动进入死信队列;
  • 支持消息的延迟、重试、日志记录等功能;
  • 使用 Spring Boot + RabbitMQ 实现。

🧩 二、核心概念说明

名称 说明
正常队列(Normal Queue) 接收并处理业务消息的主队列
死信队列(DLQ) 当消息被拒绝、过期或重试失败后,自动进入该队列
死信交换机(DLX) 绑定死信队列的消息路由规则
TTL(Time To Live) 消息存活时间,可设置为消息级或队列级

🔧 三、配置步骤与代码示例

1. 添加依赖(pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置 RabbitMQ 参数(application.yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

3. 定义常量类(RabbitMQConfig.java

public class RabbitMQConfig {

    // 正常队列名称
    public static final String NORMAL_QUEUE = "normal.queue";
    // 死信队列名称
    public static final String DLQ_QUEUE = "dlq.queue";
    // 正常交换机名称
    public static final String NORMAL_EXCHANGE = "normal.exchange";
    // 死信交换机名称
    public static final String DLX_EXCHANGE = "dlx.exchange";
    // 路由键
    public static final String ROUTING_KEY = "normal.key";
}

4. 配置队列和绑定关系(RabbitMQConfig.java

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    @Bean
    public Queue normalQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置死信交换机
        args.put("x-dead-letter-exchange", DLX_EXCHANGE);
        // 设置死信路由键
        args.put("x-dead-letter-routing-key", DLQ_QUEUE);
        // 设置消息过期时间(单位:毫秒)
        args.put("x-message-ttl", 10000); // 10秒

        return QueueBuilder.durable(NORMAL_QUEUE)
                .withArguments(args)
                .build();
    }

    @Bean
    public Queue dlqQueue() {
        return new QueueBuilder().durable(DLQ_QUEUE).build();
    }

    @Bean
    public Binding bindingNormalToExchange() {
        return BindingBuilder.bind(normalQueue()).to(normalExchange())
                .with(RabbitMQConfig.ROUTING_KEY).noargs();
    }

    @Bean
    public Binding bindingDLQToExchange() {
        return BindingBuilder.bind(dlqQueue()).to(dlxExchange())
                .with(RabbitMQConfig.DLQ_QUEUE).noargs();
    }
}

5. 发送消息(Producer.java

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        Message msg = new Message(message.getBytes(), new MessageProperties());
        rabbitTemplate.send(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.ROUTING_KEY, msg);
    }
}

6. 消费者监听正常队列(Consumer.java

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class Consumer {

    @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE)
    public void process(Message message, Channel channel) throws IOException {
        try {
            System.out.println("收到消息:" + new String(message.getBody()));
            // 模拟消费失败
            int i = 1 / 0; // 抛异常模拟失败
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.err.println("消息处理失败,拒绝消息");
            // 拒绝消息,并不重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

7. 监听死信队列(DLQConsumer.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DLQConsumer {

    @RabbitListener(queues = RabbitMQConfig.DLQ_QUEUE)
    public void process(byte[] message) {
        System.out.println("死信队列收到消息:" + new String(message));
    }
}

📊 四、流程图解

生产者 → 发送到 normal.exchange
           ↓
       normal.queue(带TTL、DLX)
           ↓
     消费失败 → basicNack 或 TTL 过期
           ↓
         dlx.exchange
           ↓
        dlq.queue(死信队列)
           ↓
      dlqConsumer 处理死信消息

📌 五、死信触发条件

消息变成死信一般有以下几种情况:

条件 描述
消息被拒绝(basic.reject / basic.nack) 并且 requeue=false
消息过期(TTL) 可以设置单条消息或整个队列的过期时间
队列达到最大长度限制 超出部分会被丢弃或转发到 DLQ

✅ 六、总结

通过上述配置,可以轻松实现一个基于 Spring Boot + RabbitMQ 的死信队列系统,用于:

  • 消息失败后的统一处理;
  • 消息重试机制;
  • 日志记录与问题排查;
  • 构建可靠的消息中间件架构。

还可以结合数据库、定时任务、重试策略来构建更完善的消息补偿机制。


网站公告

今日签到

点亮在社区的每一天
去签到