Doris 消费kafka消息

发布于:2025-09-08 ⋅ 阅读:(15) ⋅ 点赞:(0)

Doris 通过 Routine Load 功能来消费 Kafka 消息,这是一种自动化的、持续的数据导入方式。

核心概念:Routine Load

  • 持续消费:Doris 会作为一个消费者,持续地从 Kafka Topic 的一个或多个 Partition 中拉取数据。

  • ** exactly-once 语义**:Doris 能够保证数据不丢不重(在大多数正常场景下)。

  • 自动管理:你只需要定义一个例行导入作业,Doris 会自动管理消费偏移量 (offset)、故障转移和负载均衡。

  • 数据类型支持:支持导入 CSV、JSON 等格式的文本消息。


前提条件

  1. Doris 集群:已安装并正常运行。

  2. Kafka 集群:已安装并正常运行,并且有 Topic 和数据。

  3. 目标表:在 Doris 中创建好要导入数据的表。


步骤 1:在 Doris 中创建目标表

假设 Kafka 中的消息是用户行为日志,格式为 CSV:user_id, item_id, behavior, timestamp

sql

-- 在Doris中创建一个示例表
CREATE DATABASE IF NOT EXISTS example_db;
USE example_db;

CREATE TABLE IF NOT EXISTS user_behavior (
    user_id INT,
    item_id INT,
    behavior VARCHAR(20),
    -- 假设Kafka中的时间戳是秒级或毫秒级,这里用BIGINT接收,后续可转为Doris的DATETIME
    event_time BIGINT,
    -- 生成一个计算列,将event_time转换为易读的日期时间格式
    dt AS FROM_UNIXTIME(event_time, '%Y-%m-%d %H:%i:%s'),
    -- 根据计算列dt生成一个分区列,按天分区
    date_label AS DATE(FROM_UNIXTIME(event_time))
) 
ENGINE=OLAP
DUPLICATE KEY(user_id, item_id, event_time) -- 明细模型
PARTITION BY RANGE(date_label) (
    PARTITION p202310 VALUES [('2023-10-01'), ('2023-11-01'))
    -- 可以动态添加分区,或使用动态分区特性
) 
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES (
    "replication_num" = "3"
);

步骤 2:创建 Routine Load 作业

这是最核心的一步。我们将创建一个作业,将 Kafka 的 user_behavior_topic 中的数据持续导入到刚创建的 user_behavior 表中。

示例 1:导入 CSV 格式数据

假设 Kafka 消息是简单的 CSV 格式:123,456,pv,1698321234

sql

CREATE ROUTINE LOAD example_db.kafka_csv_load ON user_behavior
COLUMNS TERMINATED BY ",", -- 指定CSV列分隔符
COLUMNS (user_id, item_id, behavior, event_time) -- 定义Kafka消息中的列顺序,并映射到表字段
-- ROUTINE LOAD 高级属性
PROPERTIES
(
    "desired_concurrent_number" = "3", -- 期望的并发任务数,通常小于等于Kafka分区数
    "max_error_number" = "1000",       -- 在达到此错误行数之前,作业不会暂停
    "max_batch_interval" = "20",       -- 每个子任务最大的执行间隔(秒)
    "max_batch_rows" = "200000",       -- 每个子任务最多消费的行数
    "max_batch_size" = "104857600"     -- 每个子任务最多消费的数据量(100MB)
)
FROM KAFKA
(
    "kafka_broker_list" = "kafka_host1:9092,kafka_host2:9092,kafka_host3:9092", -- Kafka Broker列表
    "kafka_topic" = "user_behavior_topic", -- Topic名称
    "property.group.id" = "doris-routine-load-group", -- 消费组ID,用于偏移量管理
    "property.security.protocol" = "SASL_PLAINTEXT", -- 如果Kafka有安全认证,需要配置
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "your_username",
    "property.sasl.password" = "your_password"
);
示例 2:导入 JSON 格式数据(更常见)

假设 Kafka 消息是 JSON 格式:

json

{"user_id": 123, "item_id": 456, "behavior_type": "pv", "timestamp": 1698321234}

sql

