ReentrantLock本质通过AQS同步器实现,结合AQS认识ReentrantLock。
非公平锁实现原理
竞争失败后会执行执行ReentrantLock实现的acquire方法,在JDK11中,acquire直接由AQS实现了。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Mode.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter(Mode.EXCLUSIVE)
acquireQueued
shouldParkAfterFailedAcquire,-1节点负责唤醒后面的紧挨的0节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
Thread-0 释放锁,进入 tryRelease 流程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁成功,继续执行acquireQueued
可重入原理
重入一次state+1,释放一次state-1,直到state=0,setExclusiveOwnerThread(null)。
static final class NonfairSync extends Sync {
//...
// Sync 继承过来的方法,方便阅读,放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁,线程还是当前线程,表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// Sync 继承过来的方法,方便阅读,放在此处
protected final boolean tryRelease(int releases) {
// state-- 1 -1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入,只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可打断原理(默认)
在此模式下,即使它被打断,仍会驻留在AQS队列中,等获得锁后方能继续运行(是继续运行!只是打断标记被设置为 true)
被打断后,return Thread.interrupted();重置标志位,返回true。parkAndcheckInterrupt()返回真,interrupted记录打断标志,获得到锁后,acquireQueued(addWaiter(Node.EXCLUSIVE), arg)为真,执行selfInterrupt();重新产生一个中断。
所以在队列排队时,不能立刻响应中断,需要获得锁后才能打断。事实上被打断你是一定要作出回应的,这里在队列里时不回应,在获取到锁后进行一次补偿回应。所以获取到锁后进行打断。
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
//...
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 还是需要获得锁后,才能返回打断状态
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因为 interrupt 被唤醒,返回打断状态为 true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打断状态为 true
selfInterrupt();
}
}
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
}
可打断模式
尝试获取锁调用acquireInterruptibly而不是acquire,acquire的打断后置标志位变为acquireInterruptibly的抛出异常throw new InterruptedException();。
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁,进入 (一)
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// (一) 可打断的获取锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常,而不会再次进入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
非公平锁原理
获取锁调用nonfairTryAcquire直接参与锁的竞争compareAndSetState(0, acquires),不检查等待队列是否有任务。
// (三) Sync 继承过来的方法,方便阅读,放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没获得锁
if (c == 0) {
// 尝试用 cas 获得,这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁,线程还是当前线程,表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败,回到调用处
return false;
}
公平锁原理
获取锁时调用tryAcquire,检查等待队列是否有任务。
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点,没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// (一) AQS 继承过来的方法,方便阅读,放在此处
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二
(s = h.next) == null ||
// 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}
条件变量实现原理
await流程
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// fullyRelease是对锁重入次数的全部释放,包括可重入锁
final int fullyRelease是对锁的全部释放,包括可重入锁(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Thread-0执行LockSupport.park(this);阻塞。t条件变量的Node为-2为等待状态Node.CONDITION。
signal 流程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
isHeldExclusively判断是否是占用锁的线程,doSignal(first);会获取在条件变量中等待的队列中的第一个。
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
更新条件变量中等待的队列的指针指向,transferForSignal将唤醒的队列加入到等待锁的队列中,转移失败则唤醒条件变量中下一个任务。转移失败原因:如任务等待超时放弃对锁的使用。
enq(node)执行转移逻辑,返回前驱节点p,通过p.waitStatus将前驱节点转为-1状态,让前驱节点有责任唤醒后继节点。