阿里双十一交易核心链路产品--RocketMQ 底层原理及性能调优实战

发布于:2022-12-19 ⋅ 阅读:(380) ⋅ 点赞:(0)

目录

基础入门

消息中间件(MQ)的定义

其实并没有标准定义。一般认为,消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布 式系统中的其余各个子系统进行集成。
高效: 对于消息的处理处理速度快。
可靠: 一般消息中间件都会有消息持久化机制和其他的机制确保消息不丢失。
异步: 指发送完一个请求,不需要等待返回,随时可以再发送下一个请求,既不需要等待。
一句话总结,我们消息中间件不生产消息,只是消息的搬运工。

在这里插入图片描述

为什么要用消息中间件?

应用解耦

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验

使用消息中间件,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,继续处理存放在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

在这里插入图片描述

流量削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大
提到系统的稳定性和用户体验。

深度揭开阿里(蚂蚁金服)技术面试流程 附前期准备,学习方向

互联网公司的大促场景(双十一、店庆活动、秒杀活动)都会使用到 MQ。
在这里插入图片描述

数据分发

通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
接口调用的弊端,无论是新增系统,还是移除系统,代码改造工作量都很大。
使用 MQ 做数据分发好处,无论是新增系统,还是移除系统,代码改造工作量较小。
所以使用 MQ 做数据的分发,可以提高团队开发的效率。
在这里插入图片描述
在这里插入图片描述

RocketMQ 产品发展

RocketMQ 版本发展

Metaq1.x 是 RocketMQ 前身的第一个版本,本质上把 Kafka 做了一次 java 版本的重写(Kafka 是scala)。

Meta2.x,主要是对存储部分进行了优化,因为 kafka 的数据存储,它的 partition 是一个全量的复制,在阿里、在淘宝的这种海量交易。Kafka这种机制的横向拓展是非常不好的。2012 年阿里同时把 Meta2.0 从阿里内部开源出来,取名 RocketMQ,同时为了命名上的规范(版本上延续),所以这个就是RocketMQ3.0。

现在 RocketMQ 主要维护的是 4.x 的版本,也是大家使用得最多的版本,2017 年从 Apache 顶级项目毕业。

阿里内部项目的使用

那么在阿里公司内部,原则上遵守开源共建原则。RocketMQ 项目只维护核心功能,且去除了所有其他运行时依赖,核心功能最简化。每个 BU( Business Unit 业务单元)的个性化需求都在 RocketMQ 项目之上进行深度定制。RocketMQ 向其他 BU 提供的仅仅是 Jar 包,例如要定制一个 Broker,那么只需要依赖 rocketmq-broker 这 jar 包即可,可通过 API 进行交互, 如果定制 client,则依赖 rocketmq-client 这个 jar 包,对其提供的 api 进行再封装。

在 RocketMQ 项目基础上几个常用的项目如下

  • com.taobao.metaq v3.0 = RocketMQ + 淘宝个性化需求 为淘宝应用提供消息服务
  • com.alipay.zpullmsg v1.0 = RocketMQ + 支付宝个性化需求 为支付宝应用提供消息服务
  • com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B 个性化需求 为 B2B应用提供消息服务

展望未来

从阿里负责 RocketMQ 的架构核心人员的信息来看,阿里内部一直全力拓展 RocketMQ。

2017 年 10 月份,OpenMessaging 项目由阿里巴巴发起,与雅虎、滴滴出行、Streamlio 公司共同参与创立, 项目意在创立厂商无关、平台无关的分布式消息及流处理领域的应用开发标准。同时 OpenMessaging 入驻 Linux 基金会。

OpenMessaging 项目已经开始在 Apache RocketMQ 中率先落地,并推广至整个阿里云平台. 另外 RocketMQ5 的版本也在内部推进,主要的方向是 Cloud Native(云原生)

另外还要讲一下 Apache RocketMQ 的商业版本,Aliware MQ 在微服务、流计算、IoT、异步解耦、数据同步等场景有非常广泛的运用

在这里插入图片描述

RocketMQ 的物理架构

消息队列 RocketMQ 是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双 11 使用的核心产品。

RocketMQ 的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。

核心概念

在这里插入图片描述

NameServer

NameServer 是整个 RocketMQ 的“大脑”,它是 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。

Broker 在启动时向所有 NameServer 注册(主要是服务器地址等),生产者在发送消息之前先从 NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除。这样就可以实现 RocketMQ 的高可用。具体细节后续的课程会进行讲解。

生产者(Producer)

生产者:也称为消息发布者,负责生产并发送消息至 RocketMQ。

消费者(Consumer)

消费者:也称为消息订阅者,负责从 RocketMQ 接收并消费消息。

消息(Message)

消息:生产或消费的数据,对于 RocketMQ 来说,消息就是字节数组。

主机(Broker)

RocketMQ 的核心,用于暂存和传输消息。

物理架构中的整体运转

  1. NameServer 先启动
  2. Broker 启动时向 NameServer 注册
  3. 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker 进行消息发送。
  4. NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到 Broker 宕机(使用心跳机制,如果检测超过120S),则从路由注册表中将其移除。
  5. 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中订阅消息,订阅规则由 Broker 配置决定。

95后三面快手成功上岸经验,其实拿到这份java面试宝典你上你也行!

RocketMQ 的概念模型

核心概念

分组(Group)

