一. 简述
这篇文章将介绍几种常见的队列,本文将重点介绍优先队列以及优先队列底层的二叉堆并且实现基础算法(go实现),最后还会介绍一样Java并发包中的四种最常用的队列,分析其源码和使用场景。
那么下面就开始了!
二. 队列介绍
队列(queue
)是一种先进先出的
、操作受限的线性表。结构如下图:
队列中关键的几个方法:
// Queue 队列接口
type Queue interface {
Offer(interface{}) bool // 入队, 成功返回true, 已满返回false
Poll() interface{} // 出队
Size() uint // 返回队列大小
Peek() interface{} // 返回对头元素
IsEmpty() bool // 队列是否为空
}
核心方式就是出队和入队两个方法。
三. 使用场景
队列在程序涉及中也经常出现,最典型的就是操作系统的作业排队。还有常见的任务调度也需要将多个任务入队,依次调用;最常用的莫过于我们的消息中间间。
队列的分类如下图:
四. 队列实现
这里简单实现四种队列,分别是:链式队列、循环队列、双向队列和优先队列。
4.1. 链式队列
链式队列结构如下:
基础定义:
// LinkedNode 链表的结点
type LinkedNode struct {
data interface{}
next *LinkedNode
}
// LinkedQueue 队列定义
type LinkedQueue struct {
link *LinkedNode // 链表
count uint // 结点数量
top *LinkedNode // 头指针
tail *LinkedNode // 尾指针
}
这里就先实现队列中比较重要的方法:出队、入队和判空三个方法。
4.1.1. 判空
这里队头也会存储元素,所以空队列判断条件是:头指针和尾指针都指向nil。
func (l LinkedQueue) IsEmpty() bool {
return l.top == nil && l.tail == nil
}
4.1.2. 入队
func (l *LinkedQueue) Offer(data interface{}) bool {
// 构建结点
node := &LinkedNode{
data: data,
next: nil,
}
// 队列否为空
if l.IsEmpty() {
// 如果是空,将头尾结点都指向node
l.top, l.tail = node, node
} else {
// 如果不为空,将当前尾指针的next指向node结点,并移动尾指针指向node
l.tail.next, l.tail = node, node
}
// 结点数量加一
l.count++
return true
}
具体操作如下图:
4.1.3. 出队
func (l *LinkedQueue) Poll() interface{} {
// 先判空
if l.IsEmpty() {
return nil
}
// 将头指针指向的元素赋值给一个临时变量
temp := l.top
// 将头指针移动指向下一个元素的位置
l.top = l.top.next
// 断开出队元素与队列的联系
temp.next = nil
// 元素的数量减一
l.count--
// 用来处理头指针已经指向nil时,说明队列已经空了,那么需要将尾指针也指向nil
if l.top == nil {
l.tail = nil
}
// 返回数据
return temp.data
}
具体操作如图:
4.2. 循环队列
循环队列基于数组实现,通过index + 1取余数据大小用来实现头尾相连,以此实现循环队列。下图:
当数组容量是6,当前队尾指针指向了下标为5的位置,队头指向下标为2的位置,此时如果要往循环队列中进行入队的话,需要如下操作:
- 先计算入队元素的位置,当前队尾指向下标为5,那么下一个位置是:
(5 + 1) % 6 = 0
; - 接着在下标为0的位置插入元素,并将队尾指针移动到0位置。
这样就可以实现头尾相连了。
下面看一下具体的代码实现,先定义必要的类:
// ArrayQueue 数组队列
type ArrayQueue struct {
queue []interface{}
count uint // 队列大小
capacity uint // 队列容量
top uint // 队头索引
tail uint // 队尾索引
}
增加一个构造方法,用来初始化队列,初始化的时候头尾指针都指向0位置。
func NewArrayQueue(capacity uint) *ArrayQueue {
// 设置容量默认为10
if capacity <= 0 {
capacity = 10
}
return &ArrayQueue{
capacity: capacity,
count: 0,
top: 0,
tail: 0,
queue: make([]interface{}, capacity),
}
}
下面还是实现三个基本的方法:判满/空,入队,出队。
4.2.1. 判满/空
判空的策略和头尾相连的方式差不多,也是通过取余来计算是否头尾指针指向的是同一个位置,但是指向同一个位置并不足以判断是否满,因为队列为空的时候也是头尾指针指向同一个位置;这是上面定一个count
和capacity
两个变量就可以发挥作用了。如果count
和capacity
相同就是满,如果count
是0就是空。
func (a ArrayQueue) IsEmpty() bool {
return a.count == 0
}
func (a ArrayQueue) IsFull() bool {
return a.count == a.capacity
}
如果不想引入count
这个变量来监控队列中元素的变化情况,也可以在数组中少用一个元素空间,约定队列头指针在队列尾指针的下一位置作为队列满的标志。
4.2.2. 入队
入队需要判断队列是否充满,充满的话就不允许入队,返回false。
func (a *ArrayQueue) Offer(data interface{}) bool {
// 判断队列容量已满,返回false
if a.IsFull() {
return false
}
// 入队操作
a.queue[a.tail] = data
a.tail = (a.tail + 1) % a.capacity
a.count++
return true
}
过程如下图:
4.2.3. 出队
出队需要判断判断队列是否为空。
func (a *ArrayQueue) Poll() interface{} {
if a.IsEmpty() {
return nil
}
temp := a.queue[a.top]
a.top = (a.top + 1) % a.capacity
a.count--
return temp
}
4.3. 双向队列
双端队列就是支持队列在两端插入和删除元素,双端使用双向链表实现从两端删除和插入,在Java
中双端队列都实现了Deque
接口。
Jdk中的并发包中关于双端队列实现主要是:ArrayDeque
、LinkedBlockDeque
和ConcurrentLinkedDeque
三个类。
LinkedBlockDeque
和ConcurrentLinkedDeque
是并发安全的双端队列。
LinkedBlockDeque
是基于数组实现的一个有界阻塞队列,大小不能重新定义,在一个满队列中添加元素会发生阻塞。
ConcurrentLinkedDeque
是基于链表实现的无界队列,添加元素不会阻塞,但是生产和消费速度就需要差不多,不然生产的速度太快,就会导致内存耗尽,ConcurrentLinkedDeque
依赖CAS。
ArrayDeque
是大小可变的数组双端队列,不允许为null
。
这个双端队列在实际中并使很常用就不过多的去介绍了。
4.4. 优先队列
在介绍优先队列之前,先了解另一个数据结构:堆。那么什么是堆呢?
4.4.1. 什么是堆?
堆 是一棵任意结点的优先级小于其子节点 的完全二叉树 。
满二叉树:在一棵二叉树中,如果每一个层的结点数都达到最大值,则这个二叉树就是满二叉树 。如上图
完全二叉树:在一棵二叉树中,假设其深度为N(N>1)。除了第N层外,其它各层的节点数目均已达最大值,且第N层所有节点从左向右连续地紧密排列,这样的二叉树被称为完全二叉树 。如上图
4.4.2. 完全二叉树的性质
从下图可以看出完全二叉树的层序遍历就是数组中元素的存放顺序,所以完全二叉树非常适合使用数组进行存储,一棵具有n
个结点的完全二叉树拥有如下性质:
- 非根结点(下表
i > 1
)的父节点的下标是:i / 2
; - 结点(下标
i
)的左孩子结点的下标是2i
,如果2i > n
,那么结点i
不存在左孩子; - 结点(下标
i
)的右孩子结点的下标是:2i + 1
,如果2i+ 1 > n
,那么i
结点不存在右孩子;
当然也可以看出一棵普通的二叉树进行数组存储,会造成空间浪费,如上图右侧的一般二叉树。
4.4.3. 优先队列
一般队列都是遵循先进先出的原则,这就和日常生活中的排队一样,先到先得。而优先队列则是一种特殊的队列,优先队列与普通队列最大的不同点就在于出队顺序不一样。
优先队列出队顺序和入队顺序没关系,但是有入队元素的优先级有关系,优先级高的先出队,可以理解为VIP氪金游戏。
优先队列的实现可以有多种方法:
- 链表:入队的时候只需要将新元素链接到队列尾部,但是出队的时候需要遍历链表找到优先级最大值的元素,时间复杂度就变成了O(n)
- 数组:入队的时候需要将元素安装优先级排序,时间复杂度就是O(n),出队的时候就比较简单了直接返回优先级最大的元素就可以。
上面两种实现方法各有优缺点,接着看一下使用堆来实现效果会怎么样呢?
从堆的定义知道,堆就是满足特定性质的完全二叉树。在Java中优先队列是使用最小堆实现的,最大和最小堆区别在于,最大堆根结点是二叉堆中的最大值,最小值根结点是二叉堆中的最小值,所以性质也就不一样了:
最大堆:一棵任意结点的优先级小于其子节点 的完全二叉树 。
最小堆:一棵任意结点的优先级大于其子节点 的完全二叉树 。
最大堆和最小堆如下图:
下面实现堆相关的算法!
4.4.4. 基础定义
这里通过数组保存堆的结点信息,这里定义struct,保存相关的属性:
// MaxHeap 最大堆
type MaxHeap struct {
data []int // 存放数据
len int // 存储的元素个数
}
// NewMaxHeap 默认构造方法
func NewMaxHeap() *MaxHeap {
return &MaxHeap{
data: make([]int, 10),
len: 0,
}
}
4.4.5. 索引信息计算
这里实现获取结点的父结点和左右孩子结点的索引。先看下图:
这里需要注意一下,对于计算某一结点的父结点、右孩子和左孩子结点的索引。从下图看索引是从1开始的,但是在使用数组存储的时候是从0开始的,所以计算某一结点的父结点、右孩子和左孩子结点的索引需要略作调整。
// parent 返回某一个结点的父节点的索引
func (h MaxHeap) parent(index int) int {
return (index - 1) / 2
}
// left 返回索引右孩子的结点索引
func (h MaxHeap) left(index int) int {
return index*2 + 1
}
// right 返回索引右孩子的结点索引
func (h MaxHeap) right(index int) int {
return index*2 + 2
}
4.4.6. 增加元素
在增加元素,可能会出现不符合堆的数据结构的情况,即任意结点的优先级小于其子节点 。
这时就需要就行优先级的调整,这步操作就叫做上浮 。如下图:
下面看一下代码实现:
// Add 往堆里面添加元素
func (h *MaxHeap) Add(data int) {
// 将元素的值赋值到数组的最后一个位置
h.data[h.len] = data
// 增加
h.len++
// 上浮元素
h.floatUp(h.len - 1)
}
// floatUp 上浮
func (h *MaxHeap) floatUp(index int) {
// 索引大于0
for index > 0 {
// 计算父结点的索引位置
parent := h.parent(index)
// 如果父结点小于当前结点
if h.data[parent] < h.data[index] {
// 交换两个位置的值
h.data[parent], h.data[index] = h.data[index], h.data[parent]
// 将当前索引指向交换(父结点)的索引
index = parent
} else {
// 否则就是符合堆的数据结构,不需要调整,直接跳出循环就可以了
break
}
}
}
4.4.7. 删除元素
对于删除元素,只能从数组0位置元素删除(在增加元素的时候,已经确保0位置的元素是所有的元素中最大的那个元素),并且0位置的元素是二叉堆中的根结点,直接删除会出现问题。
对于上面说的问题,这里可以现将根结点(0位置的元素)和当前数组中最后一个元素(这个元是这个元素到根结点这条路径的最小值)交换对应位置的值。此时就会出现0位置的元素不符合堆的数据结构,此时就需要将0位置元素进行下浮操作。参看下图:
代码实现:
// PopMax 取出堆中最大值
func (h *MaxHeap) PopMax() (int, error) {
// 获取最大值,就是数组中索引为0的元素
max, err := h.Max()
if err != nil {
return 0, err
}
// 头尾交换
h.data[0], h.data[h.len - 1] = h.data[h.len - 1], h.data[0]
// 元素总数减一
h.len--
// 下浮
h.floatDown(0)
// 返回最大值
return max, nil
}
// Max 获取最大值
func (h MaxHeap) Max() (int, error) {
if h.IsEmpty() {
return 0, errors.New("当前最大堆是空的")
}
return h.data[0], nil
}
// floatDown 下浮
func (h *MaxHeap) floatDown(index int) {
// 索引值小于索引的最大值
for h.left(index) < h.len { {
// 先找到左子树索引
tempIndex := h.left(index)
// 判断右子树存在索引,并且 右子树是否大于左子树
if tempIndex + 1 < h.len && h.data[tempIndex + 1] > h.data[tempIndex] {
tempIndex++
}
// 此时tempIndex的索引值是左右子树最大值对应的索引值
// 判断index和tempIndex对应的值大小
if h.data[index] >= h.data[tempIndex] {
break
}
// 如果index的值小于tempIndex的值,那么需要交换两个位置的元素
h.data[index], h.data[tempIndex] = h.data[tempIndex], h.data[index]
// 将当前index指向tempIndex,进行下一轮判断
index = tempIndex
}
}
注意: 下浮时候需要注意需要找到左右子树最大的元素交换;另外还需要注意扩容和缩容的问题。
4.4.8. 实现优先队列
有了上面堆的实现,实现优先队列就比较简单了。
type PriorityQueue struct {
heap *MaxHeap
}
func NewPriorityQueue() *PriorityQueue {
return &PriorityQueue{
heap: NewMaxHeap(),
}
}
// Offer 入队
func (p *PriorityQueue) Offer(data int) {
p.heap.Add(data)
}
// 出队
func (p *PriorityQueue) Poll() (int, error) {
return p.heap.PopMax()
}
// 队列中元素个数
func (p *PriorityQueue) Size() int {
return p.heap.Size()
}
// 队头元素
func (p *PriorityQueue) Peek() (int, error) {
return p.heap.Max()
}
// 队列是否为空
func (p *PriorityQueue) IsEmpty() bool {
return p.heap.IsEmpty()
}
单元测试:
func TestPriorityQueue(t *testing.T) {
priorityQueue := NewPriorityQueue()
priorityQueue.Offer(10)
priorityQueue.Offer(20)
priorityQueue.Offer(15)
priorityQueue.Offer(40)
priorityQueue.Offer(30)
priorityQueue.Offer(50)
t.Log(priorityQueue.Size())
t.Log(priorityQueue.Peek())
t.Log(priorityQueue.Poll())
t.Log(priorityQueue.Poll())
priorityQueue.ToString()
t.Log(priorityQueue.Poll())
priorityQueue.ToString()
t.Log(priorityQueue.Poll())
t.Log(priorityQueue.Poll())
t.Log(priorityQueue.Peek())
}
五. Java常用的队列
下面看一下Java中常用的几种队列:阻塞队列和非阻塞队列。
阻塞队列提供了可阻塞的put
和take
方法,这两个方法于定时offer
和poll
等价。当队列满了之后在使用put
方法会被阻塞等到有空间可用的时候再将元素插入,另外在队列是空的时候,使用take
方法也会阻塞,直到有元素可用。在Java
中包含***BlockingQueue
都是阻塞队列。例如::ArrayBlockingQueue
、LinkedBlockingQueue
和PriorityBlockingQueue
等等。
非阻塞队列就是
5.1. ArrayBlockingQueue
下面看一个阻塞队列消费者生产者的实例:
public class BlockingQueue {
public static void main(String[] args) {
// 创建一个长度为5的阻塞队列
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
// 创建一个线程往队列中写入元素
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
blockingQueue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("[ " + LocalDateTime.now() + "] 队列写入[" + i +"]");
}
System.out.println("[ " + LocalDateTime.now() + "] 队列写入完成");
}).start();
// 另一个线程进行出队操作
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!blockingQueue.isEmpty()) {
try {
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
结果:
[ 2021-11-20T17:58:57.999472] 队列写入[0]
[ 2021-11-20T17:58:58.021618] 队列写入[1]
[ 2021-11-20T17:58:58.021665] 队列写入[2]
[ 2021-11-20T17:58:58.021697] 队列写入[3]
[ 2021-11-20T17:58:58.021730] 队列写入[4] ---这里会阻塞1秒
[ 2021-11-20T17:58:58.987460] 队列写入[5]
[ 2021-11-20T17:58:59.992634] 队列写入[6]
[ 2021-11-20T17:59:00.997914] 队列写入[7]
[ 2021-11-20T17:59:02.002881] 队列写入[8]
[ 2021-11-20T17:59:03.006947] 队列写入[9]
[ 2021-11-20T17:59:03.007200] 队列写入完成
接下来看一个ArrayBlockingQueue是如何实现阻塞的操作的!
先看下ArrayBlockingQueue的属性和构造函数:
// 存储数据的数组
final Object[] items;
// 获取数据的索引,相当于头指针
int takeIndex;
// 添加元素的位置,相当于尾指针
int putIndex;
// 队列中元素的个数
int count;
// 控制并发访问的锁
final ReentrantLock lock;
// 非空条件对象,用于通知take方法中正在等待获取数据的线程
private final Condition notEmpty;
// 非满条件,用户通知put方法中等待添加数据的线程
private final Condition notFull;
// 创建一个具体容量的队列,默认是非公平队列
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 创建一个具体容量和是否公平的队列
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
从构造函数可以知道,内部使用数组对象存储所有数据,通过ReentrantLock
来同时控制入队和出队操纵,控制并发访问的问题。
另外需要注意notEmpty
和notFull
这两个是控制阻塞的信号:
-
notEmpty
用于存放等待调用take
方法的线程,这些线程会放到notEmpty
的等待队列中(单链表),这个时候队列是没有数据的,当队列中数据时候会通过notEmpty
去唤醒对待队列中等待的线程去take
数据。 -
notFull
用于存放等待调用的put
方法的线程,和notEmpty
一样也会放到notFull
的等待队列中,这个时候就是队列中的数据已满,当队列中数据被消费之后,就会通过notFull
唤醒等待的线程去put
数据。
接着看一下出队和入队的相关方法这些方法有啥区别:
- 出队:
poll/remove/take
- poll
:队列为空返回null
,支持设置等待超时时间
- remove
:如果队列操作失败,抛出异常
- take
:队列为空则会阻塞,直到有元素才可以进行出队操作
- 入队:
put/add/offer
- put
:队列满则会阻塞队列,直到有空间才可以进行入队操纵
- add
:入队成功返回true
,否则抛出异常
- offer
:入队成功返回true
,否则返回false
接着看一个put
和take
源码,先看put
源码:
public void put(E e) throws InterruptedException {
// 判断元素的是否null
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
// 尝试获取锁
lock.lockInterruptibly();
try {
// 如果队列满之后阻塞
while (count == items.length)
notFull.await();
// 队列没有满直接入队
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
// 下个位置如果已经到了数组的末尾,就会直接数组的第一个位置
if (++putIndex == items.length) putIndex = 0;
count++;
// 唤醒阻塞的线程
notEmpty.signal();
}
接着看一下take源码:
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列是空,阻塞
while (count == 0)
notEmpty.await();
// 队列不是空,直接出队
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取队头指向的元素
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
// 更新迭代器
itrs.elementDequeued();
// 唤起被阻塞的线程
notFull.signal();
return e;
}
这里提一个队列是是通过数据实现的循环队列,具体的可以上面4.2
实现的循环队列,4.2
实现的是取余重新置0的操作,这里的是重新置0,就不详细介绍了。
总结一下:ArrayBlockingQueue
是不能扩容,利用takeIndex
和putIndex
循环利用数组,并且通过重入锁和notEmpty/notFull
两个条件队列保证并发安全。
5.2. LinkedBlockingQueue
从上面可以看出ArrayBlockingQueue
是限制队列大小的阻塞队列,接着看一下LinkedBlockingQueue
,它是基于链表有界的阻塞队列。
先看一个LinkedBlockingQueue
的构造函数,最大容量限制就是Integer.MAX_VALUE
。
// 创建默认容量为Integer.MAX_VALUE的队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 创建指定容量的队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
然后是数据结点,就是一个单链表,很熟悉。
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
接着看一下LinkedBlockingQueue
的属性,这里的属性其实和ArrayBlockingQueue
是差不多的,但是LinkedBlockingQueue
使用了takeLock
和putLock
两个锁进行新增和移除元素操作,所以统计元素个数的count
也被声明为原子变量。
private final AtomicInteger count = new AtomicInteger();
// 头指针
transient Node<E> head;
// 尾指针
private transient Node<E> last;
// 出队重入锁
private final ReentrantLock takeLock = new ReentrantLock();
// 非空条件信号变量
private final Condition notEmpty = takeLock.newCondition();
// 入队重入锁锁
private final ReentrantLock putLock = new ReentrantLock();
// 非满条件信号变量
private final Condition notFull = putLock.newCondition();
接着也和上面一样,看一下入队和出队的源码!
首先是入队put
:
public void put(E e) throws InterruptedException {
// 如果入队元素是null,则抛出异常
if (e == null) throw new NullPointerException();
final int c;
// 构建结点
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 获取锁
putLock.lockInterruptibly();
try {
// 如果达到最大容量,调用notFull的await方法,阻塞当前线程,等待其他线程调用notFull方法唤醒自己
while (count.get() == capacity) {
notFull.await();
}
// 如果队列没有满,入队
enqueue(node);
// 原子count先赋值再加一操作
c = count.getAndIncrement();
// 接着判断当前数据量 + 1 没有操作最大容量,则唤醒其他的被阻塞的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 如果队列数据量是0,则唤醒被notEmpty阻塞的线程
if (c == 0)
signalNotEmpty();
}
接着出队take
,知道了put
方法在看take
方法其实就很简单了,就是一个相反的操作,就不做过多的解释了。
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
这个LinkedBlockingQueue
其实和4.1
中实现的链式队列差不多,可以看一下。
5.3. PriorityBlockingQueue
PriorityBlockingQueue
和4.4
基于二叉堆实现的优先队列是一样的,但是PriorityBlockingQueue
是最小堆,4.4
是最大堆。
接下来看一下这个优先队列中的实现细节和4.4
实现的有那些区别。这个PriorityBlockingQueue
其实和ArrayBlockingQueue
在并发控制上大体上是一样。就是底层数据存储是不一样。
注意:在使用PriorityBlockingQueue
时候需要实现Comparator
接口,以便确定优先级的大小。
先看一下PriorityBlockingQueue
中的属性:
// 默认队列大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 数组最大容量,但是几乎都不会出现超过Integer.MAX_VALUE的数据,也可以认为是无界队列
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 底层存储数据结构是数组,最小堆可以用数组实现
private transient Object[] queue;
// 队列容量
private transient int size;
// 比较器
private transient Comparator<? super E> comparator;
// 可重入的独占锁(独占锁),同时只有一个线程在入队和出队
private final ReentrantLock lock = new ReentrantLock();
//
private final Condition notEmpty = lock.newCondition();
// 自旋锁,CAS使得同时只有一个线程可以进行扩容,0:没有进行扩容,1:正在进行扩容
private transient volatile int allocationSpinLock;
构造函数:
// 默认是容量为11的优先队列
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// 指定容量优先队列
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
// 指定容量和比较类的
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
this.queue = new Object[Math.max(1, initialCapacity)];
}
接着看一下入队操作put,这里入队操作启动和4.4.6实现最大堆的增加元素是差不多了。
public boolean offer(E e) {
// 入队元素为空,抛出异常
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] es;
// 判断是否需要扩容
while ((n = size) >= (cap = (es = queue).length))
// 扩容操作
tryGrow(es, cap);
try {
final Comparator<? super E> cmp;
// 针对是否有比较器区别处理,此次执行完之后,入队,并上浮调整维持二叉堆的结构
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
// 数量增加
size = n + 1;
// 唤醒notEmpty条件队列中等待的线程
notEmpty.signal();
} finally {
// 释放锁
lock.unlock();
}
return true;
}
接着看一下上浮操作,这个原理和4.4.6
的上浮是一样的,不做详细讲解。
private static <T> void siftUpComparable(int k, T x, Object[] es) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 无符号右移一位,相当于 / 2
int parent = (k - 1) >>> 1;
Object e = es[parent];
if (key.compareTo((T) e) >= 0)
break;
es[k] = e;
k = parent;
}
es[k] = key;
}
接着看一下扩容操作:
private void tryGrow(Object[] array, int oldCap) {
// 这里先释放获取的锁,因为还是扩容所以比较耗时,并且不释放出队和入队就没进行
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 这里使用自旋锁,0表没有进行扩容,此时扩容需要把自旋锁从设置为1,表示正在进行扩容
if (allocationSpinLock == 0 &&
ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
try {
// 确定扩容之后新的容量大小,这里可以看到小于64的时候,扩容是2n+2,大于64是3n/2
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 判断新容量大小是否超过最大容量,超过之后在旧的容量大小上加1,如果还大则抛出异常
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 新数组
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 最后扩容完毕之后,修改cas值的为0,没有正在扩容
allocationSpinLock = 0;
}
}
// 让其他的线程让出CPU,优先让第一个进行的线程执行完毕
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 获取锁
lock.lock();
// 复制元素到新的数组中
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
关于扩容需要关注以下几点:
- 进行扩容方法先释放锁,因为还是扩容所以比较耗时,并且不释放出队和入队就没进行;
- 通过CAS自旋锁锁定第一个进行的线程进行新容量的计算
- 当有第一个线程进行扩容操作,其他的线程需要让出CPU,保证第一个线程完整执行完扩容操作
- 在将原来的值复制新的数组时候需要先获取锁
最后在看一个出队的take
方法,这里出队和上面4.4.7
删除元素原理是一样,首位元素调换,然后下浮调整维持二叉堆的结构。
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 二叉堆是空/数组为空,就会返回null,这时就会发生阻塞
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 释放锁
lock.unlock();
}
// 返回出队元素
return result;
}
具体的出队操作还的在dequeue
中看:
private E dequeue() {
// assert lock.isHeldByCurrentThread();
final Object[] es;
final E result;
// 获取数组中第一个元素也就是二叉堆的根结点
if ((result = (E) ((es = queue)[0])) != null) {
final int n;
//
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
}
return result;
}
下浮操作:这里的下浮操作和4.4.7
中实现的下浮操作,略有差异,但是大体都一样的。
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
// assert n > 0;
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // 只需要遍历到叶子结点就可以了
while (k < half) {
int child = (k << 1) + 1; // 左孩子的索引下标
Object c = es[child];
int right = child + 1; // 右孩子索引下标
// 找左右孩子最小的元素
if (right < n &&
((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
c = es[child = right];
if (key.compareTo((T) c) <= 0)
break;
// 将上面找到最小值赋值给当前结点
es[k] = c;
// 接着最小值对应的下标赋值给下一个比较下标
k = child;
}
// 找到存放位置,放入元素
es[k] = key;
}
大体工程总结:
- 先加锁
- 出队,返回null的话就是队空,将阻塞
- 出队返回队首元素(数组0位置元素),并将队尾元素(数组中最后一个元素)置为null。
- 自上而下重新进行堆化,然后再将队尾元素放入合适位置。
具体如图所示:
注意点:为什么PriorityBlockingQueue
不需要notFull
的条件队列呢?这是因为PriorityBlockingQueue
在没有空间的时候会自动扩容,也就不存在队列满的情况,所以不需要notFull
这个条件队列了。
5.4. DelayQueue
DelayQueue
是Java
并发包下的延时阻塞队列,一般用于实现定时任务。在使用DelayQueue
的时候存储的元素需要实现Delayed
接口。DelayQueue
底层是依托于PriorityQueue
实现的,每个元素都有一个过期时间。
先看一下这里面一些属性:
// 独占锁,用于控制并发
private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列存放工作任务
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时)
private Thread leader;
// 条件变量,表示当前是否有可取元素
private final Condition available = lock.newCondition();
接着看一下构造函数,这个构造函数就比较简单了!
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
入队操作,DelayQueue
中的入队操作都是使用的offer
函数,因为DelayQueue
是阻塞队列并且PriorityQueue
是无界队列(),所以入队不会出现超时的问题。这里直接看一下offer方法:
public boolean offer(E e) {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 调用优先队列的入队操作
q.offer(e);
// 如果添加的元素是堆顶元素
if (q.peek() == e) {
// leader置为空
leader = null;
// 唤醒等待在条件available上的线程
available.signal();
}
return true;
} finally {
// 释放锁
lock.unlock();
}
}
DelayQueue
是阻塞队列,所有出队的方法好几个,这里只看一下最复杂的take
方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取堆顶元素
E first = q.peek();
// 获取到的是null,阻塞
if (first == null)
available.await();
else {
// 获取堆顶元素的过期时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
// 小于0L,说明到期了,出队
return q.poll();
// 如果没有过期,将获取的first元素设为null,方便垃圾回收
first = null; // don't retain ref while waiting
if (leader != null)
// leader有线程占用,其他线程的话就是等待
available.await();
else {
// leader没有被占用,就将当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 这里等待delay之后自动唤醒
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程,则将leader设置为null,这样可以让其他线程获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 出队之后,如果leader是null并且堆顶元素不是null,就唤醒等待的线程
if (leader == null && q.peek() != null)
// signal只是把等待线程加入AQS的队列里面,没有真正的唤醒
available.signal();
// 解锁之后,此时真正的唤醒
lock.unlock();
}
}
大体流程:
- 加锁,判断队列头(堆顶)为空,为空直接阻塞等待;
- 判断堆顶元素是否到期,到期之后使用poll方法弹出元素;
- 如果没有到期,就判断是否有其他的线程在等待,有的话就需要等待;
- 如果没有其他线程在等待,就在当前线程中等待delay时候后唤醒,再次尝试获取数据;
- 获取到元素之后在唤醒下一个等待的线程,最后解锁;
测试代码:
class DelayMessage implements Delayed {
// 延时时间
private long delay;
// 到期时间
private long expire;
// 消息
private String msg;
// 创建消息的时间
private long now;
public DelayMessage(long delay, String msg) {
this(delay, System.currentTimeMillis() + delay, msg, System.currentTimeMillis());
}
public DelayMessage(long delay, long expire, String msg, long now) {
this.delay = delay;
this.expire = expire;
this.msg = msg;
this.now = now;
}
@Override
public String toString() {
return "延迟消息[" +
"delay:" + delay +
", expire:" + expire +
", msg:'" + msg + '\'' +
", now:" + now +
']';
}
// 获取延时时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 用于延时队列内部排序比较
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch downLatch = new CountDownLatch(2);
DelayQueue<DelayMessage> delayQueue = new DelayQueue<>();
new Thread(() -> {
Integer a = 1;
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
DelayMessage message = new DelayMessage(3000 + new Random().nextInt(1000), "消息" + (a++));
delayQueue.offer(message);
System.out.println("当前的延时队列数据有:" + delayQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
downLatch.countDown();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
DelayMessage message = delayQueue.take();
System.out.println("[" + LocalDateTime.now() + "] " + message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
downLatch.countDown();
}
}
}).start();
downLatch.await();
}
}
六. 线程池中的缓冲队列
线程池中常用几种缓冲队列:ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
和SynchronizedQueue
四种。
JDK中定义了几个常用的线程池:
- 定长线程池,可控制线程的最大并发数,超出的线程会在队列中等待,这里使用的缓冲队列是
LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 可缓冲的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程,这里使用的是:
SynchronousQueue
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 单线程线程池,一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行, 这里使用的缓冲队列是
LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- 定长线程池,支持定时和周期性任务执行,这个队列使用的是
DelayQueue
的延时队列
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}