Zookeeper ---- ZooKeeper分布式锁案例
什么叫做分布式锁呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫做分布式锁。
1. 原生Zookeeper实现分布式锁案例
1. 分布式锁实现
package com.fickler.zkcase2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author dell
* @version 1.0
*/
public class DistributedLock {
private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zooKeeper;
private String rootNode = "locks";
private String subNode = "seq-";
private String waitPath;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String currentNode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
//获取连接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//连接建立时,打开latch,唤醒wait在该latch上的线程
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
connectLatch.countDown();
}
//发生了waitPath的删除事件
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
waitLatch.countDown();
}
}
});
//等待zk正常连接后,往下走程序
connectLatch.await();
//判断根节点/locks是否存在
Stat stat = zooKeeper.exists("/" + rootNode, false);
if (stat == null){
System.out.println("根节点不存在");
zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
//对zk加锁
public void zkLock() throws InterruptedException, KeeperException {
//创建对应的临时带序号节点
String currentMode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号的前一个节点
List<String> children = zooKeeper.getChildren("/locks", false);
//如果children只有一个值,那就直接获取资源,如果有多个,需要判断谁最小
if (children.size() == 1){
return;
}else {
Collections.sort(children);
//获取节点名称
String thisNode = currentMode.substring("/locks/".length());
//通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);
//判断
if (index == -1){
System.out.println("数据异常");
}else if (index == 0){
//只有一个节点,就可以获取锁了
return;
}else {
//需要监听,他前一个节点的变化
waitPath = "/locks/" + children.get(index - 1);
//在waitPath上注册监听器,当waitPath被删除时,zookeeper会回调监听器process方法
zooKeeper.getData(waitPath, true, new Stat());
//进入等待锁状态
waitLatch.await();
return;
}
}
}
//解锁
public void zkUnlock() throws InterruptedException, KeeperException {
zooKeeper.delete(this.currentNode, -1);
}
}
2. 分布式锁测试
- 创建两个线程
package com.fickler.zkcase2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/**
* @author dell
* @version 1.0
*/
public class DistributedLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
//创建分布式锁1
final DistributedLock lock1 = new DistributedLock();
//创建分布式锁2
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable(){
@Override
public void run() {
//获取锁对象
try {
lock1.zkLock();
System.out.println("线程1获取锁");
Thread.sleep(5 * 1000);
lock1.zkUnlock();
System.out.println("线程1释放锁");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
}).start();
new Thread(new Runnable(){
@Override
public void run() {
//获取锁对象
try {
lock2.zkLock();
System.out.println("线程2获取锁");
Thread.sleep(5 * 1000);
lock2.zkUnlock();
System.out.println("线程2释放锁");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
}).start();
}
}
- 观察控制台变化
2. Curator框架实现分布式锁案例
1. 原生的 Java API 开发存在的问题
- 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建。需要自己去递归
2. Curator是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式遇到的问题。
官方文档:https://curator.apache.org/index.html
3. Curator案例实操
- 添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
- 代码实现
package com.fickler.Lock;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
private String rootNode = "/locks";
// zookeeper server 列表
private String connectString =
"hadoop102:2181,hadoop103:2181,hadoop104:2181";
// connection 超时时间
private int connectionTimeout = 2000;
// session 超时时间
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
// 测试
private void test() {
// 创建分布式锁 1
final InterProcessLock lock1 = new
InterProcessMutex(getCuratorFramework(), rootNode);
// 创建分布式锁 2
final InterProcessLock lock2 = new
InterProcessMutex(getCuratorFramework(), rootNode);
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock1.acquire();
System.out.println("线程 1 获取锁");
// 测试锁重入
lock1.acquire();
System.out.println("线程 1 再次获取锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程 1 释放锁");
lock1.release();
System.out.println("线程 1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.acquire();
System.out.println("线程 2 获取锁");
// 测试锁重入
lock2.acquire();
System.out.println("线程 2 再次获取锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程 2 释放锁");
lock2.release();
System.out.println("线程 2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 分布式锁初始化
public CuratorFramework getCuratorFramework() {
//重试策略,初试时间 3 秒,重试 3 次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
//通过工厂创建 Curator
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
//开启连接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}
- 观察控制台变化