1、LinkedBlockingQueue使用
LinkedBlockingQueue的使用案例详情如下:
1 import java.util.concurrent.LinkedBlockingDeque; 2 import java.util.concurrent.LinkedBlockingQueue; 3 4 public class TestLinkedBlockingQueue { 5 public static void main(String[] args) throws Exception { 6 LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1); 7 // poll、peek、take 获取元素 8 queue.add("123"); 9 // peek 获取队列头元素,不移除元素 10 String peek = queue.peek(); 11 System.out.println(peek); 12 // poll 获取队列头元素,移除元素 13 String poll = queue.poll(); 14 System.out.println(poll); 15 // queue.add("234"); 16 17 // take 获取队列头元素,移除元素,队列为空,阻塞 18 // String take = queue.take(); 19 // System.out.println(take); 20 21 22 // add、offer、put 添加元素 23 // add 添加元素,队列满了,抛出异常 24 boolean add01 = queue.add("1"); 25 System.out.println("add01 ==> " + add01); 26 // add 添加元素,队列满了,返回false 27 boolean offer = queue.offer("2"); 28 System.out.println("offer ==> " + offer); 29 try { 30 boolean add02 = queue.add("1"); 31 System.out.println("add02 ==> " + add02); 32 }catch (Exception e) { 33 e.printStackTrace(); 34 } 35 36 queue.clear(); 37 // put 添加元素,队列满了,阻塞等待 38 queue.put("3"); 39 System.out.println("put01 ==> " + queue.peek()); 40 queue.put("4"); 41 } 42 }
2、LinkedBlockingQueue继承体系
与ArrayBlockingQueue一样,LinkedBlockingQueue实现了Queue、BlockingQueue接口,继承AbstractCollection类。相关父类分析可参考ArrayBlockingQueue源码分析中介绍,此处不再赘述。
3、LinkedBlockingQueue源码分析
LinkedBlockingQueue的成员变量:
1 // 队列的容量,若不指定,默认 Integer.MAX_VALUE 2 private final int capacity; 3 // 队列中元素的数量 4 private final AtomicInteger count = new AtomicInteger(); 5 // 队列头节点 6 transient Node<E> head; 7 // 队列尾节点 8 private transient Node<E> last; 9 10 // 读锁 take() 等获取元素用到的可重入排他锁 11 private final ReentrantLock takeLock = new ReentrantLock(); 12 13 // 阻塞获取数据,用于线程挂起、线程唤醒 14 private final Condition notEmpty = takeLock.newCondition(); 15 16 // put() 等添加元素用到的可重入排它锁 17 private final ReentrantLock putLock = new ReentrantLock(); 18 19 // 写锁 阻塞添加数据,用于线程挂起、线程唤醒 20 private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue的链表节点静态类:
1 static class Node<E> { 2 // 元素 3 E item; 4 // 指向下一节点的Node 5 Node<E> next; 6 // 构造函数 7 Node(E x) { item = x; } 8 }
LinkedBlockingQueue基于Node节点实现元素的存储,通过next属性将添加的元素构成一个单向链表,在成员变量中声明了链表的头节点head、链表的尾节点last。
3.1、添加元素
LinkedBlockingQueue#enqueue(node) 详情如下:1 // 元素添加进队列 2 private void enqueue(Node<E> node) { 3 // 将node添加进队列 - 单向链表的尾部 4 last = last.next = node; 5 }
3.1.1、offer(e) - 添加元素
LinkedBlockingQueue#offer(e) 详情如下:
1 // 添加元素至队列尾 2 public boolean offer(E e) { 3 // 添加元素为null,抛出异常 4 if (e == null) throw new NullPointerException(); 5 // 获取当前队列中元素总数 6 final AtomicInteger count = this.count; 7 // 队列已满,添加元素失败,返回1false 8 if (count.get() == capacity) 9 return false; 10 // 添加元素e前,队列中的元素总数 11 int c = -1; 12 // 创建Node节点,封装添加的e元素 13 Node<E> node = new Node<E>(e); 14 // 获取锁对象 15 final ReentrantLock putLock = this.putLock; 16 // 加锁 17 putLock.lock(); 18 try { 19 // 队列元素总数 小于 队列容量 20 if (count.get() < capacity) { 21 // 元素添加进队列中 22 enqueue(node); 23 // 队列元素总数 + 1 24 c = count.getAndIncrement(); 25 // 当前元素添加进队列后,队列未满 26 if (c + 1 < capacity) 27 // 唤醒其他添加元素线程 28 notFull.signal(); 29 } 30 } finally { 31 // 释放锁 32 putLock.unlock(); 33 } 34 // c = count.getAndIncrement() => 先获取队列中的元素总数,若之前队列为空,即count = 0 35 // 添加元素后,队列元素总数为1,此时需要唤醒take阻塞获取元素的线程 36 if (c == 0) 37 signalNotEmpty(); 38 // 返回添加结果 39 return c >= 0; 40 }
offer(e)在队列尾部添加元素,若队列中的数组元素已满,返回false;否则将元素添加进队列中,返回false。
若添加元素后队列未满,唤醒其他添加元素的线程(put);
若添加元素前,队列为空,在元素添加成功后,唤醒获取元素的线程(take)。
与ArrayBlockingQueue基于数组添加不同的是,LinkedBlockingQueue基于链表添加元素。
3.1.2、offer(e, timeout, unit)
LinkedBlockingQueue#offer(e, timeout, unit) 详情如下:
1 // 添加元素至队列尾 2 public boolean offer(E e, long timeout, TimeUnit unit) 3 throws InterruptedException { 4 // 添加元素为null,抛出异常 5 if (e == null) throw new NullPointerException(); 6 // 阻塞时间转化为纳秒 7 long nanos = unit.toNanos(timeout); 8 // 添加元素e前,队列中的元素总数 9 int c = -1; 10 // 获取锁对象 11 final ReentrantLock putLock = this.putLock; 12 // 获取当前队列中元素总数 13 final AtomicInteger count = this.count; 14 // 加锁 15 putLock.lockInterruptibly(); 16 try { 17 // 队列已满 18 while (count.get() == capacity) { 19 // 阻塞时间已达到 20 if (nanos <= 0) 21 // 返回false 22 return false; 23 // 阻塞时间未达到,继续阻塞,并返回阻塞的剩余时间 24 nanos = notFull.awaitNanos(nanos); 25 } 26 // 添加进队列中 27 enqueue(new Node<E>(e)); 28 // 获取添加元素前的总数 c 并获取添加元素后的总数 count + 1 29 c = count.getAndIncrement(); 30 // 当前元素添加进队列后,队列未满 31 if (c + 1 < capacity) 32 // 唤醒其他添加元素线程 33 notFull.signal(); 34 } finally { 35 // 释放锁资源 36 putLock.unlock(); 37 } 38 // c = count.getAndIncrement() => 先获取队列中的元素总数,若之前队列为空,即count = 0 39 // 添加元素后,队列元素总数为1,此时需要唤醒take阻塞获取元素的线程 40 if (c == 0) 41 signalNotEmpty(); 42 // 添加成功返回false 43 return true; 44 }
offer(e, timeout, unit)在队列尾部添加元素,若队列中的数组元素已满,阻塞timeout时间,若阻塞时间已达到,仍未添加成功,返回false。
阻塞时间未到达添加成功,若添加元素后队列未满,唤醒其他添加元素的线程(put);
若添加元素前,队列为空,在元素添加成功后,唤醒获取元素的线程(take)。
3.1.3、put(e) - 添加元素
LinkedBlockingQueue#put(e) 详情如下:
1 // 添加元素至队列尾 2 public void put(E e) throws InterruptedException { 3 // 添加元素为null,抛出异常 4 if (e == null) throw new NullPointerException(); 5 // 添加元素e前,队列中的元素总数 6 int c = -1; 7 // 创建Node节点,封装添加的e元素 8 Node<E> node = new Node<E>(e); 9 // 获取锁对象 10 final ReentrantLock putLock = this.putLock; 11 // 获取当前队列中元素总数 12 final AtomicInteger count = this.count; 13 // 加锁 14 putLock.lockInterruptibly(); 15 try { 16 // 当前队列已满,put操作挂起线程 17 while (count.get() == capacity) { 18 notFull.await(); 19 } 20 // 元素添加进队列 21 enqueue(node); 22 // 获取添加元素前的总数 c 并获取添加元素后的总数 count + 1 23 c = count.getAndIncrement(); 24 // 添加元素后,队列未满,唤醒put添加元素线程操作 25 if (c + 1 < capacity) 26 notFull.signal(); 27 } finally { 28 // 释放锁资源 29 putLock.unlock(); 30 } 31 // c = count.getAndIncrement() => 先获取队列中的元素总数,若之前队列为空,即count = 0 32 // 添加元素后,队列元素总数为1,此时需要唤醒take阻塞获取元素的线程 33 if (c == 0) 34 signalNotEmpty(); 35 }
put(e)阻塞添加元素至队列尾部,若当前队列已满,当前put线程挂起。当获取元素线程从队列中拿走元素,队列中有可取空间时,唤醒挂起的put线程,将元素添加进队列中。
若添加元素后队列未满,唤醒其他添加元素的线程(put);
若添加元素前,队列为空,在元素添加成功后,唤醒获取元素的线程(take)。
3.1.4、总结
方法 |
不同点 |
offer(e) |
队列容量已满,添加失败,直接返回false |
offer(e,timeout,unit) |
队列容量已满,阻塞等待timeout时间,若阻塞时间到达,仍添加失败,直接返回false |
put(e) |
队列容量已满,阻塞等待直到队列中有可添加的空间 |
3.2、获取元素
出队流程如下:
LinkedBlockingQueue#dequeue() 详情如下:
1 // 从队列头元素中移除并返回元素 2 private E dequeue() { 3 // 获取队列头节点 4 Node<E> h = head; 5 // 获取队列头节点的next 6 Node<E> first = h.next; 7 // 将当前队列头节点的next作为新的头节点,原头节点通过GC回收 8 h.next = h; 9 head = first; 10 // 获取并记录新头节点的元素值 11 E x = first.item; 12 // 当前节点的元素值设置为null 13 first.item = null; 14 // 返回当前节点的元素值 15 return x; 16 }
3.2.1、peek() - 获取元素
LinkedBlockingQueue#peek() 详情如下:
1 // 获取队列头节点 2 public E peek() { 3 // 队列中无可取的元素 4 if (count.get() == 0) 5 // 返回null 6 return null; 7 // 获取锁对象 8 final ReentrantLock takeLock = this.takeLock; 9 // 加锁 10 takeLock.lock(); 11 try { 12 // 获取队列元素头节点 13 Node<E> first = head.next; 14 // 头节点为null,返回null 15 if (first == null) 16 return null; 17 // 头节点不为null,返回节点中的元素值 18 else 19 return first.item; 20 } finally { 21 // 释放锁 22 takeLock.unlock(); 23 } 24 }
peek()获取队列中的头元素,若队列中无可取的元素,返回null;获取head的next节点,若该节点为null,则返回null,否则返回节点中的item值。
3.2.2、poll() - 获取元素
LinkedBlockingQueue#poll() 详情如下:
1 // 获取队列头节点 2 public E poll() { 3 // 获取队列中的元素总数 4 final AtomicInteger count = this.count; 5 // 队列中无可取的元素,返回null 6 if (count.get() == 0) 7 return null; 8 // 返回元素变量 9 E x = null; 10 // 本次获取元素操作前,队列中元素的总数 11 int c = -1; 12 // 获取 take 的锁对象 13 final ReentrantLock takeLock = this.takeLock; 14 // 加锁 15 takeLock.lock(); 16 try { 17 // 队列中有可取元素 18 if (count.get() > 0) { 19 // 获取队列头元素 20 x = dequeue(); 21 // 得到本次获取元素前的总数 c 并对获取元素后的总数 count - 1 22 c = count.getAndDecrement(); 23 // 本次获取元素前,队列非空,至少有两个可取元素 24 if (c > 1) 25 // 唤醒其他获取队列元素的线程 26 notEmpty.signal(); 27 } 28 } finally { 29 // 释放锁资源 30 takeLock.unlock(); 31 } 32 // 本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1, 33 // 队列中有可用空间,唤醒put线程添加元素 34 if (c == capacity) 35 signalNotFull(); 36 // 返回元素值 37 return x; 38 }
poll()获取队列中的头元素,若队列中无可取的元素,返回null;若队列中有多个可取元素,获取队列头节点并唤醒其他获取元素线程,若在本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,队列中有可用空间,唤醒put线程添加元素。
3.2.3、poll(timeout, unit)
LinkedBlockingQueue#poll(timeout, unit) 详情如下:
1 // 获取队列头节点(阻塞指定时间) 2 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 3 // 返回元素变量 4 E x = null; 5 // 本次获取元素操作前,队列中元素的总数 6 int c = -1; 7 // 转换为纳秒 8 long nanos = unit.toNanos(timeout); 9 // 当前队列中元素总数 10 final AtomicInteger count = this.count; 11 // 获取锁对象 12 final ReentrantLock takeLock = this.takeLock; 13 // 加锁 14 takeLock.lockInterruptibly(); 15 try { 16 // 当前队列可取元素为空 17 while (count.get() == 0) { 18 // 阻塞时间已达到 19 if (nanos <= 0) 20 // 返回null 21 return null; 22 // 阻塞时间未达到,继续阻塞,并返回剩余阻塞时间 23 nanos = notEmpty.awaitNanos(nanos); 24 } 25 // 获取队列头元素 26 x = dequeue(); 27 // 得到本次获取元素前的总数 c 并对获取元素后的总数 count - 1 28 c = count.getAndDecrement(); 29 // 本次获取元素前,队列非空,至少有两个可取元素 30 if (c > 1) 31 // 唤醒其他获取队列元素的线程 32 notEmpty.signal(); 33 } finally { 34 // 释放锁资源 35 takeLock.unlock(); 36 } 37 // 本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1, 38 // 队列中有可用空间,唤醒put线程添加元素 39 if (c == capacity) 40 signalNotFull(); 41 // 返回元素值 42 return x; 43 }
poll(timeout, unit)获取队列中的头元素,若队列中无可取的元素,阻塞timeout时间,阻塞时间已达到仍未获取到头元素,返回null;若获取到队列头元素,并且队列中有多个可取元素,获取队列头节点并唤醒其他获取元素线程(poll、take),若在本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,队列中有可用空间,唤醒put线程添加元素。
3.2.4、take() - 阻塞获取元素
LinkedBlockingQueue#take() 详情如下:
1 // 获取队列头节点(阻塞) 2 public E take() throws InterruptedException { 3 // 返回元素变量 4 E x; 5 // 本次获取元素操作前,队列中元素的总数 6 int c = -1; 7 // 当前队列中元素总数 8 final AtomicInteger count = this.count; 9 // 获取锁对象 10 final ReentrantLock takeLock = this.takeLock; 11 // 加锁 12 takeLock.lockInterruptibly(); 13 try { 14 // 当前队列可取元素为空,挂起线程,待put、offer添加元素成功后,唤醒线程 15 while (count.get() == 0) { 16 notEmpty.await(); 17 } 18 // 获取队列头元素 19 x = dequeue(); 20 // 得到本次获取元素前的总数 c 并对获取元素后的总数 count - 1 21 c = count.getAndDecrement(); 22 // 本次获取元素前,队列非空,至少有两个可取元素 23 if (c > 1) 24 // 唤醒其他获取队列元素的线程 25 notEmpty.signal(); 26 } finally { 27 // 释放锁 28 takeLock.unlock(); 29 } 30 // 本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1, 31 // 队列中有可用空间,唤醒put线程添加元素 32 if (c == capacity) 33 signalNotFull(); 34 // 返回元素值 35 return x; 36 }
take()获取队列中的头元素,若队列中无可取的元素,挂起线程,等到put/offer添加元素成功后唤醒put线程;获取队列头元素,若队列中有多个可取元素,获取队列头节点并唤醒其他获取元素线程,若在本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,队列中有可用空间,唤醒put线程添加元素。
3.2.5、总结
方法 | 不同点 |
peek() | 队列无可取元素,获取失败,直接返回null |
poll() | 队列无可取元素,直接返回null |
poll(timeout,unit) | 队列无可取元素,阻塞等待timeout时间,若阻塞时间到达,仍获取失败,直接返回null |
take() | 队列无可取元素,阻塞等待直到队列中有可取元素 |
3.3、删除元素
元素移除队列,LinkedBlockingQueue#remove() 详情如下:
1 // 删除元素 2 public boolean remove(Object o) { 3 // 删除元素为null,返回false 4 if (o == null) return false; 5 // 加锁,写锁/读锁 6 fullyLock(); 7 try { 8 // 遍历单向链表 9 for (Node<E> trail = head, p = trail.next; 10 p != null; 11 trail = p, p = p.next) { 12 // 匹配到链表中的元素 13 if (o.equals(p.item)) { 14 // 将当前Node从链表中移除 15 unlink(p, trail); 16 // 返回移除成功标识 17 return true; 18 } 19 } 20 // 返回移除成功标识 21 return false; 22 } finally { 23 fullyUnlock(); 24 } 25 } 26 27 // 加锁 写锁、读锁 28 void fullyLock() { 29 putLock.lock(); 30 takeLock.lock(); 31 } 32 33 // 释放锁 写锁、读锁 34 void fullyUnlock() { 35 takeLock.unlock(); 36 putLock.unlock(); 37 }
Node移除链表,LinkedBlockingQueue#unlink() 详情如下:
1 // 移除Node 2 void unlink(Node<E> p, Node<E> trail) { 3 // 当前Node的item设置为null 4 p.item = null; 5 // 将当前Node的上一节点prev指向当前Node的上一节点next 6 trail.next = p.next; 7 // 若p是尾节点,则当前Node的prev设置为尾节点 8 if (last == p) 9 last = trail; 10 // 如果当前队列满,删除后,也不忘记唤醒等待的线程 11 if (count.getAndDecrement() == capacity) 12 notFull.signal(); 13 }
4、总结
LinkedBlockingQueue基于单向链表实现元素存取,链表中的节点用Node表示,Node中包含当前元素值及指向下一节点的指向next。LinkedBlockingQueue内部维护Atomic原子类count成员变量,记录当前数组中的元素数量,维护当前队列头节点head、尾节点last。LinkedBlockingQueue内部提供了读锁和写锁,读写不互斥。LinkedBlockingQueue数据结构详情如下:
添加元素时,优先判断count值是否等于队列容量,即队列是否满了,若队列满了,调用不同的添加方法有不同的结果。若在添加元素前队列为空,在添加元素后,唤醒(take)获取元素线程;
获取元素时,优先判断count值是否等于0,即队列是否为空,对队列为空,直接返回null;poll/take在获取元素前,队列中有多个元素,唤醒(take)获取线程,若在获取队列前队列已满,在获取元素后,队列中有可用空间,唤醒(put)添加元素线程。
LinkedBlockingQueue阻塞添加、获取队列元素是基于AQS中的ConditionObject实现的。
标签:分析,count,队列,元素,LinkedBlockingQueue,获取,源码,线程,添加 From: https://www.cnblogs.com/RunningSnails/p/17409258.html