消息队列-kafka为例

发布于:2025-05-30 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

消息队列应用场景和基础知识

MQ常见的应用场景

  1. 异步处理

将一些非核心的业务流程以异步并行的方式执行,从而减少请求响应时间,提高系统吞吐量。

  1. 应用解耦

通过消息队列,使得每个应用系统不必受其他系统影响,更加独立

  1. 流量削峰

一般在秒杀或团抢活动中使用广泛,后台系统根据消息队列中的消息信息,进行秒杀业务处理。

  1. 消息通讯

消息通讯是指应用间的数据通信。消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等点对点通讯。

  1. 延时任务

场景:用户在美团 APP 下单,假如没有立即支付,进入订单详情会显示倒计时,如果超过支付时间,订单就会被自动取消。

处理方式:订单服务生成订单后,发送一条延时消息到消息队列。消息队列在消息到达支付过期时间时,将消息投递给消费者,消费者收到消息之后,判断订单状态是否为已支付,假如未支付,则执行取消订单的逻辑。

  1. 数据中转枢纽

例如,当应用日志用于离线日志分析时,搜索单个日志记录同样不可或缺,而构建各自独立的工作流来采集每种类型的数据再导入到各自的专用系统显然不切实际,利用消息队列 Kafka 作为数据中转枢纽,同份数据可以被导入到不同专用系统中

  1. 分布式事务:保证数据一致性
  • 场景
    订单服务需要同时生成订单和扣减库存,这涉及两个不同的数据库操作。
    如果一个成功一个失败,就会导致数据不一致。

  • 解决方案
    通过MQ实现分布式事务。订单服务生成订单后,将扣减库存的任务交给MQ,最终实现数据的一致性。

通过“最终一致性”解决了分布式事务的难题,虽然短时间内可能有数据不一致,但最终状态一定是正确的。

MQ消息队列的两种消息模式

在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

  • 点对点模式

    1. 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
    2. 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
    3. 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
  • 发布/订阅模式

    1. 每个消息可以有多个订阅者;
    2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
    3. 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行

如何保证消息队列的高可用?

  • 镜像集群模式

非分布式,如果某个 queue 负载很重,加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展 queue

在这里插入图片描述

  • 分布式,备份

分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据

例如

  1. 生产者写 leader,然后 leader 将数据落地写本地磁盘,接着其它 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者
  2. 消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到

在这里插入图片描述

如何保证消息不丢失?

消息丢失的环节;

  1. 发送过程中丢失
  2. MQ还未持久化消息,然后宕机了
  3. 消费者消费到消息但是未处理(此时MQ把消息已经删除了),然后宕机了

如何保证不丢?

  1. 生产者,MQ之间确认
  2. MQ持久化
  3. MQ,消费者之间确认

如何保证消息不被重复消费?如何保证消息消费的幂等性?

重复消费的原因

消息重复的根本原因是网络不可达,且不可避免

  • 生产者发送时未收到MQ的响应(可能网络闪断,不过最终MQ处理完成了)然后重发,最终也是成功处理,这会导致重复消息进入MQ,后续消费重复

  • 消费者消费到消息,完成业务处理,当消费者给MQ服务端反馈应答的时候网络闪断。MQ还保留着消息,保证消息至少被消费一次,在网络恢复后再次把消息尝试投递给消费者去处理,这样消费者就收到两条一样的消息了

解决方案
  1. 消息发送者发送消息时携带一个全局唯一的消息id
  2. 消费者获取消息后,先根据id在 redis/db 中查询是否之前 该记录被消费过了
  3. 如果没有被消费过,则消费并写入 redis/db; 否则直接忽略

如何保证消息被消费的顺序性?

消息有序是指:按照消息发送的顺序来消费

  • 生产者保证消息的顺序到达MQ
  • 消费者保证顺序消费MQ中的消息

1:1:1,局部顺序消费

常见的消息队列中间件

