首页 > 编程语言 >【Java 并发】【十】【JUC数据结构】【九】ConcurrentLinkedQueue 原理

【Java 并发】【十】【JUC数据结构】【九】ConcurrentLinkedQueue 原理

时间:2024-02-10 18:22:06浏览次数:42  
标签:JUC Java 队列 next item 线程 ConcurrentLinkedQueue null 节点

1  前言

JDK 中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻 塞队列和非阻塞队列,前者使用锁实现,而后者则使用CAS非阻塞算法实现。这节我们来看看 ConcurrentLinkedQueue。

2  ConcurrentLinkedQueue 介绍

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,其底层数据结构使用单向链表 实现,对于入队和出队操作使用CAS来实现线程安全。下面我们来看具体实现。

为了能从全局直观地了解ConcurrentLinkedQueue 的内部构造,先简单介绍 ConcurrentLinkedQueue 的类图结构,如图7-1 所示。

ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile类型 的Node节点分别用来存放队列的首、尾节点。从下面的无参构造函数可知,默认头、尾 节点都是指向item为null的哨兵节点。新元素会被插入队列末尾,出队时从队列头部获 取一个元素。

public ConcurrentLinkedQueue() { 
    head = tail = new Node<E>(null); 
}

在Node节点内部则维护一个使用volatile修饰的变量item,用来存放节点的值;next 用来存放链表的下一个节点,从而链接为一个单向无界链表。其内部则使用UNSafe工具 类提供的CAS算法来保证出入队时操作链表的原子性。

3  ConcurrentLinkedQueue 源码分析

从源码来看看ConcurrentLinkedQueue 的几个主要方法的实现原理。

3.1  offer 操作

offer 操作是在队列末尾添加一个元素,如果传递的参数是null则抛出NPE异常,否 则由于ConcurrentLinkedQueue 是无界队列,该方法一直会返回true。另外,由于使用 CAS无阻塞算法,因此该方法不会阻塞挂起调用线程。下面具体看下实现原理。

