Kafka Connect实战:从环境搭建到全流程操作

发布于:2025-06-19 ⋅ 阅读:(14) ⋅ 点赞:(0)

引言

在深入了解Kafka Connect的架构与原理后,是时候将理论转化为实践。本篇博客将以实际操作场景为导向,带你完成Kafka Connect从环境搭建、内置连接器配置到任务管理与监控的全流程,助力你快速上手并落地数据集成项目。

一、环境搭建与基础配置

1.1 软件安装与版本选择

  • Kafka安装:从Apache Kafka官网下载最新稳定版本(如kafka_2.13-3.5.0),解压后进入安装目录。确保系统已安装Java 8或更高版本,通过java -version命令检查。
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
  • Kafka Connect配置:Kafka Connect包含在Kafka安装包中,无需额外下载。主要配置文件位于config目录下,核心配置文件包括connect-standalone.properties(单机模式)和connect-distributed.properties(分布式模式)。生产环境建议使用分布式模式,开发测试可选择单机模式快速验证。

1.2 配置文件详解

单机模式配置(connect-standalone.properties
# Kafka集群地址
bootstrap.servers=localhost:9092
# 配置文件路径,可指定多个,逗号分隔
plugin.path=/path/to/kafka/plugins
# 数据转换格式,默认JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 偏移量存储方式,单机模式存于本地文件
offset.storage.file.filename=/tmp/connect.offsets
# 定期刷新偏移量存储的时间间隔
offset.flush.interval.ms=10000
分布式模式配置(connect-distributed.properties
# Kafka集群地址
bootstrap.servers=localhost:9092
# 配置文件路径
plugin.path=/path/to/kafka/plugins
# 数据转换格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 组ID,用于标识Connect集群
group.id=connect-cluster-1
# 偏移量存储主题
offset.storage.topic=__connect_offsets
# 配置存储主题
config.storage.topic=__connect_configs
# 状态存储主题
status.storage.topic=__connect_status

1.3 启动Kafka与Connect服务

  1. 启动Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
  1. 启动Kafka Broker:
bin/kafka-server-start.sh config/server.properties
  1. 单机模式启动Connect
bin/connect-standalone.sh config/connect-standalone.properties config/source-connector-config.json config/sink-connector-config.json
  1. 分布式模式启动Connect:在每个Worker节点执行以下命令,确保connect-distributed.properties配置一致。
bin/connect-distributed.sh config/connect-distributed.properties

二、内置连接器实战应用

2.1 File Connector:文件数据同步

File Source Connector

将本地文件数据读取并写入Kafka主题。

  1. 创建配置文件file-source-config.json
{
  "name": "file-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file.path": "/path/to/input.txt",
    "file.reader.class": "org.apache.kafka.connect.file.reader.SimpleLineReader",
    "topic": "file-data-topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
  1. 启动连接器(单机模式):
bin/connect-standalone.sh config/connect-standalone.properties file-source-config.json
  1. 验证:向input.txt写入数据,使用Kafka消费者查看file-data-topic主题数据。
bin/kafka-console-consumer.sh --topic file-data-topic --bootstrap-server localhost:9092 --from-beginning
File Sink Connector

将Kafka主题数据写入本地文件。

  1. 创建配置文件file-sink-config.json
{
  "name": "file-sink-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "file.path": "/path/to/output.txt",
    "topics": "file-data-topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
  1. 启动连接器,数据将从file-data-topic主题写入output.txt文件。

2.2 JDBC Connector:数据库数据同步

以MySQL数据库为例,实现数据增量同步。

  1. 准备工作
    • 下载MySQL JDBC驱动(如mysql-connector-java-8.0.26.jar),放置在config/plugin.path指定目录下。
    • 创建测试表:
CREATE TABLE users (
  id INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(100),
  age INT,
  email VARCHAR(200)
);
JDBC Source Connector
  1. 创建配置文件jdbc-source-config.json
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydb?user=root&password=123456",
    "table.whitelist": "users",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
  1. 启动连接器,新插入的users表数据将同步到以jdbc-为前缀的Kafka主题。
JDBC Sink Connector

将Kafka主题数据写入MySQL表。

  1. 创建配置文件jdbc-sink-config.json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydb?user=root&password=123456",
    "topics": "jdbc-users",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
  1. 启动连接器,jdbc-users主题数据将写入users表。

2.3 REST Connector:API数据交互

通过REST API实现数据与Kafka的双向传输。

  1. 创建REST Source Connector
{
  "name": "rest-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "connect.rest.source.uri": "https://api.example.com/data",
    "connect.rest.method.name": "GET",
    "topic.prefix": "rest-",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
  1. 创建REST Sink Connector
{
  "name": "rest-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.rest.RestSinkConnector",
    "tasks.max": "1",
    "topics": "rest-data-topic",
    "connect.rest.sink.url": "https://api.example.com/submit",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

三、任务管理与监控

3.1 任务生命周期管理

  • 查看任务状态:使用REST API获取连接器和任务状态。
curl -X GET http://localhost:8083/connectors
curl -X GET http://localhost:8083/connectors/jdbc-source-connector/status
  • 暂停与重启任务
# 暂停连接器
curl -X PUT -H "Content-Type: application/json" --data '{"pause": true}' http://localhost:8083/connectors/jdbc-source-connector/pause
# 重启连接器
curl -X PUT -H "Content-Type: application/json" --data '{"resume": true}' http://localhost:8083/connectors/jdbc-source-connector/resume
  • 删除任务
curl -X DELETE http://localhost:8083/connectors/jdbc-source-connector

3.2 监控指标与日志分析

  • JMX监控:通过JMX获取Connect运行指标,如kafka.connect:type=WorkerSourceTaskManager,name=task-0。可使用jconsole或Prometheus + Grafana搭建可视化监控面板。
  • 日志分析:Kafka Connect日志位于logs目录下,通过分析connect.log排查任务故障,例如连接器配置错误、数据转换异常等问题。

网站公告

今日签到

点亮在社区的每一天
去签到