本期主要聊聊kafka消费者组与分区
消费者组 & 消费者
每个消费者都需要归属每个
消费者组
,每个分区只能被消费者组中一个消费者消费
上面这段话还不够直观,我们举个例子来说明。
订单系统 订单消息通过
order_topic
发送,该topic 有5个分区
结算系统 订阅订单变更消息订阅订单的消息,进行相关结算业务,结算系统的消费者组是
order_consumer_settlement
结算系统有
三台服务器
,所有消费者组是order_consumer_settlement
有三个消费者,由于每个分区被不同消费者(服务器
)处理,所以消息不会被不同结算服务节点重复消费
,且每条消息都有一个唯一的机器处理结算
审计系统 也订阅订单变更消息,它的消费者组是
order_consumer_audit
,审计系统部署2台,它的消息处理与结算系统之间毫不相干。
如果订单系统发送一条订单变更消息,该消息所在分区是确定的,结算系统的一个服务节点会消费到这条消息,审计系统的一个服务节点也会消费到该消息。
通常情况下我们一个 应用 对应某个Topic 的 消费者组,该应用的每个服务节点都对应一个消费者。
按照应用对应消费者组方式,一条消息有且只有一个服务节点处理,在排除配置和一些特殊情况,消息不会重复消息,也不会丢失
消费者分区与主题数量关系
可以在日志中grep Adding newly assigned partition
关键字查看当前服务节点分配的分区
不同数量消费者分区情况
分区数量为10 ,消费者组有2个节点
消费者分配到的分区是
5、5
分区数量为10 ,消费者组有3个节点
消费者分配到的分区是
4、3、3
分区数量为10 ,消费者组有4个节点
消费者分配到的分区是
3、2、2、3
分区数量为10 ,消费者组有5个节点
消费者分配到的分区是
2、2、2、2、2
从上面数据看每个消费者分配的分区数量是均匀的
举一反三
如果 分区数量
小于 消费者数量
,猜测会有消费者一个分区也分配不到
测试分区数量为3 , 消费者数量为4 时。下面日志证明结论正确的
刨根到底,分区分配策略如何调整
前面都是默认情况下,分区配置策略,不同版本默认策略是不同的,通过partition.assignment.strategy
配置,常见分配策略有
RangeAssignor
RangeAssignor策略对每个Topic进行独立的分区分配
分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序
分配时尽量均衡地将分区分配给消费者
缺点只保证单个主题
情况下均匀分配,对于消费者同时订阅多个主题情况下可能会造成 总体分区分配不合理。
RoundRobinAssignor
所有消费者和
所有分区
都进行排序,然后按照轮询的方式将分区分配给消费者
该策略在RangeAssignor进行了优化,当然如果消费者订阅的主题列表不同情况下分配结果也是不均匀的。
StickyAssignor
它在RangeAssignor的基础上引入了“粘性”的限制。当消费者组中消费者离开或加入
,尽量保留现有的分配结果,并使新的分配结果均衡。
CooperativeStickyAssignor
它是Kafka 2.4.0 引入的一种新的分配策略。它将原来的一次全局分区重平衡改为多次小规模分区重平衡。这种策略能够更平滑地处理消费者加入或离开的情况,减少因全局重平衡带来的性能开销。
How 修改默认策略 ?
参考下面代码设置ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
甚至可以自定义策略
private Properties getConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"xxxx:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,consumerGroup);
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Arrays.asList(RangeAssignor.class.getName()))
return properties;
}