1 事务简介
Kafka事务是Apache Kafka在流处理场景中实现Exactly-Once语义的核心机制。它允许生产者在跨多个分区和主题的操作中,以原子性(Atomicity)的方式提交或回滚消息,确保数据处理的最终一致性。例如,在流处理中,消费者读取消息后处理并生成新消息,若处理失败,事务可确保原始消息的消费偏移与新消息的发送同时回滚,避免数据不一致。
事务的核心作用:
原子性: 跨分区的写操作要么全部成功,要么全部失败。
隔离性: 事务未提交时,消息对消费者不可见(通过isolation.level=read_committed配置实现)。
持久性: 事务状态持久化至内部Topic __transaction_state,支持故障恢复。
2 事务原理详解
了解即可
kafka学习笔记(四、生产者、消费者(客户端)深入研究(三)——事务详解及代码实例)
3 示例
from confluent_kafka import Producer, KafkaException
import time
# 配置生产者
conf = {
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'my-transactional-id', # 唯一事务ID
'enable.idempotence': True, # 启用幂等性
'acks': 'all', # 必须为all
'retries': 5, # 必须启用重试
'debug': 'txn' # 开启事务调试日志(可选)
}
# 创建事务生产者
producer = Producer(conf)
try:
# 初始化事务(必须调用!)
producer.init_transactions()
# 开始事务
producer.begin_transaction()
try:
# 发送消息(事务内)
producer.produce(
topic='my_topic',
value=b'Message 1',
key=b'key1'
)
producer.produce(
topic='my_topic',
value=b'Message 2',
key=b'key2'
)
# 模拟业务逻辑(如数据库操作)
# ...
# 提交事务(消息对消费者可见)
producer.commit_transaction()
print("Transaction committed.")
except Exception as e:
# 回滚事务(丢弃未提交的消息)
producer.abort_transaction()
print(f"Transaction aborted: {e}")
except KafkaException as e:
print(f"Failed to initialize transactions: {e}")
finally:
# 关闭生产者
producer.flush(timeout=10)