【Docker】安装kafka案例
【一】环境准备
(1)准备服务器或者虚拟机
(2)安装宝塔控制面板
(3)安装docker
(4)安装jdk
【二】安装kafka
【1】zookeeper作用
(1)保存和管理 Kafka 集群的元数据信息。例如,Kafka 集群中的 Broker 信息、Topic 和 Partition
的信息、Consumer 的偏移量信息等。
(2)Kafka 集群的选举服务。当 Kafka 集群中的某个或某些 Broker 宕机时,ZooKeeper 可以协助 Kafka
集群选举出新的 Leader Broker。
(3)Kafka 集群的状态监控。ZooKeeper 可以监控 Kafka 集群中各个 Broker 的在线状态,并及时通知 Kafka
集群进行相应的处理
【2】拉取镜像
(1)拉取zookeeper镜像
docker pull wurstmeister/zookeeper
(2)拉取kafka镜像
docker pull wurstmeister/kafka
【3】创建容器
(1)创建zookeeper容器
docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper
(2)创建kafka容器
前提要安装好jdk,并配置好环境变量
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
测试kafka
docker exec -it kafka /bin/bash
exit退出
【4】zookeeper服务启动
(1)找到Zookeeper安装路径
find / -name zkServer.sh
(2)执行启动
(1)进入zookeeper的config文件
cd /var/lib/docker/overlay2/80f2f1635bed9db5db5e5697053485db509c413aef6a5a6a3264330a7f53e8b4/diff/opt/zookeeper-3.4.13/conf/
(2)文件更名为zoo.cfg
将里面的zoo_xxx.cfg文件更名为zoo.cfg
mv zoo_xxx.cfg zoo.cfg
(3)执行启动脚本
进入zookeeper的bin目录
cd /var/lib/docker/overlay2/80f2f1635bed9db5db5e5697053485db509c413aef6a5a6a3264330a7f53e8b4/diff/opt/zookeeper-3.4.13/bin/
(4)执行./zkServer.sh start
./zkServer.sh start
(3)查询运行状态
./zkServer.sh status
该状态显示:以standalone模式运行。standalone模式意味着你只运行了一个Zookeeper服务器实例,而不是一个Zookeeper集群
(4)配置host
进入/etc/hosts 加上最后一行,192.168.x.x zookeeper。代表将将主机名映射到IP地址
【5】kafka服务启动
(1)找到kafka安装路径
find / -name kafka-topics.sh
进入到你查出来的kafka-topics.sh路径的上一个bin路径在进入config目录
cd /var/lib/docker/overlay2/8244efe326332818bd6b780a6333a2af67b2731cf32ded8a9e72f2179c7bb7d6/diff/opt/kafka_2.13-2.8.1/config
ll
(2)修改server.properties
找到config文件下的server.properties并进行修改端口,保存
配置你的服务器ip,确保配置文件中绑定的地址允许外部访问
# 监听地址:默认是localhost,仅允许本地访问,需改为0.0.0.0允许所有IP访问
listeners=PLAINTEXT://0.0.0.0:9092
# 对外公告的地址(若客户端与broker不在同一网络,需配置为公网IP或可访问的内网IP)
advertised.listeners=PLAINTEXT://192.168.19.16:9092
注意:advertised.listeners必须是客户端能访问到的 IP(不能是localhost或 127.0.0.1),否则客户端会获取到无效地址,远程连接会被拒绝
(3)执行kafka-server-start.sh 命令
/var/lib/docker/overlay2/8244efe326332818bd6b780a6333a2af67b2731cf32ded8a9e72f2179c7bb7d6/diff/opt/kafka_2.13-2.8.1/bin/kafka-server-start.sh /var/lib/docker/overlay2/8244efe326332818bd6b780a6333a2af67b2731cf32ded8a9e72f2179c7bb7d6/diff/opt/kafka_2.13-2.8.1/config/server.properties
也可以进入bin目录,然后
./kafka-server-start.sh -daemon ../config/server.properties &
(4)kafka停止和重启
./kafka-server-stop.sh -daemon ../config/server.properties &
然后重新开启
(5)测试创建主题
/kafka-verifiable-producer.sh --broker-list localhost:9092 --topic mykafka
【6】检查zookeeper和kafka的状态
(1)查看ZooKeeper 容器运行状态
首先确认 ZooKeeper 容器是否正常启动:
# 查看所有运行的容器
docker ps
# 若未运行,查看所有容器(包括停止的)
docker ps -a
正常状态:ZooKeeper 容器的STATUS应为Up(运行中)。
若未运行,检查启动命令或日志:
# 查看ZooKeeper容器日志(替换为实际容器名或ID)
docker logs zookeeper
(2)连接 ZooKeeper 验证服务可用性
通过 ZooKeeper 客户端工具(容器内已内置)连接服务,验证是否可正常交互:
# 进入ZooKeeper容器
docker exec -it zookeeper /bin/bash
# 使用zkCli.sh连接本地ZooKeeper(默认端口2181)
./bin/zkCli.sh -server localhost:2181
成功连接后会显示Connected to localhost:2181,并进入[zk: localhost:2181(CONNECTED)]交互模式。
执行简单命令测试(如查看根节点):
[zk: localhost:2181(CONNECTED)] ls /
# 正常输出应包含默认节点(如/zookeeper)
输入quit退出客户端。
(3)查看 Kafka 容器运行状态
# 查看Kafka容器状态
docker ps | grep kafka
正常状态:STATUS为Up,且无频繁重启(Restarting)。
若异常,查看 Kafka 日志(关键!):
# 查看Kafka容器日志(替换为实际容器名或ID)
docker logs kafka
无ERROR级日志(如连接 ZooKeeper 失败的错误)即为正常。
(4)验证 Kafka broker 可用性
通过 Kafka 内置工具创建测试主题(Topic),验证 broker 是否正常工作:
# 进入Kafka容器
docker exec -it kafka /bin/bash
# 创建测试主题(名称为test,1个分区,1个副本)
./bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic test \
--partitions 1 \
--replication-factor 1
成功输出:Created topic test.
若失败,检查日志中是否有 “无法连接 broker” 或 “权限不足” 等错误。
(5)测试 Kafka 生产消费功能
进一步验证 Kafka 能否正常收发消息:
# (在Kafka容器内)启动消费者监听test主题
./bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning
打开另一个终端,进入 Kafka 容器启动生产者:
# (新终端)进入Kafka容器
docker exec -it kafka /bin/bash
# 启动生产者发送消息
./bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test
在生产者终端输入消息(如hello kafka),消费者终端应实时收到消息,说明 Kafka 服务正常。
(6)验证 Kafka 与 ZooKeeper 的连接是否正常
Kafka 依赖 ZooKeeper 存储元数据(如主题、分区信息),两者连接正常的核心标志是:Kafka 成功在 ZooKeeper 中注册元数据。
方法 1:通过 ZooKeeper 客户端查看 Kafka 注册的节点
(1)进入 ZooKeeper 容器并连接客户端(如前文步骤):
docker exec -it zookeeper /bin/bash
./bin/zkCli.sh -server localhost:2181
(2)查看 Kafka 在 ZooKeeper 中创建的节点:
# 查看Kafka相关节点(Kafka默认会在/zookeeper下注册元数据)
[zk: localhost:2181(CONNECTED)] ls /brokers
# 正常输出应包含/ids、/topics等子节点,例如:
# [ids, topics, seqid]
# 查看已注册的Kafka broker ID
[zk: localhost:2181(CONNECTED)] ls /brokers/ids
# 输出示例:[0](表示ID为0的broker已注册,即当前Kafka容器)
若/brokers节点存在且包含内容,说明 Kafka 已成功连接 ZooKeeper 并注册元数据。
方法 2:通过 Kafka 日志验证连接
查看 Kafka 容器日志,搜索与 ZooKeeper 相关的信息:
docker logs kafka | grep -i zookeeper
正常日志应包含 “成功连接 ZooKeeper” 的信息,例如:
INFO Successfully connected to ZooKeeper (org.apache.kafka.common.utils.AppInfoParser)
INFO ZooKeeper session established, sessionid: 0x10000000000000, negotiated timeout: 6000 (org.apache.zookeeper.ClientCnxn)
若日志中出现Connection to zookeeper failed或ZooKeeper session expired,说明连接失败(需检查网络或配置)。
方法 3:通过 Kafka 工具查看 broker 信息
使用 Kafka 工具查看 broker 是否正常注册(依赖 ZooKeeper):
# 进入Kafka容器
docker exec -it kafka /bin/bash
# 查看broker列表(需通过ZooKeeper地址查询)
./bin/kafka-brokers.sh --zookeeper zookeeper:2181 --list
正常输出:0(表示 ID 为 0 的 broker 已通过 ZooKeeper 注册)。
总结
验证流程:
ZooKeeper 正常:容器运行 +zkCli.sh可连接并查看节点。
Kafka 正常:容器运行 + 能创建主题 + 生产消费消息正常。
Kafka 连接 ZooKeeper 正常:ZooKeeper 中存在/brokers节点 + Kafka 日志无连接错误 +kafka-brokers.sh能列出 broker。
【7】docker的基本操作
(1)拉取镜像
# 拉取指定镜像(默认latest标签)
docker pull [镜像名]
# 示例:拉取ubuntu最新镜像
docker pull ubuntu
# 拉取指定版本镜像(通过标签)
docker pull [镜像名]:[标签]
# 示例:拉取mysql 8.0版本
docker pull mysql:8.0
(2)查看本地镜像
# 列出所有本地镜像
docker images
# 或(简洁格式)
docker image ls
# 过滤查看指定镜像
docker images [镜像名]
# 示例:查看所有ubuntu镜像
docker images ubuntu
(3)创建容器
# 基本格式:创建并启动容器(交互式,退出后容器停止)
docker run -it [镜像名] /bin/bash
# 常用参数说明:
# -d:后台运行(守护模式)
# -p:端口映射(宿主端口:容器端口)
# -v:挂载数据卷(宿主路径:容器路径)
# --name:指定容器名称
# --rm:容器停止后自动删除
# 示例1:后台启动nginx,映射80端口,命名为mynginx
docker run -d -p 80:80 --name mynginx nginx
# 示例2:交互式启动ubuntu,退出后自动删除
docker run -it --rm ubuntu /bin/bash
# 示例3:启动mysql,挂载数据卷,设置环境变量
docker run -d -p 3306:3306 \
-v /my/mysql/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--name mymysql \
mysql:8.0
(4)查看本地容器
# 列出所有运行中的容器
docker ps
# 列出所有容器(包括停止的)
docker ps -a
# 查看容器详细信息(通过容器名或ID)
docker inspect [容器名/ID]
# 查看容器运行状态(简洁)
docker stats [容器名/ID]
进入容器
# 进入运行中的容器(交互式,支持命令行操作)
docker exec -it [容器名/ID] /bin/bash
# 示例:进入mynginx容器
docker exec -it mynginx /bin/bash
# 若容器内无bash,可用sh
docker exec -it [容器名/ID] /bin/sh
查看容器内部进程
# 查看容器实时日志(默认最后10行)
docker logs [容器名/ID]
# 查看实时日志并持续输出(类似tail -f)
docker logs -f [容器名/ID]
# 查看指定行数的日志
docker logs -n 100 [容器名/ID] # 最后100行
# 查看包含时间戳的日志
docker logs -t [容器名/ID]
查看容器端口映射
# 查看容器的端口映射关系
docker port [容器名/ID]
(5)启动和关闭容器
# 启动已停止的容器
docker start [容器名/ID]
# 停止运行中的容器(优雅关闭)
docker stop [容器名/ID]
# 强制停止容器
docker kill [容器名/ID]
# 重启容器
docker restart [容器名/ID]
(6)删除容器
# 删除已停止的容器
docker rm [容器名/ID]
# 强制删除运行中的容器
docker rm -f [容器名/ID]
# 删除所有已停止的容器
docker container prune
(7)删除镜像
# 通过镜像ID删除(需先删除依赖该镜像的容器)
docker rmi [镜像ID]
# 或通过镜像名:标签删除
docker rmi [镜像名]:[标签]
# 强制删除(即使有容器依赖,慎用)
docker rmi -f [镜像ID]
# 删除所有未使用的镜像(无标签的虚悬镜像)
docker image prune
(7)查看日志
# 查看容器实时日志(默认最后10行)
docker logs [容器名/ID]
# 查看实时日志并持续输出(类似tail -f)
docker logs -f [容器名/ID]
# 查看指定行数的日志
docker logs -n 100 [容器名/ID] # 最后100行
# 查看包含时间戳的日志
docker logs -t [容器名/ID]
(8)其他操作
# 保存镜像为本地文件(.tar格式)
docker save -o [保存路径/文件名.tar] [镜像名]:[标签]
# 示例:保存ubuntu镜像为ubuntu.tar
docker save -o ./ubuntu.tar ubuntu:latest
# 加载本地镜像文件
docker load -i [镜像文件.tar]
# 给镜像打标签(用于推送至仓库)
docker tag [原镜像名:标签] [新标签,如仓库地址/镜像名:标签]
# 示例:标记本地镜像为私有仓库镜像
docker tag ubuntu:latest myrepo/ubuntu:v1
【三】服务快速重启
通过shell脚本实现对zookeeper和kafka的状态检查和重启
#!/bin/bash
# 脚本功能:检查Docker中的ZooKeeper和Kafka容器状态,未运行则重启
# 使用方法:chmod +x zk_kafka_monitor.sh && ./zk_kafka_monitor.sh
# 配置容器名称(根据实际情况修改)
ZOOKEEPER_CONTAINER="zookeeper"
KAFKA_CONTAINER="kafka"
# 日志文件路径
LOG_FILE="/var/log/zk_kafka_monitor.log"
# 记录日志函数
log() {
local timestamp=$(date "+%Y-%m-%d %H:%M:%S")
echo "[$timestamp] $1" >> $LOG_FILE
echo "[$timestamp] $1" # 同时输出到控制台
}
# 检查容器是否运行
check_container() {
local container_name=$1
# 检查容器是否存在且状态为运行中
if docker ps --filter "name=^/$container_name$" --filter "status=running" --format "{{.Names}}" | grep -q "$container_name"; then
return 0 # 运行中
else
return 1 # 未运行
fi
}
# 重启容器
restart_container() {
local container_name=$1
log "开始重启容器: $container_name"
# 尝试停止容器(若正在运行)
if docker ps --filter "name=^/$container_name$" --format "{{.Names}}" | grep -q "$container_name"; then
if docker stop $container_name; then
log "成功停止容器: $container_name"
else
log "警告:停止容器 $container_name 失败,尝试强制停止"
docker kill $container_name > /dev/null 2>&1
fi
fi
# 启动容器
if docker start $container_name; then
log "成功启动容器: $container_name"
# 等待3秒后再次检查状态
sleep 3
if check_container $container_name; then
log "容器 $container_name 重启后状态正常"
return 0
else
log "错误:容器 $container_name 重启后仍未正常运行"
return 1
fi
else
log "错误:启动容器 $container_name 失败"
return 1
fi
}
# 主逻辑
main() {
log "===== 开始检查ZooKeeper和Kafka状态 ====="
# 先检查ZooKeeper(Kafka依赖ZooKeeper)
if check_container $ZOOKEEPER_CONTAINER; then
log "ZooKeeper容器运行正常"
else
log "ZooKeeper容器未运行,需要重启"
restart_container $ZOOKEEPER_CONTAINER
# 若ZooKeeper重启失败,直接退出(Kafka依赖它)
if [ $? -ne 0 ]; then
log "ZooKeeper重启失败,终止检查"
exit 1
fi
fi
# 检查Kafka
if check_container $KAFKA_CONTAINER; then
log "Kafka容器运行正常"
else
log "Kafka容器未运行,需要重启"
restart_container $KAFKA_CONTAINER
fi
log "===== 状态检查与重启操作完成 ====="
echo "----------------------------------------"
}
# 执行主逻辑
main
添加权限
chmod +x zk_kafka_monitor.sh
运行脚本
./zk_kafka_monitor.sh
日志查看
tail -f /var/log/zk_kafka_monitor.log
脚本关键点说明
容器检查逻辑:通过docker ps过滤运行状态的容器,确保准确性
依赖处理:先检查并重启 ZooKeeper,再处理 Kafka,避免依赖问题
错误处理:停止容器失败时尝试强制终止,启动后再次验证状态
日志记录:同时输出到控制台和日志文件,方便调试和追溯
【四】安装kafka控制面板
【五】远程连接和测试案例
【1】测试端口接通
(1)关闭防火墙或者开发端口9092
(2)远程连接测试端口
telnet 192.168.19.16 9092