特性 ActiveMQ RabbitMQ RocketMQ kafka
开发语言 java erlang java scala
单机吞吐量 万级 万级 10万级 10万级
时效性 ms级 us级 ms级 ms级以内
可用性 高(主从架构) 高(主从架构) 非常高(分布式架构) 非常高(分布式架构)
功能特性 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

push 和 pull

x push模型 pull模型
描述 服务端主动发送数据给客户端 客户端主动从服务端拉取数据,通常客户端会定时拉取
实时性 较好,收到数据后可立即发送给客户端 一般,取决于pull的间隔时间
服务端状态 需要保存push状态,哪些客户端已经发送成功,哪些发送失败 服务端无状态
客户端状态 无需额外保存状态 需保存当前拉取的信息的状态,以便在故障或者重启的时候恢复
状态保存 集中式,集中在服务端 分布式,分散在各个客户端
负载均衡 服务端统一处理和控制 客户端之间做分配,需要协调机制,如使用zookeeper
其它 服务端需要做流量控制,无法最大化客户端的处理能力;其次,在客户端故障情况下,无效的push对服务端有一定负载。 客户端的请求可能很多无效或者没有数据可供传输,浪费带宽和服务器处理能力
缺点方案 服务器端的状态存储是个难点,可以将这些状态转移到DB或者key-value存储,来减轻server压力。 针对实时性的问题,可以将push加入进来,push小数据的通知信息,让客户端再来主动pull。针对无效请求的问题,可以设置逐渐延长间隔时间的策略,以及合理设计协议尽量缩小请求数据包来节省带宽。

kafka基础概念

kafka架构

  • Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
  • Producer将每一条消息顺序IO追加到partition对应到log文件末尾
    • 每条消息都有一个offset
    • Partition是用来存储数据的,但并不是最小的数据存储单元。Partition下还可以细分成Segment,每个Partition是由一个或多个Segment组成。每个Segment分别对应两个文件:一个是以.index结尾的索引文件,另一个是以.log结尾的数据文件,且两个文件的文件名完全相同。所有的Segment均存在于所属Partition的目录下。
    • 消息按规则路由到不同的partition中,每个partition内部消息是有序的,但是不同partition之间不是有序的
  • Producer生产消息push到Kafka集群;Consumer通过pull的方式从Kafka集群拉取消息

topic

  • Topic是Kafka数据写入操作的基本单元,可以指定副本
    一个Topic包含一个或多个Partition,建Topic的时候可以手动指定* Partition个数,个数与服务器个数相当 每条消息属于且仅属于一个Topic
  • Producer发布数据时,必须指定将该消息发布到哪个Topic
  • Consumer订阅消息时,也必须指定订阅哪个Topic的信息

Broker

Broker 是指 Kafka 集群中的一个节点(或服务器)。它是 Kafka 系统的核心组件之一,负责存储和管理消息数据,并处理生产者和消费者之间的消息传递。简单来说,Broker 就是一个运行 Kafka 服务的进程,它承担了以下主要职责:

  1. 存储消息数据
  • 每个 Broker 负责存储分配给它的分区(Partition)的数据。
  • 分区是 Kafka 中的基本存储单元,每个分区对应一个日志文件(Log Segment),Broker 会将消息持久化到磁盘中。
  1. 接收生产者的消息
  • 生产者(Producer)将消息发送到特定的 Topic,Broker 根据分区策略(如轮询、哈希等)将消息写入对应的分区。
  1. 为消费者提供消息
  • 消费者(Consumer)从 Broker 中拉取消息数据,Broker 负责根据消费者的消费进度(Offset)返回相应的消息。
  1. 副本管理
  • 每个分区可以有多个副本(Replica),其中一个副本是 Leader,其他副本是 Follower。
  • Broker 负责管理分区的 Leader 和 Follower 副本,确保数据的高可用性和一致性。
  1. 集群协调
  • 多个 Broker 组成一个 Kafka 集群,通过 Zookeeper 或 Kafka 自带的 Raft 协议进行协调。
  • Broker 之间会互相通信,完成分区分配、Leader 选举等任务。

