从java后端微服务学完第一次后,二次在hadoop里学完zookeeper的一些个人理解,超级详细,看了必会,小白必看

发布于:2022-11-08 ⋅ 阅读:(514) ⋅ 点赞:(0)

前言:

什么是zookeeper?

zookeeper内部结构及组成(官网分析):

是zookeeper的数据结构模型图:

zookeeper的使用(虚拟机安装使用):

选举机制 (针对集群)

zookeeper节点信息

监听器原理

分布式锁

结尾总结


前言:

上一次差不多半年前,学zookeeper的时候还是在我学java后端的时候那时候应该是刚学完springboot准备接触分布式,然后去学的dubbo里面就介绍到了zookeeper是它的注册中心其一,然后当时用docker搞得zookeeper,然后后面就学springcloud了就没咋用过dubbo,现在第二次是在学大数据,虽然现在还没学hadoop ha,因为继承hadoop和zookeeper的话是在hadoop高可用里面,也没有实战,不过应该过不了多久就能学到了,好了话不多说,开始今天的主题

 zookeeper官网地址

zookeeper的官网下载地址

什么是zookeeper?

下面是从官网粘贴的一份介绍,第二段是翻译

 ZooKeeper 是一种集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。所有这些类型的服务都由分布式应用程序以某种形式使用。每次实现它们时,都需要做很多工作来修复不可避免的错误和竞争条件。由于难以实现这些类型的服务,应用程序最初通常会吝啬它们,这使得它们在发生更改时变得脆弱且难以管理。即使正确完成,这些服务的不同实现也会导致部署应用程序时的管理复杂性。

其实意思就是说zookeeper是一个注册中心用来监管在它下面注册的一些文件或者说node信息的一个组件,然后服务本身是分布式的,高度可靠。共识、组管理和状态协议将由服务实现,以便应用程序不需要自行实现它们。

好了大致了解了zookeeper是啥之后我们来看看zookeeper的组成 

zookeeper内部结构及组成(官网分析):

这张图是从官网上面截下来的,我们看一下中间这个图,一个zookeeper service集群里面有多个server,然后每一个server(服务端)下面有多个client(客户端),然后每个server都会指向中间的server,然后中间的server后面有个红色的字写了一个leader,ok放在这里这是我们得到的图的信息。

我们看图中的英文信息,第一段告诉我们什么(翻译之后的内容如下)?

ZooKeeper允许分布式进程通过数据寄存器的共享分层名称空间(我们称这些寄存器为znodes)相互协调,就像文件系统一样。与普通文件系统不同,ZooKeeper 为其客户端提供高吞吐量、低延迟、高可用性、严格有序的 znode 访问。ZooKeeper的性能方面允许它在大型分布式系统中使用。可靠性方面防止它成为大型系统中的单点故障。其严格的排序允许在客户端实现复杂的同步基元。

首先是文件系统,哦那我们懂了就是类似于linux里面那样都是用文件存储的,每一个节点相当于一个文件。

第二段

ZooKeeper 提供的名称空间与标准文件系统的名称空间非常相似。名称是由斜杠 (“/”) 分隔的路径元素序列。ZooKeeper 名称空间中的每个 znode 都由路径标识。每个 znode 都有一个父节点,其路径是少一个元素的 znode 前缀;此规则的例外是没有父级的根 (“/”)。此外,与标准文件系统完全相同,如果 znode 有任何子节点,则无法删除它。

/分各元素序列,那我们懂了就跟linux里面那样cd /这样去访问zookeeper的子节点,然后它后面还说了每一个znode都有一个父节点,然后后面还有说路径是少了一个元素的znode的前缀,这不明显就是一个树的结构吗,朋友们,这就是一个树,但你不能说它是二叉树,因为它并不是每个节点都只有两个子节点。然后我们的liunx里面的文件存储也是类似一个树结构,也都是节点存储,这点很相似

第三段

ZooKeeper 和标准文件系统之间的主要区别在于,每个 znode 都可以有与之关联的数据(每个文件也可以是一个目录,反之亦然),并且 znode 仅限于它们可以拥有的数据量。ZooKeeper旨在存储协调数据:状态信息,配置,位置信息等。这种元信息通常以千字节为单位,如果不是字节的话。ZooKeeper具有1M的内置健全性检查,以防止它被用作大型数据存储,但通常它用于存储更小的数据片段。

这段我们通俗的来讲就是说zookeeper的znode节点适合存储小文件比如说1mb的内存大小

第四段

服务本身在构成该服务的一组计算机上复制。这些计算机在持久存储中维护数据树的内存中映像以及事务日志和快照。由于数据保存在内存中,因此ZooKeeper能够获得非常高的吞吐量和低延迟的数字。内存数据库的缺点是 ZooKeeper 可以管理的数据库大小受到内存的限制。此限制是保持存储在 znode 中的数据量较小的进一步原因。

