Logstash数据迁移之es-to-kafka.conf详细配置

发布于:2025-08-29 ⋅ 阅读:(16) ⋅ 点赞:(0)

在 Logstash 中配置从 Elasticsearch (ES) 读取数据并输出到 Kafka 是一个相对高级但强大的用法,通常用于数据迁移、重新索引、或构建新的数据管道。

下面我将详细解释配置文件的各个部分和细节。

核心配置文件结构 (es-to-kafka.conf)

一个完整的配置文件主要包含三个部分:input, filter (可选), 和 output

input {
  elasticsearch {
    # 输入配置:告诉Logstash如何从ES读取数据
  }
}

filter {
  # 过滤配置(可选):对从ES读取的数据进行加工、清洗、转换
}

output {
  kafka {
    # 输出配置:告诉Logstash如何将数据写入Kafka
  }
}

1. Input (Elasticsearch) 插件配置详解

用以定义数据来源

input {
  elasticsearch {
    # 【必需】ES集群的地址列表
    hosts => ["http://localhost:9200", "http://node2:9200"] 

    # 【必需】要查询的索引。支持通配符(如`my-index-*`)和逗号分隔。
    index => "source-index-*" 

    # 【强烈建议】查询语句。默认是 `{ "query": { "match_all": {} } }`,即查询所有。
    # 你可以根据需要添加时间范围过滤等,避免全量同步。
    query => '{
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-1h/d",  # 例如:只拉取过去1小时的数据
            "lte": "now/d"
          }
        }
      }
    }'

    # 【必需】分页大小。控制一次从ES拉取多少条数据。根据文档大小和JVM堆内存调整。
    size => 1000 

    # 【必需】滚动API的保持时间。每次滚动查询的上下文保持时间,应大于处理一批数据所需的时间。
    scroll => "5m" 

    # 【可选】认证信息。如果ES有安全认证
    user => "your_elasticsearch_user"
    password => "your_password"

    # 【可选】SSL/TLS配置(如果ES开启了HTTPS)
    ssl => true
    cacert => "/path/to/your/ca.crt" # 或使用 `ssl_certificate_verification => false` (不推荐,仅测试用)

    # 【可选】调度计划。默认只运行一次。
    # 如果希望持续从ES拉取新数据,可以使用cron表达式,但这通常不是好主意,容易重复消费。
    # schedule => "* * * * *" # 每分钟运行一次(谨慎使用!)

    # 【可选】用于排序的字段。建议使用唯一且递增的字段,如`@timestamp`或自增ID,与`docinfo`配合实现断点续传。
    sort => "@timestamp:asc" 
    
    # 【高级可选】启用文档元数据获取。可以将ES文档的_id, _index等信息也添加到Logstash event中。
    docinfo => true 
    docinfo_target => "[@metadata][elasticsearch]" 
    docinfo_fields => ["_index", "_type", "_id"] 

    # 【高级可选】设置请求重试次数
    retry_max_attempts => 3
  }
}

2. Filter 插件配置(可选)

从 ES 获取的数据已经是 JSON 格式,通常不需要复杂解析,但常用来进行一些调整。

filter {
  # 1. 移除不必要的字段。例如,从docinfo中获取的元数据可能不需要发送到Kafka。
  mutate {
    remove_field => ["@version", "@timestamp", "[@metadata][elasticsearch]"]
  }

  # 2. 添加Kafka消息所需的Key或Header(在output中可以使用)
  # 例如,使用文档的_id作为Kafka消息的Key,保证同一文档始终进入同一分区。
  mutate {
    add_field => {
      "[@metadata][kafka_key]" => "%{[@metadata][elasticsearch][_id]}"
    }
  }
  mutate {
      rename => {
          "旧的字段名" => "新的字段名"
          # 可以同时重命名多个字段
          "另一个旧字段" => "另一个新字段"
       }
  }
  # 3. 转换数据格式或内容
  # json {
  #   source => "message" # 如果ES里的某个字段是JSON字符串,可以在这里解析它
  # }
  # date {
  #   match => ["timestamp", "UNIX_MS"] # 格式化时间字段
  #   target => "timestamp"
  # }
}

