Kafka Q&A


How does the client connect to Kafka and discover the brokers?

The client only needs to know the addresses of a subset of nodes (brokers). It will automatically discover all the remaining topic partition leader nodes and connect to them.

When a client connects:
    It uses the bootstrap servers list (e.g., broker1:9092,broker2:9092).
    It connects to one of those nodes and performs metadata discovery.

    It learns:
        Which brokers exist
        What topics/partitions exist
        Who is the leader for each partition

🔁 Then What?
    The client then connects directly to the broker that is the leader for each partition it needs to consume from or produce to.
    This is done transparently — you only give it the bootstrap nodes, and Kafka handles the rest.

⚠️ Notes:
    You don't need to list all brokers in the bootstrap list — just enough for discovery.
    If a leader broker goes down, the client automatically detects it and reconnects to the new leader using updated metadata.
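
A minimal client-side sketch of the above; the broker addresses are placeholders:

Properties props = new Properties();
// Only a subset of the brokers needs to be listed here; the client fetches
// cluster metadata from one of them and discovers the rest (all brokers,
// partitions, and the leader of each partition) automatically.
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// From here on, each send goes directly to the leader broker of the target partition.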

The story of Kafka partitions and consumers

Claim | Correct?
A partition can be consumed by only one consumer within the same group at a time | ✅ Correct
A partition can be consumed simultaneously by consumers from multiple different groups | ✅ Correct
One consumer can consume from multiple partitions | ✅ Correct

To get all messages from a topic with multiple partitions, you can run multiple consumer instances in the same consumer group, and Kafka will assign the partitions among them.
If you use only one consumer in a consumer group, Kafka will assign all partitions to that one consumer, and it will pull messages from each partition in turn.

How are clients identified as belonging to different groups?

// Each consumer client has a config:
group.id=my-consumer-group

What is the process when a producer writes data to a Kafka topic?

The producer sends data to the topic's leader partition, and the follower partitions then replicate from the leader asynchronously.
The process is roughly as follows.

Let’s say:

    The topic has Partition 0 with 3 replicas
    Partition 0 has:
        Leader = Broker A
        Followers = Broker B and Broker C

    ISR = [A, B, C]

Here’s what happens when the producer sends a message with acks=1:

    1. Producer sends message to Broker A (the leader).

    2. Broker A writes the message to its local log.

    3. Broker A immediately sends acknowledgment back to the producer.

    4. Meanwhile, Broker B and Broker C (followers) pull the message from A and append it asynchronously (in the background).

If acks=all, steps 3 and 4 become:

3. Broker A waits until all in-sync replicas (Broker B and Broker C) pull and replicate that message.
4. Once all ISRs have acknowledged, Broker A sends an acknowledgment back to the producer.
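
As a sketch, the difference is a single producer configuration value (the broker address is a placeholder):

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
// acks=1: the leader acknowledges right after writing to its own log (steps 3-4 above).
// acks=all: the leader acknowledges only after all in-sync replicas have replicated the record.
props.put("acks", "all");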

What is message batching when the client sends messages to the broker?

For efficiency, when the producer client sends records to the same partition, it first caches them and then sends them in batches.
This is controlled by the parameters linger.ms (by time) and batch.size (by size).
kafka-producer-perf-test.sh can benchmark Kafka producer performance and is useful for tuning client parameters such as linger.ms and batch.size.

Batching is a mechanism where the producer of messages holds onto several messages before forwarding them to the broker. By sending a single request that includes multiple messages to the broker, the client reduces network bandwidth at the cost of a bit of latency. For those clients that support it, a single batch can be compressed into a single message. The gains from compression are significantly better when there is more data, compared to single, small messages that may not benefit much individually from compression. How long the client holds the batch, the size in bytes of the batch, or the number of messages in the batch, is configurable by the client.
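
A sketch of those batching knobs on the producer; the values are illustrative, not tuned:

// Wait up to 10 ms for more records headed to the same partition...
props.put("linger.ms", "10");
// ...or send as soon as a batch reaches 32 KB, whichever comes first.
props.put("batch.size", String.valueOf(32 * 1024));
// Optionally compress whole batches; compression pays off more on larger batches.
props.put("compression.type", "lz4");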

What are committed messages on the consumer side?

The consumer commits back to Kafka the offsets of the messages it has consumed, confirming how far it has read.
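
A minimal sketch of a poll-process-commit loop; handle(...) stands for hypothetical business logic:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
  handle(record); // hypothetical business logic
}
// Synchronously write the offsets of the records returned by the last poll back
// to Kafka, so a restarted consumer in the same group resumes after them.
consumer.commitSync();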

What are committed messages on the producer side?

In Apache Kafka, committed data refers to messages that have been:

  1. Written to a Kafka topic partition, and
  2. Successfully replicated to meet the topic’s configured min.insync.replicas or acks configurations.

Records produced by the producer can only be consumed after they have been committed.

Why doesn’t consumer.poll consume all delivered messages?

consumer.poll does not guarantee that all sent records are fetched in a single execution of poll.

    producer.send(new ProducerRecord<>(TOPIC, "key-a", "value-a"), callBack);
    producer.send(new ProducerRecord<>(TOPIC, "value-b"), callBack);
    producer.flush();

    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

    ConsumerRecords<String, String> crs = consumer.poll(Duration.ofSeconds(2));

    for (ConsumerRecord<String, String> cr : crs) {
      log.info("The consumed key {} and value is {}", cr.key(), cr.value());
    }

// This may only fetch the "value-a" record but not "value-b"; you need to poll again to consume "value-b".
// This is because a single execution of poll does not guarantee fetching all sent records.

  public void waitReceivingMessageAndClear(int milliSeconds)  {
    long endTimeStamp = System.currentTimeMillis() + milliSeconds;
    while (System.currentTimeMillis() < endTimeStamp) {
      // Why do we need a poll loop to clear the cached messages?
      // Because a single execution of poll does not guarantee fetching all sent records,
      // poll may return only one record even though the producer already sent several successfully.
      Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
      consumer.poll(Duration.ZERO);
    }
  }

Message loss

How does Kafka ensure data availability?

Essentially, through the replication mechanism.
This can be achieved in the following ways:

  1. Use the client-side configuration acks=all to guarantee replication.
  2. Set the cluster-wide configuration min.insync.replicas to guarantee replication.

If the expected number of replicas is 3, then with acks=all the client-side latency is roughly 2.5x that of acks=1: the extra 1x comes from waiting for the followers to replicate in parallel, and the remaining 0.5x from waiting for the leader to confirm that all followers have caught up.

How does Kafka ensure sent messages are not lost?

The Kafka producer client first buffers records in memory (the send call returns immediately) and then sends them to Kafka in batches.
So if the client crashes, it is possible for messages to be lost.
But because the client has not received an ack for them, it treats those messages as not successfully sent and will resend them.

So it is enough to enable enable.idempotence=true together with acks=all (see the explanation of message ordering below), for example:
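
As a sketch, both settings on the producer:

props.put("acks", "all");                // wait for all in-sync replicas, so nothing is lost
props.put("enable.idempotence", "true"); // the broker rejects retried duplicates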

Out-of-order messages

How is the order of produced messages guaranteed?

On the broker, the network thread bound to a particular client and partition only processes the next request after it has responded to the previous request from the same client.
This guarantees that messages from the same client to the same partition are ordered.
If you want messages to stay in order, they must be sent to the same partition, i.e. consecutive messages must share the same key.

Order created → partition 0 (orderId=1001)
Order paid    → partition 0 (orderId=1001)  // order preserved
Order created → partition 1 (orderId=1002)  // unordered relative to 1001's messages

P.S.: a single network thread listens to multiple clients.
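
A sketch of keyed sends matching the diagram above; the topic name "orders" is an assumption:

// Using the orderId as the key: both events for order 1001 hash to the
// same partition, so their relative order is preserved.
producer.send(new ProducerRecord<>("orders", "1001", "order-created"));
producer.send(new ProducerRecord<>("orders", "1001", "order-paid"));
// A different key may land on another partition; no ordering relative to order 1001.
producer.send(new ProducerRecord<>("orders", "1002", "order-created"));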

However, if the leader fails, it is still possible that messages have already been stored/replicated but the leader did not get a chance to return a response to the client. The client will then resend the same data to the (new) leader, causing duplicates. This is why you should set enable.idempotence = true on the producer, which is the default value as of Kafka 3.0. With this set, the producer tags each event with a producer ID and a sequence number. These values will be sent with the events and stored in the log. If the events are sent again because of a failure, those same identifiers will be included. If duplicate events are sent, the broker will see that the producer ID and sequence number already exist and will reject those events and return a DUP response to the client.

Combining acks=all, producer idempotence, and keyed events results in a powerful end-to-end ordering guarantee. Events with a specific key will always land in a specific partition in the order they are sent, and consumers will always read them from that specific partition in that exact order.

Simply put, every message is tagged with a unique identifier that is sent along with it every time; combined with acks=all to rule out loss, this guarantees end-to-end message ordering within the same partition.
Without acks=all, messages could be lost along the way, so even though the producer sends them in order, the consumer might not receive some of the messages in the middle.

To summarize: Kafka's ordering guarantee means it can guarantee the order of messages that share the same key.

Duplicate messages

How do we ensure the producer does not produce duplicate messages?

This is really the same problem as guaranteeing message ordering: if messages are guaranteed not to be lost and to arrive in order, there will be no duplicates.

How do we ensure messages are not consumed more than once?

For example, if a consumer crashes before the offset is written back to Kafka, messages may end up being consumed again.

In fact, Kafka alone cannot guarantee that a message is not consumed more than once, because the offset of a consumed message has to be written back to Kafka first.

Achieving exactly-once requires the producer and the consumer to cooperate:

The producer must not push duplicate messages, i.e. idempotent production.
The consumer must not process duplicate messages, i.e. idempotent consumption.

For example, you can store the IDs of processed messages in an external Redis/DB and only then commit the offset back to Kafka.
When handling the next message, if it turns out the message was already processed, it can be skipped, as sketched below.
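
A minimal sketch of that idea, assuming a hypothetical processedStore backed by Redis or a DB and a unique message ID carried in the record key:

for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
  String messageId = record.key(); // assumes the producer puts a unique ID in the key
  if (processedStore.contains(messageId)) { // hypothetical Redis/DB lookup
    continue; // already handled before a crash or rebalance; skip the duplicate
  }
  handle(record);                // hypothetical business logic
  processedStore.add(messageId); // mark as processed before committing the offset
}
consumer.commitSync();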

Even this does not eliminate duplicates completely; for stricter guarantees you need Kafka transactions.

The process looks roughly like this:

1. The producer starts a transaction
2. Business messages are written
3. The consumer reads the messages
4. The results are processed and written
5. The consumed offsets are committed (as part of the transaction)
6. The transaction is committed

Kafka transactions provide a mechanism for ensuring that a set of operations (production and consumption) are treated as a single atomic unit.
This can be used to ensure that a message is processed exactly once by coordinating the production and consumption steps within a transaction, as sketched below.
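
A sketch of that consume-transform-produce loop following steps 1-6 above; it assumes a producer created with transactional.id set, a consumer with enable.auto.commit=false, and a hypothetical transform(...) step:

producer.initTransactions(); // one-time setup for the transactional producer
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  if (records.isEmpty()) continue;
  producer.beginTransaction(); // step 1
  try {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
      // steps 2-4: process each record and write the result inside the transaction
      producer.send(new ProducerRecord<>("output-topic", record.key(), transform(record.value())));
      offsets.put(new TopicPartition(record.topic(), record.partition()),
                  new OffsetAndMetadata(record.offset() + 1));
    }
    // step 5: commit the consumed offsets as part of the same transaction
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction(); // step 6: results and offsets become visible atomically
  } catch (KafkaException e) {
    producer.abortTransaction(); // everything in this transaction is discarded
  }
}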

All of these options involve a trade-off: higher performance versus stricter consistency.

How do followers know to send replication requests to the leader?

Followers always send fetch requests (FetchRequest) to the leader periodically, even when there are no new messages. This is controlled by the parameter replica.fetch.wait.max.ms, which defaults to 500 ms.

Example:

FetchRequest
  offset: 3  # The FetchRequest carries the follower's own record offset; the leader returns the missing records (the delta) in the response

Should Kafka record values include a version, to make future data evolution easier?

Best-practice recommendation: for important business data, including a schema version is strongly recommended. Use a structured format (JSON/Protobuf/Avro).

Example version-number design:


// Producer-side example
public class VersionedRecord {
    private String schemaVersion = "1.0"; // use semantic versioning
    private Object payload;
    
    // getters & setters
}

How are partitions assigned to the consumers in a consumer group?

Within a consumer group, partition assignment to consumers is decided on the client side.
The client can define the partition assignment strategy.
The reason for this design is that only the client knows its own business scenario, i.e. which clients should consume which partitions.

For example, if two different topics share the same keys, the partitions holding a given key in both topics need to be consumed by the same client.
This requires the range partition assignment strategy.
Other strategies include the round robin and sticky partition assignment strategies.
The strategy determines how clients are connected to partitions, as sketched below.
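
A minimal sketch of selecting a strategy on the consumer; the property value is the built-in RangeAssignor class:

// RangeAssignor assigns the same partition numbers of the subscribed topics to the
// same consumer, which is what the shared-key (co-partitioned topics) case above needs.
props.put("partition.assignment.strategy",
          "org.apache.kafka.clients.consumer.RangeAssignor");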

