RabbitMQ复习笔记

发布于:2025-05-01 ⋅ 阅读:(45) ⋅ 点赞:(0)

文章目录

MQ 概述

同步调用

目前我们采用的是基于OpenFeign的同步调用,也就是说业务执行流程是这样的:

  • 支付服务需要先调用用户服务完成余额扣减
  • 然后支付服务自己要更新支付流水单的状态
  • 然后支付服务调用交易服务,更新业务订单状态为已支付

三个步骤依次执行。

这其中就存在3个问题

在这里插入图片描述

拓展性差的问题

我们目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。

在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。假如后期产品经理提出这样新的需求,你怎么办?是不是要在上述业务中再加入通知用户的业务?

某些电商项目中,还会有积分或金币的概念。假如产品经理提出需求,用户支付成功后,给用户以积分奖励或者返还金币,你怎么办?是不是要在上述业务中再加入积分业务、返还金币业务?

最终你的支付业务会越来越臃肿:

也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

在这里插入图片描述

性能下降的问题

由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了。

在这里插入图片描述

级联失败问题

由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

这其实就是同步调用的级联失败问题。

但是大家思考一下,我们假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。毕竟收到手里的钱没道理再退回去吧。因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。

综上,同步调用的方式存在下列问题:

  • 拓展性差
  • 性能下降
  • 级联失败

而要解决这些问题,我们就必须用异步调用的方式来代替同步调用。

异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

在这里插入图片描述

举例

除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。

假如产品经理提出了新的需求,比如要在支付成功后更新用户积分。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可:

不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。

另外,不管是交易服务、通知服务,还是积分服务,他们的业务与支付关联度低。现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。

综上,异步调用的优势包括:

  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,避免级联失败

当然,异步通信也并非完美无缺,它存在下列缺点:

  • 完全依赖于Broker的可靠性、安全性和性能
  • 架构复杂,后期维护和调试麻烦

在这里插入图片描述

技术选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

目比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好,

在这里插入图片描述

RabbitMQ

RabbitMQ 安装

  • Docker 环境 run 起来
    • 记得防火墙开放 156725672 两个端口
docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network heima\
 -d \
 rabbitmq:3.8-management
  • 然后访问 http://192.168.88.130:15672 即可访问

在这里插入图片描述

概念图示

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理

在这里插入图片描述

RabbitMQ 收发消息

交换机

  • 打开 Exchanges 选项卡,可以看到已经存在很多交换机

在这里插入图片描述

  • 我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的 publish message 发送一条消息
    • 这里是由控制台模拟了生产者发送的消息。由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力,所以必须和队列绑定

在这里插入图片描述

在这里插入图片描述

队列

  • 打开 Queues 选项卡,新建一个队列

在这里插入图片描述

  • 命名为hello.queue1

在这里插入图片描述

  • 再以相同的方式,创建一个队列,命名为 hello.queue2,最终队列列表如下

此时,我们再次向amq.fanout交换机发送一条消息。会发现消息依然没有到达队列,因为发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。

在这里插入图片描述

绑定关系

  • 点击 Exchanges 选项卡,点击 amq.fanout 交换机,进入交换机详情页,然后点击Bindings 菜单,在表单中填写要绑定的队列名称

在这里插入图片描述

  • 相同的方式,将 hello.queue2 也绑定到改交换机,最终,绑定结果如下

在这里插入图片描述

模拟发送消息

  • 再次回到 exchange 页面,找到刚刚绑定的 amq.fanout,点击进入详情页,再次发送一条消息

在这里插入图片描述

  • 回到 Queues 页面,可以发现 hello.queue 中已经有一条消息了

在这里插入图片描述

  • 点击队列名称,进入详情页,查看队列详情,这次我们点击 get message

在这里插入图片描述

  • 可以看到消息到达队列了

这个时候如果有消费者监听了MQhello.queue1hello.queue2队列,自然就能接收到消息了。

在这里插入图片描述

RabbitMQ 数据隔离

用户管理

  • 点击 Admin 选项卡,首先会看到 RabbitMQ 控制台的用户管理界面

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名
  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限
  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

在这里插入图片描述

  • 比如,我们给黑马商城创建一个新的用户,命名为 hmall

在这里插入图片描述

  • 你会发现此时 hmall 用户没有任何 virtual host 的访问权限

