JDK数组阻塞队列源码深入剖析
前言
在前面一篇文章从零开始自己动手写阻塞队列当中我们仔细介绍了阻塞队列提供给我们的功能,以及他的实现原理,并且基于谈到的内容我们自己实现了一个低配版的数组阻塞队列。在这篇文章当中我们将仔细介绍JDK具体是如何实现数组阻塞队列的。
阻塞队列的功能
而在本篇文章所谈到的阻塞队列当中,是在并发的情况下使用的,上面所谈到的是队列是并发不安全的,但是阻塞队列在并发下情况是安全的。阻塞队列的主要的需求如下:
- 队列基础的功能需要有,往队列当中放数据,从队列当中取数据。
- 所有的队列操作都要是并发安全的。
- 当队列满了之后再往队列当中放数据的时候,线程需要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程需要被唤醒。
- 当队列空了之后再往队列当中取数据的时候,线程需要被挂起,当有线程往队列当中加入数据的时候被挂起的线程需要被唤醒。
- 在我们实现的队列当中我们使用数组去存储数据,因此在构造函数当中需要提供数组的初始大小,设置用多大的数组。
上面就是数组阻塞队列给我们提供的最核心的功能,其中将线程挂起和唤醒就是阻塞队列的核心,挂起和唤醒体现了“阻塞”这一核心思想。
数组阻塞队列设计
阅读这部分内容你需要熟悉可重入锁ReentrantLock
和条件变量Condition
的使用。
数组的循环使用
因为我们是使用数组存储队列当中的数据,从下表为0的位置开始,当我们往队列当中加入一些数据之后,队列的情况可能如下,其中head表示队头,tail表示队尾。
在上图的基础之上我们在进行四次出队操作,结果如下:
在上面的状态下,我们继续加入8个数据,那么布局情况如下:
我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。
字段设计
在JDK当中数组阻塞队列的实现是ArrayBlockingQueue
类,在他的内部是使用数组实现的,我们现在来看一下它的主要的字段,为了方便阅读将所有的解释说明都写在的注释当中:
/** The queued items */
final Object[] items; // 这个就是具体存储数据的数组
/** items index for next take, poll, peek or remove */
int takeIndex; // 因为是队列 因此我们需要知道下一个出队的数据的下标 这个就是表示下一个将要出队的数据的下标
/** items index for next put, offer, or add */
int putIndex; // 我们同时也需要下一个入队的数据的下标
/** Number of elements in the queue */
int count; // 统计队列当中一共有多少个数据
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock; // 因为阻塞队列是一种可以并发使用的数据结构
/** Condition for waiting takes */
private final Condition notEmpty; // 这个条件变量主要用于唤醒被 take 函数阻塞的线程 也就是从队列当中取数据的线程
/** Condition for waiting puts */
private final Condition notFull; // 这个条件变量主要用于唤醒被 put 函数阻塞的线程 也就是从队列当中放数据的线程
构造函数
构造函数的主要功能是申请指定大小的内存空间,并且对类的成员变量进行赋值操作。
public ArrayBlockingQueue(int capacity) {
// capacity 表示用与存储数据的数组的长度
this(capacity, false);
}
// fair 这个参数主要是用于说明 是否使用公平锁
// 如果为 true 表示使用公平锁 执行效率低 但是各个线程进入临界区的顺序是先来后到的顺序 更加公平
// 如果为 false 表示使用非公平锁 执行效率更高
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 对变量进行赋值操作
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
put函数
这个函数是阻塞队列对核心的函数之一了,首先我们需要了解的是,如果一个线程调用了这个函数往队列当中加入数据,如果此时队列已经满了则线程需要被挂起,如果没有满则需要将数据加入到队列当中,也就是将数据存储到数组当中。注意还有一个很重要的一点是,当我们往队列当中加入一个数据之后需要发一个信号给其他被take
函数阻塞的线程,因为这些线程在取数据的时候可能队列当中已经空了,因此需要将这些线程唤醒。
public void put(E e) throws InterruptedException {
checkNotNull(e); // 保证输入的数据不为 null 代码在下方
final ReentrantLock lock = this.lock;
// 进行加锁操作,因为下面是临界区
lock.lockInterruptibly();
try {
while (count == items.length) // 如果队列已经满了 也就是队列当中数据的个数 count == 数组的长度的话 就需要将线程挂起
notFull.await();
// 当队列当中有空间的之后将数据加入到队列当中 这个函数在下面仔细分析 代码在下方
enqueue(e);
} finally {
lock.unlock();
}
}
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 进入这个函数的线程已经在 put 函数当中加上锁了 因此这里不需要加锁
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) // 因为这个数据是循环使用的 因此可以回到下标为0的位置
// 因为队列当中的数据可以出队 因此下标为 0 的位置不存在数据可以使用
putIndex = 0;
count++;
// 在这里需要将一个被 take 函数阻塞的线程唤醒 如果调用这个方法的时候没有线程阻塞
// 那么调用这个方法相当于没有调用 如果有线程阻塞那么将会唤醒一个线程
notEmpty.signal();
}
注意:这里有一个地方非常容易被忽略,那就是在将线程挂起的时候使用的是while
循环而不是if
条件语句,代码:
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
这是因为,线程被唤醒之后并不会立马执行,因为线程在调用await
方法的之后会释放锁