2、RocketMQ 核心概念拆解

发布于:2025-05-10 ⋅ 阅读:(34) ⋅ 点赞:(0)

1. 如何实现消息过滤?

一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。

另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。
一般采用 Cosumer 端过滤,如果希望提高吞吐量,可以采用 Broker 过滤。

在这里插入图片描述

  • Tag 过滤:最常用,效率高
  • SQL 表达式过滤:更灵活,支持复杂条件
  • Filter Server:自定义过滤逻辑,适合特殊场景

2. 延时消息原理

  • 生产者设置延时级别,消息先存储在延时队列,到期后投递到目标队列
  • 支持 18 个固定延时级别(如 1s、5s、10s、30s、1m、2m…2h)
  • 典型应用:订单超时取消、定时任务等

临时存储+定时任务。

Broker 收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。
在这里插入图片描述

3. 分布式消息事务实现(半消息)

1、Producer 向 broker 发送半消息
2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 "不可投递" 状态,Consumer 消费不了。
3、Producer 端执行本地事务。
4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半	消息的执行情况。
6、Producer 端查询本地事务的状态
7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
8、消费者段消费到消息之后,执行本地事务

在这里插入图片描述

4. 死信队列(DLQ)

  • 消息多次消费失败后进入死信队列,便于后续排查和补偿
  • 可通过管理工具导出、重发死信消息
    产生死信的原因是,消费者在处理消息时发生异常,且达到了最大重试次数。当消费失败的原因排查并解决后,可以重发这些死信消息,让消费者重新消费;如果暂时无法处理,为避免到期后死信消息被删除,可以先将死信消息导出并进行保存。

5. 如何保证 RocketMQ 的高可用?

  • NameServer 集群部署,无状态,任意节点挂掉不影响服务
    在这里插入图片描述

  • Broker 主从架构,支持同步/异步复制,提升读写高可用

  • 消费端自动切换到可用 Broker,保证消费不中断

6. RocketMQ 整体工作流程

简单来说,RocketMQ 是一个分布式消息队列,也就是消息队列+分布式系统。

作为消息队列,它是发-存-收的一个模型,对应的就是 Producer、Broker、Cosumer

作为分布式系统,它要有服务端、客户端、注册中心,对应的就是 Broker、Producer/Consumer、NameServer

  1. Broker 启动后向所有 NameServer 注册并保持心跳
  2. Producer 发送消息前从 NameServer 获取 Broker 路由信息
  3. Producer 选择 Broker 发送消息
  4. Consumer 从 NameServer 获取 Broker 路由,主动拉取消息

7. 为什么 RocketMQ 不用 Zookeeper 做注册中心?

  • NameServer 轻量级、无状态,启动快,易扩展
  • Zookeeper 选举期间不可用,RocketMQ 追求高可用和极致性能
  • NameServer 支持弱依赖,短时间不可用不影响消息收发

1、基于可用性的考虑,根据 CAP 理论,同时最多只能满足两个点,而 Zookeeper 满足的是 CP,也就是说 Zookeeper 并不能保证服务的可用性,Zookeeper 在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。

2、基于性能的考虑,NameServer 本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而 Zookeeper 的写是不可扩展的,Zookeeper 要解决这个问题只能通过划分领域。

3、持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。

4、消息发送应该弱依赖注册中心,而 RocketMQ 的设计理念也正是基于此,生产者在第一次发送消息的时候从 NameServer 获取到 Broker 地址后缓存到本地,如果 NameServer 整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响

8. Broker 如何保存数据?

在这里插入图片描述

  • CommitLog:消息主体及元数据顺序写入,提升写入性能

  • CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
    在这里插入图片描述

  • ConsumeQueue:逻辑队列,保存消息在 CommitLog 的偏移量、大小、Tag hash,提升消费效率

  • IndexFile:支持按 key 或时间区间查询消息
    在这里插入图片描述

在这里插入图片描述

9. RocketMQ 文件读写原理

  • 利用操作系统 PageCache、顺序写、零拷贝(mmap、FileChannel)等技术,极大提升读写性能
  • 读写过程减少用户态与内核态切换和内存拷贝次数

10. 消息刷盘机制

  • 同步刷盘:消息写入磁盘后才返回,可靠性高,适合金融等场景
  • 异步刷盘:消息写入内存即返回,后台线程异步刷盘,性能高,适合对可靠性要求不高的场景
  • Broker 支持主从同步/异步复制,提升高可用

11. RocketMQ 负载均衡实现

  • Producer 端发送消息时轮询选择队列,支持延迟容错机制
    Producer 端在发送消息的时候,会先根据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认方式下 selectOneMessageQueue()方法会从 TopicPublishInfo 中的 messageQueueList 中选择一个队列(MessageQueue)进行发送消息。具这里有一个 sendLatencyFaultEnable 开关变量,如果开启,在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。

所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的 latency 超过 550Lms,就退避 3000Lms;超过 1000L,就退避 60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance 机制是实现消息发送高可用的核心关键所在。

  • Consumer 端通过心跳和分配算法(如平均分配)实现队列与消费者的动态分配,支持集群和广播模式
    Consumer 端的心跳包发送
    在 Consumer 启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端 id 的值等信息)。Broker 端在收到 Consumer 的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable 中,为之后做 Consumer 端的负载均衡提供可以依据的元数据信息。

1、Consumer 端实现负载均衡的核心类—RebalanceImpl在 Consumer 实例的启动流程中的启动 MQClientInstance 实例部分,会完成负载均衡服务线程—RebalanceService 的启动(每隔 20s 执行一次)。

2、通过查看源码可以发现,RebalanceService 线程的 run()方法最终调用的是 RebalanceImpl 类的 rebalanceByTopic()方法,这个方法是实现 Consumer 端负载均衡的核心。

12. RocketMQ 消息长轮询原理

  • Consumer 拉取不到消息时,Broker 会挂起请求,等待新消息到达或超时后再返回
  • 提升消息实时性,减少无效拉取,降低系统压力

网站公告

今日签到

点亮在社区的每一天
去签到