AQS 重入锁

发布于:2022-12-03 ⋅ 阅读:(605) ⋅ 点赞:(0)

什么是AQS

AQS是 AbstractQueuedSynchronizer的缩写,可重入锁ReentrantLock 是它的子类,所以我们今天来看看AQS 都做了做什么?

可重入锁,就是支持重新进入的锁,它表示该锁能够支持一个线程对资源的重复加锁,可以通过构造方法来确定是公平锁还是非公平锁。

公平锁就是 线程先来后到,也就是等待时间最长的线程最优先拿到锁。
实现这一目的的方法是设计一个队列(不是queue,而是双向链表),链表里面放的是Node,Node里面会封装等待的线程Thread,前后指针等一些信息。

synchronize 就是重入锁,可以重复进入锁

实现重进入要解决两个问题:

  1. 线程再次获取锁
  2. 锁最终释放,线程重复获取锁 n次,在释放锁 n次后,其它线程可以获取到该锁。线程获取锁几次是通过 state变量来实现,获取锁会增加 state,释放锁会减少state,当state=0时,其它线程就可以抢锁了。

在这里插入图片描述
head节点会获取锁,后面的node节点会被park 挂起,
==我们调用lock这个方法就会有一个node进入阻塞队列,等待,只有head节点才可以拿到锁,拿到锁才可以继续向下执行,否则等待。==这就是为什么AQS重入锁会和阻塞队列联系起来。

AQS中的部分属性

属性:一些维护双向队列需要的指针 prev,next,头尾节点 head,tail。当前Node中封装的thread,状态state

    //队列的头节点 任何时刻  头节点对应的线程都是当前持锁线程
    private transient volatile Node head;
    
    //阻塞队列的尾结点 (阻塞队列不包括头节点  head.next-->tail 是阻塞队列)
    private transient volatile Node tail;
    
    //表示资源
    //独占模式:0 表示未加锁状态  >0 表示已经加锁状态
    private volatile int state;
    
    //队列的头节点 任何时刻  头节点对应的线程都是当前持锁线程
    private transient volatile Node head;
    
    //阻塞队列的尾结点 (阻塞队列不包括头节点  head.next-->tail 是阻塞队列)
    private transient volatile Node tail;
    
    //表示资源
    //独占模式:0 表示未加锁状态  >0 表示已经加锁状态
    private volatile int state;
    

AQS中的重要内部类

重要内部类 Node ,里面封装的是Node状态state,头尾节点 head,tail。当前Node中封装的thread,前后指针prev,tail

public abstract class AbstractQueuedSynchronizer{

   static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        //枚举:共享模式
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        //枚举:独占模式
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        //表示当前节点处于 取消状态
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        //表示当前节点需要唤醒它的后继节点(SIGNAL 表示后继节点需要当前节点唤醒)
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        //
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        //node状态,可选值(0,CANCELED 1 ,SIGNAL -1 ,CONDITION -2 ,PROPAGATE -3)
        //==0 是默认状态
        //>0  取消状态
        //<0  表示当前node如果是head节点时,释放锁之后,需要唤醒后继节点
        volatile int waitStatus;
        
	    //因为node需要构建 fifo队列,索引prev 指向前继节点的
	    volatile Node prev;
	    
	    //因为node需要构建 fifo队列,索引 next 指向后继节点的
	    volatile Node next;
	    
	    //当前node封装的线程 本尊
	    volatile Thread thread;
}

AQS和ReentrantLock

公平锁FairSync 是ReentrantLock里的内部类,Sync继承了AQS
下面方法前有/*AQS*/ 就代表这个方法在AQS中。
ReentrantLock中的大部分方法都来自AQS,所以公平锁就是用来讲AQS中方法的最好例子。

ReentrantLock的两个构造方法:

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

通过构造来选择 公平锁还是非公平锁
下面我们来看一下 可重入锁是怎么实现的
公平锁加锁 lock()

static final class FairSync extends Sync {
		final void lock() {
		            acquire(1);
	    }
	    
