AQS 结构特性
- 内部包含
Node
、ConditionObject
静态内部类,Node 用来存储没竞争到锁的线程状态、CondidtionObject 是对条件变量的封装; volatile int state
变量记录锁的状态,1 表示锁被持有、0 表示锁被释放,同时对应三个方法来更改/获取锁的状态:getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
;- AQS 内部维护了一个双向链表实现的线程等待队列,称为 CLH 队列;
- AQS 支持两种模式的资源访问(独占/共享):独占模式是指同一时间只允许一个线程访问资源,例如
ReentrantLock
,共享模式是指可以多线程访问资源CountDownLatch
、Semaphore
;
AQS 中 CLH 队列结构如下:
- 比如说此时恰好有个线程A 持有资源,持有资源的线程一定位于 Head 节点;
- 此时另一个线程B 想要获取锁资源,但是获取锁失败,将B线程封装为 Node节点存到队列尾;
- 线程B 被挂起,并通知线程A(将线程A 的 waitStatus 状态设置为 SIGNAL),在 A释放资源时通知其它线程;
- 线程A 释放资源,将自身 Node 设置为 null 方便 GC回收,然后通知线程 B;
- 线程B 尝试获取锁资源(公平锁大概率成功获取,非公平锁不一定)。
此外,AQS 还预留了一些接口给子类,由子类实现锁的释放和获取:
//尝试获取排他锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//尝试释放排他锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//尝试获取共享锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//尝试释放共享锁
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//判定当前线程获得的资源是否是排他资源
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
1、Node 静态内部常量类
static final class Node {
/** 节点处于共享模式标记 */
static final Node SHARED = new Node();
/** 节点处于独占模式标记 */
static final Node EXCLUSIVE = null;
/** 当前节点作废,需要移除队列 */
static final int CANCELLED = 1;
/** 待唤醒后继节点,当前节点释放资源后一定会唤醒后继节点,因此后继节点可以安心挂起 */
static final int SIGNAL = -1;
/** 当前节点线程获取了资源,但是执行过程中又主动放弃了资源,被移入 Condition 队列中(注意不是等待队列) */
static final int CONDITION = -2;
/**
* Head 节点才持有,标记着连续唤醒队列中处于共享模式的节点,让他们并发获取共享资源
*/
static final int PROPAGATE = -3;
/** 当前节点的等待状态,标记为上述的几个值 */
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 当前节点绑定的线程 */
volatile Thread thread;
/** 等待条件的下一个节点 */
Node nextWaiter;
}
Node 结构如上图所示,Node 就是绑定某一个线程,并存储该线程相关信息的结构。
2、ConditionObject 条件变量类
3、核心排他锁加锁方法
这里以 ReentrantLock.lock()
方法切入,我们最终会发现调用的还是 AQS 类的 acquire()
方法:
// Reentrant.lock()
public void lock() {
sync.lock();
}
// FairSync.lock()
final void lock() {
acquire(1);
}
// AQS.acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire()
方法的逻辑如下:
- 调用由子类实现的
tryAcquire()
方法获取锁,如果成功就结束,否则失败了返回 false; - 如果失败了继续执行以下步骤:
- 调用
addWaiter(Node)
方法封装当前线程对应的 Node 结构,然后通过CAS
操作添加到队尾并修改队尾节点; - 调用
acquireQueued()
方法,先找到当前节点的前驱节点 PrevNode:- 如果 PrevNode 是队列头节点,就再执行一次
tryAcquire()
方法获取锁,因为可能前驱节点已经释放了锁资源,相当于一次重试。如果成功获取锁,就将当前节点设置为头节点并将前驱节点置空帮助 GC 回收; - 如果 PrevNode 不是头节点,就执行
shouldParkAfterFailedAcquire()
方法判断是否将当前节点对应的线程挂起,如果需要挂起就调用LockSupport.park()
方法挂起线程。如何判断是否需要挂起?根据前驱节点的waitStatus
标志,如果是SIGNAL
就挂起,否则设置前驱节点为SIGNAL
并等待下一次自选时挂起。
- 如果 PrevNode 是队列头节点,就再执行一次
- 调用
- 调用
selfInterrupt()
方法,只有在尝试加锁失败且acquireQueued()
方法标识为 true 时才执行。
3.1 tryAcquire() 方法
这里以 ReentrantLock.FairSync.tryAcquire()
来分析:
// ReentranLock.Sync 内部类继承自 AQS
abstract static class Sync extends AbstractQueuedSynchronizer {.....}
// FairSync 继承自 Sync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 加锁操作
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁状态:0——未加锁;1——已有线程加锁
int c = getState();
if (c == 0) {
// hasQueuedPredecessors():判断等待队列中是否有线程在排队,已有返回 true,否则 false
// 如果没有线程排队就尝试 CAS 加锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 加锁成功,设置为当前独占线程
setExclusiveOwnerThread(current);
return true;
}
}
// 否则已经有线程获取了锁,判断是不是当前线程自身,如果是就是重入加锁,累加 state 变量
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
3.2 addWaiter() 方法
private Node addWaiter(Node mode) {
// 构造当前线程对应的节点
Node node = new Node(Thread.currentThread(), mode);
// 判断队列尾是否为空,不为空 CAS 插入
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 否则队列未初始化,执行队列初始化然后插入节点
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter()
方法最终返回成功加入队列尾的节点。
3.3 acquireQueued() 方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 注意这里会一直自旋
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头节点,当前线程尝试重新获取锁
if (p == head && tryAcquire(arg)) {
// 成功后 CAS 设置头节点,并协助 GC
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 否则挂起当前线程,等待下一次自旋
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 前驱节点状态为 SIGNAL,安心挂起
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 前驱节点状态为 CANCELED,循环删除直到找到找到节点状态正常的前驱节点为止
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 否则设置前驱节点状态为 SIGNAL,那么下一次循环就会挂起当前线程
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 挂起当前线程
LockSupport.park(this);
// Thread.interrupt() 检测当前线程是否可以被中断(检查中断标志),并返回一个 boolean 并清除中断状态,第二次调用时中断状态已被清除,将返回一个 false
// interrupt() 并不中断线程,结果可能为 true/false
return Thread.interrupted();
}
3.4 加排他锁方法总结
总结下 AQS 添加排他锁的逻辑:
- 首先判断 CLH 队列中是否有等待资源的线程,如果没有直接 CAS 加锁,具体来说就是将当前节点设置为头节点;否则判断持有锁的线程和当前线程是不是为同一个线程,若为同一个,即重入操作,那么修改锁信息(+1)即可;
- 反之当前线程可能需要等待,这取决于它的前驱节点的状态:
- 若前驱节点为头节点,那么再重复尝试一次,这样能保证资源最大限度地利用。如果成功获取锁,就将头节点置空方便垃圾回收,并且当前线程设置为头节点;
- 否则当前线程需要插入到队尾等待,但是此时线程是可能分配到CPU时间片的,所以我们还需要释放线程占有的系统资源,但是释放资源有个前提,就是前驱节点释放锁时能够唤醒我。所以自选检查前驱节点状态,如果不是
SIGNAL
就需要更新为SIGNAL
,SIGNAL
保证了前驱节点释放锁时一定会唤醒后继节点。这里检查的同时还清理了队列中需要丢弃的节点(CANCELLED
)。
4、核心排他锁解锁方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁会首先尝试释放锁,如果释放成功,就唤醒后继第一个状态正常(非CANCELLED
)的线程。
4.1 tryRelease() 方法
参考 ReentrantLock.Sync.tryRelease()
:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前释放锁的线程不是独占锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 设置独占锁线程为空
setExclusiveOwnerThread(null);
}
// 更新锁状态
setState(c);
return free;
}
4.2 unparkSuccessor() 方法
唤醒后继节点的方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 将当前头节点的状态更新为 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取后继节点,如果后继节点为空或者状态为 CANCELLED,反而从队列尾反向查找第一个状态正常的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 找到后唤醒线程,线程会进入竞争状态
LockSupport.unpark(s.thread);
}
线程在被成功唤醒后会进入资源竞争状态,此时就对应了前面加锁时的 acquireQueued()
方法,方法中包含一个死循环,循环会一直重复调用 tryAcquire()
方法,这里举个例子方便理解:
- 假设在 t1 时刻,线程A 获取了锁资源,线程B 也尝试获取锁,但是被线程A 占用,所以线程B被搞到了等待队列中(此时线程B 的前驱节点就是头节点也即线程A),线程B 会在acquireQueued的for(;;)中 不断自旋!
- 如果 t2 时刻线程A 释放了锁资源,那么
unparkSuccessor()
方法会唤醒线程B 节点; - 接着在 t3 时刻,线程B 自旋到
if(p==head && tryAcquire(arg))
方法时,将线程B 设置为头节点,此时 B持有锁资源; - 问题是如果B 的前驱节点不是头节点,参考
shouldParkAfterFailedAcquire()
方法,程序会循环向前遍历,将所有状态为CANCELLED
的前驱节点剔除掉,这样队列中的所有节点都有很大概率是有效状态的节点。
4.3 释放锁总结
线程释放锁资源用到了一个特别的策略,在寻找第一个有效后继节点时(状态非 CANCELLED
),如果第一个节点无效(null 或者 CANCELLED
),就转而从队列尾开始向前查找。
相应的,队列中等待获取锁的线程会执行自旋操作,自旋操作过程中会不断清理队列中状态为 CANCELLED
的线程节点。
5、核心共享锁加锁方法
以 Semaphore
为例讲解 AQS 共享锁加锁过程:
// Semaphore.acquire() 加锁
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// Sync 继承 AQS类,调用 AQS类方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 若没有多余的剩余资源
if (tryAcquireShared(arg) < 0)
// 进入等待队列排队
doAcquireSharedInterruptibly(arg);
}
// tryAcquireShared() 方法由 Semaphore 实现
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
5.1 tryAcquireShared() 函数
加锁后返回剩余可用的锁资源数:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 共享锁加锁,返回加锁完毕后剩余的可用资源数
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 计算剩余资源是否可用
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
5.2 doAcquireSharedInterruptibly() 函数
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 当前线程节点添加到队列尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点是 Head,重新尝试加锁
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 当前节点成功获取锁,向下传播,判断后续节点能否获取锁资源
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 否则调整前驱节点,挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5.3 setHeadAndPropagate() 函数
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 如果还有剩余锁资源或者头节点为空或头节点状态无效,尝试唤醒共享模式的后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点状态为 SIGNAL,说明头节点释放后一定会唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 只要头节点CAS成功改变状态为0,就一定会唤醒后继节点
unparkSuccessor(h);
}
// 若头节点是初始状态,就更改状态为 PROPAGATE,以便后续传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
什么情况下节点的 waitStatus=0
?
- 节点进入队列后,如果要挂起一定会先将前驱节点状态调整为
SIGNAL
,若此时前驱节点还未调整完,后继节点的状态为 0; - 在上面
doReleaseShared()
中,如果后继节点被唤醒,前驱节点需要先通过 CAS 将状态转换为 0;
唤醒逻辑梳理下如下:
- 判断头节点的状态,如果是
SIGNAL
说明需要唤醒后继节点; - 如果头节点状态为 0,则告知头节点,在释放后不仅需要唤醒后继节点,还需要不断传播下去唤醒更多共享模式的节点。