在这里插入图片描述

virtual host 授权

  • 先退出登录

在这里插入图片描述

  • 切换到刚刚创建的 hmall 用户登录,然后点击 Virtual Hosts 菜单,进入virtual host 管理页

在这里插入图片描述

  • 可以看到目前只有一个默认的 virtual host,名字为 /。我们可以给黑马商城项目创建一个单独的 virtual host,而不是使用默认的 /

在这里插入图片描述

  • 创建完成后如图

在这里插入图片描述

  • 由于我们是登录 hmall 账户后创建的 virtual host,因此回到 users 菜单,你会发现当前用户已经具备了对 /hmall这个 virtual host 的访问权限了

在这里插入图片描述

  • 此时,点击页面右上角的virtual host下拉菜单,切换 virtual host/hmall

在这里插入图片描述

  • 然后再次查看 queues 选项卡,会发现之前的队列已经看不到了
    • 这就是基于 virtual host 的隔离效果

在这里插入图片描述

SpringAMOP

SpringAMOP 快速入门

消息发送

  • 导入依赖
 <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 首先配置 MQ 地址,在 publisher 服务的 application.yml 中添加配置
spring:
  rabbitmq:
    host: 192.168.88.130 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
  • 然后然后 在publisher 服务中编写测试类 SpringAmqpTest,并利用 RabbitTemplate实现消息发送
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
  • 打开控制台,可以看到消息已经发送到队列中

在这里插入图片描述

消息接收

  • 首先配置 MQ 地址,在consumer服务的application.yml中添加配置
spring:
  rabbitmq:
    host: 192.168.88.131 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
  • 然后在 consumer 服务的 com.itheima.consumer.listener 包中新建一个类 SpringRabbitListener
@Component
public class SpringRabbitListener {
        // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}
  • 测试:启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息。最终consumer收到消息

在这里插入图片描述

Work Queues 模型

WorkQueues 模型概述

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。**

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

在这里插入图片描述

消息发送

在这里插入图片描述

  • publisher 发送者模拟大量消息堆积
/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接收

  • consumer 消费者中添加两个队列
    • 注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
      • 消费者1 sleep20毫秒,相当于每秒钟处理50个消息
      • 消费者2 sleep200毫秒,相当于每秒处理5个消息
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}
  • 启动测试程序

会发现虽然设置了 sleep 但是并没有实现能者多劳

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

实现能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加 perfetch 配置

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

总结

Work Queues 模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

Exchange 交换机概述

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

在这里插入图片描述

Fanout 交换机

Fanout 概述

Fanout,英文翻译叫广播,会将接收到的消息传给所有绑定的 Queue

在广播模式下

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

在这里插入图片描述

声明队列和交换机

  • 在控制台创建队列 fanout.queue1fanout.queue2

在这里插入图片描述

在这里插入图片描述

  • 然后创建交换机

在这里插入图片描述

  • 绑定两个队列到交换机

在这里插入图片描述

在这里插入图片描述

消息发送

  • 通过 publish 生产者把消息发给交换机
@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "hmall.fanout";
    // 消息
    String message = "hello, everyone!";
    
    //发送消息 routing key 为 null ,因为用不到
    rabbitTemplate.convertAndSend(exchangeName, "null", message);
}

消息接收

  • 通过消费者消费从交换机广播过来的消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

总结

交换机的作用

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange 的会将消息路由到每个绑定的队列

Direct 交换机

Direct 概述

Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange

Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key
  • 消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

声明队列和交换机

  • 声明两个队列direct.queue1direct.queue2

在这里插入图片描述

  • 声明一个direct类型的交换机,命名为hmall.direct

在这里插入图片描述

  • **然后 redblue 作为 key 绑定 direct.queue1hmall.direct **

在这里插入图片描述

在这里插入图片描述

  • 同理,使用 redyellow 作为key,绑定 direct.queue2hmall.direct

在这里插入图片描述

消息接收

  • consumer 消费者中添加监听代码
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息发送

  • publish 发送者中添加发送代码
@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "hmall.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
  • 由于使用的red这个key,所以两个消费者都收到了消息

在这里插入图片描述

  • 切换 blue 这个 key