CREATE ROUTINE LOAD example_db.kafka_json_load ON user_behavior
-- 使用jsonpaths来指定JSON字段的路径。'$'表示根目录
-- 你也可以使用json_root来指定根节点,如果JSON有外层嵌套的话
-- "json_root" = "$.data",
COLUMNS(
    user_id = $.user_id,
    item_id = $.item_id,
    behavior = $.behavior_type, -- 将JSON中的behavior_type字段映射到表的behavior列
    event_time = $.timestamp
)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_error_number" = "1000",
    "strip_outer_array" = "true" -- 如果JSON消息是数组格式,如 [{"..."}, {"..."}],需要设置此属性为true以剥除外层数组
)
FROM KAFKA
(
    "kafka_broker_list" = "kafka_host1:9092,kafka_host2:9092,kafka_host3:9092",
    "kafka_topic" = "user_behavior_topic_json",
    "property.group.id" = "doris-routine-load-group-json",
    -- 指定格式为json
    "format" = "json"
);

关键参数说明:

  • COLUMNS定义了 Kafka 消息中的字段与 Doris 表字段的映射关系。你可以在这里进行简单的计算和转换(如字段重命名)。

  • desired_concurrent_number建议设置成与 Kafka Topic 的分区数一致,以实现最大并行度。

  • max_error_number:允许的最大错误行数。超过此值,作业会自动暂停,需要手动检查原因并恢复。

  • property.group.id:Doris 会用这个消费组 ID 来存储消费偏移量。


步骤 3:监控和管理 Routine Load 作业

作业创建后,Doris 会自动开始消费数据。你需要监控其状态。

1. 查看所有 Routine Load 作业

sql

SHOW ROUTINE LOAD FOR example_db;
SHOW ALL ROUTINE LOAD \G; -- \G 用于格式化输出,更易读
2. 查看特定作业的详细状态

sql

SHOW ROUTINE LOAD FOR example_db.kafka_json_load \G;

关注以下字段:

  • StateRUNNING(运行中)、PAUSED(已暂停,需要查看 ReasonOfStateChanged)、NEED_SCHEDULE(等待调度)。

  • Progress:

    • committedOffset: 已提交的 Kafka offset。

    • visibleOffset: 已导入并对查询可见的 Kafka offset。

  • ReasonOfStateChanged: 如果状态是 PAUSED,这里会显示暂停原因(例如:Too many filtered rows 过滤行太多)。

3. 查看作业的错误数据

如果 max_error_number 设置得较大,作业不会暂停,但错误数据会被记录。

sql

SHOW ROUTINE LOAD ERROR WHERE JobName = 'kafka_json_load';
4. 控制作业
  • 暂停作业

    sql

    PAUSE ROUTINE LOAD FOR example_db.kafka_json_load;
  • 恢复已暂停的作业

    sql

    RESUME ROUTINE LOAD FOR example_db.kafka_json_load;
  • 停止作业(不可恢复):

    sql

    STOP ROUTINE LOAD FOR example_db.kafka_json_load;

常见问题与调优

  1. 导入速度跟不上

    • 检查 SHOW BACKENDS\G 查看集群负载。

    • 适当增加 desired_concurrent_number(但不要超过 Kafka 分区数)。

    • 调整 max_batch_interval 和 max_batch_size,让每个批次处理更多数据。

  2. 作业频繁 PAUSED

    • ReasonOfStateChanged: There are 1000 error rows in ...: 数据格式错误。检查 Kafka 消息格式是否与 COLUMNS 中定义的匹配。可以先用 max_error_number 容忍一些错误,或者修复数据源。

    • ReasonOfStateChanged: ...: 根据具体原因排查,如网络问题、表不存在等。

  3. 数据延迟

    • 使用 SHOW ROUTINE LOAD ...\G 查看 Lag 字段,它显示了尚未消费的 Kafka 消息数。

    • 如果 Lag 持续增长,说明消费速度跟不上生产速度,需要参考第 1 点进行调优。

总结

使用 Doris Routine Load 消费 Kafka 数据的流程非常清晰:

  1. 准备:创建 Doris 目标表。

  2. 创建作业:使用 CREATE ROUTINE LOAD 语句,明确定义 Kafka 源信息、数据格式和字段映射。

  3. 监控:使用 SHOW ROUTINE LOAD 和 SHOW ROUTINE LOAD ERROR 命令监控作业状态和错误。

  4. 管理:根据监控结果,使用 PAUSE/RESUME/STOP 对作业进行控制。

这种方式为构建实时数仓提供了稳定高效的数据管道。


网站公告

今日签到

点亮在社区的每一天
去签到