The rdkafka Handbook: From Bronze to King in Asynchronous Messaging

Published: 2025-09-12

Introduction

rdkafka is an Apache Kafka client library for Rust, built on top of librdkafka (a high-performance Kafka client library written in C). rdkafka provides the full set of functionality for interacting with a Kafka cluster, including producers, consumers, and admin operations.

This document walks through how to use rdkafka for common Kafka operations, with clear code examples.

Table of Contents

  1. Installation and Configuration
  2. Basic Concepts
  3. Producer
  4. Consumer
  5. Admin
  6. Advanced Features
  7. Error Handling
  8. Performance Optimization
  9. Best Practices

Installation and Configuration

Adding the Dependency

Add the rdkafka dependency to your Cargo.toml:

[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "ssl", "gssapi"] }
tokio = { version = "1.0", features = ["full"] }

System Dependencies

On Windows, install the following:

  1. GNU toolchain: install MinGW via MSYS2

    • Download and install MSYS2: https://www.msys2.org/
    • Update packages: pacman -Syu
    • Install the toolchain: pacman -S --needed base-devel mingw-w64-x86_64-toolchain
  2. CMake: download and install from https://cmake.org/download/

  3. Optional dependencies

    • zlib: compression support
    • libssl-dev: SSL support
    • libsasl2-dev: SASL support
    • libzstd-dev: Zstandard compression support

On Linux, install them with the package manager:

# Ubuntu/Debian
sudo apt-get install build-essential cmake libssl-dev libsasl2-dev libzstd-dev

# CentOS/RHEL
sudo yum install gcc-c++ cmake openssl-devel cyrus-sasl-devel libzstd-devel

Basic Concepts

Kafka Architecture

  • Producer: a client that publishes messages to Kafka brokers
  • Consumer: a client that reads messages from Kafka brokers
  • Consumer Group: a group of consumers that share the work of consuming
  • Broker: a single Kafka server
  • Topic: a category of messages; conceptually similar to a queue
  • Partition: for scalability, a topic is split into multiple partitions
  • Replica: each partition of a topic has a number of replicas
  • Leader: the "primary" replica of a partition; producers write to the leader
  • Follower: a "secondary" replica that continuously replicates data from the leader

rdkafka Components

rdkafka provides the following main components (see the sketch below):

  • FutureProducer: asynchronous producer
  • BaseProducer: low-level, poll-driven producer
  • StreamConsumer: stream-based asynchronous consumer
  • BaseConsumer: low-level consumer
  • AdminClient: admin client for managing a Kafka cluster
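
As a quick orientation, here is a minimal sketch (broker address and group id are placeholders) showing that the same ClientConfig builder produces whichever client the type annotation on the left-hand side asks for:

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, StreamConsumer};
use rdkafka::producer::{BaseProducer, FutureProducer};

// All five clients are built from the same ClientConfig builder; the
// type annotation selects which client create() builds.
fn build_clients(brokers: &str) {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", brokers);

    let _future_producer: FutureProducer = config.create().expect("FutureProducer");
    let _base_producer: BaseProducer = config.create().expect("BaseProducer");

    let mut consumer_config = config.clone();
    consumer_config.set("group.id", "example-group");
    let _stream_consumer: StreamConsumer = consumer_config.create().expect("StreamConsumer");
    let _base_consumer: BaseConsumer = consumer_config.create().expect("BaseConsumer");

    let _admin: AdminClient<DefaultClientContext> = config.create().expect("AdminClient");
}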

Producer

Creating a Producer

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn create_producer(brokers: &str) -> FutureProducer {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("acks", "all")
        .create()
        .expect("Producer creation error");
    
    producer
}

Sending Messages (Asynchronous)

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_async(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
    let record = FutureRecord::to(topic)
        .key(key)
        .payload(payload);
    
    let delivery_status = producer.send(record, Duration::from_secs(0)).await;
    
    match delivery_status {
        Ok(_) => {
            println!("Message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message: {}", e);
            Err(e)
        }
    }
}
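
Putting the two together, a minimal entry point might look like this (broker address, topic, key, and payload are placeholders):

#[tokio::main]
async fn main() {
    let producer = create_producer("localhost:9092");

    // Send a single message; key and payload are arbitrary example values.
    if let Err(e) = send_message_async(&producer, "example-topic", "key-1", b"hello").await {
        eprintln!("send failed: {}", e);
    }
}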

Sending Messages (Synchronous)

use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

fn send_message_sync(
    producer: &BaseProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
    let record = BaseRecord::to(topic)
        .key(key)
        .payload(payload);
    
    producer.send(record).map_err(|(e, _)| e)?;
    
    // Make sure all queued messages are delivered
    producer.flush(Duration::from_secs(1))?;
    
    Ok(())
}
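
One caveat worth knowing: BaseProducer is poll-driven, so delivery callbacks are only served while poll (or flush) is being called. For a long-running producer, a sketch of the usual pattern (topic name is a placeholder):

use rdkafka::producer::{BaseProducer, BaseRecord};
use std::time::Duration;

// Hypothetical long-running loop: enqueue messages and poll regularly so
// that librdkafka can run delivery callbacks and free internal buffers.
fn produce_forever(producer: &BaseProducer, topic: &str) {
    let mut i: u64 = 0;
    loop {
        let payload = format!("message-{}", i);
        // Keyless record: the key type is fixed to () via the annotation.
        let record: BaseRecord<(), String> = BaseRecord::to(topic).payload(&payload);
        if let Err((e, _)) = producer.send(record) {
            eprintln!("enqueue failed: {}", e);
        }
        // Serve delivery callbacks; a zero timeout makes this non-blocking.
        producer.poll(Duration::from_millis(0));
        i += 1;
    }
}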

Sending Messages in Batches

use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::future::try_join_all;
use std::time::Duration;

async fn send_messages_batch(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
) -> Result<(), rdkafka::error::KafkaError> {
    let futures = messages.into_iter().map(|(key, payload)| {
        let record = FutureRecord::to(topic)
            .key(key)
            .payload(payload);
        
        producer.send(record, Duration::from_secs(0))
    });
    
    match try_join_all(futures).await {
        Ok(_) => {
            println!("All messages sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send some messages: {}", e);
            Err(e)
        }
    }
}

Sending Messages with Headers

use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_with_headers(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
    headers: Vec<(String, Vec<u8>)>,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut owned_headers = OwnedHeaders::new();
    
    for (header_key, header_value) in &headers {
        owned_headers = owned_headers.insert(Header {
            key: header_key.as_str(),
            value: Some(header_value),
        });
    }
    
    let record = FutureRecord::to(topic)
        .key(key)
        .payload(payload)
        .headers(owned_headers);
    
    let delivery_status = producer.send(record, Duration::from_secs(0)).await;
    
    match delivery_status {
        Ok(_) => {
            println!("Message with headers sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message with headers: {}", e);
            Err(e)
        }
    }
}

Sending to a Specific Partition

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_to_partition(
    producer: &FutureProducer,
    topic: &str,
    partition: i32,
    key: &str,
    payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
    let record = FutureRecord::to(topic)
        .partition(partition)
        .key(key)
        .payload(payload);
    
    let delivery_status = producer.send(record, Duration::from_secs(0)).await;
    
    match delivery_status {
        Ok(_) => {
            println!("Message sent to partition {} successfully", partition);
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message to partition {}: {}", partition, e);
            Err(e)
        }
    }
}

Consumer

Creating a Consumer

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};

fn create_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> StreamConsumer {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    
    consumer.subscribe(topics)
        .expect("Can't subscribe to specified topics");
    
    consumer
}

Basic Consumer (Synchronous)

use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::message::Message;
use std::time::Duration;

fn consume_messages_sync(consumer: &BaseConsumer, timeout_ms: u64) {
    loop {
        match consumer.poll(Duration::from_millis(timeout_ms)) {
            Some(Ok(msg)) => {
                match msg.payload() {
                    Some(payload) => {
                        println!("Received message: {:?}", payload);
                        
                        // Commit the offset manually
                        consumer.commit_message(&msg, CommitMode::Sync)
                            .expect("Failed to commit message");
                    }
                    None => {
                        println!("Received empty message");
                    }
                }
            }
            Some(Err(e)) => {
                eprintln!("Error while consuming: {}", e);
            }
            None => {
                // Timed out; no message available
            }
        }
    }
}

Stream Consumer (Asynchronous)

use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;
use futures::StreamExt;

async fn consume_messages_stream(consumer: &StreamConsumer) {
    let mut message_stream = consumer.stream();
    
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => {
                match msg.payload() {
                    Some(payload) => {
                        println!("Received message: {:?}", payload);
                        
                        // Commit the offset manually
                        consumer.commit_message(&msg, CommitMode::Async)
                            .expect("Failed to commit message");
                    }
                    None => {
                        println!("Received empty message");
                    }
                }
            }
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

Consuming from a Specific Partition and Offset

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::{Offset, TopicPartitionList};
use std::time::Duration;

fn consume_from_offset(
    consumer: &BaseConsumer,
    topic: &str,
    partition: i32,
    offset: i64,
) {
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(topic, partition, Offset::Offset(offset))
        .expect("Failed to add partition offset");
    
    consumer.assign(&tpl)
        .expect("Failed to assign partition");
    
    loop {
        match consumer.poll(Duration::from_millis(100)) {
            Some(Ok(msg)) => {
                match msg.payload() {
                    Some(payload) => {
                        println!("Received message from partition {} at offset {}: {:?}", 
                                partition, msg.offset(), payload);
                    }
                    None => {
                        println!("Received empty message");
                    }
                }
            }
            Some(Err(e)) => {
                eprintln!("Error while consuming: {}", e);
            }
            None => {
                // Timed out; no message available
            }
        }
    }
}

Getting the Consumer Position

use rdkafka::consumer::{BaseConsumer, Consumer};

fn get_consumer_position(consumer: &BaseConsumer, topic: &str, partition: i32) -> i64 {
    // position() reports offsets for the consumer's current assignment
    let position = consumer.position()
        .expect("Failed to get consumer position");
    
    for elem in position.elements() {
        if elem.topic() == topic && elem.partition() == partition {
            return elem.offset().to_raw().unwrap_or(-1);
        }
    }
    
    -1
}

Getting Watermark Offsets

use rdkafka::consumer::{BaseConsumer, Consumer};
use std::time::Duration;

fn get_watermark_offsets(
    consumer: &BaseConsumer,
    topic: &str,
    partition: i32,
) -> (i64, i64) {
    // Returns the (low, high) watermarks for the partition
    consumer
        .fetch_watermarks(topic, partition, Duration::from_secs(1))
        .expect("Failed to fetch watermark offsets")
}
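
A common use of the watermarks is computing consumer lag; a small sketch combining the two helpers above:

use rdkafka::consumer::BaseConsumer;

// Consumer lag for one partition: distance between the latest available
// offset (high watermark) and the consumer's current position.
fn compute_lag(consumer: &BaseConsumer, topic: &str, partition: i32) -> i64 {
    let (_low, high) = get_watermark_offsets(consumer, topic, partition);
    let position = get_consumer_position(consumer, topic, partition);
    if position < 0 {
        // No position yet (nothing consumed); report the full backlog.
        high
    } else {
        high - position
    }
}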

Consumer Group Management

use rdkafka::consumer::{Consumer, ConsumerGroupMetadata, StreamConsumer};

fn get_consumer_group_metadata(consumer: &StreamConsumer) -> ConsumerGroupMetadata {
    consumer.group_metadata()
        .expect("Failed to get consumer group metadata")
}

Admin

Creating an Admin Client

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

fn create_admin_client(brokers: &str) -> AdminClient<DefaultClientContext> {
    let admin_client: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Admin client creation failed");
    
    admin_client
}

Creating a Topic

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn create_topic(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    num_partitions: i32,
    replication_factor: i32,
) -> Result<(), rdkafka::error::KafkaError> {
    let new_topic = NewTopic::new(topic_name, num_partitions, TopicReplication::Fixed(replication_factor));
    
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let result = admin_client.create_topics(&[new_topic], &options).await;
    
    match result {
        Ok(_) => {
            println!("Topic '{}' created successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}
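
Note that create_topics resolving to Ok only means the request itself succeeded; each topic carries its own result inside the returned Vec. A sketch of inspecting the per-topic results (partition and replication counts are arbitrary example values):

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn create_topic_checked(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), String> {
    let new_topic = NewTopic::new(topic_name, 3, TopicReplication::Fixed(1));
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let results = admin_client
        .create_topics(&[new_topic], &options)
        .await
        .map_err(|e| e.to_string())?;
    
    // Each element is a per-topic Result: Ok(topic) or Err((topic, error code)).
    for result in results {
        match result {
            Ok(topic) => println!("Created topic '{}'", topic),
            Err((topic, code)) => return Err(format!("Topic '{}' failed: {}", topic, code)),
        }
    }
    Ok(())
}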

Topic Configuration at Creation Time

When creating a Kafka topic, many parameters beyond the basic partition count and replication factor can be set. These are configured through NewTopic's set method.

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use std::collections::HashMap;
use std::time::Duration;

async fn create_topic_with_configs(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    num_partitions: i32,
    replication_factor: i32,
    configs: &HashMap<String, String>,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut new_topic = NewTopic::new(topic_name, num_partitions, TopicReplication::Fixed(replication_factor));
    
    // Set topic configuration parameters
    for (key, value) in configs {
        new_topic = new_topic.set(key, value);
    }
    
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let result = admin_client.create_topics(&[new_topic], &options).await;
    
    match result {
        Ok(_) => {
            println!("Topic '{}' created successfully with custom configs", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Topic Configuration Parameters in Detail

The following parameters can be set when creating a Kafka topic:

1. Log cleanup policy parameters

  • cleanup.policy (default: delete) — log cleanup policy, i.e. how old data is handled. Allowed values: delete (delete by time or size), compact (log compaction), compact,delete (both).
  • delete.retention.ms (default: 86400000, 24 hours) — how long delete tombstones are retained (effective when cleanup.policy includes compact). Allowed values: positive integer (milliseconds).
  • min.compaction.lag.ms (default: 0) — minimum time a message waits before it may be compacted (effective when cleanup.policy includes compact). Allowed values: positive integer (milliseconds).
  • max.compaction.lag.ms (default: 9223372036854775807) — maximum time a message waits before it becomes eligible for compaction (effective when cleanup.policy includes compact). Allowed values: positive integer (milliseconds).
  • min.cleanable.dirty.ratio (default: 0.5) — cleaning trigger threshold; cleaning starts once the ratio of uncleaned log exceeds this value. Allowed values: float between 0 and 1.

2. Log retention parameters

  • retention.ms (default: 604800000, 7 days) — how long messages are retained before deletion. Allowed values: positive integer (milliseconds); -1 means never delete.
  • retention.bytes (default: -1) — maximum partition size before old segments are deleted. Allowed values: positive integer (bytes); -1 means unlimited.
  • segment.ms (default: 604800000, 7 days) — time after which a new log segment is rolled. Allowed values: positive integer (milliseconds).
  • segment.bytes (default: 1073741824, 1 GB) — maximum size of a single log segment. Allowed values: positive integer (bytes).
  • segment.index.bytes (default: 10485760, 10 MB) — maximum size of a log index file. Allowed values: positive integer (bytes).
  • segment.jitter.ms (default: 0) — maximum random jitter added to the segment roll time, to avoid rolling many segments at once. Allowed values: positive integer (milliseconds).

(Note: the retention check interval is a broker-level setting, log.retention.check.interval.ms, default 300000 ms; it is not a per-topic parameter.)

3. Message size and timestamp parameters

  • max.message.bytes (default: 1048588, about 1 MB) — maximum size of a single message. Allowed values: positive integer (bytes).
  • message.timestamp.type (default: CreateTime) — message timestamp type. Allowed values: CreateTime (time the message was created), LogAppendTime (time the message was appended to the log).
  • message.timestamp.difference.max.ms (default: 9223372036854775807) — maximum allowed difference between the message timestamp and the broker's timestamp. Allowed values: positive integer (milliseconds).
  • message.downconversion.enable (default: true) — whether down-conversion of message formats is enabled. Allowed values: true, false.

4. Log flush and sync parameters

  • flush.messages (default: 9223372036854775807) — number of messages after which the log is flushed to disk. Allowed values: positive integer.
  • flush.ms (default: 9223372036854775807) — time interval after which the log is flushed to disk. Allowed values: positive integer (milliseconds).
  • unclean.leader.election.enable (default: false) — whether a new leader may be elected at the risk of data loss. Allowed values: true, false.

5. Index and preallocation parameters

  • index.interval.bytes (default: 4096, 4 KB) — number of bytes between index entries. Allowed values: positive integer (bytes).
  • preallocate (default: false) — whether log files are preallocated. Allowed values: true, false.

6. Compression parameters

  • compression.type (default: producer) — message compression type. Allowed values: producer (keep whatever compression the producer used), none, gzip, snappy, lz4, zstd.

7. Replica and leader parameters

  • min.insync.replicas (default: 1) — minimum number of replicas that must acknowledge a write when the producer uses acks=all. Allowed values: positive integer (1 ≤ value ≤ replication factor).
  • leader.replication.throttled.replicas (default: empty) — list of replicas whose replication is throttled. Allowed values: a list of partition:broker pairs, e.g. 0:1,1:2.

8. Tiered (remote) log storage parameters

  • remote.log.storage.enable (default: false) — whether remote log storage is enabled. Allowed values: true, false.
  • local.log.retention.ms (default: -2) — local log retention time; -2 means fall back to retention.ms. Allowed values: positive integer (milliseconds), or -2.
  • local.log.retention.bytes (default: -2) — local log retention size; -2 means fall back to retention.bytes. Allowed values: positive integer (bytes), or -2.

Example: Creating a Topic with Configuration

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use std::collections::HashMap;
use std::time::Duration;

async fn create_topic_with_example_configs(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut configs = HashMap::new();
    
    // Cleanup policy: both compaction and deletion
    configs.insert("cleanup.policy".to_string(), "compact,delete".to_string());
    
    // Retain messages for 3 days
    configs.insert("retention.ms".to_string(), "259200000".to_string());
    
    // Maximum size of a single log segment: 512 MB
    configs.insert("segment.bytes".to_string(), "536870912".to_string());
    
    // Maximum size of a single message: 5 MB
    configs.insert("max.message.bytes".to_string(), "5242880".to_string());
    
    // Timestamp type: log append time
    configs.insert("message.timestamp.type".to_string(), "LogAppendTime".to_string());
    
    // Compression type: lz4
    configs.insert("compression.type".to_string(), "lz4".to_string());
    
    // Minimum number of in-sync replicas that must acknowledge a write: 2
    configs.insert("min.insync.replicas".to_string(), "2".to_string());
    
    let mut new_topic = NewTopic::new(topic_name, 6, TopicReplication::Fixed(3));
    
    // Apply the configuration
    for (key, value) in &configs {
        new_topic = new_topic.set(key, value);
    }
    
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(10)));
    
    let result = admin_client.create_topics(&[new_topic], &options).await;
    
    match result {
        Ok(_) => {
            println!("Topic '{}' created successfully with custom configs", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Deleting a Topic

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn delete_topic(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let result = admin_client.delete_topics(&[topic_name], &options).await;
    
    match result {
        Ok(_) => {
            println!("Topic '{}' deleted successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to delete topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Listing All Topics

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::metadata::Metadata;
use std::time::Duration;

fn list_topics(admin_client: &AdminClient<DefaultClientContext>) -> Result<Vec<String>, rdkafka::error::KafkaError> {
    let metadata: Metadata = admin_client.inner().fetch_metadata(None, Duration::from_secs(5))?;
    
    let topics: Vec<String> = metadata.topics()
        .iter()
        .map(|topic| topic.name().to_string())
        .collect();
    
    Ok(topics)
}

Getting Topic Details

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::metadata::Metadata;
use std::time::Duration;

fn get_topic_details(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(i32, i32), KafkaError> {
    let metadata: Metadata = admin_client.inner().fetch_metadata(Some(topic_name), Duration::from_secs(5))?;
    
    for topic in metadata.topics() {
        if topic.name() == topic_name {
            let num_partitions = topic.partitions().len() as i32;
            let replication_factor = topic.partitions()
                .first()
                .map(|p| p.replicas().len() as i32)
                .unwrap_or(0);
            
            return Ok((num_partitions, replication_factor));
        }
    }
    
    Err(KafkaError::MetadataFetch(RDKafkaErrorCode::UnknownTopicOrPartition))
}

Altering Topic Configuration

use rdkafka::admin::{AdminClient, AdminOptions, AlterConfig, ResourceSpecifier};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn alter_topic_config(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    config_updates: Vec<(String, String)>,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
    
    for (key, value) in &config_updates {
        alter_config = alter_config.set(key, value);
    }
    
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let result = admin_client.alter_configs(&[alter_config], &options).await;
    
    match result {
        Ok(_) => {
            println!("Topic '{}' config updated successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to update topic '{}' config: {}", topic_name, e);
            Err(e)
        }
    }
}
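
The counterpart for reading configuration back is describe_configs. A sketch, assuming the ConfigResource and ConfigEntry fields exposed by rdkafka's admin module:

use rdkafka::admin::{AdminClient, AdminOptions, ResourceSpecifier};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn print_topic_config(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let results = admin_client
        .describe_configs(&[ResourceSpecifier::Topic(topic_name)], &options)
        .await?;
    
    // Each element is a per-resource Result carrying the config entries.
    for result in results {
        match result {
            Ok(resource) => {
                for entry in resource.entries {
                    println!("{} = {:?}", entry.name, entry.value);
                }
            }
            Err(code) => eprintln!("describe_configs failed: {}", code),
        }
    }
    Ok(())
}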

Adding Partitions

use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn create_partitions(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    new_total_count: usize,
) -> Result<(), rdkafka::error::KafkaError> {
    let new_partitions = NewPartitions::new(topic_name, new_total_count);
    
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let result = admin_client.create_partitions(&[new_partitions], &options).await;
    
    match result {
        Ok(_) => {
            println!("Partitions for topic '{}' created successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create partitions for topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Listing Consumer Groups

rdkafka's AdminClient does not expose the ListGroups/DescribeGroups admin APIs directly; group information is available through the underlying client's fetch_group_list.

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::groups::GroupList;
use std::time::Duration;

fn list_consumer_groups(admin_client: &AdminClient<DefaultClientContext>) -> Result<Vec<String>, rdkafka::error::KafkaError> {
    let group_list: GroupList = admin_client.inner().fetch_group_list(None, Duration::from_secs(5))?;
    
    let groups = group_list.groups()
        .iter()
        .map(|group| group.name().to_string())
        .collect();
    
    Ok(groups)
}

Describing a Consumer Group

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

fn describe_consumer_group(
    admin_client: &AdminClient<DefaultClientContext>,
    group_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let group_list = admin_client.inner().fetch_group_list(Some(group_id), Duration::from_secs(5))?;
    
    for group in group_list.groups() {
        println!("Group ID: {}", group.name());
        println!("State: {}", group.state());
        println!("Protocol: {}", group.protocol());
        
        for member in group.members() {
            println!("Member ID: {}", member.id());
            println!("Client ID: {}", member.client_id());
            println!("Host: {}", member.client_host());
            // member.assignment() returns the raw assignment bytes; decoding
            // topic/partition pairs requires parsing the consumer protocol format.
        }
    }
    
    Ok(())
}

Deleting a Consumer Group

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn delete_consumer_group(
    admin_client: &AdminClient<DefaultClientContext>,
    group_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    let result = admin_client.delete_groups(&[group_id], &options).await;
    
    match result {
        Ok(_) => {
            println!("Consumer group '{}' deleted successfully", group_id);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to delete consumer group '{}': {}", group_id, e);
            Err(e)
        }
    }
}

Advanced Features

Transactions

rdkafka has no separate transactional producer type; transactions are driven through the Producer trait methods (init_transactions, begin_transaction, commit_transaction, abort_transaction) on a producer configured with a transactional.id.

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;

fn create_transactional_producer(
    brokers: &str,
    transactional_id: &str,
) -> FutureProducer {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .set("enable.idempotence", "true")
        .create()
        .expect("Transactional producer creation failed");
    
    producer
}

async fn send_messages_in_transaction(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
) -> Result<(), rdkafka::error::KafkaError> {
    // Initialize transactions (required once per producer)
    producer.init_transactions(Duration::from_secs(5))?;
    
    // Begin the transaction
    producer.begin_transaction()?;
    
    // Send the messages
    for (key, payload) in messages {
        let record = FutureRecord::to(topic)
            .key(key)
            .payload(payload);
        
        if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
            // Abort on failure so partial sends are not committed
            producer.abort_transaction(Duration::from_secs(5))?;
            return Err(e);
        }
    }
    
    // Commit the transaction
    producer.commit_transaction(Duration::from_secs(5))?;
    
    Ok(())
}

Exactly-Once Semantics

The consume-transform-produce loop below pairs a transactional producer with a read_committed consumer, and commits the consumer offsets inside the producer transaction.

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::{Offset, TopicPartitionList};
use futures::StreamExt;
use std::time::Duration;

async fn exactly_once_processing(
    input_topic: &str,
    output_topic: &str,
    brokers: &str,
    group_id: &str,
    transactional_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    // Create the transactional producer
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .set("enable.idempotence", "true")
        .create()
        .expect("Transactional producer creation failed");
    
    // Create the consumer; read_committed hides messages from aborted transactions
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.auto.commit", "false")
        .set("isolation.level", "read_committed")
        .create()
        .expect("Consumer creation failed");
    
    consumer.subscribe(&[input_topic])
        .expect("Can't subscribe to input topic");
    
    // Initialize transactions
    producer.init_transactions(Duration::from_secs(5))?;
    
    let mut message_stream = consumer.stream();
    
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(input_msg) => {
                // Begin a transaction
                producer.begin_transaction()?;
                
                // Process the message
                let processed_payload = process_message(input_msg.payload().unwrap_or(&[]));
                
                // Send the processed message
                let record = FutureRecord::to(output_topic)
                    .key(input_msg.key().unwrap_or(&[]))
                    .payload(&processed_payload);
                
                if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
                    producer.abort_transaction(Duration::from_secs(5))?;
                    return Err(e);
                }
                
                // Commit the consumer offset as part of the transaction
                let mut offsets = TopicPartitionList::new();
                offsets.add_partition_offset(
                    input_msg.topic(),
                    input_msg.partition(),
                    Offset::Offset(input_msg.offset() + 1),
                )?;
                producer.send_offsets_to_transaction(
                    &offsets,
                    &consumer.group_metadata().expect("no group metadata"),
                    Duration::from_secs(5),
                )?;
                
                // Commit the transaction
                producer.commit_transaction(Duration::from_secs(5))?;
            }
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
    
    Ok(())
}

fn process_message(input: &[u8]) -> Vec<u8> {
    // Message-processing logic goes here.
    // Example: uppercase ASCII bytes.
    input.to_ascii_uppercase()
}

Custom Partitioning

librdkafka selects partitions through the partitioner topic configuration (e.g. "murmur2_random", "consistent_random"); rdkafka does not take a Java-style partitioner callback on the producer context. The portable way to apply custom logic is to compute the partition yourself and set it explicitly on each record.

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

// Custom partitioning logic.
// Example: hash the key; fall back to a time-based rotation when there is no key.
fn compute_partition(key: Option<&[u8]>, partition_count: i32) -> i32 {
    if let Some(key) = key {
        let hash = key.iter().fold(0usize, |acc, &x| acc.wrapping_add(x as usize));
        (hash % partition_count as usize) as i32
    } else {
        let current_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        (current_time % partition_count as u64) as i32
    }
}

async fn send_with_custom_partitioning(
    producer: &FutureProducer,
    topic: &str,
    partition_count: i32,
    key: &str,
    payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
    let partition = compute_partition(Some(key.as_bytes()), partition_count);
    
    let record = FutureRecord::to(topic)
        .partition(partition)
        .key(key)
        .payload(payload);
    
    producer.send(record, Duration::from_secs(0)).await
        .map(|_| ())
        .map_err(|(e, _)| e)
}

Alternatively, one of librdkafka's built-in strategies can be selected in the producer config:

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn create_producer_with_partitioner(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("partitioner", "murmur2_random")
        .create()
        .expect("Producer creation error")
}

Custom Serializers

use serde::{Deserialize, Serialize};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::error::Error;
use std::time::Duration;

#[derive(Serialize, Deserialize, Debug)]
struct MyData {
    id: u32,
    name: String,
    timestamp: i64,
}

trait MessageSerializer<T> {
    fn serialize(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>;
}

trait MessageDeserializer<T> {
    fn deserialize(&self, bytes: &[u8]) -> Result<T, Box<dyn Error>>;
}

struct JsonSerializer;

impl MessageSerializer<MyData> for JsonSerializer {
    fn serialize(&self, data: &MyData) -> Result<Vec<u8>, Box<dyn Error>> {
        Ok(serde_json::to_vec(data)?)
    }
}

struct JsonDeserializer;

impl MessageDeserializer<MyData> for JsonDeserializer {
    fn deserialize(&self, bytes: &[u8]) -> Result<MyData, Box<dyn Error>> {
        Ok(serde_json::from_slice(bytes)?)
    }
}

async fn send_serialized_message(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    data: &MyData,
    serializer: &impl MessageSerializer<MyData>,
) -> Result<(), Box<dyn Error>> {
    let payload = serializer.serialize(data)?;
    
    let record = FutureRecord::to(topic)
        .key(key)
        .payload(&payload);
    
    let delivery_status = producer.send(record, Duration::from_secs(0)).await;
    
    match delivery_status {
        Ok(_) => {
            println!("Serialized message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send serialized message: {}", e);
            Err(Box::new(e))
        }
    }
}
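
A hypothetical round trip with the JSON serializer pair (topic, key, and field values are placeholders):

async fn json_round_trip(producer: &FutureProducer) -> Result<(), Box<dyn Error>> {
    let data = MyData { id: 1, name: "example".to_string(), timestamp: 1_700_000_000 };
    
    // Produce the struct as JSON
    send_serialized_message(producer, "json-topic", "key-1", &data, &JsonSerializer).await?;
    
    // On the consumer side, the payload bytes would be decoded back with:
    // let data: MyData = JsonDeserializer.deserialize(payload_bytes)?;
    Ok(())
}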

Message Interceptors (Client Contexts)

rdkafka's equivalent of interceptors is the client context: a ProducerContext receives a delivery callback for every message, and a ConsumerContext is notified of rebalances and offset commits (there is no per-message consume callback). Delivery callbacks fire on BaseProducer/ThreadedProducer; FutureProducer consumes them internally to resolve its futures.

use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, ConsumerContext, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::Message;
use rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};
use rdkafka::TopicPartitionList;

struct LoggingProducerContext;

impl ClientContext for LoggingProducerContext {}

impl ProducerContext for LoggingProducerContext {
    type DeliveryOpaque = ();
    
    fn delivery(&self, delivery_result: &DeliveryResult<'_>, _delivery_opaque: Self::DeliveryOpaque) {
        match delivery_result {
            Ok(message) => {
                println!("Message delivered to topic {}, partition {} at offset {}",
                        message.topic(), message.partition(), message.offset());
            }
            Err((e, message)) => {
                eprintln!("Failed to deliver message to topic {}, partition {}: {}",
                         message.topic(), message.partition(), e);
            }
        }
    }
}

struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {}

impl ConsumerContext for LoggingConsumerContext {
    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
        match result {
            Ok(()) => println!("Offsets committed: {:?}", offsets),
            Err(e) => eprintln!("Offset commit failed: {}", e),
        }
    }
}

fn create_producer_with_interceptor(brokers: &str) -> BaseProducer<LoggingProducerContext> {
    let producer: BaseProducer<LoggingProducerContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create_with_context(LoggingProducerContext)
        .expect("Producer creation error");
    
    producer
}

fn create_consumer_with_interceptor(brokers: &str, group_id: &str, topics: &[&str]) -> StreamConsumer<LoggingConsumerContext> {
    let consumer: StreamConsumer<LoggingConsumerContext> = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create_with_context(LoggingConsumerContext)
        .expect("Consumer creation failed");
    
    consumer.subscribe(topics)
        .expect("Can't subscribe to specified topics");
    
    consumer
}

Message Filtering

use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use futures::StreamExt;

async fn filter_messages(
    consumer: &StreamConsumer,
    filter_fn: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
    let mut message_stream = consumer.stream();
    
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => {
                match msg.payload() {
                    Some(payload) => {
                        if filter_fn(payload) {
                            println!("Filtered message: {:?}", payload);
                            // Handle messages that match the filter
                        }
                    }
                    None => {
                        println!("Received empty message");
                    }
                }
            }
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

// Example filter function
fn example_filter(payload: &[u8]) -> bool {
    // Only handle messages containing a particular substring
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    payload_str.contains("important")
}
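
A hypothetical call site, reusing example_filter from above (consumer construction as in the earlier section):

async fn run_filter(consumer: &rdkafka::consumer::StreamConsumer) {
    // Consume and keep only "important" messages
    filter_messages(consumer, example_filter).await;
}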

Message Transformation

use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::StreamExt;
use std::time::Duration;

async fn transform_messages(
    input_consumer: &StreamConsumer,
    output_producer: &FutureProducer,
    output_topic: &str,
    transform_fn: impl Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
) {
    let mut message_stream = input_consumer.stream();
    
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => {
                match msg.payload() {
                    Some(payload) => {
                        // Transform the message
                        let transformed_payload = transform_fn(payload);
                        
                        // Send the transformed message, preserving the key if present
                        let mut record: FutureRecord<'_, [u8], Vec<u8>> =
                            FutureRecord::to(output_topic).payload(&transformed_payload);
                        if let Some(key) = msg.key() {
                            record = record.key(key);
                        }
                        
                        match output_producer.send(record, Duration::from_secs(0)).await {
                            Ok(_) => {
                                println!("Transformed message sent successfully");
                            }
                            Err((e, _)) => {
                                eprintln!("Failed to send transformed message: {}", e);
                            }
                        }
                    }
                    None => {
                        println!("Received empty message");
                    }
                }
            }
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

// Example transform function
fn example_transform(payload: &[u8]) -> Vec<u8> {
    // Uppercase the message
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    payload_str.to_uppercase().into_bytes()
}

Error Handling

Basic Error Handling

use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_with_error_handling(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
) -> Result<(), Box<dyn std::error::Error>> {
    let record = FutureRecord::to(topic)
        .key(key)
        .payload(payload);
    
    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            match e {
                KafkaError::MessageProduction(RDKafkaErrorCode::BrokerTransportFailure) => {
                    eprintln!("Broker transport failure: {}", e);
                    // Reconnect or fail over to another broker
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                    eprintln!("Producer queue is full: {}", e);
                    // Back off and retry after a delay
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge) => {
                    eprintln!("Message size too large: {}", e);
                    // Compress the message or reduce its size
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::UnknownTopicOrPartition) => {
                    eprintln!("Unknown topic or partition: {}", e);
                    // Create the topic or check the topic name
                }
                _ => {
                    eprintln!("Failed to send message: {}", e);
                }
            }
            Err(Box::new(e))
        }
    }
}

Retry Logic

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
use tokio::time::sleep;

async fn send_message_with_retry(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
    max_retries: u32,
    retry_delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut retries = 0;
    
    loop {
        let record = FutureRecord::to(topic)
            .key(key)
            .payload(payload);
        
        match producer.send(record, Duration::from_secs(0)).await {
            Ok(_) => {
                println!("Message sent successfully");
                return Ok(());
            }
            Err((e, _)) => {
                retries += 1;
                
                if retries >= max_retries {
                    eprintln!("Failed to send message after {} retries: {}", max_retries, e);
                    return Err(Box::new(e));
                }
                
                eprintln!("Failed to send message (attempt {}/{}): {}, retrying in {:?}...",
                         retries, max_retries, e, retry_delay);
                
                sleep(retry_delay).await;
            }
        }
    }
}

Dead-Letter Queue Handling

use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::StreamExt;
use std::time::Duration;

async fn dead_letter_queue_handler(
    main_consumer: &StreamConsumer,
    dlq_producer: &FutureProducer,
    dlq_topic: &str,
    process_ok: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
    let mut message_stream = main_consumer.stream();
    
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => {
                match msg.payload() {
                    Some(payload) => {
                        // Try to process the message
                        if process_ok(payload) {
                            // Processing succeeded
                            println!("Message processed successfully");
                        } else {
                            // Processing failed: forward to the dead-letter queue
                            let mut record: FutureRecord<'_, [u8], [u8]> =
                                FutureRecord::to(dlq_topic).payload(payload);
                            if let Some(key) = msg.key() {
                                record = record.key(key);
                            }
                            
                            match dlq_producer.send(record, Duration::from_secs(0)).await {
                                Ok(_) => {
                                    println!("Failed message sent to dead letter queue");
                                }
                                Err((e, _)) => {
                                    eprintln!("Failed to send message to dead letter queue: {}", e);
                                }
                            }
                        }
                    }
                    None => {
                        println!("Received empty message");
                    }
                }
            }
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

// Example processing function
fn example_error_handler(payload: &[u8]) -> bool {
    // Simulate processing that sometimes fails
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    
    // Fail whenever the message contains "error"
    if payload_str.contains("error") {
        return false;
    }
    
    // Otherwise succeed
    true
}

Performance Optimization

Batch Processing

use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::future::try_join_all;
use std::time::Duration;

async fn batch_send_messages(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
    batch_size: usize,
) -> Result<(), rdkafka::error::KafkaError> {
    for chunk in messages.chunks(batch_size) {
        let futures = chunk.iter().map(|(key, payload)| {
            let record = FutureRecord::to(topic)
                .key(*key)
                .payload(*payload);
            
            producer.send(record, Duration::from_secs(0))
        });
        
        match try_join_all(futures).await {
            Ok(_) => {
                println!("Batch of {} messages sent successfully", chunk.len());
            }
            Err((e, _)) => {
                eprintln!("Failed to send batch of messages: {}", e);
                return Err(e);
            }
        }
    }
    
    Ok(())
}

Concurrent Asynchronous Processing

use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use futures::StreamExt;
use std::sync::Arc;
use tokio::task;

async fn concurrent_message_processing(
    consumer: &StreamConsumer,
    concurrency_level: usize,
    process_fn: impl Fn(&[u8]) + Send + Sync + 'static,
) {
    let process_fn = Arc::new(process_fn);
    let mut message_stream = consumer.stream();
    let mut tasks = Vec::with_capacity(concurrency_level);
    
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => {
                match msg.payload() {
                    Some(payload) => {
                        let payload = payload.to_vec();
                        let process_fn = Arc::clone(&process_fn);
                        
                        // If the concurrency limit is reached, wait for the oldest task
                        if tasks.len() >= concurrency_level {
                            let oldest = tasks.remove(0);
                            let _ = oldest.await;
                        }
                        
                        // Spawn a new processing task
                        let task = task::spawn(async move {
                            process_fn(&payload);
                        });
                        
                        tasks.push(task);
                    }
                    None => {
                        println!("Received empty message");
                    }
                }
            }
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
    
    // Wait for the remaining tasks to finish
    for task in tasks {
        let _ = task.await;
    }
}

// Example processing function
fn example_process_fn(payload: &[u8]) {
    // Simulate expensive processing
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    println!("Processing message: {}", payload_str);
    
    // Simulate processing time
    std::thread::sleep(std::time::Duration::from_millis(100));
}

Connection Pooling

FutureProducer is internally reference-counted and cheap to clone; StreamConsumer is not Clone, so the pool below hands out Arc<StreamConsumer> instead. A usage sketch follows the definition.

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct KafkaConnectionPool {
    producers: Arc<Mutex<HashMap<String, FutureProducer>>>,
    consumers: Arc<Mutex<HashMap<String, Arc<StreamConsumer>>>>,
    brokers: String,
}

impl KafkaConnectionPool {
    fn new(brokers: &str) -> Self {
        Self {
            producers: Arc::new(Mutex::new(HashMap::new())),
            consumers: Arc::new(Mutex::new(HashMap::new())),
            brokers: brokers.to_string(),
        }
    }
    
    fn get_producer(&self, client_id: &str) -> FutureProducer {
        let mut producers = self.producers.lock().unwrap();
        
        if let Some(producer) = producers.get(client_id) {
            return producer.clone();
        }
        
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", &self.brokers)
            .set("client.id", client_id)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");
        
        producers.insert(client_id.to_string(), producer.clone());
        producer
    }
    
    fn get_consumer(&self, group_id: &str, topics: &[&str]) -> Arc<StreamConsumer> {
        let key = format!("{}:{}", group_id, topics.join(","));
        let mut consumers = self.consumers.lock().unwrap();
        
        if let Some(consumer) = consumers.get(&key) {
            return Arc::clone(consumer);
        }
        
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", &self.brokers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .create()
            .expect("Consumer creation failed");
        
        consumer.subscribe(topics)
            .expect("Can't subscribe to specified topics");
        
        let consumer = Arc::new(consumer);
        consumers.insert(key, Arc::clone(&consumer));
        consumer
    }
}
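
A hypothetical call site (broker address, client ids, group, and topics are placeholders):

async fn use_pool() {
    let pool = KafkaConnectionPool::new("localhost:9092");
    
    // The same client_id returns the cached producer
    let producer_a = pool.get_producer("service-a");
    let _producer_a_again = pool.get_producer("service-a");
    
    let _consumer = pool.get_consumer("analytics", &["events"]);
    let _ = producer_a; // use the producer...
}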

Buffer Tuning

The Client handle does not expose its ClientConfig, so tuning parameters such as linger.ms and batch.size must be set when the producer is built:

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;

async fn optimized_send_messages(
    brokers: &str,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
    linger_ms: u32,
    batch_size: usize,
) -> Result<(), rdkafka::error::KafkaError> {
    // Configure the producer for batching: linger.ms delays sends so more
    // messages can accumulate per batch; batch.size caps the batch size.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("linger.ms", linger_ms.to_string())
        .set("batch.size", batch_size.to_string())
        .create()
        .expect("Optimized producer creation error");
    
    // Send the messages
    for (key, payload) in messages {
        let record = FutureRecord::to(topic)
            .key(key)
            .payload(payload);
        
        if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
            eprintln!("Failed to send message: {}", e);
            return Err(e);
        }
    }
    
    // Flush the buffers to make sure everything has been delivered
    producer.flush(Duration::from_secs(5))?;
    
    Ok(())
}

Best Practices

Resource Management

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use std::sync::Arc;

struct KafkaClient {
    producer: Arc<FutureProducer>,
    consumer: Arc<StreamConsumer>,
    admin: Arc<AdminClient<DefaultClientContext>>,
}

impl KafkaClient {
    fn new(brokers: &str, group_id: &str, topics: &[&str]) -> Self {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");
        
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", brokers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .create()
            .expect("Consumer creation failed");
        
        consumer.subscribe(topics)
            .expect("Can't subscribe to specified topics");
        
        let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .create()
            .expect("Admin client creation failed");
        
        Self {
            producer: Arc::new(producer),
            consumer: Arc::new(consumer),
            admin: Arc::new(admin),
        }
    }
    
    fn producer(&self) -> Arc<FutureProducer> {
        self.producer.clone()
    }
    
    fn consumer(&self) -> Arc<StreamConsumer> {
        self.consumer.clone()
    }
    
    fn admin(&self) -> Arc<AdminClient> {
        self.admin.clone()
    }
}

// Usage example
async fn use_kafka_client() {
    let brokers = "localhost:9092";
    let group_id = "my-group";
    let topics = &["my-topic"];
    
    let kafka_client = KafkaClient::new(brokers, group_id, topics);
    
    // Producer
    let producer = kafka_client.producer();
    // ... send messages with the producer
    
    // Consumer
    let consumer = kafka_client.consumer();
    // ... receive messages with the consumer
    
    // Admin
    let admin = kafka_client.admin();
    // ... perform administrative operations
    
    let _ = (producer, consumer, admin);
}

Configuration Management

use serde::{Deserialize, Serialize};
use std::env;

#[derive(Debug, Serialize, Deserialize)]
struct KafkaConfig {
    brokers: String,
    group_id: String,
    topics: Vec<String>,
    producer_config: ProducerConfig,
    consumer_config: ConsumerConfig,
    admin_config: AdminConfig,
}

#[derive(Debug, Serialize, Deserialize)]
struct ProducerConfig {
    linger_ms: u32,
    batch_size: usize,
    message_timeout_ms: u32,
    acks: String,
    retries: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct ConsumerConfig {
    session_timeout_ms: u32,
    auto_commit: bool,
    auto_offset_reset: String,
    max_poll_interval_ms: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct AdminConfig {
    request_timeout_ms: u32,
}

impl KafkaConfig {
    fn from_env() -> Result<Self, env::VarError> {
        Ok(Self {
            brokers: env::var("KAFKA_BROKERS")?,
            group_id: env::var("KAFKA_GROUP_ID")?,
            topics: env::var("KAFKA_TOPICS")?
                .split(',')
                .map(|s| s.to_string())
                .collect(),
            producer_config: ProducerConfig {
                linger_ms: env::var("KAFKA_PRODUCER_LINGER_MS")?
                    .parse()
                    .unwrap_or(10),
                batch_size: env::var("KAFKA_PRODUCER_BATCH_SIZE")?
                    .parse()
                    .unwrap_or(16384),
                message_timeout_ms: env::var("KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS")?
                    .parse()
                    .unwrap_or(5000),
                acks: env::var("KAFKA_PRODUCER_ACKS")?
                    .parse()
                    .unwrap_or_else(|_| "all".to_string()),
                retries: env::var("KAFKA_PRODUCER_RETRIES")?
                    .parse()
                    .unwrap_or(3),
            },
            consumer_config: ConsumerConfig {
                session_timeout_ms: env::var("KAFKA_CONSUMER_SESSION_TIMEOUT_MS")?
                    .parse()
                    .unwrap_or(10000),
                auto_commit: env::var("KAFKA_CONSUMER_AUTO_COMMIT")?
                    .parse()
                    .unwrap_or(false),
                auto_offset_reset: env::var("KAFKA_CONSUMER_AUTO_OFFSET_RESET")?
                    .parse()
                    .unwrap_or_else(|_| "earliest".to_string()),
                max_poll_interval_ms: env::var("KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS")?
                    .parse()
                    .unwrap_or(300000),
            },
            admin_config: AdminConfig {
                request_timeout_ms: env::var("KAFKA_ADMIN_REQUEST_TIMEOUT_MS")?
                    .parse()
                    .unwrap_or(5000),
            },
        })
    }
    
    fn to_producer_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", &self.brokers);
        config.set("linger.ms", self.producer_config.linger_ms.to_string());
        config.set("batch.size", self.producer_config.batch_size.to_string());
        config.set("message.timeout.ms", self.producer_config.message_timeout_ms.to_string());
        config.set("acks", &self.producer_config.acks);
        config.set("retries", self.producer_config.retries.to_string());
        config
    }
    
    fn to_consumer_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", &self.brokers);
        config.set("group.id", &self.group_id);
        config.set("session.timeout.ms", self.consumer_config.session_timeout_ms.to_string());
        config.set("enable.auto.commit", self.consumer_config.auto_commit.to_string());
        config.set("auto.offset.reset", &self.consumer_config.auto_offset_reset);
        config.set("max.poll.records", self.consumer_config.max_poll_records.to_string());
        config
    }
    
    fn to_admin_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", &self.brokers);
        config.set("request.timeout.ms", self.admin_config.request_timeout_ms.to_string());
        config
    }
}

// Usage example
async fn use_config() -> Result<(), Box<dyn std::error::Error>> {
    // Load configuration from environment variables
    let kafka_config = KafkaConfig::from_env()?;
    
    // Create the producer
    let producer: rdkafka::producer::FutureProducer = kafka_config.to_producer_config()
        .create()?;
    
    // Create the consumer
    let consumer: rdkafka::consumer::StreamConsumer = kafka_config.to_consumer_config()
        .create()?;
    
    // Create the admin client
    let admin: rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext> =
        kafka_config.to_admin_config().create()?;
    
    // Use the clients...
    let _ = (producer, consumer, admin);
    
    Ok(())
}

Monitoring and Metrics

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::StreamExt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

struct KafkaMetrics {
    messages_produced: u64,
    messages_consumed: u64,
    bytes_produced: u64,
    bytes_consumed: u64,
    produce_errors: u64,
    consume_errors: u64,
    last_update: Instant,
}

impl KafkaMetrics {
    fn new() -> Self {
        Self {
            messages_produced: 0,
            messages_consumed: 0,
            bytes_produced: 0,
            bytes_consumed: 0,
            produce_errors: 0,
            consume_errors: 0,
            last_update: Instant::now(),
        }
    }
    
    fn record_message_produced(&mut self, size: usize) {
        self.messages_produced += 1;
        self.bytes_produced += size as u64;
        self.last_update = Instant::now();
    }
    
    fn record_message_consumed(&mut self, size: usize) {
        self.messages_consumed += 1;
        self.bytes_consumed += size as u64;
        self.last_update = Instant::now();
    }
    
    fn record_produce_error(&mut self) {
        self.produce_errors += 1;
        self.last_update = Instant::now();
    }
    
    fn record_consume_error(&mut self) {
        self.consume_errors += 1;
        self.last_update = Instant::now();
    }
    
    fn print_metrics(&self) {
        println!("Kafka Metrics:");
        println!("  Messages produced: {}", self.messages_produced);
        println!("  Messages consumed: {}", self.messages_consumed);
        println!("  Bytes produced: {}", self.bytes_produced);
        println!("  Bytes consumed: {}", self.bytes_consumed);
        println!("  Produce errors: {}", self.produce_errors);
        println!("  Consume errors: {}", self.consume_errors);
        println!("  Last update: {:?}", self.last_update.elapsed());
    }
}

struct MonitoredProducer {
    producer: FutureProducer,
    metrics: Arc<Mutex<KafkaMetrics>>,
}

impl MonitoredProducer {
    fn new(producer: FutureProducer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
        Self { producer, metrics }
    }
    
    async fn send(
        &self,
        record: FutureRecord<'_, [u8], [u8]>,
        timeout: Duration,
    ) -> Result<(), (rdkafka::error::KafkaError, rdkafka::message::OwnedMessage)> {
        let size = record.payload.map(|p| p.len()).unwrap_or(0);
        
        match self.producer.send(record, timeout).await {
            Ok(_) => {
                let mut metrics = self.metrics.lock().unwrap();
                metrics.record_message_produced(size);
                Ok(())
            }
            Err(e) => {
                let mut metrics = self.metrics.lock().unwrap();
                metrics.record_produce_error();
                Err(e)
            }
        }
    }
}

struct MonitoredConsumer {
    consumer: StreamConsumer,
    metrics: Arc<Mutex<KafkaMetrics>>,
}

impl MonitoredConsumer {
    fn new(consumer: StreamConsumer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
        Self { consumer, metrics }
    }
    
    async fn consume(&self) {
        let mut message_stream = self.consumer.stream();
        
        while let Some(result) = message_stream.next().await {
            match result {
                Ok(msg) => {
                    let size = msg.payload().map(|p| p.len()).unwrap_or(0);
                    let mut metrics = self.metrics.lock().unwrap();
                    metrics.record_message_consumed(size);
                    
                    // Process the message...
                }
                Err(e) => {
                    let mut metrics = self.metrics.lock().unwrap();
                    metrics.record_consume_error();
                    eprintln!("Error while consuming: {}", e);
                }
            }
        }
    }
}

// Usage example
async fn use_monitored_clients() {
    let metrics = Arc::new(Mutex::new(KafkaMetrics::new()));
    
    // Create the producer
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");
    
    let monitored_producer = MonitoredProducer::new(producer, metrics.clone());
    
    // Create the consumer
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "my-group")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    
    consumer.subscribe(&["my-topic"])
        .expect("Can't subscribe to specified topics");
    
    let monitored_consumer = MonitoredConsumer::new(consumer, metrics.clone());
    
    // Spawn the consumer task
    tokio::spawn(async move {
        monitored_consumer.consume().await;
    });
    
    // Periodically print the metrics
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            let metrics = metrics.lock().unwrap();
            metrics.print_metrics();
        }
    });
    
    // Use the producer to send messages...
}

Complete Example

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::message::Message;
use futures::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Kafka configuration
    let brokers = "localhost:9092";
    let topic = "example-topic";
    let group_id = "example-group";
    
    // Create the admin client
    let admin_client: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Admin client creation failed");
    
    // Create the topic (if it does not exist)
    let new_topic = NewTopic::new(topic, 3, TopicReplication::Fixed(1));
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    
    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => {
            println!("Topic '{}' created or already exists", topic);
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic, e);
        }
    }
    
    // Create the producer
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("acks", "all")
        .create()
        .expect("Producer creation error");
    
    // Create the consumer
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    
    consumer.subscribe(&[topic])
        .expect("Can't subscribe to specified topics");
    
    // Spawn the consumer task
    let consumer_handle = tokio::spawn(async move {
        let mut message_stream = consumer.stream();
        
        while let Some(result) = message_stream.next().await {
            match result {
                Ok(msg) => {
                    match msg.payload() {
                        Some(payload) => {
                            let payload_str = std::str::from_utf8(payload).unwrap_or("");
                            println!("Received message: {}", payload_str);
                            
                            // Commit the offset manually
                            consumer.commit_message(&msg, CommitMode::Async)
                                .expect("Failed to commit message");
                        }
                        None => {
                            println!("Received empty message");
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error while consuming: {}", e);
                }
            }
        }
    });
    
    // Send a few messages
    for i in 0..10 {
        let payload = format!("Message {}", i);
        let key = format!("key-{}", i);
        let record = FutureRecord::to(topic)
            .key(&key)
            .payload(payload.as_bytes());
        
        match producer.send(record, Duration::from_secs(0)).await {
            Ok(_) => {
                println!("Message {} sent successfully", i);
            }
            Err((e, _)) => {
                eprintln!("Failed to send message {}: {}", i, e);
            }
        }
        
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
    
    // Give the consumer time to process the remaining messages
    tokio::time::sleep(Duration::from_secs(5)).await;
    
    // Cancel the consumer task
    consumer_handle.abort();
    
    println!("Example completed");
    
    Ok(())
}

Summary

This document covered how to use rdkafka for a broad range of Kafka operations, including:

  1. Producer operations: sending messages (synchronously and asynchronously), batch sends, messages with headers, sends to specific partitions, and more.
  2. Consumer operations: basic consumption, stream consumption, consuming from a specific partition and offset, reading the consumer position and watermark offsets.
  3. Admin operations: creating and deleting topics, listing topics, getting topic details, altering topic configuration, adding partitions, and managing consumer groups.
  4. Advanced features: transactions, exactly-once semantics, custom partitioning, custom serializers, client-context interceptors, message filtering and transformation.
  5. Error handling: basic error handling, retry logic, dead-letter queues.
  6. Performance optimization: batch processing, concurrent asynchronous processing, connection pooling, buffer tuning.
  7. Best practices: resource management, configuration management, monitoring and metrics.

By following the examples and best practices in this document, you can use rdkafka effectively to interact with a Kafka cluster from Rust and build high-performance, reliable message-processing systems.

License

This document is released under the MIT License.