kafka实践与C++操作kafka

发布于:2025-05-27 ⋅ 阅读:(119) ⋅ 点赞:(0)

一、集群

  1. Kafka 架构是由 producer(消息生产者)、consumer(消息消费者)、broker(kafka 集群的 server,负责处理消息读、写请求,存储消息,在 Kafka cluster 这一层里,其实里面是有很多个 broker 组成)、topic(消息队列 / 分类相当于队列,里面有生产者和消费者模型)、zookeeper 这些部分组成。
  2. kafka 里面的消息是有 topic 来组织的,简单的我们可以想象为一个队列,一个队列就是一个 topic,然后它把每个 topic 又分为很多个 partition,这个是为了做并行的,在每个 partition 内部消息是有顺序,相当于有序的队列,其中每个消息都有个序号 offset,比如 0 到 1,2,从前面读往后面写。一个 partition 对应一个 broker,一个 broker 可以管多个 partition,比如说,topic 有 6 个 partition,有两个 broker,那每个 broker 就管 3 个 partition,这个 partition 可以很简单想象为一个文件,当数据发过来的时候它就往这个 partition 上面 append,追加就行了,消息不经过内存缓冲,直接写入文件,kafka 的很多消息系统不一样,很多消息系统是消费完就把它删除掉,而 kafka 是按照时间策略删除,而不是消费完就删除,在 kafka 里面没有一个消费完这么个概念,只有过期这样一个概念。
  3. producer 自己决定往哪个 partition 里面面去写,这里有一些的策略,譬如如果 hash,不用多个 partition 之间去 queue 了。consumer 自己维护消费到哪个 offset,每个 consumer 都有对应的 group,group 内是数据消费模型(各个 consumer 要消费不同的 partition,因此一个消息在 group 内只消费一次),group 间是 publish-subscribe 消费模型,各个 group 各自独立消费,互不影响,因此一个消息在被每个 group 消费一次。


1. 搭建两台服务器

        ip1: 192.168.31.249

         ip2: 192.168.31.36

2. zookeeper部署

        zookeeper还是先只部署一台,在ip2: 192.168.31.36 上启动zookeeper

3. 启动broker ip1:192.168.31.249

        修改broker.id(也可以改为-1,自动分配)

broker.id=0

        修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。

zookeeper.connect=192.168.31.249:2181

        启动kafka:

 sh kafka-server-start.sh -daemon ../config/server.properties
 sh kafka-server-start.sh  ../config/server.properties

        默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。

4. 启动broker ip2: 192.168.31.36

        修改broker.id(也可以改为-1,自动分配)

broker.id=1

        修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。

zookeeper.connect=192.168.31.249:2181

        启动kafka:

 sh kafka-server-start.sh -daemon ../config/server.properties

        默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。

5. 查看kafka集群

        创建主题:

sh kafka-topics.sh --create --zookeeper 192.168.31.249:2181 -replication-factor 2 --partitions 2 - topic kafka-2

        查看主题:

sh kafka-topics.sh --describe --zookeeper 192.168.31.249:2181 --topic kafka-2

        显示信息:

Topic:kafka-2   PartitionCount:2    
Topic: kafka-2  Partition: 0    
Topic: kafka-2  Partition: 1    
ReplicationFactor:2 Configs:
 Leader: 1   Replicas: 1,0   Isr: 1,0
 Leader: 0   Replicas: 0,1   Isr: 0

6. 测试集群

        开三个终端:开启一个生产者,两个消费者

        生产者: sh kafka-console-producer.sh --broker-list 192.168.31.249:9092 --topic kafka-2

        消费者:

        sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning

        sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning

        当两个消费者同属一个消费组开启后,消费者轮流收到发送者的数据。

        kafka-console-consumer.sh部分支持的参数:

参数 值类型 说明 有效值
--topic string 被消费的 topic
--partition integer 指定分区,除非指定--offset,否则从分区结束(latest)开始消费
--offset string 执行消费的起始 offset 位置,默认值:latest latest, earliest
--consumer-property string 将用户定义的属性以key=value的形式传递给使用者
--consumer.config string 消费者配置属性文件,请注意,[consumer-property]优先于此配置
--from-beginning 从存在的最早消息开始,而不是从最新消息开始
--group string 指定消费者所属组的 ID

二、代码案例

一、环境准备
  1. Kafka 集群部署(本地测试)
    • 下载 Kafka:官网(建议 2.8.1 版本)
    • 启动 Zookeeper:bin/zookeeper-server-start.sh config/zookeeper.properties
    • 启动 Kafka Broker:bin/kafka-server-start.sh config/server.properties
    • 创建测试主题(分区数 2,副本数 1):
      bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic demo-topic --partitions 2 --replication-factor 1
      
