JUC-阻塞队列
如有侵权,请联系~
如有错误,也欢迎批评指正~
1、阻塞队列概述
阻塞队列在业务代码中可能较少使用,但是只要喜欢看源码的同学就会发现,阻塞队列使用的很频繁,到处都有它的影子。例如:线程池ThreadPoolExecutor中的工作队列。
阻塞队列(BlockingQueue)是 Java 并发包(java.util.concurrent)中的一个重要接口,允许多个线程同时安全地操作队列。它不仅提供了队列的基本操作(如插入、删除、检查元素),还可以在队列满时阻塞插入线程,在队列空时阻塞删除线程。这种机制非常适合用于生产者-消费者问题等多线程场景。
特点:
- 线程安全:阻塞队列是线程安全的,多个线程可以安全地访问和操作它。
- 阻塞操作:
- 在队列已满时,插入元素的线程会阻塞,直到有空间可用。
- 在队列为空时,取出元素的线程会阻塞,直到有元素可用。
多种实现:
Java 提供了多种阻塞队列的实现,例如:
- ArrayBlockingQueue: 使用数组实现的有界阻塞队列。
- LinkedBlockingQueue: 使用链表实现的可选界限的阻塞队列。
- PriorityBlockingQueue: 支持优先级的无界阻塞队列。
- DelayQueue: 延迟队列,元素在到达特定时间后才能被获取。
- SynchronousQueue: 不持有元素的阻塞队列,插入和获取操作必须同步进行。
2、ArrayBlockingQueue阻塞队列
2.1 ArrayBlockingQueue架构图
因为有很多种阻塞队列,我们以ArrayBlockingQueue阻塞队列进行讲解、学习。首先先看下整体的架构图,先看到整体的流程图再学习源码:
上面这个图其实就是管程模型,上述图转换为下面这张图结合之前的管程模型进行对比。
2.2 ArrayBlockingQueue源码
结合上面的图一起看源码效果更佳哦。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 阻塞队列相关的属性
/** The queued items */
final Object[] items; // 队列,将数据都是往这个队列中存储数据
/** items index for next take, poll, peek or remove */
int takeIndex; // 队列的起始位置
/** items index for next put, offer, or add */
int putIndex; // 队列的最终位置
/** Number of elements in the queue */
int count; // 队列中数据的个数
// 控制逻辑【控制数据的put和take】:锁和条件
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
// 从阻塞队列拿值
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
// 往阻塞队列中塞数据
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 这里获取锁
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
// 必须要等到这里才会释放锁么,唯一释放锁的地方么?
// 很明显不是。例如两个线程,第一个线程先获取数据,等待10ms【此时阻塞队列没有数据】,第二个线程存储数据。如果真的是这样,那岂不是第一个线程获取不到数据了
lock.unlock();
}
}
// 阻塞nanosTimeout毫秒
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node); //仔细看这里会释放锁,在超时等待之前释放锁,允许其他线程对阻塞队列进行操作
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout); // 超时等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 这里会重新获取锁。这样就实现了对队列操作的时候进行锁,阻塞的时候释放锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
}
标签:JUC,队列,lock,阻塞,线程,items,ArrayBlockingQueue
From: https://blog.csdn.net/m0_50149847/article/details/143775980