AQS(AbstractQueuedSynchronizer)解析

发布于:2025-05-12 ⋅ 阅读:(64) ⋅ 点赞:(0)


推荐移步下面的文章,写的非常详细
美团AQS解析通过reentrantlock反观aqs
博客园
Java全栈
只是因为今天刚复习完操作系统的锁,发现AQS有很多之前不明白的思想突然有点开朗所以整理了一下。
并发笔记-锁(一)

一、AQS简介

AQS是java.util.concurrent.locks包下的一个抽象类,它是构建锁和同步组件(如ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock, FutureTask等)的基础框架。理解AQS是掌握Java并发包高级用法的关键。

二、核心设计思想

2.1 核心设计思想回顾

AQS的核心思想是:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁的变体实现的,即将暂时获取不到锁的线程加入到队列中。

它主要通过以下两个核心组件实现:

  1. volatile int state: 一个原子更新的状态变量,代表同步状态。
  2. FIFO同步队列 (CLH队列的变体): 一个用于管理等待获取同步状态的线程的队列。

AQS的设计模式是模板方法模式。它定义了获取和释放同步状态的主要逻辑(骨架),但将具体的同步状态的判断和修改操作(如tryAcquire, tryRelease)留给子类去实现。

2.2 CLH锁队列简介

在深入AQS的队列之前,有必要了解一下原始的CLH锁(Craig, Landin, and Hagersten lock)。CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁

以下是一个简化的CLH锁实现示例,帮助理解其原理:

public class CLHLock {
    // CLH队列的节点
    static class CLHNode {
        // 当前节点的锁状态,true表示已持有锁或正在等待获取锁
        volatile boolean locked = false;
    }
    
    // 线程本地存储,每个线程持有自己的节点
    private ThreadLocal<CLHNode> currentNode = ThreadLocal.withInitial(CLHNode::new);
    
    // 线程本地存储,每个线程持有前驱节点的引用
    private ThreadLocal<CLHNode> predecessorNode = new ThreadLocal<>();
    
    // 尾节点的原子引用,用于队列的尾部操作
    private AtomicReference<CLHNode> tail = new AtomicReference<>(new CLHNode());
    
    // 获取锁
    public void lock() {
        CLHNode node = currentNode.get();
        node.locked = true; // 设置当前节点状态为等待或持有锁
        
        // 原子操作:将当前节点设置为新的尾节点,并获取原尾节点(前驱)
        CLHNode predecessor = tail.getAndSet(node);
        predecessorNode.set(predecessor);
        
        // 自旋等待前驱节点释放锁
        while (predecessor.locked) {
            // 自旋等待,直到前驱节点的locked变为false
        }
        // 前驱节点已释放锁,当前线程成功获取锁
    }
    
    // 释放锁
    public void unlock() {
        CLHNode node = currentNode.get();
        node.locked = false; // 设置当前节点状态为已释放
        
        // 为下次使用准备新节点(重用节点可能会有问题,在实际实现中通常需要新建)
        currentNode.set(predecessorNode.get());
    }
}

