Fluent Bit's Kafka Heartbeat Reconnection Mechanism Explained (Part 1)

Published: 2025-08-29

Author: 程宏斌


Heartbeat Reconnection Verification Test

Test Plan

The test proceeds as follows:

  • Start Fluent Bit and record the contents of its debug log once data is being collected into Kafka normally.
  • Stop the Kafka service, then send logs and observe how the Fluent Bit debug log changes. In particular, check whether Fluent Bit attempts to reach Kafka while collecting logs.
  • Start the Kafka service again and keep observing the debug log. Check whether the previously buffered logs are resent and whether the connection to Kafka is re-established automatically.

This test lets us verify how Fluent Bit behaves, and how it recovers automatically, as Kafka's availability changes.
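A minimal Fluent Bit classic-mode configuration for such a test might look like the sketch below. The tail path is a placeholder; the broker address and topic are taken from the logs shown later in this article.

```ini
[SERVICE]
    # debug level is required to see the produce/delivery messages below
    Log_Level    debug

[INPUT]
    Name         tail
    Path         /var/log/test/*.log

[OUTPUT]
    Name         kafka
    Match        *
    Brokers      192.168.123.50:9092
    Topics       testlog
```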

Test Log Summary

Below is a summary of the logs from the Kafka disconnect/reconnect test. The important points are annotated with lines starting with #; please read those annotations carefully.
# The two coro_ids below show the normal log collection flow
[2024/06/13 11:31:58] [debug] in produce_message
[2024/06/13 11:31:58] [debug] [output:kafka:kafka.0] enqueued message (466 bytes) for topic 'testlog'
[2024/06/13 11:31:58] [debug] [out flush] cb_destroy coro_id=0
[2024/06/13 11:32:20] [debug] in produce_message
[2024/06/13 11:32:20] [debug] [output:kafka:kafka.0] enqueued message (466 bytes) for topic 'testlog'
[2024/06/13 11:32:20] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:32:20] [debug] [out flush] cb_destroy coro_id=1

# The Kafka service is stopped here to simulate a failure; note the obvious "Disconnected" keyword. At this point Fluent Bit's Kafka self-check code starts running.
[2024/06/13 11:32:47] [info] 192.168.123.50:9092/bootstrap: Disconnected (after 50404ms in state UP)
[2024/06/13 11:32:47] [info] 961ddde63cfb:9092/0: Disconnected (after 49401ms in state UP)
[2024/06/13 11:32:47] [error] 2/2 brokers are down
[2024/06/13 11:32:47] [error] Connect to ipv4#192.168.128.2:9092 failed: Connection refused
[2024/06/13 11:32:49] [error] Failed to resolve '961ddde63cfb:9092': Name or service not known
[2024/06/13 11:32:49] [warn] Metadata request failed: broker down: Local: Host resolution failure
[2024/06/13 11:32:49] [error] Connect to ipv4#192.168.123.50:9092 failed: Connection refused

# While Kafka is down, Fluent Bit automatically retries the connection to Kafka after collecting logs.
[2024/06/13 11:33:18] [error] Connect to 192.168.123.50:9092 failed: Connection refused
[2024/06/13 11:33:24] [error] Failed to resolve '961ddde63cfb:9092'
[2024/06/13 11:34:02] [error] Failed to resolve '961ddde63cfb:9092'
[2024/06/13 11:34:18] [error] Connect to 192.168.123.50:9092 failed: Connection refused
[2024/06/13 11:34:21] [error] Connect to ipv4#192.168.128.2:9092 failed: Connection refused

# After Kafka is restarted, the previously undelivered messages are sent along with the next batch of log data and show up as "message delivered".
[2024/06/13 11:34:33] [debug] in produce_message
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] enqueued message (466 bytes) for topic 'testlog'
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:45] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)

Test Summary

Here is a summary of the log results above:
After the Kafka service is stopped to simulate a failure, the logs show the obvious "Disconnected" keyword, and Fluent Bit's Kafka self-check code starts running.
While Kafka is down, Fluent Bit automatically retries the connection to Kafka after collecting logs. In the Fluent Bit debug log, coro_id=6 marks the seventh message sent (counting from 0): after the first two messages were delivered successfully, each of the third through seventh logs triggered a Kafka request.
After Kafka is restarted, the previously undelivered messages are sent to Kafka along with the next batch of log data and show up as "message delivered".

Code Walkthrough

Fluent Bit's Kafka reconnect logic:

Code location: fluent-bit/lib/librdkafka-2.3.0/src
librdkafka is essentially a state machine: states plus ops drive an event loop.
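As a conceptual sketch of that state machine, here is a simplified subset of the broker states and their transitions. This is illustrative only: the real enum in rdkafka_broker.h has more states (e.g. APIVERSION_QUERY, AUTH), and the names below are hypothetical.

```c
#include <assert.h>

/* Simplified, hypothetical subset of librdkafka's broker states. */
enum broker_state {
        STATE_INIT,
        STATE_DOWN,
        STATE_TRY_CONNECT,
        STATE_CONNECT,
        STATE_UP
};

/* One step of the sketch state machine.
 * ok = whether the last operation (connect, I/O) succeeded. */
static enum broker_state broker_step(enum broker_state s, int ok) {
        switch (s) {
        case STATE_INIT:        return STATE_TRY_CONNECT;
        case STATE_TRY_CONNECT: return ok ? STATE_CONNECT : STATE_DOWN;
        case STATE_CONNECT:     return ok ? STATE_UP : STATE_DOWN;
        case STATE_UP:          return ok ? STATE_UP : STATE_DOWN;
        case STATE_DOWN:        return STATE_TRY_CONNECT; /* retry after backoff */
        }
        return s;
}
```

The key property mirrored from librdkafka is that DOWN is not terminal: a broker in DOWN is put back into TRY_CONNECT, which is why Fluent Bit keeps retrying in the logs above.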
terminate

// ./lib/librdkafka-2.3.0/src/rdkafka_int.h
/**
* @returns true if \p rk handle is terminating.
* Checks whether rdkafka is shutting down; if so, the event loop stops iterating.
*/
#define rd_kafka_terminating(rk)                                               \
        (rd_atomic32_get(&(rk)->rk_terminate) & RD_KAFKA_DESTROY_F_TERMINATE)
// ./lib/librdkafka-2.3.0/src/rdkafka.c
// When the producer/consumer exits, set the terminate flag
static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) {
...
        rd_atomic32_set(&rk->rk_terminate,
                        flags | RD_KAFKA_DESTROY_F_TERMINATE);
...
}
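The terminate-flag pattern above can be sketched in a self-contained form with C11 atomics. The names here are hypothetical analogues, not librdkafka symbols: one thread (or, below, the loop itself) sets an atomic bit, and the event loop checks it on every iteration.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical analogue of RD_KAFKA_DESTROY_F_TERMINATE. */
#define DESTROY_F_TERMINATE 0x1

static atomic_int terminate_flags = 0;

/* Analogue of rd_kafka_terminating(): true once the terminate bit is set. */
static bool is_terminating(void) {
        return atomic_load(&terminate_flags) & DESTROY_F_TERMINATE;
}

/* Analogue of rd_kafka_destroy_app(): raise the terminate bit. */
static void request_terminate(int flags) {
        atomic_store(&terminate_flags, flags | DESTROY_F_TERMINATE);
}

/* Event loop that runs until the flag is raised; returns iterations run.
 * For demonstration it terminates itself after stop_after iterations. */
static int event_loop(int stop_after) {
        int iterations = 0;
        while (!is_terminating()) {
                iterations++;
                if (iterations == stop_after)
                        request_terminate(0);
        }
        return iterations;
}
```

Because the flag is atomic, any thread can request shutdown without taking a lock, which is exactly why the real macro uses rd_atomic32_get().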

connect

//  ./lib/librdkafka-2.3.0/src/rdkafka_transport.c
// The thread that manages the broker connection
// When state == RD_KAFKA_BROKER_STATE_TRY_CONNECT, attempt a connection
static int rd_kafka_broker_thread_main(void *arg) {
…
        while (!rd_kafka_broker_terminating(rkb)) {
…
                switch (rkb->rkb_state) {
…
                case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
…
                        /* Initiate asynchronous connection attempt.
                        * Only the host lookup is blocking here. */
                        r = rd_kafka_broker_connect(rkb);
…
/**
* Initiate asynchronous connection attempt.
* Sets up an asynchronous connection
* Locality: broker thread
*/
rd_kafka_transport_t *rd_kafka_transport_connect(rd_kafka_broker_t *rkb,
                                                const rd_sockaddr_inx_t *sinx,
                                                char *errstr,
                                                size_t errstr_size) {
…
        // Create the socket; defaults to the standard Linux socket() call
        s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family, SOCK_STREAM,
                                        IPPROTO_TCP,
                                        rkb->rkb_rk->rk_conf.opaque);
…
        /* Connect to broker */
        if (rkb->rkb_rk->rk_conf.connect_cb) {
                // Use the user-supplied connect callback
                rd_kafka_broker_lock(rkb); /* for rkb_nodename */
                r = rkb->rkb_rk->rk_conf.connect_cb(
                    s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                    rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
                rd_kafka_broker_unlock(rkb);
        } else {
                // Use the standard connect() API
                if (connect(s, (struct sockaddr *)sinx,
                            RD_SOCKADDR_INX_LEN(sinx)) == RD_SOCKET_ERROR &&
                    (rd_socket_errno != EINPROGRESS
#ifdef _WIN32
                    && rd_socket_errno != WSAEWOULDBLOCK
#endif
                    ))
                        r = rd_socket_errno;
                else
                        r = 0;
        }
…
// ./lib/librdkafka-2.3.0/src/rdposix.h
/** @brief Last socket error */
#define rd_socket_errno errno
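The EINPROGRESS branch above is the standard non-blocking connect idiom: connect() on a non-blocking socket returns immediately, and EINPROGRESS means "the attempt is underway, poll for writability later". A minimal POSIX sketch without any librdkafka types (function name is mine):

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Start an asynchronous TCP connect, as librdkafka does per broker.
 * Returns 0 if the connect completed or is in progress, errno otherwise. */
static int async_connect(int s, const struct sockaddr_in *addr) {
        /* Put the socket in non-blocking mode first. */
        fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);

        if (connect(s, (const struct sockaddr *)addr, sizeof(*addr)) == -1 &&
            errno != EINPROGRESS)
                return errno; /* real failure, e.g. ECONNREFUSED */

        return 0; /* completed or still in progress; poll for writability */
}
```

This is why the "Connect to ... failed: Connection refused" lines in the test log come from the broker thread rather than from the Fluent Bit flush coroutine: the connect is serviced asynchronously in the CONNECT state.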
// ./lib/librdkafka-2.3.0/src/rdkafka_broker.c
/**
* @brief Initiate asynchronous connection attempt to the next address
*        in the broker's address list.
*        While the connect is asynchronous and its IO served in the
*        CONNECT state, the initial name resolve is blocking.
* 
* @returns -1 on error, 0 if broker does not have a hostname, or 1
*          if the connection is now in progress.
*/
static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
…
        if (*nodename) // have a nodename: pre-set the state to CONNECT
                rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECT);
…
        // Set/update the reconnect backoff
        rd_kafka_broker_update_reconnect_backoff(rkb, &rkb->rkb_rk->rk_conf,
                                                rd_clock());
…
        // Attempt the connection
        if (!(rkb->rkb_transport = rd_kafka_transport_connect(
                rkb, sinx, errstr, sizeof(errstr)))) {
                rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
                                    "%s", errstr);
                return -1;
        }

        rkb->rkb_ts_connect = rd_clock();

        return 1;
}
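The rd_kafka_broker_update_reconnect_backoff() call seen above spaces out the retries visible in the failure logs. A simplified sketch of exponential backoff using librdkafka's default settings (reconnect.backoff.ms=100, reconnect.backoff.max.ms=10000); the exact jitter scheme here is illustrative, not librdkafka's implementation:

```c
#include <assert.h>
#include <stdlib.h>

/* librdkafka defaults: reconnect.backoff.ms / reconnect.backoff.max.ms */
#define RECONNECT_BACKOFF_MS     100
#define RECONNECT_BACKOFF_MAX_MS 10000

/* Double the backoff on each failed attempt, cap it at the maximum,
 * and add +/-25% jitter so many clients do not reconnect in lockstep. */
static int next_backoff_ms(int attempt) {
        long backoff = RECONNECT_BACKOFF_MS;
        for (int i = 0; i < attempt; i++) {
                backoff *= 2;
                if (backoff > RECONNECT_BACKOFF_MAX_MS) {
                        backoff = RECONNECT_BACKOFF_MAX_MS;
                        break;
                }
        }
        long jitter = backoff / 4; /* 25% of the current backoff */
        return (int)(backoff - jitter + rand() % (2 * jitter + 1));
}
```

This matches the spacing seen in the test log: the failed connection attempts while Kafka was down arrive further and further apart until the cap is reached, instead of hammering the broker every few milliseconds.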