二、完整代码实现

以下为生产者(kafka_producer.cpp)和消费者(kafka_consumer.cpp)的完整代码,使用 librdkafka 库实现。

1. 生产者代码(发送消息)
// kafka_producer.cpp
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include "rdkafkacpp.h"

// 投递结果回调(消息是否成功发送到Kafka)
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& msg) override {
        if (msg.err()) {
            std::cerr << "消息投递失败 [" << msg.topic_name() << "][" << msg.partition() 
                      << "]: " << msg.errstr() << std::endl;
        } else {
            std::cout << "消息投递成功 [" << msg.topic_name() << "][" << msg.partition() 
                      << "]: 偏移量=" << msg.offset() << ", 大小=" << msg.len() << "字节" << std::endl;
        }
    }
};

// 事件回调(如错误、日志)
class EventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS:
                std::cout << "统计信息: " << event.str() << std::endl;
                break;
            default:
                break;
        }
    }
};

int main() {
    // 配置参数
    const std::string brokers = "localhost:9092";  // Kafka服务器地址
    const std::string topic_name = "demo-topic";   // 主题名称
    const int send_interval_ms = 2000;             // 消息发送间隔(2秒)

    // 创建配置对象
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    std::string errstr;

    // 设置全局配置
    if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("dr_cb", new DeliveryReportCb(), errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置失败: " << errstr << std::endl;
        return 1;
    }

    // 设置主题配置(可选)
    topic_conf->set("request.required.acks", "1", errstr);  // Leader确认即可

    // 创建Producer实例
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "创建Producer失败: " << errstr << std::endl;
        return 1;
    }

    // 创建Topic句柄
    RdKafka::Topic* topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);
    if (!topic) {
        std::cerr << "创建Topic失败: " << errstr << std::endl;
        delete producer;
        return 1;
    }

    // 循环发送消息
    std::cout << "开始发送消息到主题 [" << topic_name << "], 按 Ctrl+C 停止..." << std::endl;
    int msg_count = 0;
    while (true) {
        // 构造消息内容(带时间戳)
        auto now = std::chrono::system_clock::now();
        std::time_t time = std::chrono::system_clock::to_time_t(now);
        std::string payload = "Demo消息 #" + std::to_string(++msg_count) + " @ " + std::ctime(&time);

        // 发送消息(自动选择分区)
        RdKafka::ErrorCode err = producer->produce(
            topic,
            RdKafka::Topic::PARTITION_UA,  // 自动分配分区
            RdKafka::Producer::RK_MSG_COPY,
            const_cast<char*>(payload.c_str()), payload.size(),
            nullptr,  // 无消息键
            nullptr
        );

        if (err != RdKafka::ERR_NO_ERROR) {
            std::cerr << "发送失败: " << RdKafka::err2str(err) << std::endl;
        } else {
            std::cout << "已发送消息 #" << msg_count << ": " << payload;
        }

        // 触发回调处理(必须定期调用poll)
        producer->poll(0);

        // 等待2秒
        std::this_thread::sleep_for(std::chrono::milliseconds(send_interval_ms));
    }

    // 清理资源(实际不会执行到,需通过信号捕获退出)
    delete topic;
    delete producer;
    delete conf;
    delete topic_conf;
    return 0;
}
2. 消费者代码(接收消息)
// kafka_consumer.cpp
#include <iostream>
#include <vector>
#include <string>
#include "rdkafkacpp.h"

// Rebalance回调(处理分区分配/撤销)
class RebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb(RdKafka::KafkaConsumer* consumer, 
                      RdKafka::ErrorCode err, 
                      std::vector<RdKafka::TopicPartition*>& partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            // 分配分区:手动指定从最早位置开始消费(可选)
            for (auto* part : partitions) {
                part->set_offset(RdKafka::TopicPartition::OFFSET_BEGINNING);
            }
            consumer->assign(partitions);
            std::cout << "分配分区: ";
            for (auto* part : partitions) {
                std::cout << part->topic() << "[" << part->partition() << "] ";
            }
            std::cout << std::endl;
        } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
            // 撤销分区
            consumer->unassign();
            std::cout << "撤销分区" << std::endl;
        }
    }
};

// 事件回调(如错误、日志)
class EventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl;
                break;
            case RdKafka::Event::EVENT_LOG:
                std::cout << "日志事件: " << event.str() << std::endl;
                break;
            default:
                break;
        }
    }
};

