一、集群
- Kafka 架构是由 producer(消息生产者)、consumer(消息消费者)、broker(kafka 集群的 server,负责处理消息读、写请求,存储消息,在 Kafka cluster 这一层里,其实里面是有很多个 broker 组成)、topic(消息队列 / 分类相当于队列,里面有生产者和消费者模型)、zookeeper 这些部分组成。
- kafka 里面的消息是有 topic 来组织的,简单的我们可以想象为一个队列,一个队列就是一个 topic,然后它把每个 topic 又分为很多个 partition,这个是为了做并行的,在每个 partition 内部消息是有顺序,相当于有序的队列,其中每个消息都有个序号 offset,比如 0 到 1,2,从前面读往后面写。一个 partition 对应一个 broker,一个 broker 可以管多个 partition,比如说,topic 有 6 个 partition,有两个 broker,那每个 broker 就管 3 个 partition,这个 partition 可以很简单想象为一个文件,当数据发过来的时候它就往这个 partition 上面 append,追加就行了,消息不经过内存缓冲,直接写入文件,kafka 的很多消息系统不一样,很多消息系统是消费完就把它删除掉,而 kafka 是按照时间策略删除,而不是消费完就删除,在 kafka 里面没有一个消费完这么个概念,只有过期这样一个概念。
- producer 自己决定往哪个 partition 里面面去写,这里有一些的策略,譬如如果 hash,不用多个 partition 之间去 queue 了。consumer 自己维护消费到哪个 offset,每个 consumer 都有对应的 group,group 内是数据消费模型(各个 consumer 要消费不同的 partition,因此一个消息在 group 内只消费一次),group 间是 publish-subscribe 消费模型,各个 group 各自独立消费,互不影响,因此一个消息在被每个 group 消费一次。
1. 搭建两台服务器
ip1: 192.168.31.249
ip2: 192.168.31.36
2. zookeeper部署
zookeeper还是先只部署一台,在ip2: 192.168.31.36 上启动zookeeper
3. 启动broker ip1:192.168.31.249
修改broker.id(也可以改为-1,自动分配)
broker.id=0
修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。
zookeeper.connect=192.168.31.249:2181
启动kafka:
sh kafka-server-start.sh -daemon ../config/server.properties
sh kafka-server-start.sh ../config/server.properties
默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。
4. 启动broker ip2: 192.168.31.36
修改broker.id(也可以改为-1,自动分配)
broker.id=1
修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。
zookeeper.connect=192.168.31.249:2181
启动kafka:
sh kafka-server-start.sh -daemon ../config/server.properties
默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。
5. 查看kafka集群
创建主题:
sh kafka-topics.sh --create --zookeeper 192.168.31.249:2181 -replication-factor 2 --partitions 2 - topic kafka-2
查看主题:
sh kafka-topics.sh --describe --zookeeper 192.168.31.249:2181 --topic kafka-2
显示信息:
Topic:kafka-2 PartitionCount:2
Topic: kafka-2 Partition: 0
Topic: kafka-2 Partition: 1
ReplicationFactor:2 Configs:
Leader: 1 Replicas: 1,0 Isr: 1,0
Leader: 0 Replicas: 0,1 Isr: 0
6. 测试集群
开三个终端:开启一个生产者,两个消费者
生产者: sh kafka-console-producer.sh --broker-list 192.168.31.249:9092 --topic kafka-2
消费者:
sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning
sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning
当两个消费者同属一个消费组开启后,消费者轮流收到发送者的数据。
kafka-console-consumer.sh部分支持的参数:
参数 | 值类型 | 说明 | 有效值 |
---|---|---|---|
--topic |
string | 被消费的 topic | |
--partition |
integer | 指定分区,除非指定--offset ,否则从分区结束(latest)开始消费 |
|
--offset |
string | 执行消费的起始 offset 位置,默认值:latest | latest, earliest |
--consumer-property |
string | 将用户定义的属性以key=value 的形式传递给使用者 |
|
--consumer.config |
string | 消费者配置属性文件,请注意,[consumer-property] 优先于此配置 |
|
--from-beginning |
从存在的最早消息开始,而不是从最新消息开始 | ||
--group |
string | 指定消费者所属组的 ID |
二、代码案例
一、环境准备
- Kafka 集群部署(本地测试)
- 下载 Kafka:官网(建议 2.8.1 版本)
- 启动 Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动 Kafka Broker:
bin/kafka-server-start.sh config/server.properties
- 创建测试主题(分区数 2,副本数 1):
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic demo-topic --partitions 2 --replication-factor 1
二、完整代码实现
以下为生产者(
kafka_producer.cpp
)和消费者(kafka_consumer.cpp
)的完整代码,使用 librdkafka 库实现。1. 生产者代码(发送消息)
// kafka_producer.cpp #include <iostream> #include <string> #include <chrono> #include <thread> #include "rdkafkacpp.h" // 投递结果回调(消息是否成功发送到Kafka) class DeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb(RdKafka::Message& msg) override { if (msg.err()) { std::cerr << "消息投递失败 [" << msg.topic_name() << "][" << msg.partition() << "]: " << msg.errstr() << std::endl; } else { std::cout << "消息投递成功 [" << msg.topic_name() << "][" << msg.partition() << "]: 偏移量=" << msg.offset() << ", 大小=" << msg.len() << "字节" << std::endl; } } }; // 事件回调(如错误、日志) class EventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event& event) override { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl; break; case RdKafka::Event::EVENT_STATS: std::cout << "统计信息: " << event.str() << std::endl; break; default: break; } } }; int main() { // 配置参数 const std::string brokers = "localhost:9092"; // Kafka服务器地址 const std::string topic_name = "demo-topic"; // 主题名称 const int send_interval_ms = 2000; // 消息发送间隔(2秒) // 创建配置对象 RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); std::string errstr; // 设置全局配置 if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK || conf->set("dr_cb", new DeliveryReportCb(), errstr) != RdKafka::Conf::CONF_OK || conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) { std::cerr << "配置失败: " << errstr << std::endl; return 1; } // 设置主题配置(可选) topic_conf->set("request.required.acks", "1", errstr); // Leader确认即可 // 创建Producer实例 RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr); if (!producer) { std::cerr << "创建Producer失败: " << errstr << std::endl; return 1; } // 创建Topic句柄 RdKafka::Topic* topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr); if (!topic) { std::cerr << "创建Topic失败: " << errstr << std::endl; delete producer; return 1; } // 循环发送消息 std::cout << "开始发送消息到主题 [" << topic_name << "], 按 Ctrl+C 停止..." << std::endl; int msg_count = 0; while (true) { // 构造消息内容(带时间戳) auto now = std::chrono::system_clock::now(); std::time_t time = std::chrono::system_clock::to_time_t(now); std::string payload = "Demo消息 #" + std::to_string(++msg_count) + " @ " + std::ctime(&time); // 发送消息(自动选择分区) RdKafka::ErrorCode err = producer->produce( topic, RdKafka::Topic::PARTITION_UA, // 自动分配分区 RdKafka::Producer::RK_MSG_COPY, const_cast<char*>(payload.c_str()), payload.size(), nullptr, // 无消息键 nullptr ); if (err != RdKafka::ERR_NO_ERROR) { std::cerr << "发送失败: " << RdKafka::err2str(err) << std::endl; } else { std::cout << "已发送消息 #" << msg_count << ": " << payload; } // 触发回调处理(必须定期调用poll) producer->poll(0); // 等待2秒 std::this_thread::sleep_for(std::chrono::milliseconds(send_interval_ms)); } // 清理资源(实际不会执行到,需通过信号捕获退出) delete topic; delete producer; delete conf; delete topic_conf; return 0; }
2. 消费者代码(接收消息)
// kafka_consumer.cpp #include <iostream> #include <vector> #include <string> #include "rdkafkacpp.h" // Rebalance回调(处理分区分配/撤销) class RebalanceCb : public RdKafka::RebalanceCb { public: void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*>& partitions) override { if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { // 分配分区:手动指定从最早位置开始消费(可选) for (auto* part : partitions) { part->set_offset(RdKafka::TopicPartition::OFFSET_BEGINNING); } consumer->assign(partitions); std::cout << "分配分区: "; for (auto* part : partitions) { std::cout << part->topic() << "[" << part->partition() << "] "; } std::cout << std::endl; } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) { // 撤销分区 consumer->unassign(); std::cout << "撤销分区" << std::endl; } } }; // 事件回调(如错误、日志) class EventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event& event) override { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl; break; case RdKafka::Event::EVENT_LOG: std::cout << "日志事件: " << event.str() << std::endl; break; default: break; } } }; int main() { // 配置参数 const std::string brokers = "localhost:9092"; // Kafka服务器地址 const std::string group_id = "demo-consumer-group"; // 消费者组ID const std::vector<std::string> topics = {"demo-topic"}; // 订阅主题 // 创建配置对象 RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); std::string errstr; // 设置全局配置 if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK || conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK || conf->set("rebalance_cb", new RebalanceCb(), errstr) != RdKafka::Conf::CONF_OK || conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) { std::cerr << "配置失败: " << errstr << std::endl; return 1; } // 设置自动提交(可选,这里禁用自动提交,手动提交位移) conf->set("enable.auto.commit", "false", errstr); // 设置主题配置:从最早位置开始消费(若分区无已提交位移) topic_conf->set("auto.offset.reset", "earliest", errstr); conf->set("default_topic_conf", topic_conf, errstr); // 创建Consumer实例 RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr); if (!consumer) { std::cerr << "创建Consumer失败: " << errstr << std::endl; return 1; } // 订阅主题 if (consumer->subscribe(topics) != RdKafka::ERR_NO_ERROR) { std::cerr << "订阅主题失败" << std::endl; delete consumer; return 1; } std::cout << "开始消费主题 [" << topics[0] << "], 按 Ctrl+C 停止..." << std::endl; while (true) { // 拉取消息(超时1秒) RdKafka::Message* msg = consumer->consume(1000); if (!msg) continue; // 处理消息 switch (msg->err()) { case RdKafka::ERR_NO_ERROR: std::cout << "消费消息 [" << msg->topic_name() << "][" << msg->partition() << "]: 偏移量=" << msg->offset() << ", 内容=" << static_cast<char*>(msg->payload()) << std::endl; // 手动提交位移(每消费5条提交一次) static int count = 0; if (++count % 5 == 0) { consumer->commitSync(); std::cout << "已同步提交位移" << std::endl; } break; case RdKafka::ERR__PARTITION_EOF: std::cout << "到达分区末尾,等待新消息..." << std::endl; break; case RdKafka::ERR__TIMED_OUT: // 超时无消息,继续轮询 break; default: std::cerr << "消费错误: " << msg->errstr() << std::endl; break; } delete msg; } // 清理资源(实际不会执行到,需通过信号捕获退出) consumer->unsubscribe(); consumer->close(); delete consumer; delete conf; delete topic_conf; return 0; }
三、编译与运行
1. 依赖安装
- librdkafka:需安装开发库(包含头文件和动态库)。
- Ubuntu/Debian:
sudo apt-get install librdkafka-dev
- CentOS/Fedora:
sudo yum install librdkafka-devel
- 源码编译(推荐最新稳定版):
git clone https://github.com/edenhill/librdkafka.git cd librdkafka git checkout v2.2.0 # 选择稳定版本 ./configure --prefix=/usr/local make -j4 sudo make install sudo ldconfig # 更新动态链接库缓存
2. 编译命令
使用
g++
编译生产者和消费者代码(需链接 librdkafka++ 和 librdkafka 库):# 编译生产者 g++ kafka_producer.cpp -o kafka_producer -lrdkafka++ -lrdkafka -lpthread # 编译消费者 g++ kafka_consumer.cpp -o kafka_consumer -lrdkafka++ -lrdkafka -lpthread
3. 运行步骤
启动 Kafka 集群(确保 Zookeeper 和 Broker 已运行)。
运行生产者(发送消息到
demo-topic
):./kafka_producer
输出示例:
开始发送消息到主题 [demo-topic], 按 Ctrl+C 停止... 已发送消息 #1: Demo消息 #1 @ Tue May 28 15:30:00 2024 消息投递成功 [demo-topic][0]: 偏移量=0, 大小=35字节 已发送消息 #2: Demo消息 #2 @ Tue May 28 15:30:02 2024 消息投递成功 [demo-topic][1]: 偏移量=0, 大小=35字节
运行消费者(从
demo-topic
消费消息):./kafka_consumer
输出示例:
开始消费主题 [demo-topic], 按 Ctrl+C 停止... 分配分区: demo-topic[0] demo-topic[1] 消费消息 [demo-topic][0]: 偏移量=0, 内容=Demo消息 #1 @ Tue May 28 15:30:00 2024 消费消息 [demo-topic][1]: 偏移量=0, 内容=Demo消息 #2 @ Tue May 28 15:30:02 2024 已同步提交位移