生产者: 标识发送同一类消息的 Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息: (事务消息中如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其它producer,确认这条消息应该 commit
消费者: 标识一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。同一个 Consumer Group 下的各个实例将共同消费 topic的消息,起到负载均衡的作用。
消费进度以 Consumer Group 为粒度管理,不同 Consumer Group 之间消费进度彼此不受影响,即消息 A 被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。

主题(Topic)

标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定 Topic。
区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息

标签(Tag)

RocketMQ 支持给在发送的时候给 topic 打 tag,同一个 topic 的消息虽然逻辑管理是一样的。但是消费 topic1 的时候,如果你消费订阅的时候指定的是 tagA,那么 tagB 的消息将不会投递。

消息队列(Message Queue)

简称 Queue 或 Q。消息物理管理单位。一个 Topic 将有若干个 Q。若一个 Topic 创建在不同的 Broker,则不同的 broker 上都有若干 Q,消息将物理地存储落在不同 Broker 结点上,具有水平扩展的能力。

无论生产者还是消费者,实际的生产和消费都是针对 Q 级别。例如 Producer 发送消息的时候,会预先选择(默认轮询)好该 Topic 下面的某一条 Q发送;Consumer 消费的时候也会负载均衡地分配若干个 Q,只拉取对应 Q 的消息。

每一条 message queue 均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。

偏移量(Offset)

RocketMQ 中,有很多 offset 的概念。一般我们只关心暴露到客户端的 offset。不指定的话,就是指 Message Queue 下面的 offset。

Message queue 是无限长的数组。一条消息进来下标就会涨 1,而这个数组的下标就是 offset,Message queue 中的 max offset 表示消息的最大 offset

Consumer offset 可以理解为标记 Consumer Group 在一条逻辑 Message Queue 上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息 offset+1,即实际上表示的是下次拉取的 offset 位置。

玩转各种消息

普通消息

整体流程如下

导入 MQ 客户端依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>

消息发送者步骤
1.创建消息生产者 producer,并指定生产者组名
2.指定 Nameserver 地址
3.启动 producer
4.创建消息对象,指定 Topic、Tag 和消息体
5.发送消息
6.关闭生产者 producer

消息消费者步骤
1.创建消费者 Consumer,指定消费者组名
2.指定 Nameserver 地址
3.订阅主题 Topic 和 Tag
4.设置回调函数,处理消息
5.启动消费者 consumer

消息发送

发送同步消息

在这里插入图片描述
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
代码演示
在这里插入图片描述
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

Message ID
消息的全局唯一标识(内部机制的 ID 生成是使用机器 IP 和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑 Key),由消息队列 MQ系统自动生成,唯一标识某条消息。

SendStatus
发送的标识。成功,失败等

Queue
相当于是 Topic 的分区;用于并行发送和接收消息

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。
代码演示
在这里插入图片描述
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理

在这里插入图片描述

在这里插入图片描述

单向发送

这种方式主要用在不特别关心发送结果的场景,例如日志发送。
代码演示
在这里插入图片描述
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。cn.enjoyedu.normal.OnewayProducer
在这里插入图片描述

消息发送的权衡

在这里插入图片描述

消息消费

集群消费

在这里插入图片描述
消费者的一种消费模式。一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。

实际上,每个 Consumer 是平均分摊 Message Queue 的去做拉取消费。例如某个 Topic 有 3 条 Q,其中一个 Consumer Group 有 3 个实例(可能是 3个进程,或者 3 台机器),那么每个实例只消费其中的 1 条 Q。

而由 Producer 发送消息的时候是轮询所有的 Q,所以消息会平均散落在不同的 Q 上,可以认为 Q 上的消息是平均的。那么实例也就平均地消费消息了。

这种模式下,消费进度(Consumer Offset)的存储会持久化到 Broker。
在这里插入图片描述
在这里插入图片描述

广播消费

在这里插入图片描述
消费者的一种消费模式。消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。即即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了 topic 下面的每个 Message Queue 去拉取消费。所以消息会投递到每个消费者实例。

这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
在这里插入图片描述
在这里插入图片描述

消息消费时的权衡

集群模式:适用场景&注意事项
消费端集群化部署,每条消息只需要被处理一次。
由于消费进度在服务端维护,可靠性更高。
集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

广播模式:适用场景&注意事项
广播消费模式下不支持顺序消息。
广播消费模式下不支持重置消费位点。
每条消息都需要被相同逻辑的多台机器处理。
消费进度在客户端维护,出现重复的概率稍大于集群模式。
广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
目前仅 Java 客户端支持广播模式。
广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ 可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个 queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉
取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。

全局顺序消息
在这里插入图片描述
部分顺序消息
在这里插入图片描述

顺序消息生产

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用顺序消息:首先要保证消息是有序进入 MQ 的,消息放入 MQ 之前,对 id 等关键字进行取模,放入指定 messageQueue,consume 消费消息失败时,不能返回 reconsume——later,这样会导致乱序
应该返回 suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。(具体实现的话,需要后续的源码分析中–集群消费时保证消费的有序性章节)

顺序消息消费

消费时,同一个 OrderId 获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。
在这里插入图片描述
在这里插入图片描述

消息发送时的重要方法/属性

属性

在这里插入图片描述
producerGroup:生产者所属组
defaultTopicQueueNums:默认主题在每一个 Broker 队列数量
sendMsgTimeout:发送消息默认超时时间,默认 3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认 4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为 2,总共执行 3 次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为 2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个 Broker 时,是否不等待存储结果就返回,默认为 false
maxMessageSize:允许发送的最大消息长度,默认为 4M

方法

org.apache.rocketmq.example.details. ProducerDetails 类中
//启动
void start() throws MQClientException;
//关闭
void shutdown();
//查找该主题下所有消息队列
List fetchPublishMessageQueues(final String top ic) throws MQClientException;
在这里插入图片描述
在这里插入图片描述

单向发送

在这里插入图片描述
//发送单向消息
void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;
//选择指定队列单向发送消息
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;

同步发送

在这里插入图片描述
//同步发送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
//同步超时发送消息
SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
//选择指定队列同步发送消息
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

异步发送

在这里插入图片描述
在这里插入图片描述
//异步发送消息
void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
//异步超时发送消息
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
//选择指定队列异步发送消息
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;

消息消费时的重要方法/属性

org.apache.rocketmq.example.details. ComuserDetails 类中

属性

在这里插入图片描述
//消费者组
private String consumerGroup;
//消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;
//指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
ConsumeFromTimestamp 模式下只会在订阅组(消费者群组)第一次启动的时候,过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了
新消息。下次启动消费者时,会继续消费停掉期间新生产的消息。后续行为和 ConsumeFromLastOffset 类似

//消费者最小线程数量
private int consumeThreadMin = 20;
//消费者最大线程数量
private int consumeThreadMax = 20;
//推模式下任务间隔时间
private long pullInterval = 0;
//推模式下任务拉取的条数,默认 32 条
private int pullBatchSize = 32;
//消息重试次数,-1 代表 16 次
private int maxReconsumeTimes = -1;
//消息消费超时时间
private long consumeTimeout = 15;

方法

void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器
void unsubscribe(final String topic):取消消息订阅
Set fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列
void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器

在这里插入图片描述
void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器
在这里插入图片描述

消费确认(ACK)

业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是 1 条)
是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。

返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。

如果业务的回调没有处理好而抛出异常,会认为是消费失败ConsumeConcurrentlyStatus.RECONSUME_LATER 处理。

为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费组的 RETRY topic),在延迟的某个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。

另外如果使用顺序消费的回调 MessageListenerOrderly 时,由于顺序消费是要前者消费成功才能继续消费,所以没有 RECONSUME_LATER 的这个状态,只有 SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费

延时消息

概念介绍

延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

适用场景

消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

使用方式

Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。(阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源)

发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的 level 来的,延迟队列默认是msg.setDelayTimeLevel(3)代表延迟 10 秒
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
源码中:org/apache/rocketmq/store/config/MessageStoreConfig.java

在这里插入图片描述
是这 18 个等级(秒(s)、分(m)、小时(h)),level 为 1,表示延迟 1 秒后消费,level 为 5 表示延迟 1 分钟后消费,level 为 18 表示延迟 2 个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的 level 即可。消费消息跟普通的消费消息一致。

代码演示

org.apache.rocketmq.example. scheduled 包中

生产者

在这里插入图片描述

消费者

在这里插入图片描述
在这里插入图片描述

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK(集群时会细讲),而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。

代码演示

org.apache.rocketmq.example. batch 包中

生产者

在这里插入图片描述

