阻塞队列
定义
在Java中,阻塞队列(Blocking Queue)是一种线程安全的队列。阻塞队列是Java并发包(java.util.concurrent)中的一个重要组件,常用于生产者-消费者模式中,一个线程产生数据放入队列,另外一个从队列取出数据进行消费。
主要有两种情况
- 在尝试添加元素到队列中时,如果队列已满,则线程会被阻塞;
- 在尝试从队列中移除元素时,如果队列为空,则线程也会被阻塞。
BlockingQueue
BlockingQueue是一个接口,方法主要有如下四种形式。
方法用途\行为方式 | 抛出异常 | 返回特殊值 | 阻塞 | 超时阻塞 |
---|---|---|---|---|
插入 | add(E e) | offer(E e) | put(E e) | offer(E e, long timeout, TimeUnit unit) |
移除 | remove(Object o) | poll() | take() | poll(long timeout, TimeUnit unit) |
检查 | element() | peek() |
行为说明
- 抛出异常
如果试图的操作无法立即执行,抛一个异常。比如:
1、add(e):给队列添加元素,当队列的容量已满就会抛出异常 IllegalStateException。
2、remove(): 当队列为空时会抛出异常NoSuchElementException。
3、队列为空时会抛出NoSuchElementException。 - 返回特殊值
如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
1、offer(e) : 给添加元素,如果成功返回true,队列满了就返回false。
2、poll:从头部取元素,如果成功返回true,队列为空就返回false。 - 阻塞
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
1、put(e):如果队列满了就会阻塞当前put的线程。
2、如果队列为空也会阻塞当前线程。 - 超时阻塞
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
几种常用的队列
ArrayBlockingQueue
定义和使用
一个由数组结构组成的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
//生成数据的线程
Thread putThread = new Thread(()->{
for (int i = 0; i < 20; i++) {
try {
System.out.println("生成:"+i);
blockingQueue.put(i);//队列满了会阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//消费数据的线程
Thread takeThread = new Thread(()->{
for (int i = 0; i < 20; i++) {
try {
int item = blockingQueue.take();//队列空了会阻塞
System.out.println("消费:"+item);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
putThread.start();
takeThread.start();
源码分析(JDK17)
- 初始化
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
//存储数据的数组,构造的时候会根据传入的大小进行初始化
final Object[] items;
//取元素的下标
int takeIndex;
//存入元素的下标
int putIndex;
//队列元素大小
int count;
//锁
final ReentrantLock lock;
//取元素时阻塞线程用的条件变量
private final Condition notEmpty;
//存元素时阻塞线程用的条件变量
private final Condition notFull;
/*
* 传入队列容量大小
* @param capacity 队列容量
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/*
* 传入队列容量大小
* @param capacity 队列容量
* @param fair true表示创建的是公平锁,在锁中等待的线程被唤醒后会按照阻塞时间的长短来竞争锁
* 表示非公平锁,在锁中等待的线程被唤醒后自由竞争锁
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
}
- put添加元素
/*
* 向队列添加元素
* @param e 要添加的元素
*/
public void put(E e) throws InterruptedException {
//检查元素是否为空
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
//先加锁,因为只有一把锁所以这个队列存和取不能同时进行
lock.lockInterruptibly();
try {
//当队列中已经满了时阻塞住存元素的线程
while (count == items.length)
notFull.await();
//走到这里表示队列未满了,执行添加元素的逻辑
enqueue(e);
} finally {
//放在finally保证一定会解锁
lock.unlock();
}
}
/*
* 入队
* @param e 要添加的元素
*/
private void enqueue(E e) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//把要添加的元素e放在数组的putIndex下标处
items[putIndex] = e;
//存元素的下标自增1,下次put就可以往下一个位置放元素,
//所以放元素是在数组的尾部添加元素
//如果+1后超出了数组的容量限制就重置成0,表示已按顺序放到了队列的末尾,
//按先入先出的原则肯定是0号位的元素会先被取走,所以下次就在0号位put
if (++putIndex == items.length) putIndex = 0;
//队列大小+1
count++;
//唤醒正在阻塞的获取元素的线程
notEmpty.signal();
}
- take取出元素
/*
* 从队列获取元素
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//如果队列大小为空,则取元素的线程阻塞
while (count == 0)
notEmpty.await();
//从队列取出元素
return dequeue();
} finally {
lock.unlock();
}
}
/*
* 出队
*/
private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//从数组中取出元素
E e = (E) items[takeIndex];
//将取出元素后的位置置为null
items[takeIndex] = null;
//取完后让takeIndex自增指向下次要取的下标,
//如果超过了数组的容量就重新指向0,因为按先入先出原则取完最后一个就要回到起点准备下一轮获取
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒正在阻塞的添加元素的线程
notFull.signal();
return e;
}
LinkedBlockingQueue
定义
一个由链表结构组成的可选有界阻塞队列,创建时可以不指定容量,但本质上看还是一个有界队列,当你不指定容量是默认的容量是Integer.MAX_VALUE。因为基于链表实现,所以存和取操作的是不同的节点,所以内部有两把锁,存和取是可以同时进行的。
源码分析(JDK17)
- 初始化
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 链表节点
*/
static class Node<E> {
//存储的值
E item;
//下个节点
Node<E> next;
Node(E x) { item = x; }
}
//容量
private final int capacity;
//当前队列大小
private final AtomicInteger count = new AtomicInteger();
//指向链表的头节点,链表的头节点不放元素,head.item == null,
//这是为了处理在链表中只有一个元素时存线程和取线程同时操作这个节点的next属性引发线程安全问题,
//这样设计后当链表中只有一个头节点时就认为当前队列是空的,让取线程阻塞,就可以避免上边的问题。
transient Node<E> head;
//尾节点
private transient Node<E> last;
//取元素的锁
private final ReentrantLock takeLock = new ReentrantLock();
//取元素时等待的条件变量
private final Condition notEmpty = takeLock.newCondition();
//添加元素的锁
private final ReentrantLock putLock = new ReentrantLock();
//添加元素时等待的条件变量
private final Condition notFull = putLock.newCondition();
/*
* 无参构造函数,容量按Integer.MAX_VALUE
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/*
* 有参构造函数,传入指定容量大小
* @param capacity 容量
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//头节点不存元素
last = head = new Node<E>(null);
}
}
- put添加元素
/*
* 添加元素
* @param e 存入的元素
*/
public void put(E e) throws InterruptedException {
//元素不能为空
if (e == null) throw new NullPointerException();
final int c;
//创建新节点
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//加锁
putLock.lockInterruptibly();
try {
//当前队列已满,存的线程阻塞
while (count.get() == capacity) {
notFull.await();
}
//入队
enqueue(node);
//队列大小+1
c = count.getAndIncrement();
//如果当前容量+1后没有超过容量大小,唤醒存的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//只有当c=0时才会去叫醒取元素的线程,因为c=0说明添加之前队列是空的,才有可能会有阻塞的取元素线程
if (c == 0)
//唤醒取线程
signalNotEmpty();
}
/*
* 入队
* @param node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
//在链表尾部添加节点
last = last.next = node;
}
- take取出元素
/*
* 取出元素
*/
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//加锁
takeLock.lockInterruptibly();
try {
//当前队列为空,取线程阻塞
while (count.get() == 0) {
notEmpty.await();
}
//元素出队
x = dequeue();
//队列大小-1
c = count.getAndDecrement();
//如果还有元素唤醒取线程
if (c > 1)
notEmpty.signal();
} finally {
//解锁
takeLock.unlock();
}
//如果取这个元素之前队列满了,那就可能会有阻塞的存元素线程,所以调用唤醒
//存元素线程的方法
if (c == capacity)
signalNotFull();
return x;
}
/*
* 出队,从对头取出
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
//获取头节点的下一个节点
Node<E> first = h.next;
h.next = h; // help GC
head = first;
//取头节点下一个节点的元素
E x = first.item;
//把头节点的下一个节点的内容清空,这样它就可以作为新的头节点
first.item = null;
return x;
}
DelayQueue
- 一个无界阻塞队列,其中的元素只能在其延迟期满时才能从队列中提取。元素必须实现Delayed接口。
- DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,需要实现getDelay()和compareTo()。
- compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
- getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素,然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
源码分析
- 初始化
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//锁
private final transient ReentrantLock lock = new ReentrantLock();
//PriorityQueue是一个基于优先级堆的无界队列,它实现了Queue接口。
//PriorityQueue中的元素并不 是按照它们被添加到队列中的顺序来排序的,
//而是根据元素的自然顺序(如果元素实现了Comparable接口)或者根据在构造PriorityQueue时提供的Comparator来排序的
private final PriorityQueue<E> q = new PriorityQueue<E>();
//第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能。
private Thread leader;
}
- 添加元素
public void put(E e) {
offer(e);
}
/*
* 入队
* @param e 入队元素
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
//获取锁
lock.lock();
try {
//插入元素
q.offer(e);
//检查插入的元素是否是队首,是的话将leader = null,然后唤醒一个消费线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
//释放锁
lock.unlock();
}
}
- take取出元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//获取锁
lock.lockInterruptibly();
try {
for (;;) {
//获取头部元素,但不会删除
E first = q.peek();
//如果头部元素为空,阻塞线程
if (first == null)
available.await();
else {
//获取延迟时间
long delay = first.getDelay(NANOSECONDS);
//延迟时间小于0,表示到期了,直接取出
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
//leader != null 表明有其他线程在操作,线程阻塞
if (leader != null)
available.await();
else {
//获取当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//超时阻塞
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就唤醒
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
标签:元素,Java,队列,lock,知识,阻塞,线程,final
From: https://blog.csdn.net/linwq8/article/details/145229528