Partition

  • 每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件
  • 一个Partition中的数据是有序、不可变的,使用偏移量(offset)唯一标识一条数据,是一个long类型的数据

为什么要分区的一些原因

  1. 分区类似hadoop分布式文件系统,也类似于spark中RDD以partition方式将数据分块存储,一是分块存储提高数据概率性安全,比如分布式的文件块,如果某台机器宕机数据丢失,也是丢失一部分,不会出现整个文件全军覆没。另外通过partition级别的冗余存储(replication.factor)来保证partition级别的安全性。

  2. 每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同partition上读数据,实现了水平扩展

  3. 并行读写:磁盘写入速度就是kafka处理速度的极限,处理不过来就要加机器。每台机器持有不同的partition。Producer产生10个新消息,可以交给10个broker上的partition一起写;Consumer消费10个新消息,可以从10个broker上面一起读。

  4. The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

kafka的ack机制

request.required.acks有三个值 0 1 -1

  • 0: 生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据
  • 1: 服务端会等待ack值,leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失
  • -1: 同样在1的基础上,服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢失消费消息

offset

Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

isr

全称是:In-Sync Replicas,isr 是一个副本的列表,里面存储的都是能跟leader 数据一致的副本,确定一个副本在isr列表中,有2个判断条件

  • 条件1:根据副本和leader 的交互时间差,如果大于某个时间差 就认定这个副本不行了,就把此副本从isr 中剔除,此时间差根据
    配置参数rerplica.lag.time.max.ms=10000 决定 单位ms
  • 条件2:根据leader 和副本的信息条数差值决定是否从isr 中剔除此副本,此信息条数差值根据配置参数rerplica.lag.max.messages=4000 决定 单位条 isr 中的副本删除或者增加 都是通过一个周期调度来管理的

AR(Assigned Replicas)

  • 分区中的所有副本统称为AR(Assigned Replicas)。

  • 所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In Sync Replicas)。

  • ISR 集合是 AR 集合的一个子集。消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。同步期间,follower副本相对于leader副本而言会有一定程度的滞后。前面所说的 ”一定程度同步“ 是指可忍受的滞后范围,这个范围可以通过参数进行配置

  • leader副本同步滞后过多的副本(不包括leader副本)将组成OSR (Out-of-Sync Replied)

  • AR = ISR + OSR。正常情况下,所有的follower副本都应该与leader 副本保持 一定程度的同步,即AR=ISR,OSR集合为空

LEO

LEO(log end offset) 称为日志末端位移,代表日志文件中下一条待写入消息的 offset,这个 offset 上实际是没有消息的。

分区 ISR 集合中的每个副本(所有的 leader 和 follower 副本)都会维护自身的 LEO。当 leader 副本收到生产者的一条消息,LEO 通常会自增 1,而 follower 副本需要从 leader 副本 fetch 到数据后,才会增加它的 LEO。

HW

HW(High Watermark) 称为高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。

SR 集合中最小的 LEO 即为分区的 HW,消费者只能读取到小于高水位线以下的消息,即成功复制到所有副本的最后一条消息。因此,对于同一个副本对象,其高水位值不会大于 LEO 值。

在这里插入图片描述

在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交的消息,即图中位移小于 8 的所有消息

最后,leader 副本会比较自己的 LEO 以及满足条件的 follower 副本上的 LEO ,选取两者中较小值作为新的 HW ,来更新自己的 HW 值

kafka producer

3 台 broker, partition 2个备份

  • 三台broker(0,1,2), 设置topic的partition数目是2,replication备份因子是2
    1. 对于partition 0,leader 是 3,follower 是 1,两个备份
    2. 对于partition 1,leader 是 1,follower 是 2,两个备份
      在这里插入图片描述

发送消息

一次发送了4条消息,内容分别是

空
a1
b1
c1

在这里插入图片描述