消费者

在这里插入图片描述

批量切分

如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割

代码演示

在这里插入图片描述

我们需要发送 10 万元素的数组,这个量很大,怎么快速发送完。同时每一次批量发送的消息大小不能超过 4M
具体见代码

过滤消息

org.apache.rocketmq.example. filter 包中

Tag 过滤

在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。

Sql 过滤

SQL 基本语法

RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
数值比较:比如:>,>=,<,<=,BETWEEN,=;
字符比较:比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号:AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE

消息生产者(加入消息属性)

发送消息时,你能通过 putUserProperty 来设置消息的属性

在这里插入图片描述

消息消费者(使用 SQL 筛选)

用 MessageSelector.bySql 来使用 sql 筛选消息
在这里插入图片描述
如果这个地方抛出错误:说明 Sql92 功能没有开启

在这里插入图片描述
需要修改 Broker.conf 配置文件。
加入 enablePropertyFilter=true 然后重启 Broker 服务

事务消息

在这里插入图片描述
其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

正常事务流程

(1) 发送消息(half 消息):图中步骤 1。
(2) 服务端响应消息写入结果:图中步骤 2。
(3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。
(4) 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4

事务补偿流程

(1) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。
(2) Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。
(3) 根据本地事务状态,重新 Commit 或者 Rollback::图中步骤 6。
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

事务消息状态

(1) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。
(2) Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。
(3) 根据本地事务状态,重新 Commit 或者 Rollback::图中步骤 6。
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

95后三面快手成功上岸经验,其实拿到这份java面试宝典你上你也行!

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息(完成图中了 1,2,3,4
    步,第 4 步是 Commit)。
  • TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费(完成图中了
    1,2,3,4 步, 第 4 步是 Rollback)。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态(完成图中了 1,2,3 步, 但是没有
    4 或者没有 7,无法 Commit 或 Rollback)。

代码演示

org.apache.rocketmq.example. transaction 包中

创建事务性生产者

使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。

在这里插入图片描述

实现事务的监听接口

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务(步骤 3)。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态(步骤 5),并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

在这里插入图片描述
在这里插入图片描述

使用场景

用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。如何保证数据的完整性?
可以使用 RocketMQ 的分布式事务保证在下单失败后系统数据的完整性

使用限制

  1. 事务消息不支持延时消息和批量消息。
  2. 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。
  3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
    transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认
    情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、
  4. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用
    户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  5. 事务性消息可能不止一次被检查或消费。
  6. 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
  7. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事
    务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  8. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者
    ID 查询到消费者。

分布式事务

在这里插入图片描述
业务场景:用户 A 转账 100 元给用户 B,这个业务比较简单,具体的步骤:
1、用户 A 的账户先扣除 100 元
2、再把用户 B 的账户加 100 元
如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。
但是在微服务架构中,因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。
因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。

在这里插入图片描述

消息中间件的方式,把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到消息中间件;加钱业务订阅“扣款成功消息”,再对用户 B 加钱(系统怎么知道给用户 B 加钱呢?是消息体里面包含了源账户和目标账户 ID,以及钱数)
场景一:先扣款后向 MQ 发消息
先扣款再发送消息,万一发送消息失败了,那用户 B 就没法加钱
场景二:先向 MQ 发像消息,后扣款
扣款成功消息发送成功,但用户 A 扣款失败,可加钱业务订阅到了消息,用户 B 加了钱
问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。
RocketMq 消息中间件把消息分为两个阶段:半事务阶段和确认阶段阶段

半事务阶段:
该阶段主要发一个消息到 rocketmq,但该消息只储存在 commitlog 中,但 consumeQueue 中不可见,也就是消费端(订阅端)无法看到此消息
commit/rollback 阶段(确认阶段):
该阶段主要是把 prepared 消息保存到 consumeQueue 中,即让消费端可以看到此消息,也就是可以消费此消息。如果是 rollback 就不保存。

在这里插入图片描述
整个流程:
1、A 在扣款之前,先发送半事务消息
2、发送预备消息成功后,执行本地扣款事务
3、扣款成功后,再发送确认消息
4、B 消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱

注意:上面的确认消息可以为 commit 消息,可以被订阅者消费;也可以是 Rollback 消息,即执行本地扣款事务失败后,提交 rollback 消息,即删除那个预备消息,订阅者无法消费
异常 1:如果发送半事务消息失败,下面的流程不会走下去;这个是正常的
异常 2:如果发送半事务消息成功,但执行本地事务失败;这个也没有问题,因为此预备消息不会被消费端订阅到,消费端不会执行业务。
异常 3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户 A 扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。

在这里插入图片描述
RocketMq 如何解决上面的问题,核心思路就是【事务回查】,也就是 RocketMq 会定时遍历 commitlog 中的半事务消息。
异常 3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为 RocketMq 会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送 commit 确认消息”;这样加钱业务就可以订阅此消息了。
这个思路其实把异常 2 也解决了,如果本地事务没有执行成功,RocketMQ 回查业务,发现没有执行成功,就会发送 RollBack 确认消息,把消息进行删除。

同时还要注意的点是,RocketMQ 不能保障消息的重复,所以在消费端一定要做幂等性处理。
除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把 A已经处理完的业务进行回退)

在这里插入图片描述

如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。
好的方案是设计一张 Transaction 表,将业务表和 Transaction 绑定在同一个本地事务中,如果扣款本地事务成功时,Transaction 中应当已经记录该TransactionId 的状态为「已完成」。当 RocketMq 事务回查时,只需要检查对应的 TransactionId 的状态是否是「已完成」就好,而不用关心具体的业务数据。

如果是银行业务,对数据要求性极高,一般 A 与 B 需要进行手动对账,手动补偿。

RocketMQ 的存储设计

Domain Model

领域模型(Domain Model)是对领域内的概念类或现实世界中对象的可视化表示。又称概念模型、领域对象模型、分析对象模型。它专注于分析问
题领域本身,发掘重要的业务领域概念,并建立业务领域概念之间的关系。
在这里插入图片描述

Message

Message 是 RocketMQ 消息引擎中的主体。messageId 是全局唯一的。MessageKey 是业务系统(生产者)生成的,所以如果要结合业务,可以使用MessageKey 作为业务系统的唯一索引。

在这里插入图片描述
在这里插入图片描述
另外 Message 中的 equals 方法和 hashCode 主要是为了完成消息只处理一次(Exactly-Once)。
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

Topic

Tags 是在同一 Topic 中对消息进行分类
subTopics==Message Queue,其实在内存逻辑中,subTopics 是对 Topics 的一个拓展,尤其是在 MQTT 这种协议下,在 Topic 底下会有很多 subTopics。

Queue

Queue 是消息物理管理单位,比如在 RocketMQ 的控制台中,就可以看到每一个 queue 中的情况(比如消息的堆积情况、消息的 TPS、QPS)

Offset

对于每一个 Queue 来说都有 Offset,这个是消费位点。

Group

