消息队列场景
#什么是消息队列?
你可以把消息队列理解为一个使用队列来通信的组件。它的本质,就是个转发器,包含发消息、存消息、消费消息的过程。最简单的消息队列模型如下:
我们通常说的消息队列,简称MQ(Message Queue),它其实就指消息中间件,当前业界比较流行的开源消息中间件包括:RabbitMQ、RocketMQ、Kafka。
#消息队列怎么选型?
Kafka、ActiveMQ、RabbitMQ、RocketMQ来进行不同维度对比。
特性 |
ActiveMQ |
RabbitMQ |
RocketMQ |
Kafka |
单机吞吐量 |
万级 |
万级 |
10 万级 |
10 万级 |
时效性 |
毫秒级 |
微秒级 |
毫秒级 |
毫秒级 |
可用性 |
高(主从) |
高(主从) |
非常高(分布式) |
非常高(分布式) |
消息重复 |
至少一次 |
至少一次 |
至少一次 最多一次 |
至少一次最多一次 |
消息顺序性 |
有序 |
有序 |
有序 |
分区有序 |
支持主题数 |
千级 |
百万级 |
千级 |
百级,多了性能严重下滑 |
消息回溯 |
不支持 |
不支持 |
支持(按时间回溯) |
支持(按offset回溯) |
管理界面 |
普通 |
普通 |
完善 |
普通 |
选型的时候,我们需要根据业务场景,结合上述特性来进行选型。
比如你要支持天猫双十一类超大型的秒杀活动,这种一锤子买卖,那管理界面、消息回溯啥的不重要。
我们需要看什么?看吞吐量!
所以优先选Kafka和RocketMQ这种更高吞吐的。
比如做一个公司的中台,对外提供能力,那可能会有很多主题接入,这时候主题个数又是很重要的考量,像Kafka这样百级的,就不太符合要求,可以根据情况考虑千级的RocketMQ,甚至百万级的RabbitMQ。
又比如是一个金融类业务,那么重点考虑的就是稳定性、安全性,分布式部署的Kafka和Rocket就更有优势。
特别说一下时效性,RabbitMQ以微秒的时效作为招牌,但实际上毫秒和微秒,在绝大多数情况下,都没有感知的区别,加上网络带来的波动,这一点在生产过程中,反而不会作为重要的考量。
其它的特性,如消息确认、消息回溯,也经常作为考量的场景,管理界面的话试公司而定了,反正我呆过的地方,都不看重这个,毕竟都有自己的运维体系。
#消息队列使用场景有哪些?
- 解耦:可以在多个系统之间进行解耦,将原本通过网络之间的调用的方式改为使用MQ进行消息的异步通讯,只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的联系,这样项目之间不会存在耦合,系统之间不会产生太大的影响,就算一个系统挂了,也只是消息挤压在MQ里面没人进行消费而已,不会对其他的系统产生影响。
- 异步:加入一个操作设计到好几个步骤,这些步骤之间不需要同步完成,比如客户去创建了一个订单,还要去客户轨迹系统添加一条轨迹、去库存系统更新库存、去客户系统修改客户的状态等等。这样如果这个系统都直接进行调用,那么将会产生大量的时间,这样对于客户是无法接收的;并且像添加客户轨迹这种操作是不需要去同步操作的,如果使用MQ将客户创建订单时,将后面的轨迹、库存、状态等信息的更新全都放到MQ里面然后去异步操作,这样就可加快系统的访问速度,提供更好的客户体验。
- 削峰:一个系统访问流量有高峰时期,也有低峰时期,比如说,中午整点有一个抢购活动等等。比如系统平时流量并不高,一秒钟只有100多个并发请求,系统处理没有任何压力,一切风平浪静,到了某个抢购活动时间,系统并发访问了剧增,比如达到了每秒5000个并发请求,而我们的系统每秒只能处理2000个请求,那么由于流量太大,我们的系统、数据库可能就会崩溃。这时如果使用MQ进行流量削峰,将用户的大量消息直接放到MQ里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定,只是可能要跟进业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果
#消息重复消费怎么解决?
生产端为了保证消息发送成功,可能会重复推送(直到收到成功ACK),会产生重复消息。但是一个成熟的MQ Server框架一般会想办法解决,避免存储重复消息(比如:空间换时间,存储已处理过的message_id),给生产端提供一个幂等性的发送消息接口。
但是消费端却无法根本解决这个问题,在高并发标准要求下,拉取消息+业务处理+提交消费位移需要做事务处理,另外消费端服务可能宕机,很可能会拉取到重复消息。
所以,只能业务端自己做控制,对于已经消费成功的消息,本地数据库表或Redis缓存业务标识,每次处理前先进行校验,保证幂等。
#消息丢失怎么解决的?
使用一个消息队列,其实就分为三大块:生产者、中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。
- 消息生产阶段:生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
- 消息存储阶段:Kafka 在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
- 消息消费阶段:消费者接收消息+消息处理之后,才回复 ack 的话,那么消息阶段的消息不会丢失。不能收到消息就回 ack,否则可能消息处理中途挂掉了,消息就丢失了。
#使用消息队列还应该注意哪些问题?
需要考虑消息可靠性和顺序性方面的问题。
#消息队列的可靠性、顺序性怎么保证?
消息可靠性可以通过下面这些方式来保证
- 消息持久化:确保消息队列能够持久化消息是非常关键的。在系统崩溃、重启或者网络故障等情况下,未处理的消息不应丢失。例如,像 RabbitMQ 可以通过配置将消息持久化到磁盘,通过将队列和消息都设置为持久化的方式(设置durable = true),这样在服务器重启后,消息依然可以被重新读取和处理。
- 消息确认机制:消费者在成功处理消息后,应该向消息队列发送确认(acknowledgment)。消息队列只有收到确认后,才会将消息从队列中移除。如果没有收到确认,消息队列可能会在一定时间后重新发送消息给其他消费者或者再次发送给同一个消费者。以 Kafka 为例,消费者通过commitSync或者commitAsync方法来提交偏移量(offset),从而确认消息的消费。
- 消息重试策略:当消费者处理消息失败时,需要有合理的重试策略。可以设置重试次数和重试间隔时间。例如,在第一次处理失败后,等待一段时间(如 5 秒)后进行第二次重试,如果重试多次(如 3 次)后仍然失败,可以将消息发送到死信队列,以便后续人工排查或者采取其他特殊处理。
消息顺序性保证的方式如下:
- 有序消息处理场景识别:首先需要明确业务场景中哪些消息是需要保证顺序的。例如,在金融交易系统中,对于同用户的转账操作顺序是不能打乱的。对于需要顺序处理的消息,要确保消息队列和消费者能够按照特定的顺序进行处理。
- 消息队列对顺序性的支持:部分消息队列本身提供了顺序性保证的功能。比如 Kafka 可以通过将消息划分到同一个分区(Partition)来保证消息在分区内是有序的,消费者按照分区顺序读取消息就可以保证消息顺序。但这也可能会限制消息的并行处理程度,需要在顺序性和吞吐量之间进行权衡。
- 消费者顺序处理策略:消费者在处理顺序消息时,应该避免并发处理可能导致顺序打乱的情况。例如,可以通过单线程或者使用线程池并对顺序消息进行串行化处理等方式,确保消息按照正确的顺序被消费。
#如何保证幂等写?
幂等性是指 同一操作的多次执行对系统状态的影响与一次执行结果一致。例如,支付接口若因网络重试被多次调用,最终应确保仅扣款一次。实现幂等写的核心方案:
- 唯一标识(幂等键):客户端为每个请求生成全局唯一ID(如 UUID、业务主键),服务端校验该ID是否已处理,适用场景接口调用、消息消费等。
- 数据库事务 + 乐观锁:通过版本号或状态字段控制并发更新,确保多次更新等同于单次操作,适用场景数据库记录更新(如余额扣减、订单状态变更)。
- 数据库唯一约束:利用数据库唯一索引防止重复数据写入,适用场景数据插入场景(如订单创建)。
- 分布式锁:通过锁机制保证同一时刻仅有一个请求执行关键操作,适用场景高并发下的资源抢夺(如秒杀)。
- 消息去重:消息队列生产者为每条消息生成唯一的消息 ID,消费者在处理消息前,先检查该消息 ID 是否已经处理过,如果已经处理过则丢弃该消息。
#如何处理消息队列的消息积压问题?
消息积压是因为生产者的生产速度,大于消费者的消费速度。遇到消息积压问题时,我们需要先排查,是不是有bug产生了。
如果不是bug,我们可以优化一下消费的逻辑,比如之前是一条一条消息消费处理的话,我们可以确认是不是可以优为批量处理消息。如果还是慢,我们可以考虑水平扩容,增加Topic的队列数,和消费组机器的数量,提升整体消费能力。
如果是bug导致几百万消息持续积压几小时。有如何处理呢?需要解决bug,临时紧急扩容,大概思路如下:
- 先修复consumer消费者的问题,以确保其恢复消费速度,然后将现有consumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先10倍的queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
#如何保证数据一致性,事务消息如何实现?
一条普通的MQ消息,从产生到被消费,大概流程如下:
- 生产者产生消息,发送带MQ服务器
- MQ收到消息后,将消息持久化到存储系统。
- MQ服务器返回ACk到生产者。
- MQ服务器把消息push给消费者
- 消费者消费完消息,响应ACK
- MQ服务器收到ACK,认为消息消费成功,即在存储中删除消息。
我们举个下订单的例子吧。订单系统创建完订单后,再发送消息给下游系统。如果订单创建成功,然后消息没有成功发送出去,下游系统就无法感知这个事情,出导致数据不一致。
如何保证数据一致性呢?可以使用事务消息。一起来看下事务消息是如何实现的吧。
- 生产者产生消息,发送一条半事务消息到MQ服务器
- MQ收到消息后,将消息持久化到存储系统,这条消息的状态是待发送状态。
- MQ服务器返回ACK确认到生产者,此时MQ不会触发消息推送事件
- 生产者执行本地事务
- 如果本地事务执行成功,即commit执行结果到MQ服务器;如果执行失败,发送rollback。
- 如果是正常的commit,MQ服务器更新消息状态为可发送;如果是rollback,即删除消息。
- 如果消息状态更新为可发送,则MQ服务器会push消息给消费者。消费者消费完就回ACK。
- 如果MQ服务器长时间没有收到生产者的commit或者rollback,它会反查生产者,然后根据查询到的结果执行最终状态。
#消息队列是参考哪种设计模式?
是参考了观察者模式和发布订阅模式,两种设计模式思路是一样的,举个生活例子:
- 观察者模式:某公司给自己员工发月饼发粽子,是由公司的行政部门发送的,这件事不适合交给第三方,原因是“公司”和“员工”是一个整体
- 发布-订阅模式:某公司要给其他人发各种快递,因为“公司”和“其他人”是独立的,其唯一的桥梁是“快递”,所以这件事适合交给第三方快递公司解决
上述过程中,如果公司自己去管理快递的配送,那公司就会变成一个快递公司,业务繁杂难以管理,影响公司自身的主营业务,因此使用何种模式需要考虑什么情况两者是需要耦合的
观察者模式
观察者模式实际上就是一个一对多的关系,在观察者模式中存在一个主题和多个观察者,主题也是被观察者,当我们主题发布消息时,会通知各个观察者,观察者将会收到最新消息,图解如下:每个观察者首先订阅主题,订阅成功后当主题发送消息时会循环整个观察者列表,逐一发送消息通知。
发布订阅模式
发布订阅模式和观察者模式的区别就是发布者和订阅者完全解耦,通过中间的发布订阅中心进行消息通知,发布者并不知道自己发布的消息会通知给谁,因此发布订阅模式有三个重要角色,发布者->发布订阅中心->订阅者。
图解如下:当发布者发布消息到发布订阅中心后,发布订阅中心会将消息通知给所有订阅该发布者的订阅者
#让你写一个消息队列,该如何进行架构设计?
这个问题面试官主要考察三个方面的知识点:
- 你有没有对消息队列的架构原理比较了解
- 考察你的个人设计能力
- 考察编程思想,如什么高可用、可扩展性、幂等等等。
遇到这种设计题,大部分人会很蒙圈,因为平时没有思考过类似的问题。大多数人平时埋头增删改啥,不去思考框架背后的一些原理。有很多类似的问题,比如让你来设计一个 Dubbo 框架,或者让你来设计一个MyBatis 框架,你会怎么思考呢?
回答这类问题,并不要求你研究过那技术的源码,你知道那个技术框架的基本结构、工作原理即可。设计一个消息队列,我们可以从这几个角度去思考:
- 首先是消息队列的整体流程,producer发送消息给broker,broker存储好,broker再发送给consumer消费,consumer回复消费确认等。
- producer发送消息给broker,broker发消息给consumer消费,那就需要两次RPC了,RPC如何设计呢?可以参考开源框架Dubbo,你可以说说服务发现、序列化协议等等
- broker考虑如何持久化呢,是放文件系统还是数据库呢,会不会消息堆积呢,消息堆积如何处理呢。
- 消费关系如何保存呢?点对点还是广播方式呢?广播关系又是如何维护呢?zk还是config server
- 消息可靠性如何保证呢?如果消息重复了,如何幂等处理呢?
- 消息队列的高可用如何设计呢?可以参考Kafka的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
- 消息事务特性,与本地业务同个事务,本地消息落库;消息投递到服务端,本地才删除;定时任务扫描本地消息库,补偿发送。
- MQ得伸缩性和可扩展性,如果消息积压或者资源不够时,如何支持快速扩容,提高吞吐?可以参照一下 Kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了。
#RocketMQ
#消息队列为什么选择RocketMQ的?
项目用的是 RocketMQ 消息队列。选择RocketMQ的原因是:
- 开发语言优势。RocketMQ 使用 Java 语言开发,比起使用 Erlang 开发的 RabbitMQ 来说,有着更容易上手的阅读体验和受众。在遇到 RocketMQ 较为底层的问题时,大部分熟悉 Java 的同学都可以深入阅读其源码,分析、排查问题。
- 社区氛围活跃。RocketMQ 是阿里巴巴开源且内部在大量使用的消息队列,说明 RocketMQ 是的确经得起残酷的生产环境考验的,并且能够针对线上环境复杂的需求场景提供相应的解决方案。
- 特性丰富。根据 RocketMQ 官方文档的列举,其高级特性达到了 12 种,例如顺序消息、事务消息、消息过滤、定时消息等。顺序消息、事务消息、消息过滤、定时消息。RocketMQ 丰富的特性,能够为我们在复杂的业务场景下尽可能多地提供思路及解决方案。
#RocketMQ和Kafka的区别是什么?如何做技术选型?
Kafka的优缺点:
- 优点:首先,Kafka的最大优势就在于它的高吞吐量,在普通机器4CPU8G的配置下,一台机器可以抗住十几万的QPS,这一点还是相当优越的。Kafka支持集群部署,如果部分机器宕机不可用,则不影响Kafka的正常使用。
- 缺点:Kafka有可能会造成数据丢失,因为它在收到消息的时候,并不是直接写到物理磁盘的,而是先写入到磁盘缓冲区里面的。Kafka功能比较的单一 主要的就是支持收发消息,高级功能基本没有,就会造成适用场景受限。
RocketMQ是阿里巴巴开源的消息中间件,优缺点
- 优点:支持功能比较多,比如延迟队列、消息事务等等,吞吐量也高,单机吞吐量达到 10 万级,支持大规模集群部署,线性扩展方便,Java语言开发,满足了国内绝大部分公司技术栈
- 缺点:性能相比 kafka 是弱一点,因为 kafka 用到了 sendfile 的零拷贝技术,而 RocketMQ 主要是用 mmap+write 来实现零拷贝。
该怎么选择呢?
- 如果我们业务只是收发消息这种单一类型的需求,而且可以允许小部分数据丢失的可能性,但是又要求极高的吞吐量和高性能的话,就直接选Kafka就行了,就好比我们公司想要收集和传输用户行为日志以及其他相关日志的处理,就选用的Kafka中间件。
- 如果公司的需要通过 mq 来实现一些业务需求,比如延迟队列、消息事务等,公司技术栈主要是Java语言的话,就直接一步到位选择RocketMQ,这样会省很多事情。
#RocketMQ延时消息的底层原理
总体的原理示意图,如下所示:
broker 在接收到延时消息的时候,会将延时消息存入到延时Topic的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始Topic,这些消息就会被各自的 producer 消费了。
#RocektMQ怎么处理分布式事务?
RocketMQ是一种最终一致性的分布式事务,就是说它保证的是消息最终一致性,而不是像2PC、3PC、TCC那样强一致分布式事务
假设 A 给 B 转 100块钱,同时它们不是同一个服务上,现在目标是就是 A 减100块钱,B 加100块钱。
实际情况可能有四种:
- 1)就是A账户减100 (成功),B账户加100 (成功)
- 2)就是A账户减100(失败),B账户加100 (失败)
- 3)就是A账户减100(成功),B账户加100 (失败)
- 4)就是A账户减100 (失败),B账户加100 (成功)
这里 第1和第2 种情况是能够保证事务的一致性的,但是 第3和第4 是无法保证事务的一致性的。
那我们来看下RocketMQ是如何来保证事务的一致性的。
分布式事务的流程如上图:
- 1、A服务先发送个Half Message(是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer对消息的二次确认后,Consumer才能去消费它)给Brock端,消息中携带 B服务 即将要+100元的信息。
- 2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
- 3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
- 4.1)、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
- 4.2)、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
- 4.3)、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。
从上面流程可以得知 只有A服务本地事务执行成功 ,B服务才能消费该message。
那么 A账户减100 (成功),B账户加100 (失败),这时候B服务失败怎么办?
如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。
如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理,人工兜底处理后,就可以让事务达到最终的一致性。
#RocketMQ消息顺序怎么保证?
消息的有序性是指消息的消费顺序能够严格保存与消息的发送顺序一致。例如,一个订单产生了3条消息,分别是订单创建、订单付款和订单完成。在消息消费时,同一条订单要严格按照这个顺序进行消费,否则业务会发生混乱。同时,不同订单之间的消息又是可以并发消费的,比如可以先执行第三个订单的付款,再执行第二个订单的创建。
RocketMQ采用了局部顺序一致性的机制,实现了单个队列中的消息严格有序。也就是说,如果想要保证顺序消费,必须将一组消息发送到同一个队列中,然后再由消费者进行注意消费。
RocketMQ推荐的顺序消费解决方案是:安装业务划分不同的队列,然后将需要顺序消费的消息发往同一队列中即可,不同业务之间的消息仍采用并发消费。这种方式在满足顺序消费的同时提高了消息的处理速度,在一定程度上避免了消息堆积问题
RocketMQ 顺序消息的原理是:
- 在 Producer(生产者) 把一批需要保证顺序的消息发送到同一个 MessageQueue
- Consumer(消费者) 则通过加锁的机制来保证消息消费的顺序性,Broker 端通过对 MessageQueue 进行加锁,保证同一个 MessageQueue 只能被同一个 Consumer 进行消费。
#RocketMQ怎么保证消息不被重复消费
在业务逻辑中实现幂等性,确保即使消息被重复消费,也不会影响业务状态。例如,对于支付或转账类操作,可以使用唯一订单号或事务ID作为幂等性的标识符,确保同样的操作只会被执行一次。
#RocketMQ消息积压了,怎么办?
导致消息积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
要解决积压的问题,可以通过扩容消费端的实例数来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
#kafka
#对Kafka有什么了解吗?
Kafka特点如下:
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
#Kafka 为什么这么快?
- 顺序写入优化:Kafka将消息顺序写入磁盘,减少了磁盘的寻道时间。这种方式比随机写入更高效,因为磁盘读写头在顺序写入时只需移动一次。
- 批量处理技术:Kafka支持批量发送消息,这意味着生产者在发送消息时可以等待直到有足够的数据积累到一定量,然后再发送。这种方法减少了网络开销和磁盘I/O操作的次数,从而提高了吞吐量。
- 零拷贝技术:Kafka使用零拷贝技术,可以直接将数据从磁盘发送到网络套接字,避免了在用户空间和内核空间之间的多次数据拷贝。这大幅降低了CPU和内存的负载,提高了数据传输效率。
- 压缩技术:Kafka支持对消息进行压缩,这不仅减少了网络传输的数据量,还提高了整体的吞吐量。
#kafka的模型介绍一下,kafka是推送还是拉取?
消费者模型
消息由生产者发送到kafka集群后,会被消费者消费。一般来说我们的消费模型有两种:推送模型(psuh)和拉取模型(pull)。
推送模型(push)
- 基于推送模型(push)的消息系统,有消息代理记录消费者的消费状态。
- 消息代理在将消息推送到消费者后,标记这条消息已经消费,但这种方式无法很好地保证消费被处理。
- 如果要保证消息被处理,消息代理发送完消息后,要设置状态为“已发送”,只要收到消费者的确认请求后才更新为“已消费”,这就需要代理中记录所有的消费状态,但显然这种方式不可取。
缺点:
- push模式很难适应消费速率不同的消费者
- 因为消息发送速率是由broker决定的,push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
拉取模型(pull)
kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息。
说明:
- 有两个消费者(不同消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6。
- 消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。
- 这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
消费者组
kafka 消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。
上图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
优点在于:
- 消费者可以通过水平扩展的方式同时读取大量的消息。
- 如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
消费方式
kafka 消费者采用 pull(拉)模式从 broker中读取数据。
pull 的优点:
- pull 模式可以根据 consumer 的消费能力以适当的速率消费消息
缺点:
- 如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
#Kafka 如何保证顺序读取消息?
Kafka 可以保证在同一个分区内消息是有序的,生产者写入到同一分区的消息会按照写入顺序追加到分区日志文件中,消费者从分区中读取消息时也会按照这个顺序。这是 Kafka 天然具备的特性。
要在 Kafka 中保证顺序读取消息,需要结合生产者、消费者的配置以及合适的业务处理逻辑来实现。以下具体说明如何实现顺序读取消息:
- 生产者端确保消息顺序:为了保证消息写入同一分区从而确保顺序性,生产者需要将消息发送到指定分区。可以通过自定义分区器来实现,通过为消息指定相同的Key,保证相同Key的消息发送到同一分区。
- 消费者端保证顺序消费:消费者在消费消息时,需要单线程消费同一分区的消息,这样才能保证按顺序处理消息。如果使用多线程消费同一分区,就无法保证消息处理的顺序性。
Kafka 本身不能保证跨分区的消息顺序性,如果需要全局的消息顺序性,通常有以下两种方法:
- 只使用一个分区:将所有消息都写入到同一个分区,消费者也只从这个分区消费消息。但这种方式会导致 Kafka 的并行处理能力下降,因为 Kafka 的性能优势在于多分区并行处理。
- 业务层面保证:在业务代码中对消息进行编号或添加时间戳等标识,消费者在消费消息后,根据这些标识对消息进行排序处理。但这种方式会增加业务代码的复杂度。
#kafka 消息积压怎么办?
Kafka 消息积压是一个常见的问题,它可能会导致数据处理延迟,甚至影响业务的正常运行,下面是一些解决 Kafka 消息积压问题的常用方法:
- 增加消费者实例可以提高消息的消费速度,从而缓解积压问题。你需要确保消费者组中的消费者数量不超过分区数量,因为一个分区同一时间只能被一个消费者消费。
- 增加 Kafka 主题的分区数量可以提高消息的并行处理能力。在创建新分区后,你需要重新平衡消费者组,让更多的消费者可以同时消费消息。
#Kafka为什么一个分区只能由消费者组的一个消费者消费?这样设计的意义是什么?
同一时刻,一条消息只能被组中的一个消费者实例消费
如果两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序。
#如果有一个消费主题topic,有一个消费组group,topic有10个分区,消费线程数和分区数的关系是怎么样的?
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。
所以,分区数决定了同组消费者个数的上限。
如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。
#消息中间件如何做到高可用?
消息中间件如何保证高可用呢?单机是没有高可用可言的,高可用都是对集群来说的,一起看下kafka的高可用吧。
Kafka 的基础集群架构,由多个broker组成,每个broker都是一个节点。当你创建一个topic时,它可以划分为多个partition,而每个partition放一部分数据,分别存在于不同的 broker 上。也就是说,一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
有些伙伴可能有疑问,每个partition放一部分数据,如果对应的broker挂了,那这部分数据是不是就丢失了?那还谈什么高可用呢?
Kafka 0.8 之后,提供了复制品副本机制来保证高可用,即每个 partition 的数据都会同步到其它机器上,形成多个副本。然后所有的副本会选举一个 leader 出来,让leader去跟生产和消费者打交道,其他副本都是follower。写数据时,leader 负责把数据同步给所有的follower,读消息时, 直接读 leader 上的数据即可。如何保证高可用的?就是假设某个 broker 宕机,这个broker上的partition 在其他机器上都有副本的。如果挂的是leader的broker呢?其他follower会重新选一个leader出来。
#Kafka 和 RocketMQ 消息确认机制有什么不同?
Kafka的消息确认机制有三种:0,1,-1:
- ACK=0:这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器。
- ACK=1:这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认,但不会等待所有副本(replicas)的确认。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失。
- ACK=-1:这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟。
RocketMQ 提供了三种消息发送方式:同步发送、异步发送和单向发送:
- 同步发送:是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
- 异步发送:是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式,但是需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。适用于链路耗时较长,对响应时间较为敏感的业务场景,例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
- 单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
#Kafka 和 RocketMQ 的 broker 架构有什么区别
- Kafka 的 broker 架构:Kafka 的 broker 架构采用了分布式的设计,每个 Kafka broker 是一个独立的服务实例,负责存储和处理一部分消息数据。Kafka 的 topic 被分区存储在不同的 broker 上,实现了水平扩展和高可用性。
- RocketMQ 的 broker 架构:RocketMQ 的 broker 架构也是分布式的,但是每个 RocketMQ broker 有主从之分,一个主节点和多个从节点组成一个 broker 集群。主节点负责消息的写入和消费者的拉取,从节点负责消息的复制和消费者的负载均衡,提高了消息的可靠性和可用性。
#RabbitMQ
#RabbitMQ的特性你知道哪些?
RabbitMQ 以 可靠性、灵活性 和 易扩展性 为核心优势,适合需要稳定消息传递的复杂系统。其丰富的插件和协议支持使其在微服务、IoT、金融等领域广泛应用,比较核心的特性有如下:
- 持久化机制:RabbitMQ 支持消息、队列和交换器的持久化。当启用持久化时,消息会被写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。例如,在声明队列时可以设置 durable 参数为 true 来实现队列的持久化:
- 消息确认机制:提供了生产者确认和消费者确认机制。生产者可以设置 confirm 模式,当消息成功到达 RabbitMQ 服务器时,会收到确认消息;消费者在处理完消息后,可以向 RabbitMQ 发送确认信号,告知服务器该消息已被成功处理,服务器才会将消息从队列中删除。
- 镜像队列:支持创建镜像队列,将队列的内容复制到多个节点上,提高消息的可用性和可靠性。当一个节点出现故障时,其他节点仍然可以提供服务,确保消息不会丢失。
- 多种交换器类型:RabbitMQ 提供了多种类型的交换器,如直连交换器(Direct Exchange)、扇形交换器(Fanout Exchange)、主题交换器(Topic Exchange)和头部交换器(Headers Exchange)。不同类型的交换器根据不同的规则将消息路由到队列中。例如,扇形交换器会将接收到的消息广播到所有绑定的队列中;主题交换器则根据消息的路由键和绑定键的匹配规则进行路由。
#RabbitMQ的底层架构是什么?
以下是 RabbitMQ 的一些核心架构组件和特性:
- 核心组件:生产者负责发送消息到 RabbitMQ、消费者负责从 RabbitMQ 接收并处理消息、RabbitMQ 本身负责存储和转发消息。
- 交换机:交换机接收来自生产者的消息,并根据 routing key 和绑定规则将消息路由到一个或多个队列。
- 持久化:RabbitMQ 支持消息的持久化,可以将消息保存在磁盘上,以确保在 RabbitMQ 重启后消息不丢失,队列也可以设置为持久化,以保证其结构在重启后不会丢失。
- 确认机制:为了确保消息可靠送达,RabbitMQ 使用确认机制,费者在处理完消息后发送确认给 RabbitMQ,未确认的消息会重新入队。
- 高可用性:RabbitMQ 提供了集群模式,可以将多个 RabbitMQ 实例组成一个集群,以提高可用性和负载均衡。通过镜像队列,可以在多个节点上复制同一队列的内容,以防止单点故障。
原始
mq
消息队列(Message Queue)是一种应用程序对应用程序的通信方法,它允许系统在不同的组件之间异步地传递消息。消息队列通常用于解耦生产者和消费者,提高系统的可用性和可扩展性。以下是消息队列的一些主要特点和用途:
- 异步通信:消息队列允许生产者发送消息而不必等待消费者处理,这样可以提高系统的响应速度和吞吐量。
- 解耦:生产者和消费者不需要同时在线,它们可以独立地工作,只需要通过消息队列进行通信。
- 削峰填谷
一般而言对于互联网项目,都可能出现高QPS的场景。
如果上游系统不限速的发送MQ,下游系统可能因为高负荷导致服务直接被压垮。
为了避免下游系统的崩溃,常见的优化方案有两种:
上游队列缓冲,限速发送
下游队列缓冲,限速执行
提要:
MQ有两种消费模式:1.推(push),2.拉(pull)
推(Push):服务端口主动推送消息给客户端
拉(Pull):客户端需要主动到服务端轮询获取数据
什么是消峰填谷:
削峰填谷采用的是消费者拉取(Pull)的方式进行MQ消费,将服务器暂时无法承载的消息堆积在MQ之中,这样流量高峰就被削减了,等QPS高峰期过去,下游系统将堆积的MQ处理慢慢消耗,这个过程就叫做填谷。整个过程让下游系统的负载维持在一定水平之内。
rabbit rocket 默认推
kafka默认拉 延迟大
对比
分布式
Kafka 的扩展方式是水平扩展(horizontal scaling),即通过增加集群中的节点数来提高性能和容量。因此在处理大容量、高吞吐量和实时数据流上性能优势很大,它每秒能够处理数百万个事件,并且可以处理大量数据。RabbitMQ 的扩展方式是垂直扩展(vertical scaling),即通过增加单个节点的硬件资源来提高性能和容量,因此它受到单个节点性能和容量的瓶颈,在性能方面是不如 Kafka 的。
延迟
RabbitMQ 使用推送模型(push model),即交换机将消息推送到队列,然后队列将消息推送到消费者,这样可以减少消息在队列中的等待时间,降低延迟;Kafka 使用拉取模型(pull model),即生产者将消息发布到主题,然后消费者从主题拉取消息,这样可以增加消费者对消息的控制力,提高吞吐量,但也会增加延迟。因此在在延迟方面 Kafka 是不如 RabbitMQ 的。
持久化
Kafka 将数据持久化到磁盘中,并且支持数据压缩和批量传输,以提高性能和节省空间。Kafka 可以支持 TB 级别甚至 PB 级别的数据存储,并且可以快速地重放历史数据。RabbitMQ 将数据缓存在内存中,并且支持消息确认和事务机制,以提高可靠性和一致性。RabbitMQ 也可以将数据持久化到磁盘中,但是会降低性能和吞吐量。RabbitMQ 更适合处理小规模且实时性较高的数据。
确认机制
RabbitMQ 支持消费者确认机制(consumer acknowledgement),即消费者在接收并处理完消息后向队列发送确认信号,队列才会删除该消息,这样可以保证消息的可靠传递;Kafka 不支持消费者确认机制,而是由消费者将消息附加到日志文件中,自己维护一个偏移量来记录已经消费过的消息,主题不会删除任何消息,该日志文件将一直留存到其保留期到期,除非达到预设的保留期限或大小限制。这样消费者可以在规定的时间内随时重新处理流式传输中的历史数据。
、
对比
“实时性” 的核心衡量标准是 **“端到端延迟”**(生产者发消息 → 消息到达 Broker → 消费者收到消息的总时间)。Kafka 的实时性优势,本质是其 “日志存储 + 分区并行” 架构,与 RabbitMQ 的 “队列路由”、RocketMQ 的 “混合存储” 形成鲜明差异。
先看三者核心架构对比
特性 | Kafka | RabbitMQ | RocketMQ |
---|---|---|---|
存储核心 | 顺序日志文件(分段存储) | 内存队列 + 磁盘持久化(随机写) | 内存缓存 + 磁盘日志(混合模式) |
消息模型 | 分区(Partition)并行 | 交换机(Exchange)+ 队列(Queue)路由 | Topic + 队列(Queue)+ 消费组 |
核心设计目标 | 高吞吐、低延迟(流数据场景) | 灵活路由、多协议支持(企业消息场景) | 高可靠、高吞吐(电商场景) |
典型端到端延迟 | 毫秒级(通常 10~100ms) | 毫秒级~秒级(路由复杂时延迟高) | 毫秒级(通常 50~200ms,略高于 Kafka) |
1. 日志存储:规避 “中间环节”,减少延迟损耗
Kafka 的消息流转路径极短:
生产者 → 直接追加到 Broker 的日志文件 → 消费者直接读取日志文件
整个过程无 “中间转换”:
- 无需像 RabbitMQ 那样,先将消息写入 “内存队列”,再通过交换机路由到目标队列(路由规则复杂时会增加延迟);
- 无需像 RocketMQ 那样,先将消息写入 “内存缓存(CommitLog)”,再异步刷盘到日志文件(虽支持同步刷盘,但会牺牲吞吐,默认异步刷盘有微小延迟)。
Kafka 的 “写入即持久化”(顺序写落盘),既保证了可靠性,又省去了 “缓存→磁盘” 的异步等待时间,延迟自然更低。
对比rocketmq
Kafka 采用单一日志文件 + 分区并行的设计:一个 Topic 的所有分区独立写入各自的日志文件,分区间无锁竞争,可通过增加分区数线性提升吞吐。而 RocketMQ 的所有 Topic 共享一个 CommitLog 文件(所有消息混存),虽通过 ConsumeQueue 索引加速查询,但高并发下会产生锁竞争,吞吐上限略低于 Kafka。
Kafka 消费消息时直接利用操作系统的 sendfile
系统调用实现零拷贝,数据从磁盘→内核缓冲区→消费者,减少 2 次内存拷贝;而 RocketMQ 需经过用户态缓冲区转发,拷贝次数更多,高吞吐场景下延迟差异会放大。
需要说明的是,RocketMQ 在以下场景更具优势,这也是它在国内互联网企业广泛应用的原因:
- 企业级特性更完善:内置事务消息、定时消息、重试机制(死信队列)、消息轨迹等,无需额外开发;
- 低延迟场景优化更好:在中小吞吐(10 万条 / 秒以下)场景下,端到端延迟通常比 Kafka 低 10-20ms;
- 部署运维更简单:自带控制台、监控告警等工具,对国内环境(如阿里云、K8s)的适配更友好。
延时消息
kafka 为什么性能比 RocketMQ 好
聊完两种零拷贝技术,我们回过头来看下 kafka 为什么性能比 RocketMQ 好。这是因为 RocketMQ 使用的是 mmap 零拷贝技术,而 kafka 使用的是 sendfile。kafka 以更少的拷贝次数以及系统内核切换次数,获得了更高的性能。但问题又来了,为什么 RocketMQ 不使用 sendfile?参考 kafka 抄个作业也不难啊?我们来看下 sendfile 函数长啥样。
ssize_t sendfile(int out_fd, int in_fd, off_t* offset, size_t count);
// num = sendfile(xxx);
再来看下 mmap 函数长啥样。
void *mmap(void *addr, size_t length, int prot, int flags,
int fd, off_t offset);
// buf = mmap(xxx)
注释里写的是两个函数的用法,mmap 返回的是数据的具体内容,应用层能获取到消息内容并进行一些逻辑处理。
而 sendfile 返回的则是发送成功了几个字节数,具体发了什么内容,应用层根本不知道。
而 RocketMQ 的一些功能,却需要了解具体这个消息内容,方便二次投递等,比如将消费失败的消息重新投递到死信队列中,如果 RocketMQ 使用 sendfile,那根本没机会获取到消息内容长什么样子,也就没办法实现一些好用的功能了。
而 kafka 却没有这些功能特性,追求极致性能,正好可以使用 sendfile。
除了零拷贝以外,kafka 高性能的原因还有很多,比如什么批处理,数据压缩啥的,但那些优化手段 rocketMQ 也都能借鉴一波,唯独这个零拷贝,那是毫无办法。
所以还是那句话,没有一种架构是完美的,一种架构往往用于适配某些场景,你很难做到既要又要还要。当场景不同,我们就需要做一些定制化改造,通过牺牲一部分能力去换取另一部分能力。做架构,做到最后都是在做折中。是不是感觉升华了。
但 Zookeeper 作为一个通用的分布式协调服务,它不仅可以用于服务注册与发现,还可以用于分布式锁、配置管理等场景。Kafka 其实只用到了它的部分功能,多少有点杀鸡用牛刀的味道。太重了。
所以 RocketMQ 直接将 Zookeeper 去掉,换成了 nameserver,用一种更轻量的方式,管理消息队列的集群信息。生产者通过 nameserver 获取到 topic 和 broker 的路由信息,然后再与 broker 通信,实现服务发现和负载均衡的效果。
---
### **2. 吞吐量差异的原因**
#### **RabbitMQ 吞吐量较低**
- **单节点架构**:
虽然支持集群,但消息默认不跨节点复制(需配置镜像队列),扩展性受限。
- **ACK 机制**:
消费者确认(ACK)后才会删除消息,确保可靠性但增加开销。
- **内存限制**:
默认优先将消息存储在内存,内存不足时性能下降。
#### **Kafka 吞吐量高**
- **分区 + 批量读写**:
数据分片(Partition)并行处理,支持批量发送/压缩(减少 I/O 次数)。
- **磁盘顺序写入**:
直接持久化到磁盘(利用顺序 I/O 性能接近内存),避免内存瓶颈。
- **零拷贝技术**:
通过 `sendfile` 和 `mmap` 减少数据拷贝次数,提升网络传输效率。
---
### **3. 实时性差异的原因**
#### **RabbitMQ 实时性更强**
- **低延迟设计**:
消息到达 Broker 后立即推送给消费者(默认 `push` 模式),延迟通常在毫秒级。
- **短生命周期消息**:
消息被消费后立即删除(非持久化场景),减少磁盘 I/O 延迟。
#### **Kafka 实时性较弱**
- **批量处理**:
默认积累一批消息或等待超时(`linger.ms`)才发送,增加延迟(可配置为 `0` 但牺牲吞吐)。
- **Pull 模式**:
消费者主动拉取消息(`poll`),可能引入空轮询延迟。
- **持久化开销**:
所有消息必须写磁盘,即使消费者在线也会产生磁盘 I/O 延迟。
---
### **4. 其他关键差异**
| **维度** | **RabbitMQ** | **Kafka** |
|----------------|------------------------------------------|----------------------------------------|
| **消息模型** | 队列(点对点)、Exchange(发布/订阅) | 分区 Topic(持久化日志流) |
| **消息保留** | 消费后默认删除 | 按时间/大小保留(可重复消费) |
| **协议支持** | AMQP、STOMP、MQTT | 自定义二进制协议 |
| **事务支持** | 支持(但性能差) | 支持(精确一次语义) |
却也损失了 AMQP 强大的功能,比如延时队列、死信队列、消息路由、优先级队列等功能。另外由于通常会有多个生产者、多个队列、多个消费者的存在,显然两者都不能保证消息的绝对顺序性。
- RabbitMQ:提供丰富的可靠性机制,支持消息确认(ACK)、持久化、事务等,可确保消息不丢失,但会牺牲部分性能。
- Kafka:通过副本机制(Replication)保证消息可靠性,默认异步复制,可通过参数调整同步策略,但消息确认机制相对简单。
同时在数据的持久化上两个系统存在明显差异:RabbitMQ 会删除已经被消费确认的消息,Kafka 不会主动删除,只是会更新分区的偏移量,即可以将过去的数据重放,这也是决策的一个关键点。
所以,可以看到了,这个场景用 RabbitMQ,出现了三个问题:
- 为了实现发布订阅功能,从而使用的消息复制会降低性能并耗费更多资源;
- 多个消费者无法严格保证消息顺序;
- 大量的订单集中在一个队列,吞吐量受到了限制。
1.Rabbitmq
Pubsub
Stream 消费者组
队列绑定交换机 接受 临时存储消息
路由模式:
topic 模式 topic
2 消息丢失
、RabbitMQ解决消息丢失问题:
生产端:启用Publisher Confirm模式(异步确认消息持久化)并设置mandatory=true
路由失败回退,结合备份交换机处理无法路由的消息;事务消息因性能损耗仅限关键场景使用。
Broker端:消息与队列均需持久化(durable=true
)防止宕机丢失,部署镜像队列集群实现多节点冗余;同步刷盘策略确保数据落盘后响应。
消费端:关闭自动ACK,采用手动ACK并在业务处理成功后提交确认;消费失败时重试(重试次数可配置)并最终转入死信队列人工干预,避免消息因异常未处理而丢失。
3 死信队列
延迟消息 设置ttl 过期后路由到死信交换机 重新发布
4 重复消费
- 使用消息唯一标识符(Message ID):在消息发送时,为每条消息附加一个唯一标识符。消费者在处理消息时,可以通过判断消息唯一标识符来避免重复消费。可以将消息ID记录在数据库或缓存中,用于去重检查。
- 消费者端去重处理:消费者在消费消息时,可以通过维护一个已消费消息的列表或缓存,来避免重复消费已经处理过的消息。
5 消息挤压
RabbitMQ通过调整消费者的消费速率来控制消息积压。
可以使用QoS(Quality of Service)机制设置每个消费者的预取计数,限制每次从队列中获取的消息数量,以控制消费者的处理速度。
RabbitMQ还支持消费者端的流量控制,通过设置basic.qos或basic.consume命令的参数来控制消费者的处理速度,避免消息过多导致积压。
6 顺序
- 应用场景:通常用于实时的、对可靠性要求较高的消息传递上
- 中小项目,项目消息量小、吞吐量不高、对延时敏感
- 遗留应用,如需要与旧系统或第三方系统进行集成或通信
- 复杂路由,如需要根据不同的规则或条件来分发或过滤消息
- 延迟敏感,对于消费者处理消息的及时性有非常高的要求
2.kafka
基于主题(Topic)和分区(Partition)的存储模型
高吞吐量 批处理+0拷贝
低延迟 顺序写
持久化 可扩展
kafka特点
1. 以日志(Log)为核心的存储模型
Kafka 将所有消息以 “日志文件” 的形式持久化存储,这是其最底层的设计基石:
- 顺序追加与不可修改:消息一旦写入分区(Partition),就以 append-only 方式追加到日志文件末尾,永不修改(仅允许删除过期数据)。这种设计最大化利用了磁盘 顺序 IO 的高性能(顺序写速度接近内存),同时避免了随机写带来的性能损耗。突破磁盘瓶颈:Kafka 将消息持久化到磁盘(而非内存),但通过顺序写入大幅提升效率(比随机写快 6000 倍)。
- 分区日志的独立性:每个主题(Topic)被拆分为多个分区,每个分区对应独立的日志文件。分区间并行写入、并行消费,实现了横向扩展,同时保证 “分区内有序,分区间独立” 的特性。
- 基于偏移量(Offset)的定位:每个消息在分区内有唯一的偏移量(单调递增的整数),作为消息的 “逻辑地址”。消费者通过 Offset 记录消费进度,实现了 “按需读取” 和 “断点续传”,简化了消息投递的可靠性保证。
2. 高性能的核心优化:批量与异步
Kafka 极致追求高吞吐,核心优化在于 “批量处理” 和 “异步 IO”:
内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,
- 批量写入与压缩:生产者(Producer)默认将多条消息批量打包发送(通过
batch.size
控制批量大小),并支持对批量消息压缩(如 GZIP、Snappy),减少网络传输量和磁盘 IO 次数。生产者/消费者均支持批量发送与拉取消息,减少网络 I/O 次数。
典型吞吐:单集群可达 每秒百万级消息(依赖硬件配置)。 - 异步刷盘与页缓存:消息写入时先写入操作系统的页缓存(Page Cache),再由 OS 异步刷盘(而非立即同步到磁盘),利用内存加速读写,同时通过
flush.messages
等参数平衡性能与可靠性。页缓存技术:利用操作系统 Page Cache 减少磁盘访问,读写操作直接与内存交互。 - 零拷贝(Zero-Copy)读取:消费者读取消息时,通过 Linux 的
sendfile()
系统调用直接将磁盘文件数据通过内核缓冲区发送到网络,避免用户态与内核态的数据拷贝,大幅提升读取性能。
3. 松耦合的生产者 - 消费者模型
Kafka 弱化了生产者与消费者的直接依赖,通过主题实现解耦:
- 发布 - 订阅模式:生产者仅需将消息发送到主题,无需关心谁在消费;消费者按需订阅主题,无需关心生产者的存在。这种解耦支持多对多通信(多个生产者向同一主题写入,多个消费组独立消费)。
- 消费进度由消费者自主管理:消费者通过 Offset 记录自己的消费位置,而非由 broker 维护,降低了 broker 的负担,同时支持 “重复消费”“回溯消费” 等灵活场景(如数据重放)。
- 消息持久化与过期策略:消息默认在 broker 持久化存储,并可通过
retention.ms
配置保留时间(如 7 天),消费者可随时消费历史数据,无需实时在线。
4. 分布式与分区(Partition)的水平扩展
Kafka 从设计之初就面向分布式场景,通过分区实现存储和计算的水平扩展:
- 分区作为扩展单元:主题的分区可以分布在不同 Broker 节点上,消息写入和消费可分散到多个节点,突破单节点的性能瓶颈。例如,一个 10 分区的主题可支持 10 倍于单分区的吞吐量。
- 消费端的并行消费:消费组(Consumer Group)中的多个消费者实例可同时消费不同分区的消息(一个分区仅被一个消费者消费),实现消费能力的线性扩展。
- 数据冗余与容错:每个分区可配置多个副本(Replica),通过领导者(Leader)与追随者(Follower)机制实现故障转移(Leader 故障后,Follower 自动晋升),保证数据不丢失且服务持续可用。
二、分布式
- 这得益于其分布式架构、分区机制以及批量处理等技术。定时持久化存储,非实时持久化储存
分片(Partition)机制
Topic 被划分为多个 Partition,分散在不同 Broker 上。
优势:
并行读写:生产者/消费者可同时操作多个 Partition。
负载均衡:Partition 可动态迁移。
2. 无缝扩容
新增 Broker 后,通过 kafka-reassign-partitions.sh 工具自动平衡 Partition 分布。
无需停机:扩容过程不影响服务可用性。
3. 副本(Replication)机制
每个 Partition 有多个副本(Leader + Followers),保障数据高可用。
ISR(In-Sync Replicas):仅同步的副本参与故障切换,避免脏数据。
三、持久化存储与数据可靠性
1. 消息持久化
数据默认保留 7 天(可配置为永久保留),支持按时间/大小滚动清理。
消费解耦:消费者可随时重放历史数据(区别于传统 MQ 的“阅后即焚”)。
2. 端到端数据保证
生产者:
acks=all:确保消息写入所有 ISR 副本后才返回成功。
幂等生产者(Idempotent Producer):避免网络重试导致重复消息。
消费者:
位移(Offset)提交到 Kafka 内部 Topic(__consumer_offsets),避免丢失。
五、高可用性与容错
1. Controller 选举机制
依赖 ZooKeeper(或 KRaft 模式)选举 Controller Broker,管理 Partition 状态。
Controller 故障时自动切换(秒级恢复)。
2. 无单点故障
所有组件(Broker、Producer、Consumer)均分布式部署。
客户端自动发现集群拓扑变化(Metadata 更新)。
5. 面向流式处理的设计
Kafka 不仅是消息队列,更是流式数据平台,其设计天然适配流式处理场景:
- 时间维度的消息有序性:分区内消息按时间顺序写入,支持基于时间的窗口计算(如统计 5 分钟内的订单量)。
- 高吞吐量支撑实时处理:通过批量处理和分区并行,Kafka 可支撑每秒百万级消息的吞吐,满足实时日志收集、监控数据上报等高频场景。
- 与流处理框架的集成:原生支持与 Spark Streaming、Flink 等流处理框架对接,提供 Exactly-Once 语义的消息投递,成为流式数据管道的核心枢纽。
kafka是一个基于发布订阅模式的消息队列中间件,通过topic进行消息分类,producer生产的消息可以根据消息类型发布到不同的topic,由不同的consumer订阅进行消费,这样就组成了一个基本可用的消息队列中间件;
一个可靠的消息队列中间件,就要解决单点问题(主从架构 副本机制),kafka中解决单点问题引入了broker,Broker 是 Kafka 的服务器实例,一个 Kafka 集群可以包含多个 Broker。
每个 Broker 都负责管理部分 Topic 的分区和消息。
每个 Broker 有一个唯一的 ID,用于标识它在集群中的身份。
Broker 的职责:
存储消息:将消息持久化到磁盘,按照分区(Partition)进行组织。
分发消息:接收生产者发送的消息,并分发给消费者。
分区和副本管理:分配分区的主节点和副本节点。
元数据管理:保存集群的元数据信息,如分区、Leader 副本等。
为了管理这些topic引入了zk(以topic维度主从) 分布式协调,选举leader、follower,leader会向follower复制topic数据,leader会和consumer打交道,而follower仅仅做备份,并且当一台broker挂掉后会重新选举新的leader(这时因为已经同步好topic就可以马上接力成为leader)
消费者组
一个分区只能被一个消费者选择 一个消费者可能多个分区
采用多消费者消费多个分区
副本
当一个topic数据非常多时,会严重影响性能及吞吐量,所以kafka会进行分区,一个topic可以把消息内容拆分多个平行的子topic提供给到外面进行消费,topic分区后,consumer也可以进行拆分成和分区一样的数量去消费,也就是消费组;
顺序 重复 可靠性(持久化+不丢)
一、顺序
Kafka实现顺序消息的核心在于分区顺序性:
- 生产端:相同业务标识(如订单ID)的消息通过固定Key哈希至同一分区(
Partitioner
),利用分区内消息天然有序性保序; - 消费端:每个分区仅由同一消费者组的一个线程消费(单线程串行处理),避免并发消费乱序;
一、分区的底层存储:以日志文件为核心
每个 Kafka 分区(Partition)在底层对应 一组有序的日志文件(Log Segment),这些文件以 “顺序写入、不可修改” 的方式存储消息,具体结构如下:
日志文件的物理结构
每个分区的消息被拆分到多个日志段文件(*.log
)中,文件名以该段包含的 最小偏移量(Offset) 命名(如00000000000000000000.log
、00000000000000123456.log
)。
新消息永远 追加到最后一个日志段的末尾,不会插入或修改已有内容(类似 append-only 日志)。消息的偏移量(Offset)
每个消息在分区内有唯一的 偏移量(一个单调递增的整数),表示消息在分区内的写入顺序。例如:第一个消息 Offset = 0
第二个消息 Offset = 1
...
偏移量直接映射到消息在日志文件中的物理顺序,消费者通过 Offset 定位消息位置,天然保证 “按写入顺序读取”。
二、为什么单一分区能保证有序?
核心原因是 Kafka 对分区日志的 写入和读取机制 严格遵循 “顺序性”:
写入:单线程追加 + 磁盘顺序 IO
Kafka 生产者发送的消息,会被分区器路由到指定分区后,由 分区对应的单线程 处理写入(同一分区的消息不会被多线程并行写入)。
消息被追加到日志文件末尾,利用磁盘的 顺序写性能优势(顺序写比随机写快 10 倍以上),同时保证写入顺序与消息发送顺序一致。
即使多个生产者向同一分区发送消息,Kafka 也会通过 分区内的锁机制 保证写入串行化,避免并发写入导致的顺序混乱。
三、特殊场景的处理
1. 消息重试与顺序性冲突
若消息处理失败需要重试,直接重试会导致 重试消息与新消息顺序错乱(例如:原顺序 msg1→msg2,msg1 失败重试后变成 msg2→msg1)。
解决方案:
- 使用 死信队列(DLQ):失败消息放入单独的死信主题,不影响主流程顺序;
- 重试时保持 key 不变:确保重试消息进入原分区,消费端按偏移量顺序处理(需在消费端逻辑中判断消息是否已处理)。
kafka重复消费
1. Kafka Broker上存储的消息,都有一个offset标记,维护当前已经消费的消息,每消费一批数据,kafka broker就会更新offset的值,避免重复消费
2. 在kafka里面有一个partition balance机制,就是把多个partition均衡的分配给多个消费者,
Consumer端会从分配的partition里面去消费信息,如果consumer在默认的5分钟内没办法处理完这一批消息,就会触发kafka的rebalance机制,从而导致offset自动提交失败;而在重新
Rebalance之后,Consumer还是会从之前没提交的Offset位置开始消费,也会导致消息重复消费
的问题。
1. 提高消费端的处理性能避免触发balance,比如可以用异步的方式来处理消息,缩短单个消息消费
的时间,或者可以调整消息处理的超时时间,或者减少一次性从broker上拉取数据的条数;
2. 消费者端 可以针对消息生成md5然后保存到mysql或者redis里面,在处理消息之前先去判断幂等性,或者使用业务判断幂等性;
- 幂等性处理:在消费者端实现幂等性逻辑,即多次消费同一条消息所产生的结果与单次消费的结果一致。这可以通过在业务逻辑中引入唯一标识符或记录已处理消息的状态来实现。
- 消息确认机制:消费者在处理完消息后,提交已消费的偏移量(Offset)给Kafka,Kafka会记录已提交的偏移量,以便在消费者重新启动时从正确的位置继续消费。消费者可以定期提交偏移量,确保消息只被消费一次。
kafka如何保证数据不丢失
我们无法保证kafka消息不丢失,只能保证某种程度下,消息不丢失;
生产者
对生产者来说,其发送消息到 Kafka 服务器的过程可能会发生网络波动,导致消息丢失。对于这一个可
能存在的风险,我们可以通过合理设置 Kafka 客户端的 request.required.acks 参数来避免消息丢
失。该参数表示生产者需要接收来自服务端的 ack 确认,当收不到确认或者超时,便会抛出异常,从而
让生产者可以进一步进行处理。
该参数可以设置不同级别的可靠性,从而满足不同业务的需求,其参数设置及含义如下所示:
request.required.acks = 0 表示 Producer 不等待来自 Leader 的 ACK 确认,直接发送下一条消
息。在这种情况下,如果 Leader 分片所在服务器发生宕机,那么这些已经发送的数据会丢失。
request.required.acks = 1 表示 Producer 等待来自 Leader 的 ACK 确认,当收到确认后才发送
下一条消息。在这种情况下,消息一定会被写入到 Leader 服务器,但并不保证 Follow 节点已经同
步完成。所以如果在消息已经被写入 Leader 分片,但是还未同步到 Follower 节点,此时 Leader
分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。
request.required.acks = -1 表示 Producer 等待来自 Leader 和所有 Follower 的 ACK 确认之
后,才发送下一条消息。在这种情况下,除非 Leader 节点和所有 Follower 节点都宕机了,否则不
会发生消息的丢失。
如上所示,如果业务对可靠性要求很高,那么可以将 request.required.acks 参数设置为 -1,这样就
不会在生产者阶段发生消息丢失的问题。
一、生产者(Producer)层面:确保数据成功发送到 Broker
生产者是数据流入 Kafka 的起点,其核心目标是避免因网络波动、Broker 故障等问题导致数据未被正确接收。关键机制包括:
1. 消息发送确认机制(acks 参数)
生产者通过配置 acks
参数控制消息发送的确认级别,决定消息 “成功发送” 的判定标准,避免消息在传输中丢失:
- acks=0:生产者发送消息后不等待 Broker 确认,直接认为发送成功。存在丢失风险(如 Broker 崩溃、网络丢包),仅用于对延迟要求极高但可容忍数据丢失的场景。
- acks=1(默认值):生产者等待分区 Leader 副本确认接收消息后,认为发送成功。此时若 Leader 已接收但未同步给 Follower 时 Leader 崩溃,消息可能丢失(需依赖后续 Leader 选举和数据恢复)。
- acks=-1 或 all:生产者等待分区 Leader 及其所有 ISR(In-Sync Replicas,同步副本) 确认接收消息后,才认为发送成功。这是最安全的级别,确保消息至少被多个副本持久化,几乎无丢失风险(除非所有 ISR 副本同时崩溃)。
2. 重试机制(retries 参数)
当消息发送失败(如网络超时、Leader 选举中),生产者会根据 retries
参数配置的重试次数自动重试,避免因瞬时故障导致的数据丢失。配合 retry.backoff.ms
可设置重试间隔,防止重试风暴。
3. 消息缓冲与批量发送
生产者会将消息先缓冲到本地内存(
buffer.memory
),并按batch.size
(批量大小)或linger.ms
(等待时间)触发批量发送。若缓冲满且发送失败,可通过block.on.buffer.full
(旧版本)或max.block.ms
控制行为(阻塞或抛异常),避免因缓冲溢出丢失消息。
二、Broker 层面:确保数据持久化与副本可靠性
Broker 是 Kafka 存储消息的核心节点,其数据不丢失依赖副本机制、持久化存储和故障恢复机制:
1. 副本机制(Replication)
Kafka 的每个分区(Partition)会配置多个副本(replication.factor
),包括 1 个 Leader 副本和多个 Follower 副本:
- Leader 副本:负责处理生产者和消费者的读写请求,所有消息先发送到 Leader。
- Follower 副本:通过日志复制(Log Replication) 机制从 Leader 同步消息,保持与 Leader 的数据一致性。
- ISR 列表:Leader 会维护一个 ISR 列表,仅包含与 Leader 数据同步延迟在阈值内的 Follower(通过
replica.lag.time.max.ms
控制)。当acks=all
时,消息需被 ISR 中所有副本确认,确保即使 Leader 崩溃,Follower 仍有完整数据。
持久化
2. 持久化存储(Log 日志)
Broker 接收消息后,会将消息立即写入磁盘(而非仅存于内存):
- 消息被追加到分区的日志文件(
.log
)中,同时通过页缓存(Page Cache) 优化写入性能(先写缓存,再异步刷盘)。 - 可通过
flush.messages
和flush.ms
配置强制刷盘策略(默认依赖操作系统刷盘),但频繁刷盘会降低性能,需平衡可靠性与吞吐。
3. 故障恢复与 Leader 选举
当 Leader 副本所在 Broker 崩溃时,Kafka 会从 ISR 列表中选举新的 Leader:
- 新 Leader 必须是 ISR 中的 Follower,确保其数据与旧 Leader 一致(无消息丢失)。
- 若 ISR 中所有副本均崩溃,可通过
unclean.leader.election.enable
配置是否允许非 ISR 副本成为 Leader(默认禁用,避免数据丢失;启用可能丢失消息,但可恢复服务)。
4. 最小同步副本数(min.insync.replicas)
配合 acks=all
使用,设置 ISR 中最少需要多少个副本确认消息,否则生产者会收到异常:
- 例如
min.insync.replicas=2
且replication.factor=3
时,需至少 2 个副本(Leader + 1 个 Follower)确认消息,避免因部分副本故障导致数据丢失。
三、消费者(Consumer)层面:确保数据被正确处理
消费者的目标是避免因处理失败、重启等问题导致已接收的消息未被正确处理而丢失。关键机制包括:
1. 偏移量(Offset)提交机制
消费者通过记录偏移量(消息在分区中的位置)标记已处理的消息,偏移量的提交策略直接影响数据是否丢失:
- 自动提交:默认每隔
auto.commit.interval.ms
自动提交偏移量。若消费者在自动提交前崩溃,重启后会重新消费未提交的消息(可能重复,但不丢失)。 - 手动提交:通过
commitSync()
或commitAsync()
手动提交偏移量,确保消息被处理完成后再提交,彻底避免丢失(需处理提交失败的重试)。
2. 消费者组重平衡(Rebalance)安全机制
当消费者组内成员变化(如新增 / 下线消费者)时,会触发分区重分配(Rebalance)。若处理不当,可能导致消息漏消费:
- Kafka 通过
ConsumerRebalanceListener
接口允许消费者在重平衡前提交偏移量,或在重平衡后恢复消费位置,避免因分区迁移导致的丢失。 - 建议使用协作重平衡(Cooperative Rebalance)(Kafka 2.4+),减少重平衡期间的消费中断。
最为详细的方案,请参考尼恩团队的架构方案: 得物面试:消息0丢失
2.4 kafka的延迟队列
a、时间轮(TimingWheel):是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可
以存放一个定时任务列表
b、定时任务列表(TimerTaskList):是一个环形的双向链表,链表中的每一项表示的都是定时任务项
c、定时任务项(TimerTaskEntry):封装了真正的定时任务TimerTask
d、层级时间轮:当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间
轮中,类似于钟表就是一个三级时间轮
e、JDK DelayQueue:存储TimerTaskList,并根据其expiration来推进时间轮的时间,每推进一次除执
行相应任务列表外,层级时间轮也会进行相应调整
事务性:
上述幂等设计只能保证单个 Producer 对于同一个 <Topic, Partition> 的 Exactly Once 语义。
Kafka 现在通过新的事务 API 支持跨分区原子写入。这将允许一个生产者发送一批到不同分区的消息,
这些消息要么全部对任何一个消费者可见,要么对任何一个消费者都不可见。这个特性也允许在一个事
务中处理消费数据和提交消费偏移量,从而实现端到端的精确一次语义。
为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的 ID,也即Transaction ID 。
Transactin ID 与 PID 可能一一对应。区别在于 Transaction ID 由用户提供,将生产者的
transactional.id 配置项设置为某个唯一ID。而 PID 是内部的实现对用户透明。
另外,为了保证新的 Producer 启动后,旧的具有相同 Transaction ID 的 Producer 失效,每次
Producer 通过 Transaction ID 拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer
的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。
2.7 kakfa 发送指定分区
kafka发送分区设置:
如果在发送消息的时候指定了分区,则消息投递到指定分区
如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区;
kakfa的分区作用是提供负载均衡的能力,实现系统的高伸缩性。分区之后,不同的分区能够放在不同的
物理设备上,而数据的读写操作也都是针对分区去进行,这样就可以使用每个分区都可以独立的处理自
己分区的读写请求,而且,我们还可以通过添加新的节点机器来提高整个系统的吞吐量。
分区策略:
轮询策略:round-robin策略,顺序分配;
随机策略:随机地将消息放置到任意一个分区上,;
key-ordering:kafka允许为每条消息定义消息键,简称key,他是一个有着明确业务含义的字符串,也
可以用来表征消息元数据,一旦消息被定义了key,那么就可以保证同一个key地所有消息都进入到相同
地分区里面,由于每个分区下地消息处理都是顺序地,故这个策略被称为按消息键保序策略;
消息积压
Kafka通过 横向扩展(增加分区及消费者实例)、优化消费者参数(如批量拉取、并发处理)、提升消费逻辑效率(异步化、减少I/O),并动态监控消费滞后指标。
必要时限流生产者或临时扩容消费组,结合分区再平衡策略快速分发积压消息负载。
Kafka还提供了消息清理(compaction)和数据保留策略,可以根据时间或者数据大小来自动删除过期的消息,避免消息积压过多。
3.Rocketmq
-
采用主题(Topic)和队列(Queue)二级结构
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。这类Consumer消费的是同一Topic类型的消息。消费者使得消息消费方面,实现负载均衡(这里的负载均衡指的是一个Topic中不同的Queue平均分配给一个Consumer Group中不同的Consumer消费)和容错的目标变得非常容易。
顺序:
本身支持顺序消息
生产端:
单一队列 生产者
串行发送
消费端:
消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。
避免异步处理 串行消费
有限重试
消息组尽量打散 避免集中
RocketMQ通过横向扩展(增加消费者实例、队列数量)、提升消费能力(线程池调优、批量消费)、动态扩容、消息预取、死信队列隔离无效消息,并支持消费限流及监控告警,快速定位处理积压问题。
RocketMQ还提供了消息拉取和推拉模式,消费者可以根据自身的处理能力主动拉取消息,避免消息积压过多。
最为详细的方案,请参考尼恩团队的架构方案: 阿里面试:如何保证RocketMQ消息有序?如何解决RocketMQ消息积压?
不过需要注意,使用锁来串行化消息处理虽然能确保顺序性,但也会牺牲并发性能,因为每次只能有一个 Goroutine 在处理消息。下面是如何加锁来确保顺序性的示例:
1、RocketMQ解决消息丢失问题:
生产端: 采用同步发送(等待Broker确认)并启用重试机制,结合事务消息(如预提交half消息+二次确认commit)确保消息可靠投递。
Broker端:配置同步刷盘(消息写入磁盘后返回确认)和多副本同步机制(主从节点数据冗余)防止宕机丢失,同时通过集群容灾保障高可用。
消费端:消费者需手动ACK确认,失败时触发自动重试(默认16次),最终失败消息转入死信队列人工处理,避免异常场景下消息丢失。
最为详细的方案,请参考尼恩团队的架构方案: 滴滴面试:Rocketmq消息0丢失,如何实现?
掌握高级特性
RocketMQ支持延时消息、事务消息等高级特性,
延时消息基于延时队列进行
比如,延时消息可以用于发送在未来某个时间点才需要处理的消息,
而事务消息则可以保证分布式系统中事务的一致性