消息路由到不同的partition

  • b1存储到了partition 0
  • a1c1存储到了partition 1
    在这里插入图片描述
    在这里插入图片描述

查看磁盘上的kafka-logs文件目录下对应的topic文件

  • .index结尾的索引文件,
  • .log结尾的数据文件
    在这里插入图片描述

在这里插入图片描述

Producer如何将消息可靠的发送给Kafka集群

ack应答机制(3种策略)

acks指定了必须有多少个分区副本接收到了消息,生产者才会认为消息是发送成功的。

  1. acks=0,生产者成功写入消息之前不会等待来自任何服务器的响应,这种配置,提高吞吐量,但是消息存在丢失风险。
  2. acks=1,只要集群的leader(master)收到了消息,生产者将会受到发送成功的一个响应,如果消息无撞到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用Future 对象的get()方法,显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
  3. acks=-1,所有参与复制的节点全部收到消息的时候,生产者才会收到来自服务器的一个响应,这种模式最安全,但是吞吐量受限制,它可以保证不止一个服务器收到消息,就算某台服务器奔溃,那么整个集群还是会正产运转。
retries

生产者从服务器收到的错误消息有可能是临时的,当生产者收到服务器发来的错误消息,会启动重试机制,当重试了n(设置的值)次,还是收到错误消息,那么将会返回错误。生产者会在每次重试之间间隔100ms,不过可以通过retry.backoff.ms改变这个间隔。

batch.size

当多个消息发往同一个分区,生产者会将他们放进同一个批次,该参数指定了一个批次可以使用的内存大小,按照字节数进行计算(不是消息个数);当批次被填满,批次里面所有得消息将会被发送;半满的批次,甚至只包含一个消息也可能会被发送,所以即使把批次设置的很大,也不会造成延迟,只是占用的内存打了一些而已。但是设置的太小,那么生产者将会频繁的发送小,增加一些额外的开销。

linger.ms

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程, 生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)

requests.timeout.ms

生产者发送数据时等待服务器返回响应的时间

max.in.flight.requests.per.connection

指定了生产者收到服务器响应之前可以发送多少个消息。它的值越高,将会消耗更多的内存,不过也会提升吞吐量。设置为1,可以保证消息是按照发送的顺序写入服务器。即使发生了重试。

kafkaProducer源码流程图

在这里插入图片描述

broker

文件目录和索引

在这里插入图片描述
log默认保留168小时(7天),默认一个segment文件大小1G,否则产生新文件

  • .log存储数据
  • .index存储索引(类似跳表的索引)
    在这里插入图片描述
    为数据文件建索引采取了稀疏存储:每隔一定字节的数据建立一条索引(这样的目的是为了减少索引文件的大小)

broker leader选举

leader选举借助zookeeper最简单最直观的方案是:leader在zk上创建一个临时节点,所有Follower对此节点注册监听,当leader宕机时,此时ISR里的所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。

缺点:当kafka集群业务很多,partition达到成千上万时,当broker宕机时,造成集群内大量的调整,会造成大量Watch事件被触发,Zookeeper负载会过重,而zk是不适合大量写操作的。

kafka在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

在这里插入图片描述

Kafka Controller 选举

Kafka Controller的选举是依赖Zookeeper来实现的,在Kafka集群中哪个broker能够成功创建/controller这个临时(EPHEMERAL)节点,就可以成为Kafka Controller。

controller实现如上功能:

  • brokers列表:ls /brokers/ids
  • 某个broker信息:get /brokers/ids/0
  • topic信息:get /brokers/topics/kafka10-topic-20170924
  • partition信息:get /brokers/topics/kafka10-topic-20170924/partitions/0/state
  • controller中心节点变更次数:get /controller_epoch
  • conrtoller leader信息:get /controller

在这里插入图片描述

如何选举出 partition leader?