原始CLH锁的核心思想:

  1. 隐式链表结构: 锁维护一个等待线程队列,这个队列是通过节点之间的指针(通常是每个节点指向其前驱节点)隐式形成的。

    // 每个节点持有对前驱节点的引用,形成隐式链表
    private ThreadLocal<CLHNode> predecessorNode = new ThreadLocal<>();
    
  2. 节点状态: 每个线程请求锁时,会创建一个节点。节点通常包含一个状态位(例如,lockedwaiting),表示该线程是否获得了锁或者是否正在等待。

    static class CLHNode {
        volatile boolean locked = false; // true表示正在等待或持有锁
    }
    
  3. 尾指针: 锁本身维护一个指向队列尾部的指针(tail)。

    private AtomicReference<CLHNode> tail = new AtomicReference<>(new CLHNode());
    
  4. 获取锁:

    • 线程创建一个新节点,将其locked状态设为true(表示它在等待或想获取锁)。
    • 线程通过原子操作(如getAndSetcompareAndSet)将自己的节点设置为新的队尾,并获取到之前队尾的节点(即它的前驱节点)。
    • 然后,线程开始自旋,不断检查其前驱节点locked状态。
    • 当前驱节点的locked状态变为false时,意味着前驱已经释放了锁,当前线程就可以停止自旋,成功获取锁。
    public void lock() {
        CLHNode node = currentNode.get();
        node.locked = true;
        
        // 原子操作:设置新的尾节点,返回旧的尾节点(前驱)
        CLHNode predecessor = tail.getAndSet(node);
        predecessorNode.set(predecessor);
        
        // 自旋等待前驱节点释放锁
        while (predecessor.locked) {
            // 自旋
        }
    }
    
  5. 释放锁:

    • 持有锁的线程将自己节点locked状态设为false。这将允许其后继节点(如果存在)停止自旋并获取锁。
    public void unlock() {
        CLHNode node = currentNode.get();
        node.locked = false; // 释放锁,通知后继节点
    }
    
  6. 优点:

    • 公平性: 线程按照它们到达的顺序获取锁(FIFO)。
    • 可扩展性好: 在多处理器系统上,每个线程只在本地缓存的前驱节点状态上自旋,减少了对共享内存的竞争和缓存一致性流量。
    // 每个线程都在自己的前驱节点上自旋
    while (predecessor.locked) {
        // 本地自旋,减少全局竞争
    }
    
    • 无饥饿: 由于是公平的,不会产生饥饿现象。
  7. 缺点:

    • 自旋消耗CPU: 即使是高效的自旋,在锁被长时间持有时,等待的线程仍然会消耗CPU资源。
    • NUMA系统下的性能问题: 虽然本地自旋,但在NUMA(Non-Uniform Memory Access)架构下,如果前驱节点在远程内存,访问开销依然存在。

CLH锁到AQS的演进

