Kafka 4.0 五大 API 选型指南、依赖坐标、上手示例与最佳实践

发布于:2025-08-29 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、怎么选:五大 API 的“场景分工”

  • Producer API:把事件写进某个 Topic。——“我产生日志、订单、埋点、传感器数据,要写进 Kafka。”
  • Consumer API:从 Topic 拉取事件并消费处理。——“我做风控、告警、入库、实时特征,订阅并处理。”
  • Streams API:把输入 Topic 的数据变换输出 Topic(过滤、聚合、窗口、Join、状态)。——“我做实时计算/微服务,既读又写,还要有状态。”
  • Connect API:以连接器持续同步外部系统 ↔ Kafka。——“我想把 MySQL/对象存储/ES 等批量接入/导出,少写代码甚至零代码。”
  • Admin API:管理与巡检主题、Broker、ACL 等对象。——“我需要创建/修改 Topic、查状态、运维脚本化。”

Kafka 的功能通过与语言无关的协议提供,多语言客户端很多;但仅 Java 客户端由主项目维护,其余为独立开源实现。

二、依赖坐标(Maven)

统一用 4.0.0。Gradle 可自行换写法。

Producer / Consumer / Admin(同一坐标):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>4.0.0</version>
</dependency>

Streams(Java/Scala 通用核心库):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>4.0.0</version>
</dependency>

Streams(Scala 2.13 可选 DSL 封装):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-scala_2.13</artifactId>
  <version>4.0.0</version>
</dependency>

三、Producer API:最小可用写入

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class DemoProducer {
  public static void main(String[] args) {
    Properties p = new Properties();
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // 生产建议
    p.put(ProducerConfig.ACKS_CONFIG, "all");               // 强一致写入
    p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 幂等
    // Kafka 4.0 默认 linger.ms=5,批量更高效
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
      ProducerRecord<String,String> rec = new ProducerRecord<>("quickstart-events", "key1", "hello kafka");
      producer.send(rec, (md, ex) -> {
        if (ex != null) ex.printStackTrace();
        else System.out.printf("OK topic=%s partition=%d offset=%d%n", md.topic(), md.partition(), md.offset());
      });
      producer.flush();
    }
  }
}

要点

  • acks=all + 幂等(enable.idempotence=true)是可靠写的默认组合。
  • 4.0 默认 linger.ms=5,通常更高吞吐且延迟不劣

四、Consumer API:订阅与拉取

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class DemoConsumer {
  public static void main(String[] args) {
    Properties p = new Properties();
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    p.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 新组从头开始

    try (KafkaConsumer<String, String> c = new KafkaConsumer<>(p)) {
      c.subscribe(List.of("quickstart-events"));
      while (true) {
        // 4.0 用 poll(Duration),不再使用 poll(long)
        ConsumerRecords<String,String> recs = c.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String,String> r : recs) {
          System.out.printf("key=%s value=%s offset=%d%n", r.key(), r.value(), r.offset());
        }
        // 手动提交示例
        c.commitAsync();
      }
    }
  }
}

要点

  • 4.0 不再提供 poll(long),请使用 poll(Duration)
  • 消费组并行度 = 分区数;按需扩展分区或消费者实例。

五、Streams API:把 Topic 当作“输入表/输出表”

一个最小单词计数拓扑(输入 input-topic → 输出 output-topic):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
  public static void main(String[] args) {
    Properties p = new Properties();
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String,String> text = builder.stream("input-topic");
    KTable<String,Long> counts = text
        .flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+")))
        .groupBy((k, word) -> word)
        .count();

    counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
    KafkaStreams streams = new KafkaStreams(builder.build(), p);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

要点

  • Streams 把 Kafka 作为“日志 + 状态快照”的存储,支持窗口、聚合、Join、事务(恰好一次) 等。
  • 4.0 清理了过时 API;常用 DSL 基本不受影响。

六、Connect API:零/少代码联通外部系统

多数场景无需编写自定义连接器:直接使用预构建 Source/Sink 即可(如 JDBC、对象存储、ES、BigQuery…)。

典型启动(Standalone 示例):

bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/connect-file-source.properties \
  config/connect-file-sink.properties

建议

  • 生产使用 分布式模式(多 worker、弹性容错)。
  • 设计好 DLT(死信主题) 与重试策略,避免“毒数据”卡死。
  • 有强 Schema 需求:结合 Schema Registry 管控演进。

七、Admin API:脚本化管理运维

与 Producer/Consumer 使用同一 kafka-clients 依赖。

创建 Topic(示例)

import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class CreateTopicDemo {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties p = new Properties();
    p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(p)) {
      NewTopic t = new NewTopic("demo-admin-topic", 3, (short)1);
      admin.createTopics(List.of(t)).all().get();
      System.out.println("Created.");
    }
  }
}

修改配置(4.0 用 incrementalAlterConfigs)

ConfigResource res = new ConfigResource(ConfigResource.Type.TOPIC, "demo-admin-topic");
AlterConfigOp op = new AlterConfigOp(new ConfigEntry("min.insync.replicas", "2"),
                                     AlterConfigOp.OpType.SET);
admin.incrementalAlterConfigs(Map.of(res, List.of(op))).all().get();

八、从“单点调用”到“端到端应用”的组合拳

  1. 数据进入:业务服务 → Producer APIorders.created

  2. 实时处理Streams API 读取 orders.created,聚合出用户维度统计 → user.order.stats

  3. 下游分发

    • Consumer API:风控服务订阅 orders.created 实时判定
    • Connect:把 user.order.stats Sink 到 OLAP/湖仓
  4. 运维管理Admin API 创建/巡检 Topic、配额、ACL

九、Kafka 4.0 相关注意事项(API 侧)

  • 仅 Java 客户端为官方维护;其他语言(Go、Python、.NET、C/C++…)社区实现良多,但请留意版本兼容与协议支持。
  • Consumer:使用 poll(Duration)committed(TopicPartition) 等旧签名在 4.0 已移除对应变体(请用集合参数形式)。
  • Admin:使用 incrementalAlterConfigs,不要再用已移除的 alterConfigs
  • Producer:默认 linger.ms=5;幂等与事务结合使用时注意 max.in.flight.requests.per.connection 的设置。

十、落地清单(Checklist)

  • 统一依赖版本到 4.0.0kafka-clients / kafka-streams / kafka-streams-scala_2.13
  • Producer:acks=all、幂等、压测确认批量参数(linger.msbatch.size
  • Consumer:poll(Duration) 改造、消费组并行度与重试/DLT
  • Streams:状态存储大小与容错、窗口语义(事件时间/水位线)
  • Connect:选对 Source/Sink、分布式部署、DLT/重试、监控指标
  • Admin:脚本化创建/变更 Topic 与配额、滚动发布流程与回滚方案

网站公告

今日签到

点亮在社区的每一天
去签到