1、消息生产
Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程:
- Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求
- NameServer返回该Topic的路由表及Broker列表
producer发送消息前要知道往哪个topic中投递,进而向nameserver要这个topic的路由信息表
一个消息Topic路由信息的示例:
{
"queueDatas": [
{
"brokerName": "broker-a",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSynFlag": 0
},
{
"brokerName": "broker-b",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSynFlag": 0
}
],
"brokerDatas": [
{
"brokerName": "broker-a",
"brokerAddrs": {
"0": "192.168.1.1:10911", // master
"1": "192.168.1.2:10911" // slave
}
},
{
"brokerName": "broker-b",
"brokerAddrs": {
"0": "192.168.1.3:10911", // master
"1": "192.168.1.4:10911" // slave
}
}
]
}
2、Queue选择算法
假设现在是单broker,选择完这个broker后,要考虑怎么选择队列了
对于无序消息,其Queue选择算法,也称为消息投递算法,常见的有两种:
轮询 以及 最小投递延迟算法
3、消息存储(非常重要)
每个 Broker 对应一个 store 文件夹,如果是主备(master-slave),分别对应一个store文件夹,也就说有2个store文件夹!!!
store文件夹目录:
3.1 commitlog
commitlog 是 RocketMQ 消息的底层 消息总线(消息池),所有 Topic 所有消息都会写入这个目录。
mappedFile文件内容由一个个的消息单元构成。每个消息单元中包含消息总长度MsgLen、消息的物理位置physicalOffset(准确来说是CommitlogOffset)、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消息在Queue中存储的偏移量QueueOffset等近20余项消息相关属性。
3.2 consumequeue — 消息消费队列索引区
主题topic——队列ID——文件
文件每条记录存放:该消息偏移量+消息大小+HashCode ,每条记录20字节
生产流程:
消费流程:
下面把 RocketMQ 消费者拉取消息 的全过程用“翻书找页码”的比喻重新讲一遍,避开生硬的公式,让流程更直观。
1、 先了解两种“页码”
名称 | 像什么 | 用途 |
---|---|---|
消费 offset | 读书时放的书签 | 记住“我已经读到第几条消息” |
消息 offset | 下一页的页码 | 书签 + 1,表示“我要读的下一条” |
2 、 消费者出门前:算好“我要看的那页”
- 拿书签:Consumer 本地记录的 消费 offset,假设是 99(说明前 99 条已经读过)。
- 下一页:要看的 消息 offset = 99 + 1 = 100。
就像翻书时,书签停在第 99 页,上一次读到这里,这次要从第 100 页开始读。
3、 发请求:告诉 Broker “我要第 100 页”
请求里带三样信息:
- 具体是哪本书 → Topic + QueueId
- 我要的页码 → 消息 offset = 100
- 只想要带某个标签(Tag)的段落 → Tag 过滤条件
4、 Broker 接单:先去“目录”找页码
目录 = consumequeue,每条目录卡片固定 20 字节:
[ 8B commitlogOffset ][ 4B 消息长度 ][ 8B tagHash ]
- 想找第 100 条目录卡片
- 每条 20 字节,所以起始位置 = (100 – 1) × 20 = 1980 字节
(减 1 是因为从 0 开始计数)
Broker 从 1980 字节处开始往后翻目录卡片,直到找到 Tag 匹配的那一张。
5 、 拿到“正文的绝对页码”
在那张目录卡片里,前 8 个字节就是 commitlogOffset,好比“正文书”中的物理页码。例如 commitlogOffset = 4 104 576
。
6 、 去正文(commitlog)把那页读出来
Broker 按这个物理偏移量^1 直接 seek()
到 commitlog 文件的 4 104 576 字节处,顺序读出完整消息,把它打包返回给 Consumer。
7 、 Consumer 收到消息 → 书签前移
消息处理成功后,Consumer 把 消费 offset 更新为 100,书签向前挪一格,为下一次拉取做准备。