目录
- ConcurrentLinkedQueue详解
ConcurrentLinkedQueue详解
1、ConcurrentLinkedQueue简介
ConcurrentLinkedQueue 是 Java 中 java.util.concurrent 包下的一个非阻塞线程安全队列实现。
为什么要详细讲这个队列呢? 主要还是因为这个队列的高并发,无锁等特性。
同时这个队列总结完之后,就开始进入Java并发相关的内容总结了。
ConcurrentLinkedQueue 的一些关键特性:
-
①、 非阻塞算法
ConcurrentLinkedQueue 使用的是 Michael & Scott 算法(Michael & Scott 算法的提出者是 Maged M. Michael 和 Michael L. Scott)。Michael & Scott 是无锁算法,允许多个线程并发访问队列,而无需显式的锁(通过使用 CAS 等原子操作保证线程安全),从而减少了锁竞争和上下文切换的开销。 -
②、 线程安全
该队列是线程安全的,允许多个线程同时进行插入和删除操作。所有的操作(如插入、删除和遍历)都是原子操作,不会出现线程间的竞态条件。 -
③、高性能
由于采用了非阻塞算法,ConcurrentLinkedQueue 在高并发环境下性能优越,特别适用于需要高吞吐量的场景。
2、ConcurrentLinkedQueue继承体系
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable
上面简介已经说了ConcurrentLinkedQueue 是个线程安全的队列,从继承结构也可以看出其实现了Queue接口,具有队列的功能。
3、ConcurrentLinkedQueue的构造函数
空参构造
public ConcurrentLinkedQueue() {
// 初始化时,head 和 tail 都指向一个新的空节点
head = tail = new Node<E>(null);
}
带一个集合参数的构造
public ConcurrentLinkedQueue(Collection<? extends E> c) {
// h 和 t 分别表示链表的头节点和尾节点
Node<E> h = null, t = null;
// 遍历集合 c,将每个元素插入到队列中
for (E e : c) {
// 检查元素是否为 null,如果是则抛出 NullPointerException
checkNotNull(e);
// 创建一个包含元素 e 的新节点
Node<E> newNode = new Node<E>(e);
// 如果 h 为空,表示这是第一个元素,初始化 h 和 t
if (h == null)
h = t = newNode;
else {
// 否则,将新节点添加到链表的尾部
t.lazySetNext(newNode);
t = newNode;
}
}
// 如果集合 c 为空,初始化 h 和 t 为一个新的空节点
if (h == null)
h = t = new Node<E>(null);
// 将 head 和 tail 设置为链表的头和尾
head = h;
tail = t;
}
4、ConcurrentLinkedQueue的数据结构
ConcurrentLinkedQueue类的属性注释
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
// 使用 transient 和 volatile 修饰符来声明 head 和 tail,这两个字段不会被序列化,并且保证可见性
// head 表示队列的头节点,指向队列的第一个有效元素
private transient volatile Node<E> head;
// tail 表示队列的尾节点,指向队列的最后一个有效元素
private transient volatile Node<E> tail;
}
可以看到ConcurrentLinkedQueue类的属性并不多,我们主要关注一个头结点head,一个尾结点tail。
ConcurrentLinkedQueue真正存储元素的类Node<E>
可以看到 这里的Node<E>
节点只有一个next指针指向下一个元素,说明ConcurrentLinkedQueue是一个单向链表的结构。
并且Node类中提供了一些CAS操作来更新节点的值。
// Node 类是一个私有的静态内部类,用于存储 ConcurrentLinkedQueue 中的元素
private static class Node<E> {
// 存储节点中的元素,使用 volatile 修饰以确保可见性
volatile E item;
// 指向下一个节点,使用 volatile 修饰以确保可见性
volatile Node<E> next;
/**
* 构造一个新节点。使用 relaxed write,因为只有在通过 casNext 发布后,item 才能被看到。
* @param item 节点存储的元素
*/
Node(E item) {
// 使用 Unsafe 类直接操作内存,将 item 放入节点中
UNSAFE.putObject(this, itemOffset, item);
}
// 使用 CAS 操作更新节点中的 item 字段
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 延迟设置 next 字段的值,使用有序写入
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// 使用 CAS 操作更新节点中的 next 字段
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe 相关的字段和静态代码块
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
// 获取 Unsafe 实例
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
// 获取 item 字段的偏移量
itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
// 获取 next 字段的偏移量
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
ConcurrentLinkedQueue数据结构图示
是不是有点印象,和LinkedBlockingQueue的数据结构是一样的。
自引用节点图示:
有些资料把这种节点叫哨兵节点。我还是习惯形似叫法,就叫它自引用节点吧。
自己引用自己的节点叫自引用节点没毛病吧,生动形象。
5、ConcurrentLinkedQueue的offer
方法
offer
方法用于将指定的元素插入到 ConcurrentLinkedQueue 的尾部。
如果插入成功,返回 true。
元素在入队时的三种情况
- ①、当p的后继节点为空时(此时p为真正的尾节点),尝试CAS增加新节点,成功后尝试更新尾节点tail;
- ②、当p等于p的后继节点时(p的next指向自己,说明p是自引用节点(移除元素或者遍历元素时可能构造出自引用节点)
此时判断尾节点是否被修改过,如果尾节点被修改过就定位到最新的尾节点,如果未被修改过(使用next无法继续遍历,因为自己指向了自己,next还是自己),只能定位到头节点,从头开始遍历; - ③、其他情况时,说明此时的p并不是真正的尾节点,需要定位到真正尾节点;此时如果p不是原来的尾节点并且尾节点被修改过,那就定位到尾节点,否则定位到p的后继节点q,继续遍历;
public boolean offer(E e) {
// 检查元素是否为 null,如果是,则抛出 NullPointerException
checkNotNull(e);
// 创建一个包含元素 e 的新节点
final Node<E> newNode = new Node<E>(e);
// 使用 for 循环尝试将新节点插入到队列的尾部
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 如果 p.next 为空,表示 p 是当前的最后一个节点
if (q == null) {
// 尝试使用 CAS 操作将 newNode 插入到 p 之后
if (p.casNext(null, newNode)) {
// 成功的 CAS 操作是将 e 插入队列的尾部
// newNode 现在成为队列中的实际节点
// 如果 p 不是 tail,尝试更新 tail 指针
if (p != t)
casTail(t, newNode); // 失败也没关系
return true;
}
// CAS 操作失败,表示其他线程插入了新节点,重新读取 next
} else if (p == q) {
// 如果 p == p.next,表示我们已经脱离了链表
// 如果 tail 没有改变,说明 tail 也脱离了链表,需要从 head 重新开始
// 否则,新的 tail 是一个更好的选择
p = (t != (t = tail)) ? t : head;
} else {
// 检查 tail 指针是否更新,如果没有更新,则继续向前移动
p = (p != t && t != (t = tail)) ? t : q;
}
}
}
总结下:
-
①、检查输入:
使用 checkNotNull(e) 检查元素是否为 null。如果元素为 null,则抛出 NullPointerException。 -
②、创建新节点:
使用 new Node(e) 创建一个包含元素 e 的新节点 newNode。 -
③、循环插入节点:
使用一个 for 循环不断尝试将 newNode 插入到队列的尾部。
初始化 t 和 p 为当前的尾节点 tail。
在循环中,通过 p.next 来检查当前节点是否是最后一个节点。
如果 p.next 为空,则尝试使用 CAS 操作将 newNode 插入到 p 之后。
如果 CAS 操作成功,将 newNode 插入队列,并尝试更新 tail 指针(如果 p 不是 tail)。
如果 CAS 操作失败,表示有其他线程已经插入了新节点,重新读取 next 进行下一次尝试。
如果 p == p.next,表示已经脱离了链表,需要重新设置 p 为 tail 或 head。
如果 p.next 不为空且不等于 p,则向前移动 p,并在两步之后检查 tail 指针是否更新。
下面对变量再总结一下,方便下面进行多线程的分析:
offer
方法中的变量
// t 表示队列临时的尾节点,初始时指向 tail。在多线程情况下,tail 可能随时变化。
// p 表示队列真正的尾节点,初始时与 t 相同,当p节点为真正尾节点时才允许添加新节点。
Node<E> t = tail, p = t;
// q 表示 p 节点的下一个节点。
Node<E> q = p.next;
// 下面这段代码很重要
// 这段代码检查 tail 是否发生了变化。如果 tail 发生了变化,p 会重新指向新的 tail;否则,p 会指向 head。
p = (t != (t = tail)) ? t : head;
动画演示单线程环境下offer
的入队过程
代码示例:
import java.util.concurrent.*;
public class TestA {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("A");
queue.offer("B");
queue.offer("C");
queue.offer("D");
System.out.println(queue);
}
}
动画演示:
从源码的步骤中我们可以得知,当第一个元素添加完成后,head和tail指针并没有移动仍然是指向默认的空Node节点,当第二个元素添加完毕的时候tail指针才会移动到新添加的Node上。
我们继续添加两个元素
queue.offer("C");
queue.offer("D");
可以看到添加C的时候没有移动tail指针,添加D的时候tail指针移动到了尾部。
注意避坑☆☆☆☆☆
ConcurrentLinkedQueue断点offer
方法或者打印队列的坑
我习惯利用反射打印集合的内部状态,来验证自己分析的是否正确。
但是这次遇到ConcurrentLinkedQueue算是彻彻底底掉进了坑里。
问题复现:
先描述下问题
当我去看offer方法的源码时,我发现offer("A")
之后的确没有改变head和tail的指针,然后head和tail指针依然是指向dummy Node,也就是初始化ConcurrentLinkedQueue时的那个初始化节点,同时把该节点的next设置成A节点。
我为了验证下刚添加完A节点的状态,利用反射,打印了head和tail指针的hashCode,以及初始化节点的item和next,和队列queue。
然后坑就来了:
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestA {
public static void main(String[] args) throws Exception {
// 创建一个新的 ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
Class<?> queueClass = queue.getClass();
System.out.println("================刚初始化================");
System.out.println(queue);
Field head1 = queueClass.getDeclaredField("head");
head1.setAccessible(true);
Field tail1 = queueClass.getDeclaredField("tail");
tail1.setAccessible(true);
System.out.println("Head node: " + head1.get(queue).hashCode());
System.out.println("Tail node: " + tail1.get(queue).hashCode());
// 将元素 "A" 插入到队列中
queue.offer("A");
System.out.println("================插入A后================");
// 打印队列
System.out.println(queue);
// 获取队列的 Class 对象
Class<?> aClass = queue.getClass();
// 获取 head 和 tail 字段
Field headField = aClass.getDeclaredField("head");
Field tailField = aClass.getDeclaredField("tail");
headField.setAccessible(true);
tailField.setAccessible(true);
// 获取 head 和 tail 的值
Object head = headField.get(queue);
Object tail = tailField.get(queue);
// 打印 head 和 tail 的值
System.out.println("Head node: " + head.hashCode());
System.out.println("Tail node: " + tail.hashCode());
// 获取 head 和 tail 节点的 item 和 next 字段的值
Field itemField = head.getClass().getDeclaredField("item");
Field nextField = head.getClass().getDeclaredField("next");
itemField.setAccessible(true);
nextField.setAccessible(true);
System.out.println("Head item: " + itemField.get(head));
System.out.print("Head next: ");
System.out.println(nextField.get(head) == null ? 0 : +nextField.get(head).hashCode());
System.out.println("Tail item: " + itemField.get(tail));
System.out.println("Tail next: " + nextField.get(tail).hashCode());
}
}
运行结果:
================刚初始化================
[]
Head node: 356573597
Tail node: 356573597
================插入A后================
[A]
Head node: 1735600054
Tail node: 356573597
Head item: A
Head next: 0
Tail item: null
Tail next: 356573597
看到上面这个结果当时就懵了!!!
这个结果就对应下图(这谁看了不迷糊):
本来按照源码预想的结果应该是这样:
================刚初始化================
Head node: 356573597
Tail node: 356573597
================插入A后================
Head node: 356573597
Tail node: 356573597
Head item: null
Head next: 1735600054
Tail item: null
Tail next: 1735600054
应该对应这样的图:
所以到底是什么原因呢?
原因探究
为了去探究具体原因,我于是在 queue.offer("A");
进行了断点,准备一步一步去看,结果当 queue.offer("A");
执行完,return true;之后发现并没有什么特殊的。
但是再看head和tail的状态,就发现不对了, head指向了A节点, tail指向初始节点,初始节点的next指向了自身。到底是什么神秘力量修改了head和tail呢?
至此我是彻底懵了,仿佛陷入了罗生门,到底是凭看源码得到的结果对,还是断点看到的结果或者反射打印得到的结果对?
回到问题的本质
接下来就是漫长的排除法,首先排除多线程造成的影响,因为我使用的一直是单个主线程,再者排除反射的影响(一开始怀疑是反射的问题),因为反射只是获取队列内部成员变量的状态,并不会造成数据结构的改变,而且即使不用反射打印head和tail,debug也会出现 tail指向初始节点,初始节点的next指向自身,head指向A节点的问题。
开始查资料,百度,谷歌,chatGPT4o, 通义千问、文心一言。。。半天已经过去了。。。 没什么进展。。。
晚上,又去查了B站,分析ConcurrentLinkedQueue的视频到是不少,但是没有找到去断点offer方法的,大都是照着源码分析,结果可想而知,按照源码分析一定是下面的结果:
然后又去知乎,找到一篇关于ConcurrentLinkedQueue的文章找到了点突破口。
然后开始思考问题的本质
这个问题本质上是出现了指向自身的节点:
而这种把初始节点的next指针指向自身的操作,在ConcurrentLinkedQueue中是存在的。
我们就把 这种自身的next指针指向自己的节点叫自引用节点吧。
至于自引用节点的作用后续再说,现在先把这个坑给过了。
既然ConcurrentLinkedQueue内部存储的元素都是Node
,那就从Node入手,看下Node中有哪些设置Node的next指针的方法:
可以看到有下面这两个方法:
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
putOrderedObject
是一个有序写操作。它确保写操作在Java内存模型中的某些情况下具有有序性,但不像volatile写操作那样强制所有的内存可见性保证。它可能比普通的volatile写操作稍微快一些。
为了找到问题 我需要找到到底哪些Node
节点对象 调用了这两个方法,并且传的next值是对象本身。
最终发现是这个方法:
/**
* 尝试将头节点更新为新的节点。如果成功,将旧的头节点设置为指向自身,作为一个自引用节点。
*
* @param h 当前的头节点
* @param p 新的头节点
*/
final void updateHead(Node<E> h, Node<E> p) {
// 检查当前头节点和新头节点是否不同,并尝试使用CAS操作将头节点从h更新为p
if (h != p && casHead(h, p)) {
// 如果CAS操作成功,将旧的头节点的next字段设置为指向自身
h.lazySetNext(h);
}
}
h.lazySetNext(h);
这不就是把自己的next指针指向自己了嘛~
答案就近在眼前了。
我只要再找到是哪里调用了updateHead
方法就行了。
发现有下面的方法调用了updateHead
:
poll()、 peek()、first()
又懵了~。。。
我只调用了offer方法 添加了一个A ,压根没调用过什么 获取元素的方法呀~
不信邪了,继续看 哪里调用了poll()、 peek()、first()
。 这几个方法真的太多地方调用了。
经过漫长的查找。
终于在调用first方法的某个地方发现了端倪!!!
看这个是什么?
迭代器! 惊不惊喜、意不意外。
什么时候会调用迭代器? 我上面代码没有显式的调用迭代器来遍历元素。但是,但是,但是 System.out.println(queue);
我打印了队列,会调用队列的toString。
实际上会调用AbstractCollection的toString方法:
public String toString() {
Iterator<E> it = iterator();
if (! it.hasNext())
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}
里面创建迭代器的方法调用的是 ConcurrentLinkedQueue的迭代器构造方法;
Iterator<E> it = iterator();
public Iterator<E> iterator() {
return new Itr();
}
Itr() {
advance(); // 初始化时调用 advance 方法以定位第一个有效元素
}
private E advance() {
lastRet = nextNode; // 记录当前节点
E x = nextItem; // 记录当前元素
Node<E> pred, p;
if (nextNode == null) { // 如果当前节点为空,从头开始遍历
p = first();
pred = null;
} else { // 否则从当前节点的下一个节点开始遍历
pred = nextNode;
p = succ(nextNode);
}
for (;;) {
if (p == null) { // 如果节点为空,表示遍历结束
nextNode = null;
nextItem = null;
return x;
}
E item = p.item; // 获取节点的元素
if (item != null) { // 如果元素不为空,更新 nextNode 和 nextItem 并返回 x
nextNode = p;
nextItem = item;
return x;
} else { // 否则跳过空元素
Node<E> next = succ(p); // 获取下一个节点
if (pred != null && next != null)
pred.casNext(p, next); // 使用 CAS 操作将前驱节点的 next 指针指向下一个节点
p = next; // 更新当前节点
}
}
}
创建ConcurrentLinkedQueue的迭代器,会调用advance
方法,这个方法里面会定位第一个有效元素。
p = first();
Node<E> first() {
restartFromHead: // 定义一个标签,用于从头重新开始遍历
for (;;) { // 无限循环,直到找到第一个有效节点或队列为空
for (Node<E> h = head, p = h, q;;) { // 初始化 h 和 p 为队列的头节点
boolean hasItem = (p.item != null); // 检查当前节点 p 是否有元素
if (hasItem || (q = p.next) == null) { // 如果 p 有元素或者 p 是最后一个节点
updateHead(h, p); // 更新队列的头节点
return hasItem ? p : null; // 如果 p 有元素,返回 p,否则返回 null
} else if (p == q) // 如果 p 自引用(即 p == p.next),重新从头遍历
continue restartFromHead;
else // 否则,移动到下一个节点
p = q;
}
}
}
最终是迭代器调用了first方法,导致head和tail被更新出现下面的这种情况:
找到问题
终于找到问题了
原来是打印队列,导致队列创建迭代器,调用了first方法,然后把head指针设置到了新加的A节点,tail指针还是不变并且指向了自身。
这个时候把打印队列那两行代码注释掉:
再运行一下:
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestA {
public static void main(String[] args) throws Exception {
// 创建一个新的 ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
Class<?> queueClass = queue.getClass();
System.out.println("================刚初始化================");
// System.out.println(queue);
Field head1 = queueClass.getDeclaredField("head");
head1.setAccessible(true);
Field tail1 = queueClass.getDeclaredField("tail");
tail1.setAccessible(true);
System.out.println("Head node: " + head1.get(queue).hashCode());
System.out.println("Tail node: " + tail1.get(queue).hashCode());
// 将元素 "A" 插入到队列中
queue.offer("A");
System.out.println("================插入A后================");
// 打印队列
// System.out.println(queue);
// 获取队列的 Class 对象
Class<?> aClass = queue.getClass();
// 获取 head 和 tail 字段
Field headField = aClass.getDeclaredField("head");
Field tailField = aClass.getDeclaredField("tail");
headField.setAccessible(true);
tailField.setAccessible(true);
// 获取 head 和 tail 的值
Object head = headField.get(queue);
Object tail = tailField.get(queue);
// 打印 head 和 tail 的值
System.out.println("Head node: " + head.hashCode());
System.out.println("Tail node: " + tail.hashCode());
// 获取 head 和 tail 节点的 item 和 next 字段的值
Field itemField = head.getClass().getDeclaredField("item");
Field nextField = head.getClass().getDeclaredField("next");
itemField.setAccessible(true);
nextField.setAccessible(true);
System.out.println("Head item: " + itemField.get(head));
System.out.print("Head next: ");
System.out.println(nextField.get(head) == null ? 0 : +nextField.get(head).hashCode());
System.out.println("Tail item: " + itemField.get(tail));
System.out.println("Tail next: " + nextField.get(tail).hashCode());
}
}
运行结果:
================刚初始化================
Head node: 1254526270
Tail node: 1254526270
================插入A后================
Head node: 1254526270
Tail node: 1254526270
Head item: null
Head next: 662441761
Tail item: null
Tail next: 662441761
这就符合我们从源码观察到的结论了:
奇怪的事情又发生了
上面问题我们已经解决了,我高兴的认为只要我不打印队列,反射获取的队列内部head和tail的状态都是对的了,的确不打印队列得到了正确的结果。
基于此只要我不打印队列,我使用IDEA断点一定也和上面的结论一致(太天真了~)。
结果奇怪的事情又发生了。我断点的时候发现,又出现自引用节点了。head指针设置到了新加的A节点,tail指针还是不变并且指向了自身。
整个人瞬间又不好了。
不过还好有上面的排查过程,导致去思考这个IDEA的断点问题没走那么多弯路。
于是去查资料,IDEA断点会调用 toString吗? 结果还真是。
IDEA断点设置的坑
IDEA的断点默认设置:
把IDEA的断点设置 Enable ‘toString()’ object view: 选项给取消勾选。
再次断点 queue.offer("A");
就正常了。
建议把上面标注的三个都取消勾选,这样debug能快些,对于集合的内部状态也能在断点的时候看到。
最后断点的效果如下:
大概花了大半天时间,趟过了这个坑。(详细记录下,以供读者参考避坑)
6、ConcurrentLinkedQueue的poll
方法
poll方法 用于从队列头部移除并返回一个元素。
如果队列中存在至少一个元素,poll()方法会返回该元素并在队列中将其移除。
如果队列为空则返回null。
元素在出队时的四种情况
- ①、当p为真正头节点时(即当前节点的元素item不为空),CAS将头节点数据设置为空,然后判断head是否为真正头节点,不是则更新头节点,然后将原来的头节点next指向它自己构建自引用节点,下面有动画演示;
- ②、当p的后继节点为空时,说明队列遍历完毕,队列为空,尝试CAS将头节点修改成p;
- ③、如果p的后继节点是它自己,说明有其他线程移除或者遍历了队列 构建成了自引用节点,跳出本次外部循环,重头开始遍历;
- ④、其他情况继续向后遍历;
public E poll() {
restartFromHead: // 定义一个标签,用于从头重新开始遍历
for (;;) { // 无限循环,直到成功移除一个元素或确认队列为空
for (Node<E> h = head, p = h, q;;) { // 初始化 h 和 p 为队列的头节点 初始化q表示当前节点的下一个节点,此时并没有赋值
E item = p.item; // 获取当前节点 p 的元素
if (item != null && p.casItem(item, null)) {
// 如果当前节点 p 有元素且成功使用 CAS 操作将其设为 null
// 成功的 CAS 操作是元素从队列中移除的线性化点
if (p != h) // 如果 p 不是头节点,则尝试跳过两个节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item; // 返回移除的元素
} else if ((q = p.next) == null) { // 赋值q为下一个节点
// 如果下一个节点 q 为 null,表示队列遍历完毕
updateHead(h, p); // 更新头节点为 p
return null; // 队列为空,返回 null
} else if (p == q) {
// 如果 p 自引用(即 p == p.next),重新从头遍历
continue restartFromHead;
} else {
// 否则,移动到下一个节点
p = q;
}
}
}
}
总结下步骤:
-
①、标签 restartFromHead: 用于在检测到某个节点自引用(即 p == p.next )的情况下,重新从队列头开始遍历。
-
②、无限循环
for ( ; ; )
: 持续尝试直到成功移除一个元素或确认队列为空。 -
③、内部循环
for (Node<E> h = head, p = h, q;;)
:
初始化h
和p
为队列的头节点(head)。
q
用于存储p
的下一个节点。 -
④、获取当前节点元素
E item = p.item
:
获取当前节点p
的元素。 -
⑤、检查并移除元素
if (item != null && p.casItem(item, null))
:
如果当前节点p
有元素且成功使用 CAS 操作将其设为 null,则表示移除元素成功。
成功的 CAS 操作是元素从队列中移除的线性化点。 -
⑥、更新头节点
if (p != h) updateHead(h, ((q = p.next) != null) ? q : p)
:
如果p
不是头节点,则尝试跳过两个节点更新头节点。
这是为了优化头节点,使未来的遍历更快。 -
⑦、返回移除的元素
return item
:
成功移除元素后,返回该元素。 -
⑧、处理队列为空
else if ((q = p.next) == null)
:
如果p
是最后一个节点(p.next 为 null)
,则更新头节点为p
并返回 null,表示队列为空。 -
⑨、处理自引用节点
else if (p == q)
:
如果 p 自引用(即 p == p.next),则进入 continue restartFromHead 重新从头节点开始遍历。 -
⑩、移动到下一个节点
else p = q
:
如果当前节点p
没有元素且p.next
不为 null,则移动到下一个节点p = q
,继续内部循环。
poll
方法中的变量
// h 表示队列临时的头节点,初始时指向 head。在多线程情况下,head 可能随时变化。
// p 表示队列真正的头节点,初始时与 h 相同,当p节点为真正头节点时才允许删除节点。
Node<E> h = head, p = h, q;
// item 表示 p 节点存储的元素。
E item = p.item;
// q 表示 p 节点的下一个节点。
q = p.next
动画演示下单线程poll
出队过程:
public static void main(String[] args) throws Exception {
// 创建一个新的 ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("A");
queue.offer("B");
queue.offer("C");
queue.offer("D");
queue.poll();
queue.poll();
queue.poll();
queue.poll();
}
A、B、C、D添加后的状态:
把IDEA的 Enable ‘toString()’ object view: 设置去掉之后,断点也是一致的了,再把IDEA的 Enable alternative view for collections classes设置去掉之后,断点内集合的内容也能直接看到head和tail的状态了。
// todo poll过程动画 做的有点瑕疵 ,后续有空 会优化一下,暂时不影响观看
下面是动画演示:
再继续poll
:
再把最后一个D也给poll
了:
7、分析下t != (t = tail)
截取部分源码如下:
for (Node<E> t = tail, p = t;;) {
p = (t != (t = tail)) ? t : head;
}
分析下 t != (t = tail))
这个复合表达式:
可以分为两步:
- ①、比较操作 t !=
- ②、赋值操作并返回赋的值 t=tail
这个复合操作的顺序按照代码顺序来看:
- ①、先读取 != 左边的t值
- ②、再执行 != 右边的 (t=tail) 把最新的tail 赋值给 t 并返回tail的最新值作为 != 右边需要被比较的值
(注意虽然这一步 把最新的tail赋值给了t,但是执行比较时候 != 左边的值 按照代码顺序 已经读取了旧的tail值) - ③、比较 t_旧 != t_新
在单线程条件下: 由于t=tail ,tail没有被其他线程修改过所以 最终 t_旧 != t_新 是false。
在多线程条件下: 由于t=tail , tail如果被其他线程修改过,且修改为不同的值, 最终 t_旧 != t_新 就是true。
下面代码演示了多线程情况下 t_旧 != t_新
结果是true的情况:
import java.util.concurrent.CountDownLatch;
public class TestA {
// 定义一个共享变量a 使用volatile修饰保证其可见性
static volatile Integer a = 256;
public static void main(String[] args) throws Exception {
// 定义两个线程同步器,来控制线程的执行顺序
CountDownLatch latchT1Start = new CountDownLatch(1);
CountDownLatch latchT1Done = new CountDownLatch(1);
Thread t1 = new Thread(() -> {
try {
latchT1Start.await(); // 等待 t2 线程读取完a的值256后,再更新a的值为512
a = 512; // 修改 a 的值
latchT1Done.countDown(); // 通知 t2 线程 t1 线程已修改 a 当前a的最新值是512
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "t1");
Thread t2 = new Thread(() -> {
try {
Integer b = a; // 读取 a 的初始值 256
latchT1Start.countDown(); // 通知 t1 线程可以继续执行
latchT1Done.await(); // 等待 t1 线程修改 a的值为512后 再继续向下执行
// 重新读取 a 的最值 并和a的旧值比较
if (b != (b = a)) {
System.out.println("其他线程修改了a,a的最新值是:" + a);
} else {
System.out.println("a的值没有被修改过!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
}
}
执行结果:
其他线程修改了a,a的最新值是:512
我对此的分析也是建立在,查一些网上资料和自己对于Java的理解基础上总结而来, != 在java中是个 原子操作,但是并不意味着 != 左右两边的共享变量不会被其他线程修改,只是 != 这个原子操作不会被其他线程打断而已。 所以我分析 t != (t = tail)
就直接按照 单线程的代码执行顺序,并结合 volatile 共享变量的可见性去分析得到的这个总结性的结论。 如果有不同看法的读者,欢迎分享自己的看法。
8、多线程下的offer
和poll
方法
单线程下,只要把IDEA的断点选项设置好,断点走一遍还是比较好理解的。
在多线程并发的条件下,ConcurrentLinkedQueue无锁的设计,加上并发的加持,会让程序变得不太好理解。
多线程下offer
和 poll
有四种情况
- ①、线程t1执行offer的过程中,其他线程同时也在
offer
- ②、线程t1执行offer的过程中,其他线程同时在
poll
- ③、线程t1执行
poll
的过程中,其他线程同时也在poll
- ④、线程t1执行
poll
的过程中,其他线程同时在offer
利用IDEA断点演示多线程下的offer
代码如下:
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestA {
public static void main(String[] args) throws Exception {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
Thread t1 = new Thread(() -> {
queue.offer("A");
}, "t1");
Thread t2 = new Thread(() -> {
queue.offer("B");
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("运行结束~");
}
}
断点前设置下IDEA
断点前设置下IDEA
断点前设置下IDEA
重要的事情说三遍~
在offer方法的源码处打上条件断点:
断点条件为,鼠标右键点击断点即可设置:
Thread.currentThread().getName().equals("t1") || Thread.currentThread().getName().equals("t2")
这样就可以控制t1、t2线程的执行顺序来达到我们想要的条件了。
下面只演示一个简单的CAS失败的情况。
t1线程添加A元素,先让t1线程走到下面这个位置。
即t1已经判断了当前队列的尾节点的下一个节点为空,走到了casNext方法准备插入A。
此时切换t2线程运行,让t2线程casNext执行完毕并返回true。
再继续执行t1,由于此时t1,casNext期望值是null,但是计算得出值为B!=null
(因为t2线程已经插入了B),所以t1会CAS失败。
此时队列状态为:
t1线程继续往下走 ,第二次循环
此时q=p.next=B节点
p!=q
走到p = (p != t && t != (t = tail)) ? t : q;
由于 (p != t && t != (t = tail))
,tail指针并没没修改,所以计算得到是false。
p=q, 也就是p指向了B节点
这个地方其实也可以再加一个线程,去添加C节点,导致tail指针改变的情况,这里就不演示了。可以自己去试下。
t1线程继续下次循环
这次q=p.next=null了 进入了casNext方法,并且预期值 null==null
,成功添加A节点。
这里 再判断 p != t
结果true,表示这是第二次CAS成功了, 需要尝试CAS更新尾节点。
casTail(t, newNode);
这个地方也可以再搞个线程,让本次cas失败。这里也不演示了。直接让casTail成功。 返回true。
再看下此时队列的元素和head、tail
符合正确的预期结果:
上面只是演示了最简单的一种多线程offer的情况,可以看到CAS失败之后t1线程又循环了2次,成功添加了元素。并且保证了数据的一致性。没有出现数据丢失或者覆盖的情况。最重要的是整个过程没有加锁。
其他更复杂的情况,还是建议多debug瞅瞅。
9、为什么每两次入队或出队操作才更新一次head或tail
HOPS
HOPS 设计(Hop-by-hop Operations Performed Sequentially)是一种优化策略,用于减少高频次的 CAS(Compare-And-Swap)操作,从而降低处理器缓存一致性流量的开销,提高队列操作的性能。
HOPS 设计的主要思想
HOPS 的主要思想是通过跳跃节点来减少每次操作中对 head 和 tail 指针进行 CAS 更新的频率。
具体来说,就是在更新 head 或 tail 指针时,尽量一次跳过多个节点,而不是每处理一个节点就更新指针。(ConcurrentLinkedQueue中是每处理2个节点更新一下指针)。
这对应的源码如下:
if (p != t) // hop two nodes at a time
casTail(t, newNode);
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
hop two nodes at a time:一次跳过两个节点。
其实JUC里面的工具,包括并发集合,线程同步器等。源码中又许多奇奇怪怪的操作,我们只要抓住2个本质就好了。
①、线程安全是前提
②、在满足①的前提下,一切为性能服务。
所以每两次入队或出队操作才更新一次head或tail
是为了提高并发条件下的队列入队或者出队的性能。
具体体现在以下方面:
- ①、减少CAS操作的频率
CAS操作比较昂贵,因为它需要进行内存屏障操作以确保内存的可见性。因此,减少CAS操作的频率可以显著提高系统的性能。
在入队操作中:if (p != t) casTail(t, newNode);
这里的逻辑是每次入队时,如果新节点的前一个节点p不等于当前的尾节点t,则执行casTail(t, newNode),将尾节点更新为新节点。这个条件实际上确保了每两次入队操作才更新一次尾节点。
在出队操作中:if (p != h) updateHead(h, ((q = p.next) != null) ? q : p);
这里的逻辑是每次出队时,如果当前节点p不等于头节点h,则执行updateHead(h, ((q = p.next) != null) ? q : p)
,将头节点更新为当前节点的下一个节点。这个条件实际上确保了每两次出队操作才更新一次头节点。
- ②、提高吞吐量
通过减少对head和tail的频繁更新,可以减少多个线程之间的竞争,从而提高整个队列操作的吞吐量。在高并发场景下,这种优化尤为重要,因为过多的CAS操作会导致大量的处理器缓存一致性流量,从而降低系统性能。
为什么不是三次操作一次更新
这就和 HashMap的负载因子为什么默认是0.75 这种问题类似。
选择每两次操作更新一次head或tail,而不是每三次或更多次,是经过权衡后的结果。
这样既能显著减少CAS操作的频率,提高性能,又能保持代码的简单性和执行效率。
在设计并发数据结构时,找到一个合理的平衡点是至关重要的,每两次操作的策略在实际应用中表现出较好的效果。
并且如果再增加操作次数,必定需要维护更多的状态和计数器来跟踪操作次数。这种复杂性可能导致代码更加难以维护和理解。
CAS操作对系统整体性能的影响
在多处理器系统中,处理器缓存一致性是一个重要的概念,用于确保每个处理器缓存中的数据与主内存中的数据保持一致。
CAS(Compare-And-Swap)操作是一种用于实现并发安全的原子操作,但它在执行时会触发缓存一致性协议,从而带来一定的性能开销。
处理器缓存一致性协议的概念:
现代多处理器系统通常具有多级缓存结构(如L1、L2、L3缓存)。
每个处理器都有自己的缓存,当一个处理器对某个缓存行进行写操作时,其他处理器缓存中相同缓存行的数据可能会变得过期。
为了解决这个问题,处理器使用缓存一致性协议(如MESI、MOESI)来确保所有缓存中的数据是一致的。
CAS操作对处理器缓存一致性的影响:
CAS操作是一种原子性操作,它会先读取某个变量的值,比较它与预期值是否相同,如果相同则更新它。这个操作需要在硬件层面确保其原子性,通常会触发缓存一致性协议。
比如以下具体的几个方面的影响:
缓存行失效:
当一个处理器执行CAS操作时,它需要锁定包含目标变量的缓存行,以确保其他处理器在操作期间不能修改该缓存行的数据。这会导致其他处理器的缓存中相同的缓存行失效。
缓存一致性通信:
为了保证一致性,处理器之间需要通信以传播缓存行的状态变化。这种通信会带来一定的延迟和额外的带宽开销。
内存屏障:
CAS操作会引入内存屏障(memory barrier),确保在操作前后的内存操作不会被重排序。这种屏障会影响处理器的指令流水线,降低执行效率。
举个简单的例子说明CAS操作带来的性能开销:
假设有个多核处理器,内核A和内核B,它们都有各自的缓存,并且它们都缓存了某个变量X的值:
内核A执行CAS操作,尝试更新变量X。
内核A需要确保在操作期间变量X的缓存行是独占的。
内核A通过缓存一致性协议通知内核B,使内核B缓存中的X失效。
内核A完成CAS操作,更新变量X。
内核B下一次访问X时,发现其缓存中的X已失效,需要从主内存中重新读取X。
这种缓存行失效和一致性通信会带来额外的性能开销,特别是在高并发环境下,处理器的多个内核频繁进行CAS操作时,这种开销会显著增加,导致系统整体性能下降。
所以ConcurrentLinkedQueue中的优化,通过每两次操作才更新一次head或tail, 可以显著减少CAS操作的频率,从而:
减少缓存一致性流量:减少缓存行失效和内存屏障带来的开销。
提高性能:提高处理器缓存的利用率和整体系统性能。
10、初始化节点,自引用节点的用处
初始化的head和tail节点
初始状态
在创建ConcurrentLinkedQueue实例时,会初始化一个空节点作为head和tail:
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
这个空节点(dummy node)的item为null,并且头指针和尾指针都指向它。
作用
- ①、简化逻辑:
初始化一个空节点可以简化入队和出队操作的逻辑。无论队列是否为空,头节点和尾节点始终存在,这避免了在操作时检查队列是否为空的特殊情况。 - ②、确保一致性:
通过始终保持一个有效的头节点和尾节点,可以确保在多线程环境中进行并发操作时,头指针和尾指针的引用始终有效,从而保证操作的线程安全性。
自引用节点:
在ConcurrentLinkedQueue的操作中,有时会出现自引用节点(即一个节点的next指向它自己)。
这是通过设置节点的next指针为自身来实现的,对应源码:
if (h != p && casHead(h, p))
h.lazySetNext(h);
作用
- ①、标记删除:
自引用节点通常用作逻辑删除标记。这种标记方式使得其他线程可以识别并跳过这些节点,避免误用。 - ②、垃圾回收:
通过有效管理队列中的节点状态和引用关系,自引用节点间接地减少了内存泄漏,并帮助垃圾回收器识别和回收无效节点。
11、总结
ConcurrentLinkedQueue 是 Java 提供的一个无界、非阻塞、线程安全、高性能的队列,基于Michael & Scott 队列算法(一种无锁算法)实现。
它适用于高并发环境下的多线程访问,广泛应用于需要高效、线程安全队列的场景。
ConcurrentLinkedQueue 的设计目标:
- ①、线程安全
- ②、保证线程安全的前提下提高性能
主要有以下几个特点:
初始化的 head 和 tail 即 空节点(dummy node)
自引用节点
CAS操作
头尾指针的延迟更新