AQS抽象同步器的核心原理与实践

发布时间:2022-07-02 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了AQS抽象同步器的核心原理与实践脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

基于CAS自旋实现的轻量级锁有两个问题:

(1)CAS空自旋会浪费大量CPU资

(2)在CMP架构的CPU会导致“总线风暴”。

解决CAS空自旋的有效方式之一是以空间换时间,比较常见的方案由两种:分散操作和热点和使用队列削峰。JUC使用的是队列削峰的方案解决CAS性能问题(LongAdder是分散热点),它提供了一个双向队列的削峰基类——抽象基础类AbstractQueuedSynchronizer(AQS)。

锁与队列的关系

1.CLH锁的内部队列

CLH自旋锁使用的CLH是一个单向队列,也是一个FIFO队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问,队列头部的节点表示占有锁的节点,新加入的抢锁线程需要等待,会插入队列的尾部。

2.分布式锁的内部队列

在分布式锁的实现中,以ZooKeePEr的分布式锁为例,就是创建临时节点,顺序执行。

3.AQS的内部队列

AQS是JUC提供的一个用于构建锁和同步容器的基础类。例如ReentrantLock、semaphore、CountDownLatch、FutureTask等都是基于AQS构建的。AQS解决了实现同步容器时设计的大量细节问题。

AQS是CLH队列的一个变种。AQS队列内部维护的是一个FIFO的双向链表,每个节点有前驱结点和后继节点。每个节点由线程封装,当线程争抢锁失败后会封装成节点加入AQS队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

AQS抽象同步器的核心原理与实践

AQS的核心成员

AQS出于“分离变与不变”(人话:单一职责和开闭原则)的原则,基于模版模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模版方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,让各种锁的子类去实现。

状态标志位

AQS中维持了一个单一的volatile变量state,state表示锁的状态。

PRivate volatile int state;

state保证了可见性,所以任何线程通过getState()获取状态都可以得到最新值。AQS提供了compareAndSetState()方法利用底层UnSafe的CAS机制来实现原子性。

protected final boolean compareAndSetState(int exepect, int update) {
    return unsafe.COMpareAndSwapInt(this, stateOffset, expect, update);
}

以ReentrantLock为例,state初始化为0,表示未锁定。A线程执行该锁的lock()操作时,会调用tryAcquire独占该锁并将state加1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取锁的(state会累加),这就是可重入。但是,获取多少次就要释放多少次,这样才能保证state回到零态。

AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个基类只有一个变量exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了get和set方法。

队列节点类

Node

FIFO双向同步队列

每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。

JUC显式锁与AQS的关系

AQS是一个同步器,它实现了锁的基本抽象功能,该类是由模版模式来实现的。

1.ReentrantLock与AQS的组合关系

ReentrantLock是一个可重入的互斥锁,可以被单个线程多次获取。ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:

static abstract class Sync extends AbstractQueuedSynchronizer { ... }

ReentrantLock支持公平锁和非公平锁。默认情况下是非公平锁。

final static class NonfairSync extends Sync { ... }
final static class FairSync extends Sync { ... }

由ReentrantLock的lock和unlock的源码可以看到,它们只是分别调用了sync对象的lock和release方法。

public void lock() {
        sync.acquire(1);
}

public void unlock() {
    sync.release(1);
}

而Sync内部类只是AQS的子类,所以本质是ReentrantLock的操作是委托给AQS完成的。

AQS的模版流程

AQS定义了两种资源共享方式:

  • Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。
  • share(共享锁):多个线程可以同时占有资源,如SEMaphore、CountDownLatch。

AQS为不同的资源共享方式提供了不同的模版流程,AQS提供了一种实现阻塞锁和依赖FIFO等待队列的同步器的框架。自定义的同步器只需要实现共享资源state的获取与释放方式即可,这些逻辑都编写在钩子方法中。无论是共享锁还是独占锁,AQS在执行模版流程时都会回调自定义的钩子方法。

AQS的钩子方法

