加群联系作者vx:xiaoda0423
仓库地址:https://webvueblog.github.io/JavaPlusDoc/
https://1024bat.cn/
https://github.com/webVueBlog/fastapi_plus
https://webvueblog.github.io/JavaPlusDoc/
点击勘误issues,哪吒感谢大家的阅读
发送流程(Producer → Broker)
应用构造消息:设置 topic、key(可选,决定有序性)、value、headers、timestamp。
序列化:
key.serializer / value.serializer
把对象转为字节。分区选择:
有 key:按 key 的 hash 选定分区(同 key 固定到同一分区,分区内有序)。
无 key:采用“粘性分区”(Sticky),尽量把一段时间的消息打到同一分区以增大批量。
批量聚合:进入
RecordAccumulator
(按分区分别聚合);达到batch.size
或等待到linger.ms
即形成一个批次。可选 压缩(gzip/lz4/zstd/snappy)。Sender 发送:后台 Sender 线程读取批次,按 元数据 找到该分区的 Leader Broker,发出
ProduceRequest
。幂等与重试:网络/瞬时错误时按
retries
与退避重试;开启enable.idempotence
可避免重复写入导致的乱序/重复。Broker 追加(Leader):顺序写入日志(先入页缓存→追加到 segment,更新索引),并向 ISR 中的 Follower 复制。
确认策略(acks) :
-
acks=0
:不等确认直接返回(最快,可靠性最低)。acks=1
:Leader 追加成功即回 ACK。acks=all
:ISR 都追到 高水位 HW 再回 ACK(最安全)。
回调完成:Producer 收到
RecordMetadata(topic, partition, offset, timestamp)
或异常,触发回调。(可选)事务 EOS:
initTransactions → beginTransaction → send(...) → sendOffsetsToTransaction(...) → commitTransaction
,实现“生产 + 提交位点”同事务;消费者需isolation.level=read_committed
。Broker 内部(极简文字版)
日志:按 segment 切分与保留策略(大小/时间);
复制:Controller 维护 ISR;Follower 落后会被踢出 ISR;Leader 变更时短暂漂移;
高水位 HW:表示所有 ISR 都已复制到的最大位点,用于
acks=all
与read_committed
的可见性。
消费流程(Consumer Group)
入组:消费者用
group.id
找到 Coordinator,JoinGroup
汇报订阅,分配器(建议 CooperativeStickyAssignor)做分区分配,SyncGroup
下发结果。可配 静态成员(group.instance.id) 减少抖动。poll 循环取数:消费者对各自分配到的分区发
FetchRequest
;Broker 返回成批的消息数据(可能为压缩块)。解压/反序列化:客户端按批解压并用
deserializer
还原对象。业务处理:把记录交给应用逻辑/线程池处理;若需要分区内严格有序,应“同分区单通道”处理。
回压控制:业务慢时提高
max.poll.interval.ms
、降低max.poll.records
,或临时对分区pause()
,处理完再resume()
。提交位点(offset) :
自动:
enable.auto.commit=true
周期性提交(简单但控制弱)。手动:处理成功后
commitSync/commitAsync
;位点写入__consumer_offsets
(压缩主题)。事务链路:使用
sendOffsetsToTransaction(...)
与生产同事务提交,消费端设置read_committed
。
再均衡:成员增减/订阅变化触发再均衡;容器先
onPartitionsRevoked
做最后提交/落库,再onPartitionsAssigned
接手新分区继续处理。失败/重试/DLT(死信)文字路径
业务异常 → 错误处理器判断是否可重试。
进程内退避重试 N 次(固定或指数退避)。
仍失败 → 投递到 DLT(<原Topic>.DLT) ,同时把 original-topic/partition/offset/exception 等信息放在 headers。
监控与排查:DLT 上统计异常类型与堆积量。
回放:修复后从 DLT 读出 →(可修复字段)→ 用原 key 重投回主 Topic 或“回放专用 Topic”,下游消费者幂等处理。
一图看懂(端到端路径)
App(Producer) | | 1. send(record) → 序列化(Serializer) → 分区器(Partitioner) v RecordAccumulator(批次聚合/压缩) --linger.ms/batch.size--> | | 2. Sender线程取批次 → 查元数据(metadata) → 选择Leader v Broker Leader 分区 | | 3. 追加顺序写入(页缓存→日志段segment/索引) → 复制给Follower | • acks=0:不等 | • acks=1:Leader写入后回 | • acks=all:ISR同步到HW后回 v Producer 回调(onCompletion) ←——————— (返回ack或异常) ↑ Consumer Group | | | 4. 加入组(Find/Join/SyncGroup) | 5. Fetch循环:拉批次、解压/反序列化、业务处理 | 分配分区(Assignor) | → 提交位点(Commit offset) v | __consumer_offsets(压缩主题) <——— 提交offset(auto/手动/事务内)
发送路径(Producer)—关键步骤
应用→客户端
producer.send(record, callback)
把 POJO/JSON 交给 Serializer(如StringSerializer
、JsonSerializer
)转字节。分区选择
有 key:
StickyPartitioner
/默认分区器按 key hash → 固定分区(保证同 key 有序)。无 key:粘性分区策略减少批次碎片(提高吞吐)。
累加与打批(RecordAccumulator)
按分区维护批次:满足
batch.size
或到达linger.ms
即可发送。可开启压缩:
compression.type=gzip|lz4|zstd|snappy
(强烈建议开)。
Sender 线程发送
取批次 → 查元数据(主题/分区→Leader broker) → 以 ProduceRequest 发给 Leader。
重试:网络/瞬时错误自动重试(
retries
/retry.backoff.ms
),要配合-
enable.idempotence=true
(默认开启)max.in.flight.requests.per.connection<=5
(或=1以严格顺序)
以避免重复/乱序。
Broker Leader 处理
先写 OS Page Cache → 顺序追加到 日志段(segment) ,更新索引;
复制给 Follower:达到 HW(高水位) 后,按
acks
规则回 ACK:-
acks=0
:不等复制;acks=1
:Leader 落盘即回;acks=all
:ISR 全部跟上 HW 再回(最安全)。
回调完成
成功:返回
RecordMetadata(topic, partition, offset, timestamp)
;失败:返回异常(可区分可重试/不可重试)。
事务/Exactly-Once:设置
transactional.id
,initTransactions()
→beginTransaction()
→send(...)
→ (可选)sendOffsetsToTransaction(...)
→commitTransaction()
;消费方用isolation.level=read_committed
。发送端关键参数(高频实战)
可靠性:
acks=all
,retries
大,enable.idempotence=true
,delivery.timeout.ms
合理放大吞吐:
linger.ms
(如 550ms)、256KB)、batch.size
(如 64KBcompression.type=zstd/snappy
顺序:
max.in.flight.requests.per.connection=1~5
(幂等+小并发)
Broker 内部简述
日志:
log.segment.bytes
/segment.ms
控制切段;log.retention.*
控制保留;cleanup.policy=delete|compact
。复制:控制器监控 ISR;Follower 落后出 ISR;Leader 变更时可能触发短暂不可用。
时间戳:
CreateTime
(生产时)与LogAppendTime
(追加时),受log.message.timestamp.type
影响。
消费路径(Consumer)—关键步骤
加入消费组
findCoordinator
找到组协调者 →JoinGroup
(汇报订阅的主题/分区) → 分配器(如CooperativeStickyAssignor
)分配分区 →SyncGroup
下发结果。稳定性增强:
-
协作式再均衡(减少抖动);
静态成员:
group.instance.id
绑定实例身份,缩短扩缩容抖动;心跳:
heartbeat.interval.ms
,会话:session.timeout.ms
。
拉取数据(poll 循环)
fetch.min.bytes
+fetch.max.wait.ms
控制“凑批”与等待;max.partition.fetch.bytes
控制单分区单次最大字节;收到数据后解压、反序列化,交给业务线程。
业务处理与回压
max.poll.records
控制每次最多拉多少;处理慢要提高
max.poll.interval.ms
,或用pause()/resume()
对分区回压;严格有序时,可“每分区一个工作队列”。
提交位点(offset commit)
自动:
enable.auto.commit=true
(简单,但易丢/重复);手动:处理成功后
commitSync
/commitAsync
;位点存储在
__consumer_offsets
(压缩主题);EOS 链路:用事务
sendOffsetsToTransaction(...)
与下游写入同事务提交,实现精确一次对外可见。
消费端关键参数(高频实战)
并发:同组实例数 ≤ 分区数;Spring:
@KafkaListener(concurrency="N")
提交:
enable.auto.commit=false
+ 手动 ack回压:
max.poll.records
、max.poll.interval.ms
、pause/resume
再均衡:
partition.assignment.strategy=CooperativeStickyAssignor
,group.instance.id
失败链与 DLT 所在位置(串上前文)
业务抛异常 → 错误处理器决定退避重试或转发 DLT(
<topic>.DLT
);DLT 携带 original-topic/partition/offset/exception 等 headers,后续人工或自动回放。
高吞吐/长时间重试:用重试主题分离重试,避免阻塞主消费。
最小可用模板
Producer(Spring Kafka)
spring: kafka: bootstrap-servers: broker1:9092,broker2:9092 producer: acks: all retries: 10 properties: enable.idempotence: true max.in.flight.requests.per.connection: 5 linger-ms: 20 batch-size: 131072 # 128KB compression-type: zstd
@Autowired KafkaTemplate<String, String> kt; public void sendOrder(String key, String json) { kt.send("order.events", key, json) .whenComplete((md, ex) -> { if (ex != null) log.error("send fail", ex); else log.info("ok p={} off={}", md.partition(), md.offset()); }); }
Consumer(Spring Kafka,手动 ack + DLT)
spring: kafka: consumer: group-id: order-consumer enable-auto-commit: false auto-offset-reset: latest properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor # 静态成员(K8s可用Pod名等) group.instance.id: ${HOSTNAME:} listener: ack-mode: MANUAL
@Configuration @EnableKafka class Cfg { @Bean ConcurrentKafkaListenerContainerFactory<String,String> factory( ConsumerFactory<String,String> cf, KafkaTemplate<String,String> kt) { var f = new ConcurrentKafkaListenerContainerFactory<String,String>(); f.setConsumerFactory(cf); f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); var recoverer = new DeadLetterPublishingRecoverer( kt, (r,e) -> new TopicPartition(r.topic()+".DLT", r.partition())); var backoff = new FixedBackOff(1000, 3); // 1s×3次 f.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backoff)); return f; } } @Component class OrderListener { @KafkaListener(topics="order.events", concurrency="6") public void on(ConsumerRecord<String,String> r, Acknowledgment ack) { process(r.key(), r.value()); // 幂等 ack.acknowledge(); } }
原生消费者(要点)
一个线程一个 Consumer;
poll()
→ 业务线程池 → 按分区维护已完成位点 →commitAsync/Sync
;需要严格分区内有序就“每分区专属线程/队列”。
1)DLT 是什么,为什么要有?
定义:当一条消息在“正常消费流程”中多次处理失败(比如 JSON 解析错误、字段缺失、业务幂等校验不过、下游服务永久不可用等),为了不阻塞同分区后续消息、方便隔离排查与后续人工/自动回放,把这条“问题消息”单独发到一个备用主题,这个主题就叫 DLT(常见命名:
<原主题>.DLT
)。作用:
防止所谓 poison pill(毒丸消息) 卡死分区;
和“重试主题/延迟重试”搭配,形成完整的失败处置链;
提供可观测(统计失败原因)、可回放(修复后重新处理)的安全网。
2)一条消息从“正常消费”走到 DLT 的全过程
用一个按键下单的例子串起来(
order.events
→ 消费者 → DLT):[Producer] --> [order.events 主题] | v [Consumer(同组,多实例)] | 业务处理 成功 --------------------> ack 提交位点 失败 | v 错误处理器(ErrorHandler) | \ 可重试? 不可重试(如格式错/幂等冲突) | \ |(退避重试N次) 直接投 DLT v \ 重试仍失败 --------------> [order.events.DLT] | 人工/自动回放
落地细节(以 Spring Kafka 为例):
每次失败时错误处理器会决定:再试一次(可能做固定/指数退避),还是转发到 DLT。
转发时会把原始元数据(原 topic、partition、offset、timestamp、异常类型与消息)放到 headers,用于排查与回放定位。
DLT 的分区选择通常沿用原分区或沿用消息 key 的 hash(可配置),这样回放时仍能大体保持键内顺序。
3)三种常见的失败处置策略
A. “失败即入 DLT”
适合明显不可重试的错误:JSON 结构错误、必填字段缺失等。
优点:实现最简单,不拖累主消费。
缺点:对瞬时故障没有容错。
B. “进程内重试 N 次 + DLT”(最常见)
消费者线程内捕获异常,退避重试 3~5 次,再仍失败就投 DLT。
优点:不需要额外主题;对偶发故障友好。
缺点:重试期间占用分区,并发受限;长重试会延迟后续消息。
C. “分离重试主题(延迟队列)+ DLT”(推荐在高吞吐/长重试场景)
失败后发到重试主题(如
order.events.RETRY.5s
、RETRY.30s
),由独立消费组在指定延时后再处理;多级失败后再进 DLT。优点:不阻塞主消费;灵活控制多级退避。
缺点:需要多建主题与路由配置。
4)Spring Kafka 实战
4.1 “进程内重试 + DLT”(
DefaultErrorHandler
+DeadLetterPublishingRecoverer
)@Configuration @EnableKafka public class KafkaConsumerConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> cf, KafkaTemplate<String, String> kt) { var f = new ConcurrentKafkaListenerContainerFactory<String, String>(); f.setConsumerFactory(cf); f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // 失败后转发到 <topic>.DLT,同分区 var recoverer = new DeadLetterPublishingRecoverer( kt, (rec, ex) -> new TopicPartition(rec.topic() + ".DLT", rec.partition())); // 固定退避:每次间隔1s,共重试3次(失败再进 DLT) var backoff = new FixedBackOff(1000L, 3L); f.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backoff)); return f; } }
运行时,进入 DLT 的记录会带上 headers:原 topic/partition/offset/timestamp、异常类名、异常消息等,方便排查。
监听器示例:
@KafkaListener(topics = "order.events", groupId = "order-consumer", concurrency = "6") public void onMessage(ConsumerRecord<String, String> r, Acknowledgment ack) { try { process(r.key(), r.value()); // 业务幂等 ack.acknowledge(); } catch (TransientException e) { throw e; // 让默认错误处理器重试 } catch (Exception fatal) { // 明确不可重试的异常也可直接抛出,快速进入 DLT throw fatal; } }
4.2 “分离重试主题 + DLT”(
@RetryableTopic
配置式)@Configuration @EnableKafka public class RetryTopicConfig { @Bean public RetryTopicConfiguration retryableTopics(KafkaTemplate<String, String> kt) { return RetryTopicConfigurationBuilder .newInstance() .exponentialBackoff(1000, 2.0, 30000) // 1s -> 2s -> 4s ... 最大30s .maxAttempts(5) // 共5次(含初次),之后进 DLT .doNotAutoCreateRetryTopics() // 也可让它自动创建 .create(kafkaTemplate()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return kt; } }
然后:
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1000, multiplier = 2.0, maxDelay = 30000)) @KafkaListener(topics = "order.events", groupId = "order-consumer") public void onMessage(ConsumerRecord<String, String> r) { process(r.key(), r.value()); } @DltHandler // 监听 DLT(可做告警/落库/标记) public void dlt(ConsumerRecord<String, String> r) { // 这里能拿到 headers:原始位点与异常信息 }
5)主题与参数规划建议
命名规范:
-
主题:
order.events
DLT:
order.events.DLT
重试:
order.events.RETRY.5s
/30s
/5m
(或框架自动生成)
分区数:DLT 一般与主主题同分区数,方便回放保持 key/分区相对一致。
副本因子:与生产主题一致(例如
3
)。保留策略:DLT 通常较长保留(7~30 天),不开启 compaction(需要保留每条失败记录)。
消息键:仍使用业务 key(如 orderId)→ DLT 内同 key 聚集,便于排查与重放。
监控:对 DLT 的 消息堆积、增长速率、异常类型分布报警;DLT 消费失败量也要监控。
创建示例(命令行):
kafka-topics.sh --create --topic order.events.DLT \ --partitions 12 --replication-factor 3 \ --config retention.ms=1209600000 # 14 天
6)Headers 里一般会带什么(用于排查/回放)
不同框架键名略有差异,核心信息都在:
原始定位:original-topic / original-partition / original-offset / original-timestamp
异常信息:exception-class / exception-message / stacktrace
可能还会带:原消息的 key/headers 透传
用途:在 DLT 消费或回放程序里精确定位原消息位置与失败原因,生成报表/告警,或选择性回放。
7)如何回放 DLT(replay)
三种常见做法:
人工一次性回放:写个小工具(或开关型消费者),从 DLT 拉取 → 修复/补齐字段 → 再投回原主题(或“专用回放主题”)。
规则引擎自动回放:比如遇到“下游 5xx”类异常,半小时后自动重投一次。
离线导出 + 批修复:把 DLT 导出到仓库(ClickHouse/ES/S3),数据修复后按批重投。
最简单的回放代码(示意):
@KafkaListener(topics = "order.events.DLT", groupId = "dlt-replayer") public void onDeadLetter(ConsumerRecord<String, String> r) { var repaired = tryFix(r.value()); // 修复/补齐/做兼容 if (repaired != null) { kafkaTemplate.send("order.events", r.key(), repaired); // 重投 } else { // 仍无法修复:落库、告警、人工介入 } }
注意:
回放时尽量沿用原 key,保持与主消费的一致分区路由;
要有幂等,避免二次入库产生重复。
8)常见坑 & 最佳实践
无限重试:坚决避免。要么限定次数,要么走重试主题。
阻塞分区:进程内重试时间太长会阻塞后续消息;高吞吐/长重试请用 重试主题。
丢元数据:投 DLT 时一定带上原位点与异常信息(headers),否则后续很难排查。
无监控:DLT 不监控就等于黑洞;至少对“增长速率、积压量、异常类型 TOP N”报警。
无回放通道:上线前就准备好回放工具或 DLT 消费器,别等 DLT 积了一堆才补。
顺序问题:跨重试/回放无法保证全局顺序,只能保证分区内顺序;需要强顺序时,务必按业务 key 分区并限制并发。
一、核心原则(先记住这几条)
并发的硬上限 = 分区数(partitions) :同一消费组内,一个分区同一时刻只会被一个消费者实例的一个线程消费。要进一步扩并发,优先加分区或加实例(同组)。
多实例=多节点扩容:多个进程/容器用同一个 group.id 就会分摊分区;缩容/扩容都会触发再均衡(rebalance)。
顺序只在“分区内”保证:如果你需要按业务键有序,务必设置消息 key,让同 key 进同一分区。
至少一次交付(At-Least-Once)+ 幂等处理是最常见组合;严格 EOS(Exactly-Once)只在“消费→处理→再写回 Kafka/DB”链路必须做到时再考虑。
再均衡优化:启用 CooperativeStickyAssignor + Static Membership,能大幅降低扩缩容时的抖动。
二、并发模型对比与选型
多进程/多节点并发(推荐)
多副本服务实例(K8s 部署 N 副本),同一
group.id
。优点:简单、弹性、隔离好;扩容=加副本。
注意:实例数不要超过分区数(超过会有实例空闲)。
单进程内多线程
Spring Kafka:
@KafkaListener(concurrency="N")
;原生客户端:一个线程一个 Consumer(Consumer 不是线程安全的)。优点:资源利用高;缺点:和进程级扩容比,故障域更大。
单 Consumer + 工作线程池(work-pool)
一个 Consumer 线程
poll()
,把消息投给业务线程池处理;用 pause()/resume() 做回压;处理完成后再按分区顺序提交位点。优点:能控制业务处理并行度且保序;缺点:实现复杂。
三、容量规划与分区数估算
粗略估算:
所需分区数 ≈ 峰值QPS × 单条处理耗时(秒) ÷ 单分区极限吞吐
举例:峰值 5,000 msg/s,每条处理 20 ms(0.02s),单分区能稳态 500 msg/s,
→ 分区数 ≈ 5000×0.02 ÷ 500 = 0.2 ≈ 至少 1;考虑抖动与冗余,建议 6~12 分区起步,方便水平扩展。在线增加分区是可行的(会改变 key→partition 映射,跨分区无序不受影响;分区内仍保序)。
四、偏移量与投递语义
自动提交(enable.auto.commit=true) :吞吐高但丢失控制弱,不推荐生产。
手动同步提交:处理完再
commitSync
,典型 At-Least-Once。手动异步提交:
commitAsync
吞吐更好,但失败需回调补偿。Exactly-Once(严格) :
-
Kafka→Kafka:启用事务(
transactional.id
,生产端enable.idempotence=true
),消费端用 read_committed,Spring Kafka 用KafkaTransactionManager
。Kafka→DB:要么用幂等写(唯一键/去重表),要么引入本地事务外加 outbox。
五、回压与再均衡稳定性
回压:当业务处理跟不上
poll()
,提升max.poll.interval.ms
;或pause(partitions)
等处理完再resume
;或降低max.poll.records
。再均衡抖动优化:
-
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
(协作式、增量再均衡)。静态成员:为每个实例设置稳定的
group.instance.id
,缩短扩缩容抖动。合理设置
session.timeout.ms
/heartbeat.interval.ms
,减少误判掉线。
六、Spring Kafka 实践模板
1) application.yml(关键参数)
spring: kafka: bootstrap-servers: PLAINTEXT://kafka-1:9092,kafka-2:9092,kafka-3:9092 consumer: group-id: order-consumer enable-auto-commit: false auto-offset-reset: latest # 拉取与处理节奏 max-poll-records: 500 max-poll-interval: 300000 # 5min,业务慢时适当调大 # 再均衡与稳定性 properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor group.instance.id: ${HOSTNAME:}${random.uuid} # K8s: 用 Pod 名或节点唯一ID,确保静态成员 # 拉取批量与等待 fetch.min.bytes: 1048576 # 1MB fetch.max.wait.ms: 500 listener: ack-mode: MANUAL # 手动确认 type: single # single/ batch
2) Listener 与并发
@Component public class OrderConsumer { // 并发度=容器线程数(不要超过分区总数) @KafkaListener(topics = "order.events", concurrency = "6", containerFactory = "kafkaListenerContainerFactory") public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) { try { // 业务处理(注意幂等) process(record.key(), record.value()); ack.acknowledge(); // 成功再提交偏移量 } catch (TransientException e) { throw e; // 交给错误处理器重试/投 DLT } catch (Exception fatal) { // 可记录并直接投 DLT(避免阻塞) throw fatal; } } private void process(String key, String json) { /* ... */ } }
3) 容器工厂 + 错误处理 + DLT
@Configuration @EnableKafka public class KafkaConsumerConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> cf, KafkaTemplate<String, String> kafkaTemplate) { var factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(cf); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // 重试 & 死信:3 次间隔退避,失败投到 *.DLT var recoverer = new DeadLetterPublishingRecoverer( kafkaTemplate, (r, e) -> new TopicPartition(r.topic() + ".DLT", r.partition()) ); var backoff = new FixedBackOff(1000L, 3L); factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backoff)); // 可选:批量监听时启用 // factory.setBatchListener(true); return factory; } }
4) 事务(Kafka→Kafka 严格 EOS 可选)
spring: kafka: producer: properties: enable.idempotence: true transaction-id-prefix: tx-order-
@Bean public KafkaTransactionManager<String, String> kafkaTxManager(ProducerFactory<String, String> pf) { return new KafkaTransactionManager<>(pf); }
然后在业务里使用
@Transactional("kafkaTxManager")
包住“消费→处理→发送到下游主题”的逻辑。七、原生 Java 消费者:单 Consumer + 线程池回压示例(保序提交)
public class WorkerPoolConsumer implements Runnable { private final KafkaConsumer<String, String> consumer; private final ExecutorService pool = new ThreadPoolExecutor( 8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200)); // 有界队列做回压 private final Map<TopicPartition, OffsetAndMetadata> commitMap = new ConcurrentHashMap<>(); public WorkerPoolConsumer(Properties props, String topic) { this.consumer = new KafkaConsumer<>(props); consumer.subscribe(List.of(topic)); } @Override public void run() { try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); if (poolIsBusy()) { consumer.pause(consumer.assignment()); // 回压:暂停拉取 continue; } else { consumer.resume(consumer.assignment()); } for (ConsumerRecord<String, String> r : records) { pool.submit(() -> { try { process(r.key(), r.value()); // 业务幂等 // 记录待提交 offset(分区内最大已完成位点) commitMap.compute(new TopicPartition(r.topic(), r.partition()), (tp, old) -> newer(old, new OffsetAndMetadata(r.offset() + 1))); } catch (Exception ex) { // 记录并投递 DLT(略) } }); } // 提交已完成的位点(异步) if (!commitMap.isEmpty()) { consumer.commitAsync(new HashMap<>(commitMap), (m, e) -> { /* 回调日志 */ }); } } } finally { try { consumer.commitSync(commitMap); } catch (Exception ignore) {} consumer.close(); pool.shutdown(); } } private boolean poolIsBusy() { ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool; return tpe.getQueue().remainingCapacity() < 50; // 阈值可调 } private OffsetAndMetadata newer(OffsetAndMetadata a, OffsetAndMetadata b) { return (a == null || b.offset() > a.offset()) ? b : a; } private void process(String key, String value) {/* ... */} }
注意:以上模式在分区内可能乱序提交必须谨慎。若严格分区内保序,可为每个分区维护单独的有序工作队列或使用“每分区一个消费线程”。
八、扩缩容与运维要点
扩容步骤:
看 分区数 是否已成为瓶颈(消费者副本数 ≤ 分区数);
增加副本(同组)→ 观察再均衡耗时与 lag;
仍不够则增分区(注意热点 key),或做批处理/并行度优化;
栈调参数:
max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
、max.partition.fetch.bytes
。
监控(Prometheus + kafka_exporter):
-
消费 lag、rebalance 次数、处理耗时分布、失败率、DLT 量、
max.poll.interval.ms
触发次数。
K8s 自动扩缩:可依据 lag 或处理时延设置 HPA(如将 lag 转成自定义指标)。