Java AbstractQueuedSynchronizer (AQS) 初识


AQS,全称为 AbstractQueuedSynchronizer,是 Java 并发包 java.util.concurrent 中的一个抽象类,它为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁
和相关同步器(信号量、事件,等等)提供了一个框架。

从包名就可以知道这是一个关于并发的类, 如 ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等这些工具的实现都依赖于 AQS。
这篇文章会对 AQS 做一个比较简单的介绍。

1 AbstractQueuedSynchronizer (AQS) 简介

在源码中, 对 AQS 的解释 (可以查看 AbstractQueuedSynchronizer 的类注释)

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic {@code int} value to represent state.


提供了一个框架, 用于实现依赖于先进先出 (FIFO) 的等待队列的阻塞锁和相关的同步器 (例如: 信号量, 事件等)。  
这个类是被设计用来作为一个有用的基础, 为大部分的依靠一个单独的原子变量来表示状态的同步器。

AQS (同步器) 是用来构建锁和其他同步组件的基础框架, 它的实现主要依赖一个 int 成员变量来表示同步状态以及通过一个 FIFO 队列构成等待队列。
它本身是一个抽象类, 内部已经将大部分的线程排队, 阻塞机制逻辑封装完成了, 但是 还是有几个 protected 修饰的用来改变同步状态的方法需要子类进行实现

在实际地使用中, 它的子类被推荐定义为同步组件的静态内部类, AQS 自身没有实现任何同步接口, 它仅仅是定义了若干同步状态的获取和释放方法来供自定义同步组件的使用。
AQS 既支持独占式获取同步状态, 也可以支持共享式获取同步状态, 这样就可以方便的实现不同类型的同步组件。

AQS 是实现锁 (也可以是任意同步组件) 的关键, 在锁的实现中聚合 AQS, 利用 AQS 实现锁的语义。
可以这样理解二者的关系: 锁是面向使用者, 它定义了使用者与锁交互的接口, 隐藏了实现细节, 而 AQS 是面向锁的实现者, 它简化了锁的实现方式, 屏蔽了同步状态的管理, 线程的排队, 等待和唤醒等底层操作
锁和 AQS 很好的隔离了使用者和实现者所需关注的领域。

2 AQS 的模板方法设计模式

AQS 的设计是使用模板方法设计模式, 它将**一些方法开放给子类进行重写, 而 AQS 给同步组件所提供模板方法又会重新调用被子类所重写的方法( 也就是重载)**。
例如, AQS 中需要重写的方法 tryAcquire

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

ReentrantLock 中 NonfairSync (继承 AQS) 会重写该方法为:

protected final boolean tryAcquire(int acquires) {
    // 不公平的尝试获取锁
    return nonfairTryAcquire(acquires);
}

而 AQS 中的获取锁的模板方法为 acquire(), 它的实现如下:

public final void acquire(int arg) {
    // 调用到子类的 tryAcquire 方法来判断是否获取锁成功
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
}

acquire 会调用 tryAcquire 方法, 而此时当继承 AQS 的 NonfairSync 调用模板方法 acquire 时就会调用已经被 NonfairSync 重写的 tryAcquire 方法。
这就是使用 AQS 的方式, 在弄懂这点后, 对 Lock 的实现理解有很大的提升。

可以归纳总结为这么几点:

  1. 同步组件 (这里不仅仅指锁, 还包括 CountDownLatch 等) 的实现依赖于同步器 AQS, 在同步组件实现中, 使用 AQS 的方式被推荐定义继承 AQS 的静态内存类
  2. AQS 采用模板方法进行设计, AQS 的 protected 修饰的方法需要由继承 AQS 的子类进行重写实现, 当调用 AQS 的子类的方法时就会调用被重写的方法
  3. AQS 负责同步状态的管理, 线程的排队, 等待和唤醒这些底层操作, 而 Lock 等同步组件主要专注于实现同步语义;

AQS 可重写的方法如下

方法 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态, 实现该方法需要查询当前状态并判断同步状态是否符合预期, 然后再进行 CAS 设置同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态, 等待获取同步状态的线程将有机会获取同步状态
protected int tryAcquireShared(int arg) 共享式获取同步状态, 返回大于等于 0 的值, 表示获取成功, 反之, 失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占有, 一般该方法表示锁是否被当前线程所独占

在实现同步组件时 AQS 提供的模板方法, 也是各种同步组件真正会调用到的方法