不同场景下的选举思路不同;基本思路如下

  1. 按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。

  2. 一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。

  3. 分区进行重分配(reassign)的时候也需要执行leader的选举动作。
    思路:从重分配的AR列表中找到第一个存活的副本,且这副本在目前的ISR列表中。

  4. 发生优先副本(preferred replica partition leader election)的选举时,直接将优先副本设置为leader即可,AR集合中的第一个副本即为优先副本。

  5. 当某节点被优雅地关闭(也就是执行ControlledShutdown)时,位于这个节点上的leader副本都会下线,所以与此对应的分区需要执行leader的选举。从AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。

kafka consumer

Consumer Group(消费组)

consumer group是kafka提供的可扩展且具有容错性的消费者机制

  1. 一个消费组可以有多个消费者或消费者实例(consumer instance)
  2. 一个消费组下的所有消费者共享一个公共的ID,即group ID
  3. 一个消费组下订阅的topic下的每个partition只能分配给该group下的一个consumer(当然该分区还可以被分配给其他group),即一个partition只能被一个消费进程/线程消费,而不能被多个消费进程/线程消费(当然一个消费进程/线程可以消费多个partition)
  4. 每一个消费组都会被记录它在某一个分区的Offset,即不同consumer group针对同一个分区,都有"各自"的偏移量
  5. 同一个 Topic 可以多个消费组

在这里插入图片描述

消费者数据的不丢失?

通过offset commit来保证数据的不丢失。kafka自己记录了每次消费的offset数值,下次消费的时候从上次的offset继续消费

而offset的信息在kafka0.9版本之前保存在zookeeper中,在0.9版本后保存到了一个topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,即可接着上次消费。由于offset的信息写入的时候并不是每条消息消费完成就写入,所以会导致有重复消费的问题,但是不会丢失消息

唯一例外的情况是,我们在程序中给原本做不同功能的两个consumer组设置了KafkaSpoutConfig.builder.setGroupid的时候设置成了同样的groupid,这种情况会导致这两个组共享了同一份数据,就会产生组A消费partition1,partition2中的消息,组B消费partition3的消息,这样每个组消费的消息都会丢失,都是不完整的。为了保证每个组都能独享一份消息数据,groupid一定不要重复

提交offset

offset提交的方式有两种:自动提交和手动提交。

offset下标自动提交其实在很多场景都不适用,因为自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种,所以这时我们就需要进行手动提交kafka的offset下标。

手动提交偏移量:
1. 同步提交
2. 异步提交
3. 异步+同步 组合的方式提交

低级消费者(或简单消费者)

在这里插入图片描述

高级消费者

  • 问题一: Kafka 高级消费者怎样才能达到最大吞吐量?

答:分区数量与线程数量一致。

  • 问题二: 消费者消费能力不足时,如果提高并发?

答:1. 增加分区个数; 2. 增加消费者线程数; 3.自动提交 offset

  • 在高阶消费者中,Offset 采用自动提交的方式。

自动提交时,假设 1s 提交一次 offset 的更新,设当前 offset=10,当消费者消费了 0.5s 的数据,offset 移动了 15,由于提交间隔为 1s,因此这一 offset 的更新并不会被提交,这时候我们写的消费者挂掉,重启后,消费者会去 ZooKeeper 上获取读取位置,获取到的 offset 仍为 10,它就会重复消费,这就是一个典型的重复消费问题。

高阶消费者存在一个弊端,即消费者消费到哪里由高阶消费者 API 进行提交,提交到 ZooKeeper,消费者线程不参与 offset 更新的过程,这就会造成数据丢失(消费者读取完成,高级消费者 API 的 offset 已经提交,但是还没有处理完成 Spark Streaming 挂掉,此时 offset 已经更新,无法再消费之前丢失的数据),还有可能造成数据重复读取(消费者读取完成, 高级消费者 API 的 offset 还没有提交,读取数据已经处理完成后 Spark Streaming 挂掉,此时 offset 还没有更新,重启后会再次消费之前处理完成的数据)。