理解了原始CLH锁的原理后,就能更好地理解AQS为什么要对其进行改进。AQS借鉴了CLH锁的FIFO公平性和队列结构,但对其进行了重要改进:

  1. 将纯自旋改为阻塞等待(使用LockSupport.park
  2. 引入waitStatus等复杂状态管理
  3. 使用双向链表结构
  4. 支持取消、中断等高级特性

2.3 AQS对CLH队列的改动及其原因

AQS的同步队列是CLH锁队列的一个重要变体。它继承了CLH队列的基本思想(FIFO、节点链接),但做了关键的修改以适应更广泛的同步场景,特别是支持线程阻塞而不是纯自旋

AQS队列与原始CLH队列的主要区别和改动:

  1. 节点结构 (Node):

    • 双向链表: AQS的Node包含prevnext指针,构成一个双向链表。原始CLH通常是单向的(每个节点知道其前驱)。
      • 原因: 双向链表使得节点的插入和移除(尤其是在处理CANCELLED节点时)更加方便和高效。例如,unparkSuccessor方法中从尾部向前遍历寻找下一个有效等待者时,prev指针非常有用。
    • waitStatus状态字段: AQS的Node有一个waitStatus字段,它比原始CLH中简单的locked状态更复杂,用于协调线程的阻塞和唤醒。状态包括SIGNAL, CANCELLED, CONDITION, PROPAGATE
      • 原因: AQS需要管理线程的阻塞 (park) 和唤醒 (unpark),而不仅仅是自旋。waitStatus用于确保前驱节点在释放锁时能够可靠地唤醒后继节点。例如,SIGNAL状态表示后继节点需要被唤醒。
    • 封装线程: AQS的Node直接持有等待的Thread对象。
      • 原因: 当需要唤醒一个节点时,可以直接通过node.thread获取到线程对象并调用LockSupport.unpark(node.thread)
  2. 阻塞代替纯自旋:

    • 原始CLH是纯自旋锁。线程会一直循环检查前驱节点的状态。
    • AQS中,当一个线程发现获取锁失败且其前驱节点不是head时(或者即使是head的后继,但尝试获取再次失败),它通常不会无限自旋。通过shouldParkAfterFailedAcquireparkAndCheckInterrupt机制,线程会被LockSupport.park()挂起,进入阻塞状态,从而释放CPU。
      • 原因: 纯自旋只适用于锁持有时间非常短的场景。对于可能较长的锁持有时间,或者在线程数量较多时,阻塞是更有效的方式,避免了CPU资源的浪费。Java中的锁和同步器需要适应更通用的场景。
  3. 明确的headtail指针:

    • AQS显式维护headtail指针。head是一个虚拟节点(dummy node),不代表任何等待线程,它仅仅指向当前持有锁的线程(或者说,当前持有锁的线程"曾经是"head的后继,现在它自己成为了head)。
      • 原因: 虚拟head节点简化了队列为空和非空时的边界条件处理。新入队的节点总是添加到tail之后。当一个节点获取到锁后,它成为新的head,原head节点被断开。
  4. 取消操作 (CANCELLED状态):

    • AQS支持线程在等待过程中取消(例如,由于中断或超时)。被取消的节点其waitStatus会被标记为CANCELLED。这些节点会被从有效的等待链中"跳过"。
      • 原因: 实际应用中,线程等待可能会被中断或超时,需要一种机制来处理这种情况,避免无效的等待和资源泄漏。原始CLH通常不直接处理节点取消。
  5. 条件队列 (ConditionObject):

    • AQS通过内部类ConditionObject支持条件变量。这意味着同一个Node对象可能先在条件队列中等待,被signal后再转移到同步队列中等待获取锁。
      • 原因: 这是Java并发包中Lock接口提供Condition的需要,允许更灵活的线程间协作,类似于内置锁的wait/notify。原始CLH锁本身不包含条件变量的概念。
  6. 共享模式 (SHARED) 支持:

    • AQS的节点可以标记为SHARED模式,允许实现如SemaphoreCountDownLatch、读写锁的读锁等共享型同步器。在共享模式下,锁的释放可能会唤醒多个等待线程(通过PROPAGATE状态和doReleaseShared的传播机制)。
      • 原因: 纯粹的CLH锁是独占锁。AQS需要一个更通用的框架来支持不同类型的同步器。

为什么要进行这些改动?

  • 通用性: AQS的设计目标是成为一个通用的同步框架,能够支持多种不同类型的同步器(独占锁、共享锁、可重入锁、信号量、倒计时门闩等)。原始CLH锁主要是一种特定的独占自旋锁实现。
  • 效率与资源利用: 对于Java应用,纯自旋通常不是最佳选择,因为它可能在锁竞争激烈或锁持有时间长时浪费大量CPU。通过引入阻塞机制,AQS可以在线程等待时释放CPU资源,提高系统整体吞吐量。
  • 功能完备性: 现实世界的并发场景需要处理中断、超时、条件等待等复杂情况。AQS通过waitStatusConditionObject等机制提供了这些功能。
  • Java并发模型的需求: Java的Lock接口定义了可中断的锁获取、尝试获取锁、条件变量等特性,AQS需要支持这些特性。
    AQS借鉴了CLH队列FIFO和本地自旋(尽管AQS中自旋非常短暂,主要目的是在park前尝试获取或设置状态)的思想,但对其进行了大幅度的扩展和改造,使其成为一个既能高效处理无竞争情况(通过CAS),又能有效管理有竞争情况下线程阻塞和唤醒的强大同步基础组件。它将CLH的核心思想从一个特定的自旋锁实现提升到了一个通用的同步器构建框架。

三、核心组件详解

3.1 state 状态变量

private volatile int state;
  • volatile关键字: 保证了state变量在多线程之间的可见性,并且在一定程度上防止指令重排序。但volatile本身不保证原子性,AQS通过CAS操作来保证对state修改的原子性。
  • 含义由子类定义:
    • ReentrantLock: state表示锁的持有计数。0表示未被锁定,大于0表示已被某个线程锁定,并且值是该线程重入的次数。
    • Semaphore: state表示当前可用的许可数量。
    • CountDownLatch: state表示需要等待的计数数量。
    • ReentrantReadWriteLock: state被拆分为高16位表示读锁的持有数量,低16位表示写锁的持有数量(或重入次数)。
  • 操作方法:
    • protected final int getState(): 获取当前state的值。
    • protected final void setState(int newState): 设置state的值。注意: 这个方法不是原子的,通常在子类确认已经独占访问或者在tryAcquire/tryRelease逻辑中,能确保线程安全时才直接调用。
    • protected final boolean compareAndSetState(int expect, int update): 原子地比较state的当前值是否等于expect,如果是,则将其更新为update。这是实现原子更新的关键,底层依赖Unsafe类的CAS操作。

3.2 同步队列 (FIFO双向链表)

AQS内部维护了一个用于管理等待线程的FIFO双向链表,通常被称为CLH队列的变体。CLH队列是一种自旋锁,但AQS中的队列节点在大多数情况下会让线程阻塞,而不是纯粹自旋。

static final class Node {
    // 节点模式: 共享模式 vs 独占模式
    static final Node SHARED = new Node(); // 标记节点在共享模式下等待
    static final Node EXCLUSIVE = null;   // 标记节点在独占模式下等待 (默认)

    // 节点的等待状态 (waitStatus)
    static final int CANCELLED =  1;  // 线程的请求已被取消 (超时或中断)
    static final int SIGNAL    = -1;  // 后继节点需要被唤醒 (unpark)
    static final int CONDITION = -2;  // 节点在条件队列中等待 (与同步队列不同)
    static final int PROPAGATE = -3;  // (仅共享模式) releaseShared需要向后传播

    volatile int waitStatus;    // 节点当前状态 (重要!)
    volatile Node prev;         // 指向前驱节点
    volatile Node next;         // 指向后继节点
    volatile Thread thread;     // 封装的等待线程
    Node nextWaiter;           // 指向条件队列中的下一个等待者 (用于ConditionObject)

    // 构造函数
    Node() { } // Dummy node for SHARED
    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    // ... 其他辅助方法,如isShared()
}

// 同步队列的头尾指针
private transient volatile Node head;
private transient volatile Node tail;
  • 队列结构:
    • 双向链表: 方便节点插入、移除以及从尾部向前遍历(unparkSuccessor中会用到)。
    • head节点: 是一个虚拟节点(dummy node),不持有实际的等待线程。它代表当前持有锁(或已成功获取同步状态)的线程。当一个线程成功获取锁后,原来的head节点出队,该线程对应的节点成为新的head
    • tail节点: 指向队列的尾部。新加入的等待线程会被添加到队尾。
    • volatile修饰: head, tail, 以及Node中的prev, next, waitStatus, thread都是volatile的,确保多线程下的可见性。
  • Node.waitStatus详解:
    • 0 (默认): 节点的初始状态,或在某些情况下表示节点不需要被特别处理。
    • CANCELLED (1): 表示节点中的线程因为超时或中断而放弃了等待。已取消的节点不会再参与锁竞争,会被从队列中移除。
    • SIGNAL (-1): 核心状态。表示当前节点的后继节点(或后继节点中的某个线程)已经被(或即将被)阻塞 (park)。因此,当当前节点释放锁或被取消时,它必须唤醒 (unpark) 它的后继节点。
    • CONDITION (-2): 表示节点当前在条件队列 (Condition Queue) 中等待,而不是在同步队列中。当线程调用Condition.await()时,它会进入条件队列。
    • PROPAGATE (-3): (仅用于共享模式) 当共享模式下的操作完成(如releaseShared),需要将状态向后传播,以确保其他共享模式的等待者也能被唤醒。
  • Node.nextWaiter:
    • 在同步队列中,它被用来区分节点是SHARED还是EXCLUSIVE模式。
    • 在条件队列中,它被用来链接条件队列中的节点(形成单向链表)。

四、核心方法深度解析

AQS提供了两类主要的获取/释放同步状态的方法:独占模式(如acquire/release)和共享模式(如acquireShared/releaseShared)。我们以独占模式为例进行深入分析。

4.1 获取同步状态 (独占模式) - acquire(int arg)

public final void acquire(int arg) {
    // 1. 尝试直接获取同步状态 (由子类实现具体逻辑)
    if (!tryAcquire(arg) &&
        // 2. 如果获取失败,则将当前线程加入等待队列,并使其在队列中等待
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 3. 如果线程在等待过程中被中断,则在获取到锁后,进行自我中断
        selfInterrupt();
}

流程分解:

(a) tryAcquire(int arg) (由子类实现)

  • 这是模板方法模式的核心。子类(如ReentrantLock.FairSyncNonfairSync)必须重写此方法。
  • 职责: 尝试以独占方式获取同步状态。
    • 如果成功,返回true
    • 如果失败,返回false
  • 实现要点:
    • 必须是线程安全的,通常需要使用CAS操作来修改state
    • 需要判断当前state是否允许获取(例如,ReentrantLock中state为0,或当前线程已持有锁)。
    • 如果获取成功,通常需要记录当前持有锁的线程(AQS提供了setExclusiveOwnerThread方法)。
  • 返回值: acquire方法的行为完全依赖于tryAcquire的返回值。

(b) addWaiter(Node mode) (如果tryAcquire失败)

  • 如果tryAcquire返回false,表示当前线程未能获取到锁,需要将其加入等待队列。
  • 职责: 将当前线程封装成一个新的Node对象,并将其安全地添加到同步队列的尾部。
  • 参数mode: Node.EXCLUSIVE (独占模式) 或 Node.SHARED (共享模式)。
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // 创建新节点
    // 快速尝试CAS尾部插入 (乐观尝试,大部分情况下能成功)
    Node pred = tail;
    if (pred != null) { // 队列已初始化
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // CAS设置新的尾节点
            pred.next = node; // 原尾节点的next指向新节点
            return node;
        }
    }
    // 如果CAS失败 (说明tail被其他线程修改了) 或队列未初始化,则进入enq自旋
    enq(node);
    return node;
}