@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "hmall.direct";
    // 消息
    String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
  • 会发现,只有消费者1收到了消息,因为只有 1 绑定了 blue

在这里插入图片描述

总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout 交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic 交互机

Topic 概述

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

在这里插入图片描述

声明队列和交换机

  • 交换机

在这里插入图片描述

在这里插入图片描述

  • 队列

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

消息发送

  • publisher 发送者中添加发送代码
/**
 * topicExchange
 */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "hmall.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

消息接收

  • consumer 消费中添加监听代码
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

总结

描述下 Direct 交换机与 Topic 交换机的差异?

  • Topic 交换机接收的消息 RoutingKey 必须是多个单词,以 . 分割
  • Topic 交换机与队列绑定时的 bindingKey 可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

代码声明队列和交换机

基本 API (基于 Bean)

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

  • 创建队列

SpringAMQP提供了一个Queue类,用来创建队列

在这里插入图片描述

  • 创建交换机

SpringAMQP 还提供了一个 Exchange 接口,来表示所有不同类型的交换机

在这里插入图片描述

  • 创建绑定对象

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象

在这里插入图片描述

  • 队列和交换机第二种创建方式

在这里插入图片描述

fanout 示例(基于 Bean)

  • 一般在消费者端构建

注意:是在配置类中

并且绑定关系是通过注入的方式绑定

@Configuration
public class FanoutConfiguration {
    @Bean
    public FanoutExchange fanoutExchange(){
        //第一种方式
        //return new FanoutExchange("hmall.fanout");

        //第二种方式
        return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
    }

    @Bean
    public Queue fanoutQueue1(){

        //第一种创建队列的方式
        //QueueBuilder.durable("fanout.queue1").build();

        //第二种创建队列的方式
        return new Queue("fanout.queue1");
    }

    @Bean
    public Binding fanoutQueue1Binding(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Queue fanoutQueue2(){

        //第一种创建队列的方式
        //QueueBuilder.durable("fanout.queue1").build();

        //第二种创建队列的方式
        return new Queue("fanout.queue2");
    }

    @Bean
    public Binding fanoutQueue2Binding(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

direct 示例(基于 Bean)

direct 模式由于要绑定多个 KEY,会非常麻烦,每一个 Key 都要编写一个binding

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

基于注解声明自动创建

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

  • Direct 方式1

会自动创建队列和交换机还有绑定关系

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息转换器

消息转换器概述

Spring的消息发送代码接收的消息体是一个Object

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

在这里插入图片描述

测试默认转换器

  • consumer 服务中声明新配置类,利用 @Bean 方式创建队列

在这里插入图片描述

@Configuration
public class MessageConfig {

    @Bean
    public Queue objectQueue() {
        return new Queue("object.queue");
    }
}
  • publisher 模块的 SpringAmqpTest 中新增一个消息发送的代码,发送一个 Map 对象
@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "柳岩");
    msg.put("age", 21);
    // 发送消息
    rabbitTemplate.convertAndSend("object.queue", msg);
}
  • 检查结果,看到格式很差

在这里插入图片描述

配置 JSON 转换器

JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

  • publisherconsumer两个服务中都引入依赖
    • 注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>
  • publisherconsumer两个服务的配置类中添加消息转换器 Bean
@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}
  • 检查结果

在这里插入图片描述

发送者的可靠性

消息丢失的可能性

消息从生产者到消费者的每一步都可能导致消息丢失:

  • 发送消息时丢失:

    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQExchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:

    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:

  • 消息接收后尚未处理突然宕机

  • 消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

在这里插入图片描述

生产者重连机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplateMQ连接超时后,多次重试。

修改publisher模块的application.yaml文件,添加下面的内容

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

生产者确认机制

般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQExchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

在这里插入图片描述

  • 第一步开启生产者确认

publisher模块的application.yaml中添加配置

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制
  • 定义 ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类

在这里插入图片描述

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
  • 定义 ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

在这里插入图片描述

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

在这里插入图片描述

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

注意事项

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ 的可靠性

MQ 可靠性概述

保证可靠性必须从数据存储的方式上入手

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

在这里插入图片描述

数据持久化

默认情况下 SpringAMOP 交换机,队列,消息默认持久化

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

注意:持久化默认是在内存里面再根据消息要不要持久化放进硬盘

交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数