重要提示@metadata 字段中的内容不会在输出中显示,非常适合用来做流程中的临时变量(比如上面的 kafka_key)。

3. Output (Kafka) 插件配置详解

用以定义数据的目的地。

output {
  kafka {
    # 【必需】Kafka集群的broker列表
    bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"

    # 【必需】目标Topic的名称
    topic_id => "target-topic-name"

    # 【可选】消息的Key。常用于分区选择。这里使用filter阶段设置的metadata。
    # 如果没有key,Kafka会使用轮询策略分配分区。
    codec => "json" # 非常重要!指定消息的序列化格式为JSON。

    # 【可选】消息格式序列化器。`json` codec已经帮我们处理了,所以不需要单独设置。
    # value_serializer => "org.apache.kafka.common.serialization.StringSerializer"

    # 【可选】压缩算法,可以有效减少网络传输量和存储空间。
    compression_type => "snappy" # 可选 "gzip", "lz4", "snappy"

    # 【可选】生产者ACK机制,关系到数据可靠性。
    acks => "1" # "0"(不等待), "1"(等待Leader确认), "all"(等待所有ISR确认)

    # 【可选】批量发送设置,提高吞吐量。
    batch_size => 16384
    linger_ms => 1000 # 发送前等待更多消息加入batch的时间(毫秒)

    # 【可选】SSL/SASL认证(如果Kafka集群需要)
    ssl_truststore_location => "/path/to/kafka.client.truststore.jks"
    ssl_truststore_password => "password"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';"
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_SSL"

    # 【可选】遇到错误(如Topic不存在)时重试次数
    retries => 3
  }

  # 强烈建议添加一个备用输出(如stdout),用于调试和查看错误信息。
  stdout {
    codec => rubydebug
  }
}

完整配置示例

假设我们将 app-logs-* 索引中过去 15 分钟的数据,迁移到名为 logstash-migration-topic 的 Kafka Topic 中,并使用文档 ID 作为 Kafka Message Key。

input {
  elasticsearch {
    hosts => ["http://es-node1:9200"]
    index => "app-logs-*"
    query => '{
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-15m",
            "lte": "now"
          }
        }
      }
    }'
    size => 500
    scroll => "5m"
    docinfo => true
    docinfo_target => "[@metadata][es_doc]"
    schedule => "*/5 * * * *" # 每5分钟运行一次(谨慎!可能导致数据重复)
  }
}

filter {
  # 使用文档的_id作为Kafka消息的Key
  mutate {
    add_field => {
      "[@metadata][kafka_key]" => "%{[@metadata][es_doc][_id]}"
    }
  }
  # 移除一些不必要的系统字段
  mutate {
    remove_field => ["@version", "@timestamp", "[@metadata][es_doc]"]
  }
}

output {
  kafka {
    bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"
    topic_id => "logstash-migration-topic"
    codec => "json"
    compression_type => "lz4"
    acks => "all" # 追求高可靠性
  }

  stdout {}
}

运行命令

将上述配置保存为 es-to-kafka.conf 文件,然后使用以下命令运行 Logstash:

bin/logstash -f /path/to/your/es-to-kafka.conf --config.test_and_exit # 测试配置文件语法
bin/logstash -f /path/to/your/es-to-kafka.conf # 启动运行

重要注意事项

  1. 性能与资源:这种操作对 ES 和 Logstash 都是 资源密集型 的。务必调整 size 参数,监控 JVM 内存和 CPU 使用率。
  2. 重复数据:默认情况下,每次运行 input 都会重新查询。使用 schedule 会导致数据重复。要实现增量迁移,必须在 query 中使用严格的时间范围或自增 ID,并记录上次获取的位置。
  3. 数据类型:ES 输入插件会将整个文档作为一个 Logstash event,message 字段就是原始的 JSON 文档。使用 json codec 输出可以保持其结构。
  4. 错误处理:网络中断、Kafka Topic 不存在等都可能导致任务失败。建议在测试环境充分测试,并配置好 retriesretry_max_attempts
  5. 版本兼容性:确保你的 Logstash 版本与 ES 和 Kafka 集群版本兼容。插件可能因版本不同而参数略有差异。

网站公告

今日签到

点亮在社区的每一天
去签到