业务场景中,如果有一堆发送者,一堆消费者,所以这里使用 Group 的概念进行管理。

对应关系

Message 与 Topic 是多对一的关系,一个 Topic 可以有多个 Message. Topic 到 Queue 是一对多的关系,这个也是方便横向拓展,也就是消费的时候,这里可以有很多很多的 Queue. 一个 Queue 只有一个消费位点(Offset),所以 Topic 和 Offset 也是一对多的关系
Topic 和 Group 也是多对多的关系。

消费并发度

从上面模型可以看出,要解决消费并发,就是要利用 Queue,一个 Topic 可以分出更多的 queue,每一个 queue 可以存放在不同的硬件上来提高并发。

热点问题(顺序、重复)

前面讲过要确保消息的顺序,生产者、队列、消费者最好都是一对一的关系。但是这样设计,并发度就会成为消息系统的瓶颈(并发度不够)
RocketMQ 不解决这个矛盾的问题。理由如下:
1、 乱序的应用实际大量存在
2、 队列无序并不意味着消息无序
另外还有消息重复,造成消息重复的根本原因是:网络不可达(网络波动)。所以如果消费者收到两条一样的消息,应该是怎么处理?

RocketMQ 不保证消息不重复,如果你的业务要严格确保消息不重复,需要在自己的业务端进行去重。
1、 消费端处理消息的业务逻辑保持幂等性
2、 确保每一条消息都有唯一的编号且保证消息处理成功与去重表的日志同时出现

消息存储结构

存储文件

在这里插入图片描述
commitLog:消息存储目录
config:运行期间一些配置信息
consumerqueue:消息消费队列存储目录
index:消息索引文件存储目录
abort:如果存在改文件则 Broker 非正常关闭
checkpoint:文件检查点,存储 CommitLog 文件最后一次刷盘时间戳、consumerqueue 最后一次刷盘时间,index 索引文件最后一次刷盘时间戳。

消息存储结构

在这里插入图片描述
RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成 的,消息真正的物理存储文件是 CommitLog,ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。

CommitLog:存储消息的元数据
ConsumerQueue:存储消息在 CommitLog 的索引
IndexFile:为了消息查询提供了一种通过 key 或时间区间来查询消息的方法,这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程
在这里插入图片描述

CommitLog

CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:$ {user.home}\store$ { commitlog} \ $ { fileName}。在 CommitLog 中,一个消息的存储长度是不固定的, RocketMQ 采取一些机制,尽量向 CommitLog 中顺序写 ,但是随机读。commitlog 文件默认大小为 lG ,可通过在 broker 置文件中设置 mappedFileSizeCommitLog 属性来改变默认小。
在这里插入图片描述
Commitlog 文件存储的逻辑视图如下,每条消息的前面 4 个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。
在这里插入图片描述
每个 CommitLog 文件的大小为 1G,一般情况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为1073741824 (1G = 1073741824byte)。
在这里插入图片描述
每台 Rocket 只会往一个 commitlog 文件中写,写完一个接着写下一个。
indexFile 和 ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个 CommitLog 文件上。

ConsumeQueue

ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的
ConsumeQueue 文件, 文件地址在$ {$storeRoot} \consumequeue$ {topicName} $ { queueld} $ {fileName}。
在这里插入图片描述
在这里插入图片描述
ConsumeQueue 中存储的是消息条目,为了加速 ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个 Consumequeue 条目不会存储消息的全
量信息,消息条目如下:
在这里插入图片描述
ConsumeQueue 即为 Commitlog 文件的索引文件, 其构建机制是 当消息到达 Commitlog 文件后 由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。

存储机制这样设计有以下几个好处:
1 ) CommitLog 顺序写 ,可以大大提高写入效率。
(实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s ,超过了一般网卡的传输速度,这是磁盘比想象的快的地方 但是磁盘随机写的速度只有大概lOOKB/s,和顺序写的性能相差 6000 倍!)
2 )虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
3 )为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构 ,因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和ConsumeQueue的一致性, CommitLog 里存储了 Consume Queues 、Message Key、 Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。

IndexFile

RocketMQ 还支持通过 MessageID 或者 MessageKey 来查询消息;使用 ID 查询时,因为 ID 就是用 broker+offset 生成的(这里 msgId 指的是服务端的),所以很容易就找到对应的 commitLog 文件来读取消息。但是对于用 MessageKey 来查询消息,RocketMQ 则通过构建一个 index 来提高读取速度。

index 存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列 RocketMQ 专门为消息订阅构建的索引文件 ,提高根据主题与消息检索消息的速度 ,使用 Hash 索引机制,具体是 Hash 槽与 Hash 冲突的链表结构。(这里不做过多解释)

在这里插入图片描述

Config

config 文件夹中 存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在在此。
topics.json : topic 配置属性
subscriptionGroup.json :消息消费组配置信息。
delayOffset.json :延时消息队列拉取进度。
consumerOffset.json :集群消费模式消息消进度。
consumerFilter.json :主题消息过滤信息。

在这里插入图片描述

其他

abort :如果存在 abort 文件说明 Broker 非正常闭,该文件默认启动时创建,正常退出之前删除
checkpoint :文件检测点,存储 commitlog 文件最后一次刷盘时间戳、 consumequeue 最后一次刷盘时间、 index 索引文件最后一次刷盘时间戳。

过期文件删除

由于 RocketMQ 操作 CommitLog,ConsumeQueue 文件是基于内存映射机制并在启动的时候会加载 commitlog,ConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。

删除过程分别执行清理消息存储文件( Commitlog )与消息消费 队列文件( ConsumeQueue 文件), 消息消费队列文件与消息存储文件( Commitlog )共用一套过期文件机制。

RocketMQ 清除过期文件的方法是 :如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除, RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42 小时(不同版本的默认值不同,这里以 4.4.0 为例) ,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。

触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每 10s 执行一次。

过期判断

文件删除主要是由这个配置属性:fileReservedTime:文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除。

另外还有其他两个配置参数:
deletePhysicFilesInterval:删除物理文件的时间间隔(默认是 100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔 deletePhysicFilesInterval 这个时间再删除另外一个文件,由于删除文件是一个非常耗费 IO 的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。

destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳 destroyMapedFileIntervalForcibly 这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少 1000,直到引用 小于等于 0 为止,即可删除该文件.

删除条件

1)指定删除文件的时间点, RocketMQ 通过 deleteWhen 设置一天的固定时间执行一次。删除过期文件操作, 默认为凌晨 4 点。
2)磁盘空间是否充足,如果磁盘空间不充足(DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是 85),会触发过期文件删除操作。

另外还有 RocketMQ 的磁盘配置参数:
1:物理使用率大于 diskSpaceWarningLevelRatio(默认 90%可通过参数设置),则会阻止新消息的插入。
2:物理磁盘使用率小于 diskMaxUsedSpaceRatio(默认 75%) 表示磁盘使用正常。

零拷贝与 MMAP

什么是零拷贝?

