死信交换机(Dead Letter Exchange,DLX)
- 定义:死信交换机是一种特殊的交换机,专门用于**接收从其他队列中因特定原因变成死信的消息**。它的本质还是交换机,遵循RabbitMQ中交换机的基本工作原理,如根据路由规则将消息发送到绑定的队列。
- 作用:为死信提供一个集中处理的入口点。通过将死信发送到死信交换机,再由其路由到相应的死信队列,可以方便地对这些异常消息进行统一管理和处理,确保数据不丢失。
死信队列(Dead Letter Queue,DLQ)
- 定义:死信队列用于存储那些无法在正常流程中被消费的消息,即死信。这些消息进入死信队列后,可以后续进行分析、重试或其他特殊处理。
- 产生死信的原因:
- 消息被拒绝且不重新入队:消费者调用
basic.reject
或basic.nack
方法拒绝消息,并将requeue
参数设置为false
,表明该消息不再重新放回原队列等待消费,从而成为死信。 - 消息过期:可以为消息或队列设置生存时间(TTL,Time-To-Live)。当消息在队列中的存活时间超过设定的TTL值时,消息就会过期成为死信。消息的TTL既可以在发送消息时针对单条消息设置,也可以在声明队列时对队列中的所有消息统一设置。
- 队列达到最大长度:当为队列设置了最大长度(
Max-Length
),并且队列中的消息数量达到这个上限时,新进入的消息会被丢弃成为死信。
- 消息被拒绝且不重新入队:消费者调用
代码举例
下面将用代码举例,由于消息过期而进入死信队列
初始化RabbitMQ的连接配置、队列和交换机的声明
/**
* RabbitMQ配置类
* 负责管理RabbitMQ的连接配置、队列和交换机的声明
*/
@Slf4j
public class RabbitMQConfig {
// 普通队列和死信队列的配置常量
public static final String NORMAL_QUEUE = "normal.queue"; // 普通队列名称
public static final String DLX_QUEUE = "dlx.queue"; // 死信队列名称
public static final String NORMAL_EXCHANGE = "normal.exchange"; // 普通交换机名称
public static final String DLX_EXCHANGE = "dlx.exchange"; // 死信交换机名称
public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; // 普通路由键
public static final String DLX_ROUTING_KEY = "dlx.routing.key"; // 死信路由键
/**
* 创建RabbitMQ连接
*
* @return Connection RabbitMQ连接对象
* @throws Exception
*/
public static Connection createConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xxxx"); // 设置RabbitMQ服务器地址
factory.setPort(5672); // 设置RabbitMQ服务器端口
factory.setUsername("xxxx"); // 设置用户名
factory.setPassword("xxxx"); // 设置密码
return factory.newConnection(); // 创建并返回新的连接
}
/**
* 初始化RabbitMQ的队列和交换机
* 包括:
* 1. 删除已存在的队列和交换机
* 2. 声明死信交换机和队列
* 3. 声明普通交换机和队列
* 4. 设置队列的死信参数
* 5. 绑定队列和交换机
*
* @throws Exception
*/
public static void init() throws Exception {
try (Connection connection = createConnection();
Channel channel = connection.createChannel()) {
// 删除已存在的队列和交换机
try {
channel.queueDelete(NORMAL_QUEUE);
channel.queueDelete(DLX_QUEUE);
channel.exchangeDelete(NORMAL_EXCHANGE);
channel.exchangeDelete(DLX_EXCHANGE);
} catch (Exception e) {
// 忽略删除不存在的队列或交换机时的错误
log.warn("删除队列或交换机时出错(可能是首次创建): {}", e.getMessage());
}
// 声明死信交换机,类型为direct,持久化
channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);
// 声明死信队列,持久化
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
// 将死信队列绑定到死信交换机,使用死信路由键
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);
// 声明普通交换机,类型为direct,持久化
channel.exchangeDeclare(NORMAL_EXCHANGE, "direct", true);
// 设置普通队列的死信参数
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 设置死信交换机
args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键
// 声明普通队列,并应用死信参数
channel.queueDeclare(NORMAL_QUEUE, true, false, false, args);
// 将普通队列绑定到普通交换机,使用普通路由键
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
}
}
}
消息生产者
/**
* 消息生产者类
* 负责向RabbitMQ发送消息
*/
@Slf4j
public class MessageProducer {
/**
* 发送消息到普通队列
* 该方法会:
* 1. 创建RabbitMQ连接和通道
* 2. 将消息发布到普通交换机
* 3. 使用try-with-resources自动关闭连接和通道
*
* @param message 要发送的消息内容
* @throws Exception
*/
public void sendMessage(String message) throws Exception {
// 使用try-with-resources自动管理连接和通道的关闭
try (Connection connection = RabbitMQConfig.createConnection();
Channel channel = connection.createChannel()) {
// 打印发送的消息内容
log.info("发送消息: {}", message);
// 发布消息到普通交换机
// 参数说明:
// 1. 交换机名称
// 2. 路由键
// 3. 消息属性(这里为null表示使用默认属性)
// 4. 消息内容(转换为字节数组)
channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,
RabbitMQConfig.NORMAL_ROUTING_KEY,
null,
message.getBytes());
}
}
/**
* 发送带TTL的消息到普通队列
* 该方法会:
* 1. 创建RabbitMQ连接和通道
* 2. 设置消息的TTL属性
* 3. 将消息发布到普通交换机
* 4. 使用try-with-resources自动关闭连接和通道
*
* @param message 要发送的消息内容
* @param ttl 消息的过期时间(毫秒)
* @throws Exception 如果发送过程中出现错误则抛出异常
*/
public void sendMessageWithTTL(String message, int ttl) throws Exception {
// 使用try-with-resources自动管理连接和通道的关闭
try (Connection connection = RabbitMQConfig.createConnection();
Channel channel = connection.createChannel()) {
// 设置消息属性,包括TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration(String.valueOf(ttl))
.build();
// 打印发送的消息内容
log.info("发送消息: {}, TTL: {}ms", message, ttl);
// 发布消息到普通交换机
// 参数说明:
// 1. 交换机名称
// 2. 路由键
// 3. 消息属性(包含TTL)
// 4. 消息内容(转换为字节数组)
channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,
RabbitMQConfig.NORMAL_ROUTING_KEY,
properties,
message.getBytes());
}
}
}
消息消费者
/**
* 消息消费者类
* 负责从普通队列和死信队列中消费消息
*/
@Slf4j
public class MessageConsumer {
/**
* 消费普通队列中的消息
* 该方法会:
* 1. 创建RabbitMQ连接和通道
* 2. 设置预取计数为1,确保公平分发
* 3. 创建消费者回调处理消息
* 4. 确认消息处理完成
*
* @throws Exception 异常
*/
public void consumeNormalQueue() throws Exception {
// 创建RabbitMQ连接和通道
Connection connection = RabbitMQConfig.createConnection();
Channel channel = connection.createChannel();
// 设置预取计数为1,确保公平分发,避免某个消费者处理过多消息
channel.basicQos(1);
// 创建普通队列消费者回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
log.info("收到普通队列消息: {}", message);
// 模拟消息处理耗时
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 确认消息处理完成
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 开始消费普通队列
// 参数说明:
// 1. 队列名称
// 2. 是否自动确认消息(false表示手动确认)
// 3. 消息处理回调
// 4. 消费者取消回调(这里为空实现)
channel.basicConsume(RabbitMQConfig.NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
});
}
/**
* 消费死信队列中的消息
* 该方法会:
* 1. 创建RabbitMQ连接和通道
* 2. 设置预取计数为1,确保公平分发
* 3. 创建消费者回调处理消息
* 4. 确认消息处理完成
*
* @throws Exception 异常
*/
public void consumeDlxQueue() throws Exception {
// 创建RabbitMQ连接和通道
Connection connection = RabbitMQConfig.createConnection();
Channel channel = connection.createChannel();
// 设置预取计数为1,确保公平分发
channel.basicQos(1);
// 创建死信队列消费者回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
log.info("收到死信队列消息: {}", message);
// 确认消息处理完成
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 开始消费死信队列
// 参数说明:
// 1. 队列名称
// 2. 是否自动确认消息(false表示手动确认)
// 3. 消息处理回调
// 4. 消费者取消回调(这里为空实现)
channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {
});
}
}
测试
@Slf4j
public class DLXTest {
private static final int THREAD_COUNT = 1; // 并发线程数
private static final int MESSAGE_COUNT = 2; // 每个线程发送的消息数
/**
* 主方法,执行死信队列测试流程
* 测试流程:
* 1. 初始化RabbitMQ的队列和交换机
* 2. 创建生产者和消费者实例
* 3. 启动普通队列和死信队列的消费者线程
* 4. 使用线程池发送测试消息
* 5. 等待消息处理完成
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 初始化RabbitMQ的队列和交换机
RabbitMQConfig.init();
// 创建生产者和消费者实例
MessageProducer producer = new MessageProducer();
MessageConsumer consumer = new MessageConsumer();
// 启动普通队列消费者线程
new Thread(() -> {
try {
consumer.consumeNormalQueue();
} catch (Exception e) {
log.error("普通队列消费者异常", e);
}
}).start();
// 启动死信队列消费者线程
new Thread(() -> {
try {
consumer.consumeDlxQueue();
} catch (Exception e) {
log.error("死信队列消费者异常", e);
}
}).start();
// 创建线程池和计数器
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
// 提交任务到线程池
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
executorService.submit(() -> {
try {
// 每个线程发送MESSAGE_COUNT条消息
for (int j = 0; j < MESSAGE_COUNT; j++) {
// 随机生成消息TTL(1-30秒)
int ttl = (int) (Math.random() * 30000) + 1000;
String message = String.format("消息-线程%d-第%d条 (消息TTL: %dms)", threadId + 1, j + 1, ttl);
producer.sendMessageWithTTL(message, ttl);
// 随机延迟0-100ms,模拟真实场景
Thread.sleep((long) (Math.random() * 100));
}
} catch (Exception e) {
log.error("发送消息异常", e);
} finally {
latch.countDown();
}
});
}
// 等待所有消息发送完成
latch.await();
log.info("所有消息已发送完成");
// 关闭线程池
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
// 保持程序运行,等待消息处理完成
Thread.sleep(60000);
log.info("测试完成");
}
}
从结果可以看出,第一条消息 ttl 为 28301ms,被普通消费者进行消费,而产生的第二条消息得到 ttl 为 4332ms,由于第一条消息在消费时耗时较久,在此期间 第二条消息已经过期,不得不进入死信队列,由死信消费者进行处理,从前面的日志时间也可以看出,刚好间隔 4s 左右。