自定义同步器时,AQS中需要重写的钩子方法如下:

  • tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false。
  • tryRekease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
  • tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;
  • isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condITion条件队列时才需要去实现它。

通过AQS实现简单的独占锁

AQS抽象同步器的核心原理与实践

SimpleMockLock只实现了Lock接口的两个方法:

(1)lock方法:完成显式锁的抢占。

(2)unlock方法:完成显式锁的释放。

SimpleMockLock的锁抢占和释放是委托给Sync实例的方法来实现的。在抢占锁时,AQS的acquire会调用tryAcquire钩子方法;释放锁时,AQS的release会调用tryRelease钩子方法。

内部类Sync继承AQS类时提供了一下两个钩子方法的实现:

(1)tryAcquire:将state设置为1并保存当前线程,表示互斥锁已经占用。

(2)tryRelease:将state设置为0,表示互斥锁已经被释放。

public class SimpleMockLock implements Lock {

    // 同步器实例
    private final Sync sync = new Sync();

    // 自定义的内部类:同步器
    // 直接使用 state 表示锁的状态
    // state = 0 表示锁没有被占用
    // state = 1 表示已经被占用
    private static class Sync extends AbstractQueuedSynchronizer {
        @override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // 接下来不需要使用CAS操作,因为下面的操作不存在并发场景
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    static int i = 0;

    public static void lockAnDFastIncrease(Lock lock) {
        lock.lock();
        i++;
        System.out.println(i);
        lock.unlock();
    }

    public static void main(String[] args) {
        LongAdder cnt = new LongAdder();
        final int TURNS = 1000;
        final int THREADS = 10;
        final ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        final SimpleMockLock lock = new SimpleMockLock();
        long start = System.currentTimeMillis();
        for (int i = 0; i < THREADS; i++) {
            pool.submit(() -> {
                try {
                    for (int j = 0; j < TURNS; j++) {
                        lockAndFastIncrease(lock);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        final long l = System.currentTimeMillis() - start;
        System.out.println("耗时:" + l);
        pool.shutdown();
    }

}

AQS锁抢占原理

流程的第一步,显式锁的lock方法会调用同步器基类AQS的模版方法acquire。acquire是AQS封装好的获取资源的公共入口,它是AQS提供的利用独占的方式获取资源的方法,源码如下

public final void acquire(int arg) {
    if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire至少执行一次tryAcquire钩子方法,tryAcquire默认抛出一个异常,具体的获取独占资源state的逻辑需要钩子方法来实现。若调用tryAcquire尝试成功,则acquire将直接返回,表示抢到锁;若不成功,则将线程加入等待队列中。

tryAcquire流程:CAS操作state字段,将值从0改为1,若成功表示锁未被占用,返回true;若失败,则返回false。如果是重入锁,state字段值会累积,表示重入次数。

直接入队:addWaiter。在acquire模版方法中,如果钩子方法tryAcquire返回失败,就构造同步节点(独占式节点模式为Node.EXCLUSIVE),通过addWaiter方法将节点加入同步队列的队尾。

自选入队:enq。addWaiter第一次尝试在尾部添加节点失败,意味有并发抢锁发生,需要自旋。enq方法通过CAS自旋将节点添加到队列尾部。

自旋抢占:acquireQueued。节点入队之后,启动自旋锁的流程,acquireQueued的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且在前驱结点上自旋,只有当前驱结点是头结点时才尝试获取锁。为了不浪费资源,如果头结点获取了锁,那么该节点会终止自旋,线程回去执行临界区的代码。其余处于自旋状态的线程当然也不会自旋浪费资源,而是被挂起进入阻塞状态。

Re

《Java高并发核心编程》

虽然讲源码了,但是感觉有些啰嗦,没有重点,笔记做下来感觉太注重细节,没有总结和纲领,看下《Java并发编程之美》试试。

脚本宝典总结

以上是脚本宝典为你收集整理的AQS抽象同步器的核心原理与实践全部内容,希望文章能够帮你解决AQS抽象同步器的核心原理与实践所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。