【Project】A Kafka-Based Highly Available Distributed Log Monitoring and Alerting System

Published: 2025-07-06

Kafka Internals: Study Notes

Kafka data storage

[figure: Kafka data storage]

Kafka high watermark (HW) mechanism

[figure: Kafka high watermark mechanism]

KRaft mode

[figure: KRaft mode]

ZooKeeper vs. KRaft

[figure: ZooKeeper vs. KRaft comparison]

System Deployment and Usage Guide

The system consists of Kafka, Filebeat, Nginx (paired with Keepalived), and a Python consumer program, and handles the collection, transport, and storage of Nginx access logs. Detailed configuration, deployment, and usage instructions follow.
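Putting the components configured below together (and going by the dependencies installed in section 4.1), the data flows roughly as:

Nginx access.log → Filebeat → Kafka topic nginxlog → Python consumer → storage/alerting backends (MySQL, MongoDB, Redis)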

1. Kafka Configuration and Startup

1.1 Configuration Files

Kafka runs in KRaft mode across three nodes (kafka01, kafka02, kafka03), with each node's configuration file as follows:

  • kafka01: /opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka01:9092,CONTROLLER://kafka01:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka01:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
  • kafka02: /opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka02:9092,CONTROLLER://kafka02:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka02:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
  • kafka03: /opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka03:9092,CONTROLLER://kafka03:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka03:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
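
The three files differ only in node.id and in the hostname used in listeners and advertised.listeners. Note that offsets.topic.replication.factor and transaction.state.log.replication.factor are left at 1; on a three-node cluster intended for high availability, raising them to 3 prevents consumer offsets and transaction state from being lost when a single broker fails.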

1.2 Startup and Shutdown Commands

# Start Kafka (run on every node)
/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
# Stop Kafka (when needed)
#/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh

1.3 Formatting Storage

# Run once on each node before the first startup; node.id is taken from server.properties
# (kafka-storage.sh format in 3.6.1 has no --node-id flag)
/opt/kafka_2.13-3.6.1/bin/kafka-storage.sh format --cluster-id BlQPRL0HSdaaMwvx_dVcAQ --config /opt/kafka_2.13-3.6.1/config/kraft/server.properties
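
With all three brokers formatted and started, the nginxlog topic that Filebeat writes to (section 2) can be created explicitly rather than relying on broker auto-creation. A minimal sketch using kafka-python's admin client; the partition count and replication factor are illustrative choices for a three-broker HA setup, not values taken from the original deployment:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(
    bootstrap_servers=["kafka01:9092", "kafka02:9092", "kafka03:9092"]
)
try:
    # 3 partitions for parallel consumers, replication factor 3 so the
    # topic survives the loss of any single broker
    admin.create_topics([NewTopic(name="nginxlog", num_partitions=3, replication_factor=3)])
except TopicAlreadyExistsError:
    pass  # already created, e.g. by broker auto-creation
finally:
    admin.close()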

2. Filebeat Configuration

Filebeat collects the Nginx access log and ships it to Kafka. Its configuration file is as follows:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /usr/local/nginx1/logs/access.log
output.kafka:
  hosts: ["kafka01:9092","kafka02:9092","kafka03:9092"]
  topic: nginxlog
  keep_alive: 10s
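
By default, Filebeat's Kafka output serializes each event as JSON, with the raw log line in the message field, so the consumer in section 4 needs to JSON-decode each record before parsing the Nginx log line itself (see the sketch in section 4.2).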

3. Nginx Configuration

3.1 Keepalived Configuration

! Configuration File for keepalived
global_defs {
    router_id LVS_DEVEL
}
vrrp_script chk_nginx {
    script "/etc/keepalived/check_nginx.sh"
    interval 10
    weight -5
    fall 2
    rise 1
}
vrrp_instance VI_1 {
    state MASTER
    interface ens33
    mcast_src_ip 192.168.100.150
    virtual_router_id 51
    priority 100
    advert_int 2
    authentication {
        auth_type PASS
        auth_pass BLUEKING_NGINX_HA
    }
    virtual_ipaddress {
        192.168.100.100/24
    }
    track_script {
      chk_nginx
    }
}
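
This is the MASTER node's configuration. The backup Nginx node is assumed to use the same file with state BACKUP, its own mcast_src_ip, and a lower priority; with weight -5 on the check script, a backup priority between 96 and 99 (say 98) ensures that a failing check on the master (100 - 5 = 95) is enough to move the VIP 192.168.100.100 over.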

3.2 Health-Check Script

#!/bin/bash
# Health check invoked by keepalived's chk_nginx track_script.
process_name="nginx"
process_abs_path="/usr/local/nginx1/sbin/nginx"

# Returns 0 if an nginx master process launched from $process_abs_path is
# running, 1 if no master process exists, and 2 if a master process exists
# but was started from a different binary.
function is_nginx_running() {
    process_info=$(ps --no-header -C $process_name -o ppid,pid,args | awk '{printf $1 "|" $2 "|" ; for (i=3; i<=NF; i++) { printf "%s ", $i };printf "\n"}' | grep master)
    if [[ -z "$process_info" ]]; then
        return 1
    else
        process_pids=($(echo "$process_info" | awk -F'|' '{print $2}'))
        for _pid in "${process_pids[@]}"; do
            abs_path=$(readlink -f /proc/$_pid/exe)
            if [ "$abs_path" == "$(readlink -f "$process_abs_path")" ]; then
                return 0
            fi
        done
        return 2
    fi
}

# Retry up to 3 times, one second apart, to ride out transient restarts.
err=0
for k in $(seq 1 3); do
    is_nginx_running
    if [[ $? != "0" ]]; then
        err=$((err + 1))
        sleep 1
    else
        err=0
        break
    fi
done

if [[ $err != "0" ]]; then
    exit 1  # report failure only; keepalived itself keeps running
else
    exit 0
fi
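
Keepalived runs this script every interval (10 s); after fall (2) consecutive non-zero exits it lowers the node's priority by |weight| (5), and after rise (1) clean exit it restores it, which is the mechanism that triggers the VIP failover described in section 3.1.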

4. Python Program

4.1 Installing Dependencies

pip install kafka-python==2.0.2 pymongo==4.13.0 mysql-connector-python==9.3.0 redis==4.5.5 kombu==5.3.1 celery==5.3.1

4.2 Code

https://github.com/Lenoud/nginx-monitor-kafka-python
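
The full consumer lives in the repository above. As a minimal sketch of the consumption path, the following uses kafka-python to read Filebeat events from the nginxlog topic and pull out the raw Nginx log line; the group id and offset policy are illustrative assumptions, and the storage/alerting logic (MySQL, MongoDB, Redis, Celery) is omitted:

import json

from kafka import KafkaConsumer

# Consume Filebeat events from the nginxlog topic.
consumer = KafkaConsumer(
    "nginxlog",
    bootstrap_servers=["kafka01:9092", "kafka02:9092", "kafka03:9092"],
    group_id="nginx-monitor",      # illustrative group id
    auto_offset_reset="earliest",  # start from the oldest record on first run
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

for record in consumer:
    event = record.value             # Filebeat event, serialized as JSON
    line = event.get("message", "")  # the raw Nginx access-log line
    # ...parse the access-log line and hand it to the storage/alerting layer...
    print(record.partition, record.offset, line)

Running several copies of this process with the same group_id spreads the topic's partitions across them, which is why creating nginxlog with more than one partition (section 1.3) matters for throughput.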

5. Notes

  • Ensure network connectivity between all nodes, in particular among the Kafka brokers and between the consumer and the MySQL database.
  • Verify the IP addresses, ports, usernames, and passwords in each configuration file.
  • Format the KRaft storage (section 1.3) before starting Kafka for the first time.
  • Adjust the configuration files as needed to change the log retention policy or other Kafka settings.

Completing these steps yields a working pipeline that collects, transports, and stores Nginx access logs.