© enq(final Node node) (如果addWaiter中的快速尝试失败)

  • 职责: 通过CAS自旋的方式,安全地将节点加入队列尾部。同时处理队列的初始化(如果headtailnull)。
private Node enq(final Node node) {
    for (;;) { // 无限循环,直到成功入队
        Node t = tail;
        if (t == null) { // 队列为空,需要初始化
            // 创建一个虚拟头节点
            if (compareAndSetHead(new Node()))
                tail = head; // 初始化后,tail也指向虚拟头节点
        } else { // 队列不为空
            node.prev = t; // 新节点的前驱指向当前尾节点
            if (compareAndSetTail(t, node)) { // CAS尝试将新节点设置为尾节点
                t.next = node; // 成功后,原尾节点的next指向新节点
                return t; // 返回原尾节点 (即新节点的前驱)
            }
        }
        // 如果CAS失败,说明在设置过程中tail被其他线程修改,循环重试
    }
}
  • 初始化: 如果队列为空,enq会先创建一个虚拟的head节点,然后tail也指向这个head节点。之后真正的第一个等待节点会挂在tail后面。

(d) acquireQueued(final Node node, int arg) (核心等待逻辑)

  • 如果线程成功加入队列 (通过addWaiterenq),此方法负责让该线程在队列中等待,直到轮到它获取锁。
  • 职责:
    1. 检查当前节点是否是head的后继节点(即实际的队首)。
    2. 如果是,则再次尝试调用tryAcquire获取锁。
    3. 如果获取成功,则将当前节点设置为新的head,并从队列中断开原head
    4. 如果获取失败,或者当前节点不是队首,则判断是否需要阻塞当前线程 (park)。
    5. 如果线程在等待过程中被中断,记录中断状态。
  • 返回值: true如果线程在等待时被中断,否则false
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // 标记是否成功获取锁,用于finally块中处理取消
    try {
        boolean interrupted = false; // 标记线程是否被中断过
        for (;;) { // 自旋检查和等待
            final Node p = node.predecessor(); // 获取当前节点的前驱
            // 关键判断: 如果前驱是head,说明当前节点是实际的队首
            // 此时有资格尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 获取成功,将当前节点设为新的head
                p.next = null; // 原head的next置为null,帮助GC
                failed = false; // 标记成功获取
                return interrupted; // 返回中断状态
            }
            // 如果获取锁失败,或者不是队首,则判断是否应该park当前线程
            // shouldParkAfterFailedAcquire会检查前驱节点的waitStatus
            // parkAndCheckInterrupt会真正park线程并检查中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; // 如果park后发现被中断,标记为true
        }
    } finally {
        if (failed) // 如果获取锁最终失败 (例如,tryAcquire抛异常或线程被取消)
            cancelAcquire(node); // 取消当前节点的排队
    }
}