public boolean offer(E e) {
    //(1)e为null则抛出空指针异常 
    checkNotNull(e);
    //(2)构造Node节点,在构造函数内部调用unsafe.putObject 
    final Node<E> newNode = new Node<E>(e);
    //(3)从尾节点进行插入 
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        //(4)如果q==null说明p是尾节点,则执行插入 
        if (q == null) {
            //(5)使用CAS设置p节点的next节点 
            if (p.casNext(null, newNode)) {
                //(6)CAS成功,则说明新增节点已经被放入链表,然后设置当前尾节点(包含head,第
                //1,3,5...个节点为尾节点) 
                if (p != t)
                    casTail(t, newNode);  // Failure is OK. 
                return true;
            }
        }
        else if (p == q)//(7) 
            //多线程操作时,由于poll操作移除元素后可能会把head变为自引用,也就是head的next变
            //成了head,所以这里需要 
            //重新找新的head 
            p = (t != (t = tail)) ? t : head;
        else
            //(8) 寻找尾节点 
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

下面结合图来讲解该方法的执行流程。

(1)首先看当一个线程调用offer(item)时的情况。首先代码(1)对传参进行空检查, 如果为null则抛出NPE异常,否则执行代码(2)并使用item作为构造函数参数创建一 个新的节点,然后代码(3)从队列尾部节点开始循环,打算从队列尾部添加元素,当执 行到代码(4)时队列状态如图7-2所示。

 这时候节点p、t、head、tail同时指向了item为null的哨兵节点,由于哨兵节点的 next 节点为null,所以这里q也指向null。代码(4)发现q==null则执行代码(5),通 过CAS原子操作判断p节点的next节点是否为null,如果为null则使用节点newNode替 换p的next节点,然后执行代码(6),这里由于p==t所以没有设置尾部节点,然后退出 offer 方法,这时候队列的状态如图7-3所示。

 (2)上面是一个线程调用offer方法的情况,如果多个线程同时调用,就会存在 多个线程同时执行到代码(5)的情况。假设线程A调用offer(item1),线程B调用 offer(item2),同时执行到代码(5)p.casNext(null, newNode)。由于CAS 的比较设置操作 是原子性的,所以这里假设线程A先执行了比较设置操作,发现当前p的next节点确实 是null,则会原子性地更新next节点为 item1,这时候线程B也会判断p的next节点是否 为null,结果发现不是null(因为线程A已经设置了p的next节点为 item1),则会跳到代 码(3),然后执行到代码(4),这时候的队列分布如图7-4所示。

根据上面的状态图可知线程B接下来会执行代码(8),然后把q赋给了p,这时候队 列状态如图7-5所示。

然后线程B再次跳转到代码(3)执行,当执行到代码(4)时队列状态如图7-6所示。

由于这时候q==null,所以线程B会执行代码(5),通过CAS操作判断当前p的next 节点是否是null,不是则再次循环尝试,是则使用item2替换。假设CAS成功了,那么执 行代码(6),由于p!=t,所以设置tail节点为item2,然后退出offer方法。这时候队列分 布如图7-7所示。

分析到现在,就差代码(7)还没走过,其实这一步要在执行poll操作后才会执行。 这里先来看一下执行poll操作后可能会存在的一种情况,如图7-8所示。

 

下面分析当队列处于这种状态时调用offer添加元素,执行到代码(4)时的状态图(见 图7-9)。

这里由于q节点不为空并且p==q所以执行代码(7),由于t==tail所以p被赋值为 head,然后重新循环,循环后执行到代码(4),这时候队列状态如图7-10所示。

这时候由于q==null,所以执行代码(5)进行CAS操作,如果当前没有其他线程执 行offer 操作,则CAS操作会成功,p的next节点被设置为新增节点。然后执行代码(6), 由于p!=t所以设置新节点为队列的尾部节点,现在队列状态如图7-11所示。

需要注意的是,这里自引用的节点会被垃圾回收掉。

可见,offer 操作中的关键步骤是代码(5),通过原子CAS操作来控制某时只有一 个线程可以追加元素到队列末尾。进行CAS竞争失败的线程会通过循环一次次尝试进行 CAS操作,直到CAS成功才会返回,也就是通过使用无限循环不断进行CAS尝试方式来 替代阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。

3.2  add操作

add 操作是在链表末尾添加一个元素,其实在内部调用的还是offer操作。

public boolean add(E e) {
    return offer(e);
}

3.3  poll 操作

poll 操作是在队列头部获取并移除一个元素,如果队列为空则返回null。下面看看它 的实现原理。

public E poll() {
    //(1) goto标记
    restartFromHead:
    //(2)无限循环
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //(3)保存当前节点值
            E item = p.item;
            //(4)当前节点有值则CAS变为null
            if (item != null && p.casItem(item, null)) {
                //(5)CAS成功则标记当前节点并从链表中移除
                if (p != h)
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //(6)当前队列为空则返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(7)如果当前节点被自引用了,则重新寻找新的队列头节点
            else if (p == q)
                continue restartFromHead;
            else//(8)
                p = q;
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

同样,也结合图来讲解代码执行逻辑。

I.poll 操作是从队头获取元素,所以代码(2)内层循环是从head节点开始迭代,代 码(3)获取当前队列头的节点,队列一开始为空时队列状态如图7-12所示。

由于head节点指向的是item为null的哨兵节点,所以会执行到代码(6),假设这个 过程中没有线程调用offer方法,则此时q等于null,这时候队列状态如图7-13所示。

所以会执行updateHead方法,由于h等于p所以没有设置头节点,poll方法直接返回 null。

II.假设执行到代码(6)时已经有其他线程调用了offer方法并成功添加一个元素到 队列,这时候q指向的是新增元素的节点,此时队列状态如图7-14所示。

所以代码(6)判断的结果为false,然后会转向执行代码(7),而此时p不等于q, 所以转向执行代码(8),执行的结果是p指向了节点q,此时队列状态如图7-15所示。

然后程序转向执行代码(3), p现在指向的元素值不为null,则执行p.casItem(item, null) 通过 CAS 操作尝试设置p的item值为null,如果此时没有其他线程进行poll操作, 则CAS成功会执行代码(5),由于此时p!=h所以设置头节点为p,并设置h的next节点 为h自己,poll然后返回被从队列移除的节点值item。此时队列状态如图7-16所示。

这个状态就是在讲解offer操作时,offer代码的执行路径(7)的状态。

III.假如现在一个线程调用了poll操作,则在执行代码(4)时队列状态如图7-17所示。

这时候执行代码(6)返回null。

IV.现在poll的代码还有分支(7)没有执行过,那么什么时候会执行呢?下面来看看。 假设线程A执行poll操作时当前队列状态如图7-18所示。

那么执行p.casItem(item, null) 通过 CAS 操作尝试设置p的item值为null, 假设CAS 设置成功则标记该节点并从队列中将其移除,此时队列状态如图7-19所示。

然后,由于p!=h,所以会执行updateHead方法,假如线程A执行updateHead前另外 一个线程B开始poll操作,这时候线程B的p指向head节点,但是还没有执行到代码(6), 这时候队列状态如图7-20所示

然后线程A执行updateHead操作,执行完毕后线程A退出,这时候队列状态如图7-21 所示。

然后线程B继续执行代码(6), q=p.next,由于该节点是自引用节点,所以p==q,所 以会执行代码(7)跳到外层循环restartFromHead,获取当前队列头head,现在的状态如 图7-22 所示。 

总结:poll方法在移除一个元素时,只是简单地使用CAS操作把当前节点的item值 设置为null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤 立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改 了,要跳到外层循环重新获取新的头节点。

 

3.4  peek 操作

peek 操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回null。下 面看下其实现原理。

public E peek() {
    //(1)
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //(2)
            E item = p.item;
            //(3)
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //(4)
            else if (p == q)
                continue restartFromHead;
            else
                //(5) 
                p = q;
        }
    }
}

Peek 操作的代码结构与poll操作类似,不同之处在于代码(3)中少了castItem操作。 其实这很正常,因为peek只是获取队列头元素值,并不清空其值。根据前面的介绍我们 知道第一次执行offer后head指向的是哨兵节点(也就是item为null的节点),那么第一 次执行peek时在代码(3)中会发现item==null,然后执行q=p.next,这时候q节点指向 的才是队列里面第一个真正的元素,或者如果队列为null则q指向null。

当队列为空时队列状态如图7-23所示。

这时候执行updateHead,由于h节点等于p节点,所以不进行任何操作,然后peek 操作会返回null。

当队列中至少有一个元素时(这里假设只有一个),队列状态如图7-24所示。

这时候执行代码(5),p指向了q节点,然后执行代码(3),此时队列状态如图7-25所示。

执行代码(3)时发现item不为null,所以执行updateHead方法,由于h!=p,所以设 置头节点,设置后队列状态如图7-26所示。

也就是剔除了哨兵节点。

总结:peek操作的代码与poll操作类似,只是前者只获取队列头元素但是并不从队 列里将它删除,而后者获取后需要从队列里面将它删除。另外,在第一次调用peek操作时, 会删除哨兵节点,并让队列的head节点指向队列里面第一个元素或者null。

3.5  size 操作

计算当前队列元素个数,在并发环境下不是很有用,因为CAS没有加锁,所以从调 用size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // 
            最大值Integer.MAX_VALUE
    if (++count == Integer.MAX_VALUE)
        break;
    return count;
}
//获取第一个队列元素(哨兵元素不算),没有则为null 
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
//获取当前节点的next元素,如果是自引入节点则返回真正的头节点 
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

