Java DelayQueue 介绍(二)-Java专区论坛-技术-SpringForAll社区

Java DelayQueue 介绍(二)

DelayQueue 源码解析

这里以 JDK1.8 为例,分析一下 DelayQueue 的底层核心源码。

DelayQueue 的类定义如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
{
  //...
}

DelayQueue 继承了 AbstractQueue 类,实现了 BlockingQueue 接口。

DelayQueue类图

核心成员变量

DelayQueue 的 4 个核心成员变量如下:

//可重入锁,实现线程安全的关键
private final transient ReentrantLock lock = new ReentrantLock();
//延迟队列底层存储数据的集合,确保元素按照到期时间升序排列
private final PriorityQueue<E> q = new PriorityQueue<E>();

//指向准备执行优先级最高的线程
private Thread leader = null;
//实现多线程之间等待唤醒的交互
private final Condition available = lock.newCondition();
  • lock : 我们都知道 DelayQueue 存取是线程安全的,所以为了保证存取元素时线程安全,我们就需要在存取时上锁,而 DelayQueue 就是基于 ReentrantLock 独占锁确保存取操作的线程安全。
  • q : 延迟队列要求元素按照到期时间进行升序排列,所以元素添加时势必需要进行优先级排序,所以 DelayQueue 底层元素的存取都是通过这个优先队列 PriorityQueue 的成员变量 q 来管理的。
  • leader : 延迟队列的任务只有到期之后才会执行,对于没有到期的任务只有等待,为了确保优先级最高的任务到期后可以即刻被执行,设计者就用 leader 来管理延迟任务,只有 leader 所指向的线程才具备定时等待任务到期执行的权限,而其他那些优先级低的任务只能无限期等待,直到 leader 线程执行完手头的延迟任务后唤醒它。
  • available : 上文讲述 leader 线程时提到的等待唤醒操作的交互就是通过 available 实现的,假如线程 1 尝试在空的 DelayQueue 获取任务时,available 就会将其放入等待队列中。直到有一个线程添加一个延迟任务后通过 available 的 signal 方法将其唤醒。

构造方法

相较于其他的并发容器,延迟队列的构造方法比较简单,它只有两个构造方法,因为所有成员变量在类加载时都已经初始完成了,所以默认构造方法什么也没做。还有一个传入 Collection 对象的构造方法,它会将调用 addAll()方法将集合元素存到优先队列 q 中。

public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

添加元素

DelayQueue 添加元素的方法无论是 addput 还是 offer,本质上就是调用一下 offer ,所以了解延迟队列的添加逻辑我们只需阅读 offer 方法即可。

offer 方法的整体逻辑为:

  1. 尝试获取 lock 。
  2. 如果上锁成功,则调 q 的 offer 方法将元素存放到优先队列中。
  3. 调用 peek 方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素),于是将 leader 设置为空,通知因为队列为空时调用 take 等方法导致阻塞的线程来争抢元素。
  4. 上述步骤执行完成,释放 lock
  5. 返回 true。

源码如下:

public boolean offer(E e) {
    //尝试获取lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果上锁成功,则调q的offer方法将元素存放到优先队列中
        q.offer(e);
        //调用peek方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素)
        if (q.peek() == e) {
            //将leader设置为空,通知调用取元素方法而阻塞的线程来争抢这个任务
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        //上述步骤执行完成,释放lock
        lock.unlock();
    }
}

获取元素

DelayQueue 中获取元素的方式分为阻塞式和非阻塞式,先来看看逻辑比较复杂的阻塞式获取元素方法 take,为了让读者可以更直观的了解阻塞式获取元素的全流程,笔者将以 3 个线程并发获取元素为例讲述 take 的工作流程。

1、首先, 3 个线程会尝试获取可重入锁 lock,假设我们现在有 3 个线程分别是 t1、t2、t3,随后 t1 得到了锁,而 t2、t3 没有抢到锁,故将这两个线程存入等待队列中。

