RocketMQ深入理解原理,工作流程细节

发布于:2025-06-11 ⋅ 阅读:(28) ⋅ 点赞:(0)

1、消息生产

Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程:

  • Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求
  • NameServer返回该Topic的路由表Broker列表

producer发送消息前要知道往哪个topic中投递,进而向nameserver要这个topic的路由信息表

一个消息Topic路由信息的示例:

{
  "queueDatas": [
    {
      "brokerName": "broker-a",
      "readQueueNums": 4,
      "writeQueueNums": 4,
      "perm": 6,
      "topicSynFlag": 0
    },
    {
      "brokerName": "broker-b",
      "readQueueNums": 4,
      "writeQueueNums": 4,
      "perm": 6,
      "topicSynFlag": 0
    }
  ],
  
  "brokerDatas": [
    {
      "brokerName": "broker-a",
      "brokerAddrs": {
        "0": "192.168.1.1:10911",  // master
        "1": "192.168.1.2:10911"   // slave
      }
    },
    {
      "brokerName": "broker-b",
      "brokerAddrs": {
        "0": "192.168.1.3:10911",  // master
        "1": "192.168.1.4:10911"   // slave
      }
    }
  ]
}

2、Queue选择算法

假设现在是单broker,选择完这个broker后,要考虑怎么选择队列了

对于无序消息,其Queue选择算法,也称为消息投递算法,常见的有两种:

轮询 以及 最小投递延迟算法

3、消息存储(非常重要)

每个 Broker 对应一个 store 文件夹,如果是主备(master-slave),分别对应一个store文件夹,也就说有2个store文件夹!!!

store文件夹目录:
在这里插入图片描述

3.1 commitlog

commitlog 是 RocketMQ 消息的底层 消息总线(消息池),所有 Topic 所有消息都会写入这个目录。
在这里插入图片描述
在这里插入图片描述
mappedFile文件内容由一个个的消息单元构成。每个消息单元中包含消息总长度MsgLen、消息的物理位置physicalOffset(准确来说是CommitlogOffset)、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消息在Queue中存储的偏移量QueueOffset等近20余项消息相关属性。

3.2 consumequeue — 消息消费队列索引区

在这里插入图片描述
主题topic——队列ID——文件
在这里插入图片描述

文件每条记录存放:该消息偏移量+消息大小+HashCode ,每条记录20字节
在这里插入图片描述
生产流程:
在这里插入图片描述
消费流程:

下面把 RocketMQ 消费者拉取消息 的全过程用“翻书找页码”的比喻重新讲一遍,避开生硬的公式,让流程更直观。

1、 先了解两种“页码”

名称 像什么 用途
消费 offset 读书时放的书签 记住“我已经读到第几条消息”
消息 offset 下一页的页码 书签 + 1,表示“我要读的下一条”

2 、 消费者出门前:算好“我要看的那页”

  1. 拿书签:Consumer 本地记录的 消费 offset,假设是 99(说明前 99 条已经读过)。
  2. 下一页:要看的 消息 offset = 99 + 1 = 100

就像翻书时,书签停在第 99 页,上一次读到这里,这次要从第 100 页开始读。


3、 发请求:告诉 Broker “我要第 100 页”

请求里带三样信息:

  • 具体是哪本书 → Topic + QueueId
  • 我要的页码 → 消息 offset = 100
  • 只想要带某个标签(Tag)的段落 → Tag 过滤条件

4、 Broker 接单:先去“目录”找页码

目录 = consumequeue,每条目录卡片固定 20 字节:

[ 8B commitlogOffset ][ 4B 消息长度 ][ 8B tagHash ]
  • 想找第 100 条目录卡片
  • 每条 20 字节,所以起始位置 = (100 – 1) × 20 = 1980 字节
    (减 1 是因为从 0 开始计数)

Broker 从 1980 字节处开始往后翻目录卡片,直到找到 Tag 匹配的那一张。


5 、 拿到“正文的绝对页码”

在那张目录卡片里,前 8 个字节就是 commitlogOffset,好比“正文书”中的物理页码。例如 commitlogOffset = 4 104 576


6 、 去正文(commitlog)把那页读出来

Broker 按这个物理偏移量^1 直接 seek() 到 commitlog 文件的 4 104 576 字节处,顺序读出完整消息,把它打包返回给 Consumer。


7 、 Consumer 收到消息 → 书签前移

消息处理成功后,Consumer 把 消费 offset 更新为 100,书签向前挪一格,为下一次拉取做准备。


网站公告

今日签到

点亮在社区的每一天
去签到