这段也就是说zookeeper是搞吞吐和低延迟,但是内存小限制了它的发挥

第五段和第六段没啥用

第七段

客户端只能连接到单个 ZooKeeper 服务器。客户端维护一个 TCP 连接,通过该连接发送请求、获取响应、获取监视事件和发送检测信号。如果与服务器的 TCP 连接中断,客户端将连接到其他服务器。当客户端首次连接到 ZooKeeper 服务时,第一个 ZooKeeper 服务器将为客户端设置会话。如果客户端需要连接到另一台服务器,则此会话将与新服务器重新建立。

这段就跟上面的图对应起来看,一个client客户端就只能连一个server服务端,并且建立一个会话,当这个服务器宕机或者与其他新的服务端连接在一起的时候,当前的会话就会断掉并重新建立。

第八段

由 ZooKeeper 客户端发送的读取请求在客户端连接到的 ZooKeeper 服务器上本地处理。如果读取请求在 znode 上注册了一个监视,则该监视也会在 ZooKeeper 服务器上本地跟踪。写入请求被转发到其他 ZooKeeper 服务器,并在生成响应之前经过共识。同步请求也会转发到另一台服务器,但实际上并没有达成共识。因此,读取请求的吞吐量随服务器数量而缩放,写入请求的吞吐量随服务器数量而降低。

这个就是关于zookeeper集群的说明

第九段

秩序对动物园管理员非常重要;几乎接近强迫症。所有更新都是完全订购的。ZooKeeper实际上在每个更新中都用一个反映此顺序的数字标记。我们称这个数字为zxid(ZooKeeper Transaction ID)。每个更新都会有一个唯一的 zxid。读取(和监视)是相对于更新排序的。读取响应将标记由为读取提供服务的服务器处理的最后一个 zxid。

这个zxid我们后面在详细说,现在先看到这

 下面是zookeeper的数据结构模型图:

 

好了我们现在已经大致了解了一下zookeeper是什么,干什么,由什么组成,那么怎么用是我们接下来考虑的怎么用

zookeeper的使用(虚拟机安装使用):

首先我把官网的使用说明贴出来,如果有小伙伴不想看的话,可以跟着我的操作来,保证你学会。

官网:使用说明

第一步

去官网下载.gz安装包(类似于这样的apache-zookeeper-3.x.x-bin.tar.gz),下载好了之后我们把它传到我们的虚拟机里面opt文件夹下面,因为我们一般在虚拟机里面安装就是往这个文件夹下面安,它也是虚拟机给我们专门留的安装位置。

移上去了之后我们用tar -zxvf给他解压缩即可

 第二步

更改配置文件,这也是官方让我们这样做的

 来到我们的conf目录下面看到我们第二个文件的名字了吗(正常刚下下来的话名字是和我这个不一样的,我们需要改成zoo.cfg这也是官方说的),改了之后我们在里面编辑一下我们的初始信息

  配置参数解读

Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下:
1 tickTime = 2000 :通信心跳时间, Zookeeper 服务器与客户端心跳时间,单位毫秒
2 initLimit = 10 LF 初始通信时限
Leader Follower 初始连接 时能容忍的最多心跳数( tickTime 的数量)
3 syncLimit = 5 LF 同步通信时限
Leader Follower 之间通信时间如果超过 syncLimit * tickTime Leader 认为 Follwer
掉,从服务器列表中删除 Follwer
4 dataDir 保存 Zookeeper 中的数据
注意:默认的 tmp 目录,容易被 Linux 系统定期删除,所以一般不用默认的 tmp 目录。
5 clientPort = 2181 :客户端连接端口,通常不做修改。

 首先我们标记1的地方是我们后面存储我们zookeeper相关操作信息的地方,如果不更改的话按照系统默认的存储地方每隔一个月就会清理一次所以还是改了比较好,所以我们自己创建一个文件夹把地址放上去即可,第二个点是我们搞集群的时候需要的,如果你们不搞得话就没必要弄了。

集群配置配置参数解读

server.A=B:C:D
A 是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件 myid ,这个文件在 dataDir 目录下,这个文件里面有一个数据
就是 A 的值, Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比
较从而判断到底是哪个 server
B 是这个服务器的地址;
C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的
Leader ,而这个端口就是用来执行选举时服务器相互通信的端口。

第三步

启动,cd 到我们的zookeeper主目录下

bin/zkServer.sh start

输入即可启动成功(如果是集群的话,需要在启动一个服务器要不然没有投票结果不会产生leader,半数机制(后面说))

这个命令是进入到我们的客户端,后面的127.0.0.1按照自己服务器配置更改即可

bin/zkCli.sh -server 127.0.0.1:2181

启动成功如下,我们用jps命令来查看当前的启动信息,第一个zookeepermain是我们的服务器端,第三个QuorumPeerMain是我们的客户端:

启动好了之后我们先不急着看怎么使用,我们返回来再看看zookeeper的选举机制

选举机制 (针对集群)

启动好了之后我们返回上面的图

 我们仔细看看这个图,leader代表什么意思?领导者对吧,说明了什么中间的server是领导其它server节点的boss,那么它是怎么产生的这是一个面试重点,我们下面把这种情况叫做选举机制

 我们测试一下我们当前刚启动的server状态是什么?

bin/zkServer.sh status

 发现有一个错误

 说它并没有成功启动,哎,我们不是刚刚明明已经启动了吗这是怎么回事?

原来啊这是人家zookeeper选举机制专门的策略导致的,由于我们刚才配置了三个集群,人家内部有一个选举机制只有再你集群启动半数服务器及以上的时候它才会触发,由于我们刚才只有一个服务器启动了zookeeper所以它才显示错误,当我们启动一班及以上也就是2个即可,然后我们再启动一个hadoop103看看,当hadoop103启动成功后我们就可以看见102成为了follower,hadoop103成为了leader,那这个时候我们启动hadoop104会如何呢?发现他还是follower,那么我们现在来解释一下什么是选票机制

 

 

 这里面借用一下其他人讲的,因为比较清晰

第一次选举(刚刚启动的时候) 

 第二次选举(当leader发生故障不可用的时候)

 至此我们就已经大概了解完了zookeeper的工作机制和组测原理

下面我们来看看zookeeper节点的一些相关操作:

zookeeper节点信息

第一个就是我们需要知道节点的类型,分为 持久/短暂/有序号/无序号

持久:就是指我们创建完了关闭流了之后这个节点还会存在

短暂:创建完了当我们关闭流了节点就会消失

有序号:一般情况下一个节点下面会有多个节点,这些节点一般是无序的,如果你创建的时候带有序号那么他就是有序的,序号是递增的,类似于mysql的主键

无序号:就是我们正常创建的普通节点

然后创建节点的操作我们可以去help命令里面查看我们启动客户端输入help就可以看到各种创建的操作,所以大家自己去实践一下是最好的。 

当然idea也可以连接操作,或者用linux里面的命令行也行

我们再idea里面连接的时候会遇到watch监听器,下面我们看看它的原理

监听器原理

 然后就是分布式锁了

分布式锁

当然不建议大家自己去写api,因为也没必要,人家zookeeper已经封装好了一个框架我们直接拿来用就行了,只需要我们导入一个maven依赖即可,很简单,然后我们去调试一下分布式锁大家来看看。

这个是手写api分布式锁的定义

package org.notme.lockt;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {

    private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private final int sessionTimeout = 2000;
    private final ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath;
    private String currentMode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {

        // 获取连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch  如果连接上zk  可以释放
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    connectLatch.countDown();
                }

                // waitLatch  需要释放
                if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });

        // 等待zk正常连接后,往下走程序
        connectLatch.await();

        // 判断根节点/locks是否存在
        Stat stat = zk.exists("/locks", false);

        if (stat == null) {
            // 创建一下根节点
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 对zk加锁
    public void zklock() {
        // 创建对应的临时带序号节点
        try {
            currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // wait一小会, 让结果更清晰一些
            Thread.sleep(10);

            // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点

            List<String> children = zk.getChildren("/locks", false);

            // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
            if (children.size() == 1) {
                return;
            } else {
                Collections.sort(children);

                // 获取节点名称 seq-00000000
                String thisNode = currentMode.substring("/locks/".length());
                // 通过seq-00000000获取该节点在children集合的位置
                int index = children.indexOf(thisNode);

                // 判断
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // 就一个节点,可以获取锁了
                    return;
                } else {
                    // 需要监听  他前一个节点变化
                    waitPath = "/locks/" + children.get(index - 1);
                    zk.getData(waitPath,true,new Stat());

                    // 等待监听
                    waitLatch.await();

                    return;
                }
            }


        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

    // 解锁
    public void unZkLock() {

        // 删除节点
        try {
            zk.delete(this.currentMode,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

}

下面是运行代码

package org.notme.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {

        // 创建分布式锁1
        final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");

                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();

        System.out.println("zookeeper 启动成功");
        return client;
    }
}

结果如下 ,成功了

 下面是框架:

package org.notme.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {

        // 创建分布式锁1
        final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");

                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();

        System.out.println("zookeeper 启动成功");
        return client;
    }
}

也是成功运行

一般大公司里面的人家才会自己手写原生api,自己玩的话就用人家框架得了,原理的话大家可以自己去搜索网上很多,这里就不过的细说了。 

结尾总结

关于zookeeper面试的话,面试官主要问分布式锁和选举机制以及节点的相关信息,当然如果你已经都会了的话建议可以多看看源码,因为源码里面讲的很详细也很复杂,最好说一句祝大家早日卷到自己理想的工作,一起加油