图片[2]-Java DelayQueue 介绍(二)-Java专区论坛-技术-SpringForAll社区

2、紧接着 t1 开始进行元素获取的逻辑。

3、线程 t1 首先会查看 DelayQueue 队列首元素是否为空。

4、如果元素为空,则说明当前队列没有任何元素,故 t1 就会被阻塞存到 conditionWaiter 这个队列中。

图片[3]-Java DelayQueue 介绍(二)-Java专区论坛-技术-SpringForAll社区

注意,调用 await 之后 t1 就会释放 lcok 锁,假如 DelayQueue 持续为空,那么 t2、t3 也会像 t1 一样执行相同的逻辑并进入 conditionWaiter 队列中。

图片[4]-Java DelayQueue 介绍(二)-Java专区论坛-技术-SpringForAll社区

如果元素不为空,则判断当前任务是否到期,如果元素到期,则直接返回出去。如果元素未到期,则判断当前 leader 线程(DelayQueue 中唯一一个可以等待并获取元素的线程引用)是否为空,若不为空,则说明当前 leader 正在等待执行一个优先级比当前元素还高的元素到期,故当前线程 t1 只能调用 await 进入无限期等待,等到 leader 取得元素后唤醒。反之,若 leader 线程为空,则将当前线程设置为 leader 并进入有限期等待,到期后取出元素并返回。

自此我们阻塞式获取元素的逻辑都已完成后,源码如下:

public E take() throws InterruptedException {
    // 尝试获取可重入锁,将底层AQS的state设置为1,并设置为独占锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            //查看队列第一个元素
            E first = q.peek();
            //若为空,则将当前线程放入ConditionObject的等待队列中,并将底层AQS的state设置为0,表示释放锁并进入无限期等待
            if (first == null)
                available.await();
            else {
                //若元素不为空,则查看当前元素多久到期
                long delay = first.getDelay(NANOSECONDS);
                //如果小于0则说明已到期直接返回出去
                if (delay <= 0)
                    return q.poll();
                //如果大于0则说明任务还没到期,首先需要释放对这个元素的引用
                first = null; // don't retain ref while waiting
                //判断leader是否为空,如果不为空,则说明正有线程作为leader并等待一个任务到期,则当前线程进入无限期等待
                if (leader != null)
                    available.await();
                else {
                    //反之将我们的线程成为leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //并进入有限期等待
                        available.awaitNanos(delay);
                    } finally {
                        //等待任务到期时,释放leader引用,进入下一次循环将任务return出去
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 收尾逻辑:当leader为null,并且队列中有任务时,唤醒等待的获取元素的线程。
        if (leader == null && q.peek() != null)
            available.signal();
        //释放锁
        lock.unlock();
    }
}

我们再来看看非阻塞的获取元素方法 poll ,逻辑比较简单,整体步骤如下:

  1. 尝试获取可重入锁。
  2. 查看队列第一个元素,判断元素是否为空。
  3. 若元素为空,或者元素未到期,则直接返回空。
  4. 若元素不为空且到期了,直接调用 poll 返回出去。
  5. 释放可重入锁 lock 。

源码如下: 

public E poll() {
    //尝试获取可重入锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //查看队列第一个元素,判断元素是否为空
        E first = q.peek();

        //若元素为空,或者元素未到期,则直接返回空
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            //若元素不为空且到期了,直接调用poll返回出去
            return q.poll();
    } finally {
        //释放可重入锁lock
        lock.unlock();
    }
}

查看元素

上文获取元素时都会调用到 peek 方法,peek 顾名思义仅仅窥探一下队列中的元素,它的步骤就 4 步:

  1. 上锁。
  2. 调用优先队列 q 的 peek 方法查看索引 0 位置的元素。
  3. 释放锁。
  4. 将元素返回出去。
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.peek();
    } finally {
        lock.unlock();
    }
}
请登录后发表评论

    没有回复内容