Kafka 生产者和消费者高级用法
1 生产者的事务支持
Kafka 从版本0.11开始引入了事务支持,使得生产者可以实现原子操作,确保消息的可靠性。
// 示例代码:使用 Kafka 事务
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.send(new ProducerRecord<>("my-other-topic", "key", "value"));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
} catch (KafkaException e) {
producer.close();
throw e;
}
2 消费者的多线程处理
在高吞吐量的场景下,多线程消费消息是提高效率的重要手段。消费者可以通过多线程同时处理多个分区的消息。
// 示例代码:多线程消费者
properties.put("max.poll.records", 500);
properties.put("max.poll.interval.ms", 300000);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题 "my-topic"
consumer.subscribe(Collections.singletonList("my-topic"));
// 多线程消费消息
int numberOfThreads = 5;
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processRecord(record));
}
}
// 关闭消费者
consumer.close();
executor.shutdown();
3 自定义序列化和反序列化
Kafka 默认提供了一些基本的序列化和反序列化器,但你也可以根据需求自定义实现。这在处理复杂数据结构时非常有用。
// 示例代码:自定义序列化器
public class CustomSerializer implements Serializer<MyObject> {
@Override
public byte[] serialize(String topic, MyObject data) {
// 实现自定义序列化逻辑
}
}