AbstractQueuedSynchronizer(AQS)是一个抽象队列同步器,它用于构建依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。该类的目的在于提供基本功能的封装,适用于大多数需要使用单个原子int值表示同步状态的同步器。举例来说,ReentrantLock、Semaphore以及FutureTask等都是基于AQS实现的,我们也可以通过继承AQS来实现自定义的同步器。
核心思想
网络上常见的解释是:
如果所请求的共享资源是空闲的,那么当前请求资源的线程将被设置为有效的工作线程,并且共享资源将被锁定。然而,如果所请求的共享资源已被占用,则需要一套机制来阻塞等待线程并在适当时分配锁。AQS采用了CLH队列锁的实现方式来处理这种情况,即将无法立即获取到锁的线程添加到队列中进行排队等待。
我理解,可以把AQS视为一种锁,并将其用于管理共享资源的状态和线程请求。当有线程请求资源时,AQS会记录该线程作为当前工作线程,并将自身设置为锁定状态。如果其他线程请求AQS,则它们将被记录到等待队列中,无法获取到锁的线程进入阻塞等待状态。
为什么需要 AQS
在深入研究AQS之前,我们可能会问为什么需要AQS? 其中synchronized关键字和CAS原子类已经提供了丰富的同步方案。
然而,在实际需求中,我们对同步的需求各不相同。例如,如果我们需要对锁设置超时时间,单独依靠synchronized关键字或CAS是无法实现的,需要进行二次封装。而JDK中提供了丰富的同步方案,比如使用基于AQS实现的ReentrantLock。
用法
要将AQS用作同步器的基础,请根据具体使用情况重新定义以下方法,可以使用getState、setState和/或compareAndSetState来检查和/或修改同步状态:
-
tryAcquire
-
tryRelease
-
tryAcquireShared
-
tryReleaseShared
-
isHeldExclusively
这些方法的默认实现会抛出UnsupportedOperationException。这些方法的实现必须是线程安全的,并且通常应该是短暂而不是阻塞的。定义这些方法是使用此类的唯一受支持的方式。所有其他方法都被声明为最终方法,因为它们不应该被修改。
你可能还会发现从AbstractOwnableSynchronizer继承的方法对于跟踪拥有独占同步器的线程非常有用。我们鼓励您使用这些方法,这样监控和诊断工具可以帮助用户确定哪些线程持有锁。
尽管这个类基于内部FIFO队列,但它并不自动执行FIFO调度策略。独占同步的核心形式为:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
AQS是一种底层技术,用于实现锁机制。它提供了一种通过队列的方式,将争用锁的线程进行排队等待的机制。与此类似的是,共享模式也是通过类似的机制来实现。然而,共享模式可能涉及到级联信号的问题。
在获取操作之前,新获取的线程有可能会在其他被阻塞和排队的线程之前获得锁资源。如果需要的话,可以通过定义tryAcquire和/或tryAcquireShared方法来禁止插队,从而提供公平的FIFO获取顺序。具体来说,大多数公平同步器可以在tryAcquire返回false时,通过调用hasQueuedPredecessors方法返回true,以保证公平性。还有其他修改的可能性。
默认的插入策略通常是贪婪、放弃和避免护送策略,这种策略可以提供最高的吞吐量和可扩展性。尽管这不能保证公平性或无饥饿,但它允许较早排队的线程在较晚排队的线程之前重新竞争,并且每次重新竞争都有公平机会与传入线程成功竞争。此外,虽然获取操作不是传统意义上的自旋,但它们可能会在阻塞之前多次调用tryAcquire,并在其中插入其他计算。这样做可以提供自旋带来的许多好处,而不会增加太多的负担。如果需要的话,可以通过预先调用具有"快速路径"检查的方法来增加这一点,例如预先检查hasContended和/或hasQueuedThreads,以便仅在同步器可能不会争用的情况下执行自旋操作。
AQS提供了高效且可扩展的同步基础,适用于依赖于整数状态、获取和释放参数以及内部FIFO等待队列的同步器。如果这还不够,可以使用原子类、自定义java.util.Queue类和LockSupport阻塞支持从较低级别构建同步器。
用法示例
这是一个不可重入互斥锁类,它使用值 0 表示未锁定状态,使用值 1 表示锁定状态。虽然不可重入锁并不严格要求记录当前所有者线程,但无论如何,此类都会这样做以使使用情况更易于监控。它还支持条件并公开一些检测方法:
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Reports whether in locked state
public boolean isLocked() {
return getState() != 0;
}
public boolean isHeldExclusively() {
// a data race, but safe due to out-of-thin-air guarantees
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Provides a Condition
public Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isLocked(); }
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
这是一个类似于 CountDownLatch 的锁存器类,只是它只需要一个信号即可触发。因为锁存器是非独占的,所以它使用共享的获取和释放方法。
class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() { return getState() != 0; }
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() { return sync.isSignalled(); }
public void signal() { sync.releaseShared(1); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
AQS 底层原理
父类 AbstractOwnableSynchronizer
AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer ,后者逻辑十分简单:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;
// 设置当前持有锁的线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer 只是定义了设置持有锁的线程的能力。
CLH 队列
AQS(AbstractQueuedSynchronizer)的等待队列是CLH(Craig, Landin, and Hagersten)锁定队列的一种变体。CLH锁通常用于自旋锁。AQS通过将每个请求共享资源的线程封装为一个CLH节点来实现。这个节点的定义如下:
/** CLH Nodes */
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others
// methods for atomic operations
final boolean casPrev(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, PREV, c, v); // 通过 CAS 确保同步设置 prev 的值
}
final boolean casNext(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, NEXT, c, v);
}
final int getAndUnsetStatus(int v) { // for signalling
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}
final void setPrevRelaxed(Node p) { // for off-queue assignment
U.putReference(this, PREV, p);
}
final void setStatusRelaxed(int s) { // for off-queue assignment
U.putInt(this, STATUS, s);
}
final void clearStatus() { // for reducing unneeded signals
U.putIntOpaque(this, STATUS, 0);
}
private static final long STATUS = U.objectFieldOffset(Node.class, "status");
private static final long NEXT = U.objectFieldOffset(Node.class, "next");
private static final long PREV = U.objectFieldOffset(Node.class, "prev");
}
CLH节点的数据结构是一个双向链表的节点,其中每个操作都经过CAS(Compare and Swap)来确保线程安全。要将其加入CLH锁队列,您可以将其自动拼接为新的尾节点;要出队,则需要设置头节点,以便下一个满足条件的等待节点成为新的头节点:
+------+ prev +-------+ prev +------+
| | <---- | | <---- | |
| head | next | first | next | tail |
| | ----> | | ----> | |
+------+ +-------+ +------+
Node 中的 status 字段表示当前节点代表的线程的状态。status 存在三种状态:
static final int WAITING = 1; // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND = 2; // in a condition wait
-
WAITING:表示等待状态,值为 1。
-
CANCELLED:表示当前线程被取消,为 0x80000000。
-
COND:表示当前节点在等待条件,也就是在条件等待队列中,值为 2。
在上面的 COND 中,提到了一个条件等待队列的概念。首先,Node 是一个静态抽象类,它在 AQS 中存在三种实现类:
-
ExclusiveNode
-
SharedNode
-
ConditionNode
前两者都是空实现:
static final class ExclusiveNode extends Node { }
static final class SharedNode extends Node { }
而最后的 ConditionNode 多了些内容:
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter;
// 检查线程是否中断或当前线程的状态已取消等待。
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
ConditionNode 拓展了两个方法:
-
检查线程状态是否处于等待。
-
阻塞当前线程:当前线程正在等待执行,通过
LockSupport.park()
阻塞当前线程。这里通过 while 循环持续重试,尝试阻塞线程。
而到这一步,所有的信息都指向了一个相关的类 Condition 。
总结
AQS是锁机制实现的底层技术,它通过队列的方式将争用锁的线程进行排队等待。与此不同的是,Condition代替了Object中的同步方法能力。
当Condition进入等待状态时,它会在等待队列的队尾插入新的节点,并且通过release方法释放锁资源。在释放锁资源后,它会通过acquire开启一个自旋尝试重新获取锁资源。当自旋一定时间后,如果未成功获取到锁资源,就会通过LockSupport的API进入阻塞状态。
对于依赖单个原子int值来表示同步状态的解读,Node通过status属性来控制与之关联的线程的等待状态,而AQS的state用于控制同步状态。
顶尖架构师栈
关注回复关键字
【C01】超10G后端学习面试资源
【IDEA】最新IDEA激活工具和码及教程
【JetBrains软件名】 最新软件激活工具和码及教程
标签:Node,return,AQS,编程,public,并发,线程,final From: https://www.cnblogs.com/dc-s/p/17719588.html