首页 > 其他分享 >ConcurrentLinkedQueue详解(详细图文+动画演示)

ConcurrentLinkedQueue详解(详细图文+动画演示)

时间:2024-06-30 19:58:32浏览次数:20  
标签:动画 head ConcurrentLinkedQueue next tail null 节点 图文

目录

ConcurrentLinkedQueue详解

1、ConcurrentLinkedQueue简介

ConcurrentLinkedQueue 是 Java 中 java.util.concurrent 包下的一个非阻塞线程安全队列实现。
为什么要详细讲这个队列呢? 主要还是因为这个队列的高并发,无锁等特性。
同时这个队列总结完之后,就开始进入Java并发相关的内容总结了。

ConcurrentLinkedQueue 的一些关键特性:

  • ①、 非阻塞算法
    ConcurrentLinkedQueue 使用的是 Michael & Scott 算法(Michael & Scott 算法的提出者是 Maged M. Michael 和 Michael L. Scott)。Michael & Scott 是无锁算法,允许多个线程并发访问队列,而无需显式的锁(通过使用 CAS 等原子操作保证线程安全),从而减少了锁竞争和上下文切换的开销。

  • ②、 线程安全
    该队列是线程安全的,允许多个线程同时进行插入和删除操作。所有的操作(如插入、删除和遍历)都是原子操作,不会出现线程间的竞态条件。

  • ③、高性能
    由于采用了非阻塞算法,ConcurrentLinkedQueue 在高并发环境下性能优越,特别适用于需要高吞吐量的场景。

2、ConcurrentLinkedQueue继承体系

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable

在这里插入图片描述
上面简介已经说了ConcurrentLinkedQueue 是个线程安全的队列,从继承结构也可以看出其实现了Queue接口,具有队列的功能。

3、ConcurrentLinkedQueue的构造函数

空参构造

public ConcurrentLinkedQueue() {
    // 初始化时,head 和 tail 都指向一个新的空节点
    head = tail = new Node<E>(null);
}

带一个集合参数的构造

public ConcurrentLinkedQueue(Collection<? extends E> c) {
    // h 和 t 分别表示链表的头节点和尾节点
    Node<E> h = null, t = null;

    // 遍历集合 c,将每个元素插入到队列中
    for (E e : c) {
        // 检查元素是否为 null,如果是则抛出 NullPointerException
        checkNotNull(e);

        // 创建一个包含元素 e 的新节点
        Node<E> newNode = new Node<E>(e);

        // 如果 h 为空,表示这是第一个元素,初始化 h 和 t
        if (h == null)
            h = t = newNode;
        else {
            // 否则,将新节点添加到链表的尾部
            t.lazySetNext(newNode);
            t = newNode;
        }
    }

    // 如果集合 c 为空,初始化 h 和 t 为一个新的空节点
    if (h == null)
        h = t = new Node<E>(null);

    // 将 head 和 tail 设置为链表的头和尾
    head = h;
    tail = t;
}

4、ConcurrentLinkedQueue的数据结构

ConcurrentLinkedQueue类的属性注释

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

    // 使用 transient 和 volatile 修饰符来声明 head 和 tail,这两个字段不会被序列化,并且保证可见性
    // head 表示队列的头节点,指向队列的第一个有效元素
    private transient volatile Node<E> head;

    // tail 表示队列的尾节点,指向队列的最后一个有效元素
    private transient volatile Node<E> tail;
    
}

可以看到ConcurrentLinkedQueue类的属性并不多,我们主要关注一个头结点head,一个尾结点tail。

ConcurrentLinkedQueue真正存储元素的类Node<E>

可以看到 这里的Node<E>节点只有一个next指针指向下一个元素,说明ConcurrentLinkedQueue是一个单向链表的结构。
并且Node类中提供了一些CAS操作来更新节点的值。

// Node 类是一个私有的静态内部类,用于存储 ConcurrentLinkedQueue 中的元素
private static class Node<E> {
    // 存储节点中的元素,使用 volatile 修饰以确保可见性
    volatile E item;

    // 指向下一个节点,使用 volatile 修饰以确保可见性
    volatile Node<E> next;

    /**
     * 构造一个新节点。使用 relaxed write,因为只有在通过 casNext 发布后,item 才能被看到。
     * @param item 节点存储的元素
     */
    Node(E item) {
        // 使用 Unsafe 类直接操作内存,将 item 放入节点中
        UNSAFE.putObject(this, itemOffset, item);
    }

    // 使用 CAS 操作更新节点中的 item 字段
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    // 延迟设置 next 字段的值,使用有序写入
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    // 使用 CAS 操作更新节点中的 next 字段
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe 相关的字段和静态代码块
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            // 获取 Unsafe 实例
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            // 获取 item 字段的偏移量
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            // 获取 next 字段的偏移量
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

ConcurrentLinkedQueue数据结构图示

是不是有点印象,和LinkedBlockingQueue的数据结构是一样的。
在这里插入图片描述

自引用节点图示:
在这里插入图片描述
有些资料把这种节点叫哨兵节点。我还是习惯形似叫法,就叫它自引用节点吧。
自己引用自己的节点叫自引用节点没毛病吧,生动形象。

5、ConcurrentLinkedQueue的offer方法

offer 方法用于将指定的元素插入到 ConcurrentLinkedQueue 的尾部。
如果插入成功,返回 true。

元素在入队时的三种情况

