引言
在 Java 多线程编程中,同步是确保线程安全的关键。Java 并发包(java.util.concurrent)提供了许多高级工具,如 ReentrantLock、Semaphore 和 CountDownLatch,这些工具的底层都依赖于一个强大的框架——AbstractQueuedSynchronizer(AQS)。AQS 由并发大师 Doug Lea 设计,是 Java 并发包的核心组件之一。它通过一个原子整数状态(state)和 FIFO 等待队列管理线程竞争,提供了独占和共享两种同步模式。
AQS 的设计目标是简化锁和同步器的开发。开发者无需从零实现复杂的线程排队和阻塞逻辑,只需继承 AQS 并实现几个关键方法即可创建自定义同步器。例如,ReentrantLock 使用 AQS 实现可重入锁,Semaphore 使用 AQS 管理许可数量。本文将从核心概念、基本使用、源码分析到高级主题,全面解析 AQS 的工作原理,并通过示例代码帮助读者理解其应用。
本文的目标是让读者不仅理解 AQS 的理论,还能通过详细的源码分析和代码示例掌握其底层机制。无论你是 Java 并发编程的初学者还是希望深入研究的高级开发者,本文都将为你提供清晰且实用的指导。
AQS 的核心概念
要理解 AQS,首先需要掌握其核心概念。以下是 AQS 的四个关键组成部分:
1. 同步状态(State)
AQS 使用一个 volatile int 类型的状态(state)来表示同步状态。这个状态的具体含义由子类定义。例如:
在 ReentrantLock 中,state 表示锁的持有状态(0 表示未被持有,大于 0 表示被持有,数值表示重入次数)。
在 Semaphore 中,state 表示可用许可的数量。
AQS 提供了以下方法操作状态:
getState():获取当前状态值,具有 volatile 读语义。
setState(int newState):设置状态值,具有 volatile 写语义。
compareAndSetState(int expect, int update):使用 CAS(比较并交换)原子性地更新状态。
这些方法确保状态操作的线程安全性。
2. FIFO 等待队列
AQS 使用一个基于 CLH(Craig, Landin, Hagersten)锁的变体队列来管理等待线程。这个队列是一个双向链表,包含以下关键字段:
head:队列头部,volatile Node 类型。
tail:队列尾部,volatile Node 类型。
队列中的每个节点(Node)表示一个等待线程,包含以下字段:
volatile int waitStatus:节点状态,可能值为:
CANCELLED (1):节点因超时或中断被取消。
SIGNAL (-1):后继节点需要被唤醒。
CONDITION (-2):节点在条件队列中等待。
PROPAGATE (-3):用于共享模式,传播释放操作。
0:正常状态。
volatile Node prev:前驱节点引用。
volatile Node next:后继节点引用。
volatile Thread thread:关联的线程。
Node nextWaiter:条件队列中的下一个节点或共享模式标记。
队列通过 compareAndSetTail 和 compareAndSetHead 实现原子性操作,确保线程安全。
3. 独占与共享模式
AQS 支持两种同步模式:
独占模式:一次只有一个线程可以获取同步器。例如,ReentrantLock 使用独占模式确保只有一个线程持有锁。
共享模式:多个线程可以同时获取同步器。例如,Semaphore 允许多个线程获取许可,直到许可耗尽。
子类通过实现以下方法定义同步 Pragmatic Play 同步逻辑:
独占模式:tryAcquire(int arg) 和 tryRelease(int arg)。
共享模式:tryAcquireShared(int arg) 和 tryReleaseShared(int arg)。
4. 条件对象(Condition Object)
在独占模式下,AQS 提供了 ConditionObject 类,实现 Condition 接口,类似于 Java 的 wait/notify 机制。条件对象允许线程在特定条件下等待(通过 await())或被唤醒(通过 signal() 或 signalAll())。每个条件对象维护一个独立的等待队列。
AQS 的基本使用
AQS 的强大之处在于其扩展性。开发者可以通过继承 AQS 并实现以下方法创建自定义同步器:
tryAcquire(int arg):尝试在独占模式下获取同步器。
tryRelease(int arg):尝试在独占模式下释放同步器。
tryAcquireShared(int arg):尝试在共享模式下获取同步器。
tryReleaseShared(int arg):尝试在共享模式下释放同步器。
isHeldExclusively():判断同步器是否被当前线程独占持有。
以下是一个简单的互斥锁(Mutex)实现示例:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, java.util.concurrent.TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public java.util.concurrent.locks.Condition newCondition() {
return sync.newCondition();
}
}
示例代码:使用 Mutex
以下是如何使用上述 Mutex 的示例:
public class MutexDemo {
private final Mutex mutex = new Mutex();
private int counter = 0;
public void increment() {
mutex.lock();
try {
counter++;
System.out.println(Thread.currentThread().getName() + " incremented counter to: " + counter);
} finally {
mutex.unlock();
}
}
public static void main(String[] args) {
MutexDemo demo = new MutexDemo();
Runnable task = () -> {
for (int i = 0; i < 5; i++) {
demo.increment();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
Thread t1 = new Thread(task, "Thread-1");
Thread t2 = new Thread(task, "Thread-2");
t1.start();
t2.start();
}
}
运行结果将显示两个线程交替增加计数器,确保线程安全。
AQS 源码分析
AQS 的实现复杂但高效。以下从队列管理、获取/释放流程和条件对象三个方面详细分析其源码(基于 Java 8)。
队列管理
Node 类
AQS 的等待队列由 Node 对象组成,每个节点代表一个等待线程。Node 类的关键字段包括:
waitStatus:节点状态(0、CANCELLED、SIGNAL、CONDITION、PROPAGATE)。
prev 和 next:双向链表的前后指针。
thread:关联的线程。
nextWaiter:条件队列或共享模式的标记。
addWaiter(Node mode)
此方法为当前线程创建节点并加入队列尾部:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
功能:创建新节点并尝试通过 CAS 操作加入队列尾部。
流程:若尾节点存在,尝试快速加入;否则调用 enq 方法。
线程安全:通过 compareAndSetTail 确保原子性。
enq(Node node)
当快速加入失败时,enq 方法确保节点正确插入:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
功能:通过自旋和 CAS 操作将节点插入队列尾部。
流程:若队列为空,初始化一个空节点作为头节点;否则将新节点加入尾部。
关键点:自旋确保插入成功,处理并发竞争。
cancelAcquire(Node node)
取消节点的获取操作:
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node;
}
}
功能:从队列中移除取消的节点。
流程:清除节点线程引用,更新前驱和后继节点的链接,可能唤醒后继节点。
关键点:确保队列结构的完整性,处理复杂的中断或超时情况。
独占模式获取锁
acquire(int arg)
独占模式获取同步器,忽略中断:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
功能:尝试获取锁,若失败则加入队列并等待。
流程:
调用 tryAcquire(arg) 尝试获取锁。
若失败,调用 addWaiter(Node.EXCLUSIVE) 创建独占模式节点并加入队列。
调用 acquireQueued 在队列中等待。
若被中断,调用 selfInterrupt 恢复中断状态。
关键点:忽略中断,确保线程继续尝试获取锁。
tryAcquire(int arg)
由子类实现,定义获取锁的逻辑。默认实现抛出 UnsupportedOperationException:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
功能:尝试获取锁,返回是否成功。
子类实现:例如,ReentrantLock 检查状态是否为 0 或当前线程是否已持有锁。
关键点:必须确保线程安全,通常使用 compareAndSetState。
acquireQueued(Node node, int arg)
在队列中等待获取锁:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
功能:使线程在队列中等待直到获取锁。
流程:
检查前驱节点是否为头节点且能否获取锁。
若成功,设置当前节点为头节点,清除前驱节点的 next 引用。
若失败,调用 shouldParkAfterFailedAcquire 判断是否需要挂起。
若需要挂起,调用 parkAndCheckInterrupt 挂起线程并检查中断。
若被取消,调用 cancelAcquire 移除节点。
关键点:通过自旋和挂起机制高效管理线程等待。
shouldParkAfterFailedAcquire(Node pred, Node node)
判断是否需要挂起线程:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
功能:确保前驱节点状态正确,决定是否挂起当前线程。
流程:
若前驱节点状态为 SIGNAL,返回 true 表示需要挂起。
若前驱节点被取消(ws > 0),移除取消节点。
否则,将前驱节点状态设为 SIGNAL。
关键点:确保队列中只有有效节点,优化唤醒流程。
parkAndCheckInterrupt()
挂起线程并检查中断状态:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
功能:挂起当前线程,唤醒后检查是否被中断。
流程:调用 LockSupport.park 挂起线程,唤醒后调用 Thread.interrupted 返回中断状态并清除标志。
关键点:中断状态在唤醒后处理,确保线程继续执行。
独占模式释放锁
release(int arg)
释放独占模式锁:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
功能:释放锁并唤醒后继线程。
流程:
调用 tryRelease(arg) 释放锁。
若释放成功且头节点有效,调用 unparkSuccessor 唤醒后继节点。
关键点:只有当锁完全释放(tryRelease 返回 true)时才唤醒后继线程。
tryRelease(int arg)
由子类实现,定义释放锁的逻辑。默认实现抛出 UnsupportedOperationException:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
功能:释放锁,返回是否完全释放。
子类实现:例如,ReentrantLock 减少状态值,若为 0 则表示锁完全释放。
关键点:必须确保线程安全。
unparkSuccessor(Node node)
唤醒后继节点的线程:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
功能:唤醒队列中下一个有效节点的线程。
流程:
将当前节点状态重置为 0(若为负值)。
查找后继节点,若无效则从尾部向前查找第一个有效节点。
调用 LockSupport.unpark 唤醒线程。
关键点:确保只唤醒有效节点,处理节点取消情况。
共享模式操作
acquireShared(int arg)
共享模式获取同步器:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
功能彼此
流程:
调用 tryAcquireShared(arg) 尝试获取共享锁。
若失败(返回负值),调用 doAcquireShared 加入队列等待。
关键点:负值表示资源不足,线程进入队列。
tryAcquireShared(int arg)
由子类实现,定义共享模式获取逻辑。默认抛出 UnsupportedOperationException。
doAcquireShared(int arg)
在队列中等待共享锁:
private void doAcquireShared(int arg) {
Node node = addWaiter(Node.SHARED);
for (;;) {
Node p = node.predecessor();
if (p == head && tryAcquireShared(arg) >= 0) {
setHeadAndPropagate(node);
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
Thread.currentThread().interrupt();
}
}
功能:使线程在队列中等待直到获取共享锁。
流程:类似 acquireQueued,但调用 tryAcquireShared 和 setHeadAndPropagate。
关键点:支持多线程同时获取锁。
releaseShared(int arg)
释放共享模式锁:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
功能:释放共享锁并唤醒后继线程。
流程:调用 tryReleaseShared 和 doReleaseShared。
关键点:可能唤醒多个线程。
tryReleaseShared(int arg)
由子类实现,定义共享模式释放逻辑。
doReleaseShared()
处理共享模式释放:
private void doReleaseShared() {
Node h = head;
for (boolean loop = true; loop; ) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
compareAndSetWaitStatus(h, Node.SIGNAL, 0);
unparkSuccessor(h);
break;
}
if (ws > 0)
h = h.next;
else
compareAndSetWaitStatus(h, ws, Node.PROPAGATE);
if (h == null || h == head)
break;
h = h.next;
}
}
功能:释放共享锁并传播唤醒。
流程:检查头节点状态,唤醒后继节点或传播状态。
关键点:支持连续的共享锁获取。
条件对象
newCondition()
创建新的条件对象:
public final Condition newCondition() {
return new ConditionObject();
}
功能:创建用于独占模式的条件对象。
关键点:每个条件对象维护独立的等待队列。
ConditionObject.await()
使线程在条件队列中等待:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
fullyRelease();
int savedState = getState();
while (!isSignalled(node))
LockSupport.park(this);
acquire(savedState);
}
功能:释放锁,加入条件队列,等待信号。
流程:
检查中断状态。
调用 addConditionWaiter 加入条件队列。
调用 fullyRelease 释放锁。
挂起线程,等待 signal 或中断。
唤醒后重新获取锁。
关键点:确保锁完全释放后才挂起。
ConditionObject.signal()
唤醒条件队列中的一个线程:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
transferForSignal(first);
}
功能:将条件队列的第一个节点移到主队列。
流程:检查锁持有状态,调用 transferForSignal。
关键点:确保线程安全地转移节点。
ConditionObject.signalAll()
唤醒条件队列中的所有线程:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter)
transferForSignal(w);
}
功能:将条件队列的所有节点移到主队列。
流程:遍历条件队列,调用 transferForSignal。
关键点:批量转移节点。
其他关键方法
getState()
获取当前状态值:
protected final int getState() {
return state;
}
功能:返回 volatile 状态值。
关键点:提供线程安全的读取。
setState(int newState)
设置状态值:
protected final void setState(int newState) {
state = newState;
}
功能:设置 volatile 状态值。
关键点:提供线程安全的写入。
compareAndSetState(int expect, int update)
原子性地更新状态:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
功能:使用 CAS 更新状态。
关键点:确保原子性。
hasQueuedThreads()
检查是否有线程在队列中等待:
public final boolean hasQueuedThreads() {
return head != null && head.next != null;
}
功能:返回队列是否非空。
关键点:常用于监控。
hasContended()
检查是否发生过竞争:
public final boolean hasContended() {
return head != null;
}
功能:返回是否发生过竞争。
关键点:用于监控竞争状态。
getFirstQueuedThread()
获取队列中第一个线程:
public final Thread getFirstQueuedThread() {
return head == null ? null : head.thread;
}
功能:返回队列头部线程。
关键点:可能需要遍历队列。
isQueued(Thread thread)
检查指定线程是否在队列中:
public final boolean isQueued(Thread thread) {
for (Node h = head; h != null; h = h.next)
if (h.thread == thread)
return true;
return false;
}
功能:返回线程是否在队列中。
关键点:需要遍历队列。
hasQueuedPredecessors()
检查是否有比当前线程更早入队的线程:
public final boolean hasQueuedPredecessors() {
Node h = head;
Node t = tail;
Node s = h != null ? h.next : null;
Thread current = Thread.currentThread();
for (Node q = s; q != null; q = q.next)
if (q != t && q.thread != null && q.thread != current)
return true;
return false;
}
功能:用于实现公平锁。
关键点:检查队列中是否有更早的线程。
getQueueLength()
估算队列中等待线程数量:
public final int getQueueLength() {
int n = 0;
for (Node p = head; p != null; p = p.next)
if (p.thread != null)
n++;
return n;
}
功能:返回队列长度。
关键点:用于监控。
getQueuedThreads()
获取队列中所有线程:
public final Collection<Thread> getQueuedThreads() {
Collection<Thread> threads = new ArrayList<>();
for (Node p = head; p != null; p = p.next)
if (p.thread != null)
threads.add(p.thread);
return threads;
}
功能:返回队列中线程集合。
关键点:用于监控。
getExclusiveQueuedThreads()
获取独占模式等待线程:
public final Collection<Thread> getExclusiveQueuedThreads() {