前言
在介绍 AQS 时,其中有一个内部类叫做 ConditionObject,当时并没有进行介绍,并且在后续阅读源码时,会发现很多地方用到了 Condition ,这时就会很诧异,这个 Condition 到底有什么作用?那今天就通过阅读 Condition 源码,从而弄清楚 Condition 到底是做什么的?当然阅读这篇文章的时候希望你已经阅读了 AQS、ReentrantLock 以及 LockSupport 的相关文章或者有一定的了解。
介绍
Object 的监视器方法:wait、notify、notifyAll 应该都不陌生,在多线程使用场景下,必须先使用 synchronized 获取到锁,然后才可以调用 Object 的 wait、notify。
Condition 的使用,相当于用 Lock 替换了 synchronized,然后用 Condition 替换 Object 的监视器方法。
Conditions(也称为条件队列或条件变量)为一种线程提供了一种暂停执行(等待),直到另一线程通知被阻塞的线程,某些状态条件现在可能为真。
因为访问到此共享状态信息发生在不同的线程中,因此必须对其进行保护,所以会使用某种形式的锁。等待条件提供的关键属性是它以原子地释放了关联的锁,并且挂起当前线程,就像 Object.wait 一样。
Condition 实例本质上要绑定到锁。 为了获得 Condition 实例,一般使用 Lock 实例的 newCondition() 方法。
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();
基本使用
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 实例依赖于 lock 实例
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putPtr, takePtr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// put 时判断是否已经满了
// 则线程在 notFull 条件上排队阻塞
while (count == items.length) {
notFull.await();
}
items[putPtr] = x;
if (++putPtr == items.length) {
putPtr = 0;
}
++count;
// put 成功之后,队列中有元素
// 唤醒在 notEmpty 条件上排队阻塞的线程
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
// take 时,发现为空
// 则线程在 notEmpty 的条件上排队阻塞
while (count == 0) {
notEmpty.await();
}
Object x = items[takePtr];
if (++takePtr == items.length) {
takePtr = 0;
}
--count;
// take 成功,队列不可能是满的
// 唤醒在 notFull 条件上排队阻塞的线程
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
上面是官方文档的一个例子,实现了一个简单的 BlockingQueue ,看懂这里,会发现在同步队列中很多地方都是用的这个逻辑。必要的代码说明都已经在代码中进行注释。
问题疑问
-
Condition 和 AQS 有什么关系?
-
Condition 的实现原理是什么?
-
Condition 的等待队列和 AQS 的同步队列有什么区别和联系?
源码分析
基本结构
![图片[1]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区](https://static001.geekbang.org/infoq/e0/e0755ea4537225c08052212d9ea04c98.png)
通过 UML 可以看出,Condition 只是一个抽象类,它的主要实现逻辑是在 AQS 的内部类 ConditionObject 实现的。下面主要从 await 和 signal 两个方法入手,从源码了解 ConditionObject。
创建 Condition
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();
一般使用 lock.newCondition() 创建条件变量。
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public Condition newCondition() {
return sync.newCondition();
}
// Sync 集成 AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
}
这里使用的是 ReentrantLock 的源码,里面调用的 sync.newCondition(),Sync 继承 AQS,其实就是创建了一个 AQS 内部类的 ConditionObject 的实例。
这里需要注意的是 lock 每调用一次 lock.newCondition()
都会有一个新的 ConditionObject 实例生成,就是说一个 lock 可以创建多个 Condition 实例。
Condition 参数
/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;
await 方法
await 方法,会造成当前线程在等待,直到收到信号或被中断。
与此 Condition 相关联的锁被原子释放,并且出于线程调度目的,当前线程被禁用,并且处于休眠状态,直到发生以下四种情况之一:
-
其他一些线程调用此 Condition 的 signal 方法,而当前线程恰好被选择为要唤醒的线程;
-
其他一些线程调用此 Condition 的 signalAll 方法;
-
其他一些线程中断当前线程,并支持中断线程挂起;
-
发生虚假唤醒。
在所有情况下,在此方法可以返回之前,当前线程必须重新获取与此条件关联的锁。当线程返回时,可以保证保持此锁。
现在来看 AQS 内部的实现逻辑:
public final void await() throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 添加到条件队列尾部(等待队列)
// 内部会创建 Node.CONDITION 类型的 Node
Node node = addConditionWaiter();
// 释放当前线程获取的锁(通过操作 state 的值)
// 释放了锁就会被阻塞挂起
int savedState = fullyRelease(node);
int interruptMode = 0;
// 节点已经不在同步队列中,则调用 park 让其在等待队列中挂着
while (!isOnSyncQueue(node)) {
// 调用 park 阻塞挂起当前线程
LockSupport.park(this);
// 说明 signal 被调用了或者线程被中断,校验下唤醒原因
// 如果因为终端被唤醒,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// while 循环结束, 线程开始抢锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 统一处理中断的
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await 方法步骤如下:
-
创建 Node.CONDITION 类型的 Node 并添加到条件队列(ConditionQueue)的尾部;
-
释放当前线程获取的锁(通过操作 state 的值)
-
判断当前线程是否在同步队列(SyncQueue)中,不在的话会使用 park 挂起。
-
循环结束之后,说明已经已经在同步队列(SyncQueue)中了,后面等待获取到锁,继续执行即可。
在这里一定要把条件队列和同步队列进行区分清楚!!
条件队列/等待队列:即 Condition 的队列
同步队列:AQS 的队列。
下面对 await 里面重要方法进行阅读:
-
addConditionWaiter() 方法
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 判断尾节点状态,如果被取消,则清除所有被取消的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建新节点,类型为 Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 将新节点放到等待队列尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
addConditionWaiter 方法可以看出,只是创建一个类型为 Node.CONDITION 的节点并放到条件队列尾部。同时通过这段代码还可以得出其他结论:
-
条件队列内部的 Node,只用到了 thread、waitStatus、nextWaiter 属性;
-
条件队列是单向队列。
作为对比,这里把条件队列和同步队列做出对比:
![图片[2]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区](https://static001.geekbang.org/infoq/8b/8bafc308af6b3ca2114422d251b624bb.png)
AQS 同步队列如下:
![图片[3]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区](https://static001.geekbang.org/infoq/dc/dcab2df64d9799660d1eb4f4bba87d81.png)
再来看下 Condition 的条件队列
![图片[4]-Java ConditionObject 介绍(一)-Java专区论坛-技术-SpringForAll社区](https://static001.geekbang.org/infoq/73/737998e9d53706dab3802ce45d219e0a.png)
waitStatus 在 AQS 中已经进行了介绍:
1. 默认状态为 0;
2. waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;
3. waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);
4. waitStatus = -2 CONDITION -2 :该节点目前在条件队列;
5. waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。
没有回复内容