  • ①、当p的后继节点为空时(此时p为真正的尾节点),尝试CAS增加新节点,成功后尝试更新尾节点tail;
  • ②、当p等于p的后继节点时(p的next指向自己,说明p是自引用节点(移除元素或者遍历元素时可能构造出自引用节点)
    此时判断尾节点是否被修改过,如果尾节点被修改过就定位到最新的尾节点,如果未被修改过(使用next无法继续遍历,因为自己指向了自己,next还是自己),只能定位到头节点,从头开始遍历;
  • ③、其他情况时,说明此时的p并不是真正的尾节点,需要定位到真正尾节点;此时如果p不是原来的尾节点并且尾节点被修改过,那就定位到尾节点,否则定位到p的后继节点q,继续遍历;
public boolean offer(E e) {
    // 检查元素是否为 null,如果是,则抛出 NullPointerException
    checkNotNull(e);

    // 创建一个包含元素 e 的新节点
    final Node<E> newNode = new Node<E>(e);

    // 使用 for 循环尝试将新节点插入到队列的尾部
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;

        // 如果 p.next 为空,表示 p 是当前的最后一个节点
        if (q == null) {
            // 尝试使用 CAS 操作将 newNode 插入到 p 之后
            if (p.casNext(null, newNode)) {
                // 成功的 CAS 操作是将 e 插入队列的尾部
                // newNode 现在成为队列中的实际节点

                // 如果 p 不是 tail,尝试更新 tail 指针
                if (p != t)
                    casTail(t, newNode);  // 失败也没关系
                return true;
            }
            // CAS 操作失败,表示其他线程插入了新节点,重新读取 next
        } else if (p == q) {
            // 如果 p == p.next,表示我们已经脱离了链表
            // 如果 tail 没有改变,说明 tail 也脱离了链表,需要从 head 重新开始
            // 否则,新的 tail 是一个更好的选择
            p = (t != (t = tail)) ? t : head;
        } else {
            // 检查 tail 指针是否更新,如果没有更新,则继续向前移动
            p = (p != t && t != (t = tail)) ? t : q;
        }
    }
}

总结下:

  • ①、检查输入:
    使用 checkNotNull(e) 检查元素是否为 null。如果元素为 null,则抛出 NullPointerException。

  • ②、创建新节点:
    使用 new Node(e) 创建一个包含元素 e 的新节点 newNode。

  • ③、循环插入节点:
    使用一个 for 循环不断尝试将 newNode 插入到队列的尾部。
    初始化 t 和 p 为当前的尾节点 tail。
    在循环中,通过 p.next 来检查当前节点是否是最后一个节点。
    如果 p.next 为空,则尝试使用 CAS 操作将 newNode 插入到 p 之后。
    如果 CAS 操作成功,将 newNode 插入队列,并尝试更新 tail 指针(如果 p 不是 tail)。
    如果 CAS 操作失败,表示有其他线程已经插入了新节点,重新读取 next 进行下一次尝试。
    如果 p == p.next,表示已经脱离了链表,需要重新设置 p 为 tail 或 head。
    如果 p.next 不为空且不等于 p,则向前移动 p,并在两步之后检查 tail 指针是否更新。

下面对变量再总结一下,方便下面进行多线程的分析:

offer方法中的变量

// t  表示队列临时的尾节点,初始时指向 tail。在多线程情况下,tail 可能随时变化。
// p  表示队列真正的尾节点,初始时与 t 相同,当p节点为真正尾节点时才允许添加新节点。
Node<E> t = tail, p = t;

//  q 表示 p 节点的下一个节点。
Node<E> q = p.next;

// 下面这段代码很重要 
// 这段代码检查 tail 是否发生了变化。如果 tail 发生了变化,p 会重新指向新的 tail;否则,p 会指向 head。
 p = (t != (t = tail)) ? t : head;

动画演示单线程环境下offer的入队过程

代码示例:

import java.util.concurrent.*;

public class TestA {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("A");
        queue.offer("B");
        queue.offer("C");
		queue.offer("D");
        System.out.println(queue);
    }
}

动画演示:
在这里插入图片描述
从源码的步骤中我们可以得知,当第一个元素添加完成后,head和tail指针并没有移动仍然是指向默认的空Node节点,当第二个元素添加完毕的时候tail指针才会移动到新添加的Node上。

我们继续添加两个元素

queue.offer("C");
queue.offer("D");

在这里插入图片描述
可以看到添加C的时候没有移动tail指针,添加D的时候tail指针移动到了尾部。

注意避坑☆☆☆☆☆

ConcurrentLinkedQueue断点offer方法或者打印队列的坑

我习惯利用反射打印集合的内部状态,来验证自己分析的是否正确。

但是这次遇到ConcurrentLinkedQueue算是彻彻底底掉进了坑里。

问题复现:
先描述下问题
当我去看offer方法的源码时,我发现offer("A")之后的确没有改变head和tail的指针,然后head和tail指针依然是指向dummy Node,也就是初始化ConcurrentLinkedQueue时的那个初始化节点,同时把该节点的next设置成A节点。

我为了验证下刚添加完A节点的状态,利用反射,打印了head和tail指针的hashCode,以及初始化节点的item和next,和队列queue。

然后坑就来了:

import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TestA {
    public static void main(String[] args) throws Exception {
        // 创建一个新的 ConcurrentLinkedQueue
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        Class<?> queueClass = queue.getClass();
        System.out.println("================刚初始化================");
        System.out.println(queue);
        Field head1 = queueClass.getDeclaredField("head");
        head1.setAccessible(true);
        Field tail1 = queueClass.getDeclaredField("tail");
        tail1.setAccessible(true);
        System.out.println("Head node: " + head1.get(queue).hashCode());
        System.out.println("Tail node: " + tail1.get(queue).hashCode());

        // 将元素 "A" 插入到队列中
        queue.offer("A");

        System.out.println("================插入A后================");
        // 打印队列
        System.out.println(queue);

        // 获取队列的 Class 对象
        Class<?> aClass = queue.getClass();

        // 获取 head 和 tail 字段
        Field headField = aClass.getDeclaredField("head");
        Field tailField = aClass.getDeclaredField("tail");
        headField.setAccessible(true);
        tailField.setAccessible(true);

        // 获取 head 和 tail 的值
        Object head = headField.get(queue);
        Object tail = tailField.get(queue);

        // 打印 head 和 tail 的值
        System.out.println("Head node: " + head.hashCode());
        System.out.println("Tail node: " + tail.hashCode());

        // 获取 head 和 tail 节点的 item 和 next 字段的值
        Field itemField = head.getClass().getDeclaredField("item");
        Field nextField = head.getClass().getDeclaredField("next");
        itemField.setAccessible(true);
        nextField.setAccessible(true);

        System.out.println("Head item: " + itemField.get(head));
        System.out.print("Head next: ");
        System.out.println(nextField.get(head) == null ? 0 : +nextField.get(head).hashCode());
        System.out.println("Tail item: " + itemField.get(tail));
        System.out.println("Tail next: " + nextField.get(tail).hashCode());
    }
}