零拷贝(英语: Zero-copy) 技术是指计算机执行操作时,CPU 不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省 CPU 周期和内存带宽。
➢零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效地提高数据传输效率
➢零拷贝技术减少了用户进程地址空间和内核地址空间之间因为上:下文切换而带来的开销
可以看出没有说不需要拷贝,只是说减少冗余[不必要]的拷贝。
下面这些组件、框架中均使用了零拷贝技术:Kafka、Netty、Rocketmq、Nginx、Apache。

传统数据传送机制

比如:读取文件,再用 socket 发送出去,实际经过四次 copy。
伪码实现如下:
buffer = File.read()
Socket.send(buffer)
1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy 到应用程序的 buffer;
3、第三步:将 application 应用程序 buffer 中的数据,copy 到 socket 网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将 socket buffer 的数据,copy 到网卡,由网卡进行网络传输。
在这里插入图片描述
分析上述的过程,虽然引入 DMA 来接管 CPU 的中断请求,但四次 copy 是存在“不必要的拷贝”的。实际上并不需要第二个和第三个数据副本。应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。

显然,第二次和第三次数据 copy 其实在这种场景下没有什么帮助反而带来开销(DMA 拷贝速度一般比 CPU 拷贝速度快一个数量级),这也正是零拷贝出现的背景和意义。

打个比喻:200M 的数据,读取文件,再用 socket 发送出去,实际经过四次 copy(2 次 cpu 拷贝每次 100ms ,2 次 DMS 拷贝每次 10ms)传统网络传输的话:合计耗时将有 220ms

同时,read 和 send 都属于系统调用,每次调用都牵涉到两次上下文切换:
在这里插入图片描述
总结下,传统的数据传送所消耗的成本:4 次拷贝,4 次上下文切换。
4 次拷贝,其中两次是 DMA copy,两次是 CPU copy。

mmap 内存映射

硬盘上文件的位置和应用程序缓冲区(application buffers)进行映射(建立一种一一对应关系),由于 mmap()将文件直接映射到用户空间,所以实际文件读取时根据这个映射关系,直接将文件从硬盘拷贝到用户空间,只进行了一次数据拷贝,不再有文件内容从硬盘拷贝到内核空间的一个缓冲区。

mmap 内存映射将会经历:3 次拷贝: 1 次 cpu copy,2 次 DMA copy;

打个比喻:200M 的数据,读取文件,再用 socket 发送出去,如果是使用 MMAP 实际经过三次 copy(1 次 cpu 拷贝每次 100ms ,2 次 DMS 拷贝每次10ms)合计只需要 120ms,从数据拷贝的角度上来看,就比传统的网络传输,性能提升了近一倍。

以及 4 次上下文切换
在这里插入图片描述
mmap()是在 <sys/mman.h> 中定义的一个函数,此函数的作用是创建一个新的 虚拟内存 区域,并将指定的对象映射到此区域。mmap 其实就是通过 内存映射 的机制来进行文件操作。

Windows 操作系统上也有虚拟机内存,如下图:
在这里插入图片描述

代码

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

RocketMQ 中 MMAP 运用

如果按照传统的方式进行数据传送,那肯定性能上不去,作为 MQ 也是这样,尤其是 RocketMQ,要满足一个高并发的消息中间件,一定要进行优化。所以 RocketMQ 使用的是 MMAP。

RocketMQ 一个映射文件大概是,commitlog 文件默认大小为 lG。

这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了。

MMAP 文件对应

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

RocketMQ 源码中的 MMAP 运用

RocketMQ 源码中,使用 MappedFile 这个类类进行 MMAP 的映射
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

RocketMQ 存储整体设计总结

消息生产与消息消费相互分离

Producer 端发送消息最终写入的是 CommitLog(消息存储的日志数据文件),Consumer 端先从 ConsumeQueue(消息逻辑队列)读取持久化消息的起始物理位置偏移量 offset、大小 size 和消息 Tag 的 HashCode 值,随后再从 CommitLog 中进行读取待拉取消费消息的真正实体内容部分;

RocketMQ 的 CommitLog 文件采用混合型存储

所有的 Topic 下的消息队列共用同一个 CommitLog 的日志数据文件,并通过建立类似索引文件—ConsumeQueue 的方式来区分不同 Topic 下面的不同MessageQueue 的消息,同时为消费消息起到一定的缓冲作用(异步服务线生成了 ConsumeQueue 队列的信息后,Consumer 端才能进行消费)。这样,只要消息写入并刷盘至 CommitLog 文件后,消息就不会丢失,即使 ConsumeQueue 中的数据丢失,也可以通过 CommitLog 来恢复。

RocketMQ 每次读写文件的时候真的是完全顺序读写吗?

发送消息时,生产者端的消息确实是顺序写入 CommitLog;订阅消息时,消费者端也是顺序读取 ConsumeQueue,然而根据其中的起始物理位置偏移量 offset 读取消息真实内容却是随机读取 CommitLog。 所以在 RocketMQ 集群整体的吞吐量、并发量非常高的情况下,随机读取文件带来的性能开销影响还是比较大的,RocketMQ 怎么优化的,源码解读部分进行讲解。

RocketMQ 的高可用

RocketMQ 中的高可用机制

在这里插入图片描述
RocketMQ 分布式集群是通过 Master 和 Slave 的配合达到高可用性的。

Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为 0 表明这个 Broker 是 Master,大于 0 表明这个 Broker 是 Slave,同时 brokerRole参数也会说明这个 Broker 是 Master 还是 Slave。

Master 角色的 Broker 支持读和写,Slave 角色的 Broker 仅支持读,也就是 Producer 只能和 Master 角色的 Broker 连接写入消息;Consumer 可以连接Master 角色的 Broker,也可以连接 Slave 角色的 Broker 来读取消息。

集群部署模式

1)单 master 模式

也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用。

2)多 master 模式

多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。

优点:所有模式中性能最高(一个 Topic 的可以分布在不同的 master,进行横向拓展)
在多主多从的架构体系下,无论使用客户端还是管理界面创建主题,一个主题都会创建多份队列在多主中(默认是 4 个的话,双主就会有 8 个队列,每台主 4 个队列,所以双主可以提高性能,一个 Topic 的分布在不同的 master,方便进行横向拓展。

缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。

3)多 master 多 slave 异步复制模式

而从节点(Slave)就是复制主节点的数据,对于生产者完全感知不到,对于消费者正常情况下也感知不到。(只有当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。)

在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。

优点: 一般情况下都是 master 消费,在 master 宕机或超过负载时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。

缺点:使用异步复制的同步方式有可能会有消息丢失的问题。(Master 宕机后,生产者发送的消息没有消费完,同时到 Slave 节点的数据也没有同步完)

4)多 master 多 slave 主从同步复制+异步刷盘(最优推荐)

优点:主从同步复制模式能保证数据不丢失。
缺点:发送单个消息响应时间会略长,性能相比异步复制低 10%左右。
对数据要求较高的场景,主从同步复制方式,保存数据热备份,通过异步刷盘方式,保证 rocketMQ 高吞吐量。

