什么是CAS?
CAS(Compare And Swap),顾名思义就是比较并交换。用于解决多线程使用锁带来的性能损耗的问题,是一种非阻塞算法,其交换原理如下图:
CAS用法:
- 数据库中的乐观锁:即表字段+version字段,然后每次更新时就比较当前version版本是否一致,一直才更新并且升级version=version+1。
- unsafe的用法:
java.util.concurrent.atomic.*
什么是AQS?
AQS(AbstractQueuedSynchronizer),顾名思义就是抽象队列同步器。由FIFO(先进先出)的阻塞队列和相关同步器组成。这是在concurrent包(并发处理)下。
AbstractQueuedSynchronizer为锁机制维护了一个队列,需要获取锁的线程们排在队列中,只有排在队首的线程才有资格获取锁。
首先看张图,取自《Java并发编程的艺术》:
然后看如下,AbstractQueuedSynchronizer源码及其分析如下:
/**
* 提供一个阻塞锁和相关依赖FIFO等待队列同步器的实现。
* 这个类支持排他和共享模式。排他模式下当一个已获取到了,其他线程尝试获取不可能成功。共享模式可以被多个线程获取。通常子类实现仅支持其中一种,但是也有两种的支持的如ReadWriteLock。
* 这个类定义了一个实现了Condition的内部类ConditionObject,用于排他模式。
* 使用一个基础的同步器需要重新定义以下方法:
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* 以上的每个方法均默认抛出{UnsupportedOperationException}错误,所以以上的几个方法没有提供默认实现,需要子类重写。
* 这个类提供了一个有效的、可伸缩的基础给同步器如状态、acquire获取和的同步器释放参数、内部FIFO等待队。当这些不够用时,可使用atomic、Queue、LockSupport。
*/
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
/**
* 等待队列的node class。Node作为等待队列的节点
* 这个等待队列是CLH的变体,CLH一般用于自旋锁。使用其代替一般的同步器,但也用了相同的策略来控制。
*/
static final class Node {
/** 共享模式 */
static final Node SHARED = new Node();
/** 排他模式 */
static final Node EXCLUSIVE = null;
/** 当前线程被取消 */
static final int CANCELLED = 1;
/** 当前节点的后继节点包含的线程需要运行*/
static final int SIGNAL = -1;
/** 当前结点在condition队列中 */
static final int CONDITION = -2;
当前场景下后续的acquireShared能够得以执行 */
static final int PROPAGATE = -3;
/** 当前节点的状态。*/
volatile int waitStatus;
/** 前驱结点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 入队线程 */
volatile Thread thread;
/** 存储condition队列中的后继节点 */
Node nextWaiter;
final boolean isShared() { return nextWaiter == SHARED;}
/**
* 返回前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
..............................
}
/**
* 仅用于初始化等待队列的head。只能通过setHead修改,当这个head还存在时不能将waitStatus=>cancelled
*/
private transient volatile Node head;
/**Tail节点初始化,仅能通过enq追加新的wait node*/
private transient volatile Node tail;
synchronization state. */
private volatile int state;
/** CAS原子性的修改 synchronization state ,拉到代码最下面可见其值的设置*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
static final long spinForTimeoutThreshold = 1000L;
/**
* 为队列追加node节点
*/
private Node enq(final Node node) {
for (;;) {//一直循环入队,直到成功
Node t = tail;
if (t == null) { //同样获取尾节点,并且如果为空就将尾节点初始化为头结点head一样
if (compareAndSetHead(new Node()))
tail = head;
} else { //尾节点不为空就执行addWaiter一样的过程把新的node加到最后
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
新建Node并入队
*/
private Node addWaiter(Node mode) {
//新建一个Node
Node node = new Node(Thread.currentThread(), mode);
// 存储当前尾节点(当作旧的尾节点)
Node pred = tail;
if (pred != null) { //如果当前尾节点不为空
node.prev = pred; //将新建的节点的前驱节点执行旧的为节点
if (compareAndSetTail(pred, node)) {//CAS原子替换当前尾节点从旧的替换到新建node的位置
pred.next = node;//将旧的尾节点位置的后置节点执行新建的节点
return node;
}
}
//如果上面入队失败则调用enq方法入队
enq(node);
return node;
}
private void setHead(Node node) {
head = node; //将头结点指向node
node.thread = null; //线程置空
node.prev = null;//因为是头节点了,不用需要前驱结点
}
/**
* 唤醒后续节点
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果后置节点是尾节点或Cancelled状态
if (s == null || s.waitStatus > 0) {
s = null; //将当前后置节点置为null
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
/**
* 共享模式下
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 取消正在尝试获取锁的节点
*/
private void cancelAcquire(Node node) {
if (node == null) return;
//cancel一个节点时会将当前节点thread置为null
node.thread = null;
// 循环跳过已设置了cancelled状态的节点
Node pred = node.prev;
while (pred.waitStatus > 0) node.prev = pred = pred.prev;
//存储上面得到的节点前驱节点
Node predNext = pred.next;
//将当前要cancel的节点状态设置CANCELLED
node.waitStatus = Node.CANCELLED;
//1 如果当前节点node是尾节点。更新尾节点为pred.next指向null,相当于删除了node(和pred到node间为cancel的节点)
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//2 当前既不是尾节点,也不是head后继节点。设置node的前驱节点waitStatus为SIGNAL,node前驱节点指向后继节点,相当于删除node
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//3 如果node是head的后继节点。则直接唤醒node的后继节点。在head后面的节点有资格尝试获取锁,但是当前node放齐了当前资格,所以会唤醒其后续的节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
/**
* 判断当前节点是否挂起
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //当前状态下挂起
return true;
if (ws > 0) {
//跳过已被设置了cancelled的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/** 将上级的等待状态设为SIGNAL */
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
/**
* 尝试获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //进入循环后会不断的尝试获取
final Node p = node.predecessor();//获取当前节点的头结点!!只有head头结点才持有锁!!
//如果当前的前驱节点是头结点则尝试获取锁。
//如果尝试成功则将当前node设为头结点,并将旧的head设为null便于回收
//获取失败看是否需要挂起,如果需要挂起则挂起线程等待下一次被唤醒时继续尝试获取锁。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // 帮助GC
failed = false;
return interrupted;
}
//判断是否挂起(根据Node的状态=-3就会挂起),然后调用刮起的方法(里面调了Thread.interrupted();)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
。。。。。。。。。。。。。。。
// Main exported methods
子类实现的尝试获取锁的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/** 子类实现尝试释放锁的方法 */
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/** 子类实现尝试获取共享锁的方法 */
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/** 子类实现尝试释放共享锁的方法 */
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/** 子类实现排他模式下状态是否占用 */
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 排他模式,获取互斥锁
*/
public final void acquire(int arg) {
//尝试获取锁(tryAcquire在此类中是抛异常的,应在子类实现),
acquireQueued再次尝试获取锁,addWaiter适用于新建一个新node
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {}
public final boolean release(int arg) {
if (tryRelease(arg)) {//尝试释放锁成功
Node h = head;//获取当前被释放了锁的head头节点
//如果头节点不为空且当前节点状态正常就唤醒当前节点的后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {... }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {........ }
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 以下是队列检查方法
public final boolean hasQueuedThreads() {
return head != tail;//队列是否存在
}
public final boolean hasContended() {
return head != null; //头结点是否为空
}
public final Thread getFirstQueuedThread() {
return (head == tail) ? null : fullGetFirstQueuedThread();
}
。。。。。。。。。。
//工具和监控方法
public final int getQueueLength() {
int n = 0;//拿到队列长度
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
//获取当前node queue的所有线程
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
......的工具类..............................
/**
* condition queue, 单向队列。线程拿到锁,但条件不足时,会放到这个队列等待被唤醒
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter; //头结点
private transient Node lastWaiter;//尾节点
public ConditionObject() { }
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
。。。。。。。。。。之类的工具类。。。。。。。。
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
。。。。。一堆CAS方法。。。。。。。。
}
参考:
http://ifeve.com/introduce-abstractqueuedsynchronizer/
以下四篇:
https://www.jianshu.com/p/9ebca222513b
https://www.jianshu.com/p/c806dd7f60bc
https://www.jianshu.com/p/01f2046aab64
https://www.jianshu.com/p/134f494d76d8
标签:node,Node,CAS,节点,int,简析,源码,null,final From: https://blog.51cto.com/u_13854513/6952283