运行结果:

================刚初始化================
[]
Head node: 356573597
Tail node: 356573597
================插入A后================
[A]
Head node: 1735600054
Tail node: 356573597
Head item: A
Head next: 0
Tail item: null
Tail next: 356573597

看到上面这个结果当时就懵了!!!
这个结果就对应下图(这谁看了不迷糊):
在这里插入图片描述

本来按照源码预想的结果应该是这样:

================刚初始化================
Head node: 356573597
Tail node: 356573597
================插入A后================
Head node: 356573597
Tail node: 356573597
Head item: null
Head next: 1735600054
Tail item: null
Tail next: 1735600054

应该对应这样的图:
在这里插入图片描述

所以到底是什么原因呢?

原因探究

为了去探究具体原因,我于是在 queue.offer("A");进行了断点,准备一步一步去看,结果当 queue.offer("A");执行完,return true;之后发现并没有什么特殊的。
但是再看head和tail的状态,就发现不对了, head指向了A节点, tail指向初始节点,初始节点的next指向了自身。到底是什么神秘力量修改了head和tail呢?

至此我是彻底懵了,仿佛陷入了罗生门,到底是凭看源码得到的结果对,还是断点看到的结果或者反射打印得到的结果对?

回到问题的本质

接下来就是漫长的排除法,首先排除多线程造成的影响,因为我使用的一直是单个主线程,再者排除反射的影响(一开始怀疑是反射的问题),因为反射只是获取队列内部成员变量的状态,并不会造成数据结构的改变,而且即使不用反射打印head和tail,debug也会出现 tail指向初始节点,初始节点的next指向自身,head指向A节点的问题。

开始查资料,百度,谷歌,chatGPT4o, 通义千问、文心一言。。。半天已经过去了。。。 没什么进展。。。

晚上,又去查了B站,分析ConcurrentLinkedQueue的视频到是不少,但是没有找到去断点offer方法的,大都是照着源码分析,结果可想而知,按照源码分析一定是下面的结果:
在这里插入图片描述
然后又去知乎,找到一篇关于ConcurrentLinkedQueue的文章找到了点突破口。

然后开始思考问题的本质
这个问题本质上是出现了指向自身的节点:
在这里插入图片描述
而这种把初始节点的next指针指向自身的操作,在ConcurrentLinkedQueue中是存在的。

我们就把 这种自身的next指针指向自己的节点叫自引用节点吧。

至于自引用节点的作用后续再说,现在先把这个坑给过了。

既然ConcurrentLinkedQueue内部存储的元素都是Node,那就从Node入手,看下Node中有哪些设置Node的next指针的方法:
可以看到有下面这两个方法:

void lazySetNext(Node<E> val) {
       UNSAFE.putOrderedObject(this, nextOffset, val);
}  

boolean casNext(Node<E> cmp, Node<E> val) {
     return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 }

putOrderedObject是一个有序写操作。它确保写操作在Java内存模型中的某些情况下具有有序性,但不像volatile写操作那样强制所有的内存可见性保证。它可能比普通的volatile写操作稍微快一些。

为了找到问题 我需要找到到底哪些Node节点对象 调用了这两个方法,并且传的next值是对象本身。

最终发现是这个方法:

/**
 * 尝试将头节点更新为新的节点。如果成功,将旧的头节点设置为指向自身,作为一个自引用节点。
 * 
 * @param h 当前的头节点
 * @param p 新的头节点
 */
final void updateHead(Node<E> h, Node<E> p) {
    // 检查当前头节点和新头节点是否不同,并尝试使用CAS操作将头节点从h更新为p
    if (h != p && casHead(h, p)) {
        // 如果CAS操作成功,将旧的头节点的next字段设置为指向自身
        h.lazySetNext(h);
    }
}

h.lazySetNext(h); 这不就是把自己的next指针指向自己了嘛~
答案就近在眼前了。

我只要再找到是哪里调用了updateHead方法就行了。

发现有下面的方法调用了updateHead

poll()、 peek()、first() 

又懵了~。。。

我只调用了offer方法 添加了一个A ,压根没调用过什么 获取元素的方法呀~
不信邪了,继续看 哪里调用了poll()、 peek()、first() 。 这几个方法真的太多地方调用了。
经过漫长的查找。

终于在调用first方法的某个地方发现了端倪!!!
看这个是什么?
在这里插入图片描述
迭代器! 惊不惊喜、意不意外。

什么时候会调用迭代器? 我上面代码没有显式的调用迭代器来遍历元素。但是,但是,但是 System.out.println(queue);我打印了队列,会调用队列的toString。
实际上会调用AbstractCollection的toString方法:

public String toString() {
        Iterator<E> it = iterator();
        if (! it.hasNext())
            return "[]";

        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (;;) {
            E e = it.next();
            sb.append(e == this ? "(this Collection)" : e);
            if (! it.hasNext())
                return sb.append(']').toString();
            sb.append(',').append(' ');
        }
    }

里面创建迭代器的方法调用的是 ConcurrentLinkedQueue的迭代器构造方法;

