BlockingQueue
BlockingQueue
是Java并发包(java.util.concurrent)中提供的一个阻塞队列接口,它继承自 Queue
接口。
BlockingQueue
中的元素采用 FIFO 的原则,支持多线程环境并发访问,提供了阻塞读取和写入的操作,当前线程在队列满或空的情况下会被阻塞,直到被唤醒或超时。
常用的实现类有:
ArrayBlockingQueue
:并发容器 ArrayBlockingQueue 详解LinkedBlockingQueue
:并发容器 LinkedBlockingQueue 详解PriorityBlockingQueue
SynchronousQueue
LinkedBlockingDeque
:并发容器 LinkedBlockingDeque 详解
等类,它们的实现方式各有不同。
适用场景
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。
如果该阻塞队列到达了其临界点,生产者线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到消费者线程从队列中拿走一个对象。
消费者线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
常用方法
-
put(E e)
:将元素 e 插入到队列中,如果队列已满,则会阻塞当前线程,直到队列有空闲空间 -
offer(E e)
:将元素 e 插入到队列中,如果队列已满,则返回 false。 -
offer(E element, long timeout, TimeUnit unit)
方法是BlockingQueue
:在指定的时间内将元素添加到队列中。-
timeout
:超时时间,表示在指定的时间内等待队列空间可用。如果超过指定的时间仍然无法将元素添加到队列中,将返回 false。 -
unit
:超时时间的单位。
-
-
take()
:移除并返回队列头部的元素,如果队列为空,则会阻塞当前线程,直到队列有元素 -
poll()
:移除并返回队列头部的元素,如果队列为空,则返回null
-
poll(long timeout, TimeUnit unit)
:在指定的时间内从队列中检索并移除元素。返回移除的元素。如果超过指定的时间仍然没有可用的元素,将返回 null。 -
peek()
:返回队列头部的元素,但不会移除。如果队列为空,则返回null
-
size()
:返回队列中元素的数量 -
isEmpty()
:判断队列是否为空,为空返回 true,否则返回 false -
isFull()
:判断队列是否已满,已满返回 true,否则返回 false -
clear()
:清空队列中的所有元素
行为 | 抛异常 | 返回特定值 | 阻塞 | 超时 | |
---|---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) | |
移除 | remove() | poll() | take() | poll(timeout, timeunit) | |
检查 | element() | peek() |
-
抛异常: 如果试图的操作无法立即执行,抛一个异常。
-
特定值: 如果试图的操作无法立即执行,返回 true / false / null)。
-
阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
-
超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
若向 BlockingQueue 中插入 null,将会抛出 NullPointerException
死锁问题
需要注意的是,在使用 BlockingQueue
时要注意防止死锁的问题:
-
在队列满之后调用
offer()
方法插入元素会返回false
,此时不能直接调用put()
方法,因为在插入之前还需要获取其它资源,如果在获取资源时一直阻塞在这里,就会发生死锁。 -
为了防止死锁的问题,建议使用
offer(E e, long timeout, TimeUnit unit)
和poll(long timeout, TimeUnit unit)
带有超时时间的方法。
PriorityBlockingQueue
PriorityBlockingQueue
是一个无界的并发队列。它使用了和类PriorityQueue
一样的排序规则。
所有插入到 PriorityBlockingQueue
的元素必须实现 java.lang.Comparable
接口。因此该队列中元素的排序就取决于自定义的 Comparable 实现。如果元素没有实现 Comparable 接口并且没有提供自定义的比较器,将会引发 ClassCastException
。
PriorityBlockingQueue
对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。
特点
-
线程安全:
PriorityBlockingQueue
是线程安全的,它使用锁(如ReentrantLock
)来确保在多线程环境下的数据一致性和操作的原子性。 -
优先级排序:队列中的元素会根据它们的优先级进行排序,默认情况下,元素按照其自然顺序(如果实现了
Comparable
接口)进行排序,或者根据构造时提供的Comparator
进行排序。 -
阻塞特性:当队列为空时,尝试从队列中取出元素的线程会被阻塞,直到队列中有元素可用;当队列已满(对于无界队列,实际上是系统资源耗尽)时,尝试向队列中添加元素的线程也会被阻塞,或者可以选择非阻塞操作(如
offer
方法)。 -
无界扩容:
PriorityBlockingQueue
是基于数组实现的,当队列中的元素数量超过当前数组的容量时,它会自动进行扩容操作,以确保能够继续存储新的元素。
扩容问题
PriorityBlockingQueue
并不是一个动态调整容量的队列。它的“无界”特性并不是指它会动态地调整其内部存储的大小,而是指它不会因为容量限制而阻止新的元素被加入队列。相反,它会尽可能多地存储元素,直到耗尽系统的可用内存。
尽管 PriorityBlockingQueue
没有显式的容量限制,但在实际应用中,仍然需要考虑以下几点:
-
内存限制:虽然
PriorityBlockingQueue
没有固定的容量限制,但它仍然受到 JVM 的堆内存限制。如果队列中的元素过多,会导致内存不足,最终可能导致OutOfMemoryError
。 -
垃圾回收:当队列中的元素被消费后,如果没有其他引用指向这些元素,垃圾回收器会自动回收这些对象所占用的内存。因此,即使队列曾经存储了大量的元素,只要这些元素被消费并且没有其他引用,内存就会被释放。
应用场景
PriorityBlockingQueue
适用于多种场景,包括但不限于:
-
任务调度:在任务调度系统中,可以使用
PriorityBlockingQueue
来管理待执行的任务,工作线程可以从中取出优先级最高的任务来执行。 -
资源管理:在资源有限的情况下,可以使用
PriorityBlockingQueue
来确定哪些请求或任务应该优先获得资源。 -
并发访问:在多线程环境中,
PriorityBlockingQueue
提供了安全的并发访问机制,多个线程可以同时向队列中添加或检索元素。
构造方法
- 创建一个初始容量为 11(默认容量) 的
PriorityBlockingQueue
对象。
PriorityBlockingQueue()
- 创建一个初始容量为指定容量的
PriorityBlockingQueue
对象。
PriorityBlockingQueue(int capacity)
- 创建一个初始元素集合为指定集合的
PriorityBlockingQueue
对象。
PriorityBlockingQueue(Collection<? extends E> collection)
- 创建一个初始容量为指定容量、元素按照指定比较器优先级排序的
PriorityBlockingQueue
对象。
PriorityBlockingQueue(int c, Comparator<? super E> comparator)
使用示例
下面是一个使用 PriorityBlockingQueue
的简单示例,展示如何实现一个基于优先级的任务队列:
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
class Task implements Comparable<Task> {
private final int priority;
private final String description;
public Task(int priority, String description) {
this.priority = priority;
this.description = description;
}
public int getPriority() {
return priority;
}
public String getDescription() {
return description;
}
@Override
public int compareTo(Task other) {
// 优先级数值越小,优先级越高
return Integer.compare(this.priority, other.priority);
}
@Override
public String toString() {
return "Task{" +
"priority=" + priority +
", description='" + description + '\'' +
'}';
}
}
public class PriorityBlockingQueueExample {
public static void main(String[] args) {
// 创建一个 PriorityBlockingQueue 实例
PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();
// 创建生产者线程
Thread producer = new Thread(() -> {
try {
// 生成不同优先级的任务
taskQueue.put(new Task(3, "Medium Priority Task"));
taskQueue.put(new Task(1, "High Priority Task"));
taskQueue.put(new Task(5, "Low Priority Task"));
taskQueue.put(new Task(2, "Normal Priority Task"));
System.out.println("所有任务已添加到队列中");
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 设置线程中断状态
System.err.println("生产者线程被中断");
}
});
// 创建消费者线程
Thread consumer = new Thread(() -> {
while (true) {
try {
Task task = taskQueue.take();
System.out.println("处理任务: " + task);
TimeUnit.MILLISECONDS.sleep(1000); // 模拟任务处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 设置线程中断状态
System.err.println("消费者线程被中断");
break; // 终止循环
}
}
});
// 启动生产者线程和消费者线程
producer.start();
consumer.start();
// 等待生产者线程结束
try {
producer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 设置线程中断状态
System.err.println("主线程被中断");
}
// 中断消费者线程
consumer.interrupt();
}
}
示例说明:
- 优先级:在这个例子中,优先级数值越小,优先级越高。因此,优先级为 1 的任务会被首先处理。
- 队列无界:
PriorityBlockingQueue
默认是无界的,即没有固定的容量限制。但是,如果队列中的元素数量过大,可能会导致内存溢出。 - 线程中断:在捕获
InterruptedException
异常时,需要调用Thread.currentThread().interrupt()
来恢复线程的中断状态,以便其他可能依赖中断状态的部分能够正确响应。 - 消费者线程的终止:在生产者线程结束后,通过中断消费者线程来终止其无限循环。这是为了防止消费者线程无限期地运行下去。