方法 描述
void acquire(int arg) 独占式获取同步状态, 如果当前线程获取同步状态成功, 则由该方法进行返回。否则将会进入同步队列进行等待, 该方法会调用重写的 tryAcquire(int arg) 方法
void acquireInterruptibly(int arg) 与 acquire(int arg) 作用相同, 但是该方法可以相应中断, 如果当前线程被中断, 会抛出异常。如果没有, 则按照 acquire(int arg) 进行执行
boolean tryAcquireNanos(int arg, long nanosTimeout) 在 acquireInterruptibly(int arg) 的基础上, 增加了时间限制, 如果在指定的时候内获取到了同步状态, 返回 true, 否则返回 false
boolean release(int arg) 独占式的释放同步状态, 该方法会在释放同步状态后, 将同步队列的第一个节点包含的线程唤醒
void acquireShared(int arg) 共享式获取同步状态, 如果当前线程没有获取到同步状态, 将会进行到同步队列等待, 和独占式的区别在于, 同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 同 acquireShared(int arg) 相同, 该方法可以响应中断
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 在 acquireSharedInterruptibly(int arg) 的基础上, 加上了时间限制
boolean releaseShared(int arg) 共享式的状态的释放
Collection getQueuedThreads() 获取在等待同步队列上等待的线程

AQS 提供的模板方法可以分为3类:

  1. 独占式获取与释放同步状态
  2. 共享式获取与释放同步状态
  3. 查询同步队列中等待线程情况

3 一个例子

class Mutex implements Lock, java.io.Serializable {

    // 定义我们内部自己的同步器,  继承 AQS 的静态内存类重写方法
    // 将原子变量从 0 设置为 1 成功, 表示上锁成功
    // 将原子变量从 1 设置为 0 成功, 表示释放锁成功
    private static class Sync extends AbstractQueuedSynchronizer {
    
        // 判断当前是否处于上锁状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 获取锁: 通过 cas 将 state 从 0 设置为 1
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; 
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 释放锁: 通过 cas 将 state 从 1 设置为 0
        protected boolean tryRelease(int releases) {
            assert releases == 1; 
            if (getState() == 0) 
              throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 提供给一个 Condition
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // 声明我们自己的同步器
    private final Sync sync = new Sync();
    
    //使用同步器的模板方法实现自己的同步语义
    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

使用

public class MutextDemo {

    private static Mutex mutex = new Mutex();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                mutex.lock();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.unlock();
                }
            });
            thread.start();
        }
    }
}

执行情况:

Alt '自定义 AQS 同步器执行线程状态'

上面的这个例子实现了独占锁的语义, 在同一个时刻只允许一个线程占有锁。
MutexDemo 新建了 10 个线程, 分别睡眠 3s, 从执行情况也可以看出来当前 Thread-6 正在执行占有锁, 而其他线程, 如 Thread-7, Thread-8 等线程处于 WAIT 状态。

按照推荐使用的方式, Mutex 定义了一个继承 AQS 的静态内部类 Sync, 并且重写了 AQS 的 tryAcquire 等方法, 而对 state 的更新也是利用了 setState(), getState(), compareAndSetState() 这三个方法。
在实现 Lock 接口中的方法也只是调用了 AQS 提供的模板方法 (因为 Sync 继承 AQS )。

从这个例子就可以很清楚的看出来, 在同步组件的实现上主要是利用了 AQS, 而 AQS “屏蔽” 了同步状态的修改, 线程排队等底层实现,
通过 AQS 的模板方法可以很方便的给同步组件的实现者进行调用。而针对用户来说, 只需要调用同步组件提供的方法来实现并发编程即可。

在新建一个同步组件时需要把握的两个关键点是:

  1. 实现同步组件时推荐定义继承 AQS 的静态内存类, 并重写需要的 protected 修饰的方法
  2. 同步组件语义的实现依赖于 AQS 的模板方法, 而 AQS 模板方法又依赖于被 AQS 的子类所重写的方法

同步组件实现者的角度:
通过可重写的方法:

  1. 独占式: tryAcquire() (独占式获取同步状态), tryRelease() (独占式释放同步状态)
  2. 共享式: tryAcquireShared()(共享式获取同步状态), tryReleaseShared()(共享式释放同步状态)
    告诉AQS怎样判断当前同步状态是否成功获取或者是否成功释放

同步组件专注于对当前同步状态的逻辑判断, 从而实现自己的同步语义。
这句话比较抽象, 举例来说, 上面的 Mutex 例子中通过 tryAcquire 方法实现自己的同步语义, 在该方法中如果当前同步状态为 0 (即该同步组件没被任何线程获取) , 当前线程可以获取同时将状态更改为 1 返回 true,
否则, 该组件已经被线程占用返回 false。很显然, 该同步组件只能在同一时刻被线程占用, Mutex 专注于获取释放的逻辑来实现自己想要表达的同步语义。

AQS 的角度
而对 AQS 来说, 只需要同步组件返回的 true 和 false 即可, 因为 AQS 会对 true 和 false 会有不同的操作, true 会认为当前线程获取同步组件成功直接返回, 而 false 的话就 AQS 也会将当前线程插入同步队列等一系列的方法。

4 参考

初识Lock与AbstractQueuedSynchronizer(AQS)


  目录