	    /*AQS */
        public final void acquire(int arg) {
            //条件一:tryAcquire 尝试获取锁 获取成功返回true 获取失败 返回false
            //条件二:2.1:addWaiter 将当前线程封装成node入队
            //      2.2:acquireQueued 挂起当前线程  唤醒后相关逻辑
            //      acquireQueued 返回true 表示挂起过程这个线程被中断唤醒过.. false 表示未被中断过
            if (!tryAcquire(arg) &&
                    //addWaiter执行完后,node一定在队列中了
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //Node.EXCLUSIVE==null
                //再次设置中断标记为 true
                selfInterrupt();
        }
  1. tryAcquire方法是用来尝试获取锁的
  2. addWaiter方法 将当前线程关联的节点加入到队列中
  3. acquireQueued 方法再次尝试获取锁,获取不到 挂起该线程

tryAcquire 快速尝试获取锁

该方法会很快的尝试获取锁,获取不到 直接返回false,不会做入队和挂起的操作

       /** 抢占成功返回true
         * 抢占失败返回 false */
        protected final boolean tryAcquire(int acquires) {
            //当前线程
            final Thread current = Thread.currentThread();
            //AQS state值
            int c = getState();
            //条件成立:c==0 表示当前AQS处于无锁状态
            if (c == 0) {
                //条件一:
                //因为是公平锁,任何时候都需要检查一下 队列中是否在当前线程之前有等待者
                //hasQueuedPredecessors() 方法返回true 表示当前线程前面有等待者,需要入队
                //返回false  前面没有等待者 直接尝试获取锁

                //条件二:成功 说明当前线程抢占锁成功
                //      失败 s,存在竞争,且当前线程竞争失败
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //成功之后设置 ExclusiveOwnerThread为当前线程,独占者线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //执行到这里,有几种情况?
            //c!=0 大于0 的情况,这种情况就需要检查一下 当前线程是不是独占锁的线程,
            // 因为ReentrantLock 是可重入的

            //条件成立:说明当前线程就是独占锁线程
            else if (current == getExclusiveOwnerThread()) {
                //锁重入逻辑

                //更新nextc值
                int nextc = c + acquires;
                //越界判断:当重入的深度很深时,会导致 nextc < 0,int值达到最大值之后 再+1 会变为负数
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                //更新操作
                setState(nextc);
                return true;
            }

            //1.CAS失败  c==0  CAS修改state时,没有抢过别人
            //2.c>0   且current == getExclusiveOwnerThread()
            return false;
        }
    }

addWaiter 快速入队

该方法让Node(thread)入队,addWaiter 会尝试一次快速入队,入队失败调用 enq,自旋 重复入队直到成功

这里有一点要注意:最最最开始的第一个节点在获取到锁之后 并不会入队(它是在tryAcquire 方法里面直接返回true),所以后来的节点入队的时候需要帮助这个节点入队,然后自己再入队(这个操作在 enq方法的第一个 if 中实现)

    /*AQS */
        private Node addWaiter(Node mode) {
            //mode == Node.EXCLUSIVE  独占模式
            //构建Node,把当前线程封装到对象node中
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure

            //快速入队
            //快速获取tail尾结点
            Node pred = tail;
            //队列中有元素
            if (pred != null) {
                //当前节点的prev指向尾结点
                node.prev = pred;
                //cas成功,node入队成功
                if (compareAndSetTail(pred, node)) {
                    //双向绑定
                    pred.next = node;
                    return node;
                }
            }

            //1.当前队列是空队列 tail == null
            //2.CAS竞争失败

            //完整入队
            enq(node);
            return node;
        }
------------------------------------------------------------------------
       /*AQS */
        private Node enq(final Node node) {
            //自选入队
            for (;;) {
                //1.当前队列是空队列  tail==null
                //说明当前线程 锁被占用,且当前线程 有可能是第一个获取锁失败的线程(当前时刻可能存在一批获取锁失败的线程)
                Node t = tail;
                if (t == null) { // Must initialize
                    //作为作为当前持锁线程的第一个后继线程,需要做什么事?
                    //1.为第一个head(快速入队)添加node
                    //      因为当前持锁的线程,它获取锁时,直接tryAcquire成功了,
                    //      但没有向阻塞队列中添加任何node,所以作为后继需要为它解决这个问题
                    //2.为自己追加node

                    //CAS成功 说明当前线程成为head.next节点
                    //线程需要当前持有锁的线程创建head
                    if (compareAndSetHead(new Node()))
                        tail = head;


                    //注意:这里没有return,会继续for
                } else {

                    //普通入队方式,只不过在for中,会保证一定入队成功
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        //***s == null***(上面要用到的)
                        t.next = node;
                        return t;
                    }
                }
            }
        }

acquireQueued 挂起线程,醒后获取锁

这个方法会不停的判断,是否可以获取锁,条件是:该节点前面是head节点,并且尝试获取锁 tryAcquire成功。
但大部分情况不会这么顺利,在这个不停的for循环中第二个 if会判断线程是否需要挂起 shouldParkAfterFailedAcquire方法实现这一判断,parkAndCheckInterrupt实现挂起,挂起后这个线程就会卡在第二个 if这里,直到 head线程 unpark,这个线程才会继续执行 for循环

