本地搭建
下载这个包,解压后,运行bin目录的对应cmd文件就可以了
https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper/3.9.4/
代码连接服务端
将服务端的lib目录下的jar包拷贝到客户端的jar目录中即可,main文件路径:D:\java\AIAS-main\AIAS-main\1_image_sdks\onlyTest\src\main\ZooKeeperPathMonitor.java
package main;
// 导入 ZooKeeper 核心类:
// ZooKeeper:客户端核心类,用于与服务端建立连接和交互
// Watcher:事件监听器接口,用于处理 ZooKeeper 服务端推送的事件
// WatchedEvent:事件封装类,包含事件状态、类型、路径等信息
// KeeperException:ZooKeeper 操作异常类(如节点不存在、权限不足等)
// Stat:节点状态信息类(如版本号、创建时间、数据长度等)
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
// CountDownLatch:Java 并发工具类,用于同步线程,此处用于等待连接建立完成
import java.util.concurrent.CountDownLatch;
/**
* ZooKeeper 节点监控类
* 实现 Watcher 接口,通过监听事件实现对指定节点数据变化的实时监控
*/
public class ZooKeeperPathMonitor implements Watcher {
// 1. 成员变量定义
/**
* 同步计数器:用于阻塞主线程,直到 ZooKeeper 连接建立成功
* 初始计数为 1,连接成功后调用 countDown() 减为 0,释放主线程
*/
private static final CountDownLatch connectedSignal = new CountDownLatch(1);
/**
* ZooKeeper 客户端实例
* 所有与 ZooKeeper 服务端的交互(如读数据、改数据、设监听)都通过该实例完成
*/
private ZooKeeper zk;
/**
* 待监控的节点路径
* 如 "/testNode",存储需要监控的数据变化的节点路径
*/
private String monitoredPath;
// 2. 核心方法:与 ZooKeeper 建立连接并初始化监控
/**
* 连接 ZooKeeper 服务端,并初始化监控节点的初始数据和监听
* @param host ZooKeeper 服务端地址,格式为 "ip:port"(集群用逗号分隔,如 "ip1:2181,ip2:2181")
* @param sessionTimeout 会话超时时间(毫秒):若客户端与服务端心跳中断超过该时间,会话失效
* @param path 待监控的节点路径(必须是已存在的节点,否则会抛 KeeperException)
* @throws IOException 连接建立失败异常(如服务端地址错误、端口未开放)
* @throws InterruptedException 线程中断异常(如等待连接时线程被中断)
* @throws KeeperException ZooKeeper 操作异常(如节点不存在、权限不足)
*/
public void connect(String host, int sessionTimeout, String path) throws IOException, InterruptedException, KeeperException {
// 初始化待监控的节点路径,将参数 path 赋值给成员变量
this.monitoredPath = path;
// 创建 ZooKeeper 客户端实例,建立与服务端的连接
// 参数1:服务端地址;参数2:会话超时时间;参数3:事件监听器(当前类实现了 Watcher,故传 this)
zk = new ZooKeeper(host, sessionTimeout, this);
// 阻塞当前线程,直到 connectedSignal 计数为 0(即连接建立成功)
// 防止后续操作(如读数据)在连接未就绪时执行,导致异常
connectedSignal.await();
// 初始读取监控节点的数据,并注册监听
// zk.getData() 方法:读取节点数据
// 参数1:节点路径;参数2:是否注册监听器(传 this 表示用当前类处理该节点的后续事件);参数3:节点状态对象(传 new Stat() 表示获取最新状态)
byte[] data = zk.getData(path, this, new Stat());
// 输出初始数据:ZooKeeper 节点数据以字节数组存储,需转为 String 才能正常显示
System.out.println("初始数据: " + new String(data));
// 修改监控节点的数据
// zk.setData() 方法:修改节点数据
// 参数1:节点路径;参数2:新数据(字节数组);参数3:版本号(-1 表示忽略版本检查,直接覆盖)
zk.setData(path,"3424".getBytes(),-1);
// 再次读取修改后的节点数据,验证修改结果
// 此处再次调用 zk.getData(),并注册监听(确保后续数据变化仍能被监控)
System.out.println("初始数据1: " + new String(zk.getData(path, this, new Stat())));
}
// 3. 事件处理方法:实现 Watcher 接口的 process 方法,处理 ZooKeeper 推送的所有事件
/**
* 事件处理核心方法:ZooKeeper 服务端有事件发生时,会回调该方法
* @param event 事件对象:封装了事件的状态(如连接状态)、类型(如数据变化)、路径等信息
*/
@Override
public void process(WatchedEvent event) {
// 3.1 处理「连接建立成功」事件
// event.getState():获取事件状态,Event.KeeperState.SyncConnected 表示客户端与服务端同步连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
// 防止重复调用 countDown():只有当计数仍为 1 时(连接未完成),才执行减 1
if (connectedSignal.getCount() > 0) {
connectedSignal.countDown(); // 计数减 1(从 1 变为 0),释放 await() 阻塞的线程
System.out.println("成功连接到ZooKeeper服务端");
}
}
// 3.2 处理「节点数据变化」事件
// 条件1:事件类型为 NodeDataChanged(节点数据被修改)
// 条件2:事件关联的节点路径不为空(避免空指针异常)
// 条件3:事件路径与当前监控的节点路径一致(确保只处理目标节点的事件)
if (event.getType() == Event.EventType.NodeDataChanged &&
event.getPath() != null && event.getPath().equals(monitoredPath)) {
try {
// 重新读取变化后的节点数据,并再次注册监听
// 注意:ZooKeeper 的 Watcher 是「一次性的」,事件触发后监听自动失效,需重新调用 getData() 注册
byte[] data = zk.getData(monitoredPath, this, new Stat());
// 输出数据变化信息:包含节点路径和新数据
System.out.println("路径 " + monitoredPath + " 数据已更新: " + new String(data));
} catch (Exception e) {
// 捕获读取数据时的异常(如节点被删除、会话超时),打印异常堆栈信息
e.printStackTrace();
}
}
}
// 4. 资源释放方法:关闭 ZooKeeper 连接
/**
* 关闭 ZooKeeper 客户端连接,释放资源
* @throws InterruptedException 线程中断异常(如关闭连接时线程被中断)
*/
public void close() throws InterruptedException {
// 判空:避免空指针异常(若连接未建立,zk 为 null)
if (zk != null) {
zk.close(); // 关闭连接,释放客户端与服务端的会话资源
}
}
// 5. 主方法:程序入口,初始化监控并启动
public static void main(String[] args) {
// 5.1 配置参数:根据实际环境修改
String host = "localhost:2181"; // ZooKeeper 服务端地址(本地测试用 localhost:2181,生产环境填实际 IP)
int sessionTimeout = 5000; // 会话超时时间(5秒):超时后客户端需重新连接
String pathToMonitor = "/testNode"; // 待监控的节点路径(需确保该节点在服务端已存在)
// 5.2 创建监控实例
ZooKeeperPathMonitor monitor = new ZooKeeperPathMonitor();
try {
// 调用 connect 方法:建立连接、初始化数据、注册监听
monitor.connect(host, sessionTimeout, pathToMonitor);
// 保持程序持续运行:主线程睡眠 Long.MAX_VALUE(约 292 年),避免程序退出
// 因为 ZooKeeper 客户端操作是异步的,主线程退出会导致整个程序终止,无法监控后续事件
Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
// 捕获所有异常(如连接失败、节点不存在、数据修改异常等),打印异常信息
e.printStackTrace();
} finally {
// 无论是否发生异常,程序退出前都关闭连接,释放资源
try {
monitor.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
客户端打印日志控制
根据路径D:\java\AIAS-main\AIAS-main\1_image_sdks\onlyTest\src\logback.xml
,新建一个logback.xml,即可控制,具体内容如下
<configuration>
<!-- 设置 ZooKeeper 相关日志级别为 WARN -->
<logger name="org.apache.zookeeper" level="WARN"/>
<!-- 根日志级别 -->
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
</configuration>
输出
成功连接到ZooKeeper服务端
初始数据: 3424
路径 /testNode 数据已更新: 3424
初始数据1: 3424