首页 > 编程语言 >JAVA并发容器-ConcurrentLinkedQueue 源码分析

JAVA并发容器-ConcurrentLinkedQueue 源码分析

时间:2022-11-04 14:10:14浏览次数:72  
标签:NULL JAVA item tail 源码 ConcurrentLinkedQueue null 节点 指针


在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两 种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。

ConcurrentLinkedQueue是一个非阻塞的基于链表节点的无界线程安全队列,它遵循先进先出的原则,并采用了“wait-free”算法(即CAS算法)来实现。

类关系图

JAVA并发容器-ConcurrentLinkedQueue 源码分析_头结点

核心属性

// 头结点指针(不一定指向头,缓更新,非阻塞的,所以没法保证)
private volatile Node<E> head;

// 尾节点指针(不一定指向尾,缓更新,非阻塞的,所以没法保证)
private volatile Node<E> tail;

// 链表节点
private static class Node<E> {
// 节点元素(存数据)
volatile E item;
// 下一个节点
volatile Node<E> next;
...
}

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和 指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一 张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

注意:

  • 核心属性都是用volatile来修饰来保证数据的可见性。
  • 默认情况下头结点和尾节点是相等的对应节点的​​item​​​为​​null​
  • 头和尾节点都是缓更新,所以他们只能代表一个逻辑上的位置,不是实际的头尾节点

offer() 入队方法

public boolean offer(E e) {
// 检查元素是否为NULL,这里不允许存NULL值
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 自旋,t是tail节点的引用,p表示尾节点,默认情况下p=t的
for (Node<E> t = tail, p = t; ; ) {
// q是p的下一个节点
Node<E> q = p.next;
// q==null表示p是尾节点,否则需要更新p进入下一次循环
if (q == null) {
// 使用CAS将新节点设置为p的next节点
if (p.casNext(null, newNode)) {
// 不是每次添加新节点都会更新尾节点指针,每新增两个节点才会更新尾节点指针。
// 这样做的好处是减少了更新尾节点的竞争,但是增加了寻找真正尾节点的代价。
if (p != t) // hop two nodes at a time
// 即使设置失败也没有问题,说明其他线程已经更新了尾节点的值
casTail(t, newNode);

// 永远返回true
return true;
}
}
// 定位尾节点
// 删除节点的时候,会把删除的节点指向自己(自引用),就会发生p==q. 当添加第一个节点后tail也会发生自引用
else if (p == q)
// t != tail表示尾节点指针已经被移动过,这时p直接指向新的尾节点指针tail,否则p直接指向头结点指针,
// 因为这时p已经被移出了链表,所以不能将p指向p.next
p = (t != (t = tail)) ? t : head;
else
// t != tail表示尾节点指针已经被移动过,所以p直接指向新的尾节点指针(tail),否则p指向p.next
p = (p != t && t != (t = tail)) ? t : q;
}
}

