Kafka服务
Kafka的快速入门 文档很详细,基本上几步就可以搭建一个Kafka测试环境。
下载Kafka的二进制包,然后解压。
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0
生成集群ID,使用集群ID格式化存储
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
启动Kafka Server。
$ bin/kafka-server-start.sh config/server.properties
启动之后,默认的Kafka Server会连续输出日志到控制台。后面的测试命令,需要在另外的终端执行。
创建主题
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
测试生产
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
测试消费
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
C语言的Kafka库:librdkafka
在C语言中操作Kafka,有不少库可以选择,其中就有librdkafka。
根据librdkafka在gitthub上的信息,它主要是由C语言开发,但是也提供了C++的支持。
当我们在Linux平台上安装librdkafka之后,通过pkg-config
命令加库名rdkafka,可以获取编译相关的信息。
如在Fedora 42上:
pkg-config --cflags --libs rdkafka
-DWITH_GZFILEOP -lrdkafka
需要注意的是,rdkafka.pc中记录的头文件路径是/usr/include
,但是它的头文件都位于/usr/include/librdkafka
。在C语言的源代码中需要写成:
#include <librdkafka/rdkafka.h>
在C++中则需要写成:
#include <librdkafka/rdkafkacpp.h>
但是C++的API与C语言的类似,后续不再提及C++的相关API,仅以C语言举例。
rdkafka API
rd_kafka_t
API的核心是结构rd_kafka_t
。
它的创建和销毁是一对函数:
rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rd_kafka_conf_t *conf,
char *errstr,
size_t errstr_size);
void rd_kafka_destroy(rd_kafka_t *rk);
其中,rd_kafka_type_t
是一个enum,分别为RD_KAFKA_PRODUCER
与RD_KAFKA_CONSUMER
,即生产者与消费者。
rd_kafka_conf_t
rd_kafka_conf_t
是另一个结构,创建与销毁函数为:
rd_kafka_conf_t *rd_kafka_conf_new(void);
void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
rd_kafka_conf_t
创建之后,使用如下函数设置参数:
rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
const char *name,
const char *value,
char *errstr,
size_t errstr_size);
如:bootstrap.servers
、sasl.username
、sasl.password
等。
rd_kafka_conf_t
设置相关的函数返回值都是rd_kafka_conf_res_t
。如果成功,值为RD_KAFKA_CONF_OK,即0。
如果要进行更详细的控制,还可以使用其它一些函数。
如:
void rd_kafka_conf_set_dr_msg_cb(
rd_kafka_conf_t *conf,
void (*dr_msg_cb)(rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage,
void *opaque));
可以设置消息被处理之后的回调函数。其中,rkmessage是被处理的消息,opaque是使用rd_kafka_conf_set_opaque()
设置的用户层指针。
证书设置
rd_kafka_conf_set_ssl_cert
用于设置证书。
rd_kafka_conf_res_t
rd_kafka_conf_set_ssl_cert(rd_kafka_conf_t *conf,
rd_kafka_cert_type_t cert_type,
rd_kafka_cert_enc_t cert_enc,
const void *buffer,
size_t size,
char *errstr,
size_t errstr_size);
其中,cert_type
的值分别为:RD_KAFKA_CERT_PUBLIC_KEY
、RD_KAFKA_CERT_PRIVATE_KEY
与RD_KAFKA_CERT_CA
,即公钥、私钥与CA。
而cert_enc
的值分别为:RD_KAFKA_CERT_ENC_PKCS12
、RD_KAFKA_CERT_ENC_DER
与RD_KAFKA_CERT_ENC_PEM
,即分别是三种证书格式。
另外还可以设置证书验证的回调函数:
rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb(
rd_kafka_conf_t *conf,
int (*ssl_cert_verify_cb)(rd_kafka_t *rk,
const char *broker_name,
int32_t broker_id,
int *x509_error,
int depth,
const char *buf,
size_t size,
char *errstr,
size_t errstr_size,
void *opaque));
如果成功,回调函数必须返回1。否则需要设置errstr为错误原因,并且返回0。
rd_kafka_message_t
处理Kafka另外一个重要的结构是rd_kafka_message_t
,即消息。它的定义为:
typedef struct rd_kafka_message_s {
rd_kafka_resp_err_t err;
rd_kafka_topic_t *rkt;
int32_t partition;
void *payload;
size_t len;
void *key;
size_t key_len;
int64_t offset;
void *_private;
} rd_kafka_message_t;
最开头的err非常重要。每次使用rd_kafka_consume*
族函数取得一条消息,以及在生产一条消息的回调函数中时,都要检查这个值,确定是否成功。
生产者
使用rd_kafka_t
生产一条消息的函数为:
int rd_kafka_produce(rd_kafka_topic_t *rkt,
int32_t partition,
int msgflags,
void *payload,
size_t len,
const void *key,
size_t keylen,
void *msg_opaque);
成功返回0。
生成一批消息的函数为:
int rd_kafka_produce_batch(rd_kafka_topic_t *rkt,
int32_t partition,
int msgflags,
rd_kafka_message_t *rkmessages,
int message_cnt);
返回值为成功的消息数目。
等待所有消息处理完成的函数为:
rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms);
或者放弃没有发送的消息:
rd_kafka_resp_err_t rd_kafka_purge(rd_kafka_t *rk, int purge_flags);
消费者
控制Kafka消费的函数有一对:
int rd_kafka_consume_start(rd_kafka_topic_t *rkt,
int32_t partition,
int64_t offset);
int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);
注意这两个函数的第一个参数是rd_kafka_topic_t
。
调用rd_kafka_consume_start()
之后,kafka将开始把成批的消息放入本地的队列中,应用需要使用rd_kafka_consume()
函数来消费。
rd_kafka_message_t *
rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms);
这个函数在timeout_ms
毫秒以内,返回一条消息,或者NULL。返回的消息,必须使用rd_kafka_message_destroy()
释放。
类似生产者,消费者也有批量处理的函数:
ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt,
int32_t partition,
int timeout_ms,
rd_kafka_message_t **rkmessages,
size_t rkmessages_size);