【Java无锁编程】面试官:CAS和AQS都是什么,有什么用,谁来实现,源码怎么写的?

发布于:2024-04-30 ⋅ 阅读:(30) ⋅ 点赞:(0)

CAS

全称是:Compare And Swap 比较并交换,是乐观锁的主要实现方式。cas实现了在多线程环境下即使不使用锁也可以让线程间同步。

在Java中使用CAS设计到三个参数,变量的内存地址,期望修改的值,要修改的值。

只有内存中的值和期望修改的值相等时,才会将内存中的值修改为要修改的值,这样代表cas操作成功。

如果修改失败,一般都会自旋重试,正常情况下都会设置线程阻塞,用来防止CPU空转导致占用率飙高的问题。

CAS的ABA问题

ABA问题通俗的解释为:

  • 线程A执行了CAS的操作,将内存地址A中期望的0修改为1,成功!
  • 线程B将内存地址A中期望的1又修改为0,成功!
  • 线程C将内存地址A中期望的0修改为1,成功!

我们可以发现,线程C看似是成功的,但是在它执行之前,内存中的数值已经被修改过一次了,这样虽然结果是CAS成功的,但是过程中可能蕴含着一些不可控的因素在里边,显然是不可以接受的。

解决方法,可以在每次修改的时候都增加一次版本号,每次更新都+1,在执行修改之前,再次判断版本号是否和期望的版本号一致。

AtomicStampedReference<T> 中就携带的有版本号,可以帮助我们快速解决ABA问题。


AQS

aqs是抽象同步队列,通过Node实现了队列,内部使用unsafe提供的CAS api实现的cas操作,也是java无锁编程的重要实现,最常用的有RLock,线程池

Node节点的几种状态

  • INIT:Node节点初始化时的状态,其实就是waitStatus的默认值(int 默认值=0)
  • CANCELLED:代表当前节点的任务是取消了的
  • SIGNAL:代表当前线程需要唤醒下一个线程
  • CONDITION:表示当前Node节点的线程处于等待状态,等待其它节点的唤醒
  • PROPAGATE:共享模式下的Node状态

AQS应用总流程

  • 首先使用AQS必须要子类实现以下两组中其中一组方法,如果不实现,则会调用父类AbstractQueuedSynchronizer 的方法,则会直接抛出UnsupportedOperationException异常。

    • tryAcquire(arg) tryRelease(arg) :以非共享模式尝试上锁和尝试解锁
    • tryAcquireShared(arg) tryReleaseShard(arg) :以共享模式尝试上锁和尝试解锁
  • 应用类获取锁:

    • 一般都会先调用父类AbstractQueuedSynchronizercompareAndSetState即CAS操作,进行上锁。
      • 成功则代表这个线程cas操作成功了,也就是拿到锁了
      • 失败则代表这个锁已经被别的线程抢走了。
      • 这个时候会调用父类AbstractQueuedSynchronizeracquire方法。
        • 再次尝试获取锁,如果失败了,并且成功添加到双向链表中,则当前线程阻塞等待。
        • 如果尝试获取锁成功了,则说明刚才拿到锁的线程已经释放掉了,当前线程上锁成功。
  • 应用类释放锁:

    • 应用类首先会调用release方法,这个时候aqs会去调用由应用类实现的tryRelease方法,并尝试进行解锁。
      • 如果解锁成功的话:会去唤醒当前节点的下一个节点,依次循环此流程。
      • 如果解锁失败的话:那说明子类有自己的逻辑。

一、类信息

AbstractQueuedSynchronizer:AQS的主类。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
 	// ...   
}

AbstractQueuedSynchronizer简称AQS,集成自:AbstractOwnableSynchronizer类,实现了序列化的接口。

AbstractOwnableSynchronizer父类内部维护了一个属性:

private transient Thread exclusiveOwnerThread;

用于存放当前执行的线程。独占模式同步的当前所有者。


Node:用于存放任务线程,以及实现双向链表类。

static final class Node {}

这个类是AQS的精髓,重要的信息都在内部进行存放,其中几个重要的属性如下:

  • waitStatus:描述了当前节点的运行状态,共有以下几种状态。

    • 初始化、默认状态:Node节点初始化时的状态(并没有真实属性表示),其实就是waitStatus的默认值(int 默认值=0)

    • CANCELLED:代表当前节点的任务是取消了的

    • SIGNAL:代表当前线程需要唤醒下一个线程

    • CONDITION:表示当前Node节点的线程处于等待状态,等待其它节点的唤醒

    • PROPAGATE:共享模式下的Node状态

  • prev:链表的上一个节点

  • next:链表的下一个节点

  • thread:工作线程


