Fluent Bit针对kafka心跳重连机制详解(下)

发布于:2025-08-29 ⋅ 阅读:(15) ⋅ 点赞:(0)

#作者:程宏斌

文章目录


接上篇:https://blog.csdn.net/qq_40477248/article/details/150957571?spm=1001.2014.3001.5501

disconnect

断开连接的情况主要有两种:
1. 连接或传输过程中发生错误;
2. 超时, 比如空闲时间超时.

/**
 * Close and destroy a transport handle.
 *
 * Frees the pending receive buffer (if any), closes the underlying
 * socket and releases the transport object itself.
 */
void rd_kafka_transport_close(rd_kafka_transport_t *rktrans) {
...
        // Destroy the pending receive buffer, if one exists
        if (rktrans->rktrans_recv_buf)
                rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
...
        if (rktrans->rktrans_s != -1) // valid fd: custom close cb or socket close
                rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
                                        rktrans->rktrans_s);

        rd_free(rktrans);
}
/**
 * @brief Failure propagation to application.
 *
 * Will tear down connection to broker and trigger a reconnect.
 *
 * \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
 * be debug-logged.
 *
 * @locality broker thread
 */
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
                        int level,
                        rd_kafka_resp_err_t err,
                        const char *fmt,
                        ...) {
...
        if (rkb->rkb_transport) {
                // Close the socket and drop the transport handle
                rd_kafka_transport_close(rkb->rkb_transport);
                rkb->rkb_transport = NULL;
...
        // Mark the broker DOWN; per the contract above this triggers a reconnect
        rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
}
​
/**
 * @brief Check if connections.max.idle.ms has been exceeded and if so
 *        close the connection.
 *
 * @remark Must only be called if connections.max.idle.ms > 0 and
 *         the current broker state is UP (or UPDATE).
 *
 * @locality broker thread
 */
static RD_INLINE void rd_kafka_broker_idle_check(rd_kafka_broker_t *rkb) {
…
        /* Fast path: connection idle time is still below the configured
        * connections.max.idle.ms limit (per the original note, the broker
        * default is 10 minutes) — nothing to do. */
        if (likely(idle_ms < rkb->rkb_rk->rk_conf.connections_max_idle_ms))
                return;
        /* Idle limit exceeded: the broker side will drop the connection
        * anyway, so proactively fail/close it on the client side. */
        rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__TRANSPORT,
                            "Connection max idle time exceeded "
                            "(%dms since last activity)",
                            idle_ms);
        /* NOTE(review): closing brace of this function is elided in the excerpt */

reconnect

连接失败时, 客户端会自动发起重连; 重连不会终止, 直到连接成功或系统退出.
此外, 当 nodename 更改时, 也会触发断开并重连.

/**
 * @brief Update the reconnect backoff.
 *        Should be called when a connection is made, or all addresses
 *        a broker resolves to has been exhausted without successful connect.
 *
 *        Computes and stores the timestamp of the next allowed reconnect
 *        attempt, applying jitter and exponential growth.
 *
 * @locality broker thread
 * @locks none
 */
static void
rd_kafka_broker_update_reconnect_backoff(rd_kafka_broker_t *rkb,
                                        const rd_kafka_conf_t *conf,
                                        rd_ts_t now) {
…
        /* If more than reconnect.backoff.max.ms has passed since the last
        * scheduled reconnect, reset the backoff to its initial value
        * (reconnect.backoff.ms). */
        if (rkb->rkb_ts_reconnect + (conf->reconnect_backoff_max_ms * 1000) < now)
                rkb->rkb_reconnect_backoff_ms = conf->reconnect_backoff_ms;

        /* Apply jitter: pick a random backoff in [-25%, +50%] of the
        * current backoff value. */
        backoff = rd_jitter((int)((float)rkb->rkb_reconnect_backoff_ms * 0.75),
                            (int)((float)rkb->rkb_reconnect_backoff_ms * 1.5));

        /* Never exceed reconnect.backoff.max.ms. */
        backoff = RD_MIN(backoff, conf->reconnect_backoff_max_ms);

        /* Set time of next reconnect (backoff is in ms, scaled by 1000
        * to match the rd_ts_t timestamp resolution). */
        rkb->rkb_ts_reconnect         = now + (backoff * 1000);
        /* Exponential backoff: double for the next attempt, capped at max. */
        rkb->rkb_reconnect_backoff_ms = RD_MIN(
            rkb->rkb_reconnect_backoff_ms * 2, conf->reconnect_backoff_max_ms);
}
​
/**
 * @brief Calculate time until next reconnect attempt.
 *
 * @returns the number of milliseconds to the next connection attempt, or 0
 *          if immediate.
 * @locality broker thread
 * @locks none
 */
/* Computes how long the caller should wait before the next reconnect,
 * based on the scheduled rkb_ts_reconnect timestamp. */
static RD_INLINE int
rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t *rkb, rd_ts_t now) {
…
        remains = rkb->rkb_ts_reconnect - now;
…
}
​
/**
 * @brief Broker thread main loop (excerpt): connection state handling.
 *
 * Shows how reconnect attempts are throttled (TRY_CONNECT) and how
 * connection-setup timeouts and address exhaustion are handled while a
 * connect/handshake is in progress.
 */
