死信队列:springboot+RabbitMQ实现死信队列
在 Spring Boot 中使用 RabbitMQ 实现 死信队列(DLQ, Dead Letter Queue) 是一种常见的消息可靠性处理机制。当消息在正常队列中消费失败(如超时、拒绝、TTL过期等),可以将其转发到一个“死信队列”中,便于后续分析和重试。
✅ 一、实现目标
- 消息在正常队列中消费失败后自动进入死信队列;
- 支持消息的延迟、重试、日志记录等功能;
- 使用 Spring Boot + RabbitMQ 实现。
🧩 二、核心概念说明
名称 | 说明 |
---|---|
正常队列(Normal Queue) | 接收并处理业务消息的主队列 |
死信队列(DLQ) | 当消息被拒绝、过期或重试失败后,自动进入该队列 |
死信交换机(DLX) | 绑定死信队列的消息路由规则 |
TTL(Time To Live) | 消息存活时间,可设置为消息级或队列级 |
🔧 三、配置步骤与代码示例
1. 添加依赖(pom.xml
)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置 RabbitMQ 参数(application.yml
)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
3. 定义常量类(RabbitMQConfig.java
)
public class RabbitMQConfig {
// 正常队列名称
public static final String NORMAL_QUEUE = "normal.queue";
// 死信队列名称
public static final String DLQ_QUEUE = "dlq.queue";
// 正常交换机名称
public static final String NORMAL_EXCHANGE = "normal.exchange";
// 死信交换机名称
public static final String DLX_EXCHANGE = "dlx.exchange";
// 路由键
public static final String ROUTING_KEY = "normal.key";
}
4. 配置队列和绑定关系(RabbitMQConfig.java
)
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE);
}
@Bean
public Queue normalQueue() {
Map<String, Object> args = new HashMap<>();
// 设置死信交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE);
// 设置死信路由键
args.put("x-dead-letter-routing-key", DLQ_QUEUE);
// 设置消息过期时间(单位:毫秒)
args.put("x-message-ttl", 10000); // 10秒
return QueueBuilder.durable(NORMAL_QUEUE)
.withArguments(args)
.build();
}
@Bean
public Queue dlqQueue() {
return new QueueBuilder().durable(DLQ_QUEUE).build();
}
@Bean
public Binding bindingNormalToExchange() {
return BindingBuilder.bind(normalQueue()).to(normalExchange())
.with(RabbitMQConfig.ROUTING_KEY).noargs();
}
@Bean
public Binding bindingDLQToExchange() {
return BindingBuilder.bind(dlqQueue()).to(dlxExchange())
.with(RabbitMQConfig.DLQ_QUEUE).noargs();
}
}
5. 发送消息(Producer.java
)
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
Message msg = new Message(message.getBytes(), new MessageProperties());
rabbitTemplate.send(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.ROUTING_KEY, msg);
}
}
6. 消费者监听正常队列(Consumer.java
)
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer {
@RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE)
public void process(Message message, Channel channel) throws IOException {
try {
System.out.println("收到消息:" + new String(message.getBody()));
// 模拟消费失败
int i = 1 / 0; // 抛异常模拟失败
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("消息处理失败,拒绝消息");
// 拒绝消息,并不重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
7. 监听死信队列(DLQConsumer.java
)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DLQConsumer {
@RabbitListener(queues = RabbitMQConfig.DLQ_QUEUE)
public void process(byte[] message) {
System.out.println("死信队列收到消息:" + new String(message));
}
}
📊 四、流程图解
生产者 → 发送到 normal.exchange
↓
normal.queue(带TTL、DLX)
↓
消费失败 → basicNack 或 TTL 过期
↓
dlx.exchange
↓
dlq.queue(死信队列)
↓
dlqConsumer 处理死信消息
📌 五、死信触发条件
消息变成死信一般有以下几种情况:
条件 | 描述 |
---|---|
消息被拒绝(basic.reject / basic.nack) | 并且 requeue=false |
消息过期(TTL) | 可以设置单条消息或整个队列的过期时间 |
队列达到最大长度限制 | 超出部分会被丢弃或转发到 DLQ |
✅ 六、总结
通过上述配置,可以轻松实现一个基于 Spring Boot + RabbitMQ 的死信队列系统,用于:
- 消息失败后的统一处理;
- 消息重试机制;
- 日志记录与问题排查;
- 构建可靠的消息中间件架构。
还可以结合数据库、定时任务、重试策略来构建更完善的消息补偿机制。