文章目录
引言
BlockingQueue是Java并发包中的重要组件,它在生产者-消费者场景中发挥着关键作用。作为线程安全的队列实现,BlockingQueue不仅提供了普通队列操作,还支持阻塞操作,使其成为并发编程中不可或缺的工具。
一、BlockingQueue基本概念
BlockingQueue在Java中是一个接口,它扩展了Queue接口,提供了阻塞的插入和获取操作。当队列满时,插入操作将被阻塞;当队列空时,获取操作将被阻塞。这种特性使其特别适合于生产者-消费者模式的实现。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueBasics {
public static void demonstrateBasicOperations() {
// 创建一个容量为3的ArrayBlockingQueue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
try {
// 添加元素的不同方式
queue.put("First"); // 阻塞式添加
queue.offer("Second"); // 非阻塞式添加
boolean success = queue.offer("Third", 1, TimeUnit.SECONDS); // 限时添加
System.out.println("队列大小: " + queue.size());
// 获取元素的不同方式
String item1 = queue.take(); // 阻塞式获取
String item2 = queue.poll(); // 非阻塞式获取
String item3 = queue.poll(1, TimeUnit.SECONDS); // 限时获取
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
二、主要实现类解析
2.1 ArrayBlockingQueue实现原理
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。它使用ReentrantLock来保证线程安全,使用Condition来实现线程等待和唤醒机制。
public class ArrayBlockingQueueAnalysis {
static class CustomArrayBlockingQueue<E> {
private final Object[] items;
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
private int takeIndex;
private int putIndex;
private int count;
public CustomArrayBlockingQueue(int capacity) {
this.items = new Object[capacity];
this.lock = new ReentrantLock(true); // 使用公平锁
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
// 队列满时等待
notFull.await();
}
items[putIndex] = e;
putIndex = (putIndex + 1) % items.length;
count++;
// 通知等待的消费者
notEmpty.signal();
} finally {
lock.unlock();
}
}
@SuppressWarnings("unchecked")
public E take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
// 队列空时等待
notEmpty.await();
}
E item = (E) items[takeIndex];
items[takeIndex] = null;
takeIndex = (takeIndex + 1) % items.length;
count--;
// 通知等待的生产者
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
}
2.2 LinkedBlockingQueue实现原理
LinkedBlockingQueue是基于链表的可选有界阻塞队列。它使用两个ReentrantLock来分别控制头尾节点的访问,从而提高并发性能。
public class LinkedBlockingQueueAnalysis {
static class CustomLinkedBlockingQueue<E> {
static class Node<E> {
E item;
Node<E> next;
Node(E item) { this.item = item; }
}
private final ReentrantLock takeLock;
private final ReentrantLock putLock;
private final Condition notEmpty;
private final Condition notFull;
private final int capacity;
private Node<E> head;
private Node<E> last;
private AtomicInteger count;
public CustomLinkedBlockingQueue(int capacity) {
this.capacity = capacity;
this.count = new AtomicInteger(0);
this.takeLock = new ReentrantLock();
this.putLock = new ReentrantLock();
this.notEmpty = takeLock.newCondition();
this.notFull = putLock.newCondition();
this.head = this.last = new Node<>(null);
}
public void put(E e) throws InterruptedException {
int c = -1;
final Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
last.next = node;
last = node;
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
}
}
三、高级特性应用
3.1 优先级队列实现
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,内部使用堆来维护元素顺序。
public class PriorityQueueExample {
static class Task implements Comparable<Task> {
private final int priority;
private final String name;
public Task(int priority, String name) {
this.priority = priority;
this.name = name;
}
@Override
public int compareTo(Task other) {
return Integer.compare(other.priority, this.priority); // 高优先级在前
}
@Override
public String toString() {
return String.format("Task[priority=%d, name='%s']", priority, name);
}
}
public static void demonstratePriorityQueue() {
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
// 生产者线程
new Thread(() -> {
try {
priorityQueue.put(new Task(2, "中等优先级任务"));
priorityQueue.put(new Task(1, "低优先级任务"));
priorityQueue.put(new Task(3, "高优先级任务"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
Thread.sleep(100); // 确保所有任务都已添加
while (!priorityQueue.isEmpty()) {
Task task = priorityQueue.take();
System.out.println("处理任务: " + task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
3.2 延迟队列实现
DelayQueue是一个支持延迟获取元素的无界阻塞队列,元素只有在其指定的延迟时间到期后才能被获取。
public class DelayQueueExample {
static class DelayedTask implements Delayed {
private final String name;
private final long startTime;
public DelayedTask(String name, long delayInMillis) {
this.name = name;
this.startTime = System.currentTimeMillis() + delayInMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayedTask[name=" + name + "]";
}
}
public static void demonstrateDelayQueue() {
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 添加延迟任务
delayQueue.put(new DelayedTask("Task1", 2000));
delayQueue.put(new DelayedTask("Task2", 1000));
delayQueue.put(new DelayedTask("Task3", 3000));
// 处理延迟任务
new Thread(() -> {
try {
while (!delayQueue.isEmpty()) {
DelayedTask task = delayQueue.take();
System.out.println("执行任务: " + task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
四、实际应用场景
4.1 生产者-消费者模式
BlockingQueue最常见的应用场景是实现生产者-消费者模式,它能够自动处理线程同步问题。
public class ProducerConsumerExample {
private static final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<>(10);
static class Producer implements Runnable {
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Task task = generateTask();
taskQueue.put(task);
System.out.println("生产任务: " + task);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private Task generateTask() {
return new Task(
ThreadLocalRandom.current().nextInt(3) + 1,
"Task-" + System.currentTimeMillis()
);
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Task task = taskQueue.take();
processTask(task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processTask(Task task) {
System.out.println("消费任务: " + task);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void startProducerConsumer() {
// 启动生产者和消费者
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(new Producer());
executor.submit(new Consumer());
// 运行一段时间后关闭
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdownNow();
}
}
总结
BlockingQueue作为Java并发包中的核心组件,通过提供线程安全的队列操作和阻塞机制,有效地解决了并发编程中的生产者-消费者问题。在实际应用中,可以根据具体需求选择合适的BlockingQueue实现类,如有界队列ArrayBlockingQueue、可选有界的LinkedBlockingQueue、支持优先级的PriorityBlockingQueue或支持延迟的DelayQueue。通过合理使用BlockingQueue提供的API,可以构建出高效、可靠的并发应用。在使用时需要注意选择合适的队列实现和容量设置,并正确处理中断异常,以确保应用的稳定性和可靠性。
标签:Java,Thread,队列,private,final,111,new,public,BlockingQueue From: https://blog.csdn.net/weixin_55344375/article/details/144745738