目录
前言
Java中的大部分同步类,如 Lock、Semaphore、ReentrantLock 等,都是基于 AbstractQueuedSynchronizer(简称为AQS)实现的。
AQS 是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。AQS 的框架如下图所示:
总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。
CLH 锁
CLH 锁是由 Craig、Landin 和 Hagersten 的发明,因此命名为 CLH 锁。CLH 是单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。
AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个节点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
CLH 队列结构如下图所示:
AQS 框架
AQS 核心思想
AQS 核心思想是:
-
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
-
如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。
这个机制 AQS 是基于 CLH 锁实现的。
AQS 的核心原理图:
AQS 的同步状态
AbstractQueuedSynchronizer 使用了一个 int 成员变量 state 表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。并且,状态的获取和修改都是通过 final 修饰的,在子类中无法被重写。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
private volatile int state; // The synchronization state.
// 返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
// 原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
}
我们可以通过修改 state 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程):
对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是 AQS 架构图中的第一层:API层。
AQS 对资源的共享方式
AQS 中线程的两种锁的模式:
-
独占(Exclusive):只有一个线程能执行,如,ReentrantLock。
独占方式,又分为公平锁和非公平锁:
-
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁;
-
非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。
-
-
共享(Shared):多个线程可同时执行,如,Semaphore、CountDownLatch。
ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。
AQS 的重要方法
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。
对于自定义同步器,需要实现以下几种方法:
方法名 | 描述 |
---|---|
protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到 Condition 才需要去实现它。 |
protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待节点返回True,否则返回False。 |
一般来说,自定义同步器要么是独占方法,要么是共享方式,它们也只需实现:tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。
AQS 的数据结构
Node
先来看下AQS中最基本的数据结构:Node,Node即为上面CLH变体队列中的节点。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node@JDK 1.8
abstract static class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 当前节点在队列中的状态
volatile int waitStatus;
// 前驱指针
volatile Node prev;
// 后继指针
volatile Node next;
// 表示处于该节点的线程
volatile Thread thread;
// 指向下一个处于CONDITION状态的节点
Node nextWaiter;
...
}
其中,节点的状态 waitStatus 有下面几个枚举值:
枚举变量 | 枚举值 | 含义 |
---|---|---|
- | 0 | 一个新节点入队的默认状态,表示当前节点在 sync queue 中,等待着获取锁。 |
CANCELLED | 1 | 表示当前节点已取消调度。当 timeout 或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的节点将不会再变化。 |
CONDITION | -2 | 表示节点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION 状态的节点将从等待队列转移到同步队列中,等待获取同步锁。 |
PROPAGATE | -3 | 共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。 |
SIGNAL | -1 | 表示后继节点在等待当前节点唤醒。后继节点入队时,会将前继节点的状态更新为 SIGNAL。 |
注意,负值表示节点处于有效等待状态,而正值表示节点已被取消,因此,源码中很多地方用 ws > 0
、ws < 0
来判断节点的状态是否正常。
ConditionObject
Condition
// java.util.concurrent.locks.Condition
public interface Condition {
// 等待,当前线程在接到信号或被中断之前一直处于等待状态
void await() throws InterruptedException;
// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
void awaitUninterruptibly();
//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
void signal();
// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();
}
ConditionObject
ConditionObject 实现了 Condition 接口,Condition 接口定义了条件操作规范
// java/util/concurrent/locks/AbstractQueuedSynchronizer.java
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
// condition 队列的头节点
private transient Node firstWaiter;
// condition 队列的尾节点
private transient Node lastWaiter;
...
}
...
}
AQS 源码分析
AbstractQueuedSynchronizer 的源码如下所示:
// java/util/concurrent/locks/AbstractQueuedSynchronizer.java
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state内存偏移地址
private static final long stateOffset;
// head内存偏移地址
private static final long headOffset;
// state内存偏移地址
private static final long tailOffset;
// tail内存偏移地址
private static final long waitStatusOffset;
// next内存偏移地址
private static final long nextOffset;
// 静态初始化块
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
...
}
核心方法
acquire 方法
acquire() 方法的源码如下:
// java/util/concurrent/locks/AbstractQueuedSynchronizer.java
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
acquire() 方法以独占模式获取(资源),忽略中断,即线程在 aquire 过程中,中断此线程是无效的。
当一个线程调用 acquire 时,调用方法流程如下:
-
首先调用 tryAcquire 方法,调用此方法的线程会试图在独占模式下获取对象状态。
此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在 AbstractQueuedSynchronizer 源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。
-
如果 tryAcquire 失败,则
-
调用 addWaiter 方法,addWaiter 方法完成的功能是将调用此方法的线程封装成为一个节点并放入 Sync queue。
-
调用 acquireQueued 方法,此方法完成的功能是 Sync queue 中的节点不断尝试获取资源,若成功则返回 true,否则,返回 false。
-
acquire() 的流程图:
addWaiter
addWaiter 将线程封装为节点的源码如下:
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
private Node addWaiter(Node mode) {
// 新生成一个节点,默认为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 保存尾节点
if (pred != null) { // 尾节点不为空,即已经被初始化
node.prev = pred; // 将 node 节点的prev域连接到尾节点
// 比较 pred 是否为尾节点,是则将尾节点设置为node
if (compareAndSetTail(pred, node)) {
pred.next = node; // 设置尾节点的 next 域为 node
return node; // 返回新生成的节点
}
}
// 尾节点为空(即还没有被初始化过),或者是 compareAndSetTail 操作失败,则入队列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) { // 无限循环,确保节点能够成功入队列
Node t = tail; // 保存尾节点
if (t == null) { // 尾节点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头节点为空,并设置头节点为新生成的节点
tail = head; // 头节点与尾节点都指向同一个新生节点
} else { // 尾节点不为空,即已经被初始化过
node.prev = t; // 将 node 节点的 prev 域连接到尾节点
if (compareAndSetTail(t, node)) { // 比较节点 t 是否为尾节点,若是则将尾节点设置为 node
t.next = node; // 设置尾节点的 next 域为 node
return t; // 返回尾节点
}
}
}
}
...
}
addWaiter 方法使用快速添加的方式往 sync queue 尾部添加节点,如果 sync queue 队列还没有初始化,则会使用 enq 插入队列中,enq 方法会使用无限循环来确保节点的成功插入。
acquireQueue
如果线程获取资源(锁)失败,说明线程已经被添加到等待队列尾部了。acquireQueued() 方法可以对排队中的线程进行“获锁”操作。
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
acquireQueue 方法的源码如下:
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 标记是否成功拿到资源
try {
boolean interrupted = false; // 标记等待过程中是否中断过
for (;;) { // 开始自旋,要么获取锁,要么中断
final Node p = node.predecessor(); // 获取当前节点的前驱节点
if (p == head && tryAcquire(arg)) { // 如果 p 是头节点,说明当前节点在真实数据队列的首部,就尝试获取锁(注意,头节点是虚节点)
setHead(node); // 获取锁成功,头指针移动到当前node
p.next = null; // help GC
failed = false; // 设置标志
return interrupted;
}
// 说明 p 为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是 p 不为头节点,这个时候就要判断当前 node 是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获取头节点的节点状态
if (ws == Node.SIGNAL) // 说明头节点处于唤醒状态
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; // 可以进行 park 操作
if (ws > 0) { // 表示状态为:取消状态
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do { // 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 设置前任节点等待状态为SIGNAL
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() {
// 在许可可用之前禁用当前线程,并且设置了 blocker
LockSupport.park(this);
return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}
// 取消继续获取(资源)
private void doAcquireInterruptibly(long arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 释放后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 获取node节点的等待状态
if (ws < 0) // 状态值小于0,为SIGNAL = -1 或 CONDITION = -2 或 PROPAGATE -3
compareAndSetWaitStatus(node, ws, 0); // 比较并且设置节点等待状态,设置为0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 获取node节点的下一个节点
if (s == null || s.waitStatus > 0) { // 下一个节点为空或者下一个节点的等待状态大于0,即为 CANCELLED
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从尾节点开始从后往前开始遍历
if (t.waitStatus <= 0) // 找到等待状态小于等于 0 的节点,找到最前的状态小于等于 0 的节点
s = t; // 保存节点
}
if (s != null) // 该节点不为为空,释放许可
LockSupport.unpark(s.thread);
}
...
}
其中,parkAndCheckInterrupt 方法内部使用到了 LockSupport.park(this)
,顺便简单介绍一下 park 方法。
LockSupport 类是 Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的方法,归结到 Unsafe 里,只有两个:
-
park(boolean isAbsolute, long time):阻塞当前线程
-
unpark(Thread jthread):使给定的线程停止阻塞
所以,节点进入等待队列后,是调用 park 使它进入阻塞状态的。只有头节点的线程是处于活跃状态的。
acquireQueued() 方法的流程图如下:
其中,shouldParkAfterFailedAcquire() 方法的流程图如下:
调用 acquireQueue 方法,可以使 sync queue 中的节点在独占且忽略中断的模式下获取(资源)。
只有当该节点的前驱节点的状态为 SIGNAL 时,才可以对该节点所封装的线程进行 park 操作。否则,将不能进行 park 操作。parkAndCheckInterrupt 方法里的逻辑是首先执行 park 操作,即禁用当前线程,然后返回该线程是否已经被中断。
release
release() 方法以独占模式释放对象,其源码如下:
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
public final boolean release(long arg) {
if (tryRelease(arg)) { // 释放成功
Node h = head; // 保存头节点
if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为0
unparkSuccessor(h); // 释放头节点的后继节点
return true;
}
return false;
}
...
}
其中,tryRelease 的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功时,如果头节点不为空并且头节点的状态不为 0,则调用 unparkSuccessor 方法释放头节点的后继节点。
AbstractQueuedSynchronizer总结
对于 AbstractQueuedSynchronizer 的分析,最核心的就是 sync queue 的分析。
-
每一个节点都是由前一个节点唤醒;
-
当节点发现前驱节点是 head 并且尝试获取成功,则会轮到该线程运行;
-
condition queue中的节点向sync queue中转移是通过signal操作完成的;
-
当节点的状态为SIGNAL时,表示后面的节点需要运行。
参考:
标签:node,Node,Java,AQS,队列,AbstractQueuedSynchronizer,线程,节点 From: https://www.cnblogs.com/larry1024/p/17762241.html