Kafka Connect notebook

发布于:2022-12-31 ⋅ 阅读:(441) ⋅ 点赞:(0)

使用kafka connect confluent 组件传输多种格式的文件:

实现效果:启用kafka connect instance 后,在配置中设置的input目录下匹配正则表达式的文件将传输至finished文件目录:

官方文档链接:

Spool Dir Source Connector for Confluent Platform | Confluent Documentation

1. CSV格式文件:标准的CSV,使用逗号分隔符(44),kafka会读取从首行读取数据至kafka的指定topic中。

2. Binary文件。需要FromXML转码(unfinished)

3. CSV格式:配置中指定Schema, 为第一种类型的衍生,指定同一个connect.class

4. 外部日志文件:需要指定日志文件专用格式(unfinished)

5. Json格式。

6.Fix文件。

7. Json格式:Schemaless.

避雷指南:

1. connect instance 被创建时,命名不可于存在的重复。

2. 实例的命名会在kafka记录中保存命名读取记录,kafka会记录指定此名字的instance读取文件行数,删除instance后重新创建同名实例不可行。

3. 分隔符:配置文件中分隔符的数字为ASCII码,TSV文件分隔符为Tab(9),CSV文件分隔符为逗号(44)。

相关命令行:

# create a kafka connect instance

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{

    "name": "kafka_file_trans_csv",

    "config": {

            "tasks.max": "1",

            "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",

            "input.path": "/path/to/data",

            "input.file.pattern": "csv-spooldir-source.csv",

            "error.path": "/path/to/error",

            "finished.path": "/path/to/finished",

            "halt.on.error": "false",

            "topic": "spooldir-testing-topic",

            "csv.first.row.as.header": "true",

            "schema.generation.enabled": "true"

        }

    }'

#delete connector instance

curl -XDELETE http://localhost:8083/connectors/kafka_file_trans_csv

# list installed connector plugins

curl -s http://localhost:8083/connector-plugins

# list connector instances

curl -s -XGET "http://localhost:8083/connectors/"

以上均可用Postman 导入执行

# kafka show topic data

bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.100:9092, localhost:9092 --topic file-trans-csv-schema --from-beginning

# kafka create topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic file-trans-json-schemaless --replication-factor 3 --config retention.ms=86400000 --config retention.bytes=1048576

# kafka list topics

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --exclude-internal

# kafka delete topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic file-trans-csv

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到