二、内部属性

head、tail:用于保存双线链表的头尾指针

// 仅通过setHead方法进行设置
private transient volatile Node head;
// 仅通过enq方法进行设置
private transient volatile Node tail;

阻塞队列(因为队列中的元素会被阻塞掉,所以也可以称之为阻塞队列,实际上就是一个双向链表)的头尾,在使用时初始化。

如果head存在,则head的waitStatus属性保证不为CANCELLED。

state:队列的同步状态

示例:在应用类ReentrantLock用于保存锁的获取状态以及获取次数。

private volatile int state;

CAS操作的必要属性

// 调用cas操作指令
private static final Unsafe unsafe = Unsafe.getUnsafe();
// xxxoffset记录字段在内存中的地址,用于查找和修改使用
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

这些字段会在类初始化的时候,也就是static代码块中进行赋值:

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

三、需要实现的方法

在AQS中,部分的方法要求子类去实现,否则调用时会抛出异常,像:tryXXX()这些方法,详细列表如下:

tryAcquire(int arg) 尝试使用独占锁的方式进行加锁。

tryAcquireShared(int arg) 尝试使用共享方式进行加锁。

tryRelease(int arg) 释放独占锁。

tryReleaseShard(int arg) 释放共享锁。

这样做的目的可能是因为:AQS因为有两套实现,共享和独占,如果以接口的设计那么就过于的死板,子类必须实现所有的方法才可以使用AQS,即使子类只需要独占模式,这样设计的好处在于,如果子类只希望使用独占模式,则只需要实现独占模式的方法即可,更加的灵活,而且子类实现,也可以扩展自己其它的逻辑。


四、AQS使用流程及原理

AQS本身是一个抽象类,并不能直接拿来使用,所以此处使用实现类:ReentrantLock进行举例。

上锁的流程

应用类首先会去调用aqs实现的cas方法判断是否可以直接加锁成功。

final void lock() {
    // 如果可以cas操作成功,说明当前没有线程争抢,或者是第一个执行到这里的线程
    if (compareAndSetState(0, 1))
        // 设置独占锁的线程为当前线程,就是个set方法
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 执行aqs获取独占锁的方法
        acquire(1);
}
  • 实际上这个compareAndSetState是由UnSafe类实现的navite方法,直接调用操作系统原语进行操作,这里不深究。

  • protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    // UnSafe
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    

以下为AQS的实现。

AQS的独占锁方法acquire()

public final void acquire(int arg) {
    // 再次尝试获取锁,看是否成功
    	// 失败了:会被添加到阻塞队列中去
    	// 成功了:加锁成功,返回
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        // 设置当前线程的中断标志为中断,代码:Thread.currentThread().interrupt();
        selfInterrupt();
    }
}

这个tryAcquire()方法是子类实现的,其实还是调用的cas操作,此处不深究。主要看入队列和封装waiter的操作。

按照顺序,先看addWaiter()方法

// 这个方法的作用就是将任务线程封装为一个Node,并添加到队列中去
// 这个参数Node是Node.EXCLUSIVE,意思为独占模式的Node节点
private Node addWaiter(Node mode) {
    // 先将任务线程包装到Node里,以独占模式封装。
    Node node = new Node(Thread.currentThread(), mode);
    // 先尝试快速添加,因为大多数情况下,队列的尾部都不是空的,因此不需要每次都调用enq方法,校验是否初始化,其实就是将enq方法内部的代码搬了出来而已,并舍弃了初始化Node的逻辑。
    // 由此可以得知,这个维护任务队列的双向链表是从尾部插入的。通过指针的修改,将当前任务线程放到原有尾部的后边,成为新的尾部。
    // Try the fast path of enq; backup to full enq on failure
	// 保存原先尾部的指针
    Node pred = tail;
    if (pred != null) {
        // 赋值当前任务线程的上一个节点为原先的尾部线程
        node.prev = pred;
        // 使用cas操作判断是否修改成功
        if (compareAndSetTail(pred, node)) {
            // 成功了就将原先尾部的下一个节点修改为当前的
            // 如果不成功的话就不能进行修改,因为可能在当前线程之前,已经有别的线程修改成功了
            pred.next = node;
            // 返回node,入队成功
            return node;
        }
    }
    // 能走到这说明,双向链表没有初始化,并且在执行快速入队的时候别的线程已经抢先修改了队列尾。
    enq(node);
    // 入队成功,返回node
    return node;
}