kafka重复消费的根本原因就是"数据消费了,但是offset没更新"!而我们要探究一般什么情况下会导致offset没更新?

  • 原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)
  • 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费
  • 原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费
  • 原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
  • 原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
  • 原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致rebalance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

再均衡(Rebalance)

再平衡指的是在kafka consumer所订阅的topic发生变化时发生的一种分区重分配机制。

在这里插入图片描述

什么情况下会发生再平衡?

  • 有新的消费者加入消费组
  • 有消费者宕机或下线
    • 消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者已经下线
    • 有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了 unsubscrible() 方法取消对某些主题的订阅
  • 消费组所对应的 GroupCoordinator 节点发生了变更,例如coordinator挂了,集群选举出新的coordinator。GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互
  • 消费组内所订阅的任一主题或者主题的partition数量发生变化

再均衡协议

  • Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
  • LeaveGroup请求:主动告诉coordinator我要离开consumer group
  • SyncGroup请求:group leader把分配方案告诉组内所有成员
  • JoinGroup请求:成员请求加入组
  • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

再均衡过程

Kafka中分区再均衡主要分为两步,第一步是JoinGroup,第二步是SyncGroup

  1. Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader:注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

  2. Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

消费者组状态以及状态转换

  • Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
  • Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
  • PreparingRebalance:组准备开启新的rebalance,等待成员加入
  • AwaitingSync:正在等待leader consumer将分配方案传给各个成员
  • Stable:rebalance完成,可以开始消费了

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

再平衡例子和策略?

Range(默认)

range策略:首先会计算每个consumer可以消费的分区个数,然后按照顺序将指定个数范围的分区分配给各个consumer;这种方式分配只是针对消费者订阅的topic的单个topic所有分区再分配

eg1: 10分区;2个机器实例,一个1个线程,一个2个线程(消费者线程排完序将会是C1-0, C2-0, C2-1)

C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区

eg2: 11分区 同上,可能是如下

C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C2-1 将消费 8, 9, 10 分区

eg3: 假如有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来如下

C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C2-1 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。

Round Robin

会采用轮询的方式将当前所有的分区依次分配给所有的consumer;这种分配策略是针对消费者消费的所有topic的所有分区进行分配。当有新的消费者加入或者有消费者退出,就会触发rebalance

使用RoundRobin策略有两个前提条件必须满足

  • 同一个Consumer Group里面的所有消费者的num.streams必须相等;
  • 每个消费者订阅的主题必须相同。

假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;
Sticky
sticky 英[ˈstɪki][ˈstɪki]
adj.(); 一面带黏胶的; 闷热的;
n. 告事贴;

Sticky策略是新版本中新增的策略,顾名思义,这种策略会保证再分配时已经分配过的分区尽量保证其能够继续由当前正在消费的consumer继续消费,当然,前提是每个consumer所分配的分区数量都大致相同,这样能够保证每个consumer消费压力比较均衡

eg:三个consumer:C0、C1和C2,三个topic:t0、t1和t2,这三个topic分别有1、2和3个分区;C0订阅了t0,C1订阅了t0和t1,C2则订阅了t0、t1和t2

分区排序结果 定于的consumer数量 订阅的consumer
t2-0 1 c2
t2-1 1 c2
t2-2 1 c2
t1-0 2 c1, c2
t1-1 2 c1, c2
t0-0 3 c0,c2, c2

consumer进行排序:c0,c1,c2;然后将各个分区依次遍历分配给各个consumer,首先需要注意的是,这里的遍历并不是C0分配完了再分配给C1,而是每次分配分区的时候都整个的对所有的consumer从头开始遍历分配,如果当前consumer没有订阅当前分区,则会遍历下一个consumer。

最终分配结果如下

consumer topic-partition
c0 t0-0
c1 t1-0, t1-1
c2 t2-0, t2-1, t2-2

eg2: 假设开始分配如下,然后c1宕机

consumer topic-partition
c0 t0-0, t1-1, t3-0
c1 t1-1, t2-0, t3-1
c2 t1-0, t2-1

