使用kafka connect confluent 组件传输多种格式的文件:
实现效果:启用kafka connect instance 后,在配置中设置的input目录下匹配正则表达式的文件将传输至finished文件目录:
官方文档链接:
Spool Dir Source Connector for Confluent Platform | Confluent Documentation
1. CSV格式文件:标准的CSV,使用逗号分隔符(44),kafka会读取从首行读取数据至kafka的指定topic中。
2. Binary文件。需要FromXML转码(unfinished)
3. CSV格式:配置中指定Schema, 为第一种类型的衍生,指定同一个connect.class
4. 外部日志文件:需要指定日志文件专用格式(unfinished)
5. Json格式。
6.Fix文件。
7. Json格式:Schemaless.
避雷指南:
1. connect instance 被创建时,命名不可于存在的重复。
2. 实例的命名会在kafka记录中保存命名读取记录,kafka会记录指定此名字的instance读取文件行数,删除instance后重新创建同名实例不可行。
3. 分隔符:配置文件中分隔符的数字为ASCII码,TSV文件分隔符为Tab(9),CSV文件分隔符为逗号(44)。
相关命令行:
# create a kafka connect instance
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "kafka_file_trans_csv",
"config": {
"tasks.max": "1",
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"input.path": "/path/to/data",
"input.file.pattern": "csv-spooldir-source.csv",
"error.path": "/path/to/error",
"finished.path": "/path/to/finished",
"halt.on.error": "false",
"topic": "spooldir-testing-topic",
"csv.first.row.as.header": "true",
"schema.generation.enabled": "true"
}
}'
#delete connector instance
curl -XDELETE http://localhost:8083/connectors/kafka_file_trans_csv
# list installed connector plugins
curl -s http://localhost:8083/connector-plugins
# list connector instances
curl -s -XGET "http://localhost:8083/connectors/"
以上均可用Postman 导入执行
# kafka show topic data
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.100:9092, localhost:9092 --topic file-trans-csv-schema --from-beginning
# kafka create topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic file-trans-json-schemaless --replication-factor 3 --config retention.ms=86400000 --config retention.bytes=1048576
# kafka list topics
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --exclude-internal
# kafka delete topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic file-trans-csv