(e) shouldParkAfterFailedAcquire(Node pred, Node node)

  • acquireQueued中,当一个节点尝试获取锁失败后,调用此方法来决定是否应该阻塞(park)当前线程。
  • 职责: 检查前驱节点predwaitStatus
    • 如果pred.waitStatus == Node.SIGNAL (-1): 表示前驱节点承诺在释放锁或被取消时会唤醒当前节点。因此,当前线程可以安全地park。返回true
    • 如果pred.waitStatus > 0 (CANCELLED): 表示前驱节点已被取消。当前节点需要跳过这个已取消的前驱,向前找到一个未取消的节点作为新的有效前驱。返回false(表示暂时不park,需要重试循环)。
    • 如果pred.waitStatus是0或PROPAGATE: 表示前驱节点当前没有义务唤醒后继。当前节点需要通过CAS将前驱的waitStatus设置为SIGNAL,以确保前驱在将来会唤醒自己。返回false(表示暂时不park,已尝试设置SIGNAL,下一轮循环再检查)。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 前驱状态为SIGNAL,可以直接park
        return true;
    if (ws > 0) { // 前驱已取消 (CANCELLED)
        // 跳过已取消的前驱节点,向前寻找
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node; // 将当前节点链接到新的有效前驱
    } else { // 前驱状态为0或PROPAGATE
        // CAS将前驱状态设置为SIGNAL,表示当前节点需要前驱来唤醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false; // 返回false表示当前不park,acquireQueued会再次循环
}

