目录
1. Logstash 简介
Logstash 是一个开源的数据收集引擎,具有实时管道处理能力,属于 Elastic Stack(原 ELK Stack)的一部分,常与 Elasticsearch 和 Kibana 配合使用。
1.1 主要功能
数据收集:从各种来源(日志文件、数据库、消息队列等)采集数据
数据处理:过滤、解析和转换数据
数据输出:将处理后的数据发送到目标存储或分析系统
1.2 核心组件
Logstash 处理管道包含三个主要部分:
Input(输入插件):负责接收数据
常见输入源:文件、syslog、Redis、Beats、Kafka、JDBC 等
Filter(过滤插件):负责处理数据
常用过滤器:Grok(模式匹配)、Mutate(字段操作)、Date(日期处理)、GeoIP(地理位置)等
Output(输出插件):负责发送数据
常见输出目标:Elasticsearch、文件、Email、TCP、HTTP 等
1.3 主要特点
插件化架构:丰富的插件生态系统
可扩展性:能够处理高吞吐量的数据
灵活性:支持多种数据格式和协议
实时处理:数据采集和处理几乎实时完成
1.4 典型应用场景
日志收集与分析
事件监控和告警
数据转换和规范化
作为数据管道连接不同系统
2. 部署logstash
2.1 创建Namespace
kubectl create namespace elk
2.2 创建ConfigMap
vim logstash-configmap.yaml
--- apiVersion: v1 kind: ConfigMap metadata: namespace: elk name: logstash-config labels: app: logstash data: logstash.conf: |- input { kafka { bootstrap_servers => "kafka-0.kafka-headless.elk.svc.cluster.local:9092" topics => ["k8s-outlog"] group_id => "logstash-consumer-group" codec => "json" consumer_threads => 1 decorate_events => true security_protocol => "PLAINTEXT" } } filter { if [fields][logformat] == "json" { json { source => "message" target => "message" } } } output { if [fields][logtype] =~ "k8s-outlog.*" { elasticsearch { hosts => ["http://elasticsearch-0.elasticsearch-cluster.elk.svc.cluster.local:9200"] index => "k8s-outlog-%{+YYYY.MM.dd}" } } if [fields][logtype] =~ "k8s-messagelog.*" { elasticsearch { hosts => ["http://elasticsearch-0.elasticsearch-cluster.elk.svc.cluster.local:9200"] index => "k8s-messagelog-%{+YYYY.MM.dd}" } } }
2.3 创建Service
vim logstash-service.yaml
apiVersion: v1 kind: Service metadata: name: logstash namespace: elk labels: app: logstash spec: selector: app: logstash ports: - protocol: TCP port: 5044 targetPort: 5044 type: ClusterIP
2.4 创建Deployment
vim logstash-deployment.yaml
apiVersion: apps/v1 kind: Deployment metadata: name: logstash namespace: elk spec: replicas: 1 selector: matchLabels: app: logstash template: metadata: labels: app: logstash spec: containers: - name: logstash image: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.elastic.co/logstash/logstash:7.17.3 env: - name: "PIPELINE_WORKERS" value: "2" - name: "PIPELINE_BATCH_SIZE" value: "5000" - name: "PIPELINE_BATCH_DELAY" value: "2" - name: "LS_JAVA_OPTS" value: "-Xms512m -Xmx1g" - name: "path.config" value: "/usr/share/logstash/pipeline" - name: "xpack.monitoring.elasticsearch.hosts" value: "http://elasticsearch-0.elasticsearch-cluster.elk.svc.cluster.local:9200" volumeMounts: - name: config mountPath: /usr/share/logstash/pipeline/logstash.conf readOnly: true subPath: logstash.conf - mountPath: /etc/localtime readOnly: true name: tz-config volumes: - name: config configMap: name: logstash-config - name: tz-config hostPath: path: /etc/localtime
2.5 部署所有资源
[root@master1 Logstash]# ls logstash-configmap.yaml logstash-deployment.yaml logstash-service.yaml [root@master1 Logstash]# kubectl apply -f ./ configmap/logstash-config created deployment.apps/logstash created service/logstash created
2.6 检查Logstash Pod状态
[root@master1 Logstash]# kubectl get pod -n elk NAME READY STATUS RESTARTS AGE elasticsearch-0 1/1 Running 1 (29m ago) 21h filebeat-6db9l 1/1 Running 1 (29m ago) 22h filebeat-qllxg 1/1 Running 1 (29m ago) 22h filebeat-r5hw7 1/1 Running 1 (29m ago) 22h kafka-0 1/1 Running 1 (30m ago) 21h logstash-6d88fd886d-2cg9p 1/1 Running 0 65s