5)Dlegder(不推荐)
在 RocketMQ4.5 版本之后推出了 Dlegder 模式,但是这种模式一直存在严重的 BUG,同时性能有可能有问题,包括升级到了 4.8 的版本后也一样,所以目前不讲这种模式。(类似于 Zookeeper 的集群选举模式)

刷盘与主从同步

生产时首先将消息写入到 MappedFile,内存映射文件,然后根据刷盘策略刷写到磁盘。
大致的步骤可以理解成使用 MMAP 中的 MappedByteBuffer 中实际用 flip().

在这里插入图片描述

RocketMQ 的刷盘是把消息存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两种写磁盘方式,同步刷盘和异步刷盘。

同步刷盘

SYNC_FLUSH(同步刷盘):生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问题,但是有很大的磁盘 IO 开销,性能有一定影响。

异步刷盘

ASYNC_FLUSH(异步刷盘):生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:1 是定期将缓存中更新的数据进行刷盘,2 是当缓存中更新的数据条数达到某一设定值后进行刷盘。这种异步的方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式。

在这里插入图片描述
4.8.0 版本中默认值下是异步刷盘,如下图:
在这里插入图片描述
深度揭开阿里(蚂蚁金服)技术面试流程 附前期准备,学习方向

主从同步复制

集群环境下需要部署多个 Broker,Broker 分为两种角色:一种是 master,即可以写也可以读,其 brokerId=0,只能有一个;另外一种是 slave,只允许读,其 brokerId 为非 0。一个 master 与多个 slave 通过指定相同的 brokerClusterName 被归为一个 broker set(broker 集)。通常生产环境中,我们至少需要 2 个 broker set。Slave 是复制 master 的数据。一个 Broker 组有 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步和异步两种复制方式。

主从同步复制方式(Sync Broker):生产者发送的每一条消息都至少同步复制到一个 slave 后才返回告诉生产者成功,即“同步双写”在同步复制方式下,如果 Master 出故障, Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。

主从异步复制

主从异步复制方式(Async Broker):生产者发送的每一条消息只要写入 master 就返回告诉生产者成功。然后再“异步复制”到 slave。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写 入 Slave,有可能会丢失;
同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE 三个值中的一个。

配置参数及意义

brokerId=0 代表主
brokerId=1 代表从(大于 0 都代表从)
brokerRole=SYNC_MASTER 同步复制(主从)
brokerRole=ASYNC_MASTER 异步复制(主从)
flushDiskType=SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH 异步刷盘

搭建双主双从同步复制+异步刷盘

NameServer 集群

106.55.246.66
94.191.83.120

Broker 服务器

106.55.246.66 ------MasterA
94.191.83.120 ------MasterB
106.53.195.121 ------SlaveA
106.55.248.74 ------SlaveB

配置文件

注意,因为 RocketMQ 使用外网地址,所以配置文件(MQ 文件夹/conf/2m-2s-sync/)需要修改(同时修改 nameserver 地址为集群地址):
注意如果机器内存不够,建议把启动时的堆内存改小,具体见《RocketMQ 的安装.docx》中 — 3、RocketMQ 在 Linux 下的安装/注意事项
106.55.246.66 ------主 A
broker-a.properties 增加: brokerIP1=106.55.246.66
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876

在这里插入图片描述

94.191.83.120 ------主 B
broker-b.properties 增加: brokerIP1=94.191.83.120
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876

在这里插入图片描述
106.53.195.121 ------从 A
broker-a-s.properties 增加:brokerIP1=106.53.195.121
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876

在这里插入图片描述
106.55.248.74 ------从 B
broker-b-s.properties 增加:brokerIP1=106.55.248.74
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876

在这里插入图片描述
不管是主还是从,如果属于一个集群,使用相同的 brokerClusterName 名称

在这里插入图片描述

启动步骤

启动 NameServer
(记得关闭防火墙或者要开通 9876 端口)
1.启动 NameServer 集群,这里使用 106.55.246.66 和 94.191.83.120 两台作为集群即可。
1) 在机器 A,启动第 1 台 NameServer: 102 服务器进入至‘MQ 文件夹/bin’下:然后执行‘nohup sh mqnamesrv &’
查看日志的命令:tail -f ~/logs/rocketmqlogs/namesrv.log

在这里插入图片描述

2) 在机器 B,启动第 2 台 NameServer: 103 服务器进入至‘MQ 文件夹/bin’下:然后执行‘nohup sh mqnamesrv &’
查看日志的命令:tail -f ~/logs/rocketmqlogs/namesrv.log

在这里插入图片描述

启动 Broker
2.启动双主双从同步集群,顺序是先启动主,然后启动从。
3)启动主 A: 102 服务器进入至‘MQ 文件夹/bin’下:执行以下命令(autoCreateTopicEnable=true 测试环境开启,生产环境建议关闭):
nohup sh mqbroker -c …/conf/2m-2s-sync/broker-a.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log

在这里插入图片描述
4)启动主 B: 103 服务器进入至‘MQ 文件夹\bin’下:执行以下命令:
nohup sh mqbroker -c …/conf/2m-2s-sync/broker-b.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log
5)启动从 A: 104 服务器进入至‘MQ 文件夹\bin’下:执行以下命令:
nohup sh mqbroker -c …/conf/2m-2s-sync/broker-a-s.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log
6)启动从 B: 105 服务器进入至‘MQ 文件夹\bin’下:执行以下命令:
nohup sh mqbroker -c …/conf/2m-2s-sync/broker-b-s.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log
每台服务器查看日志:tail -f ~/logs/rocketmqlogs/broker.log
如果是要启动控制台,则需要重新打包:
进入‘\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。(多个 NameServer 使用;分隔)
rocketmq.config.namesrvAddr=106.55.246.66:9876;94.191.83.120:9876
进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。
在把编译后的 jar 包丢上服务器:
nohup java -jar rocketmq-console-ng-2.0.0.jar &
进入控制台 http://106.55.246.66:8089/#/cluster
集群搭建成功。

在这里插入图片描述
在这里插入图片描述

消息生产的高可用机制

在这里插入图片描述

在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样当一个 Broker 组的 Master 不可用后,其他组的 Master 仍然可用,Producer 仍然可以发送消息。

RocketMQ 目前不支持把 Slave 自动转成 Master,如果机器资源不足, 需要把 Slave 转成 Master,则要手动停止 Slave 角色的 Broker,更改配置文件,用新的配置文件启动 Broker。

高可用消息生产流程

在这里插入图片描述

1、TopicA 创建在双主中,BrokerA 和 BrokerB 中,每一个 Broker 中有 4 个队列
2、选择队列是,默认是使用轮训的方式,比如发送一条消息 A 时,选择 BrokerA 中的 Q4
3、如果发送成功,消息 A 发结束。
4、如果消息发送失败,默认会采用重试机制
retryTimesWhenSendFailed 同步模式下内部尝试发送消息的最大次数 默认值是 2
retryTimesWhenSendAsyncFailed 异步模式下内部尝试发送消息的最大次数 默认值是 2
在这里插入图片描述

