文章目录
一、BlockQueue
1、特点
阻塞队列与普通队列(ArrayDeque等)之间的最大不同点在于阻塞队列提供了阻塞式的添加和删除方法:
- 阻塞添加
所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞添加元素的线程,知道队列元素不满时,才重新唤醒线程执行元素添加操作。 - 阻塞删除
阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程再执行删除操作。
2、阻塞队列常用方法
- 添加类方法
- add(E e)
添加成功则返回true,失败就抛出IllegalStateException异常。 - offer(E e)
成功则返回true,如果此队列已满就返回false。 - put(E e)
将元素添加到此队列的尾部,如果该队列已满,就一直阻塞。
- add(E e)
- 删除类方法
- poll()
获取并移除此队列的队首元素,若队列为空,则返回null。 - remove(Object o)
移除指定元素,成功则返回true,失败则返回false。 - take()
获取并移除此队列的队首元素,若没有元素,则一直阻塞。
- poll()
- 获取元素类方法
- element()
获取但不移除此队列的队首元素,没有元素则抛出异常。 - peek()
获取但不移除此队列的队首元素,若队列为空,则返回null。
- element()
二、常见的BlockingQueue
1、介绍
不同的BlockingQueue子类之间的区别主要体现在元素存储结构和元素操作上。
2、ArrayBlockingQueue
ArrayBlockingQueue 是 Java 并发包 java.util.concurrent 中的一个类,它实现了一个基于数组的有界阻塞队列。这个队列按照 FIFO(先进先出)的原则对元素进行排序。以下是 ArrayBlockingQueue 的一些主要特点和功能:
- 有界性: ArrayBlockingQueue有界且固定,在构造函数时确认大小,确认后不支持改变。其内部使用一个定长数组存储元素。除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识队列的头部和尾部在数组中的位置。一旦队列已满,任何试图向队列中插入元素的线程都将被阻塞,直到队列中有可用的空间。同样,如果队列为空,任何试图从队列中移除元素的线程都将被阻塞,直到队列中有新的元素。
- 线程安全:ArrayBlockingQueue的添加和删除操作都是公用同一个锁对象,因此添加和删除无法并行运行。
- 性能:由于 ArrayBlockingQueue 是基于数组实现的,因此在内存布局上更加紧凑,并且在随机访问元素时性能较好。然而,如果频繁地在队列头部和尾部进行插入和删除操作,性能可能不如 LinkedBlockingQueue,因为 LinkedBlockingQueue 是基于链表实现的,更适合动态调整大小。
- 公平性:ArrayBlockingQueue 支持公平性。公平性是指按照线程等待的顺序来处理它们,即等待时间最长的线程将优先得到处理。可以通过构造函数中的 fair 参数来设置队列是否为公平的。当设置为 true 时,等待时间最长的线程将获得优先处理;当设置为 false 时,则不保证这种顺序。
- 可选的迭代器:ArrayBlockingQueue 提供了迭代器,允许在遍历队列的同时安全地访问队列中的元素。但需要注意的是,迭代器的弱一致性意味着它反映的元素状态可能有些滞后。
- 实现
- ReentrantLock
- Condition
package chatpter06;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
public class TestArrayBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 创建一个容量为3的队列
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
String item = "Item " + i;
System.out.println("Produced: " + item);
queue.put(item); // 将元素放入队列,如果队列满则阻塞
Thread.sleep(1000); // 模拟耗时操作
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (true) {
String item = queue.take(); // 从队列中取出元素,如果队列空则阻塞
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 启动线程
producer.start();
consumer.start();
}
}
3、LinkedBlockingQueue
- 基于链表,无界的FIFO阻塞队列。LinkedBlockingQueue对于添加和删除元素分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
- 需要注意的是,在新建一个LinkedBlockingQueue对象时,若没有指定其容量大小,则LinkedBLockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞生产,系统内存就有可能已被消耗殆尽。
- 主要特点和功能:
- 阻塞操作:当队列为空时,从队列中获取元素的线程将会被阻塞,直到有元素可获取;当队列已满时,尝试添加元素的线程将会被阻塞,直到队列中有空间可用。
- 可选容量:在创建 LinkedBlockingQueue 时,可以指定一个容量限制。如果不指定容量,那么 LinkedBlockingQueue 的默认容量为 Integer.MAX_VALUE。
- 线程安全:LinkedBlockingQueue 的所有公共方法都是线程安全的,可以在多线程环境下安全使用。
- 性能:相比于 ArrayBlockingQueue,LinkedBlockingQueue 在插入和删除操作上有更好的性能,因为它内部是通过链表实现的,而链表在动态调整大小方面比数组更灵活。
package chatpter06;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestLinkedBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(3); // 创建一个容量为3的队列
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
String item = "Item " + i;
System.out.println("Produced: " + item);
queue.put(item); // 将元素放入队列,如果队列满则阻塞
Thread.sleep(1000); // 模拟耗时操作
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (true) {
String item = queue.take(); // 从队列中取出元素,如果队列空则阻塞
System.out.println("Consumed: " + item);
Thread.sleep(100000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 启动线程
producer.start();
consumer.start();
}
}
在这个示例中,我们创建了一个容量为3的 LinkedBlockingQueue,并启动了一个生产者线程和一个消费者线程。生产者线程不断生产元素并放入队列,消费者线程从队列中取出元素并消费。由于队列的容量限制,当队列满时,生产者线程会被阻塞,直到队列中有空间可用;当队列空时,消费者线程会被阻塞,直到队列中有元素可消费。
4、PriorityBlockingQueue
- 支持优先级的无界阻塞队列,PriorityBlockQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。在使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终消耗所有的可用堆内存空间。
- 默认情况下元素采用自然顺序升序排序,可以通过指定Comparetor来对元素进行排序
- 二叉堆
- 分类
- 最大堆
父节点的键值总是大于或等于任何一个子节点的键值 - 最小堆
父节点的键值总是小于或等于任何一个子节点的键值
添加操作则是不断“上冒”,而删除操作则是不断“下掉”
- 最大堆
- 分类
- 实现
ReentrantLock + Condition - 二叉堆
5、LinkedTransferQueue
LinkedTransferQueue是Java并发包java.util.concurrent中的一个类,它实现了TransferQueue接口。这个队列是一个由链表结构组成的无界阻塞队列,具有先进先出(FIFO)的特性。它可以看作是LinkedBlockingQueue和SynchronousQueue的结合体。
LinkedTransferQueue提供了几个重要的方法,如transfer(E e)和tryTransfer(E e),这些方法使得线程间的数据传递更为高效。具体来说:
transfer(E e)方法:如果当前存在一个正在等待获取的消费者线程,该方法会立刻将元素e移交给它;否则,它会将当前元素e插入到队列的尾部,并等待进入阻塞状态,直到有消费者线程取走该元素。
tryTransfer(E e)方法:如果当前存在一个正在等待获取的消费者线程(使用take()或poll()函数),该方法会即刻转移/传输对象元素e;如果不存在,则返回false,并且不进入队列。
此外,LinkedTransferQueue的实现在逻辑上与TransferStack(SynchronousQueue中的一个组件)大致相同,但由于队列是FIFO的,所以在匹配和入队操作上有所不同。每次匹配操作都是从头节点开始寻找,直到找到一个未匹配的节点,而入队列的节点是从队尾进入,与队头不冲突。
总的来说,LinkedTransferQueue是一个功能强大的并发队列,它提供了高效的线程间数据传递机制,适用于需要高效数据交换的并发场景。
6、SynchrousQueue
- 一个没有容量的阻塞队列
- 应用
交换工作,生产者的线程和消费者的线程同步已传递某些信息、事件后者任务
7、DelayQueue
- 支持延时获取元素的无界阻塞队列
- 应用
- 缓存:清掉缓存中超时的缓存数据
- 任务超时处理
- 实现
- ReentrantLock + Condition
- 根据Delay事件排序的有线级队列:PriorityQueue
- Delayed接口
- 用来标记那些应该在给定延迟时间之后执行的对象
- 该接口要求实现它的实现类必须定义一个compareTo方法,该方法提供与此接口的getDelay方法一致的排序