在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两 种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。
ConcurrentLinkedQueue是一个非阻塞的基于链表节点的无界线程安全队列,它遵循先进先出的原则,并采用了“wait-free”算法(即CAS算法)来实现。
类关系图
核心属性
// 头结点指针(不一定指向头,缓更新,非阻塞的,所以没法保证)
private volatile Node<E> head;
// 尾节点指针(不一定指向尾,缓更新,非阻塞的,所以没法保证)
private volatile Node<E> tail;
// 链表节点
private static class Node<E> {
// 节点元素(存数据)
volatile E item;
// 下一个节点
volatile Node<E> next;
...
}
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和 指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一 张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。
注意:
- 核心属性都是用volatile来修饰来保证数据的可见性。
- 默认情况下头结点和尾节点是相等的对应节点的
item
为null
- 头和尾节点都是缓更新,所以他们只能代表一个逻辑上的位置,不是实际的头尾节点
offer() 入队方法
public boolean offer(E e) {
// 检查元素是否为NULL,这里不允许存NULL值
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 自旋,t是tail节点的引用,p表示尾节点,默认情况下p=t的
for (Node<E> t = tail, p = t; ; ) {
// q是p的下一个节点
Node<E> q = p.next;
// q==null表示p是尾节点,否则需要更新p进入下一次循环
if (q == null) {
// 使用CAS将新节点设置为p的next节点
if (p.casNext(null, newNode)) {
// 不是每次添加新节点都会更新尾节点指针,每新增两个节点才会更新尾节点指针。
// 这样做的好处是减少了更新尾节点的竞争,但是增加了寻找真正尾节点的代价。
if (p != t) // hop two nodes at a time
// 即使设置失败也没有问题,说明其他线程已经更新了尾节点的值
casTail(t, newNode);
// 永远返回true
return true;
}
}
// 定位尾节点
// 删除节点的时候,会把删除的节点指向自己(自引用),就会发生p==q. 当添加第一个节点后tail也会发生自引用
else if (p == q)
// t != tail表示尾节点指针已经被移动过,这时p直接指向新的尾节点指针tail,否则p直接指向头结点指针,
// 因为这时p已经被移出了链表,所以不能将p指向p.next
p = (t != (t = tail)) ? t : head;
else
// t != tail表示尾节点指针已经被移动过,所以p直接指向新的尾节点指针(tail),否则p指向p.next
p = (p != t && t != (t = tail)) ? t : q;
}
}
入队过程:
- 检查元素是否为NULL,如果是NULL直接抛出异常
- 初始化新节点
newNode
- 找到尾节点指针```t``
- 根据尾节点指针
t
找到真正的尾节点p
- CAS将
p
的next
指向newNode
- 判断是否更新尾节点指针的位置
注意:
- ConcurrentLinkedQueue 是不允许存NULL值的,NULL有特殊含义,表示已经从链表中移除(出队了)
- 入队过程大致分为三步,第一步找到真正的尾节点
p
;第二步CAS更新p.next
;第三步CAS更新尾节点指针的位置casTail(t, newNode);
-
t != tail
表示尾节点指针已经被移动过-
p == q
表示p
节点已经从链表中移除,并发生了自引用- 每新增两个节点才会更新尾节点指针, 这样能减少了更新尾节点的竞争,但是增加了寻找真正尾节点的代价。从本质上来 看它通过增加对volatile变量的读操作来减少对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。
入队快照图
入队debug图
poll()出队方法
public E poll() {
// 外循环标记
restartFromHead:
for (; ; ) {
// h是头结点指针,p头结点,q是p的下一个节点
for (Node<E> h = head, p = h, q; ; ) {
// 保存头节点的值
E item = p.item;
// CAS替换头结点的item值为null,如果item为null表示该节点已经从链表中移除了
if (item != null && p.casItem(item, null)) {
// 每移除两个元素,头节点指针才开始移动
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 当前队列为NULL
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 发生了自引用,重头再来
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
出队过程:
- 根据头结点指针找到头结点
- 暂存头结点的值
- 判断节点内容
item
是否为NULL
,如果为NULL
表示该节点已经出队了 - 如果
item
不为NULL
则使用CAS替换为NULL
,如果成功判断是否需要更新头结点指针 - 如果失败表示该节点已经被其他线程出队了,寻找新的头结点,继续第二步
注意:
- 每删除两个节点才会更新头点指针, 这样能减少了更新头点的竞争。
- 当节点的
item==null
时表示节点已经出队了
测试代码
public class ConcurrentLinkedQueueTset {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> linkedQueue = new ConcurrentLinkedQueue<>();
linkedQueue.offer(1);
linkedQueue.offer(2);
linkedQueue.poll();
linkedQueue.offer(3);
linkedQueue.offer(4);
linkedQueue.poll();
linkedQueue.poll();
}
}
参考
《java并发编程的艺术》
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
layering-cache
为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下