在这里插入图片描述
5、如果发生了消息发送失败,这里有一个规避策略(默认配置):
5.1、默认不启用 Broker 故障延迟机制(规避策略):如果是 BrokerA 宕机,上一次路由选择的是 BrokerA 中的 Q4,那么再次重发的队列选择是 BrokerA中的 Q1。但是这里的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。

注意,这里的规避仅仅只针对消息重试,例如在一次消息发送过程中如果遇到消息发送失败,规避 broekr-a,但是在下一次消息发送时,即再次调用 DefaultMQProducer 的 send 方法发送消息时,还是会选择 broker-a 的消息进行发送,只有继续发送失败后,重试时再次规避 broker-a。

为什么会默认这么设计?
1、 某一时间段,从 NameServer 中读到的路由中包含了不可用的主机
2、 不正常的路由信息也是只是一个短暂的时间而已。
生产者每隔 30s 更新一次路由信息,而 NameServer 认为 broker 不可用需要经过 120s。

在这里插入图片描述
所以生产者要发送时认为 broker 不正常(从 NameServer 拿到)和实际 Broker 不正常有延迟。

5.2、启用 Broker 故障延迟机制:代码如下
在这里插入图片描述
开启延迟规避机制,一旦消息发送失败(不是重试的)会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息。这个延迟时间就是通过 notAvailableDuration、latencyMax 共同计算的,就首先先计算本次消息发送失败所耗的时延,然后对应 latencyMax 中哪个区间,即计算在 latencyMax 的下标,然后返回 notAvailableDuration 同一个下标对应的延迟值。

这个里面涉及到一个算法,源码部分进行详细讲解。

比如:在发送失败后,在接下来的固定时间(比如 5 分钟)内,发生错误的 BrokeA 中的队列将不再参加队列负载,发送时只选择 BrokerB 服务器上的队列。

如果所有的 Broker 都触发了故障规避,并且 Broker 只是那一瞬间压力大,那岂不是明明存在可用的 Broker,但经过你这样规避,反倒是没有 Broker可用来,那岂不是更糟糕了。所以 RocketMQ 默认不启用 Broker 故障延迟机制。

消息消费的高可用机制

主从的高可用原理

在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 程序。这就达到了消费端的高可用性。

Master 不可用这个很容易理解,那什么是 Master 繁忙呢?

这个繁忙其实是 RocketMQ 服务器的内存不够导致的。
源码分析:org.apache.rocketmq.store. DefaultMessageStore#getMessage 方法
在这里插入图片描述
当前需要拉取的消息已经超过常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取。

消息消费的重试

消费端如果发生消息失败,没有提交成功,消息默认情况下会进入重试队列中。
在这里插入图片描述
注意重试队列的名字其实是跟消费群组有关,不是主题,因为一个主题可以有多个群组消费,所以要注意

在这里插入图片描述

顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
所以玩顺序消息时。consume 消费消息失败时,不能返回 reconsume——later,这样会导致乱序,应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。

无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

重试次数

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

重试配置

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 RECONSUME_LATER (推荐)
  • 返回 Null
  • 抛出异常

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 CONSUME_SUCCESS,此后这条消息将不会再重试。

自定义消息最大重试次数

消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
    在这里插入图片描述
    消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。

如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。

配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信特性

死信消息具有以下特性:
 不会再被消费者正常消费。
 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:
 不会再被消费者正常消费。
 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

查看死信消息

在控制台查询出现死信队列的主题信息
在这里插入图片描述
在消息界面根据主题查询死信消息
选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

负载均衡

Producer 负载均衡

Producer 端,每个实例在发消息的时候,默认会轮询所有的 message queue 发送,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下,如下图:

在这里插入图片描述
发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。

Consumer 负载均衡

集群模式

在集群消费模式下,每条消息只需要投递到订阅这个 topic 的 Consumer Group 下的一个实例即可。RocketMQ 采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条 message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照 queue 的数量和实例的数量平均分配 queue 给每个实例。
默认的分配算法是 AllocateMessageQueueAveragely

还有另外一种平均的算法是 AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条 queue,只是以环状轮流分 queue 的形式
如下图:
在这里插入图片描述
需要注意的是,集群模式下,queue 都是只允许分配只一个实例,这是由于如果多个实例同时消费一个 queue 的消息,由于拉取哪些消息是 consumer 主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个 queue 只分给一个 consumer 实例,一个 consumer 实例可以允许同时分到不同的 queue。

通过增加 consumer 实例去分摊 queue 的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue 将分配到其他实例上继续消费。

但是如果 consumer 实例的数量比 message queue 的总数量还多的话,多出来的 consumer 实例将无法分到 queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让 queue 的总数量大于等于 consumer 的数量。

广播模式

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在 consumer 分配 queue 的时候,所有 consumer 都分到所有的 queue。

RocketMQ 源码分析

源码分析的内容点是比较多,当然我们不可能把 RocketMQ 所有的源码都读完,所以我们把核心、重点的源码进行解读。
需要解读的源码如下:
在这里插入图片描述

读源码前的思考(本次课的)

 NameServer、Broker、Producer、Consumer 的连通性
 Producer、Consumer 连接的建立时机,有何关系?
 NameServer 存储哪些信息,如何存储?
 Topic 的持久化存储是在 NameServer 中还是在 Broker?

RocketMQ 整体架构及连通性

下图是一个典型的双主双从的架构,包括生产者和消费者。
在这里插入图片描述
NameServer 集群中它们相互之间是不通讯。
 生产者同一时间,与 NameServer 集群中其中一台建立长连接。
 生产者与 Broker 之间的 Master 保持长连接。
 消费者同一时间,与 NameServer 集群中其中一台建立长连接。
 消费者与所有 Broker

RocketMQ 核心组件及整体流程

在这里插入图片描述
RocketMQ 的源码的看起来很多,但是如果按照组件来划分的话,核心只要几个。如下图:
在这里插入图片描述

NameServer

命名服务,更新和路由发现 broker 服务。
NameServer 要作用是为消息生产者、消息消费者提供关于主题 Topic 的路由信息,NameServer 除了要存储路由的基础信息,还要能够管理Broker 节点,包括路由注册、路由删除等功能。

Producer 和 Consumer

java 版本的 mq 客户端实现,包括生产者和消费者。

Broker

它能接收 producer 和 consumer 的请求,并调用 store 层服务对消息进行处理。HA 服务的基本单元,支持同步双写,异步双写等模式。

Store

存储层实现,同时包括了索引服务,高可用 HA 服务实现。

Netty Remoting Server 与 Netty Remoting Client

基于 netty 的底层通信实现,所有服务间的交互都基于此模块。也区分服务端和客户端。

在这里插入图片描述

NameServer 源码分析

