#作者:张桐瑞
文章目录
1 问题概述
由于kafka-exporter获取kafka指标时间过长,无法通过curl kafka-exporter:9308/metrics 获取指标,通过查看kafka-exporter日志,发现
time=“2025-07-22T02:18:00Z” level=error msg=“Cannot get offset of group xxxx: kafka: broker not connected” source=“kafka_exporter.go:396”
的报错信息,发现kafka-exporter在查询消费者组时出现超时,通过命令
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
同样出现超时信息。
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConsumerGroups, deadlineMs=1753170317381, tries=1, nextAllowedTryMs=1753170317482) timed out at 1753170317382 after 1 attempt(s)
通过针对具体消费者组进行查看,发现返回信息较快,故计划过滤一部分groups来减少数据量。
2 修改方案
当前环境中,Kafka与Kafka Exporter共同部署在同一Pod 内,且各个Kafka Exporter未配置过滤规则,导致采集数据存在大量重复。为提升采集效率和准确性,计划通过以下措施进行优化:
- 基于不同的 ReplicaSet(RC)配置差异化过滤规则;
针对每个RC单独设置专属的消费者组过滤规则,避免重复采集。 - 利用消费者均分脚本生成过滤正则表达式;
通过脚本将所有消费者组按规则均分,生成适用于Kafka Exporter的正则表达式,保证每个 Exporter 只采集其分配范围内的消费者组。 - 将生成的消费者组过滤正则应用于不同 Exporter 配置中;
将对应的正则表达式配置到各自 Exporter 实例中,实现消费组采集的隔离和均衡分布
2.1修改参数
group.filter .* Regex that determines which consumer groups to collect
group.filter # 正则表达式,用于确定要收集哪些消费者组(Consumer Groups)的指标。
2.2配置示例
只需为每个Kafka Exporter添加对应的YAML配置段落即可。
2.2.1KAFKA_EXPORTER_1:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^(A|a|B).*
2.2.2KAFKA_EXPORTER_2:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^(E|b).*
2.2.3KAFKA_EXPORTER_3:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^©.*
3 消费者组均分脚本
3.1使用说明
配置参数:
- BOOTSTRAP_SERVER: 指定 Kafka 服务地址;
- EXPORTER_COUNT: 指定需要生成多少个 Exporter 的过滤规则。
# Bash group-distributor.sh
使用消费者组均分脚本进行消费者组均匀分布,脚本执行结果如下:
bash-5.0$ bash /bitnami/kafka/data/topic_to_consumer_group.sh
================ Hash by 首字母 (区分大小写) =================
Exporter 1: --group.filter=^(A|a|B).* [3 groups]
Exporter 2: --group.filter=^(E|b).* [2 groups]
Exporter 3: --group.filter=^(C).* [1 groups]
可参考对应的exporter过滤正则结果。
3.2脚本内容
3.2.1Hash分组脚本
#!/bin/bash
BOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3
# 获取所有消费者组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)
# 初始化数组
for ((i=0; i<EXPORTER_COUNT; i++)); do
prefix_sets[$i]=""
count[$i]=0
done
declare -A seen_prefix
# 遍历组名,按首字母 hash 均分
while IFS= read -r group; do
prefix=$(echo "$group" | cut -c1)
[[ -z "$prefix" ]] && continue
# 计算哈希分配位置
hash_hex=$(echo -n "$prefix" | md5sum | awk '{print $1}' | cut -c1-8)
hash_dec=$((16#$hash_hex))
idx=$((hash_dec % EXPORTER_COUNT))
# 记录唯一首字母,用于生成正则
if [[ -z "${seen_prefix[$prefix]}" ]]; then
seen_prefix[$prefix]=1
if [[ -z "${prefix_sets[$idx]}" ]]; then
prefix_sets[$idx]="$prefix"
else
prefix_sets[$idx]+="|$prefix"
fi
fi
# 每个实际组都会计入对应 exporter 的数量
count[$idx]=$((count[$idx] + 1))
done <<< "$all_groups"
# 输出结果
echo "================ Hash by 首字母 (区分大小写) ================="
for ((i=0; i<EXPORTER_COUNT; i++)); do
if [[ -n "${prefix_sets[$i]}" ]]; then
echo "Exporter $((i+1)): --group.filter='^(${prefix_sets[$i]}).*' [${count[$i]} groups]"
else
echo "Exporter $((i+1)): --group.filter='^()' [${count[$i]} groups]"
fi
done
3.2.2 排序分组脚本
#!/bin/bash
BOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3
# 获取所有消费组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)
# 按首字母分组(区分大小写)
declare -A initial_to_groups
declare -A initial_counts
while IFS= read -r group; do
[[ -z "$group" ]] && continue
first_char=${group:0:1}
initial_to_groups["$first_char"]+="$group"$'\n'
initial_counts["$first_char"]=$((initial_counts["$first_char"] + 1))
done <<< "$all_groups"
# 将首字母按消费组数量降序排序
sorted_initials=$(for k in "${!initial_counts[@]}"; do
echo -e "${initial_counts[$k]}\t$k"
done | sort -rn | awk '{print $2}')
# 初始化每个 exporter 的负载、过滤项和组列表
for ((i=0; i<EXPORTER_COUNT; i++)); do
exporter_filters[$i]=""
exporter_loads[$i]=0
exporter_groups[$i]=""
done
# 分配首字母到 exporter,按消费组数量均衡
for initial in $sorted_initials; do
count=${initial_counts[$initial]}
# 找当前负载最小的 exporter
min_index=0
min_load=${exporter_loads[0]}
for ((i=1; i<EXPORTER_COUNT; i++)); do
if (( exporter_loads[i] < min_load )); then
min_index=$i
min_load=${exporter_loads[i]}
fi
done
# 添加到对应 exporter
if [[ -z "${exporter_filters[$min_index]}" ]]; then
exporter_filters[$min_index]="$initial"
else
exporter_filters[$min_index]+="|$initial"
fi
exporter_loads[$min_index]=$((exporter_loads[$min_index] + count))
exporter_groups[$min_index]+="${initial_to_groups[$initial]}"
done
# 输出格式
echo "================ kafka-exporter group.filter 正则分片 ================="
for ((i=0; i<EXPORTER_COUNT; i++)); do
group_count=$(echo -n "${exporter_groups[$i]}" | grep -c '^')
echo "Exporter $((i+1)): ($group_count groups)"
echo " --group.filter='^(${exporter_filters[$i]}).*'"
echo
done
3.3实现原理说明
3.3.1Hash分组脚本说明
该脚本通过以下核心逻辑实现消费者组的分配与正则表达式生成:
- 获取消费者组列表:
利用 kafka-consumer-groups.sh --list 获取当前 Kafka 所有消费者组名。 - 提取首字母并去重:
对每个消费者组名提取首字母(区分大小写),使用关联数组 seen_prefix 确保相同首字母只处理一次。 - Hash 均分逻辑:
1)使用 md5sum 对首字母进行 Hash 处理;
2)将 Hash 值转为十进制,再通过取模操作将其平均分配到 N 个 Exporter 中;
3)每个 Exporter 收集分配到的首字母集合,组合成正则表达式。 - 生成过滤规则:
每个Exporter输出一条形如–group.filter=^(A|B|C).* 的正则规则,仅匹配以指定前缀开头的消费者组。
3.3.2排序分钟脚本说明
在首字母提取之后,脚本还会对首字母列表进行 排序处理,其作用是: - 确保稳定性
通过排序(sort 命令),可以保证即使 Kafka 集群中的消费者组顺序发生变化,脚本对首字母的处理顺序仍保持一致,从而保证正则分配的稳定性。 - 提升分配公平性
排序后的首字母经过统一的 Hash 计算和取模分配,有助于在 Exporter间更公平地分摊消费者组,提高采集性能和均衡性。 - 简化调试和查看
将字母排序输出后,便于在查看分配结果时快速定位具体字母属于哪个Exporter,也有助于问题定位和正则表达式核查。
4 KAFKA-EXPORTER流程代码
4.1KAFKA-EXPORTER拉取数据流程
4.1.1拉取所有消费者组
在getConsumerGroupMetrics函数中,首先通过broker.ListGroups(&sarama.ListGroupsRequest{})向Kafka broker拉取所有的消费者组(group)列表。
这一步会返回集群中所有存在的group名称。
groupIds := make([]string, 0)
for groupId := range groups.Groups {
if e.groupFilter.MatchString(groupId) {
groupIds = append(groupIds, groupId)
}
}
4.1.2 过滤消费者组
拉取到所有group后,遍历每个groupId,用e.groupFilter.MatchString(groupId)判断该group是否匹配过滤条件。
只有匹配的groupId才会被加入到后续的处理流程。
groupIds := make([]string, 0)
for groupId := range groups.Groups {
if e.groupFilter.MatchString(groupId) {
groupIds = append(groupIds, groupId)
}
}
4.1.3 DESCRIBE消费者组
只对通过过滤的group进行DescribeGroups、FetchOffset等详细指标采集。
describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})
if err != nil {
glog.Errorf("Cannot get describe groups: %v", err)
return
}
for _, group := range describeGroups.Groups {
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}
if e.offsetShowAll {
for topic, partitions := range offset {
for partition := range partitions {
offsetFetchRequest.AddPartition(topic, partition)
}
}