Iterator<E> it = iterator();

public Iterator<E> iterator() {
        return new Itr();
    }
Itr() {
    advance(); // 初始化时调用 advance 方法以定位第一个有效元素
}

private E advance() {
    lastRet = nextNode; // 记录当前节点
    E x = nextItem; // 记录当前元素

    Node<E> pred, p;
    if (nextNode == null) { // 如果当前节点为空,从头开始遍历
        p = first();
        pred = null;
    } else { // 否则从当前节点的下一个节点开始遍历
        pred = nextNode;
        p = succ(nextNode);
    }

    for (;;) {
        if (p == null) { // 如果节点为空,表示遍历结束
            nextNode = null;
            nextItem = null;
            return x;
        }
        E item = p.item; // 获取节点的元素
        if (item != null) { // 如果元素不为空,更新 nextNode 和 nextItem 并返回 x
            nextNode = p;
            nextItem = item;
            return x;
        } else { // 否则跳过空元素
            Node<E> next = succ(p); // 获取下一个节点
            if (pred != null && next != null)
                pred.casNext(p, next); // 使用 CAS 操作将前驱节点的 next 指针指向下一个节点
            p = next; // 更新当前节点
        }
    }
}

创建ConcurrentLinkedQueue的迭代器,会调用advance方法,这个方法里面会定位第一个有效元素。
p = first();

Node<E> first() {
    restartFromHead: // 定义一个标签,用于从头重新开始遍历
    for (;;) { // 无限循环,直到找到第一个有效节点或队列为空
        for (Node<E> h = head, p = h, q;;) { // 初始化 h 和 p 为队列的头节点
            boolean hasItem = (p.item != null); // 检查当前节点 p 是否有元素
            if (hasItem || (q = p.next) == null) { // 如果 p 有元素或者 p 是最后一个节点
                updateHead(h, p); // 更新队列的头节点
                return hasItem ? p : null; // 如果 p 有元素,返回 p,否则返回 null
            } else if (p == q) // 如果 p 自引用(即 p == p.next),重新从头遍历
                continue restartFromHead;
            else // 否则,移动到下一个节点
                p = q;
        }
    }
}

最终是迭代器调用了first方法,导致head和tail被更新出现下面的这种情况:
在这里插入图片描述

找到问题

终于找到问题了
原来是打印队列,导致队列创建迭代器,调用了first方法,然后把head指针设置到了新加的A节点,tail指针还是不变并且指向了自身。

这个时候把打印队列那两行代码注释掉:
再运行一下:

import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TestA {
    public static void main(String[] args) throws Exception {
        // 创建一个新的 ConcurrentLinkedQueue
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        Class<?> queueClass = queue.getClass();
        System.out.println("================刚初始化================");
//        System.out.println(queue);
        Field head1 = queueClass.getDeclaredField("head");
        head1.setAccessible(true);
        Field tail1 = queueClass.getDeclaredField("tail");
        tail1.setAccessible(true);
        System.out.println("Head node: " + head1.get(queue).hashCode());
        System.out.println("Tail node: " + tail1.get(queue).hashCode());

        // 将元素 "A" 插入到队列中
        queue.offer("A");

        System.out.println("================插入A后================");
        // 打印队列
//        System.out.println(queue);

        // 获取队列的 Class 对象
        Class<?> aClass = queue.getClass();

        // 获取 head 和 tail 字段
        Field headField = aClass.getDeclaredField("head");
        Field tailField = aClass.getDeclaredField("tail");
        headField.setAccessible(true);
        tailField.setAccessible(true);

        // 获取 head 和 tail 的值
        Object head = headField.get(queue);
        Object tail = tailField.get(queue);

        // 打印 head 和 tail 的值
        System.out.println("Head node: " + head.hashCode());
        System.out.println("Tail node: " + tail.hashCode());

        // 获取 head 和 tail 节点的 item 和 next 字段的值
        Field itemField = head.getClass().getDeclaredField("item");
        Field nextField = head.getClass().getDeclaredField("next");
        itemField.setAccessible(true);
        nextField.setAccessible(true);

        System.out.println("Head item: " + itemField.get(head));
        System.out.print("Head next: ");
        System.out.println(nextField.get(head) == null ? 0 : +nextField.get(head).hashCode());
        System.out.println("Tail item: " + itemField.get(tail));
        System.out.println("Tail next: " + nextField.get(tail).hashCode());
    }
}

运行结果:

================刚初始化================
Head node: 1254526270
Tail node: 1254526270
================插入A后================
Head node: 1254526270
Tail node: 1254526270
Head item: null
Head next: 662441761
Tail item: null
Tail next: 662441761

这就符合我们从源码观察到的结论了:
在这里插入图片描述

奇怪的事情又发生了

上面问题我们已经解决了,我高兴的认为只要我不打印队列,反射获取的队列内部head和tail的状态都是对的了,的确不打印队列得到了正确的结果。

基于此只要我不打印队列,我使用IDEA断点一定也和上面的结论一致(太天真了~)。

结果奇怪的事情又发生了。我断点的时候发现,又出现自引用节点了。head指针设置到了新加的A节点,tail指针还是不变并且指向了自身。

整个人瞬间又不好了。

不过还好有上面的排查过程,导致去思考这个IDEA的断点问题没走那么多弯路。
于是去查资料,IDEA断点会调用 toString吗? 结果还真是。

IDEA断点设置的坑

IDEA的断点默认设置:
在这里插入图片描述

把IDEA的断点设置 Enable ‘toString()’ object view: 选项给取消勾选。
再次断点 queue.offer("A");就正常了。

建议把上面标注的三个都取消勾选,这样debug能快些,对于集合的内部状态也能在断点的时候看到。
最后断点的效果如下:
在这里插入图片描述
大概花了大半天时间,趟过了这个坑。(详细记录下,以供读者参考避坑)

