文章目录
【JUC并发编程系列】深入理解Java并发机制:阻塞队列详解与简单纯手写(七、BlockingQueue、ArrayBlockingQueue、LinkedBlocking)
1. 简单回顾
1.1 数组结构和链表结构
1.1.1 数组结构
数组是一种线性的数据结构,其中的元素是按照顺序存储在连续的内存空间中。
特点
- 数组中的所有元素必须是相同的类型。
- 数组的大小在创建时就已经确定,并且通常不可更改。
- 数组中的每个元素都可以通过索引直接访问。
优点
- 快速访问:由于数组是连续存储的,可以通过计算偏移地址直接访问任意位置的元素,时间复杂度为O(1)。
- 支持随机访问:可以直接通过索引访问任何位置的元素。
缺点
- 插入和删除效率低:当需要在数组中间插入或删除元素时,可能需要移动大量元素来保持数组的连续性,时间复杂度为O(n)。
- 内存要求高:数组需要预先分配足够的内存空间,即使部分空间未被使用也会占用内存资源。
- 大小固定:数组的大小在创建时就已经确定,无法动态调整。
1.1.2 链表结构
链表是一种线性的数据结构,其中的元素通过指针链接在一起,不一定存储在连续的内存空间中。
特点
- 每个元素(节点)包含两部分:数据和指向下一个元素的指针。
- 链表的大小可以在运行时动态改变。
优点
- 插入和删除效率高:只需要修改相邻节点之间的指针即可完成插入或删除操作,时间复杂度通常为O(1)。
- 灵活的内存管理:链表的大小可以动态调整,不需要预先分配固定大小的内存空间。
缺点
- 访问速度较慢:要访问链表中的某个元素,需要从头节点开始遍历到目标节点,时间复杂度为O(n)。
- 额外的内存开销:每个节点除了存储数据外还需要存储指向下一个节点的指针,增加了内存的使用。
总结
- 数组适用于需要快速随机访问的场景,但在频繁的插入和删除操作方面效率较低。
- 链表适用于需要频繁插入和删除的场景,但随机访问效率不高。
选择哪种数据结构取决于具体的应用需求。例如,在实现缓存或需要快速访问的场景中,数组可能更为适合;而在实现动态列表或需要频繁更新的场景中,链表可能是更好的选择。
1.2 有界队列与无界队列
在计算机科学中,队列是一种遵循先进先出(FIFO, First In First Out)原则的数据结构。根据队列是否具有预设的最大容量,可以将队列分为有界队列和无界队列。
有界队列
- 定义:有界队列是指具有固定最大长度的队列。一旦队列中的元素数量达到这个最大值,就不能再添加新的元素,除非队列中有元素被移除。
- 特点:
- 通常用于需要限制资源使用的场景,比如消息队列、任务调度等。
- 可以防止资源耗尽或过载的情况发生。
- 当队列满时,新来的元素可能会被拒绝或导致阻塞,直到有足够的空间可用。
- 实现上可以通过数组或链表来实现,并且需要额外的逻辑来跟踪队列的边界。
无界队列
- 定义:无界队列是指没有预设最大长度的队列,理论上可以无限地添加元素。
- 特点:
- 通常用于不需要严格控制资源使用量的场景。
- 随着元素的增加,队列会动态扩展其存储空间。
- 在实际应用中,虽然称为“无界”,但受限于系统内存或其他资源限制,实际上还是存在一个上限的(
Integer.MAX_VALUE
)。 - 实现上通常采用链表或者可变数组等数据结构,可以灵活地进行扩展。
总结
- 主要区别:有界队列有固定的大小限制,而无界队列则可以根据需要动态扩展。
- 应用场景:选择哪种类型的队列取决于具体的应用需求。例如,在网络编程或并发处理中,为了防止资源耗尽,通常会选择有界队列;而在不需要严格控制资源的情况下,则可以选择无界队列。
1.3 Lock锁使用回顾
ReentrantLock
类:
-
lock()
:加锁操作,如果此时有竞争会进入等待队列中阻塞直到获取锁。 -
lockInterruptibly()
:加锁操作,但是优先支持响应中断。 -
tryLock()
:尝试获取锁,不等待,获取成功返回true
,获取不成功直接返回false
。 -
tryLock(long timeout, TimeUnit unit)
:尝试获取锁,在指定的时间内获取成功返回true
,获取失败返回false
。 -
unlock()
:释放锁。
Condition
类:
-
通常和
ReentrantLock
一起使用的 -
await()
:阻塞当前线程,并释放锁。 -
signal()
:唤醒一个等待时间最长的线程。
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
System.out.println("1");
condition.await();
System.out.println("2");
} catch (Exception e) {
} finally {
lock.unlock();
}
}).start();
try {
Thread.sleep(2000);
} catch (Exception e) {
}
new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
condition.signal();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}).start();
}
2. 什么是阻塞队列
阻塞队列(Blocking Queue)是一种特殊的队列数据结构,它提供了额外的同步机制来处理多线程环境下的并发访问问题。阻塞队列在队列为空时,从队列获取元素的操作会被阻塞;同样地,当队列已满时,向队列添加元素的操作也会被阻塞。这种特性使得阻塞队列非常适合用作生产者-消费者模型中的数据交换容器。
阻塞队列的特点
- 线程安全:阻塞队列内部实现了线程安全机制,可以被多个线程安全地共享。
- 阻塞行为:当队列满时,生产者线程会被阻塞,直到有消费者线程消费掉一个元素;当队列为空时,消费者线程会被阻塞,直到有生产者线程添加了一个元素。
- 可选的等待策略:阻塞队列提供了一些方法,如
take()
和put()
方法,这些方法会在相应的条件不满足时阻塞调用线程。此外还提供了一些非阻塞的方法,如offer()
和poll()
方法,这些方法不会阻塞线程,而是返回一个指示是否成功执行的值。
阻塞队列的使用场景
- 生产者-消费者模式:阻塞队列可以作为生产者和消费者之间共享的数据容器。
- 限流:阻塞队列可以用作任务队列,限制同时处理的任务数量。
- 异步处理:阻塞队列可以用于异步处理任务,例如在网络请求处理中,可以将请求放入阻塞队列中,由工作线程池中的线程来处理这些请求。
Java
中的BlockingQueue
接口是一个线程安全的存取队列,适用于生产者消费者的应用场景中,支持两个附加操作:
-
生产者线程会一直不断的往阻塞队列中放入数据,直到队列满了为止。队列满了后,生产者线程阻塞等待消费者线程取出数据。
-
消费者线程会一直不断的从阻塞队列中取出数据,直到队列空了为止。队列空了后,消费者线程阻塞等待生产者线程放入数据。
3. BlockingQueue接口
BlockingQueue
接口提供了一组方法来处理队列的操作,这些方法可以根据不同的情况选择使用。下面是一些常用的方法及其行为总结:
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除方法 | remove(o) | poll() | take(o) | poll(o, timeout, timeunit) |
检查方法 | element() | peek() | — | — |
抛出异常
- 插入方法
add(o)
: 如果队列已满,尝试插入数据时会抛出IllegalStateException
异常。如果数据类型不匹配,则会抛出ClassCastException
;如果元素为null
,则会抛出NullPointerException
;如果元素的某些属性阻止其被添加到队列中,则会抛出IllegalArgumentException
。remove(o)
: 从队列中移除一个指定的元素,如果队列中存在该元素,则移除并返回true
;如果队列中不存在该元素,则返回false
。需要注意的是,remove()
是Queue
接口的方法,而remove(o)
是BlockingQueue
接口的方法。element()
: 如果队列为空,则抛出NoSuchElementException
异常;如果队列不为空,则返回队列头部的元素而不移除它。
返回特殊值
- 插入方法
offer(o)
: 如果队列未满,则插入元素并返回true
;如果队列已满,则返回false
。
- 移除方法
poll()
: 如果队列不为空,则移除并返回队列头部的元素;如果队列为空,则返回null
。peek()
: 如果队列不为空,则返回队列头部的元素但不移除它;如果队列为空,则返回null
。
一直阻塞
- 插入方法
put(o)
: 如果队列已满,则一直阻塞直到队列中有空间可用。如果线程在此期间被中断,则会抛出InterruptedException
。
- 移除方法
take()
: 如果队列不为空,则移除并返回队列头部的元素;如果队列为空,则一直阻塞直到队列中有元素可用。如果线程在此期间被中断,则会抛出InterruptedException
。
超时退出
- 插入方法
offer(o, timeout, timeUnit)
: 如果队列未满,则插入元素并立即返回true
;如果队列已满,则阻塞指定的时间等待队列可用。如果线程在此期间被中断,则会抛出InterruptedException
。如果在指定时间内队列仍未可用,则返回false
。
- 移除方法
poll(timeout, timeUnit)
: 如果队列不为空,则移除并返回队列头部的元素;如果队列为空,则阻塞指定的时间等待队列可用。如果线程在此期间被中断,则会抛出InterruptedException
。如果在指定时间内队列仍未可用,则返回null
。
注意事项
BlockingQueue
不允许插入null
元素,否则会抛出NullPointerException
。remove(o)
和remove()
方法的区别在于前者是BlockingQueue
接口的方法,后者是Queue
接口的方法。remove(o)
用于移除指定元素,而remove()
用于移除并返回队列头部的元素。
4. Java里的阻塞队列
-
ArrayBlockingQueue
:一个由数组结构组成的有界阻塞队列。 -
LinkedBlockingQueue
:一个由链表结构组成的有界阻塞队列。 -
PriorityBlockingQueue
:一个支持优先级排序的无界阻塞队列。 -
DelayQueue
:一个使用优先级队列实现的无界阻塞队列。 -
SynchronousQueue
:一个不存储元素的阻塞队列。 -
LinkedTransferQueue
:一个由链表结构组成的无界阻塞队列。 -
LinkedBlockingDeque
:一个由链表结构组成的双向阻塞队列。
4.1 ArrayBlockingQueue
ArrayBlockingQueue
是基于数组(array-based
)的先进先出(FIFO)有界(bounded)阻塞队列。
-
ArrayBlockingQueue
是基于数组实现; -
存入方法,采用
lock
锁保证存取线程安全问题; -
ArrayBlockingQueue
属于有界队列,默认的情况下会创建指定大小的数组,名称为items
如果现在设置队列容量限制过大的话有可能会引发内存溢出的问题; -
ArrayBlockingQueue
读写都会使用到同一把锁(ReentrantLock
)。
// 有界
BlockingQueue<String> strings = new ArrayBlockingQueue<>(2);
strings.offer("一");
strings.offer("二");
// 先进先出原则 取出 一 同时从队列中删除
System.out.println(strings.poll());
// 先进先出原则 取出 二 同时从队列中删除
System.out.println(strings.poll());
// 先进先出原则 此时队列中没有数据了 返回 null
System.out.println(strings.poll());
-
strings.poll(3, TimeUnit.SECONDS);
:如果3s内没有从队列中获取到内容,则当前线程会阻塞等待,超时时间为3s。 -
strings.offer("xiaowei", 3, TimeUnit.SECONDS);
当队列满了,继续投递数据到队列中当前线程会阻塞等待。
4.2 ArrayBlockingQueue 实现生产者与消费者模型
public static void main(String[] args) {
/**
* 需要定义容器(队列) 缓存 生产者线程投递到队列中的数据内容
*/
ArrayBlockingQueue<Integer> strings = new ArrayBlockingQueue<>(20);
new Thread(() -> {
for (int i = 0; i < 30; i++) {
boolean result = strings.offer(i);
System.out.println(Thread.currentThread().getName() + ",投递消息:"+i+",投递结果"+ result);
}
}, "生产者线程").start();
new Thread(() -> {
while (true) {
Integer poll = strings.poll();
if (poll != null) {
System.out.println(Thread.currentThread().getName() + ",消费消息" + poll);
}
}
}, "消费者线程").start();
}
4.3 纯手写ArrayBlockingQueue
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 赵立
*/
public class TestArrsyBlockingQueue<E> {
ReentrantLock lock = new ReentrantLock();
/**
* 基于 ArrayList 集合实现队列容量
*/
private List<E> arrayList;
/**
* 初始队列容量
*/
private int capacity;
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public TestArrsyBlockingQueue(int capacity) {
this.capacity = capacity;
arrayList = new ArrayList<E>(capacity);
}
public boolean offer(E e) {
lock.lock();
try {
// 先检查队列是否已满
if (arrayList.size() == capacity) {
return false;
}
boolean result = arrayList.add(e);
if (result) {
notEmpty.signal(); // 通知消费者
}
return result;
} finally {
lock.unlock();
}
}
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
//把 timeout 这个以 unit 单位表示的时间值转换成纳秒
long nanos = unit.toNanos(timeout);
try {
//当队列容器满的情况下
while (arrayList.size() == capacity) {
if (nanos <= 0) {
//判断是否过了设定的超时时间
return false;
}
/**
* 生产者线程可以调用 notFull.awaitNanos(nanos) 来等待最多 nanos 纳秒的时间。
* 如果在这段时间内队列变得不满了(即消费者消费了一些元素),生产者线程会被唤醒并继续执行。
* 如果队列在这段时间内仍然满了,生产者线程会返回剩余的等待时间,并可能采取进一步的行动,比如重试或者放弃等待。
*/
nanos = notFull.awaitNanos(nanos);
}
return offer(e);
} finally {
lock.unlock();
}
}
public E poll() {
lock.lock();
try {
return arrayList.size() == 0 ? null : dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
long nanos = unit.toNanos(timeout);
try {
//当前队列容器中的数据为空
while (arrayList.size() == 0) {
if (nanos <= 0) {
//判断是否过了设定的超时时间
return null;
}
//消费者线程阻塞等待
nanos = notEmpty.awaitNanos(nanos);
}
return poll();
} finally {
lock.unlock();
}
}
private E dequeue() {
E e = arrayList.get(0);
if (e != null) {
arrayList.remove(0);
//唤醒生产者线程
notFull.signal();
}
return e;
}
}
4.4 LinkedBlockingQueue
LinkedBlockingQueue
是一个基于链表的阻塞队列,它同样实现了 BlockingQueue
接口,与 ArrayBlockingQueue
不同的是,它可以拥有一个无限大的容量(默认情况下),也可以被配置为一个有界的队列。
下面是基于 LinkedBlockingQueue
的示例代码:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class LinkedBlockingQueueExample {
public static void main(String[] args) {
// 创建一个有界的 LinkedBlockingQueue,容量为 2
LinkedBlockingQueue<String> strings = new LinkedBlockingQueue<>(2);
// 添加元素到队列
strings.offer("一");
strings.offer("二");
// 先进先出原则,取出 "一" 并从队列中删除
System.out.println(strings.poll());
// 先进先出原则,取出 "二" 并从队列中删除
System.out.println(strings.poll());
// 先进先出原则,此时队列中没有数据了,返回 null
System.out.println(strings.poll());
// 使用带超时的 poll 方法
try {
// 如果3s内没有从队列中获取到内容,则当前线程会阻塞等待,超时时间为3s
System.out.println(strings.poll(3, TimeUnit.SECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread was interrupted.");
}
// 使用带超时的 offer 方法
try {
// 当队列满了,继续投递数据到队列中当前线程会阻塞等待
strings.offer("三", 3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread was interrupted.");
}
}
}
在这个例子中,我们创建了一个容量为2的 LinkedBlockingQueue
,然后通过 offer
方法添加了两个元素。之后,我们使用 poll
方法从队列中移除这些元素。当队列为空时,poll
方法返回 null
。
接着,我们展示了如何使用带超时的 poll
方法。如果在指定的时间内无法从队列中获取元素,则方法返回 null
并且不会抛出异常。
最后,我们演示了如何使用带超时的 offer
方法。如果队列已满,而该方法的调用者希望在有限的时间内等待队列空间可用,那么可以使用这种方法。如果在这段时间内队列仍未有足够的空间容纳新元素,方法将返回 false
。
请注意,LinkedBlockingQueue
默认情况下是无界的,这意味着它可以无限增长,除非明确指定了容量。在这个例子中,我们通过构造函数指定了容量为2,使其成为一个有界队列。
4.5 纯手写LinkedBlockingQueue
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 赵立
*/
public class TestLinkedBlockingQueue<E> {
ReentrantLock putLock = new ReentrantLock();
ReentrantLock takeLock = new ReentrantLock();
/**
* 基于 LinkedList 集合实现队列容量
*/
private LinkedList<E> linkedList;
/**
* 初始队列容量
*/
private final int capacity = Integer.MAX_VALUE;
private final Condition notFull = putLock.newCondition();
private final Condition notEmpty = takeLock.newCondition();
public TestLinkedBlockingQueue() {
linkedList = new LinkedList<>();
}
public boolean offer(E e) {
putLock.lock();
try {
// 先检查队列是否已满
if (linkedList.size() == capacity) {
return false;
}
boolean result = linkedList.add(e);
if (result) {
notEmpty.signal(); // 通知消费者
}
return result;
} finally {
putLock.unlock();
}
}
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
putLock.lock();
//把 timeout 这个以 unit 单位表示的时间值转换成纳秒
long nanos = unit.toNanos(timeout);
try {
//当队列容器满的情况下
while (linkedList.size() == capacity) {
if (nanos <= 0) {
//判断是否过了设定的超时时间
return false;
}
/**
* 生产者线程可以调用 notFull.awaitNanos(nanos) 来等待最多 nanos 纳秒的时间。
* 如果在这段时间内队列变得不满了(即消费者消费了一些元素),生产者线程会被唤醒并继续执行。
* 如果队列在这段时间内仍然满了,生产者线程会返回剩余的等待时间,并可能采取进一步的行动,比如重试或者放弃等待。
*/
nanos = notFull.awaitNanos(nanos);
}
return offer(e);
} finally {
putLock.unlock();
}
}
public E poll() {
takeLock.lock();
try {
return linkedList.size() == 0 ? null : dequeue();
} finally {
takeLock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
takeLock.lock();
long nanos = unit.toNanos(timeout);
try {
//当前队列容器中的数据为空
while (linkedList.size() == 0) {
if (nanos <= 0) {
//判断是否过了设定的超时时间
return null;
}
//消费者线程阻塞等待
nanos = notEmpty.awaitNanos(nanos);
}
return poll();
} finally {
takeLock.unlock();
}
}
private E dequeue() {
E e = linkedList.get(0);
if (e != null) {
linkedList.remove(0);
//唤醒生产者线程
notFull.signal();
}
return e;
}
}
4.6 ArrayBlockingQueue与LinkedBlockingQueue 区别
特性 | ArrayBlockingQueue | LinkedBlockingQueue |
---|---|---|
底层数据结构 | 基于数组实现。 | 基于链表实现。 |
默认队列类型 | 默认是有界队列,容量由构造函数指定。 | 默认是无界队列,容量为 Integer.MAX_VALUE ;可通过构造函数指定容量成为有界队列。 |
容量调整 | 需要在构造时显式指定容量,一旦设定不可更改。 | 可以在构造时指定容量以创建有界队列,容量可设为任意正整数。 |
并发控制 | 读写操作共享一把 ReentrantLock 锁,可能会导致读写冲突。 | 读和写分别使用不同的锁,采用读写分离策略,提高并发性能。 |
元素计数机制 | 使用 int 类型的变量记录队列中的元素数量。 | 使用 AtomicInteger 类型的变量原子地记录队列中的元素数量。 |
清空队列 | 清空队列时不需要特别处理锁,因为只有一个锁。 | 清空队列时需要同时清理两把锁,确保队列状态的一致性。 |
线程安全性 | 提供线程安全的存取方法,如 put , take , offer , poll 等。 | 同样提供线程安全的存取方法,但因为读写分离,理论上具有更高的并发能力。 |
性能考量 | 因为使用单锁,所以在多线程环境下可能不如 LinkedBlockingQueue 性能高。 | 由于采用了读写分离的锁策略,通常具有更好的吞吐量。 |
应用场景 | 适用于需要固定容量限制的应用场景,以及对数据的顺序敏感的情况。 | 更适合于需要高吞吐量的应用场景,特别是对于不确定的数据流。 |