这里以 JDK1.8 为例,分析一下 DelayQueue
的底层核心源码。
DelayQueue
的类定义如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
{
//...
}
DelayQueue
继承了 AbstractQueue
类,实现了 BlockingQueue
接口。
DelayQueue
的 4 个核心成员变量如下:
//可重入锁,实现线程安全的关键
private final transient ReentrantLock lock = new ReentrantLock();
//延迟队列底层存储数据的集合,确保元素按照到期时间升序排列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//指向准备执行优先级最高的线程
private Thread leader = null;
//实现多线程之间等待唤醒的交互
private final Condition available = lock.newCondition();
lock
: 我们都知道DelayQueue
存取是线程安全的,所以为了保证存取元素时线程安全,我们就需要在存取时上锁,而DelayQueue
就是基于ReentrantLock
独占锁确保存取操作的线程安全。q
: 延迟队列要求元素按照到期时间进行升序排列,所以元素添加时势必需要进行优先级排序,所以DelayQueue
底层元素的存取都是通过这个优先队列PriorityQueue
的成员变量q
来管理的。leader
: 延迟队列的任务只有到期之后才会执行,对于没有到期的任务只有等待,为了确保优先级最高的任务到期后可以即刻被执行,设计者就用leader
来管理延迟任务,只有leader
所指向的线程才具备定时等待任务到期执行的权限,而其他那些优先级低的任务只能无限期等待,直到leader
线程执行完手头的延迟任务后唤醒它。available
: 上文讲述leader
线程时提到的等待唤醒操作的交互就是通过available
实现的,假如线程 1 尝试在空的DelayQueue
获取任务时,available
就会将其放入等待队列中。直到有一个线程添加一个延迟任务后通过available
的signal
方法将其唤醒。
相较于其他的并发容器,延迟队列的构造方法比较简单,它只有两个构造方法,因为所有成员变量在类加载时都已经初始完成了,所以默认构造方法什么也没做。还有一个传入 Collection
对象的构造方法,它会将调用 addAll()
方法将集合元素存到优先队列 q
中。
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
DelayQueue
添加元素的方法无论是 add
、put
还是 offer
,本质上就是调用一下 offer
,所以了解延迟队列的添加逻辑我们只需阅读 offer 方法即可。
offer
方法的整体逻辑为:
- 尝试获取
lock
。 - 如果上锁成功,则调
q
的offer
方法将元素存放到优先队列中。 - 调用
peek
方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素),于是将leader
设置为空,通知因为队列为空时调用take
等方法导致阻塞的线程来争抢元素。 - 上述步骤执行完成,释放
lock
。 - 返回 true。
源码如下:
public boolean offer(E e) {
//尝试获取lock
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果上锁成功,则调q的offer方法将元素存放到优先队列中
q.offer(e);
//调用peek方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素)
if (q.peek() == e) {
//将leader设置为空,通知调用取元素方法而阻塞的线程来争抢这个任务
leader = null;
available.signal();
}
return true;
} finally {
//上述步骤执行完成,释放lock
lock.unlock();
}
}
DelayQueue
中获取元素的方式分为阻塞式和非阻塞式,先来看看逻辑比较复杂的阻塞式获取元素方法 take
,为了让读者可以更直观的了解阻塞式获取元素的全流程,笔者将以 3 个线程并发获取元素为例讲述 take
的工作流程。
1、首先, 3 个线程会尝试获取可重入锁 lock
,假设我们现在有 3 个线程分别是 t1、t2、t3,随后 t1 得到了锁,而 t2、t3 没有抢到锁,故将这两个线程存入等待队列中。
2、紧接着 t1 开始进行元素获取的逻辑。
3、线程 t1 首先会查看 DelayQueue
队列首元素是否为空。
4、如果元素为空,则说明当前队列没有任何元素,故 t1 就会被阻塞存到 conditionWaiter
这个队列中。
注意,调用 await
之后 t1 就会释放 lcok
锁,假如 DelayQueue
持续为空,那么 t2、t3 也会像 t1 一样执行相同的逻辑并进入 conditionWaiter
队列中。
如果元素不为空,则判断当前任务是否到期,如果元素到期,则直接返回出去。如果元素未到期,则判断当前 leader
线程(DelayQueue
中唯一一个可以等待并获取元素的线程引用)是否为空,若不为空,则说明当前 leader
正在等待执行一个优先级比当前元素还高的元素到期,故当前线程 t1 只能调用 await
进入无限期等待,等到 leader
取得元素后唤醒。反之,若 leader
线程为空,则将当前线程设置为 leader 并进入有限期等待,到期后取出元素并返回。
自此我们阻塞式获取元素的逻辑都已完成后,源码如下:
public E take() throws InterruptedException {
// 尝试获取可重入锁,将底层AQS的state设置为1,并设置为独占锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//查看队列第一个元素
E first = q.peek();
//若为空,则将当前线程放入ConditionObject的等待队列中,并将底层AQS的state设置为0,表示释放锁并进入无限期等待
if (first == null)
available.await();
else {
//若元素不为空,则查看当前元素多久到期
long delay = first.getDelay(NANOSECONDS);
//如果小于0则说明已到期直接返回出去
if (delay <= 0)
return q.poll();
//如果大于0则说明任务还没到期,首先需要释放对这个元素的引用
first = null; // don't retain ref while waiting
//判断leader是否为空,如果不为空,则说明正有线程作为leader并等待一个任务到期,则当前线程进入无限期等待
if (leader != null)
available.await();
else {
//反之将我们的线程成为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//并进入有限期等待
available.awaitNanos(delay);
} finally {
//等待任务到期时,释放leader引用,进入下一次循环将任务return出去
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 收尾逻辑:当leader为null,并且队列中有任务时,唤醒等待的获取元素的线程。
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
}
}
我们再来看看非阻塞的获取元素方法 poll
,逻辑比较简单,整体步骤如下:
- 尝试获取可重入锁。
- 查看队列第一个元素,判断元素是否为空。
- 若元素为空,或者元素未到期,则直接返回空。
- 若元素不为空且到期了,直接调用
poll
返回出去。 - 释放可重入锁
lock
。
源码如下:
public E poll() {
//尝试获取可重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//查看队列第一个元素,判断元素是否为空
E first = q.peek();
//若元素为空,或者元素未到期,则直接返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//若元素不为空且到期了,直接调用poll返回出去
return q.poll();
} finally {
//释放可重入锁lock
lock.unlock();
}
}
上文获取元素时都会调用到 peek
方法,peek 顾名思义仅仅窥探一下队列中的元素,它的步骤就 4 步:
- 上锁。
- 调用优先队列 q 的 peek 方法查看索引 0 位置的元素。
- 释放锁。
- 将元素返回出去。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
没有回复内容