然后在平衡后

**consumer topic-partition
c0 t0-0, t1-1, t3-0, t2-0
c2 t1-0, t2-1, t0-1, t3-1

问题:kafka 消息丢失/重复场景

在这里插入图片描述

问题:Kafka分区数越多越好吗?

并非分区数量越多,效率越高

Topic 每个 partition 在 Kafka 路径下都有一个自己的目录,该目录下有两个主要的文件:base_offset.log 和 base_offset.index。Kafka 服务端的 ReplicaManager 会为每个 Broker 节点保存每个分区的这两个文件的文件句柄。所以如果分区过多,ReplicaManager 需要保持打开状态的文件句柄数也就会很多。

每个 Producer, Consumer 进程都会为分区缓存消息,如果分区过多,缓存的消息越多,占用的内存就越大;
n 个分区有 1 个 Leader,(n-1) 个 Follower,如果运行过程中 Leader 挂了,则会从剩余 (n-1) 个 Followers 中选举新 Leader;如果有成千上万个分区,那么需要很长时间的选举,消耗较大的性能。

问题:Kafka如何消息的顺序性和一致性

Kafka 在消息处理的顺序性方面有一些机制,但并不保证消息的严格有序性。以下是 Kafka 处理消息顺序性的一些特点:

1、分区内有序性: 在每个分区内,消息是有序存储的。Kafka 保证对于每个分区,消息的写入和消费是按照消息的顺序进行的。这意味着对于同一个分区的消息,它们将按照发送的顺序被消费。这样保证了在单个分区内的消息顺序性。

2、分区间无序性: 在多个分区之间,消息的顺序性不能得到保证。不同分区的消息在 Kafka 集群中是并行处理的,而且 Kafka 也不会跨分区地维护全局有序性。因此,对于多个分区的消息,它们在消费者端接收的顺序可能与发送顺序不一致。

3、消息复制: Kafka 支持多副本复制,每个分区可以有多个副本存储在不同的 Broker 上。在进行消息复制时,Kafka 会保证消息的副本在各个 Broker 上的复制顺序与领导者(Leader)分区中的消息顺序保持一致,从而确保数据的一致性。

在这里插入图片描述

LEO & HW 保证副本的数据一致性

在这里插入图片描述
在这里插入图片描述

幂等性

幂等生产者的启用

  • enable.idempotence: 设置为 true 以启用幂等生产者。
  • transactional.id: 可选参数,用于事务性生产者。如果需要事务支持,必须设置此参数。

幂等生产者的内部机制

  • 生产者ID(Producer ID)
    唯一标识:每个幂等生产者在启动时会从Kafka集群获取一个唯一的 Producer ID。
    持久化:Producer ID 由Kafka集群持久化存储,确保在生产者重启后仍然有效。

  • 序列号(Sequence Number)
    递增序列:每个Partition维护一个递增的序列号。
    唯一性:每条消息在发送时会携带一个唯一的序列号,确保消息在Partition中的唯一性。

  • 请求重试
    自动重试:幂等生产者会自动重试发送失败的消息。
    幂等性保证:即使消息被重试多次,Kafka也会确保每条消息只被写入一次

其它知识

kafka为什么很快

顺序写入日志

追加数据到日志文件尾部,顺序IO

零拷贝 mmap

即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统 分页存储 来利用内存提高I/O效率。

Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件 ,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。

使用这种方式可以获取很大的I/O提升, 省去了用户空间到内核空间 复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)也有一个很明显的缺陷——不可靠, 写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。 Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);写入mmap之后立即返回Producer不调用flush叫 异步 (async)。

在内核版本2.1中,引入了sendfile系统调用,以简化网络上和两个本地文件之间的数据传输。 sendfile的引入不仅减少了数据复制,还减少了上下文切换。

sendfile(socket, file, len);,运行流程如下:

  • sendfile系统调用,文件数据被copy至内核缓冲区
  • 再从内核缓冲区copy至内核中socket相关的缓冲区
  • 最后再socket相关的缓冲区copy到协议引擎