设置为Durable就是持久化模式,Transient就是临时模式。

在这里插入图片描述

队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数

在这里插入图片描述

消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

在这里插入图片描述

 @Test
    void testSendMessage() {
        // 1.自定义构建消息
        Message message = MessageBuilder
                .withBody("Hello, Spring AMQP!".getBytes(StandardCharsets.UTF_8))
                // 设置消息属性为非持久化
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();

        for (int i = 0; i < 1000000; i++) {
            rabbitTemplate.convertAndSend("simple.queues", message);
        }
    }

注意

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

LazyQueue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ3.12版本或者所有队列都设置为LazyQueue模式。

控制台配置 Lazy 模式

在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式

在这里插入图片描述

代码配置 Lazy 模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}

基于注解声明队列并设置为 Lazy 模式

@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有队列为 Lazy 模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。

可以基于命令行设置policy

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

消费者的可靠性

消费者的可靠性概述

RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

消费者确认机制

  • 概念

在这里插入图片描述

  • 配置消费者确认机制

在这里插入图片描述

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeuemq队列。

  • 修改 Consumer 服务的 application.yaml

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

RepublishMessageRecoverer

  • consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
  • 定义 RepublishMessageRecoverer,关联交换机和队列
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
  • 完整代码
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    //这里形参会自动注入。就不用手动注入 rabbitTemplate了
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

业务幂等性

幂等性概述

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。

然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。

举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息 ID 解决幂等性

这个思路非常简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
  • 给消息开启唯一 ID

SpringAMQPMessageConverter自带了MessageID的功能,我们只要开启这个功能即可。

//转换器原理就是转成 Message 类型对象
@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}
 	//用 Message 可以收到 id
	@RabbitListener(queues = "simple.queues")
    public void listensSimpleQueue(Message message) {
        log.info("消息者 ID {}", message.getMessageProperties().getMessageId());
        log.info("消费者接收到消息:{}", new String(message.getBody()));
    }

兜底:延迟消息

延迟消息概述

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

死信交换机实现延迟消息

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

在这里插入图片描述

  • 创建死信交换机
    • 普通的死信的的 routingkey 要保持一致
 // exchange 后面 delayed = true 就是声明为死信交换机
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dlx.queue", durable = "true"),
            exchange = @Exchange(name = "dlx.direct", delayed = "true"),
            key = {"hi"}
    ))
    public void listensDlxQueue(String message) {
        log.info("监听到 dlx.queue 死信交换机的的消息: 【{}】", message);
    }
  • 创建普通队列和交换机
@Configuration
public class NormalConfiguration {

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal.direct");
    }

    @Bean
    public Queue normalQueue() {
        //.deadLetterExchange 指定死信交换机
        return QueueBuilder.durable("normal.queue")
                .deadLetterExchange("dlx.direct")
                .build();
    }

    @Bean
    public Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");
    }
}
  • 实现消息延时发送
    • 因为没有消费者所以会到死信交换机
    • 注意这里是 setExpiration
 @Test
    void testSendDelayMessage() {
		
        //MessagePostProcessor 消息转换器设置过期时间
        rabbitTemplate.convertAndSend("normal.direct", "hi", "hello,", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        });
    }

  • 监听死信交换机的消息
@Component
class DeadLetterQueueConsumer {

    @RabbitListener(queues = "dlx.queue")
    public void handleDeadLetterMessage(String message) {
        System.out.println("Received dead letter message: " + message);
        // 这里可以添加具体的业务逻辑来处理死信消息
    }
}

DelayExchange 插件

直接把普通交换机声明为死信交换机

在这里插入图片描述

安装插件

插件下载地址

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

注意和 MQ 版本要对上

  • 基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷
docker volume inspect mq-plugins
//插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]
  • 切换到该挂载目录,把插件拖入文件夹

在这里插入图片描述

  • 执行命令,安装插件
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

声明死信交换机

  • 基于注解方式
//delayed = true 声明私信交换机
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}
  • 基于 Bean 方式
@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送延迟消息

  • 发送消息时,必须通过 x-delay 属性设定延迟时间
    • 然后我们正常监听死信交换机接收消息就行
    • 这里和上面不一样这里是 setDelay 上面是 setExpiration
@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return mes

网站公告

今日签到

点亮在社区的每一天
去签到