zookeeper Curator(3):Watch事件监听

发布于:2025-06-30 ⋅ 阅读:(14) ⋅ 点赞:(0)

本章代码已分享至Gitee: https://gitee.com/lengcz/curator01

Curator API 常用操作 Watch事件监听

在这里插入图片描述

在这里插入图片描述

  • zookeeper 允许用户在指定节点上注册一些Watcher ,并且在一些特定事件触发的时候,zookeeper 服务端会将事件通知到感兴趣的客户端上,该机制是zookeeper 实现分布式协调服务的重要特性。

  • zookeeper 中引入了Wather 机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态发生变化时,会通知所有订阅者。

  • zookeeper 原生支持通过注册Wather 来进行事件监听,但是其使用起来特别不方便,需要开发人员自己反复注册Wather,比较繁琐。

  • Curator 引入了Cache 来实现对zookeeper 服务端事件的监听。

  • zookeeper 提供了三种Watcher

    • NodeCache : 只是监听某一个特定的节点
    • PathChildrenCache: 监听一个ZNode的子节点。
    • TreeCache :可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache 的结合

NodeCache

NodeCache 用于监听单个节点的变化,包括节点的创建、更新和删除事件。适用于需要关注特定节点数据变化的场景。

 /**
     * NodeCache  给指定一个节点注册监听器
     * @throws Exception
     */
    @Test
    public void testNodeCache() throws Exception{
        //1  创建NodeCache 对象

        NodeCache nodeCache = new NodeCache(client,"/app1",false);

        //2 注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                logger.info("节点变化了");
                byte[] data = nodeCache.getCurrentData().getData(); // 获取数据
                logger.info(new String(data));
            }
        });

        //3 开启监听,如果设置 true,则开启监听,加载缓冲数据
        nodeCache.start(true);

        // 防止虚拟机退出
        while(true){

        }

    }

NodeCache 会自动处理连接中断和会话过期,并在重新连接后恢复监听。可以通过 nodeCache.getCurrentData() 获取当前节点数据。

PathChildrenCache

PathChildrenCache 监听指定路径下子节点的变化,包括子节点的添加、移除和更新事件。适用于需要监控目录结构变化的场景。
tips: 监听只会对子节点有效,对本节点无效。

   /**
     * PathChildrenCache  监听某个子节点的所有子节点(不含本节点)
     * @throws Exception
     */
    @Test
    public void testPathChildrenCache() throws Exception{
        //1  创建PathChildrenCache对象

        PathChildrenCache nodeCache = new PathChildrenCache(client,"/app2",true);

        //2 注册监听
        nodeCache.getListenable().addListener(new PathChildrenCacheListener() {

            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                logger.info("子节点变化");
                logger.info(JSONObject.toJSONString(pathChildrenCacheEvent));
                if(pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ //子节点变化,打印数据
                    byte[] data = pathChildrenCacheEvent.getData().getData();
                    logger.info(new String(data));
                }
            }
        });

        //3 开启监听,如果设置 true,则开启监听,加载缓冲数据
        nodeCache.start();

        // 防止虚拟机退出
        while(true){

        }
    }

在这里插入图片描述

TreeCache

TreeCache 结合了 NodeCache 和 PathChildrenCache 的功能,可以监听指定节点及其所有子节点的变化。适用于需要完整树形结构监控的场景。

  /**
     * TreeCache  监听某个子节点自己和所有子节点, 相当于NodeCache和PathChildrenCache的组合
     * @throws Exception
     */
    @Test
    public void testTreeCache() throws Exception{
        //1  创建TreeCache对象

        TreeCache nodeCache = new TreeCache(client,"/app2");

        //2 注册监听
        nodeCache.getListenable().addListener(new TreeCacheListener() {


            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                logger.info("节点变化");
                logger.info(JSONObject.toJSONString(treeCacheEvent));
                if(treeCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ //子节点变化,打印数据
                    byte[] data = treeCacheEvent.getData().getData();
                    logger.info(new String(data));
                }
            }
        });

        //3 开启监听,如果设置 true,则开启监听,加载缓冲数据
        nodeCache.start();

        // 防止虚拟机退出
        while(true){

        }
    }

TreeCache 提供的事件类型更丰富,包括 NODE_ADDED、NODE_UPDATED、NODE_REMOVED 等。可以获取完整的节点树结构变化信息。
在这里插入图片描述

发布/订阅

由于Watcher是发布订阅模式,所以多个监听器都会收到同一条消息。

 /**
     * NodeCache  给指定一个节点注册监听器
     * @throws Exception
     */
    @Test
    public void testNodeCache() throws Exception{

        {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
            String connectString = "localhost:2181";
            CuratorFramework clientTemp = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(60_000)
                    .connectionTimeoutMs(15_000)
                    .retryPolicy(retryPolicy).namespace("demo01").build();
            clientTemp.start();
            //1  创建NodeCache 对象
            NodeCache nodeCache = new NodeCache(clientTemp,"/app1",false);

            //2 注册监听
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    byte[] data = nodeCache.getCurrentData().getData(); // 获取数据
                    logger.info("节点1收到消息:"+new String(data));
                }
            });

            //3 开启监听,如果设置 true,则开启监听,加载缓冲数据
            try {
                nodeCache.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }


        {
            //1  创建NodeCache 对象
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
            String connectString = "localhost:2181";
            CuratorFramework clientTemp = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(60_000)
                    .connectionTimeoutMs(15_000)
                    .retryPolicy(retryPolicy).namespace("demo01").build();
            clientTemp.start();
            NodeCache nodeCache = new NodeCache(clientTemp,"/app1",false);

            //2 注册监听
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    byte[] data = nodeCache.getCurrentData().getData(); // 获取数据
                    logger.info("节点2收到消息:"+new String(data));
                }
            });

            //3 开启监听,如果设置 true,则开启监听,加载缓冲数据
            try {
                nodeCache.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        while (true){

        }
    }

启动示例后,在zk客户端set 数据
在这里插入图片描述

示例输出

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到