Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区

Java ConditionObject 介绍(一)

前言

在介绍 AQS 时,其中有一个内部类叫做 ConditionObject,当时并没有进行介绍,并且在后续阅读源码时,会发现很多地方用到了 Condition ,这时就会很诧异,这个 Condition 到底有什么作用?那今天就通过阅读 Condition 源码,从而弄清楚 Condition 到底是做什么的?当然阅读这篇文章的时候希望你已经阅读了 AQS、ReentrantLock 以及 LockSupport 的相关文章或者有一定的了解。

介绍

Object 的监视器方法:wait、notify、notifyAll 应该都不陌生,在多线程使用场景下,必须先使用 synchronized 获取到锁,然后才可以调用 Object 的 wait、notify。

Condition 的使用,相当于用 Lock 替换了 synchronized,然后用 Condition 替换 Object 的监视器方法。

Conditions(也称为条件队列或条件变量)为一种线程提供了一种暂停执行(等待),直到另一线程通知被阻塞的线程,某些状态条件现在可能为真。

因为访问到此共享状态信息发生在不同的线程中,因此必须对其进行保护,所以会使用某种形式的锁。等待条件提供的关键属性是它以原子地释放了关联的锁,并且挂起当前线程,就像 Object.wait 一样。

Condition 实例本质上要绑定到锁。 为了获得 Condition 实例,一般使用 Lock 实例的 newCondition() 方法。

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

基本使用

class BoundedBuffer {

    final Lock lock = new ReentrantLock();
    // condition 实例依赖于 lock 实例
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];

    int putPtr, takePtr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            //  put 时判断是否已经满了
            // 则线程在 notFull 条件上排队阻塞
            while (count == items.length) {
                notFull.await();
            }
            items[putPtr] = x;
            if (++putPtr == items.length) {
                putPtr = 0;
            }
            ++count;
            // put 成功之后,队列中有元素
            // 唤醒在 notEmpty 条件上排队阻塞的线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // take 时,发现为空
            // 则线程在 notEmpty 的条件上排队阻塞
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takePtr];
            if (++takePtr == items.length) {
                takePtr = 0;
            }
            --count;
            // take 成功,队列不可能是满的
            // 唤醒在 notFull 条件上排队阻塞的线程
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

上面是官方文档的一个例子,实现了一个简单的 BlockingQueue ,看懂这里,会发现在同步队列中很多地方都是用的这个逻辑。必要的代码说明都已经在代码中进行注释。

问题疑问

  1. Condition 和 AQS 有什么关系?

  2. Condition 的实现原理是什么?

  3. Condition 的等待队列和 AQS 的同步队列有什么区别和联系?

源码分析

基本结构

图片[1]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区

通过 UML 可以看出,Condition 只是一个抽象类,它的主要实现逻辑是在 AQS 的内部类 ConditionObject 实现的。下面主要从 await 和 signal 两个方法入手,从源码了解 ConditionObject。

创建 Condition

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

一般使用 lock.newCondition() 创建条件变量。

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    public Condition newCondition() {
        return sync.newCondition();
    }
    // Sync 集成 AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }
}

这里使用的是 ReentrantLock 的源码,里面调用的 sync.newCondition(),Sync 继承 AQS,其实就是创建了一个 AQS 内部类的 ConditionObject 的实例。

这里需要注意的是 lock 每调用一次 lock.newCondition() 都会有一个新的 ConditionObject 实例生成,就是说一个 lock 可以创建多个 Condition 实例。

Condition 参数

/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;

await 方法

await 方法,会造成当前线程在等待,直到收到信号或被中断。

与此 Condition 相关联的锁被原子释放,并且出于线程调度目的,当前线程被禁用,并且处于休眠状态,直到发生以下四种情况之一:

  1. 其他一些线程调用此 Condition 的 signal 方法,而当前线程恰好被选择为要唤醒的线程;

  2. 其他一些线程调用此 Condition 的 signalAll 方法;

  3. 其他一些线程中断当前线程,并支持中断线程挂起;

  4. 发生虚假唤醒。

在所有情况下,在此方法可以返回之前,当前线程必须重新获取与此条件关联的锁。当线程返回时,可以保证保持此锁。

现在来看 AQS 内部的实现逻辑:

public final void await() throws InterruptedException {
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到条件队列尾部(等待队列)
    // 内部会创建 Node.CONDITION 类型的 Node
    Node node = addConditionWaiter();
    // 释放当前线程获取的锁(通过操作 state 的值)
    // 释放了锁就会被阻塞挂起
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 节点已经不在同步队列中,则调用 park 让其在等待队列中挂着
    while (!isOnSyncQueue(node)) {
        // 调用 park 阻塞挂起当前线程
        LockSupport.park(this);
        // 说明 signal 被调用了或者线程被中断,校验下唤醒原因
        // 如果因为终端被唤醒,则跳出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // while 循环结束, 线程开始抢锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 统一处理中断的
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

await 方法步骤如下:

  1. 创建 Node.CONDITION 类型的 Node 并添加到条件队列(ConditionQueue)的尾部;

  2. 释放当前线程获取的锁(通过操作 state 的值)

  3. 判断当前线程是否在同步队列(SyncQueue)中,不在的话会使用 park 挂起。

  4. 循环结束之后,说明已经已经在同步队列(SyncQueue)中了,后面等待获取到锁,继续执行即可。

在这里一定要把条件队列和同步队列进行区分清楚!!

条件队列/等待队列:即 Condition 的队列

同步队列:AQS 的队列。

下面对 await 里面重要方法进行阅读:

  • addConditionWaiter() 方法

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 判断尾节点状态,如果被取消,则清除所有被取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建新节点,类型为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 将新节点放到等待队列尾部
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

addConditionWaiter 方法可以看出,只是创建一个类型为 Node.CONDITION 的节点并放到条件队列尾部。同时通过这段代码还可以得出其他结论:

  1. 条件队列内部的 Node,只用到了 thread、waitStatus、nextWaiter 属性;

  2. 条件队列是单向队列。

作为对比,这里把条件队列和同步队列做出对比:

图片[2]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区

AQS 同步队列如下:

图片[3]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区

再来看下 Condition 的条件队列

图片[4]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区

waitStatus 在 AQS 中已经进行了介绍:

1. 默认状态为 0;

2. waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;

3. waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);

4. waitStatus = -2 CONDITION -2 :该节点目前在条件队列;

5. waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。

请登录后发表评论

    没有回复内容