了解 ArrayBlockingQueue
的细节前,我们不妨先看看其构造函数,了解一下其初始化过程。从源码中我们可以看出 ArrayBlockingQueue
有 3 个构造方法,而最核心的构造方法就是下面这一个。
// capacity 表示队列初始容量,fair 表示 锁的公平性
public ArrayBlockingQueue(int capacity, boolean fair) {
//如果设置的队列大小小于0,则直接抛出IllegalArgumentException
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化一个数组用于存放队列的元素
this.items = new Object[capacity];
//创建阻塞队列流程控制的锁
lock = new ReentrantLock(fair);
//用lock锁创建两个条件控制队列生产和消费
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
这个构造方法里面有两个比较核心的成员变量 notEmpty
(非空) 和 notFull
(非满) ,需要我们格外留意,它们是实现生产者和消费者有序工作的关键所在,这一点笔者会在后续的源码解析中详细说明,这里我们只需初步了解一下阻塞队列的构造即可。
另外两个构造方法都是基于上述的构造方法,默认情况下,我们会使用下面这个构造方法,该构造方法就意味着 ArrayBlockingQueue
用的是非公平锁,即各个生产者或者消费者线程收到通知后,对于锁的争抢是随机的。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
还有一个不怎么常用的构造方法,在初始化容量和锁的非公平性之后,它还提供了一个 Collection
参数,从源码中不难看出这个构造方法是将外部传入的集合的元素在初始化时直接存放到阻塞队列中。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//初始化容量和锁的公平性
this(capacity, fair);
final ReentrantLock lock = this.lock;
//上锁并将c中的元素存放到ArrayBlockingQueue底层的数组中
lock.lock();
try {
int i = 0;
try {
//遍历并添加元素到数组中
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//记录当前队列容量
count = i;
//更新下一次put或者offer或用add方法添加到队列底层数组的位置
putIndex = (i == capacity) ? 0 : i;
} finally {
//完成遍历后释放锁
lock.unlock();
}
}
ArrayBlockingQueue
阻塞式获取和新增元素对应的就是生产者-消费者模型,虽然它也支持非阻塞式获取和新增元素(例如 poll()
和 offer(E e)
方法,后文会介绍到),但一般不会使用。
ArrayBlockingQueue
阻塞式获取和新增元素的方法为:
put(E e)
:将元素插入队列中,如果队列已满,则该方法会一直阻塞,直到队列有空间可用或者线程被中断。take()
:获取并移除队列头部的元素,如果队列为空,则该方法会一直阻塞,直到队列非空或者线程被中断。
这两个方法实现的关键就是在于两个条件对象 notEmpty
(非空) 和 notFull
(非满),这个我们在上文的构造方法中有提到。
接下来笔者就通过两张图让大家了解一下这两个条件是如何在阻塞队列中运用的。
假设我们的代码消费者先启动,当它发现队列中没有数据,那么非空条件就会将这个线程挂起,即等待条件非空时挂起。然后 CPU 执行权到达生产者,生产者发现队列中可以存放数据,于是将数据存放进去,通知此时条件非空,此时消费者就会被唤醒到队列中使用 take
等方法获取值了。
随后的执行中,生产者生产速度远远大于消费者消费速度,于是生产者将队列塞满后再次尝试将数据存入队列,发现队列已满,于是阻塞队列就将当前线程挂起,等待非满。然后消费者拿着 CPU 执行权进行消费,于是队列可以存放新数据了,发出一个非满的通知,此时挂起的生产者就会等待 CPU 执行权到来时再次尝试将数据存到队列中。
简单了解阻塞队列的基于两个条件的交互流程之后,我们不妨看看 put
和 take
方法的源码。
public void put(E e) throws InterruptedException {
//确保插入的元素不为null
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
//这里使用lockInterruptibly()方法而不是lock()方法是为了能够响应中断操作,如果在等待获取锁的过程中被打断则该方法会抛出InterruptedException异常。
lock.lockInterruptibly();
try {
//如果count等数组长度则说明队列已满,当前线程将被挂起放到AQS队列中,等待队列非满时插入(非满条件)。
//在等待期间,锁会被释放,其他线程可以继续对队列进行操作。
while (count == items.length)
notFull.await();
//如果队列可以存放元素,则调用enqueue将元素入队
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
put
方法内部调用了 enqueue
方法来实现元素入队,我们继续深入查看一下 enqueue
方法的实现细节:
private void enqueue(E x) {
//获取队列底层的数组
final Object[] items = this.items;
//将putindex位置的值设置为我们传入的x
items[putIndex] = x;
//更新putindex,如果putindex等于数组长度,则更新为0
if (++putIndex == items.length)
putIndex = 0;
//队列长度+1
count++;
//通知队列非空,那些因为获取元素而阻塞的线程可以继续工作了
notEmpty.signal();
}
从源码中可以看到入队操作的逻辑就是在数组中追加一个新元素,整体执行步骤为:
- 获取
ArrayBlockingQueue
底层的数组items
。 - 将元素存到
putIndex
位置。 - 更新
putIndex
到下一个位置,如果putIndex
等于队列长度,则说明putIndex
已经到达数组末尾了,下一次插入则需要 0 开始。(ArrayBlockingQueue
用到了循环队列的思想,即从头到尾循环复用一个数组) - 更新
count
的值,表示当前队列长度+1。 - 调用
notEmpty.signal()
通知队列非空,消费者可以从队列中获取值了。
自此我们了解了 put
方法的流程,为了更加完整的了解 ArrayBlockingQueue
关于生产者-消费者模型的设计,我们继续看看阻塞获取队列元素的 take
方法。
public E take() throws InterruptedException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列中元素个数为0,则将当前线程打断并存入AQS队列中,等待队列非空时获取并移除元素(非空条件)
while (count == 0)
notEmpty.await();
//如果队列不为空则调用dequeue获取元素
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
理解了 put
方法再看take
方法就很简单了,其核心逻辑和put
方法正好是相反的,比如put
方法在队列满的时候等待队列非满时插入元素(非满条件),而take
方法等待队列非空时获取并移除元素(非空条件)。
take
方法内部调用了 dequeue
方法来实现元素出队,其核心逻辑和 enqueue
方法也是相反的。
private E dequeue() {
//获取阻塞队列底层的数组
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//从队列中获取takeIndex位置的元素
E x = (E) items[takeIndex];
//将takeIndex置空
items[takeIndex] = null;
//takeIndex向后挪动,如果等于数组长度则更新为0
if (++takeIndex == items.length)
takeIndex = 0;
//队列长度减1
count--;
if (itrs != null)
itrs.elementDequeued();
//通知那些被打断的线程当前队列状态非满,可以继续存放元素
notFull.signal();
return x;
}
由于dequeue
方法(出队)和上面介绍的 enqueue
方法(入队)的步骤大致类似,这里就不重复介绍了。
为了帮助理解,我专门画了一张图来展示 notEmpty
(非空) 和 notFull
(非满)这两个条件对象是如何控制 ArrayBlockingQueue
的存和取的。
- 消费者:当消费者从队列中
take
或者poll
等操作取出一个元素之后,就会通知队列非满,此时那些等待非满的生产者就会被唤醒等待获取 CPU 时间片进行入队操作。 - 生产者:当生产者将元素存到队列中后,就会触发通知队列非空,此时消费者就会被唤醒等待 CPU 时间片尝试获取元素。如此往复,两个条件对象就构成一个环路,控制着多线程之间的存和取。
ArrayBlockingQueue
非阻塞式获取和新增元素的方法为:
offer(E e)
:将元素插入队列尾部。如果队列已满,则该方法会直接返回 false,不会等待并阻塞线程。poll()
:获取并移除队列头部的元素,如果队列为空,则该方法会直接返回 null,不会等待并阻塞线程。add(E e)
:将元素插入队列尾部。如果队列已满则会抛出IllegalStateException
异常,底层基于offer(E e)
方法。remove()
:移除队列头部的元素,如果队列为空则会抛出NoSuchElementException
异常,底层基于poll()
。peek()
:获取但不移除队列头部的元素,如果队列为空,则该方法会直接返回 null,不会等待并阻塞线程。
先来看看 offer
方法,逻辑和 put
差不多,唯一的区别就是入队失败时不会阻塞当前线程,而是直接返回 false
。
public boolean offer(E e) {
//确保插入的元素不为null
checkNotNull(e);
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列已满直接返回false
if (count == items.length)
return false;
else {
//反之将元素入队并直接返回true
enqueue(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}
poll
方法同理,获取元素失败也是直接返回空,并不会阻塞获取元素的线程。
public E poll() {
final ReentrantLock lock = this.lock;
//上锁
lock.lock();
try {
//如果队列为空直接返回null,反之出队返回元素值
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
add
方法其实就是对于 offer
做了一层封装,如下代码所示,可以看到 add
会调用没有规定时间的 offer
,如果入队失败则直接抛异常。
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
//调用offer方法如果失败直接抛出异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
remove
方法同理,调用 poll
,如果返回 null
则说明队列没有元素,直接抛出异常。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
peek()
方法的逻辑也很简单,内部调用了 itemAt
方法。
public E peek() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当队列为空时返回 null
return itemAt(takeIndex);
} finally {
//释放锁
lock.unlock();
}
}
//返回队列中指定位置的元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
在 offer(E e)
和 poll()
非阻塞获取和新增元素的基础上,设计者提供了带有等待时间的 offer(E e, long timeout, TimeUnit unit)
和 poll(long timeout, TimeUnit unit)
,用于在指定的超时时间内阻塞式地添加和获取元素。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列已满,进入循环
while (count == items.length) {
//时间到了队列还是满的,则直接返回false
if (nanos <= 0)
return false;
//阻塞nanos时间,等待非满
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
可以看到,带有超时时间的 offer
方法在队列已满的情况下,会等待用户所传的时间段,如果规定时间内还不能存放元素则直接返回 false
。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空,循环等待,若时间到还是空的,则直接返回null
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
同理,带有超时时间的 poll
也一样,队列为空则在规定时间内等待,若时间到了还是空的,则直接返回 null。
ArrayBlockingQueue
提供了 contains(Object o)
来判断指定元素是否存在于队列中。
public boolean contains(Object o) {
//若目标元素为空,则直接返回 false
if (o == null) return false;
//获取当前队列的元素数组
final Object[] items = this.items;
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列非空
if (count > 0) {
final int putIndex = this.putIndex;
//从队列头部开始遍历
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
//释放锁
lock.unlock();
}
}
为了帮助理解 ArrayBlockingQueue
,我们再来对比一下上面提到的这些获取和新增元素的方法。
新增元素:
方法 | 队列满时处理方式 | 方法返回值 |
---|---|---|
put(E e) |
线程阻塞,直到中断或被唤醒 | void |
offer(E e) |
直接返回 false | boolean |
offer(E e, long timeout, TimeUnit unit) |
指定超时时间内阻塞,超过规定时间还未添加成功则返回 false | boolean |
add(E e) |
直接抛出 IllegalStateException 异常 |
boolean |
获取/移除元素:
方法 | 队列空时处理方式 | 方法返回值 |
---|---|---|
take() |
线程阻塞,直到中断或被唤醒 | E |
poll() |
返回 null | E |
poll(long timeout, TimeUnit unit) |
指定超时时间内阻塞,超过规定时间还是空的则返回 null | E |
peek() |
返回 null | E |
remove() |
直接抛出 NoSuchElementException 异常 |
boolean |
ArrayBlockingQueue
是 BlockingQueue
接口的有界队列实现类,常用于多线程之间的数据共享,底层采用数组实现,从其名字就能看出来了。
ArrayBlockingQueue
的容量有限,一旦创建,容量不能改变。
为了保证线程安全,ArrayBlockingQueue
的并发控制采用可重入锁 ReentrantLock
,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。并且,它还支持公平和非公平两种方式的锁访问机制,默认是非公平锁。
ArrayBlockingQueue
虽名为阻塞队列,但也支持非阻塞获取和新增元素(例如 poll()
和 offer(E e)
方法),只是队列满时添加元素会抛出异常,队列为空时获取的元素为 null,一般不会使用。
ArrayBlockingQueue
和 LinkedBlockingQueue
是 Java 并发包中常用的两种阻塞队列实现,它们都是线程安全的。不过,不过它们之间也存在下面这些区别:
- 底层实现:
ArrayBlockingQueue
基于数组实现,而LinkedBlockingQueue
基于链表实现。 - 是否有界:
ArrayBlockingQueue
是有界队列,必须在创建时指定容量大小。LinkedBlockingQueue
创建时可以不指定容量大小,默认是Integer.MAX_VALUE
,也就是无界的。但也可以指定队列大小,从而成为有界的。 - 锁是否分离:
ArrayBlockingQueue
中的锁是没有分离的,即生产和消费用的是同一个锁;LinkedBlockingQueue
中的锁是分离的,即生产用的是putLock
,消费是takeLock
,这样可以防止生产者和消费者线程之间的锁争夺。 - 内存占用:
ArrayBlockingQueue
需要提前分配数组内存,而LinkedBlockingQueue
则是动态分配链表节点内存。这意味着,ArrayBlockingQueue
在创建时就会占用一定的内存空间,且往往申请的内存比实际所用的内存更大,而LinkedBlockingQueue
则是根据元素的增加而逐渐占用内存空间。
ArrayBlockingQueue
和 ConcurrentLinkedQueue
是 Java 并发包中常用的两种队列实现,它们都是线程安全的。不过,不过它们之间也存在下面这些区别:
- 底层实现:
ArrayBlockingQueue
基于数组实现,而ConcurrentLinkedQueue
基于链表实现。 - 是否有界:
ArrayBlockingQueue
是有界队列,必须在创建时指定容量大小,而ConcurrentLinkedQueue
是无界队列,可以动态地增加容量。 - 是否阻塞:
ArrayBlockingQueue
支持阻塞和非阻塞两种获取和新增元素的方式(一般只会使用前者),ConcurrentLinkedQueue
是无界的,仅支持非阻塞式获取和新增元素。
ArrayBlockingQueue
的实现原理主要分为以下几点(这里以阻塞式获取和新增元素为例介绍):
ArrayBlockingQueue
内部维护一个定长的数组用于存储元素。- 通过使用
ReentrantLock
锁对象对读写操作进行同步,即通过锁机制来实现线程安全。 - 通过
Condition
实现线程间的等待和唤醒操作。
这里再详细介绍一下线程间的等待和唤醒具体的实现(不需要记具体的方法,面试中回答要点即可):
- 当队列已满时,生产者线程会调用
notFull.await()
方法让生产者进行等待,等待队列非满时插入(非满条件)。 - 当队列为空时,消费者线程会调用
notEmpty.await()
方法让消费者进行等待,等待队列非空时消费(非空条件)。 - 当有新的元素被添加时,生产者线程会调用
notEmpty.signal()
方法唤醒正在等待消费的消费者线程。 - 当队列中有元素被取出时,消费者线程会调用
notFull.signal()
方法唤醒正在等待插入元素的生产者线程。
关于 Condition
接口的补充:
Condition
是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个Lock
对象中可以创建多个Condition
实例(即对象监视器),线程对象可以注册在指定的Condition
中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。 在使用notify()/notifyAll()
方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock
类结合Condition
实例可以实现“选择性通知” ,这个功能非常重要,而且是Condition
接口默认提供的。而synchronized
关键字就相当于整个Lock
对象中只有一个Condition
实例,所有的线程都注册在它一个身上。如果执行notifyAll()
方法的话就会通知所有处于等待状态的线程,这样会造成很大的效率问题。而Condition
实例的signalAll()
方法,只会唤醒注册在该Condition
实例中的所有等待线程。
没有回复内容