Kafka生产者事务机制原理

发布于:2025-08-09 ⋅ 阅读:(17) ⋅ 点赞:(0)

【博客】
Kafka生产者事务机制原理


一、为什么要引入事务?

在使用 Kafka 的早期版本时,开发者经常会遇到两种场景:

  1. 跨会话重复消息
    Producer 重启后,之前的重试逻辑会导致同一条消息被再次发送,消费者需要做幂等处理。

  2. 跨分区原子性缺失
    一批消息要同时写入多个 Topic / 多个 Partition,如果某一条失败,前面成功的消息无法回滚,业务数据出现“中间状态”。

Kafka 在 0.11.0.0 引入的事务(Transactions)正是为了解决“恰好一次(Exactly-Once)语义”的痛点,同时兼顾跨会话幂等性、跨分区原子性和 consume-process-produce 模式的一致性。


二、事务的四大核心目标

目标 说明
原子性 一组消息要么全部成功,要么全部失败。
跨会话幂等 Producer 重启后,仍能识别并去重“上一次未完成的事务”。
一致性 consume-process-produce 模式下,消费位点与下游发送结果保持一致。
隔离性 事务未提交的消息对消费者不可见,防止脏读。

三、事务 API 速查表

Kafka Producer 端只提供了 5 个与事务相关的方法,掌握它们就能完成 90% 的编程需求:

方法 作用
initTransactions() 向 Coordinator 注册全局唯一 transactional.id,做初始化。
beginTransaction() 显式开启一个事务。
sendOffsetsToTransaction() 把消费者 offset 作为事务的一部分提交,用于 consume-process-produce 模式。
commitTransaction() 全部成功,两阶段提交中的“真正提交”。
abortTransaction() 出现异常,回滚当前事务。

Spring Boot 用户可以用 @TransactionalkafkaTemplate.executeInTransaction() 进行声明式/编程式事务,原理一致。


四、事务运行流程(两阶段提交 2PC)

Kafka 没有照搬传统 XA 的复杂协议,而是基于内部 Topic 实现了一个轻量级 2PC。

1. 组件角色

角色 说明
Producer 业务进程,负责发送消息。
Transaction Coordinator 一个 Broker 内的模块,充当 2PC 的协调者。
__transaction_state 内部 Topic,持久化事务状态(Ongoing → Prepare → Commit/Abort)。
目标 Topic-Partition 最终存放业务数据。

2. Kafka事务机制原理

源码位置:org.apache.kafka.clients.producer.internals.TransactionManager,画出Kafka 事务 2PC 全景

┌────────────────────────────────────────────────────────────┐
│                     Kafka 事务 2PC 全景                    │
├───────────────┐   ┌──────────────────┐   ┌──────────────────┐
│   Producer    │   │Transaction       │   │   Brokers        │
│(transaction.id│──▶│Coordinator(TC)   │◀──┤(Data partitions)│
└───────────────┘   └──────────────────┘   └──────────────────┘
        │                      │                     │
        │ 1. initTransactions  │                     │
        │--------------------->2. 写 __transaction_state
        │                      │   topic 记录 BEGIN
        │ 3. send()            │                     │
        │-----------------------------4. 写消息到目标分区
        │                      │                     │
        │ 5. commitTransaction │                     │
        │--------------------->6. 写 PREPARE_COMMIT
        │                      │                     │
        │                      │ 7. 给各分区写 COMMIT 标记
        │                      │◀────────────────────┘
        │                      │ 8. 写 __transaction_state
        │                      │   记录 COMMITTED
  1. 初始化阶段
    initTransactions() → 找到 Coordinator → 注册 transactional.id,幂等 Producer 自动开启(enable.idempotence=true)。

  2. 开始事务
    beginTransaction() 仅在客户端打一个标记,不会立即与 Broker 交互。

  3. 发送消息
    调用 producer.send(),消息并未直接写入目标分区,而是暂存客户端的 RecordAccumulator,并标记为事务消息。

  4. 预提交(Prepare)
    客户端 flush 或 commitTransaction() 时,Coordinator 收到 EndTxn(Prepare),把事务状态写入 __transaction_state,并向所有涉及的 Topic-Partition 写入 事务控制消息(Control Batch)。

  5. 正式提交(Commit)
    Coordinator 收到所有 Partition 的 ACK 后,写入 __transaction_state 的 Commit 标记,并向各 Partition Leader 发送 COMMIT Marker
    消费者只有在看到 COMMIT Marker 后,才能看到这批消息。至此事务对外可见。

  6. 异常回滚(Abort)
    任何一步失败,Producer 捕获异常后调用 abortTransaction(),流程同上,只是把标记改成 ABORT,消息对消费者永久不可见。

__transaction_state 状态有ongoingprepare committed,和对应操作的具体图示:

Producer            Transaction Coordinator            日志 & 分区
   |                       |                               |
   |--- init(t.id) ------>|--- 记录事务ID ---------------->|
   |                       |                               |
   |--- begin() --------->|--- 状态=ongoing -------------->|
   |                       |                               |
   |--- send 消息 -------->|--- 写入未提交数据 ------------>|
   |                       |                               |
   |--- commit() -------->|--- 状态=prepare -------------->|
   |                       |--- 写 commit marker ---------->|
   |                       |--- 状态=committed ------------>|

生产者、Transactions Coordinator的相互作用图示:
在这里插入图片描述
A:生产者通过initTransactions API向Coordinator 注册事务ID
B:Transactions Coordinator 记录事务日志
C:生产者把消息写入分区
D:分区和Coordinator的交互。(当事务完成以后,消息的状态应该是已提交,消费者才可以消费)


五、代码实现

原生 API

// 1. 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
props.put("transactional.id", "order-tx-" + UUID.randomUUID());
props.put("enable.idempotence", "true");        // 自动开启幂等
props.put("isolation.level", "read_committed"); // 消费者只读已提交

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

// 2. 初始化
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> r : records) {
            String newVal = transform(r.value());   // 业务逻辑
            producer.send(new ProducerRecord<>("target-topic", r.key(), newVal));
        }
        // 把消费位点也放进事务
        producer.sendOffsetsToTransaction(
            offsets(records), 
            new ConsumerGroupMetadata("myGroup")
        );
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
    }
}

Spring Boot(声明式)

@Component
public class OrderListener {

    @Autowired
    private KafkaTemplate<String, String> template;

    @KafkaListener(topics = "order-in")
    public void listen(ConsumerRecord<String, String> record,
                       Acknowledgment ack) {
        template.executeInTransaction(t -> {
            try {
                // 1. 业务
                String newVal = processOrder(record.value());
                // 2. 写下游
                t.send("order-out", record.key(), newVal);
                // 3. 提交 offset
                t.sendOffsetsToTransaction(
                    Map.of(new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1)),
                    new ConsumerGroupMetadata("order-group"));
                return null;
            } catch (Exception e) {
                throw new KafkaException("事务失败", e); // 触发回滚
            }
        });
    }

    private String processOrder(String json) {
        // 业务逻辑
        return json.toUpperCase();
    }
}

六、小结

Kafka 事务 = 幂等 Producer + 两阶段提交 + 内部 Topic 日志。
掌握 init → begin → commit/abort 三步曲,即可获得消息层面的 ACID。
🚀 下一步:把本地数据库事务与 Kafka 事务组合,实现真正的 端到端 Exactly-Once。用思维导图总结本博客内容:

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到