入队过程:

  1. 检查元素是否为NULL,如果是NULL直接抛出异常
  2. 初始化新节点​​newNode​
  3. 找到尾节点指针```t``
  4. 根据尾节点指针​​t​​​找到真正的尾节点​​p​
  5. CAS将​​p​​​的​​next​​​指向​​newNode​
  6. 判断是否更新尾节点指针的位置

注意:

  • ConcurrentLinkedQueue 是不允许存NULL值的,NULL有特殊含义,表示已经从链表中移除(出队了)
  • 入队过程大致分为三步,第一步找到真正的尾节点​​p​​​;第二步CAS更新​​p.next​​​;第三步CAS更新尾节点指针的位置​​casTail(t, newNode);​
  • ​t != tail​​表示尾节点指针已经被移动过
  • ​p == q​​​表示​​p​​节点已经从链表中移除,并发生了自引用
  • 每新增两个节点才会更新尾节点指针, 这样能减少了更新尾节点的竞争,但是增加了寻找真正尾节点的代价。从本质上来 看它通过增加对volatile变量的读操作来减少对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

入队快照图

JAVA并发容器-ConcurrentLinkedQueue 源码分析_头结点_02

入队debug图

JAVA并发容器-ConcurrentLinkedQueue 源码分析_头结点_03

poll()出队方法

public E poll() {
// 外循环标记
restartFromHead:
for (; ; ) {
// h是头结点指针,p头结点,q是p的下一个节点
for (Node<E> h = head, p = h, q; ; ) {
// 保存头节点的值
E item = p.item;
// CAS替换头结点的item值为null,如果item为null表示该节点已经从链表中移除了
if (item != null && p.casItem(item, null)) {
// 每移除两个元素,头节点指针才开始移动
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 当前队列为NULL
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 发生了自引用,重头再来
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

出队过程:

  1. 根据头结点指针找到头结点
  2. 暂存头结点的值
  3. 判断节点内容​​item​​​是否为​​NULL​​​,如果为​​NULL​​表示该节点已经出队了
  4. 如果​​item​​​不为​​NULL​​​则使用CAS替换为​​NULL​​,如果成功判断是否需要更新头结点指针
  5. 如果失败表示该节点已经被其他线程出队了,寻找新的头结点,继续第二步

注意:

  • 每删除两个节点才会更新头点指针, 这样能减少了更新头点的竞争。
  • 当节点的​​item==null​​时表示节点已经出队了

测试代码

public class ConcurrentLinkedQueueTset {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> linkedQueue = new ConcurrentLinkedQueue<>();

linkedQueue.offer(1);
linkedQueue.offer(2);
linkedQueue.poll();
linkedQueue.offer(3);
linkedQueue.offer(4);
linkedQueue.poll();
linkedQueue.poll();
}
}

参考

《java并发编程的艺术》

源码

​https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases​

spring-boot-student-concurrent 工程

layering-cache

为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下


标签:NULL,JAVA,item,tail,源码,ConcurrentLinkedQueue,null,节点,指针
From: https://blog.51cto.com/u_15861563/5823694

相关文章

  • JAVA并发容器-ConcurrentSkipListMap,ConcurrentSkipListSet
    ConcurrentSkipListMap其实是TreeMap的并发版本。TreeMap使用的是红黑树,并且按照key的顺序排序(自然顺序、自定义顺序),但是他是非线程安全的,如果在并发环境下,建议使用Concurre......
  • JAVA并发容器-写时复制容器
    写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将......
  • ThreadPoolExecutor 源码解析
    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理地使用线程池能够带来3个好处:降低资源消耗。通过重复利用已创建的线......
  • Java中的指令重排
    在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。重排序分3种类型:编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。......
  • Spring 源码(七)Spring 事务源码解析
    注册后置处理器开启对事务的支持@EnableTransactionManagement​​@EnableTransactionManagement​​注解的主要作用是开启对事务的支持,源码如下:@Target(ElementType.TYPE)@......
  • FutureTask 源码解析
    Future接口和实现Future接口的FutureTask类,代表异步计算的结果。FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用......
  • CompletionService 源码解析
    ​​CompletionService​​​的主要作用是:按照异步任务的完成顺序,逐个获取到已经完成的异步任务。主要实现是在​​ExecutorCompletionService​​中。类图核心内部类privat......
  • JAVA并发容器-ConcurrentHashMap 1.7和1.8 源码解析
    HashMap是一个线程不安全的类,在并发情况下会产生很多问题,详情可以参考​​HashMap源码解析​​;HashTable是线程安全的类,但是它使用的是synchronized来保证线程安全,线程竞争......
  • Java中的锁
    锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。Lock和synchronized......
  • HashMap 源码解析
    源码学习,边看源码边加注释,边debug,边理解。基本属性常量DEFAULT_INITIAL_CAPACITY:默认数组的初始容量-必须是2的幂。MAXIMUM_CAPACITY:数组的最大容量DEFAULT_LOAD_FACTOR:哈......