6、ConcurrentLinkedQueue的poll方法

poll方法 用于从队列头部移除并返回一个元素。
如果队列中存在至少一个元素,poll()方法会返回该元素并在队列中将其移除。
如果队列为空则返回null。

元素在出队时的四种情况

  • ①、当p为真正头节点时(即当前节点的元素item不为空),CAS将头节点数据设置为空,然后判断head是否为真正头节点,不是则更新头节点,然后将原来的头节点next指向它自己构建自引用节点,下面有动画演示;
  • ②、当p的后继节点为空时,说明队列遍历完毕,队列为空,尝试CAS将头节点修改成p;
  • ③、如果p的后继节点是它自己,说明有其他线程移除或者遍历了队列 构建成了自引用节点,跳出本次外部循环,重头开始遍历;
  • ④、其他情况继续向后遍历;
public E poll() {
    restartFromHead: // 定义一个标签,用于从头重新开始遍历
    for (;;) { // 无限循环,直到成功移除一个元素或确认队列为空
        for (Node<E> h = head, p = h, q;;) { // 初始化 h 和 p 为队列的头节点  初始化q表示当前节点的下一个节点,此时并没有赋值
            E item = p.item; // 获取当前节点 p 的元素

            if (item != null && p.casItem(item, null)) { 
                // 如果当前节点 p 有元素且成功使用 CAS 操作将其设为 null
                // 成功的 CAS 操作是元素从队列中移除的线性化点
                if (p != h) // 如果 p 不是头节点,则尝试跳过两个节点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item; // 返回移除的元素
            } else if ((q = p.next) == null) {  // 赋值q为下一个节点
                // 如果下一个节点 q 为 null,表示队列遍历完毕
                updateHead(h, p); // 更新头节点为 p
                return null; // 队列为空,返回 null
            } else if (p == q) {
                // 如果 p 自引用(即 p == p.next),重新从头遍历
                continue restartFromHead;
            } else {
                // 否则,移动到下一个节点
                p = q;
            }
        }
    }
}

总结下步骤:

  • ①、标签 restartFromHead: 用于在检测到某个节点自引用(即 p == p.next )的情况下,重新从队列头开始遍历。

  • ②、无限循环 for ( ; ; ) : 持续尝试直到成功移除一个元素或确认队列为空。

  • ③、内部循环 for (Node<E> h = head, p = h, q;;) :
    初始化 hp 为队列的头节点(head)。
    q 用于存储 p 的下一个节点。

  • ④、获取当前节点元素 E item = p.item:
    获取当前节点 p 的元素。

  • ⑤、检查并移除元素 if (item != null && p.casItem(item, null)):
    如果当前节点 p 有元素且成功使用 CAS 操作将其设为 null,则表示移除元素成功。
    成功的 CAS 操作是元素从队列中移除的线性化点。

  • ⑥、更新头节点 if (p != h) updateHead(h, ((q = p.next) != null) ? q : p):
    如果p不是头节点,则尝试跳过两个节点更新头节点。
    这是为了优化头节点,使未来的遍历更快。

  • ⑦、返回移除的元素 return item:
    成功移除元素后,返回该元素。

  • ⑧、处理队列为空 else if ((q = p.next) == null):
    如果 p 是最后一个节点(p.next 为 null),则更新头节点为p并返回 null,表示队列为空。

  • ⑨、处理自引用节点 else if (p == q):
    如果 p 自引用(即 p == p.next),则进入 continue restartFromHead 重新从头节点开始遍历。

  • ⑩、移动到下一个节点 else p = q:
    如果当前节点p没有元素且 p.next 不为 null,则移动到下一个节点 p = q,继续内部循环。

poll方法中的变量

// h 表示队列临时的头节点,初始时指向 head。在多线程情况下,head 可能随时变化。
// p 表示队列真正的头节点,初始时与 h 相同,当p节点为真正头节点时才允许删除节点。
Node<E> h = head, p = h, q;

// item  表示 p 节点存储的元素。
E item = p.item;

// q 表示 p 节点的下一个节点。
q = p.next

动画演示下单线程poll出队过程:

public static void main(String[] args) throws Exception {
        // 创建一个新的 ConcurrentLinkedQueue
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("A");
        queue.offer("B");
        queue.offer("C");
        queue.offer("D");
        queue.poll();
        queue.poll();
        queue.poll();
        queue.poll();
    }

A、B、C、D添加后的状态:
把IDEA的 Enable ‘toString()’ object view: 设置去掉之后,断点也是一致的了,再把IDEA的 Enable alternative view for collections classes设置去掉之后,断点内集合的内容也能直接看到head和tail的状态了。
在这里插入图片描述

在这里插入图片描述
// todo poll过程动画 做的有点瑕疵 ,后续有空 会优化一下,暂时不影响观看

下面是动画演示:
在这里插入图片描述
再继续poll:
在这里插入图片描述
再把最后一个D也给poll了:
在这里插入图片描述

7、分析下t != (t = tail)

截取部分源码如下:

for (Node<E> t = tail, p = t;;) {
 p = (t != (t = tail)) ? t : head;
}

分析下 t != (t = tail))这个复合表达式:
可以分为两步:

  • ①、比较操作 t !=
  • ②、赋值操作并返回赋的值 t=tail

这个复合操作的顺序按照代码顺序来看:

  • ①、先读取 != 左边的t值
  • ②、再执行 != 右边的 (t=tail) 把最新的tail 赋值给 t 并返回tail的最新值作为 != 右边需要被比较的值
    (注意虽然这一步 把最新的tail赋值给了t,但是执行比较时候 != 左边的值 按照代码顺序 已经读取了旧的tail值)
  • ③、比较 t_旧 != t_新

在单线程条件下: 由于t=tail ,tail没有被其他线程修改过所以 最终 t_旧 != t_新 是false。

