目录
在上一篇文章 zookeeper 实现分布式锁-CSDN博客,中,我们重点学习了 zookeeper 实现分布式锁的核心原理以及具体实现,在本篇文章中,我们就来看 Curator 是如何实现分布式锁的
Curator 主要通过 InterProcessMutex 实现 可重入的分布式排他锁(类似于 ReentrantLock),接下来,我们就来通过一个示例来了解 InterProcessMutex 的基本使用
InterProcessMutex 基本使用
添加 Maven 依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.7.0</version>
</dependency>
InterProcessMutex 使用:
public class CuratorLockTest {
public static void main(String[] args) {
/**
* CuratorFramework client: Curator 客户端
* String path: 加锁节点路径
*/
InterProcessMutex mutex1 = new InterProcessMutex(getCuratorFramework(), "/lock");
InterProcessMutex mutex2 = new InterProcessMutex(getCuratorFramework(), "/lock");
new Thread(() -> {
try {
// 阻塞直到获取锁
mutex1.acquire();
System.out.println("线程" + Thread.currentThread().getName() + " 获取到锁...");
mutex1.acquire();
System.out.println("线程" + Thread.currentThread().getName() + " 再次获取到锁...");
Thread.sleep(3 * 1000);
mutex1.release();
System.out.println("线程" + Thread.currentThread().getName() + " 释放锁...");
mutex1.release();
System.out.println("线程" + Thread.currentThread().getName() + " 再次释放锁...");
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
new Thread(() -> {
try {
mutex2.acquire();
System.out.println("线程" + Thread.currentThread().getName() + " 获取到锁...");
mutex2.acquire();
System.out.println("线程" + Thread.currentThread().getName() + " 再次获取到锁...");
Thread.sleep(3 * 1000);
mutex2.release();
System.out.println("线程" + Thread.currentThread().getName() + " 释放锁...");
mutex2.release();
System.out.println("线程" + Thread.currentThread().getName() + " 再次释放锁...");
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("47.112.48.236:2182")
.sessionTimeoutMs(300000)
.connectionTimeoutMs(300000)
.retryPolicy(new ExponentialBackoffRetry(300000, 4)) // 重试策略
.build();
// 启动客户端
curatorFramework.start();
return curatorFramework;
}
}
运行结果:
可以看到,InterProcessMutex 支持可重入
那么,InterProcessMutex 具体是如何实现 acquire() 和 release() 的呢?
接下来,我们就来分别看 InterProcessMutex 的加锁和解锁实现过程
加锁
InterProcessMutex 也是采用 临时顺序节点 + 监听前驱 来实现分布式锁的:
获取当前路径下的所有子节点,排序后判断自己创建的节点是否是最小:
若是 -> 获取锁
不是 -> 监听前一个节点的删除事件
acquire
我们来看 acquire 方法:
public void acquire() throws Exception {
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
当 internalLock(-1, null) 返回 false 时,抛出异常,也就是说 internalLock(-1, null) 中实现了可重入分布式锁,若返回 false,则表示获取锁失败
internalLock
我们继续看 internalLock(-1, null) 方法:
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
private boolean internalLock(long time, TimeUnit unit) throws Exception {
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if (lockPath != null) {
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
}
我们先来看上面部分:
在 internalLock 中,会先获取该方法的线程对象( Thread.currentThread()),检查当前线程是否已经持有锁(判断 threadData 中是否保存当前线程锁信息),若已经持有锁,则将锁计数 + 1,并直接返回 true
通过 锁计数器 来实现同一个线程可以多次 acquire 同一把锁
这样,只有 第一次加锁时需要创建临时顺序节点,后续再加锁只需要增加计数值就可实现再次加锁
threadData
我们来看 ConcurrentMap<Thread, LockData> threadData:
它通过 Map 来维护线程与锁的绑定(当前线程对象 -> LoackData),从而记录哪个线程持有锁,持有了几次
LoackData 中保存了每个线程持有的锁信息,包括:owningThread(当前线程),lockPath(创建节点路径) 和 lockCount(加锁次数)
我们继续来看 internalLock 方法中的注释:
一个 lockData 实例只由一个线程操作,因此不需要加锁
也就是说, threadData 是 线程隔离 的,每个 LockData 实例只被其所属线程访问,因此不会有并发修改风险,无需额外同步
我们继续看下面部分:
调用 internals.attemptLock(time, unit, getLockNodeBytes()) 获取锁,并返回创建的临时顺序节点路径 lockPath
若 lockPath 不为空,获取锁成功,创建 LockData 对象,并放入 threadData 中,返回 true
若 lockPath 为空,获取锁失败,返回 false
internals.attemptLock
我们继续看 internals.attemptLock(time, unit, getLockNodeBytes()) :
我们先看while循环上面部分:
startMillis:记录开始时间,用于后续计算是否超时
millisToWait:最大等待毫秒数,为 null 表示无限等待
localLockNodeBytes:表示写入节点的数据,若锁是 可撤销的(revocable),则不写入数据(new byte[0])
retryCount:重试次数
ourPath:创建的 ZNode 节点路径
hasTheLock:是否成功获取到锁
isDone:控制 while 循环是否结束
其中,revocable(AtomicReference<RevocationSpec> revocable)是用于实现可撤销分布式锁(Revocable Lock)的关键字段,用于标记当前锁是否是 "可撤销" 状态,并保存撤销时的处理逻辑
当某个客户端持有分布式锁时,管理员可以通过某种机制(如发送指令)强制让该客户端释放锁
例如,当客户端 锁持有时间过长 或 需要紧急干预 时,管理员就可以强制让客户端释放锁
接下来,我们来重点看 while 循环部分:
一进入循环,就将 isDone 设置为 true,即,默认本次尝试完成后不再重试
当有 KeeperException.NoNodeException 异常抛出时,也就是锁节点被删除(可能此时会话过期、连接丢失等),若此时设置了重试策略,且当前允许重试(不超过最大等待时间、未达重试上限...),则将 isDone 设置为 false,继续尝试
否则,则不允许重试,抛出异常
循环过程中,通过 driver.createsTheLock(client, path, localLockNodeBytes) 来创建临时顺序节点,参与锁竞争
创建临时顺序节点:
创建的临时顺序节点:
通过 internalLockLoop(startMillis, millisToWait, ourPath) 进行锁竞争
最后,当退出循环时:
若获取到锁,则返回 ZNode 路径,若未获取到锁,则返回 null
我们继续来看锁竞争逻辑
internalLockLoop
internalLockLoop(startMillis, millisToWait, ourPath):
我们先看 while 循环上面的内容:
haveTheLock:标记是否已获得锁
通过 revocable.get() 判断当前锁是否设置为 "可撤销",若为可撤销状态,则监听当前节点的数据变化
当 管理员调用 revoke() 时,会修改该节点的数据(如写入特殊标记),触发 revocableWatcher,而在 revocableWatcher 会中断等待或是主动释放锁
我们接着看 while 循环,也就是竞争锁的逻辑实现:
(client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock:若当前客户端处于运行状态且未获得锁,则进入循环:
1. 通过 getSortedChildren() 获取目标节点下的所有子节点,并按照字典序排序(ZooKeeper 顺序节点保证唯一性和有序性)
2. 提取当前节点的序号部分(如:_c_45cf6a55-2717-40fd-b222-2d7d29202558-lock-0000000011)
3. 判断是否可以获得锁 driver.getsTheLock(client, children, sequenceNodeName, maxLeases):
public PredicateResults getsTheLock(
CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
// 获取当前节点在排序列表中的索引
int ourIndex = children.indexOf(sequenceNodeName);
// 验证索引有效性, 若ourIndex < 0
// 说明当前节点不在子节点列表中,可能是被删除或传参错误,
// 此时会抛出对应 NoNodeException 异常,防止后续错误判断
validateOurIndex(sequenceNodeName, ourIndex);
// 如果当前节点在前 maxLeases 个节点中(索引 < maxLeases), 此时可以获取锁
// maxLeases = 1: 互斥锁,只有第一个节点可以获取锁
// maxLeases > 1: 共享锁(如 读锁),前 maxLeases 个节点都能同时获得锁
boolean getsTheLock = ourIndex < maxLeases;
// 计算监听路径:
// 若获取到锁, 不需要监听任何节点, pathToWatch = null
// 若没获取到锁, 监听第 children.get(ourIndex - maxLeases) 个节点
// 只有当索引为 ourIndex - maxLeases 的节点消失后
// 当前节点才能进入前 maxLeases 名,从而获得锁
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
// 封装 获取锁结果 和 监听节点 并返回
// 若 getsTheLock == true:获取到锁
// 若 getsTheLock == false:需要注册对 pathToWatch 的监听,当该节点被删除时重新判断是否可以获得锁
return new PredicateResults(pathToWatch, getsTheLock);
}
若 maxLeases = 1,也就是当前为互斥锁:
若 maxLeases = 3,最多有 3 个客户端并发持有锁:
lock-000004:监听 children[3-3=0] → 第一个节点
lock-000005:监听 children[4-3=1] → 第二个节点
lock-000006:监听 children[5-3=2] → 第三个节点
lock-000007:监听 children[6-3=3] → 第四个节点
......
当前节点只有在前面第 maxLeases 个节点释放后才可能上位
4. 判断当前是否能获取到锁,若 predicateResults.getsTheLock() 为 true,设置 haveTheLock 为 true,否则,监听节点并等待
5. 监听前一个节点并等待:
// 构造前一个节点, 准备监听
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized (this) {
try {
// 监听节点
// 使用 getData() 而不是 exists() 来监听节点的删除
// 虽然 exists() 也可以注册 watcher, 但 getData() 更常用
// 且 Curator 建议避免使用 exists() 留下无用 watcher,造成资源泄漏
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
break;
}
// 调用 wait() 阻塞当前线程
wait(millisToWait);
} else {
wait();
}
} catch (KeeperException.NoNodeException e) {
// 若 forPath(previousSequencePath) 时节点已被删除, 则前驱已释放锁
// 跳出当前 try, 继续循环
// 下一轮循环会重新获取子节点列表,判断自己是否成为最小节点
}
}
使用 synchronized + wait/notify 基于 当前线程 进行 wait() 和 notify()
6. 异常处理和中断检查:
ThreadUtils.checkInterrupted(e):如果是 InterruptedException,会重新设置线程中断标志
deleteOurPathQuietly(ourPath, e):尝试删除自己创建的节点(避免残留临时节点)
最后抛出异常,外层可能进行重试或失败
循环结束:
如果最终没拿到锁(比如超时),就删除自己创建的临时节点,避免资源泄漏
以上,就是加锁的全部流程,我们再通过流程图来梳理一遍整体流程
整体流程
我们对比之前实现的加锁流程,来总结一下 InterProcessMutex.acquire() 的优势和特点:
1. 支持可重入(Reentrant):InterProcessMutex 使用 ConcurrentMap<Thread, LockData> threadData 记录线程持有锁,以及持有锁的次数
2. 自动处理会话失效与重连:InterProcessMutex 基于 CuratorFramework 客户端,具备 自动重连机制、会话丢失后自动重建锁状态等
3. Watcher 一次性问题的自动管理:Curator 内部使用 ConnectionStateListener 和 PathChildrenCache 等高级组件,自动重新注册 Watcher,确保事件不丢失
4. 完善的异常处理机制:InterProcessMutex 对所有 ZooKeeper 异常进行封装和重试,并提供 RetryPolicy 可配置重试策略
5. 支持超时获取锁:超时后自动放弃,避免系统雪崩
6. 线程安全与状态管理更精细:InterProcessMutex 为每次加锁操作维护独立的状态,确保多线程并发调用 acquire() 安全,每个线程独立等待自己的前驱节点
接下来,我们继续来看解锁过程:
解锁
release()
public void release() throws Exception {
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
return;
}
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
internals.releaseLock(lockData.lockPath);
} finally {
threadData.remove(currentThread);
}
}
获取当前线程的加锁状态:
获取失败 -> 未加锁,无需解锁,抛出异常
获取成功 -> 锁计数 -1:
此时的锁计数 > 0,此时不能释放锁,直接返回
此时的锁计数 < 0,非法次数,抛出异常
此时的锁计数 = 0,internals.releaseLock(lockData.lockPath) 释放锁,并将当前线程的锁信息从 threadData 中移除
我们继续看 internals.releaseLock(lockData.lockPath)
internals.releaseLock
final void releaseLock(String lockPath) throws Exception {
client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
当前客户端即将释放锁,因此不需要再关系任何节点编号,此时通过 client.removeWatchers() 移除当前注册的所有 Watcher,包括
监听前一个节点的 NodeDeleted 事件
监听自身节点的 NodeDataChanged 事件(用于可撤销锁)
...
避免这些 Watcher 继续触发回调或占用内存
一旦锁被释放,就不再需要响应“撤销”指令,此时需要将 AtomicReference<RevocationSpec> 设置为 null
最后,调用 deleteOurPath(lockPath) 删除当前客户端创建的临时顺序节点:
以上就是释放锁的全部流程,我们还是通过流程图来梳理一下
整体流程
了解了 加锁 和 解锁 的具体流程,最后,我们还需要学习一下 InterProcessMutex 是如何唤醒线程来继续加锁的
Watcher
我们找到 LockInternals 中注册的 watcher:
若监听的节点发生变更,就会调用事件回调方法 process(WatchedEvent event)
此时,watcher 会调用 client.postSafeNotify(LockInternals.this) 来处理 "唤醒请求":
default CompletableFuture<Void> postSafeNotify(Object monitorHolder) {
return runSafe(() -> {
synchronized (monitorHolder) {
monitorHolder.notifyAll();
}
});
}
monitorHolder 是锁对象(monitor)的引用,也就是 LockInternals.this,我们在
postSafeNotify 将锁对象的唤醒任务提交给 runSafe(Runnable) 执行:
private final Executor runSafeService;
@Override
public CompletableFuture<Void> runSafe(Runnable runnable) {
return CompletableFuture.runAsync(runnable, runSafeService);
}
通过 runSafeService 来异步执行锁唤醒任务
也就是说,Watcher 的 process() 只负责 接收事件通知,而具体的唤醒操作由 runSafe() 来执行
将 事件处理 和 业务响应 解耦,保证了事件线程能够立即返回,继续处理下一个通知,保证了客户端的高响应性和可靠性