AQS之Semaphore分析 (七)

发布于:2022-11-10 ⋅ 阅读:(659) ⋅ 点赞:(0)

1.Semaphore 介绍

Semaphore即信号量,常用于同时限制访问某些资源的线程数量。
其内部抽象类Fair继承了AQS,Semaphore正是通过Sync实现数量的控制

在这里插入图片描述

在这里插入图片描述

1.1 Sync

Semaphore是基于AQS原理实现的,但并不是说Semaphore继承了AbstractQueuedSynchronizer抽象类,而是其内部类进行了AbstractQueuedSynchronizer的继承,Semaphore通过内部类实现,后续其他几个AQS的应用同样也是如此。

在这里插入图片描述

具体到Semaphore,内部类Sync继承了AQS,而且Sync同样的也只是个抽象类,具体有氛围两个策略:公平锁(FairSync)和非公平锁(UnFairSync),两者的区别是对于竞争资源的线程是否严格遵守先到先得的公平策略。

1.2 state

Sync的state代表可以同时访问的线程数量,也可能理解为访问的许可证(permit)数量。每个线程访问(acquire)时需要拿到对应的许可证,否则进行阻塞,访问结束则返还(release)许可证。
state只能在Semaphore的构造方法中进行初始化,后续不能进行修改。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • 构造方法(int permits): 非公平锁
  • 构造方法(int permits, boolean fair): 如果fair为true的话, 公平锁, 如果为false的话为非公平锁

1.3 对外方法

在这里插入图片描述

在这里插入图片描述
获取资源:
1>不响应中断: acquireUninterruptibly()
2>响应中断: acquire()
3>超时响应中断: tryAcquire(int permits, long timeout, TimeUnit unit)
4>直接返回结果: tryAcquire()

释放资源:
1>release(int permits)
2>release()

2.实例代码

   public static void main(String[] args){
        // 3个车位
        Semaphore semaphore = new Semaphore(3);
        // 6个车
         for (int i = 1; i <= 6; i++) {
             new Thread(()->{

                 try {
                     semaphore.acquire();
                     System.out.println(Thread.currentThread().getName() + "抢到");
                     try { TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}
                     System.out.println(Thread.currentThread().getName() + "了");
                 }catch (InterruptedException e){
                     e.printStackTrace();
                 }finally {
                     semaphore.release();
                 }

             }, String.valueOf(i)).start();

          }
    }

相当于3个车位, 6辆车去抢

在这里插入图片描述

3.资源获取acquire

共享锁不同应用类的核心逻辑实在其对AbstractQueuedSynchronizer的模板方法protected int tryAcquireShared(int acquires)的重写上。

AQS的state对应的是Semaphore的permit数量, 只要state大于acquire那么说明
资源满足线程需要, 那么获取成功。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • FairSync
  • NofairSync

3.1 NofairSync.tryAcquireShared 尝试获取资源加锁

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

如果剩余remaining 小于 acquires则说明资源不足,返回失败,否则cas更新,cas失败则重试。

3.2 FairSync.tryAcquireShared 尝试获取资源加锁

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        //后面的和非公平锁基本一致
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
public final boolean hasQueuedPredecessors() {

    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

遍历队列,从头结点开始检查,到当前线程的节点截止, 没有当前线程则到尾, 检查是否存在其他节点, 就是当前线程是否存在前驱节点。即公平锁的含义,先到先得。

3.3 doAcquireSharedInterruptibly 释放资源操作

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

通过之前分析ReentrantLock得知先是加入到一个队列中addWaiter(), 然后进行判断和获取资源的操作

4.释放资源

共享锁不同应用类的核心逻辑实在其对AbstractQueuedSynchronizer的模板方法protected boolean tryReleaseShared(int arg)的重写上。

在Semaphore中,release方法有Sync重写,公平锁和非公平锁没有区别

4.1 tryReleaseShared 尝试释放资源

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

cas+重试的方式去增加state, 即release, 返回true, 进行真正的释放资源操作

4.2 doReleaseShared 释放资源操作

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

通过从头结点到尾结点, 根据waitStatus的值做不同的操作。waitStatus=-1的话释放资源, 否则cas改变waitStatus的值。


网站公告

今日签到

点亮在社区的每一天
去签到