(f) parkAndCheckInterrupt()

  • shouldParkAfterFailedAcquire返回true时调用此方法。
  • 职责: 使用LockSupport.park(this)来阻塞当前线程,并返回线程是否被中断。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 阻塞当前线程
    return Thread.interrupted(); // 清除并返回中断状态
}

(g) selfInterrupt()

  • 如果在acquireQueued中线程被中断 (interrupted = true),acquire方法会在最后调用selfInterrupt()(g) selfInterrupt()

  • 如果在acquireQueued中线程被中断 (interrupted = true),acquire方法会在最后调用selfInterrupt()

  • 职责: 重新设置当前线程的中断状态。这是因为parkAndCheckInterrupt中的Thread.interrupted()会清除中断状态。AQS的设计哲学是,底层的同步机制不应该"吞噬"中断信号,而是应该在完成其主要任务(获取锁)后,将中断状态恢复,以便上层调用者可以响应这个中断。

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

(h) cancelAcquire(Node node) (在acquireQueuedfinally块中调用)

  • 如果线程在获取锁的过程中失败(例如,等待超时或tryAcquire抛出未检查异常),或者线程在park之前就被中断且决定不再等待,则需要取消该节点。
  • 职责: 将节点的waitStatus设置为CANCELLED,并尝试将其从队列中移除,同时唤醒可能的后继节点。
    • 这个过程比较复杂,涉及到处理已取消节点前后节点的链接,以及确保如果当前节点是tail,则正确更新tail
    • 如果当前节点的前驱的waitStatusSIGNAL,或者成功将其设置为SIGNAL,并且当前节点有线程,则会唤醒它的后继节点(如果后继节点存在且未取消)。

4.2 释放同步状态 (独占模式) - release(int arg)

public final boolean release(int arg) {
    // 1. 尝试释放同步状态 (由子类实现)
    if (tryRelease(arg)) {
        Node h = head; // 获取当前头节点
        // 2. 如果头节点存在且其waitStatus不为0 (通常是SIGNAL)
        //    说明有后继节点在等待被唤醒
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 唤醒头节点的后继
        return true;
    }
    return false;
}

流程分解:

(a) tryRelease(int arg) (由子类实现)

  • 这是模板方法模式的另一部分。
  • 职责: 尝试以独占方式释放同步状态。
    • 如果成功释放(例如,state变为0,锁完全被释放),返回true
    • 如果只是部分释放(例如,可重入锁的计数减少但未到0),或者释放失败,返回false
  • 实现要点:
    • 必须是线程安全的。
    • 通常需要检查当前线程是否是锁的持有者。
    • 修改state。如果锁完全释放,通常需要清除独占所有者线程(AQS提供了setExclusiveOwnerThread(null))。
  • 返回值: release方法的行为依赖于此。

