The complete data flow is: Tomcat → Filebeat → Kafka → Logstash → Elasticsearch → Kibana.
1. Install Tomcat, Filebeat, ES, and Kibana
Installation and configuration of these components are covered in earlier posts:
Filebeat, ES, Kibana:
Tomcat+Filebeat+logstash+ES+Kibana日志监控配置(待续)_yangkei的博客-CSDN博客
For Logstash installation, see: Logstash8.4在Linux系统上的安装以及配置Tomcat日志(ELK安装part2)_yangkei的博客-CSDN博客
Kafka cluster installation:
Kafka3.2.3基于Linux的集群安装(待续)_yangkei的博客-CSDN博客
2. Configure Filebeat
vi /app/filebeat/filebeat.kafka.yml
filebeat.inputs:    # "filebeat.prospectors" was renamed to filebeat.inputs in 6.3 and removed in 7.0
- type: log
  enabled: true
  paths:
    - /app/tomcat/logs/tomcat_access_json.2022-09-23.log
output.kafka:
  enabled: true
  hosts: ["192.168.88.5:9092","192.168.88.7:9092","192.168.88.9:9092"]
  topic: tomcatlogtest
type: The input type. log means the input source is a log file.
enabled: Whether this configuration is active: true enables it, false disables it.
paths: Paths of the log files to monitor. Additional log files can be listed one per line under paths.
topic: The Kafka topic that the logs are written to; specify a topic you have already created.
version: The Kafka version, which can be found on the Kafka instance details page.
Leaving this parameter unset causes an error.
Different Filebeat releases support different Kafka versions (for example, Filebeat 8.2 and later supports Kafka 2.2.0), so version must be set to a version your Filebeat supports; otherwise you will see an error such as: Exiting: error initializing publisher: unknown/unsupported kafka vesion '2.2.0' accessing 'output.kafka.version' (source:'filebeat.kafka.yml'). For details, see the version documentation. A sketch with this option filled in follows.
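A minimal sketch of the output.kafka block with version set, assuming the same hosts and topic as above (the value '2.2.0' is illustrative; substitute a Kafka version your Filebeat release actually supports):

output.kafka:
  enabled: true
  hosts: ["192.168.88.5:9092","192.168.88.7:9092","192.168.88.9:9092"]
  topic: tomcatlogtest
  version: '2.2.0'  # illustrative value; must be a version this Filebeat supports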
Fix the file permissions (Filebeat refuses to load a config file that is writable by group or others):
chmod go-w /app/filebeat/filebeat.kafka.yml
3. Configure Kafka
Create the topic tomcatlogtest:
[root@goya2 kafka]# /app/kafka/bin/kafka-topics.sh --create --topic tomcatlogtest --partitions 1 --replication-factor 3 --bootstrap-server goya1:9092,goya2:9092,goya3:9092
Created topic tomcatlogtest.
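Optionally, confirm the partition and replica layout with --describe, using the same bootstrap servers:

/app/kafka/bin/kafka-topics.sh --describe --topic tomcatlogtest --bootstrap-server goya1:9092,goya2:9092,goya3:9092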
Start Filebeat:
cd /app/filebeat
./filebeat -e -c filebeat.kafka.yml
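To verify that events are actually reaching Kafka before wiring up Logstash, attach a console consumer to the topic (reads from the beginning; press Ctrl-C to stop):

/app/kafka/bin/kafka-console-consumer.sh --topic tomcatlogtest --from-beginning --bootstrap-server goya1:9092,goya2:9092,goya3:9092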
4. Configure Logstash
vi /app/logstash/config/tomcatlogtest.conf
input {
  # Kafka input
  kafka {
    # Kafka cluster addresses
    bootstrap_servers => "192.168.88.5:9092,192.168.88.7:9092,192.168.88.9:9092"
    # Topic to read from; must match the topic used in the Filebeat config
    topics => ["tomcatlogtest"]
    # Consumer group ID (default "logstash"). Kafka delivers every message to each
    # consumer group, and within a group each message goes to exactly one member.
    # Example: with groups G1 (members A, B) and G2 (members C, D), 10 incoming
    # messages go to both G1 and G2; A and B each receive part of the 10, and the
    # union of what they receive is all 10 messages. The same holds for C and D.
    group_id => "filebeat-logstash"
    # ID of this consumer within the group (default "logstash")
    #client_id => "logstashnode1"
    # Number of consumer threads. One thread generally maps to one Kafka partition;
    # across a group of Logstash consumers, the sum of consumer_threads should not
    # exceed the topic's partition count (any excess is wasted), and making them
    # equal is the usual recommendation.
    consumer_threads => 1
    # Filebeat attaches extra metadata when shipping to Kafka; by default Logstash
    # would treat the whole JSON payload as the message, which complicates later
    # processing. The json codec decodes it back into the original fields.
    codec => json
  }
}
# Filter stage (empty for now)
filter {
}
# Output: send events to Elasticsearch
output {
  elasticsearch {
    hosts => ["http://192.168.88.5:9200","http://192.168.88.7:9200","http://192.168.88.9:9200"]
    index => "kafka-%{+YYYY.MM.dd}"
  }
}
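Before starting Logstash, the pipeline file can be syntax-checked without running it, using the --config.test_and_exit flag (short form -t):

logstash -f /app/logstash/config/tomcatlogtest.conf --config.test_and_exit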
Start Logstash:
[es@goya1 config]$ logstash -f /app/logstash/config/tomcatlogtest.conf
Using bundled JDK: /app/logstash/jdk
Sending Logstash logs to /app/logstash/logs which is now configured via log4j2.properties
[2022-09-26T17:26:33,696][INFO ][logstash.runner ] Log4j configuration path used is: /app/logstash/config/log4j2.properties
[2022-09-26T17:26:33,725][WARN ][logstash.runner ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[2022-09-26T17:26:33,725][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"8.4.0", "jruby.version"=>"jruby 9.3.6.0 (2.6.8) 2022-06-27 7a2cbcd376 OpenJDK 64-Bit Server VM 17.0.4+8 on 17.0.4+8 +indy +jit [x86_64-linux]"}
[2022-09-26T17:26:33,727][INFO ][logstash.runner ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED]
[2022-09-26T17:26:34,087][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-09-26T17:26:35,835][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-09-26T17:26:36,480][INFO ][org.reflections.Reflections] Reflections took 160 ms to scan 1 urls, producing 125 keys and 434 values
[2022-09-26T17:26:36,849][INFO ][logstash.codecs.json ] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[2022-09-26T17:26:37,116][INFO ][logstash.javapipeline ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2022-09-26T17:26:37,171][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://192.168.88.5:9200", "http://192.168.88.7:9200", "http://192.168.88.9:9200"]}
[2022-09-26T17:26:38,065][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.88.5:9200/, http://192.168.88.7:9200/, http://192.168.88.9:9200/]}}
[2022-09-26T17:26:38,585][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://192.168.88.5:9200/"}
[2022-09-26T17:26:38,613][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch version determined (8.4.0) {:es_version=>8}
[2022-09-26T17:26:38,614][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>8}
[2022-09-26T17:26:38,719][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://192.168.88.7:9200/"}
[2022-09-26T17:26:38,760][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://192.168.88.9:9200/"}
[2022-09-26T17:26:38,819][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2022-09-26T17:26:38,838][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2022-09-26T17:26:38,843][WARN ][logstash.outputs.elasticsearch][main] Elasticsearch Output configured with `ecs_compatibility => v8`, which resolved to an UNRELEASED preview of version 8.0.0 of the Elastic Common Schema. Once ECS v8 and an updated release of this plugin are publicly available, you will need to update this plugin to resolve this warning.
[2022-09-26T17:26:38,929][INFO ][logstash.outputs.elasticsearch][main] Using a default mapping template {:es_version=>8, :ecs_compatibility=>:v8}
[2022-09-26T17:26:38,976][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["/app/logstash/config/tomcatlogtest.conf"], :thread=>"#<Thread:0x19c69d89 run>"}
[2022-09-26T17:26:40,159][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>1.18}
[2022-09-26T17:26:40,238][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2022-09-26T17:26:40,362][WARN ][org.apache.kafka.clients.CommonClientConfigs][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Configuration 'client.dns.lookup' with value 'default' is deprecated and will be removed in future version. Please use 'use_all_dns_ips' or another non-deprecated value.
[2022-09-26T17:26:40,363][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.88.5:9092, 192.168.88.7:9092, 192.168.88.9:9092]
check.crcs = true
client.dns.lookup = default
client.id = logstash-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = filebeat-logstash
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[2022-09-26T17:26:40,485][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Kafka version: 2.8.1
[2022-09-26T17:26:40,506][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2022-09-26T17:26:40,540][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Kafka commitId: 839b886f9b732b15
[2022-09-26T17:26:40,540][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] Kafka startTimeMs: 1664184400484
[2022-09-26T17:26:40,562][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Subscribed to topic(s): tomcatlogtest
[2022-09-26T17:26:40,627][INFO ][logstash.codecs.json ][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[2022-09-26T17:26:41,397][INFO ][org.apache.kafka.clients.Metadata][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Cluster ID: 2EZR2uwmRDCpvvfztVGRNA
[2022-09-26T17:26:41,398][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Discovered group coordinator goya1:9092 (id: 2147483646 rack: null)
[2022-09-26T17:26:41,416][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] (Re-)joining group
[2022-09-26T17:26:41,488][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] (Re-)joining group
[2022-09-26T17:26:44,510][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Successfully joined group with generation Generation{generationId=1, memberId='logstash-0-57f240b4-7a63-4c73-a954-94bc1df45938', protocol='range'}
[2022-09-26T17:26:44,511][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Finished assignment for group at generation 1: {logstash-0-57f240b4-7a63-4c73-a954-94bc1df45938=Assignment(partitions=[tomcatlogtest-0])}
[2022-09-26T17:26:44,550][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Successfully synced group in generation Generation{generationId=1, memberId='logstash-0-57f240b4-7a63-4c73-a954-94bc1df45938', protocol='range'}
[2022-09-26T17:26:44,550][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Notifying assignor about the new Assignment(partitions=[tomcatlogtest-0])
[2022-09-26T17:26:44,552][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Adding newly assigned partitions: tomcatlogtest-0
[2022-09-26T17:26:44,576][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Found no committed offset for partition tomcatlogtest-0
[2022-09-26T17:26:44,648][INFO ][org.apache.kafka.clients.consumer.internals.SubscriptionState][main][74570df197463f163c6ac7f587be8b2afb83f81d62f67c31733e752f6deb3067] [Consumer clientId=logstash-0, groupId=filebeat-logstash] Resetting offset for partition tomcatlogtest-0 to position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[goya1:9092 (id: 1 rack: null)], epoch=0}}.
5. Test the data flow
Generate test data from Tomcat:
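A quick way to generate access-log entries is to issue a few HTTP requests against Tomcat (a sketch assuming Tomcat listens on its default port 8080 on the local host; adjust host and port to your deployment):

for i in $(seq 1 5); do curl -s -o /dev/null http://localhost:8080/; done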
Check the data inside ES:
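The ES REST API offers a quick check: list the daily indices created by this pipeline and fetch one sample document (any of the three ES nodes will do; 192.168.88.5 is used here):

curl 'http://192.168.88.5:9200/_cat/indices/kafka-*?v'
curl 'http://192.168.88.5:9200/kafka-*/_search?size=1&pretty'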
Kibana setup and testing:
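In Kibana 8.x this is typically done by creating a data view under Stack Management → Data Views with an index pattern matching kafka-*, then inspecting the incoming documents in Discover (exact menu names may differ slightly across versions).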
This confirms that the Tomcat logs were successfully ingested into ES and can be viewed through Kibana. That concludes this post.