【RabbitMQ】死信队列

发布于:2025-04-10 ⋅ 阅读:(71) ⋅ 点赞:(0)

1.概述

死信,顾名思义就是无法被消费的消息,也就是没有被传到消费者的消息,或者即使传到了也没有被消费。当然有死信就有死信队列。死信队列就是用来存储死信的。

它的应用场景就是保证订单业务的消息数据不丢失,当消息消费发 生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付 时自动失效。

死信的来源有3种:

1.消息TTL过期(TTL就是过期时间)

2.队列达到最大长度(也就是队列装满了消息)

3.消息被拒绝((basic.reject 或 basic.nack))并且requeue=false

解释一下这里的requeue=false

在调用basic.rejectbasic.nack 方法时,都有一个参数用于决定是否将被拒绝的消息重新放回原队列 。当requeue设置为false 时,被拒绝的消息不会再回到原队列 。这种情况下,若队列配置了死信交换机(通过x-dead-letter-exchange参数设置 )等相关死信处理机制,消息就会成为死信消息,被转发到死信交换机,再由死信交换机根据路由键转发到对应的死信队列 。如果没有配置死信相关机制,消息就会被直接丢弃 

2.代码实现

在编写代码之前,先看看整个流程图

生产者生产消息发送到普通交换机中,交换机根据routing key将消息转发给相应的普通队列。当普通队列中的消息由于某些原因变成了死信消息,会把死信消息转发到相应的死信交换机中,死信交换机同样会根据routing key转发给相应的死信队列,然后可以安排专门的消费者去消费死信队列中的死信消息

下面分3种情况来讲,就是按照上述3种变成死信消息的情况

2.1消费超时

生产者

public class DeadProducer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("exchange-normal", "direct");
        //模拟消息超时,超过10秒钟,消息进入死信
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("exchange-normal", "zhangsan", properties, ("message" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者(处理正常消息)

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadConsumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        Channel channel = connection.createChannel();
        //声明死信和普通交换机 类型都为direct
        channel.exchangeDeclare("exchange-normal", "direct");
        channel.exchangeDeclare("exchange-dead", "direct");
        //声明普通队列
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", "exchange-dead");
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //生成死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //绑定普通队列和交换机
        channel.queueBind(normalQueue, "exchange-normal", "zhangsan");
        //消费消息
        channel.basicConsume(normalQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息是:" + new String(body));
            }
        });
    }
}

消费者(处理死信消息)

public class DeadConsumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("exchange-dead", "direct");
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //消费消息
        channel.basicConsume(deadQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息是:" + new String(body));
            }
        });
    }
}

2.2达到队列最大长度

生产者

public class DeadProducer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("exchange-normal", "direct");
        //模拟消息超时,超过10秒钟,消息进入死信
        //AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("exchange-normal", "zhangsan", null, ("message" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者(消费正常消息)

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadConsumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        Channel channel = connection.createChannel();
        //声明死信和普通交换机 类型都为direct
        channel.exchangeDeclare("exchange-normal", "direct");
        channel.exchangeDeclare("exchange-dead", "direct");
        //声明普通队列
        Map<String, Object> params = new HashMap<>();
        //设置正常队列长度限制 key是固定值
        params.put("x-max-length", 6);
        params.put("x-dead-letter-exchange", "exchange-dead");
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //生成死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //绑定普通队列和交换机
        channel.queueBind(normalQueue, "exchange-normal", "zhangsan");
        //消费消息
        channel.basicConsume(normalQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息是:" + new String(body));
            }
        });
    }
}

消费死信消息的就不重复写了,跟之前的一样。

2.3消息消费被拒绝

生产者和消费死信消息的消费者都是一样的,只需要改一下消费正常消息的消费者的代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadConsumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        final Channel channel = connection.createChannel();
        //声明死信和普通交换机 类型都为direct
        channel.exchangeDeclare("exchange-normal", "direct");
        channel.exchangeDeclare("exchange-dead", "direct");
        //声明普通队列
        Map<String, Object> params = new HashMap<>();
        //设置正常队列长度限制 key是固定值
        //params.put("x-max-length", 6);
        params.put("x-dead-letter-exchange", "exchange-dead");
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //生成死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //绑定普通队列和交换机
        channel.queueBind(normalQueue, "exchange-normal", "zhangsan");
        //消费消息
        channel.basicConsume(normalQueue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                if (message.equals("message5")) {
                    System.out.println("Consumer01接收到消息" + message + "并拒绝签收该消息");
                    //拒绝消费该消息
                    channel.basicReject(envelope.getDeliveryTag(), false);
                } else {
                    System.out.println("消费的消息是:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }
}

用equals进行匹配,如果是指定的消息,就拒绝消费。


网站公告

今日签到

点亮在社区的每一天
去签到