Java AbstractQueuedSynchronizer 中的 Condition


1 简介

任何一个 Java 对象都天然继承于 Object 类, 在线程间实现通信的往往会应用到 Object 的几个方法, 比如 wait(), wait(long timeout), wait(long timeout, int nanos) 与 notify(), notifyAll()
几个方法实现等待 / 通知机制。同样的, 在 Java Lock 体系下也有同样的方法实现等待/通知机制。

从整体上来看 Object 的 wait 和 notify / notify 是与对象监视器配合完成线程间的等待/通知机制, 而 Condition 与 Lock 配合完成等待通知机制,
前者是 Java 底层级别的, 后者是语言级别的, 具有更高的可控制性和扩展性

两者除了在使用方式上不同外, 在功能特性上还是有很多的不同:

  1. Condition 能够支持不响应中断, 而通过使用 Object 方式不支持
  2. Condition 能够支持多个等待队列 (new 多个 Condition 对象), 而 Object 方式只能支持一个
  3. Condition 能够支持超时时间的设置, 而 Object 不支持

参照 Object 的 wait 和 notify / notifyAll 方法, Condition 也提供了同样的方法

2 Condition 实现原理分析

看一下 Condition 的示意图, 方便后续的理解

Alt 'AQS 的 Condition 实现'

2.1 等待队列

创建一个 Condition 对象是通过 lock.newCondition(), 而这个方法实际上是会 new 出一个 ConditionObject 对象, 该类是 AQS 的一个内部类。
Condition 是要和 lock 配合使用的, 也就是 Condition 和 Lock 是绑定在一起的。

我们知道在 Lock 是借助 AQS 实现的, 而 AQS 内部维护了一个同步队列, 如果是独占式锁的话, 所有获取锁失败的线程的会尾插入到同步队列。
同样的, Condition 内部也是使用同样的方式, 内部维护了一个等待队列, 所有调用 Condition.await 方法的线程会加入到等待队列中, 并且线程状态转换为等待状态。

另外注意到 ConditionObject 中有两个成员变量:

/** First node of condition queue. */
private transient Node firstWaiter;

/** Last node of condition queue. */
private transient Node lastWaiter;

Node 类有这样一个属性:

//后继节点
Node nextWaiter;

进一步说明, 等待队列是一个单向队列。调用 Condition.await 方法后线程依次尾插入到等待队列中。
总的来说 ConditionObject 内部的队列的样子是这样的:

Alt 'Condition 队列的数据结构'

同时还有一点需要注意的是: 可以多次调用 lock.newCondition() 方法创建多个 condition 对象, 也就是一个 lock 可以持有多个等待队列。
而在利用 Object 的在实现上只能借助对象 Object 对象监视器上的一个同步队列和一个等待队列, 而并发包中的 Lock 拥有一个同步队列和多个等待队列

如图所示:

MultiConditionInAbstractQueuedSynchronizer
Alt 'AQS 中的多个 Condition'

如图所示, ConditionObject 是 AQS 的内部类, 因此每个 ConditionObject 能够访问到 AQS 提供的方法, 相当于每个 Condition 都拥有所属同步器的引用。

3 await 的实现原理

当调用 Condition.await() 方法后会使得当前获取 lock 的线程进入到等待队列, 如果该线程能够从 await() 方法返回的话一定是该线程获取了与 Condition 相关联的 lock 或者被中断

await() 方法源码为:

public final void await() throws InterruptedException {

    // 当前线程为中断状态
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // 1. 把当前的线程封装为 Node 节点, 放到队列的尾部, 同时返回这个节点    
    Node node = addConditionWaiter();
    
    // 2. 释放当前线程所占用的 lock, 在释放的过程中会唤醒同步队列中的下一个节点
    int savedState = fullyRelease(node);

    int interruptMode = 0;

    // 判断当前节点是否在 AQS 的同步队列里面
    while (!isOnSyncQueue(node)) {
        // 3. 把当前线程挂起, 进入阻塞状态
        LockSupport.park(this);
        // 0 不是被中断, 结束循环了
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // 4. 自旋等待获取到同步状态, 既 Lock 
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
        
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();

    // 5. 处理被中断的状态    
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

大概的流程是这样的:
当前线程调用 Condition.await() 方法后, 会使得当前线程释放 lock, 然后加入到等待队列中, 直至被 signal/signalAll 后会使得当前线程从等待队列中移至到同步队列中去, 再次获得了 lock 后才会从 await 方法返回, 或者在等待时被中断会做中断处理

这里涉及几个问题

  1. 是怎样将当前线程添加到等待队列中去的
  2. 释放锁的过程
  3. 怎样才能从 await 方法退出

3.1 await 中的入队操作 - addConditionWaiter 方法

private Node addConditionWaiter() {

    // 当前线程是否为持有锁的线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();

    // 取到最后的节点    
    Node t = lastWaiter;
    
    // 尾节点不为空, 同时状态不为 CONDITION (-2)
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 把当前链表里面状态值不是 Conditon 的进行删除
        unlinkCancelledWaiters();
        // 重试设置 t 为新的尾节点
        t = lastWaiter;
    }
    
    // 把当前节点封装为 Node
    Node node = new Node(Node.CONDITION);

    // 尾节点为空, 当前节点就是头节点了
    if (t == null)
        firstWaiter = node;
    else
        // 把当前节点放到队列的尾部
        t.nextWaiter = node;

    // 更新当前节点为尾节点    
    lastWaiter = node;
    return node;
}

private void unlinkCancelledWaiters() {
    
    // 头节点
    Node t = firstWaiter;
    // 记录最后一个状态是 Condition 的节点, 用于后面赋值给尾节点
    Node trail = null;
    
    // t 节点不为空
    while (t != null) {
        // 获取下一个节点
        Node next = t.nextWaiter;

        // t 节点的状态不为 CONDITION
        if (t.waitStatus != Node.CONDITION) {
            // 设置 t 的下一个节点为 null
            t.nextWaiter = null;


            // 因为 trail 记录的是遍历中最新的一个状态不是 Condition 的节点
            // 为 null, 当前一直在移动头节点, 那么只需要把 状态不为 Condition 的 t 节点的下一个节点为头节点即可
            // 不为 null, trail 表示当前遍历中, 最新的那个状态为 Condition 的节点, 将 t 节点的下一个节点设置到 trail 后面即可
            if (trail == null)
                // 设置当前的头节点为 t 节点的下一个节点
                firstWaiter = next;
            else
                // trail 的下一个节点等于 t 节点的下一个节点
                trail.nextWaiter = next;
            
            // 没有下一个节点了
            if (next == null)
                // 将尾节设置为 trail
                lastWaiter = trail;
        } 
        else
            // 设置 trail = t
            trail = t;
        // t = t 的下一个节点    
        t = next;
    }
}

3.2 await 中的锁释放操作 - fullyRelease 方法

将当前节点插入到等待对列之后, 会使当前线程释放 lock, 由 fullyRelease 方法实现, fullyRelease 源码为:

final int fullyRelease(Node node) {
    try {
        // 获取同步状态
        int savedState = getState();

        // 调用 AQS 的方法释放锁
        if (release(savedState))
            // 释放成功
            return savedState;

        // 释放失败, 抛出异常    
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {

        // 释放锁的节点的状态修改为 Cancelled 取消状态
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

调用 AQS 的模板方法 release 方法释放 AQS 的同步状态并且唤醒在同步队列中头节点的后继节点引用的线程。

如果释放成功则正常返回, 若失败的话就抛出异常。这样就解决了上面的第 2 个问题了。

3.3 await 中的判断当前节点是否在等待队列的操作 - isOnSyncQueue 方法

final boolean isOnSyncQueue(Node node) {
    
    // 当前节点的状态为 condition 或没有上一个节点, 也就是头节点了, 直接返回 false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;

    // 没有下一个节点, 也就是理论理论上的头节点, 直接返回 true
    if (node.next != null) 
        return true;

    // 从链表的尾节点开始向上找, 是否有等于这个节点
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {

    // 从尾节点一直往前找
    for (Node p = tail;;) {
        // 当前节点和 node 一样 返回 true
        if (p == node)
            return true;
        // p 节点为 null, 没有数据了, 返回 false    
        if (p == null)
            return false;
        p = p.prev;
    }
}

整理一下, 逻辑如下

  1. 当前节点为 condition 状态或者没有上一节点, 也就是头节点, 直接返回 false
  2. 当前节点没有下一个节点了, 也就是理论上的头节点, 直接返回 true
  3. 从链表的尾节点开始往前找是否有和判断的节点一样的, 有返回 ture, 没有 false

3.4 await 中的线程线程唤醒中断判断 - checkInterruptWhileWaiting 方法

这里简单说一下 Condition.signal() 方法的原理

  1. 找到当前的等待队列的头节点
  2. 通过 CAS 将头节点的状态从 condition 设置为 0 (加入等待队列的线程的状态默认为 condition)
  3. 第二步的 CAS 失败了, 会重新从等待队列中找下一个节点, 然后从第二步继续开始
  4. 第二步的 CAS 成功了, 把当前的节点放到同步队列的尾部, 同时返回上一次的尾节点
  5. 上一次的尾节点的状态为取消状态, 或者通过 CAS 将上一次的尾节点的状态设置为 signal 状态失败的话, 调用 LockSupport.unpark(上一次尾节点里面的线程), 对齐进行唤醒

上面大体就是 signal 的步骤, 理解完上面的步骤, 才能理解下面的代码, 重点, 重点。

在上面 await 方法的源码中, 知道线程是通过 LockSupport.park(this) 挂起的。 那么什么时候这个阻塞的方法会继续执行, 也就是对应的线程苏醒。

  1. 通过调用 LockSupport.unpark(当前线程), 唤醒当前线程
  2. 别的线程调用了当前线程的 interrupt 方法, 中断当前线程, 这是 LockSupport.park(this), 也会被唤醒, 不会抛中断异常的
  3. 线程假唤醒

所以线程在 LockSupport.park(this) 处苏醒, 继续走下去的, 可能性有 3 种, 这时候需要先判断当前线程是否为中断导致的属性


private int checkInterruptWhileWaiting(Node node) {

    // 判断当前线程在等待期间, 是否被设置了中断标识, 这个方法会返回 3 种情况
    // 0: 正常情况, 没有设置中断标识, 表示是正常的同步队列唤醒或者外部直接通过 LockSupport.unpark() 唤醒这个线程
    // THROW_IE (-1): 线程唤醒后, 抛出异常
    // REINTERRUPT (1): 线程唤醒后, 设置中断标识为 true

    // THROW_IE 和 REINTERRUPT 都是表示线程在等待中被设置了中断标识, 他们是如何区别的?
    // 需要分析 transferAfterCancelledWait 方法中的, 将当前节点的状态通过 CAS 从 condition 设置为 0 是否成功
    // 那么什么时候是成功, 什么时候会失败, 这时就涉及到上面说的 signal()/signalAll() 方法的流程了
    // 1. 直接调用了这个线程的 interrupt() 方法, 这时候节点的状态还是 condition 的, 成功
    // 2. 一个线程调用了 signal()/ signalAll() 方法, 这时候执行到了上面步骤的第二步, 并成功了, 这时候另一个线程直接中断了线程, 这时候节点的状态不是 condition 的, 失败了
    // 3. 一个线程调用了 signal()/ signalAll() 方法, 这时候还未执行到上面步骤的第二步, 这时候另一个线程直接中断了线程, 这时候节点的状态还是 condition 的, 成功
    
    // 上面 3 种情况, 可以简单的概况为 
    // 1. 调用了 signal()/ signalAll() 后, 中断线程
    // 2. 中断了线程后, 调用了 signal()/ signalAll()

    // 如果是第一种情况, 则由 signal() / signalALl() 将中断线程的节点放到同步队列的尾部
    // 如果是第二种情况, 则由 transferAfterCancelledWait() 将中断线程的节点放到同步队列的尾部

    // 在第一种情况下, 存在将中断线程的节点从等待队列移动到同步队列的过程, 
    // 这个过程会出现一个很端的时间, 在同步队列找不到这个节点, 所以在后面做多了一层判断, 直到在同步队列中找到了线程的节点, 才返回

    // 2 种不同的中断方式
    // 第一种返回了 REINTERRUPT, 线程唤醒后, 设置中断标识为 true
    // 第二种返回了 THROW_IE, 线程唤醒后, 抛出异常

    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :  0;
}

final boolean transferAfterCancelledWait(Node node) {

    // 把 node 节点的状态从 condition 设置为 0,
    // 设置成功, 把节点放到同步队列的尾部, 返回 true
    
    // 在 signal/signalAll 中会先将节点设置为 signal 状态, 即 0, 然后把节点从等待队列移动到同步队列
    // 在这里将节点的状态从 condition 设置为 0, 设置成功了, 表示在 signal/signalAll 之前, 线程就被中断了
    // 设置失败, 在表示在 signal/signalAll 之后, 线程才被中断

    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        enq(node);
        return true;
    }

    // 将中断线程的节点从等待队列移动到同步队列的过程, 存在极端时间的节点找不到
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

3.5 await 的退出流程

public final void await() throws InterruptedException {

    if (Thread.interrupted())
        throw new InterruptedException();

    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;

    // 当前节点是否在等待队列中
    while (!isOnSyncQueue(node)) {
        // 不在的话, 挂起当前线程
        LockSupport.park(this);

        // 唤醒后判断当前线程的是否被中断过
        // 等于 0, 表示没有被中断
        // 不等于 0, 表示中断过, 存在 2 种中断方式
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // acquireQueued 的返回值, true 线程需要中断, false 线程不需要中断
    
    // acquireQueued 返回需要中断, 则将不是 THROW_IE 的中断模式设置为 REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        // 也就是将 0 的情况也设置为 REINTERRUPT
        interruptMode = REINTERRUPT;

    // 节点的下一个节点不为 null, 移除当前链表中的取消状态的节点    
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();

    // 中断模式不为 0, 根据中断模式设置线程的中断状态   
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    // 中断模式为 THROW_IE, 抛出异常
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // 中断模式为 REINTERRUPT, 设置线程的中断标识为 true    
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

上面就是 await() 方法的整体流程梳理。

上面的第 3 个问题: 怎样才能从 await 方法退出 也就能得到答案了

  1. 当前线程被中断, 从等待队列移动到同步队列后, 重新获取锁
  2. 调用 signal/signalAll 将当前线程的节点从等待队列移到同步队列, 重新获取锁

3.6 await 类似的其他挂起线程的方法

3.6.1 超时机制的支持

condition 还额外支持了超时机制, 使用者可调用方法 awaitNanos(), awaitUtil() 这两个方法的实现原理, 基本上与 AQS 中的 tryAcquire 方法如出一辙。

2 者的实现方式和 await 类似, 只是在死循环中添加了对超时时间的判断。同时在超时后, 将当前线程的节点的状态从 condition 设置为 0, 追加到同步队列的尾部

3.6.2 不响应中断的支持

要想不响应中断可以调用 condition.awaitUninterruptibly() 方法, 该方法的源码为:

public final void awaitUninterruptibly() {

    // 节点入到等待队列
    Node node = addConditionWaiter();
    // 释放锁
    int savedState = fullyRelease(node);

    // 是否需要设置中断标识
    boolean interrupted = false;
    // 判断当前节点不在同步队列中, 在的话, 结束循环
    while (!isOnSyncQueue(node)) {
        // 挂起线程
        LockSupport.park(this);
        // 线程苏醒后, 判断中断标识是否为 true
        if (Thread.interrupted())
            interrupted = true;
    }
    // 从同步队列中获取锁成功了, 同时返回需要设置中断 或者上面的判断后需要中断
    if (acquireQueued(node, savedState) || interrupted)
        // 设置线程的中断标识为 true
        selfInterrupt();

}

这段方法与上面的 await 方法基本一致, 只不过减少了对中断的处理, 并省略了 reportInterruptAfterWait 方法抛被中断的异常。

4 signal/signalAll 实现原理

调用 condition 的 signal 或者 signalAll 方法可以将等待队列中等待时间最长的节点移动到同步队列中, 使得该节点能够有机会获得 lock。
按照等待队列是先进先出 (FIFO) 的, 所以等待队列的头节点必然会是等待时间最长的节点, 也就是每次调用 condition 的 signal 方法是将头节点移动到同步队列中。

signal 方法源码为:

public final void signal() {
    // 检测当前线程是否持有锁
    if (!isHeldExclusively())
        // 没有锁, 抛出异常
        throw new IllegalMonitorStateException();
    // 获取头节点    
    Node first = firstWaiter;
    if (first != null)
        // 尝试将头节点移动到同步队列
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // 设置头节点为当前头节点的下一个节点
        if ((firstWaiter = first.nextWaiter) == null)
            // 当前的头节点为空, 设置尾节点也为空
            lastWaiter = null;

        // 置空需要移动的节点的下一个节点    
        first.nextWaiter = null;
    // transferForSignal 进行节点的真正处理
    // 在 transferForSignal 处理失败时, 最新的头节点不为空, 继续处理新的头节点
    } while (!transferForSignal(first) &&  (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {

    // 1. 将节点的状态从 condition 设置为 0
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    // 2. 把这个节点放到 AQS 里面的同步队列的尾部, 同时获取到上一次的尾节点
    Node p = enq(node);

    int ws = p.waitStatus;
    // 上一次的尾节点状态为 1 (取消状态) 或者 通过 CAS 将这个节点的状态设置为 signal 失败, 则唤醒这个线程
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal方法的逻辑:

  1. 将头节点的状态更改为CONDITION
  2. 调用 enq 方法, 将该节点尾插入到同步队列中

现在我们可以得到结论: 调用 condition 的 signal 的前提条件是当前线程已经获取了 lock, 该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列, 而移入到同步队列后才有机会使得等待线程被唤醒,
即从 await 方法中的 LockSupport.park(this) 方法中返回, 从而才有机会使得调用 await 方法的线程成功退出。

signalAll

sigllAll 与 sigal 方法的区别体现在 doSignalAll 方法上, 前面我们已经知道 doSignal 方法只会对等待队列的头节点进行操作, 而 doSignalAll 的源码:

private void doSignalAll(Node first) {
    // 将头尾节点都设置为 null
    lastWaiter = firstWaiter = null;

    do {
        // 获取头节点的下一个节点
        Node next = first.nextWaiter;
        // 设置投节点的下一个节点为 null
        first.nextWaiter = null;
        // 尝试将头节点设置到同步队列的尾部
        transferForSignal(first);
        // 头节点等于头节点的下一个节点
        first = next;
    } while (first != null);
}

该方法会不断地将等待队列中的每一个节点都移入到同步队列中, 即 “通知” 当前调用 condition.await() 方法的每一个线程。

5 await 与 signal-signalAll 的结合思考

文章开篇提到等待/通知机制, 通过使用 condition 提供的 await 和 signal/signalAll 方法就可以实现这种机制, 而这种机制能够解决最经典的问题就是 “生产者与消费者问题”。
await 和 signal 和 signalAll 方法就像一个开关控制着线程 A (等待方) 和线程 B (通知方) 。它们之间的关系可以用下面一个图来表现得更加贴切:

Alt 'Condition 下的 wait 和 notice 机制'

如图:

  1. 线程 awaitThread 先通过 lock.lock() 方法获取锁成功后调用了 condition.await 方法进入等待队列
  2. 另一个线程 signalThread 通过 lock.lock() 方法获取锁成功后调用了condition.signal 或者 signalAll 方法, 使得线程 awaitThread 能够有机会移入到同步队列中,
  3. 当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock, 从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列

举个例子

public class AwaitSignal {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new Waiter());
        waiter.start();
        Thread signaler = new Thread(new Signaler());
        signaler.start();
    }

    static class Waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class Signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

输出结果

Thread-0当前条件不满足等待  
Thread-0接收到通知, 条件满足

6 参考

详解Condition的await和signal等待/通知机制
Condition的await-signal流程详解


  目录