      //acquireQueued需要做什么呢?
        //1.当前节点有没有被park 挂起? 没有==>所以需要挂起
        //2.唤醒之后的逻辑在哪里?
        /*AQS  */
        //参数一:当前线程包装出来的node,且当前已经入队成功
        //参数二:当前线程抢占资源成功后,设置state值时,会用
        final boolean acquireQueued(final Node node, int arg) {
            //true 表示抢占线程成功 普通情况下 当前线程早晚会拿到锁
            //false 表示失败 需要执行出队逻辑 ()
            boolean failed = true;
            try {
                //当前线程是否被中断
                boolean interrupted = false;
                //自旋
                for (;;) {

                    //1.进入for循环时,在线程尚未park前会执行
                    //2.线程park之后 被唤醒后,也会执行到这里

                    //当前节点的前置节点
                    final Node p = node.predecessor();
                    //1.说明当前节点为head.next节点,则它在任何时候都有资格争夺锁
                    //2.tryAcquire
                    //  成立:说明head对应的线程已经释放锁了,head.next节点对应的线程 真好获取到锁了
                    //  不成立:说明head对应线程 还没有释放锁  head.next任然需要被park
                    if (p == head && tryAcquire(arg)) {
                        //设置自己为head节点
                        setHead(node);
                        //协助老的head出列
                        p.next = null; // help GC
                        //当前线程获取锁的过程中 没有异常
                        failed = false;

                        return interrupted;
                    }
                    //shouldParkAfterFailedAcquire 表示当前线程获取锁资源失败后,是否需要挂起呢?
                    //返回值 true-> 当前线程需要挂起   false-> 不需要
                    //parkAndCheckInterrupt  挂起当前线程,并且唤醒之后 返回当前线程的 中断标记
                    //  (唤醒:1.正常唤醒 其它线程unpark  2.其它线程给当前挂起的线程 一个中断信号)
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        //表示当前node对应的线程是被中断信号唤醒的
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

shouldParkAfterFailedAcquire 判断是否挂起

实现是否需要挂起的判断也很复杂:

  • 首先我们 先判断 pred 是不是Signal,是 就直接返回true,需要挂起,不是就代表pred是 CANCELED 状态,把前面CANCELED状态的节点全部出队,返回false,
  • 其实返回到上面的acquireQueued 方法,for循环还是会再次来到shouldParkAfterFailedAcquire,此时设置pred的状态为Signal,再返回false;
  • 再次进来时pred==Signal,返回true。
        /*AQS  */
        /**
         * 1.当前节点的前置是 CANCELED状态,第一次来到这个方法 时,hi越过取消状态的节点,
         *              第二次 会返回true,然后park当前线程
         *  2.当前节点的前置节点状态是0,当前线程会设置前置节点的状态为-1,第二次来到这个方法时就会返回true
         * 参数一:当前线程node 的前置节点
         * 参数二:node 当前线程对应node
         * 返回值: true 需要挂起
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //获取前置节点的状态
            //waitStatus: 0 默认状态new Node();
            // -1  Signal状态,表示当前节点释放锁之后会唤醒它的第一个后继节点;
            //>0 表示当前节点是 canceled状态
            int ws = pred.waitStatus;
            //条件成立:表示前置节点是个可以唤醒当前节点的节点, 返回true
            //普通情况,第一次来到这里, ws不是-1
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;

            //条件成立:>0 表示前置节点是 CANCELED状态
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                //前置节点的waitStatu>0 ,说明这个节点是canceled状态,需要出队
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                //当前node前置节点的状态就是0
                //当前线程node的前置节点 的状态强制设置为 signal,表示前置节点释放锁之后需要喊醒我node
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

parkAndCheckInterrupt 挂起线程

这个方法就很简单,只有在shouldParkAfterFailedAcquire判断为true的时候才会执行

      /*AQS  */
        //park当前线程将当前线程挂起,唤醒后返回当前线程 是否为 中断信号 唤醒
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }

release 释放锁

上面讲了 获取锁,下面说说释放锁
if 里面 tryRelease 返回true 表示完全释放了锁
继续执行:

  • 如果head是空的,那说明队列已经空了,无需其他操作;
  • 如果head不为空,则调用unparkSuccessor()去唤醒后继节点

    public void unlock() {
        sync.release(1);
    }
     /*AQS   release*/
        public final boolean release(int arg) {
            //尝试释放锁,true表示当前线程完全释放锁
            //false 当前线程尚未完全释放锁
            if (tryRelease(arg)) {

                //head什么情况下会被创建出来?
                //当持锁线程未释放线程时,且其它线程想要获取锁时.
                // 其它线程发现获取不了锁,而且队列是空队列,此时后续线程会为当前持锁中的
                //线程构建出来一个head节点,然后后续线程 会追加head节点
                Node h = head;

                //1.说明队列中的head节点已经初始化过了,ReentrantLock在使用区间发生过 多线程竞争
                //2.说明当前head后面一旦插入过node节点
                if (h != null && h.waitStatus != 0)
                    //唤醒后继节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

tryRelease 尝试释放锁

     /*Sync   tryRelease*/
        protected final boolean tryRelease(int releases) {
            //减去释放的值
            int c = getState() - releases;
            //说明当前线程未持有锁
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();


            //当前线程持有锁
            boolean free = false;
            //说明当前线程已经达到完全释放锁的条件
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //更新state
            setState(c);
            return free;
        }

unparkSuccessor 释放锁后,叫醒后面的node

   /*AQS  */
        /**
         * 唤醒当前节点的下一个节点
         */
        private void unparkSuccessor(Node node) {
            //当前node的状态
            int ws = node.waitStatus;
            //ws -1
            //改为0,是因为
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);

            //后继节点
            Node s = node.next;

            //1.第一种情况:当前节点是tail节点 s==null
            //  第二种情况:可能走到这里了enq中"***s == null***",还没有来得及把node.next设置进去
            //2.前置条件:s!=null
            //  说明当前节点的后继节点是取消状态 需要找到一个可以唤醒的节点
            if (s == null || s.waitStatus > 0) {
                //查找可以被唤醒的节点
                s = null;
                //找到一个距离当前node最近的可以唤醒的node,
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //找不到就是null
            if (s != null)
                LockSupport.unpark(s.thread);
        }

Condition

Condition是一个接口,ConditionObject 是AQS类中的内部类,该内部类实现了Condition接口。

wait 和 notify

首先看一下Condition的使用:
一个线程执行wait 方法,会释放锁(monitor锁),并挂起线程,直到被其它线程唤醒(notify)

public class Test {
    public static void main(String[] args) throws InterruptedException {
        String lock = new String();
        synchronized (lock){
            lock.wait();
        }
    }
}

在这里插入图片描述
The Owner是当前线程,执行wait进入Wait set,执行notify ,Wait Set里面会有一个线程被唤醒。
Entry Set是被Synchronize阻塞的线程

Condition大致过程

await()挂起
Signal()唤醒

Condition c = Lock.newCondition()

ConditionObject是AQS 中的一个内部类,AQS里面的队列是同步队列(或者叫阻塞队列),ConditionObject里面的队列是等待队列。
阻塞队列里面的Node是在等待锁,而等待队列里面的Node在等待其它线程唤醒,唤醒之后就进入阻塞队列。

等待时间越久的节点越先被唤醒

Lock拥有一个同步队列和多个等待队列

等待队列中的node如果是中断唤醒的 则抛出异常

await方法就是向等待队列末尾添加一个节点,然后让lastWaiter指向它
notify方法是让等待队列最左边的node进入阻塞队列,去获取锁

在这里插入图片描述

node在await 后就进入条件队列,被挂起(1.直到被signal唤醒(按顺序唤醒) 1.1并且获取锁之后才会 unpark;1.2 发现前驱节点是取消状态也会立刻unpark。2.也可能是中断唤醒(不按顺序,直接迁移到阻塞队列中)

下面讲解 挂起await 和 唤醒Signal 两个方法

await 挂起节点

     public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();

            //调用await方法的线程包装成node 并且加入到条件队列中,并且返回当前线程的node
            Node node = addConditionWaiter();

            //完全释放掉当前线程对应的锁(将state设置为0)
            int savedState = fullyRelease(node);

            //0 默认值 在condition队列挂起期间 未接收过中断信号
            //-1 在condition队列挂起期间 接收过中断信号
            //1  在condition队列挂起期间 未接收过中断信号,但是迁移到阻塞队列后,接收过中断信号
            int interruptMode = 0;

            //isOnSyncQueue 返回true:当前node在同步队列(阻塞队列)
            //跳过次循环说明node已经在阻塞队列,准备获取锁了,
            //一直循环,park说明node在等待线程等待唤醒
            while (!isOnSyncQueue(node)) {
                //node在等待队列
                //挂起当前node对应的线程     等待Signal()
                LockSupport.park(this);
                //被唤醒情况
                //1.外部线程调用signal方法, 转移条件队列头节点到阻塞队列,当这个节点获取到锁后,会被前驱节点唤醒
                //2.进入阻塞队列后,发现阻塞队列中的前驱节点状态 是取消状态,此时会唤醒当前节点
                //3.当前节点挂起期间,被外部线程使用中断唤醒

                //checkInterruptWhileWaiting: 在挂起期间线程发生中断,对应的node会被迁移到阻塞队列
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    //发生中断
                    break;
            }

            //执行到这里,node已经迁移到阻塞队列了

            //1.acquireQueued: 判断是不是head.next,不是则判断是否需要挂起
            //返回true,说明node 在阻塞队列中被挂起后被中断信号唤醒
            //2.说明当前node在条件队列内未发生过中断,则将中断信号升级为 在队列中发生过中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;

            //中断后,node就直接进入阻塞队列,没有设置nextWaiter=null
            if (node.nextWaiter != null) // clean up if cancelled
                //把node从条件队列移除
                unlinkCancelledWaiters();

            //说明挂起期间(条件队列中和阻塞队列中) 发生过中断
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter 把节点放到条件队列

该方法的作用:线程包装成node 并且加入到条件队列中,并且返回当前线程的node
调用await方法的线程 都是 持锁状态,也就是说addConditionWaiter不存在并发,所以没有使用CAS

        private Node addConditionWaiter() {
            //获取当前条件队列的尾结点的引用,保存到局部变量t中
            Node t = lastWaiter;

            //1.队列有节点
            //2.node在条件队列中,它的状态就是CONDITION(-2)
            //如果不是-2,说明node发生中断了
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                //清理条件队列中所有取消状态的节点
                unlinkCancelledWaiters();
                //因为上面清空的就是lastWaiter,修改了lastWaiter
                t = lastWaiter;
            }
            //为当前线程创建node节点,设置为CONDITION(-2)
            Node node = new Node(Thread.currentThread(), Node.CONDITION);

            //说明队列中没有元素,是第一个进入的元素
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

fullyRelease 释放掉所有锁

该方法作用:完全释放掉当前线程对应的锁(将state设置为0)
释放锁

   final int fullyRelease(Node node) {
        //完全释放锁是否失败
        //如果失败(failed = true),说明当前线程是未持有锁调用await方法的线程(错误写法)
        //假设失败,在finally代码块中,会把刚刚加入到 条件队列 的当前线程对应的node状态修改为 取消状态
        //后继线程就会将取消状态的 节点 给清理出去
        boolean failed = true;
        try {
            //获取当前线程持有的 state值
            int savedState = getState();

            //大部分情况:返回true
            if (release(savedState)) {
                //失败标记 为false
                failed = false;
                //返回当前线程释放的state值
                //因为在当你被迁移到阻塞队列后,再次被唤醒,且当前node在阻塞队列中是head.next,
                //而且当前lock状态就是state==0的情况下,当前node可以获取到锁,此时需要将state设置为savedState
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

isOnSyncQueue 判断节点是否在同步队列

判断节点是否在同步队列(阻塞队列,获取锁的队列)

  final boolean isOnSyncQueue(Node node) {
        //1.说明当前node一定是在条件队列,signal方法迁移时会把waitStatus设置为0
        //2.前置条件:node.waitStatus!=-2
        //waitStatus==0  表示当前节点已经被signal了
        //signal是先修改状态再迁移,所以此时可能还没有来得及迁移
        //waitStatus==1  取消状态
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;

        //执行到这里,会有哪些情况?
        //waitStatus一定不可能是 1 取消状态,
        //因为signal是不会把取消状态的node迁移走的
        //设置prev引用是 迁移到阻塞队列(enq())设置的
        //入队逻辑:1.设置node.prev=tail 2.cas当前node阻塞队列的tail尾结点 3. prev.next=node

        //说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其他节点
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /**
         * 执行到这里,说明当前节点:有前置节点 waitStatus==0
         * 从阻塞队列的尾巴开始向前遍历查找node,找到返回true
         * 有可能node正在迁移中
         */
        return findNodeFromTail(node);
    }
----------------------------------------------
  private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

checkInterruptWhileWaiting 判断是否中断以及在哪中断的

        private int checkInterruptWhileWaiting(Node node) {
            //返回当前线程中断标记位 ,并且重置当前标记位false
            return Thread.interrupted() ?
                    //-1  等待队列中断        1 阻塞队列中断
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
---------------------------------------------------------
    final boolean transferAfterCancelledWait(Node node) {

        //说明node在条件队列内
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //因为中断已经把node唤醒了,就把node放到阻塞队列
            //修改状态为默认状态 0
            enq(node);
            return true;
        }

        //执行到这里的情况?
        //1.当前node已经被其他线程signal 已经迁移到阻塞队列了
        //2.当前node正在被其他线程signal 正在迁移到阻塞队列
        
        while (!isOnSyncQueue(node))
            Thread.yield();
        //false:被中断唤醒时,不在条件队列中
        return false;
    }

unlinkCancelledWaiters 把node从条件队列移除

方法的作用:把node从条件队列移除

      private void unlinkCancelledWaiters() {
            //遍历
            Node t = firstWaiter;
            //当前链表上一个正常状态的节点
            Node trail = null;

            while (t != null) {
                //下一个节点
                Node next = t.nextWaiter;
                //当前节点状态为非等待状态
                if (t.waitStatus != Node.CONDITION) {
                    //把nextWaiter置为空
                    t.nextWaiter = null;
                    //遍历的节点还没有碰到正常节点
                    if (trail == null)
                        firstWaiter = next;
                    else
                        //让上一个正常节点执行next,跳过取消节点t
                        trail.nextWaiter = next;
                    //当前节点是尾结点,更新lastWaiter
                    if (next == null)
                        lastWaiter = trail;
                }
                //当前节点是正常节点(等待队列中的正常节点 为-2)
                else
                    trail = t;
                t = next;
            }
        }

Signal 唤醒节点

  public final void signal() {
      //return getExclusiveOwnerThread() == Thread.currentThread();
      //判断调用signal方法的线程是否是独占锁持有的线程,不是抛出异常
      if (!isHeldExclusively())
          throw new IllegalMonitorStateException();

      //条件队列的第一个node
      Node first = firstWaiter;
      if (first != null)
          doSignal(first);
  }

doSignal 跳过取消节点,将正常节点迁移

迁移节点到阻塞队列

        private void doSignal(Node first) {
            do {
                //更新下一个节点为firstWaiter
                //==nul说明队列已经空了
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;

                //出队,断开与下一个节点的关系
                first.nextWaiter = null;

                //1.transferForSignal 返回false 代表迁移失败
                // 则开始迁移下一个节点firstWaiter
                //知道迁移成功 或者队列为空
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal 把节点转移到阻塞队列

将节点从条件队列转移到同步队列。

   final boolean transferForSignal(Node node) {
        //修改node为0,以为马上要到阻塞队列,设为默认状态
        //如果修改失败说明节点为取消状态,返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //enq返回值是当前节点在阻塞队列的前驱节点
        Node p = enq(node);

        //前驱节点的状态
        int ws = p.waitStatus;
        //1.说明前驱节点是取消状态,唤醒当前节点
        //2.前置条件:前驱节点不是取消状态,修改前驱为signal状态,
        //true:修改成功
        //false,说明当前前驱node对应的线程是 lockInterrupt入队的node时,是会响应中断的,
        // 外部线程给前驱线程中断信号之后,前驱会将状态修改为取消状态,并且执行出队逻辑
        //只要前驱节点不是0 ,-1 唤醒当前线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            //唤醒当前node对应线程
            LockSupport.unpark(node.thread);
        return true;
    }
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到