首页 > 编程语言 >LinkedBlockingQueue源码解析

LinkedBlockingQueue源码解析

时间:2023-01-10 11:31:10浏览次数:45  
标签:count return LinkedBlockingQueue putLock queue 源码 解析 takeLock final


java.util.concurrent.LinkedBlockingQueue是一个底层为单向链表的,有界的,FIFO阻塞队列;访问和移除操作是在队头,添加操作在队尾进行,并且使用不同的锁进行保护。

在使用线程池时,如下两种方式创建的线程池,默认都是使用的LinkedBlockingQueue:

Executors.newFixedThreadPool(3);
Executors.newSingleThreadExecutor();

下面分析一下源码中几个关键属性和方法:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;

//数据结构 单向链表 ,只有后继指针
static class Node<E> {
E item;
Node<E> next;//后继指针
Node(E x) { item = x; }
}
//容量,最大值为int的最大值
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

//当前队列元素数量,用的AtomicInteger类型。因为读写使用的是不同的锁,都会访问这个属性,需要是个线程安全的类
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

//头结点 此结点之前无元素
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;

//尾结点 此结点之后无元素
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;


//takeLock锁,执行take,poll操作时,会加这个锁 队头访问锁
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

//锁的条件 队头访问条件
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

//putLock锁,执行put,offer操作时,会加这个锁 队尾访问锁
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

//所得条件 队尾访问条件
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

//标识队列非空 仅由put/offer调用
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

//标识队列非满 仅有take/poll调用
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

//入队操作
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}


//出队操作
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

//确保所有的加锁和解锁的顺序都是按照这个顺序的,加锁和解锁的顺序是反的
//固定两把锁的加锁顺序
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}

//固定两把锁的解锁顺序
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

...省略构造方法

//把指定元素添加到队尾,没有空间则一直等待
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

//和put作用一样,多了个时间,在指定时间内,把元素添加到队尾,如果没有空间就一直等待
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

//获取元素,如果没有元素会一直阻塞直到有元素返回
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

//移除元素 涉及到这个元素的前后节点,需要调用上面的加锁和解锁方法
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}


标签:count,return,LinkedBlockingQueue,putLock,queue,源码,解析,takeLock,final
From: https://blog.51cto.com/u_15936016/5999938

相关文章

  • 【项目源码】基于JavaEE的健康管理系统
    随着网络技术的不断发展,网站的开发与运用变得更加广泛。这次采用java语言SSH框架(Spring,Struts,Hibernate)设计并实现了面向特定群体的健康管理平台。该网站主要有教师饮食管......
  • [项目源码] JavaWeb校园宿舍管理系统
     jsp校园宿舍管理系统源码,采用Servlet+JSP+MySQL。包含数据库文件,界面采用bootstrap,简洁大方。      项目导入eclipse后的目录结构如下: 关注下面公众号,下载源码原......
  • ERP进销存系统源码
    介绍ERP进销存管理系统软件架构核心框架:SpringBoot2.0.0持久层框架:Mybatis1.3.2日志管理:Log4j2.10.0JS框架:Jquery1.8.0UI框架:EasyUI1.3.5模板框架:AdminLTE2.4.0项......
  • jeecgBoot对象字典解析
    接口:interface CommonService声明:publicJSONObjectconvertObjDict(Objectobj,booleanisIgnore,String...columns);实现类:classCommonServiceImplimplements......
  • Flutter异常监控 - 肆 | Rollbar源码赏析
    一.Rollbar可以帮你解决哪些问题无特别说明,文中Rollbar统指Rollbar-flutter1.代码复用Rollbar官方文档说是纯Dart实现,该特征意味着自带”代码复用”光环。如图当......
  • 用opencv的DNN模块做Yolov5目标检测(纯干货,源码已上传Github)
    最近在微信公众号里看到多篇讲解yolov5在openvino部署做目标检测文章,但是没看到过用opencv的dnn模块做yolov5目标检测的。于是,我就想着编写一套用opencv的dnn模块做yolov5......
  • 关于MyBatis查询属性封装到对象,对象为null的情况源码分析
    源码分析在DefaultResultSetHandler类中getRowValue方法创建映射类相应的对象,如果为列匹配到的值标识foundValues是false,表示没有为对象中任何一个字段映射到一个值,则......
  • xpath解析数据的方法
    1功能描述21.实例化一个etree对象,且需要将被解析的页面源码数据加载到该对象中32.调用etree对象中的XPath表达式实现标签的定位和内容捕获43.环境安装pipins......
  • Python编程之——推导式解析
    今天这篇文章总结一下Python的推导式,python推导式又叫解析式,是一种独特的数据处理方式,可以从一个数据序列构建另一个新的数据序列的结构体。列表推导式1)表达式  for ......
  • golang 解析xml 同时解析 attr 和 value
    xml长这样<ModifyKeys><ModifyKeyVKCode="91">LWin</ModifyKey><ModifyKeyVKCode="92">RWin</ModifyKey><ModifyKeyV......