static int rd_kafka_broker_thread_main(void *arg) {
...
                switch (rkb->rkb_state) {
...
                case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
...
                        /* Throttle & jitter reconnects to avoid
                        * thundering horde of reconnecting clients after
                        * a broker / network outage. Issue #403 */
                        backoff =
                            rd_kafka_broker_reconnect_backoff(rkb, rd_clock());
                        if (backoff > 0) {
                                /* Not yet time to reconnect: serve other
                                * broker work until the backoff expires. */
                                rd_rkb_dbg(rkb, BROKER, "RECONNECT",
                                        "Delaying next reconnect by %dms",
                                        backoff);
                                rd_kafka_broker_serve(rkb, (int)backoff);
                                continue;
                        }
...
                case RD_KAFKA_BROKER_STATE_CONNECT:
                case RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE:
                case RD_KAFKA_BROKER_STATE_AUTH_LEGACY:
                case RD_KAFKA_BROKER_STATE_AUTH_REQ:
                case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
                case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
                        /* Asynchronous connect in progress. */
                        rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);

                        /* Connect failure.
                        * Try the next resolve result until we've
                        * tried them all, in which case we back off the next
                        * connection attempt to avoid busy looping. */
                        if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN &&
                            rd_kafka_broker_addresses_exhausted(rkb))
                                rd_kafka_broker_update_reconnect_backoff(
                                    rkb, &rkb->rkb_rk->rk_conf, rd_clock());
                        /* If we haven't made progress from the last state, and
                        * if we have exceeded
                        * socket_connection_setup_timeout_ms, then error out.
                        * Don't error out in case this is a reauth, for which
                        * socket_connection_setup_timeout_ms is not
                        * applicable. */
                        else if (
                            rkb->rkb_state == orig_state &&
                            !rkb->rkb_reauth_in_progress &&
                            rd_clock() >=
                                (rkb->rkb_ts_connect +
                                (rd_ts_t)rk->rk_conf
                                        .socket_connection_setup_timeout_ms *
                                    1000))
                                rd_kafka_broker_fail(
                                    rkb, LOG_WARNING,
                                    RD_KAFKA_RESP_ERR__TRANSPORT,
                                    "Connection setup timed out in state %s",
                                    rd_kafka_broker_state_names
                                        [rkb->rkb_state]);

                        break;
…
}
​
/**
 * @brief Update the nodename (address) of broker \p rkb
 *        with the nodename from broker \p from_rkb (may be NULL).
 *
 *        If \p rkb is connected, the connection will be torn down.
 *        A new connection may be attempted to the new address
 *        if a persistent connection is needed (standard connection rules).
 *
 *        The broker's logname is also updated to include \p from_rkb's
 *        broker id.
 *
 * @param from_rkb Use the nodename from this broker. If NULL, clear
 *                 the \p rkb nodename.
 *
 * @remark Must only be called for logical brokers.
 *
 * @locks none
 */
void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,
                                rd_kafka_broker_t *from_rkb) {
…
        /* Nodename changed: trigger a disconnect & reconnect so the new
        * address takes effect. */
        rd_kafka_broker_schedule_connection(rkb);
}

网站公告

今日签到

点亮在社区的每一天
去签到