(b) unparkSuccessor(Node node) (如果tryRelease返回true且队列需要唤醒)

  • 当锁被成功释放后,需要唤醒等待队列中的下一个线程。
  • 参数node此时是队列的head节点。
  • 职责: 找到head节点的第一个未被取消的后继节点,并使用LockSupport.unpark()唤醒其线程。
private void unparkSuccessor(Node node) { // node is the head
    int ws = node.waitStatus;
    if (ws < 0) // 如果head的waitStatus是SIGNAL或PROPAGATE
        // 尝试将其CAS回0 (表示不再需要通知后继,或已完成传播)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next; // 获取head的直接后继
    // 如果后继为null或已取消 (waitStatus > 0)
    if (s == null || s.waitStatus > 0) {
        s = null; // 清空s,准备从尾部向前查找
        // 从队列尾部向前遍历,找到离head最近的、未取消的节点
        // 为什么要从尾部向前?
        // 因为在enq操作中,node.prev的设置先于t.next的设置。
        // 在并发情况下,从head向后遍历时,next指针可能暂时为null。
        // 而从tail向前遍历prev指针是相对稳定的。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) // 找到一个有效的等待节点
                s = t;
    }
    if (s != null) // 如果找到了有效的后继节点
        LockSupport.unpark(s.thread); // 唤醒其线程
}

五、条件变量 ConditionObject

AQS还提供了一个内部类ConditionObject,它是java.util.concurrent.locks.Condition接口的实现。Condition通常与一个独占锁(如ReentrantLock)配合使用,提供了类似Object.wait(), notify(), notifyAll()的功能,但更灵活(可以有多个条件队列与一个锁关联)。

public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter;  // 条件队列的头指针 (单向链表)
    private transient Node lastWaiter;   // 条件队列的尾指针

    public final void await() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        // 1. 将当前线程封装成Node加入条件队列
        Node node = addConditionWaiter();
        // 2. 完全释放当前线程持有的锁 (state会保存下来)
        //    这是必须的,否则其他线程无法获取锁来signal当前线程
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 3. 循环检查当前节点是否已被转移到同步队列
        //    如果没有,则park当前线程,等待被signal
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            // 检查是否在park期间被中断
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break; // 如果被中断,跳出等待循环
        }
        // 4. 当线程被唤醒(signal)并成功转移到同步队列后,
        //    或者因中断跳出循环后,尝试重新获取锁
        //    acquireQueued会处理中断模式
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 清理条件队列中可能存在的已取消节点
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // 根据中断模式处理中断
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    public final void signal() {
        if (!isHeldExclusively()) //必须持有锁才能signal
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first); // 唤醒条件队列中的第一个等待者
    }

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        // 将节点从条件队列转移到同步队列,如果成功,则unpark同步队列中被唤醒的节点
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    // transferForSignal 将节点从条件队列移动到AQS同步队列的尾部,
    // 并可能唤醒它(如果它的前驱是SIGNAL状态或者它成为了新的head)。
}
  • await()过程:
    1. 当前线程必须持有与此Condition关联的锁。
    2. 创建一个新的NodewaitStatusCONDITION),加入到ConditionObject内部的条件队列。
    3. 完全释放当前线程持有的锁(调用AQS的fullyRelease)。这允许其他线程获取锁并最终signal此条件。
    4. 线程进入循环等待,直到被signal并且节点被转移到AQS的同步队列中,或者线程被中断。在循环中,线程会park
    5. 当线程被唤醒(通过signal或中断),它会尝试重新(竞争性地)获取之前释放的锁(调用AQS的acquireQueued)。
    6. 获取锁成功后,await方法返回。
  • signal()过程:
    1. 当前线程必须持有与此Condition关联的锁。
    2. 从条件队列的头部取出一个节点。
    3. 调用transferForSignal方法,尝试将该节点从条件队列转移到AQS的同步队列尾部。
    4. 如果转移成功,并且该节点在同步队列中的状态允许被唤醒(例如,它的前驱节点释放了锁),则该节点的线程最终会被unpark,并参与锁的竞争。

