C语言操作Kafka

发布于:2025-05-31 ⋅ 阅读:(18) ⋅ 点赞:(0)

Kafka服务

Kafka的快速入门 文档很详细,基本上几步就可以搭建一个Kafka测试环境。

下载Kafka的二进制包,然后解压。

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0

生成集群ID,使用集群ID格式化存储

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动Kafka Server。

$ bin/kafka-server-start.sh config/server.properties

启动之后,默认的Kafka Server会连续输出日志到控制台。后面的测试命令,需要在另外的终端执行。

创建主题

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

测试生产

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

测试消费

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

C语言的Kafka库:librdkafka

在C语言中操作Kafka,有不少库可以选择,其中就有librdkafka。

根据librdkafka在gitthub上的信息,它主要是由C语言开发,但是也提供了C++的支持。

当我们在Linux平台上安装librdkafka之后,通过pkg-config命令加库名rdkafka,可以获取编译相关的信息。

如在Fedora 42上:

pkg-config --cflags --libs rdkafka  
-DWITH_GZFILEOP -lrdkafka

需要注意的是,rdkafka.pc中记录的头文件路径是/usr/include,但是它的头文件都位于/usr/include/librdkafka。在C语言的源代码中需要写成:

#include <librdkafka/rdkafka.h>

在C++中则需要写成:

#include <librdkafka/rdkafkacpp.h>

但是C++的API与C语言的类似,后续不再提及C++的相关API,仅以C语言举例。

rdkafka API

rd_kafka_t

API的核心是结构rd_kafka_t

它的创建和销毁是一对函数:

rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,  
                         rd_kafka_conf_t *conf,  
                         char *errstr,  
                         size_t errstr_size);  
  
void rd_kafka_destroy(rd_kafka_t *rk);

其中,rd_kafka_type_t是一个enum,分别为RD_KAFKA_PRODUCERRD_KAFKA_CONSUMER,即生产者与消费者。

rd_kafka_conf_t

rd_kafka_conf_t是另一个结构,创建与销毁函数为:

rd_kafka_conf_t *rd_kafka_conf_new(void);

void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);

rd_kafka_conf_t创建之后,使用如下函数设置参数:

rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,  
                                      const char *name,  
                                      const char *value,  
                                      char *errstr,  
                                      size_t errstr_size);

如:bootstrap.serverssasl.usernamesasl.password等。

rd_kafka_conf_t设置相关的函数返回值都是rd_kafka_conf_res_t。如果成功,值为RD_KAFKA_CONF_OK,即0。

如果要进行更详细的控制,还可以使用其它一些函数。

如:

void rd_kafka_conf_set_dr_msg_cb(  
    rd_kafka_conf_t *conf,  
    void (*dr_msg_cb)(rd_kafka_t *rk,  
                      const rd_kafka_message_t *rkmessage,  
                      void *opaque));

可以设置消息被处理之后的回调函数。其中,rkmessage是被处理的消息,opaque是使用rd_kafka_conf_set_opaque()设置的用户层指针。

证书设置

rd_kafka_conf_set_ssl_cert用于设置证书。

rd_kafka_conf_res_t  
rd_kafka_conf_set_ssl_cert(rd_kafka_conf_t *conf,  
                           rd_kafka_cert_type_t cert_type,  
                           rd_kafka_cert_enc_t cert_enc,  
                           const void *buffer,  
                           size_t size,  
                           char *errstr,  
                           size_t errstr_size);

其中,cert_type的值分别为:RD_KAFKA_CERT_PUBLIC_KEYRD_KAFKA_CERT_PRIVATE_KEYRD_KAFKA_CERT_CA,即公钥、私钥与CA。

cert_enc的值分别为:RD_KAFKA_CERT_ENC_PKCS12RD_KAFKA_CERT_ENC_DERRD_KAFKA_CERT_ENC_PEM,即分别是三种证书格式。

另外还可以设置证书验证的回调函数:

rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb(  
    rd_kafka_conf_t *conf,  
    int (*ssl_cert_verify_cb)(rd_kafka_t *rk,  
                              const char *broker_name,  
                              int32_t broker_id,  
                              int *x509_error,  
                              int depth,  
                              const char *buf,  
                              size_t size,  
                              char *errstr,  
                              size_t errstr_size,  
                              void *opaque));

如果成功,回调函数必须返回1。否则需要设置errstr为错误原因,并且返回0。

rd_kafka_message_t

处理Kafka另外一个重要的结构是rd_kafka_message_t,即消息。它的定义为:

typedef struct rd_kafka_message_s {  
        rd_kafka_resp_err_t err;
        rd_kafka_topic_t *rkt; 
        int32_t partition;
        void *payload;
        size_t len;                        
        void *key;
        size_t key_len;      
        int64_t offset;
        void *_private; 
} rd_kafka_message_t;

最开头的err非常重要。每次使用rd_kafka_consume*族函数取得一条消息,以及在生产一条消息的回调函数中时,都要检查这个值,确定是否成功。

生产者

使用rd_kafka_t生产一条消息的函数为:

int rd_kafka_produce(rd_kafka_topic_t *rkt,  
                     int32_t partition,  
                     int msgflags,  
                     void *payload,  
                     size_t len,  
                     const void *key,  
                     size_t keylen,  
                     void *msg_opaque);

成功返回0。

生成一批消息的函数为:

int rd_kafka_produce_batch(rd_kafka_topic_t *rkt,  
                           int32_t partition,  
                           int msgflags,  
                           rd_kafka_message_t *rkmessages,  
                           int message_cnt);

返回值为成功的消息数目。

等待所有消息处理完成的函数为:

rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms);

或者放弃没有发送的消息:

rd_kafka_resp_err_t rd_kafka_purge(rd_kafka_t *rk, int purge_flags);

消费者

控制Kafka消费的函数有一对:

int rd_kafka_consume_start(rd_kafka_topic_t *rkt,  
                           int32_t partition,  
                           int64_t offset);

int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);

注意这两个函数的第一个参数是rd_kafka_topic_t

调用rd_kafka_consume_start()之后,kafka将开始把成批的消息放入本地的队列中,应用需要使用rd_kafka_consume()函数来消费。

rd_kafka_message_t *  
rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms);

这个函数在timeout_ms毫秒以内,返回一条消息,或者NULL。返回的消息,必须使用rd_kafka_message_destroy()释放。

类似生产者,消费者也有批量处理的函数:

ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt,  
                               int32_t partition,  
                               int timeout_ms,  
                               rd_kafka_message_t **rkmessages,  
                               size_t rkmessages_size);