相较传统read/write方式,2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区,再由user缓冲区到socket相关缓冲区的文件copy,而在内核版本2.4之后,文件描述符结果被改变,sendfile实现了更简单的方式,再次减少了一次copy操作。

在apache,nginx,lighttpd等web服务器当中,都有一项sendfile相关的配置,使用sendfile可以大幅提升文件传输性能。

Kafka把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候Kafka直接把文件发送给消费者,配合mmap作为文件读写方式,直接把它传给sendfile。

预读数据/批处理(读一块然后处理)
日志分段(有类似跳表索引)
数据压缩

当启用压缩时,对批处理的影响特别明显,因为随着数据大小的增加,压缩通常会变得更有效

特别是在使用基于文本的格式时,比如 JSON,压缩的效果会非常明显,压缩比通常在5x7x之间

此外,记录的批处理主要作为一个客户端操作,负载在传递的过程中,不仅对网络带宽有积极影响,而且对服务端的磁盘 I/O 利用率也有积极影响

kafka使用page cache

在这里插入图片描述

~ free -m
             total       used       free     shared    buffers     cached
Mem:        128956      96440      32515          0       5368      39900
-/+ buffers/cache:      51172      77784
Swap:        16002          0      16001

page cache用于缓存文件的页数据,buffer cache用于缓存块设备(如磁盘)的块数据。页是逻辑上的概念,因此page cache是与文件系统同级的;块是物理上的概念,因此buffer cache是与块设备驱动程序同级的。

producer生产消息时,会使用pwrite()系统调用【对应到Java NIO中是FileChannel.write() API】按偏移量写入数据,并且都会先写入page cache里。consumer消费消息时,会使用sendfile()系统调用【对应FileChannel.transferTo() API】,零拷贝地将数据从page cache传输到broker的Socket buffer,再通过网络传输。

同时,page cache中的数据会随着内核中flusher线程的调度以及对sync()/fsync()的调用写回到磁盘,就算进程崩溃,也不用担心数据丢失。另外,如果consumer要消费的消息不在page cache里,才会去磁盘读取,并且会顺便预读出一些相邻的块放入page cache,以方便下一次读取。

如果Kafka producer的生产速率与consumer的消费速率相差不大,那么就能几乎只靠对broker page cache的读写完成整个生产-消费过程,磁盘访问非常少。这个结论俗称为"读写空中接力"。并且Kafka持久化消息到各个topic的partition文件时,是只追加的顺序写,充分利用了磁盘顺序访问快的特性,效率高

kafka集群多Topic性能下降

参考:Kafka vs RocketMQ——多Topic对性能稳定性的影响-转自阿里中间件

死信队列(Dead-Letter Queue)

当一条消息初次消费失败,消息队列 MQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 MQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)

与此对应的还有一个回退队列的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演

重试队列其实可以看成是一种回退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到Broker中。与回退队列不同的是重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大

eg: 消息第一次消费失败入重试队列Q1,Q1的重新投递延迟为5s,在5s过后重新投递该消息;如果消息再次消费失败则入重试队列Q2,Q2的重新投递延迟为10s,在10s过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此需要设置一个上限,超过投递次数就入死信队列。重试队列与延迟队列有相同的地方,都是需要设置延迟级别,它们彼此的区别是:延迟队列动作由内部触发,重试队列动作由外部消费端触发;延迟队列作用一次,而重试队列的作用范围会向后传递

注意:Kafka不支持重试机制也就不支持消息重试,也不支持死信队列,因此使用kafka做消息队列时,如果遇到了消息在业务处理时出现异常的场景时,需要额外实现消息重试的功能。

数据传输的事物定义有哪三种?

数据传输的事务定义通常有以下三种级别:

  1. 至多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
  2. 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
  3. 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的

网站公告

今日签到

点亮在社区的每一天
去签到