在多线程条件下: 由于t=tail , tail如果被其他线程修改过,且修改为不同的值, 最终 t_旧 != t_新 就是true。

下面代码演示了多线程情况下 t_旧 != t_新 结果是true的情况:

import java.util.concurrent.CountDownLatch;

public class TestA {

    // 定义一个共享变量a 使用volatile修饰保证其可见性
    static volatile Integer a = 256;

    public static void main(String[] args) throws Exception {
        // 定义两个线程同步器,来控制线程的执行顺序
        CountDownLatch latchT1Start = new CountDownLatch(1);
        CountDownLatch latchT1Done = new CountDownLatch(1);

        Thread t1 = new Thread(() -> {
            try {
                latchT1Start.await(); // 等待 t2 线程读取完a的值256后,再更新a的值为512
                a = 512; // 修改 a 的值
                latchT1Done.countDown(); // 通知 t2 线程 t1 线程已修改 a 当前a的最新值是512
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            try {
                Integer b = a; // 读取 a 的初始值 256
                latchT1Start.countDown(); // 通知 t1 线程可以继续执行
                latchT1Done.await(); // 等待 t1 线程修改 a的值为512后 再继续向下执行
                // 重新读取 a 的最值 并和a的旧值比较
                if (b != (b = a)) {
                    System.out.println("其他线程修改了a,a的最新值是:" + a);
                } else {
                    System.out.println("a的值没有被修改过!");
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "t2");

        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }
}

执行结果:

其他线程修改了a,a的最新值是:512

我对此的分析也是建立在,查一些网上资料和自己对于Java的理解基础上总结而来, != 在java中是个 原子操作,但是并不意味着 != 左右两边的共享变量不会被其他线程修改,只是 != 这个原子操作不会被其他线程打断而已。 所以我分析 t != (t = tail) 就直接按照 单线程的代码执行顺序,并结合 volatile 共享变量的可见性去分析得到的这个总结性的结论。 如果有不同看法的读者,欢迎分享自己的看法。

8、多线程下的offerpoll方法

单线程下,只要把IDEA的断点选项设置好,断点走一遍还是比较好理解的。
在多线程并发的条件下,ConcurrentLinkedQueue无锁的设计,加上并发的加持,会让程序变得不太好理解。

多线程下offerpoll有四种情况

  • ①、线程t1执行offer的过程中,其他线程同时也在offer
  • ②、线程t1执行offer的过程中,其他线程同时在poll
  • ③、线程t1执行poll的过程中,其他线程同时也在poll
  • ④、线程t1执行poll的过程中,其他线程同时在offer

利用IDEA断点演示多线程下的offer

代码如下:

import java.util.concurrent.ConcurrentLinkedQueue;

public class TestA {
    public static void main(String[] args) throws Exception {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

        Thread t1 = new Thread(() -> {
            queue.offer("A");
        }, "t1");


        Thread t2 = new Thread(() -> {
            queue.offer("B");
        }, "t2");

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("运行结束~");
    }
}

断点前设置下IDEA
断点前设置下IDEA
断点前设置下IDEA
重要的事情说三遍~
在这里插入图片描述

在offer方法的源码处打上条件断点:
在这里插入图片描述
断点条件为,鼠标右键点击断点即可设置:

Thread.currentThread().getName().equals("t1") || Thread.currentThread().getName().equals("t2") 

这样就可以控制t1、t2线程的执行顺序来达到我们想要的条件了。

下面只演示一个简单的CAS失败的情况。

t1线程添加A元素,先让t1线程走到下面这个位置。
即t1已经判断了当前队列的尾节点的下一个节点为空,走到了casNext方法准备插入A。
在这里插入图片描述

此时切换t2线程运行,让t2线程casNext执行完毕并返回true。
在这里插入图片描述
在这里插入图片描述
再继续执行t1,由于此时t1,casNext期望值是null,但是计算得出值为B!=null(因为t2线程已经插入了B),所以t1会CAS失败。
在这里插入图片描述
此时队列状态为:
在这里插入图片描述
t1线程继续往下走 ,第二次循环
此时q=p.next=B节点
p!=q
走到p = (p != t && t != (t = tail)) ? t : q;
由于 (p != t && t != (t = tail)) ,tail指针并没没修改,所以计算得到是false。
p=q, 也就是p指向了B节点
在这里插入图片描述
这个地方其实也可以再加一个线程,去添加C节点,导致tail指针改变的情况,这里就不演示了。可以自己去试下。

t1线程继续下次循环
这次q=p.next=null了 进入了casNext方法,并且预期值 null==null,成功添加A节点。
在这里插入图片描述
这里 再判断 p != t 结果true,表示这是第二次CAS成功了, 需要尝试CAS更新尾节点。
casTail(t, newNode); 这个地方也可以再搞个线程,让本次cas失败。这里也不演示了。直接让casTail成功。 返回true。

再看下此时队列的元素和head、tail
在这里插入图片描述
符合正确的预期结果:
在这里插入图片描述
上面只是演示了最简单的一种多线程offer的情况,可以看到CAS失败之后t1线程又循环了2次,成功添加了元素。并且保证了数据的一致性。没有出现数据丢失或者覆盖的情况。最重要的是整个过程没有加锁。
其他更复杂的情况,还是建议多debug瞅瞅。

9、为什么每两次入队或出队操作才更新一次head或tail

HOPS

HOPS 设计(Hop-by-hop Operations Performed Sequentially)是一种优化策略,用于减少高频次的 CAS(Compare-And-Swap)操作,从而降低处理器缓存一致性流量的开销,提高队列操作的性能。

HOPS 设计的主要思想
HOPS 的主要思想是通过跳跃节点来减少每次操作中对 head 和 tail 指针进行 CAS 更新的频率。
具体来说,就是在更新 head 或 tail 指针时,尽量一次跳过多个节点,而不是每处理一个节点就更新指针。(ConcurrentLinkedQueue中是每处理2个节点更新一下指针)。

这对应的源码如下:

if (p != t) // hop two nodes at a time
casTail(t, newNode); 
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);

hop two nodes at a time:一次跳过两个节点。

其实JUC里面的工具,包括并发集合,线程同步器等。源码中又许多奇奇怪怪的操作,我们只要抓住2个本质就好了。
①、线程安全是前提
②、在满足①的前提下,一切为性能服务。

所以每两次入队或出队操作才更新一次head或tail是为了提高并发条件下的队列入队或者出队的性能。

具体体现在以下方面:

  • ①、减少CAS操作的频率
    CAS操作比较昂贵,因为它需要进行内存屏障操作以确保内存的可见性。因此,减少CAS操作的频率可以显著提高系统的性能。

在入队操作中:if (p != t) casTail(t, newNode);
这里的逻辑是每次入队时,如果新节点的前一个节点p不等于当前的尾节点t,则执行casTail(t, newNode),将尾节点更新为新节点。这个条件实际上确保了每两次入队操作才更新一次尾节点。
在这里插入图片描述

在出队操作中:if (p != h) updateHead(h, ((q = p.next) != null) ? q : p);
这里的逻辑是每次出队时,如果当前节点p不等于头节点h,则执行updateHead(h, ((q = p.next) != null) ? q : p),将头节点更新为当前节点的下一个节点。这个条件实际上确保了每两次出队操作才更新一次头节点。
在这里插入图片描述

  • ②、提高吞吐量
    通过减少对head和tail的频繁更新,可以减少多个线程之间的竞争,从而提高整个队列操作的吞吐量。在高并发场景下,这种优化尤为重要,因为过多的CAS操作会导致大量的处理器缓存一致性流量,从而降低系统性能。

为什么不是三次操作一次更新

这就和 HashMap的负载因子为什么默认是0.75 这种问题类似。

选择每两次操作更新一次head或tail,而不是每三次或更多次,是经过权衡后的结果。
这样既能显著减少CAS操作的频率,提高性能,又能保持代码的简单性和执行效率。
在设计并发数据结构时,找到一个合理的平衡点是至关重要的,每两次操作的策略在实际应用中表现出较好的效果。
并且如果再增加操作次数,必定需要维护更多的状态和计数器来跟踪操作次数。这种复杂性可能导致代码更加难以维护和理解。

CAS操作对系统整体性能的影响

在多处理器系统中,处理器缓存一致性是一个重要的概念,用于确保每个处理器缓存中的数据与主内存中的数据保持一致。
CAS(Compare-And-Swap)操作是一种用于实现并发安全的原子操作,但它在执行时会触发缓存一致性协议,从而带来一定的性能开销。

处理器缓存一致性协议的概念:
现代多处理器系统通常具有多级缓存结构(如L1、L2、L3缓存)。
每个处理器都有自己的缓存,当一个处理器对某个缓存行进行写操作时,其他处理器缓存中相同缓存行的数据可能会变得过期。
为了解决这个问题,处理器使用缓存一致性协议(如MESI、MOESI)来确保所有缓存中的数据是一致的。

CAS操作对处理器缓存一致性的影响:
CAS操作是一种原子性操作,它会先读取某个变量的值,比较它与预期值是否相同,如果相同则更新它。这个操作需要在硬件层面确保其原子性,通常会触发缓存一致性协议。

比如以下具体的几个方面的影响:
缓存行失效:
当一个处理器执行CAS操作时,它需要锁定包含目标变量的缓存行,以确保其他处理器在操作期间不能修改该缓存行的数据。这会导致其他处理器的缓存中相同的缓存行失效。

缓存一致性通信:
为了保证一致性,处理器之间需要通信以传播缓存行的状态变化。这种通信会带来一定的延迟和额外的带宽开销。

内存屏障:
CAS操作会引入内存屏障(memory barrier),确保在操作前后的内存操作不会被重排序。这种屏障会影响处理器的指令流水线,降低执行效率。

举个简单的例子说明CAS操作带来的性能开销:
假设有个多核处理器,内核A和内核B,它们都有各自的缓存,并且它们都缓存了某个变量X的值:

内核A执行CAS操作,尝试更新变量X。
内核A需要确保在操作期间变量X的缓存行是独占的。
内核A通过缓存一致性协议通知内核B,使内核B缓存中的X失效。
内核A完成CAS操作,更新变量X。
内核B下一次访问X时,发现其缓存中的X已失效,需要从主内存中重新读取X。
这种缓存行失效和一致性通信会带来额外的性能开销,特别是在高并发环境下,处理器的多个内核频繁进行CAS操作时,这种开销会显著增加,导致系统整体性能下降。

所以ConcurrentLinkedQueue中的优化,通过每两次操作才更新一次head或tail, 可以显著减少CAS操作的频率,从而:
减少缓存一致性流量:减少缓存行失效和内存屏障带来的开销。
提高性能:提高处理器缓存的利用率和整体系统性能。

10、初始化节点,自引用节点的用处

初始化的head和tail节点
初始状态
在创建ConcurrentLinkedQueue实例时,会初始化一个空节点作为head和tail:

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

这个空节点(dummy node)的item为null,并且头指针和尾指针都指向它。

作用

  • ①、简化逻辑:
    初始化一个空节点可以简化入队和出队操作的逻辑。无论队列是否为空,头节点和尾节点始终存在,这避免了在操作时检查队列是否为空的特殊情况。
  • ②、确保一致性:
    通过始终保持一个有效的头节点和尾节点,可以确保在多线程环境中进行并发操作时,头指针和尾指针的引用始终有效,从而保证操作的线程安全性。

自引用节点:
在ConcurrentLinkedQueue的操作中,有时会出现自引用节点(即一个节点的next指向它自己)。
这是通过设置节点的next指针为自身来实现的,对应源码:

if (h != p && casHead(h, p))
            h.lazySetNext(h);

作用

  • ①、标记删除:
    自引用节点通常用作逻辑删除标记。这种标记方式使得其他线程可以识别并跳过这些节点,避免误用。
  • ②、垃圾回收:
    通过有效管理队列中的节点状态和引用关系,自引用节点间接地减少了内存泄漏,并帮助垃圾回收器识别和回收无效节点。

11、总结

ConcurrentLinkedQueue 是 Java 提供的一个无界、非阻塞、线程安全、高性能的队列,基于Michael & Scott 队列算法(一种无锁算法)实现。
它适用于高并发环境下的多线程访问,广泛应用于需要高效、线程安全队列的场景。

ConcurrentLinkedQueue 的设计目标:

  • ①、线程安全
  • ②、保证线程安全的前提下提高性能

主要有以下几个特点:
初始化的 head 和 tail 即 空节点(dummy node)
自引用节点
CAS操作
头尾指针的延迟更新

标签:动画,head,ConcurrentLinkedQueue,next,tail,null,节点,图文
From: https://blog.csdn.net/qq_37883866/article/details/139739727

相关文章

  • 图文并茂! TCP的“三次握手”是如何进行的?
    对于TCP连接,相信大家都不陌生,这种连接确保了通信双方之间的可靠性和一致性。无论是在学习计算机网络,还是在面试过程中,TCP都是一个比较常考的知识点,然而这个知识点却很容易被遗忘。为了帮助大家更容易理解和记住该知识点,本文通过图文并茂的形式向大家解释了TCP三次握手的过程。......
  • 硕思闪客精灵软件怎么下载安装? 【详细安装图文教程】
    数据表明闪客精灵专业版的优势:支持将不带脚本的Flex生成的SWF导出为Flex文件。我们都明白闪客精灵专业版的优势:优化了批量导出的控制面板。由此可知闪客精灵专业版的优势:支持Server200864-bit操作系统。从大部分从业者反应来看闪客精灵专业版的优势:支持一帧一帧地移动动画片......
  • 最新最详细的media encoder安装包下载 附图文教程
    在我们学习英语的时候,有个ME的单词,代表“我”;同样,当我们学习视频编码的时候,ME代表了一款软件。AdobeMediaEncoder简称ME,是一款专业的视频编码软件。它主要用于将各种不同格式的视频、音频和图形文件转换成适合播放、流媒体传输或导出到其他项目中的格式。这个工具在音视频制......
  • centos7系统上安装MySQL8.4图文教程
    本章教程,主要记录如何在CentOS7系统上安装MySQL8.4的详细步骤。一、查看当前系统版本cat/etc/centos-release二、安装步骤1、创建mysql目录cd/usr/local&&mkdirmysql&&cdmysql2、安装rpm包yuminstallhttps://repo.mysql.com//mysql84-community-relea......
  • Android 14.0 Launcher3仿ios长按app图标实现拖拽抖动动画
    1.概述在14.0的系统rom定制化开发中,在对系统原生Launcher3的定制需求中,也有好多功能定制的,在ios等电子产品中的一些好用的功能,也是可以被拿来借用的,所以在最近的产品开发需求中,需求要求模仿ios的功能实现长按app图标实现抖动动画,接下来看如何分析该功能的实现.效果图如图:......
  • Vendors and Customers(酒吧餐厅厨师人物动画动作)
    此包包含商店、酒吧和餐馆中顾客和工作人员的各种动画。包括:饮食动画。站立、倾斜和坐着(酒吧凳子和椅子),以及各种姿势的进入和退出动画,坐姿变化(腿抬起、弯腰、交叉腿、向后倾斜)和害怕反应动画(举手、躲藏、畏缩)。厨师烹饪动画(煎锅、炒锅、平底锅、锅)、食物准备(切割、揉捏)和清......
  • Sword and Shield Animations(劈砍防御剑盾带动画动作)
    这是一个动画资产包,为剑和盾牌用户提供手工制作的成对动画和空闲。包括8向步行和跑步动画、攻击、跳跃、冲刺、向下状态移动和过渡、躲避、阻挡、蹒跚、各种配对终结者动画等。一切你需要把剑和盾牌战士带到生活中。动画总数:115攻击19区块5关闭状态14Evade5怠速9跳跃......
  • 【图文】BP神经网络与深度学习CNN的关系
    本文来自《老饼讲解-BP神经网络》https://www.bbbdata.com/目录一、BP神经网络网络是什么二、BP神经网络用于图象识别问题1.1.BP神经网络解决图象识别问题1.2.BP神经网络解决图象识别问题的困难三、从BP到CNN深度学习模型BP神经网络是一个经典、有效的算法,即使时至今日,在传统......
  • 【儿歌点点动画故事】60集故事,开启宝贝的想象力之门
    亲爱的家长们,您是否在寻找既能娱乐又能教育的动画故事,来陪伴您的孩子成长?儿歌点点动画故事,精心挑选了一系列寓教于乐的动画短片,让孩子们在欢笑中学习,在故事中成长。《猴子捞月亮》一个充满想象力的故事,讲述了小猴子们如何用智慧和团结合作去捞取天空中的月亮。这个故事教会孩子......
  • 【图文】ROG魔霸3硬件更新教程:清灰换硅脂,液金,加装硬盘ssd
    新三年,旧三年,缝缝补补又三年~~准备1.魔霸3一台2.防静电:手套,静电手环,类似的都可以3.工具:螺丝刀,酒精棉片(大量,仅清灰不必),小刷子(化妆的那种很合适)4.液金,绝缘硅脂,硅脂5.ssd,ssd排线清灰1.关机,拆掉后盖螺丝。左上角和右上角螺丝没法彻底拆下,对称拧松即可。最边上的螺......