六、AQS的设计特点总结

  1. 模板方法设计模式:

    • AQS 定义了同步过程的骨架(acquire, release, acquireShared, releaseShared等final方法)。
    • 子类通过重写受保护的 tryAcquire, tryRelease, tryAcquireShared, tryReleaseShared, isHeldExclusively 等方法来定制具体的同步逻辑。这使得AQS可以适应多种同步需求。
  2. 基于volatile int state的同步状态管理:

    • 简单而强大,一个int变量足以表达多种同步语义。
    • volatile保证可见性,CAS操作保证原子更新。
  3. CLH队列变体:

    • FIFO队列保证了线程获取锁的公平性(如果子类实现支持公平性)。
    • 节点通过waitStatus和前驱节点的状态来决定是否阻塞,以及何时被唤醒。
    • head作为虚拟节点简化了队列操作。
  4. CAS操作的大量使用:

    • 无论是修改state,还是对队列headtail的修改,以及节点waitStatus的修改,都广泛使用CAS操作来保证无锁或极少锁情况下的线程安全和高效性。
  5. 阻塞与唤醒机制 (LockSupport):

    • AQS使用LockSupport.park()来挂起线程,使用LockSupport.unpark(Thread)来唤醒线程。
    • LockSupport提供了更细粒度的线程阻塞和唤醒控制,与线程的Object.wait/notify不同,它不需要获取对象的监视器锁。park/unpark可以响应中断。
  6. 对中断的良好处理:

    • 在等待获取同步状态时,如果线程被中断,AQS会记录中断状态,并在获取成功后通过selfInterrupt()恢复中断状态,让上层代码有机会处理中断。
    • acquireInterruptibly方法允许在等待时响应中断并抛出InterruptedException
  7. 独占模式与共享模式:

    • AQS同时支持独占(只有一个线程能获取)和共享(多个线程能同时获取)两种同步模式,这使得它可以用来实现如ReentrantLock(独占)和Semaphore/CountDownLatch/ReentrantReadWriteLock的读锁(共享)等多种同步器。
  8. 公平性与非公平性支持:

    • AQS本身不强制公平性,但提供了判断是否有前驱等待者的方法(如hasQueuedPredecessors()),子类可以利用这个方法来实现公平锁。
    • 非公平锁: 新请求锁的线程可以尝试"插队",直接获取锁,如果获取失败再入队。吞吐量通常更高,但可能导致饥饿。
    • 公平锁: 严格按照线程在队列中的顺序分配锁。保证了公平性,但通常吞吐量较低,因为即使锁可用,新线程也必须入队等待。

七、应用实例:ReentrantLock

ReentrantLock通过内部类Sync(及其子类FairSyncNonfairSync)继承AQS来实现。

// NonfairSync (默认)
static final class NonfairSync extends Sync {
    // ...
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // 无锁状态
            // 非公平:直接尝试CAS获取,不检查队列
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 重入
            int nextc = c + acquires;
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            setState(nextc); // 重入不需要CAS,因为当前线程已持有锁
            return true;
        }
        return false; // 获取失败
    }
}

// FairSync
static final class FairSync extends Sync {
    // ...
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // 无锁状态
            // 公平:先检查队列中是否有等待者 (hasQueuedPredecessors())
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 重入
            int nextc = c + acquires;
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
// tryRelease 对于公平和非公平锁是一样的
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException(); // 非锁持有者不能释放
    boolean free = false;
    if (c == 0) { // 完全释放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c); // 设置新的state
    return free; // 返回是否完全释放
}
  • 非公平锁tryAcquire: 如果锁空闲,直接尝试CAS获取,不关心队列中是否有等待者。
  • 公平锁tryAcquire: 如果锁空闲,会先调用hasQueuedPredecessors()检查同步队列中是否有比当前线程等待更久的线程。如果没有,才尝试CAS获取。
  • tryRelease: 减少state计数。如果计数变为0,则表示锁完全被释放,清除独占线程,并返回true,这将触发AQS唤醒等待队列中的下一个线程。

八、总结

AQS 通过 volatile int state、FIFO同步队列和CAS操作,构建了一个强大且灵活的同步基础框架。它巧妙地运用了模板方法模式,将核心的同步逻辑(线程排队、阻塞、唤醒)封装在自身,而将具体的同步状态判断和修改逻辑交由子类实现。这使得Java并发包中的各种同步器(Lock, Semaphore, CountDownLatch等)能够拥有一致且高效的底层实现。


网站公告

今日签到

点亮在社区的每一天
去签到