【JavaEE】AQS原理

发布于:2024-07-23 ⋅ 阅读:(158) ⋅ 点赞:(0)

本文将介绍AQS的简单原理。


首先有个整体认识,全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。常用的ReentrantLock、Semaphore、CountDownLatch等都有实现它。

本文参考:

深入理解AbstractQueuedSynchronizer只需15张图_abstractqueuedsynchronizer流程图-CSDN博客

 黑马程序员深入学习Java并发编程,JUC并发编程全套教程_哔哩哔哩_bilibili

从ReentrantLock的实现看AQS的原理及应用 - 美团技术团队 (meituan.com)

Java AQS 核心数据结构-CLH 锁 (qq.com)


AQS关键组成部分


state

state用来表示资源的状态(独占模式和共享模式),子类需要控制这个变量,就能够获取锁或者释放锁。

独占模式:只允许一个线程访问资源。

共享模式:允许多个线程访问资源。


CLH队列

这个队列保存的是没有获得锁资源进入阻塞的线程。使用的是头节点和尾结点来创建出这个双向队列。这个队列是先进先出的,不支持优先级。这里的CLH队列是AQS对于CLH锁的升级改后的应用。

该队列是对自旋锁的改进。那么自旋锁有上面缺点呢?

自旋锁


public class SpinLock {
    private AtomicReference<Thread> owner = new AtomicReference<Thread>();

    public void lock() {
        Thread currentThread = Thread.currentThread();
        // 如果锁未被占用,则设置当前线程为锁的拥有者
        // 这里就是自旋锁的核心部分
        while (!owner.compareAndSet(null, currentThread)) {
        }
    }

    public void unlock() {
        Thread currentThread = Thread.currentThread();
        // 只有锁的拥有者才能释放锁
        owner.compareAndSet(currentThread, null);
    }
}

第一个是锁饥饿问题。在锁竞争激烈的情况下,可能存在一个线程一直被其他线程”插队“而一直获取不到锁的情况。

第二是性能问题。在实际的多处理上运行的自旋锁在锁竞争激烈时性能较差。

CLH队列就把上述的两个问题解决了。

CLH队列过程(加解锁)

  1. 初始化一个 Tail 指向一个状态为false的空节点。
  2. 当t1线程获取锁,tail就指向t1线程,同时修改状态为true
  3. 当t2也想获得锁,此时t1还没释放,tail就指向t2,同时t1对应的节点。此时t2检查到t1节点为false,就开始轮询上一个节点的状态
  4. 当t1结束后,就把值修改为false
  5. 当t2轮询到值为false后,就是获取锁成功。

AQS升级CLH锁

  1. 扩展了每个节点的状态(waitStatus),从简单的true和false,扩展到如下部分。
  2. 维护前驱后继节点
    原始版本的 CLH 锁中,节点间甚至都没有互相链接。但是,通过在节点中显式地维护前驱节点,CLH 锁就可以处理“超时”和各种形式的“取消”:如果一个节点的前驱节点取消了,这个节点就可以滑动去使用前面一个节点的状态字段。
  3. 用阻塞等待代替自选操作
    当前一个节点释放锁之后,它会通知后一个节点获取锁。

条件变量

如果线程不满足条件变量后,那么它就要进入另外一种队列进行等待。每一个条件变量都会创建出一个队列。其中这个队列是单向的。


锁资源获取与释放

  • 独占式
    • acquire获取资源
    • release释放资源
  • 共享式
    • acquireShared获取资源
    • releaseShared释放资源

获得独占锁

acquire是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

释放独占锁

AQS中提供了release模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CLH队列的第二个线程节点。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

获得共享锁

acquireShared是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

释放共享锁

AQS中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(首节点的下个节点)

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

自定义锁

下面我们自定义一个锁,该锁内部的同步器是不可重入的独占锁

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

// 自定义锁
public class MyLock implements Lock {

    // 独占锁 同步器类
    public class MySync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {

            if (compareAndSetState(0, 1)) {
                // 加锁,并设置成当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int acquires) {
            setExclusiveOwnerThread(null);
            // state是被volatile修饰的,exclusiveOwnerThread 没有被volatile修饰
            // 所以设置exclusiveOwnerThread成null要放在前面
            setState(0);
            return true;
        }
        protected Condition newCondition() {
            return new ConditionObject();
        }

        // 是否持有独占锁
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private MySync sync = new MySync();

    // 加锁,不成功进入等待队列
    @Override
    public void lock() {
        sync.acquire(1);
    }

    // 加锁,不成功进入等待队列,可被中断
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // 尝试加锁一次,不成功直接返回
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    // 尝试加锁(带超时的),不成功进入等待队列,可被中断
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    // 解锁
    @Override
    public void unlock() {
        // 这里release本质上调用的是tryRelease方法 解锁并唤醒等待队列中的线程
        // 如果调用的是release方法,就不会唤醒等待的线程
        sync.release(1);
    }

    // 创建一个Condition
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

class Test {
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            long now = System.currentTimeMillis();
            try {
                System.out.println("t1 获取锁");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("t1 释放锁");
                System.out.println("t1 耗时:" + (System.currentTimeMillis() - now));
                lock.unlock();
            }
        }, "t1").start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("t2 获取锁");
            } finally {
                System.out.println("t2 释放锁");
                lock.unlock();
            }
        }, "t2").start();
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到