如何保证 Kafka 数据实时同步到 Elasticsearch?

发布于:2025-05-25 ⋅ 阅读:(23) ⋅ 点赞:(0)

Kafka 数据实时同步到 Elasticsearch数据同步

  1. 核心配置文件 (需要创建新文件)
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3
topics=target_topic
key.ignore=true
connection.url=http://localhost:9200
type.name=_doc
value.converter=org.apache.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
schema.ignore=true
  1. 启动命令
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\es-sink.properties

关键优化参数说明

  • batch.size=2000:控制批量写入ES的文档数量
  • max.in.flight.requests=5:提升写入吞吐量
  • flush.timeout.ms=30000:设置刷新超时时间
  • retry.backoff.ms=5000:失败重试间隔
  • max.retries=10:最大重试次数

监控指标

// 在Kafka Connect配置中添加监控
config.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, 10000);
config.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, 10000);

异常处理方案

  1. 配置死信队列(DLQ)处理失败记录:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-topic
errors.deadletterqueue.context.headers.enable=true
  1. 实现重试策略:
RetryUtil.executeWithRetry(
    () -> client.bulk(request), 
    3, 
    Duration.ofSeconds(2),
    Arrays.asList(ElasticsearchTimeoutException.class)
);

kafka数据转换和清洗到Elasticsearch

1. 核心配置文件(新增)

# 连接器基础配置
name=es-sink-connector  # 连接器实例名称
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3  # 并行任务数(根据ES集群规模调整)
topics=user_behavior  # 消费的Kafka主题名称

# Elasticsearch配置
connection.url=http://localhost:9200  # ES集群地址
type.name=_doc  # 文档类型(ES7+固定值)
behavior.on.null.values=ignore  # 空值处理策略

# 数据转换配置
value.converter=com.example.EsRecordConverter  # 自定义转换器(对应下方Java代码)
value.converter.schemas.enable=false  # 禁用schema验证

# 清洗配置SMT链
transforms=FilterInvalid,FormatField,AddTimestamp
transforms.FilterInvalid.type=org.apache.kafka.connect.transforms.Filter$Value
transforms.FilterInvalid.predicate=HasUserId  # 过滤无用户ID的记录
transforms.FilterInvalid.negate=true

transforms.FormatField.type=org.apache.kafka.connect.transforms.ReplaceField$Value 
transforms.FormatField.renames=userId:user_id,ipAddr:client_ip  # 字段重命名

transforms.AddTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.AddTimestamp.timestamp.field=event_time  # 添加处理时间戳

# 异常处理配置
errors.tolerance=all  # 错误容忍模式
errors.deadletterqueue.topic.name=es_sink_dlq  # 死信队列名称
errors.deadletterqueue.context.headers.enable=true  # 保留错误上下文

# 性能优化
batch.size=2000  # 批量写入条数
flush.timeout.ms=30000  # 刷新超时时间
max.in.flight.requests=5  # 并发请求数

2. 自定义转换器代码(新增)

/**
 * 自定义数据转换器(对应配置中的value.converter)
 */
public class EsRecordConverter implements Converter {
    private static final Logger LOG = LoggerFactory.getLogger(EsRecordConverter.class);

    // 数据转换入口方法
    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        try {
            JSONObject json = new JSONObject(new String(value, "UTF-8"));
            
            // 数据清洗:IP地址标准化
            if(json.has("client_ip")) {
                String ip = json.getString("client_ip");
                json.put("client_ip", ip.replace(" ", ""));  // 去除空格
            }
            
            // 添加清洗标记
            json.put("clean_flag", calculateCleanFlag(json));
            
            return new SchemaAndValue(null, json.toString());
        } catch (Exception e) {
            LOG.error("Data conversion failed", e);
            throw new DataException("Conversion error", e);
        }
    }

    // 生成数据清洗哈希值
    private String calculateCleanFlag(JSONObject data) {
        String rawData = data.toString();
        return DigestUtils.sha256Hex(rawData);
    }
}

3. Elasticsearch预处理管道(命令行执行)

# 创建数据预处理管道
curl -X PUT "localhost:9200/_ingest/pipeline/kafka_pipeline" -H 'Content-Type: application/json' -d'
{
  "description": "Data final cleaning",
  "processors": [
    {
      "remove": {  # 移除调试字段
        "field": ["debug_info", "temp_field"],
        "ignore_missing": true
      }
    },
    {
      "date": {  # 时间格式标准化
        "field": "event_time",
        "target_field": "@timestamp",
        "formats": ["UNIX_MS"]
      }
    }
  ]
}'

4. 启动命令(Windows环境)

:: 启动独立模式连接器
.\bin\windows\connect-standalone.bat ^  # 主启动脚本
.\config\connect-standalone.properties ^  # 通用配置
.\config\es-sink.properties  # 当前连接器配置

:: 参数说明:
:: 1. connect-standalone.properties - Kafka Connect基础配置
:: 2. es-sink.properties - 当前连接器专属配置
  1. 数据验证命令
# 查看ES中的清洗后数据
curl -X GET "localhost:9200/user_behavior/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "term": {
      "clean_flag": "d4e5f6..."  # 替换实际哈希值
    }
  }
}'

# 检查死信队列(需配置kafka-cli)
.\bin\windows\kafka-console-consumer.bat ^
--bootstrap-server localhost:9092 ^
--topic es_sink_dlq ^
--from-beginning

方案特点:

  1. 三层清洗架构

    • SMT层:基础格式处理
    • 转换器层:业务逻辑清洗
    • ES管道层:存储前终检
  2. 追踪机制

    • 通过clean_flag字段实现数据溯源
    • 死信队列保留原始错误数据
  3. 性能平衡

    • 批量大小与内存占用的最佳实践
    • 重试策略:10次指数退避重试(max.retries=10)

网站公告

今日签到

点亮在社区的每一天
去签到