Implementing a Tomcat + Filebeat + Kafka + Logstash + ES + Kibana Log Pipeline

Published: 2022-12-07

The overall data flow is: Tomcat access logs → Filebeat → Kafka → Logstash → Elasticsearch → Kibana.

1. Install Tomcat, Filebeat, ES, and Kibana

The installation and configuration of these components are covered in earlier posts:

Filebeat, ES, Kibana: Tomcat+Filebeat+logstash+ES+Kibana日志监控配置(待续)_yangkei的博客-CSDN博客

Logstash installation: Logstash8.4在Linux系统上的安装以及配置Tomcat日志(ELK安装part2)_yangkei的博客-CSDN博客

Kafka cluster installation: Kafka3.2.3基于Linux的集群安装(待续)_yangkei的博客-CSDN博客

2. Configure Filebeat

vi /app/filebeat/filebeat.kafka.yml

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /app/tomcat/logs/tomcat_access_json.2022-09-23.log

output.kafka:
  enabled: true
  hosts: ["192.168.88.5:9092","192.168.88.7:9092","192.168.88.9:9092"]
  topic: tomcatlogtest
Key settings:

- type: The input type. log means the input source is a log file.
- enabled: Whether this configuration is active (true = active, false = inactive).
- paths: Paths of the log files to monitor. To monitor multiple logs, add one path per line under paths.
- topic: The Kafka topic the log events are published to. Specify a topic you have already created.
- version: The Kafka version (for a managed Kafka service this is shown on the instance details page). Omitting this parameter causes an error.

Different Filebeat releases support different Kafka protocol versions (for example, Filebeat 8.2 and later supports Kafka version 2.2.0), so version must be set to a Kafka version that your Filebeat release supports. Otherwise you will see an error such as: Exiting: error initializing publisher: unknown/unsupported kafka vesion '2.2.0' accessing 'output.kafka.version' (source:'filebeat.kafka.yml'). See the version documentation for details.
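As an illustration (not a definitive recommendation), the output section with version set might look like the snippet below; the exact value to use depends on which Kafka versions your Filebeat release accepts and on your broker version:

output.kafka:
  enabled: true
  hosts: ["192.168.88.5:9092","192.168.88.7:9092","192.168.88.9:9092"]
  topic: tomcatlogtest
  # Kafka protocol version Filebeat should speak; must be one this Filebeat release supports
  version: "2.2.0"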

Restrict write permissions (Filebeat refuses to load a configuration file that is writable by anyone other than its owner):

chmod go-w /app/filebeat/filebeat.kafka.yml

3. Configure Kafka

Create the topic tomcatlogtest:
[root@goya2 kafka]# /app/kafka/bin/kafka-topics.sh --create --topic tomcatlogtest --partitions 1 --replication-factor 3 --bootstrap-server goya1:9092,goya2:9092,goya3:9092
Created topic tomcatlogtest.
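Optionally, confirm the topic's partition and replica layout before wiring up the producers (using the same broker list as above):

/app/kafka/bin/kafka-topics.sh --describe --topic tomcatlogtest --bootstrap-server goya1:9092,goya2:9092,goya3:9092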

Start Filebeat:
cd /app/filebeat
./filebeat -e -c filebeat.kafka.yml
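Once Filebeat is running, it can help to confirm that events are actually arriving in Kafka before configuring Logstash. A quick check with the console consumer (run from any broker host) looks like this:

/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server goya1:9092 --topic tomcatlogtest --from-beginning

Each line should be a JSON envelope containing the original Tomcat log line in the message field plus the metadata Filebeat attaches.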

4. Configure Logstash

vi /app/logstash/config/tomcatlogtest.conf

input {
  # Kafka input
  kafka {
    # Kafka cluster addresses
    bootstrap_servers => ["192.168.88.5:9092,192.168.88.7:9092,192.168.88.9:9092"]
    # Topic to read from; must match the topic used in the Filebeat output
    topics => ["tomcatlogtest"]
    # Consumer group ID (default "logstash"). Kafka delivers each message to every
    # consumer group, but within one group each message is consumed only once.
    # For example, with groups G1 (members A, B) and G2 (members C, D), 10 incoming
    # messages are delivered to both G1 and G2; A and B each receive a disjoint subset
    # whose union is the 10 messages, and likewise for C and D.
    group_id => "filebeat-logstash"
    # ID of this consumer within the group (default "logstash")
    #client_id => "logstashnode1"
    # Number of consumer threads. One thread generally maps to one Kafka partition;
    # the total consumer_threads across a Logstash consumer group should not exceed
    # the topic's partition count (anything beyond that is wasted), and ideally equals it.
    consumer_threads => 1
    # Filebeat wraps each log line in a JSON envelope with extra metadata; without a
    # codec, Logstash would treat that whole envelope as the message, which makes later
    # processing harder. Decode it as JSON so the original fields are available.
    codec => json
  }
}

# Filtering (left empty here)
filter {

}

# Output to Elasticsearch
output {
  elasticsearch {
    hosts => ["http://192.168.88.5:9200","http://192.168.88.7:9200","http://192.168.88.9:9200"]
    index => "kafka-%{+YYYY.MM.dd}"
  }
}
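The filter block above is left empty. While developing the pipeline, one option (a sketch, not part of the original setup) is to drop some of the Filebeat metadata fields and add a temporary stdout output so events are visible on the console; the field names removed here are typical Filebeat 8.x envelope fields and may differ in your events:

filter {
  mutate {
    # drop Filebeat envelope fields not needed downstream (adjust to your data)
    remove_field => ["agent", "ecs", "input", "log", "host"]
  }
}

output {
  # temporary debug output; remove once the Elasticsearch output is verified
  stdout { codec => rubydebug }
}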

Start Logstash:

[es@goya1 config]$ logstash -f /app/logstash/config/tomcatlogtest.conf
Using bundled JDK: /app/logstash/jdk
Sending Logstash logs to /app/logstash/logs which is now configured via log4j2.properties
[2022-09-26T17:26:33,696][INFO ][logstash.runner          ] Log4j configuration path used is: /app/logstash/config/log4j2.properties
[2022-09-26T17:26:33,725][WARN ][logstash.runner          ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[2022-09-26T17:26:33,725][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.4.0", "jruby.version"=>"jruby 9.3.6.0 (2.6.8) 2022-06-27 7a2cbcd376 OpenJDK 64-Bit Server VM 17.0.4+8 on 17.0.4+8 +indy +jit [x86_64-linux]"}
[2022-09-26T17:26:33,727][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED]
[2022-09-26T17:26:34,087][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-09-26T17:26:35,835][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-09-26T17:26:36,480][INFO ][org.reflections.Reflections] Reflections took 160 ms to scan 1 urls, producing 125 keys and 434 values
[2022-09-26T17:26:36,849][INFO ][logstash.codecs.json     ] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[2022-09-26T17:26:37,116][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2022-09-26T17:26:37,171][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://192.168.88.5:9200", "http://192.168.88.7:9200", "http://192.168.88.9:9200"]}
[2022-09-26T17:26:38,065][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.88.5:9200/, http://192.168.88.7:9200/, http://192.168.88.9:9200/]}}
[2022-09-26T17:26:38,585][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://192.168.88.5:9200/"}
[2022-09-26T17:26:38,613][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch version determined (8.4.0) {:es_version=>8}
[2022-09-26T17:26:38,614][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>8}
[2022-09-26T17:26:38,719][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://192.168.88.7:9200/"}
[2022-09-26T17:26:38,760][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://192.168.88.9:9200/"}
[2022-09-26T17:26:38,819][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2022-09-26T17:26:38,838][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2022-09-26T17:26:38,843][WARN ][logstash.outputs.elasticsearch][main] Elasticsearch Output configured with `ecs_compatibility => v8`, which resolved to an UNRELEASED preview of version 8.0.0 of the Elastic Common Schema. Once ECS v8 and an updated release of this plugin are publicly available, you will need to update this plugin to resolve this warning.
[2022-09-26T17:26:38,929][INFO ][logstash.outputs.elasticsearch][main] Using a default mapping template {:es_version=>8, :ecs_compatibility=>:v8}
[2022-09-26T17:26:38,976][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["/app/logstash/config/tomcatlogtest.conf"], :thread=>"#<Thread:0x19c69d89 run>"}
[2022-09-26T17:26:40,159][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>1.18}
[2022-09-26T17:26:40,238][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2022-09-26T17:26:40,362][WARN ][org.apache.kafka.clients.CommonClientConfigs][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Configuration 'client.dns.lookup' with value 'default' is deprecated and will be removed in future version. Please use 'use_all_dns_ips' or another non-deprecated value.
[2022-09-26T17:26:40,363][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [192.168.88.5:9092, 192.168.88.7:9092, 192.168.88.9:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = logstash-0
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = filebeat-logstash
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 50
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2022-09-26T17:26:40,485][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Kafka version: 2.8.1
[2022-09-26T17:26:40,506][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2022-09-26T17:26:40,540][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Kafka commitId: 839b886f9b732b15
[2022-09-26T17:26:40,540][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Kafka startTimeMs: 1664184400484
[2022-09-26T17:26:40,562][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Subscribed to topic(s): tomcatlogtest
[2022-09-26T17:26:40,627][INFO ][logstash.codecs.json     ][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[2022-09-26T17:26:41,397][INFO ][org.apache.kafka.clients.Metadata][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Cluster ID: 2EZR2uwmRDCpvvfztVGRNA
[2022-09-26T17:26:41,398][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Discovered group coordinator goya1:9092 (id: 2147483646 rack: null)
[2022-09-26T17:26:41,416][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] (Re-)joining group
[2022-09-26T17:26:41,488][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] (Re-)joining group
[2022-09-26T17:26:44,510][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Successfully joined group with generation Generation{generationId=1, memberId='logstash-0-57f240b4-7a63-4c73-a954-94bc1df45938', protocol='range'}
[2022-09-26T17:26:44,511][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Finished assignment for group at generation 1: {logstash-0-57f240b4-7a63-4c73-a954-94bc1df45938=Assignment(partitions=[tomcatlogtest-0])}
[2022-09-26T17:26:44,550][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Successfully synced group in generation Generation{generationId=1, memberId='logstash-0-57f240b4-7a63-4c73-a954-94bc1df45938', protocol='range'}
[2022-09-26T17:26:44,550][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Notifying assignor about the new Assignment(partitions=[tomcatlogtest-0])
[2022-09-26T17:26:44,552][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Adding newly assigned partitions: tomcatlogtest-0
[2022-09-26T17:26:44,576][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Found no committed offset for partition tomcatlogtest-0
[2022-09-26T17:26:44,648][INFO ][org.apache.kafka.clients.consumer.internals.SubscriptionState][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Resetting offset for partition tomcatlogtest-0 to position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[goya1:9092 (id: 1 rack: null)], epoch=0}}.
 

The startup log shows the consumer joining group filebeat-logstash and being assigned partition tomcatlogtest-0, so Logstash is now ready to read from Kafka.

5. Test data

Generate test data in Tomcat:
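For example, requesting the application a few times produces entries in the JSON access log that Filebeat is tailing (the host and port below are assumptions based on the addresses used earlier; substitute your Tomcat URL):

for i in $(seq 1 10); do
  curl -s http://192.168.88.5:8080/ > /dev/null
done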

Check the data in ES:
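A quick way to confirm that the daily index was created and is receiving documents is the cat indices API, for example (assuming security is disabled on this cluster, as the plain http URLs in the Logstash output suggest):

curl -XGET "http://192.168.88.5:9200/_cat/indices/kafka-*?v"
curl -XGET "http://192.168.88.5:9200/kafka-*/_search?size=1&pretty"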

Kibana setup and verification: create a data view (index pattern) matching kafka-* and browse the events in Discover.

Verification confirms that the Tomcat logs have been successfully ingested into ES and can be viewed through Kibana. That concludes this article.

