Java 阻塞队列的历史可以追溯到 JDK1.5 版本,当时 Java 平台增加了 java.util.concurrent
,即我们常说的 JUC 包,其中包含了各种并发流程控制工具、并发容器、原子类等。这其中自然也包含了我们这篇文章所讨论的阻塞队列。
为了解决高并发场景下多线程之间数据共享的问题,JDK1.5 版本中出现了 ArrayBlockingQueue
和 LinkedBlockingQueue
,它们是带有生产者-消费者模式实现的并发容器。其中,ArrayBlockingQueue
是有界队列,即添加的元素达到上限之后,再次添加就会被阻塞或者抛出异常。而 LinkedBlockingQueue
则由链表构成的队列,正是因为链表的特性,所以 LinkedBlockingQueue
在添加元素上并不会向 ArrayBlockingQueue
那样有着较多的约束,所以 LinkedBlockingQueue
设置队列是否有界是可选的(注意这里的无界并不是指可以添加任务数量的元素,而是说队列的大小默认为 Integer.MAX_VALUE
,近乎于无限大)。
随着 Java 的不断发展,JDK 后续的几个版本又对阻塞队列进行了不少的更新和完善:
- JDK1.6 版本:增加
SynchronousQueue
,一个不存储元素的阻塞队列。 - JDK1.7 版本:增加
TransferQueue
,一个支持更多操作的阻塞队列。 - JDK1.8 版本:增加
DelayQueue
,一个支持延迟获取元素的阻塞队列。
阻塞队列就是典型的生产者-消费者模型,它可以做到以下几点:
- 当阻塞队列数据为空时,所有的消费者线程都会被阻塞,等待队列非空。
- 当生产者往队列里填充数据后,队列就会通知消费者队列非空,消费者此时就可以进来消费。
- 当阻塞队列因为消费者消费过慢或者生产者存放元素过快导致队列填满时无法容纳新元素时,生产者就会被阻塞,等待队列非满时继续存放元素。
- 当消费者从队列中消费一个元素之后,队列就会通知生产者队列非满,生产者可以继续填充数据了。
总结一下:阻塞队列就说基于非空和非满两个条件实现生产者和消费者之间的交互,尽管这些交互流程和等待通知的机制实现非常复杂,好在 Doug Lea 的操刀之下已将阻塞队列的细节屏蔽,我们只需调用 put
、take
、offer
、poll
等 API 即可实现多线程之间的生产和消费。
这也使得阻塞队列在多线程开发中有着广泛的运用,最常见的例子无非是我们的线程池,从源码中我们就能看出当核心线程无法及时处理任务时,这些任务都会扔到 workQueue
中。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {// ...}
简单了解了阻塞队列的历史之后,我们就开始重点讨论本篇文章所要介绍的并发容器——ArrayBlockingQueue
。为了后续更加深入的了解 ArrayBlockingQueue
,我们不妨基于下面几个实例了解以下 ArrayBlockingQueue
的使用。
先看看第一个例子,我们这里会用两个线程分别模拟生产者和消费者,生产者生产完会使用 put
方法生产 10 个元素给消费者进行消费,当队列元素达到我们设置的上限 5 时,put
方法就会阻塞。 同理消费者也会通过 take
方法消费元素,当队列为空时,take
方法就会阻塞消费者线程。这里笔者为了保证消费者能够在消费完 10 个元素后及时退出。便通过倒计时门闩,来控制消费者结束,生产者在这里只会生产 10 个元素。当消费者将 10 个元素消费完成之后,按下倒计时门闩,所有线程都会停止。
public class ProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个大小为 5 的 ArrayBlockingQueue
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 创建生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
// 向队列中添加元素,如果队列已满则阻塞等待
queue.put(i);
System.out.println("生产者添加元素:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
CountDownLatch countDownLatch = new CountDownLatch(1);
// 创建消费者线程
Thread consumer = new Thread(() -> {
try {
int count = 0;
while (true) {
// 从队列中取出元素,如果队列为空则阻塞等待
int element = queue.take();
System.out.println("消费者取出元素:" + element);
++count;
if (count == 10) {
break;
}
}
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启动线程
producer.start();
consumer.start();
// 等待线程结束
producer.join();
consumer.join();
countDownLatch.await();
producer.interrupt();
consumer.interrupt();
}
}
代码输出结果如下,可以看到只有生产者往队列中投放元素之后消费者才能消费,这也就意味着当队列中没有数据的时消费者就会阻塞,等待队列非空再继续消费。
生产者添加元素:1
生产者添加元素:2
消费者取出元素:1
消费者取出元素:2
生产者添加元素:3
消费者取出元素:3
生产者添加元素:4
生产者添加元素:5
消费者取出元素:4
生产者添加元素:6
消费者取出元素:5
生产者添加元素:7
生产者添加元素:8
生产者添加元素:9
生产者添加元素:10
消费者取出元素:6
消费者取出元素:7
消费者取出元素:8
消费者取出元素:9
消费者取出元素:10
了解了 put
、take
这两个会阻塞的存和取方法之后,我我们再来看看阻塞队列中非阻塞的入队和出队方法 offer
和 poll
。
如下所示,我们设置了一个大小为 3 的阻塞队列,我们会尝试在队列用 offer 方法存放 4 个元素,然后再从队列中用 poll
尝试取 4 次。
public class OfferPollExample {
public static void main(String[] args) {
// 创建一个大小为 3 的 ArrayBlockingQueue
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 向队列中添加元素
System.out.println(queue.offer("A"));
System.out.println(queue.offer("B"));
System.out.println(queue.offer("C"));
// 尝试向队列中添加元素,但队列已满,返回 false
System.out.println(queue.offer("D"));
// 从队列中取出元素
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
// 尝试从队列中取出元素,但队列已空,返回 null
System.out.println(queue.poll());
}
}
最终代码的输出结果如下,可以看到因为队列的大小为 3 的缘故,我们前 3 次存放到队列的结果为 true,第 4 次存放时,由于队列已满,所以存放结果返回 false。这也是为什么我们后续的 poll
方法只得到了 3 个元素的值。
true
true
true
false
A
B
C
null
了解了阻塞存取和非阻塞存取,我们再来看看阻塞队列的一个比较特殊的操作,某些场景下,我们希望能够一次性将阻塞队列的结果存到列表中再进行批量操作,我们就可以使用阻塞队列的 drainTo
方法,这个方法会一次性将队列中所有元素存放到列表,如果队列中有元素,且成功存到 list 中则 drainTo
会返回本次转移到 list 中的元素数,反之若队列为空,drainTo
则直接返回 0。
public class DrainToExample {
public static void main(String[] args) {
// 创建一个大小为 5 的 ArrayBlockingQueue
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 向队列中添加元素
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.add(5);
// 创建一个 List,用于存储从队列中取出的元素
List<Integer> list = new ArrayList<>();
// 从队列中取出所有元素,并添加到 List 中
queue.drainTo(list);
// 输出 List 中的元素
System.out.println(list);
}
}
代码输出结果如下
[1, 2, 3, 4, 5]
自此我们对阻塞队列的使用有了基本的印象,接下来我们就可以进一步了解一下 ArrayBlockingQueue
的工作机制了。
在了解 ArrayBlockingQueue
的具体细节之前,我们先来看看 ArrayBlockingQueue
的类图。
从图中我们可以看出,ArrayBlockingQueue
继承了阻塞队列 BlockingQueue
这个接口,不难猜出通过继承 BlockingQueue
这个接口之后,ArrayBlockingQueue
就拥有了阻塞队列那些常见的操作行为。
同时, ArrayBlockingQueue
还继承了 AbstractQueue
这个抽象类,这个继承了 AbstractCollection
和 Queue
的抽象类,从抽象类的特定和语义我们也可以猜出,这个继承关系使得 ArrayBlockingQueue
拥有了队列的常见操作。
所以我们是否可以得出这样一个结论,通过继承 AbstractQueue
获得队列所有的操作模板,其实现的入队和出队操作的整体框架。然后 ArrayBlockingQueue
通过继承 BlockingQueue
获取到阻塞队列的常见操作并将这些操作实现,填充到 AbstractQueue
模板方法的细节中,由此 ArrayBlockingQueue
成为一个完整的阻塞队列。
为了印证这一点,我们到源码中一探究竟。首先我们先来看看 AbstractQueue
,从类的继承关系我们可以大致得出,它通过 AbstractCollection
获得了集合的常见操作方法,然后通过 Queue
接口获得了队列的特性。
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {
//...
}
对于集合的操作无非是增删改查,所以我们不妨从添加方法入手,从源码中我们可以看到,它实现了 AbstractCollection
的 add
方法,其内部逻辑如下:
- 调用继承
Queue
接口的来的offer
方法,如果offer
成功则返回true
。 - 如果
offer
失败,即代表当前元素入队失败直接抛异常。 -
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
而 AbstractQueue
中并没有对 Queue
的 offer
的实现,很明显这样做的目的是定义好了 add
的核心逻辑,将 offer
的细节交由其子类即我们的 ArrayBlockingQueue
实现。
到此,我们对于抽象类 AbstractQueue
的分析就结束了,我们继续看看 ArrayBlockingQueue
中另一个重要的继承接口 BlockingQueue
。
点开 BlockingQueue
之后,我们可以看到这个接口同样继承了 Queue
接口,这就意味着它也具备了队列所拥有的所有行为。同时,它还定义了自己所需要实现的方法。
public interface BlockingQueue<E> extends Queue<E> {
//元素入队成功返回true,反之则会抛出异常IllegalStateException
boolean add(E e);
//元素入队成功返回true,反之返回false
boolean offer(E e);
//元素入队成功则直接返回,如果队列已满元素不可入队则将线程阻塞,因为阻塞期间可能会被打断,所以这里方法签名抛出了InterruptedException
void put(E e) throws InterruptedException;
//和上一个方法一样,只不过队列满时只会阻塞单位为unit,时间为timeout的时长,如果在等待时长内没有入队成功则直接返回false。
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//从队头取出一个元素,如果队列为空则阻塞等待,因为会阻塞线程的缘故,所以该方法可能会被打断,所以签名定义了InterruptedException
E take() throws InterruptedException;
//取出队头的元素并返回,如果当前队列为空则阻塞等待timeout且单位为unit的时长,如果这个时间段没有元素则直接返回null。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//获取队列剩余元素个数
int remainingCapacity();
//删除我们指定的对象,如果成功返回true,反之返回false。
boolean remove(Object o);
//判断队列中是否包含指定元素
public boolean contains(Object o);
//将队列中的元素全部存到指定的集合中
int drainTo(Collection<? super E> c);
//转移maxElements个元素到集合中
int drainTo(Collection<? super E> c, int maxElements);
}
了解了 BlockingQueue
的常见操作后,我们就知道了 ArrayBlockingQueue
通过继承 BlockingQueue
的方法并实现后,填充到 AbstractQueue
的方法上,由此我们便知道了上文中 AbstractQueue
的 add
方法的 offer
方法是哪里是实现的了。
public boolean add(E e) {
//AbstractQueue的offer来自下层的ArrayBlockingQueue从BlockingQueue继承并实现的offer方法
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
没有回复内容