3.6  remove 操作

如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回true, 否则返回false。

public boolean remove(Object o) {
    //为空,则直接返回false
    if (o == null) return false;
    Node<E> pred = null;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
    //相等则使用CAS设置为null,同时一个线程操作成功,失败的线程循环查找队列中是否有匹配的其他元素。 
        if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) {
            //获取next元素 
            Node<E> next = succ(p);
            //如果有前驱节点,并且next节点不为空则链接前驱节点到next节点 
            if (pred != null && next != null)
                pred.casNext(p, next);
            return true;
        }
        pred = p;
    }
    return false;
}

3.7  contains 操作

判断队列里面是否含有指定对象,由于是遍历整个队列,所以像size 操作一样结果也 不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该 元素删除了,那么就会返回false。

public boolean contains(Object o) {
    if (o == null) return false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}

4  小结

ConcurrentLinkedQueue 的底层使用单向链表数据结构来保存队列元素,每个元素被 包装成一个Node节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个 item 为 null 的哨兵节点。第一次执行peek或者first操作时会把head指向第一个真正的队 列元素。由于使用非阻塞CAS算法,没有加锁,所以在计算size时有可能进行了offer、poll 或者remove 操作,导致计算的元素个数不精确,所以在并发情况下size函数不是很 有用。

如图7-27所示,入队、出队都是操作使用volatile修饰的tail、head节点,要保证在 多线程下出入队线程安全,只需要保证这两个Node操作的可见性和原子性即可。由于 volatile 本身可以保证可见性,所以只需要保证对两个变量操作的原子性即可。