int main() {
    // 配置参数
    const std::string brokers = "localhost:9092";   // Kafka服务器地址
    const std::string group_id = "demo-consumer-group";  // 消费者组ID
    const std::vector<std::string> topics = {"demo-topic"};  // 订阅主题

    // 创建配置对象
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    std::string errstr;

    // 设置全局配置
    if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("rebalance_cb", new RebalanceCb(), errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置失败: " << errstr << std::endl;
        return 1;
    }

    // 设置自动提交(可选,这里禁用自动提交,手动提交位移)
    conf->set("enable.auto.commit", "false", errstr);

    // 设置主题配置:从最早位置开始消费(若分区无已提交位移)
    topic_conf->set("auto.offset.reset", "earliest", errstr);
    conf->set("default_topic_conf", topic_conf, errstr);

    // 创建Consumer实例
    RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!consumer) {
        std::cerr << "创建Consumer失败: " << errstr << std::endl;
        return 1;
    }

    // 订阅主题
    if (consumer->subscribe(topics) != RdKafka::ERR_NO_ERROR) {
        std::cerr << "订阅主题失败" << std::endl;
        delete consumer;
        return 1;
    }

    std::cout << "开始消费主题 [" << topics[0] << "], 按 Ctrl+C 停止..." << std::endl;
    while (true) {
        // 拉取消息(超时1秒)
        RdKafka::Message* msg = consumer->consume(1000);
        if (!msg) continue;

        // 处理消息
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
                std::cout << "消费消息 [" << msg->topic_name() << "][" << msg->partition() 
                          << "]: 偏移量=" << msg->offset() << ", 内容=" 
                          << static_cast<char*>(msg->payload()) << std::endl;
                // 手动提交位移(每消费5条提交一次)
                static int count = 0;
                if (++count % 5 == 0) {
                    consumer->commitSync();
                    std::cout << "已同步提交位移" << std::endl;
                }
                break;
            case RdKafka::ERR__PARTITION_EOF:
                std::cout << "到达分区末尾,等待新消息..." << std::endl;
                break;
            case RdKafka::ERR__TIMED_OUT:
                // 超时无消息,继续轮询
                break;
            default:
                std::cerr << "消费错误: " << msg->errstr() << std::endl;
                break;
        }

        delete msg;
    }

    // 清理资源(实际不会执行到,需通过信号捕获退出)
    consumer->unsubscribe();
    consumer->close();
    delete consumer;
    delete conf;
    delete topic_conf;
    return 0;
}
三、编译与运行
1. 依赖安装
  • librdkafka:需安装开发库(包含头文件和动态库)。
    • Ubuntu/Debian:
      sudo apt-get install librdkafka-dev
      
    • CentOS/Fedora:
      sudo yum install librdkafka-devel
      
    • 源码编译(推荐最新稳定版):
      git clone https://github.com/edenhill/librdkafka.git
      cd librdkafka
      git checkout v2.2.0  # 选择稳定版本
      ./configure --prefix=/usr/local
      make -j4
      sudo make install
      sudo ldconfig  # 更新动态链接库缓存
      
2. 编译命令

使用g++编译生产者和消费者代码(需链接 librdkafka++ 和 librdkafka 库):

 
# 编译生产者
g++ kafka_producer.cpp -o kafka_producer -lrdkafka++ -lrdkafka -lpthread

# 编译消费者
g++ kafka_consumer.cpp -o kafka_consumer -lrdkafka++ -lrdkafka -lpthread
3. 运行步骤
  1. 启动 Kafka 集群(确保 Zookeeper 和 Broker 已运行)。

  2. 运行生产者(发送消息到demo-topic):

    ./kafka_producer
    

    输出示例:

    开始发送消息到主题 [demo-topic], 按 Ctrl+C 停止...
    已发送消息 #1: Demo消息 #1 @ Tue May 28 15:30:00 2024
    消息投递成功 [demo-topic][0]: 偏移量=0, 大小=35字节
    已发送消息 #2: Demo消息 #2 @ Tue May 28 15:30:02 2024
    消息投递成功 [demo-topic][1]: 偏移量=0, 大小=35字节
    
  3. 运行消费者(从demo-topic消费消息):

    ./kafka_consumer
    

    输出示例:

    开始消费主题 [demo-topic], 按 Ctrl+C 停止...
    分配分区: demo-topic[0] demo-topic[1] 
    消费消息 [demo-topic][0]: 偏移量=0, 内容=Demo消息 #1 @ Tue May 28 15:30:00 2024
    消费消息 [demo-topic][1]: 偏移量=0, 内容=Demo消息 #2 @ Tue May 28 15:30:02 2024
    已同步提交位移

 

0voice · GitHub 


网站公告

今日签到

点亮在社区的每一天
去签到