Kafka 数据实时同步到 Elasticsearch数据同步
- 核心配置文件 (需要创建新文件)
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3
topics=target_topic
key.ignore=true
connection.url=http://localhost:9200
type.name=_doc
value.converter=org.apache.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
schema.ignore=true
- 启动命令
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\es-sink.properties
关键优化参数说明:
batch.size=2000
:控制批量写入ES的文档数量max.in.flight.requests=5
:提升写入吞吐量flush.timeout.ms=30000
:设置刷新超时时间retry.backoff.ms=5000
:失败重试间隔max.retries=10
:最大重试次数
监控指标:
// 在Kafka Connect配置中添加监控
config.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, 10000);
config.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, 10000);
异常处理方案:
- 配置死信队列(DLQ)处理失败记录:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-topic
errors.deadletterqueue.context.headers.enable=true
- 实现重试策略:
RetryUtil.executeWithRetry(
() -> client.bulk(request),
3,
Duration.ofSeconds(2),
Arrays.asList(ElasticsearchTimeoutException.class)
);
kafka数据转换和清洗到Elasticsearch
1. 核心配置文件(新增)
# 连接器基础配置
name=es-sink-connector # 连接器实例名称
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3 # 并行任务数(根据ES集群规模调整)
topics=user_behavior # 消费的Kafka主题名称
# Elasticsearch配置
connection.url=http://localhost:9200 # ES集群地址
type.name=_doc # 文档类型(ES7+固定值)
behavior.on.null.values=ignore # 空值处理策略
# 数据转换配置
value.converter=com.example.EsRecordConverter # 自定义转换器(对应下方Java代码)
value.converter.schemas.enable=false # 禁用schema验证
# 清洗配置SMT链
transforms=FilterInvalid,FormatField,AddTimestamp
transforms.FilterInvalid.type=org.apache.kafka.connect.transforms.Filter$Value
transforms.FilterInvalid.predicate=HasUserId # 过滤无用户ID的记录
transforms.FilterInvalid.negate=true
transforms.FormatField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.FormatField.renames=userId:user_id,ipAddr:client_ip # 字段重命名
transforms.AddTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.AddTimestamp.timestamp.field=event_time # 添加处理时间戳
# 异常处理配置
errors.tolerance=all # 错误容忍模式
errors.deadletterqueue.topic.name=es_sink_dlq # 死信队列名称
errors.deadletterqueue.context.headers.enable=true # 保留错误上下文
# 性能优化
batch.size=2000 # 批量写入条数
flush.timeout.ms=30000 # 刷新超时时间
max.in.flight.requests=5 # 并发请求数
2. 自定义转换器代码(新增)
/**
* 自定义数据转换器(对应配置中的value.converter)
*/
public class EsRecordConverter implements Converter {
private static final Logger LOG = LoggerFactory.getLogger(EsRecordConverter.class);
// 数据转换入口方法
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
try {
JSONObject json = new JSONObject(new String(value, "UTF-8"));
// 数据清洗:IP地址标准化
if(json.has("client_ip")) {
String ip = json.getString("client_ip");
json.put("client_ip", ip.replace(" ", "")); // 去除空格
}
// 添加清洗标记
json.put("clean_flag", calculateCleanFlag(json));
return new SchemaAndValue(null, json.toString());
} catch (Exception e) {
LOG.error("Data conversion failed", e);
throw new DataException("Conversion error", e);
}
}
// 生成数据清洗哈希值
private String calculateCleanFlag(JSONObject data) {
String rawData = data.toString();
return DigestUtils.sha256Hex(rawData);
}
}
3. Elasticsearch预处理管道(命令行执行)
# 创建数据预处理管道
curl -X PUT "localhost:9200/_ingest/pipeline/kafka_pipeline" -H 'Content-Type: application/json' -d'
{
"description": "Data final cleaning",
"processors": [
{
"remove": { # 移除调试字段
"field": ["debug_info", "temp_field"],
"ignore_missing": true
}
},
{
"date": { # 时间格式标准化
"field": "event_time",
"target_field": "@timestamp",
"formats": ["UNIX_MS"]
}
}
]
}'
4. 启动命令(Windows环境)
:: 启动独立模式连接器
.\bin\windows\connect-standalone.bat ^ # 主启动脚本
.\config\connect-standalone.properties ^ # 通用配置
.\config\es-sink.properties # 当前连接器配置
:: 参数说明:
:: 1. connect-standalone.properties - Kafka Connect基础配置
:: 2. es-sink.properties - 当前连接器专属配置
- 数据验证命令
# 查看ES中的清洗后数据
curl -X GET "localhost:9200/user_behavior/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"term": {
"clean_flag": "d4e5f6..." # 替换实际哈希值
}
}
}'
# 检查死信队列(需配置kafka-cli)
.\bin\windows\kafka-console-consumer.bat ^
--bootstrap-server localhost:9092 ^
--topic es_sink_dlq ^
--from-beginning
方案特点:
三层清洗架构:
- SMT层:基础格式处理
- 转换器层:业务逻辑清洗
- ES管道层:存储前终检
追踪机制:
- 通过clean_flag字段实现数据溯源
- 死信队列保留原始错误数据
性能平衡:
- 批量大小与内存占用的最佳实践
- 重试策略:10次指数退避重试(max.retries=10)