offer 操作是在tail 后面添加元素,也就是调用tail.casNext方法,而这个方法使用的是 CAS操作,只有一个线程会成功,然后失败的线程会循环,重新获取tail,再执行casNext 方法。poll操作也通过类似CAS的算法保证出队时移除节点操作的原子性。

标签:JUC,Java,队列,next,item,线程,ConcurrentLinkedQueue,null,节点
From: https://www.cnblogs.com/kukuxjx/p/18012960

相关文章

  • Java之泛型系列--继承父类与实现多个接口(有示例)
    原文网址:​​Java之泛型系列--继承父类与实现多个接口(有示例)_IT利刃出鞘的博客-CSDN博客​​简介本文介绍java如何用泛型表示继承父类并实现多个接口。用泛型表示某个类是某个类的子类或者实现了接口的方法为:<TextendsA&B&C> 用法1:全都是接口。对于本例来说:A、B......
  • JAVA中this和super
    thisthis表示使用的对象本身,可以调用对象的属性和对象的方法以及对象的构造方法(this.x和this.(),其中this.()只能在构造方法的第一行调用且不能和super.()共同使用)使用原因避免属性和方法变量名相同时出现就近原则的冲突使用细节supersuper代表父类的引用,用于访问父类......
  • JAVA构造方法
    构造方法介绍语法使用细节关于在继承中新增的构造方法使用细节1子类必须要调用父类的构造器,完成父类的初始化2父类构造器的调用不限于直接父类!将一直往上追溯直到Object类(顶级父类)3当创建子类对象时,不管使用的是子类的哪个构造器,默认情况下总会调用父类......
  • JAVA的4种访问修饰符
    1、基本介绍补充:1属性和方法可以用四种访问修饰符修饰;类只能用public和默认修饰符修饰,且一个.java文件中只能有一个public修饰的类作为主类,其他类用默认修饰符修饰。2访问权限起作用的情况:①在一个类中定义另一个类的对象,当访问该对象的属性或方法时,修饰符根据同类、同......
  • java中使用opencl操作GPU
    需要管理GPU资源,使用java编写,选用opencl框架,并且选择org.jocl包(<dependency><groupId>org.jocl</groupId><artifactId>jocl</artifactId><version>2.0.5</version></dependency>)。具体opencl原理此处不涉及,仅记录使用java该如何做基本操作。最少要以下几步,详细可以参看:ht......
  • JAVA零钱通面向过程和oop
    编程思想1.每一个小功能对应一个代码块,高内聚低耦合。2.建议先排除不正确情况,而不是对每一个正确情况做一些操作。编程效果源码实现面向过程点击查看代码packagelingqiantong.chuantong;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.uti......
  • Java中String、StringBuffer、StringBuilder的区别以及使用场景总结
    Java中,String、StringBuffer和StringBuilder都用于处理字符串,但在功能和性能上有显著的区别。了解这些区别有助于选择最适合特定情境的类型。在选择使用String、StringBuffer或StringBuilder时,应根据字符串操作的性能需求和线程安全要求来做出决定。1、String、StringBuffer、......
  • Java break、continue 详解与数组深入解析:单维数组和多维数组详细教程
    JavaBreak和ContinueJavaBreak:break语句用于跳出循环或switch语句。在循环中使用break语句可以立即终止循环,并继续执行循环后面的代码。在switch语句中使用break语句可以跳出当前case,并继续执行下一个case。示例://循环示例for(inti=0;i<10;i++......
  • Java并发编程-CompletableFuture(上)
    大家好,我是小高先生,这篇文章我将和大家一起学习Java并发编程中很重要的一个类-CompletableFuture。 在Java的并发编程领域,Future接口一直扮演着关键的角色,它定义了一组与异步任务执行相关的方法,包括获取异步任务的结果、取消任务执行以及检查任务是否已完成等。然而,随着业务场......
  • Java并发编程-CompletableFuture(下)
    大家好,我是小高先生,书接上文,我们继续来学习CompletableFuture。上文我们讲了基础装Future是如何升级为神装CompletableFuture以及如何购买CompletableFuture,接下来我们一起来学习如何在战斗中使用CompletableFuture。CompletableFuture的基本使用CompletableFuture的实战案例C......