RocketMQ 核心组件及整体流程

在这里插入图片描述

NameServer 启动流程概要

启动流程

从源码的启动可知,NameServer 单独启动。
入口类:NamesrvController
核心方法:NamesrvController 类中 main()->main0-> createNamesrvController->start() -> initialize()

步骤一
解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController
NamesrvController 类中 createNamesrvController 方法

在这里插入图片描述
在这里插入图片描述
这里可以看出,如果你要查看各种参数,直接在启动个参数中送入 -p 就可以打印这个 NameServer 的所有的参数信息
在这里插入图片描述

在这里插入图片描述
同理,在启动日志中一定可以找到所有的参数:
在这里插入图片描述
在这里插入图片描述

步骤二
根据启动属性创建 NamesrvController 实例,并初始化该实例。NameServerController 实例为 NameServer 核心控制器。
核心控制器会启动定时任务:
1、 每隔 10s 扫描一次 Broker,移除不活跃的 Broker
2、 每隔 10min 打印一次 KV 配置
NamesrvController 类中 initialize()

在这里插入图片描述

在这里插入图片描述
步骤三
在 JVM 进程关闭之前,先将线程池关闭,及时释放资源

在这里插入图片描述
最后:
在这里插入图片描述
在这里插入图片描述

Broker 启动流程概要

入口类: BrokerController 核心方法:
在这里插入图片描述

Topic 路由注册、剔除机制

路由注册与发现(读写锁,保证消息发送时的高并发)

在这里插入图片描述
消息发送时会获取路由信息,同时 Broker 会定时更新路由信息,所以路由表
1、 生产者发送消息时需要频繁的获取。对表进行读。
2、 Broker 定时(30s)会更新一个路由表。对表进行写。

为了提高消息发送时的高并发(同时线程安全),这里维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁
有了很大提升。

写锁

Broker 每隔 30s 向 NameServer 报告自己还活着(包含很多信息)这里使用的是写锁。因为数据最终要写入 NameServer 的内存(使用的 HashMap 进行保存)

在这里插入图片描述

读锁

生产者发送消息时,需要从向 NameServer 获取路由信息,这里使用读锁。
MQClientAPIImpl
在这里插入图片描述
在这里插入图片描述
DefaultRequestProcessor. getRouteInfoByTopic()
在这里插入图片描述
RouteInfoManager. pickupTopicRouteData()
在这里插入图片描述

设计亮点

因为 Broker 每隔 30s 向 NameServer 发送一个心跳包,这个操作每次都会更新 Broker 的状态,但同时生产者发送消息时也需要 Broker 的状态,要进行频繁的读取操作。所以这个地方就有一个矛盾,Broker 的状态会被经常性的更新,同时也会被更加频繁的读取。这里如何提高并发,尤其是生产者进行消息发送时的并发,所以这里使用了读写锁机制(针对读多写少的场景)。

NameServer 每收到一个心跳包,将更新 brokerLiveTable 中关于 Broker 的状态信息以及路由表( topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)。更新上述路由表使用了锁粒度较少的读写锁,允许多个消息发送者(Producer)并发读保证消息发送时的高并发。但同一时刻 NameServer只处理一个 Broker 心跳包,多个心跳包请求串行执行。这也是读写锁经典使用场景,更多关于读写锁的信息,可以参考《并发编程》。

路由剔除机制

在这里插入图片描述

Broker 每隔 30s 向 NameServer 发送一个心跳包,心跳包包含 BrokerId,Broker 地址,Broker 名称,Broker 所属集群名称、Broker 关联的 FilterServer列表。但是如果 Broker 宕机,NameServer 无法收到心跳包,此时 NameServer 如何来剔除这些失效的 Broker 呢?NameServer 会每隔 10s 扫描 brokerLiveTable状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker,关闭与 Broker 连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

RocketMQ 有两个触发点来删除路由信息:

  • NameServer 定期扫描 brokerLiveTable 检测上次心跳包与当前系统的时间差,如果时间超过 120s,则需要移除
    broker。
  • Broker 在正常关闭的情况下,会执行 unregisterBroker 指令

这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该 broker 相关的信息。
在这里插入图片描述
在这里插入图片描述

NameServer 的存储

在这里插入图片描述
可知 NameServer 存储以下信息:
在这里插入图片描述
在这里插入图片描述
topicQueueTable:Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡
brokerAddrTable:Broker 基础信息,包括 brokerName、所属集群名称、主备 Broker 地址
clusterAddrTable:Broker 集群信息,存储集群中所有 Broker 名称
brokerLiveTable:Broker 状态信息,NameServer 每次收到心跳包是会替换该信息
filterServerTable:Broker 上的 FilterServer 列表,用于类模式消息过滤。
NameServer 的实现基于内存,NameServer 并不会持久化路由信息,持久化的重任是交给 Broker 来完成。

客户端启动核心流程

DefaultMQProducer 是 MQProducer 的唯一默认实现,
其实现 MQProducer 接口的时候 还继承了 ClientConfig 类 (客户端配置类),可以配置如sendMsgTimeout 超时时间,producerGroup 生产者组 最大消息容量和是否启用压缩等
在这里插入图片描述
RocketMQ 中消息发送者、消息消费者都属于”客户端“
每一个客户端就是一个 MQClientInstance,每一个 ClientConfig 对应一个示例。
故不同的生产者、消费端,如果引用同一个客户端配置(ClientConfig),则它们共享一个 MQClientInstance 实例。

MQClientInstance 不是对外应用类,也就是说用户不需要自己实例化使用他。并且,MQClientInstance 的实例化并不是直接 new 后使用,而是通过MQClientManager 这个类型。MQClientManager 是个单例类,使用饿汉模式设计保证线程安全。他的作用是提供 MQClientInstance 实例,RocketMQ 认为,
MQClientInstance 的实例是可以复用的实例,只要 client 相关特征参数相同,就会复用一个MQClientInstance 实例,我们可以看看源码

深度揭开阿里(蚂蚁金服)技术面试流程 附前期准备,学习方向

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
ClientConfig 里的相关参数一致,这些 Client 会复用一个 MQClientInstance,使用的时候需要注意,你的程序里的 Client 和 MQClientInstance 的对应关系。下面我们来看看 RocketMQ 对参数配置如何的 Client 复用一个 MQClientInstance,简单来说就是 IP@instanceName@unitName,这么一个串,其中 IP我不解释了,InstanceName 可以设置,Producer 和 Consucer 都有相应的 API 设置,如果不设置使用缺省值即 Client 的 PID,unitName 不设置缺省为 null,当然你可以 DefaultMQProducer#setUnitName()这个方法设置这个值,这个方法是继承自 ClientConfig 类的。

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

客户端连接建立的时机

在这里插入图片描述
注:客户端(MQClientInstance)中连接的建立时机为按需创建,也就是在需要与对端进行数据交互时才建立的。

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

深度揭开阿里(蚂蚁金服)技术面试流程 附前期准备,学习方向

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到