enq():初始化双向链表 && Node入队

// 参数node就是包装好的任务线程
private Node enq(final Node node) {
    // 自选一直尝试cas操作,直到成功。
    for (;;) {
        // 保存一下原先指针
        Node t = tail;
        // 双向队列还没有初始化
        if (t == null) { // Must initialize
            // 先初始化一下head,如果失败了就说明别的线程已经抢先初始化过了。
            // 这个空的Node实际上是一个虚无的占位节点,用于初始化的
            if (compareAndSetHead(new Node()))
                tail = head;
        } 
        // 这个else里的才是真正入队的流程
        else {
            // 替换指针,原来的队列尾巴,等于当前Node的上一个节点
            node.prev = t;
            // 尝试修改尾巴,失败了就重新获取tail,重新赋值,直到成功
            if (compareAndSetTail(t, node)) {
                // 将原先tail的下一个节点设置为自己,完成指针的维护和入队
                t.next = node;
                // 跳出循环,入队成功
                return t;
            }
        }
    }
}

到此addWaiter方法就执行完毕,任务线程已经包装成Node进入了队列中。

再回过头来看acquire()方法的acquireQueued()方法

// 这个方法主要是用于阻塞任务线程的方法,因为cas操作设计自旋重试,如果任由线程自选,则会无故消耗大量cpu
final boolean acquireQueued(final Node node, int arg) {
    // 代表是否中途失败了
    boolean failed = true;
    try {
        // 代表当前线程是否被中断了,有可能是被别的线程进行了中断
        boolean interrupted = false;
        for (;;) {
            // 获取当前节点的上一个节点
            final Node p = node.predecessor();
            // 如果是头部的话,才会有获取锁的资格,由此可以得出,aqs的队列是从头部开始往后依次执行
            if (p == head && tryAcquire(arg)) {
                // 重新设置头部节点
                setHead(node);
                // 帮助gc回收
                // 这个地方不用疑问,因为在上一次执行setHead()方法的时候,已经将这里的node.predecessor()里边的其它属性设置为null了,这里再将next设置为null,那么这个Node就是一个没有引用关系,并且数据都是Null的一个对象,这样才会求助gc进行回收
                p.next = null; // help GC
                failed = false;
                // 返回不阻塞线程,因为成功拿到锁了
                return interrupted;
            }
            // 不是头部的下一个节点,或者拿锁失败了,才会执行到这个if
            // 主要做了几件事:
            // 1. 判断获取所失败的节点的状态,如果返回true,则代表需要挂起当前任务线程,会进入到parkAndCheckInterrupt()方法进行挂起,否则自旋一直进行cas操作,重试shouldParkAfterFailedAcquire方法
            // 2. 移除状态为取消的Node,并修改Node的waitStatus,以改变重试shouldParkAfterFailedAcquire()方法的返回值
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
                // 这个值取决于parkAndCheckInterrupt()方法的返回值,用于外边设置线程的中断状态
                interrupted = true;
            }
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire():判断当前Node的任务线程是否需要进行阻塞,以及移除状态为取消的Node

// 检查并更新获取失败节点的状态。如果线程阻塞,返回true。这是所有采集回路中的主要信号控制。需要pred == node.prev。 
// 参数: pred -节点的前身节点持有状态-节点 返回: 如果线程应该阻塞,则为
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取上一个节点的状态
    int ws = pred.waitStatus;
    // 如果状态是SIGNAL,则代表当前节点需要被等待唤醒,那么就需要阻塞
    if (ws == Node.SIGNAL)
        /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
        return true;
    // 只有在状态是取消时才会进入
    if (ws > 0) {
        /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        // 循环一直往上查找,直到找到不是取消状态的节点,在赋值给当前节点的prev指针,原先取消的n个Node就会被移除掉
        do {
            // node.prev = 当前节点的prev指针
            // perd = 当前节点的上一个节点
            // pred.perv 当前节点的上一个节点的上一个节点
            // 一直赋值,一直往前找,直到找到waitStatus>0的Node才停止
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 更新指针
        pred.next = node;
    }
    // 如果是头节点,或者是共享节点,就将当前节点的上一个节点的waitStatus赋值为SIGNAL,代表后续节点需要被唤醒
    else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 返回false不会立即挂起线程,则会重新自旋执行这个方法
    return false;
}

parkAndCheckInterrupt():挂起线程,并返回当前线程是否被中断。

// 封装方法,简化挂起的流程
private final boolean parkAndCheckInterrupt() {
    // 挂起
    LockSupport.park(this);
    // 清除线程的中断标志
    return Thread.interrupted();
}

释放锁的流程

在aqs中释放锁会先去调用:release() 方法对锁进行一个释放。

// 释放锁的方法
public final boolean release(int arg) {
	// 首先去调用子类实现的方法,尝试释放锁,以ReentrantLock进行举例,它会先去将state减1,然后判断释放归零,如果state已经为0了,代表已经解锁成了。
    // 这个地方是子类实现的,所以,子类会在修改之前判断,修改的线程是否是上锁的线程,如果是的话才可以进行修改,否则抛出异常。
    if (tryRelease(arg)) {
        // 此时的head节点就是执行完毕了的任务线程,这个地方只是保存了一下指针
        Node h = head;
        // 校验
        if (h != null && h.waitStatus != 0){
        	// 将头节点的下一个节点进行唤醒,该起来执行了!
            unparkSuccessor(h);
        }
        // 返回解锁成功标识
        return true;
    }
    // 解锁失败了
    return false;
}

unparkSuccessor(h):清除头节点执行状态(waitStatus),唤醒阻塞线程

// 传入的参数是头节点的指针
private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    // 判断头节点的状态
    // 我猜测可能是执行到这个地方的时候说明任务线程已经执行完毕,需要重置waitStatus,避免对其它地方造成影响
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    // 获取当前任务节点的下一个节点
    Node s = node.next;
    // 会先尝试进行唤醒,但是如果下一个节点是空的,或者被取消掉了,那么就会重新查找新的节点
    // 查找的逻辑是,从后往前开始找,这么做的目的是因为防止入队enq()执行时,因为线程上下文的切换,或者锁线程的执行速度超过了入队的速度,就有可能导致next为null,也就是说还没来得及进行赋值。
    // 这个地方为什么会是从后往前进行查找?
    // 因为每当线程执行addWaiter()方法入队的时候,首先会将当前线程封装Node的prev属性赋值,这样的话就可以保证前一个节点是肯定有值的,接下来才是进行cas操作进行修改tail节点,修改成功之后才会将自身的next属性赋值。
    // 那么就会有以下的可能性:
    // cas执行成功了,还没有赋值next节点,那么如果是这个时候碰巧进入了这一段方法,从前往后找就会遇到空指针的问题,从而引发异常的发生。这也是从后往前找的根本原因。
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 这个for的停止判断为,循环到的节点等于头节点,也就是说从后往前找,找完了
        // 会在每次循环的时候进行指针的切换,t=t.prev
        for (Node t = tail; t != null && t != node; t = t.prev)
            // 一直找到状态不是取消的节点
            if (t.waitStatus <= 0)
                s = t;
    }
    // next节点有值,或找到了符合的节点
    if (s != null)
        // 就唤醒线程
        LockSupport.unpark(s.thread);
}

解锁流程到此结束。


锁相关概念

公平锁

等待获取锁的线程拥有特定顺序,挨个排队拿锁。

非公平锁

不按照特定的顺序进行拿锁,谁先抢到就是谁的。

可重入锁

已经拿到过锁的线程,不需要再次拿锁,就可以访问共享资源。

不可重入锁

已经拿到锁的线程,如果要访问共享资源,则需要再次获取锁。

独占锁

自己已经拿到锁了,别的线程再过来那就拿不到。

偏向锁

拿到过一次锁,再去获取锁如果锁指向的是自己,就直接执行流程,不用等待拿锁了。

乐观锁

每次在修改数据之前,获取一个版本号,一般会和数据一起保存,在修改的时候判断版本号是否和携带的版本号一致,如果一致就修改,否则就失败,抛一个版本号异常。适用于并发修改量低的场景。

悲观锁

在每次修改之前都获取锁,以此来保证在修改数据时不会被其它线程影响,性能开销比较大。适用于并发修改量大,且需要保证数据xxx的场景。

抽象类和接口的区别

接口,我觉的是一套规范的具体定义,比如说数据库的驱动,要链接数据库的话,数据库厂商必须实现特定的接口,才可以构建出一个完整的数据库驱动。

抽象类,其实从名字上就可以看出来,它是一件事情,或一个功能的抽象实现,就好比,AQS的这个类,拥有两套实现,公平锁和非公平锁,如果aqs定义为是一个接口的话,那么就过于的具体,不具备抽象的概念,实现类就必须同时实现两套加锁逻辑,抽象类的话可以定义方法的修饰符为protected这样的话,子类就按需实现即可。


网站公告

今日签到

点亮在社区的每一天
去签到