什么是AOS
AQS抽象队列同步器(AbstractQueuedSynchronizer)。是java juc里提供的一个实现锁同步机制的框架。它提供同步状态、阻塞和唤醒线程函数以及队列模型的原子管理。大多数同步类(Lock、Semaphore、ReentrantLock等)都是基于AQS实现的。AbstractQueuedSynchronizer是一个抽象类,实现了锁操作的大部分框架方法,还有一些具体实现留给子类实现。
实现原理
来看下AQS定义的几个主要属性
public abstract class AbstractQueuedSynchronizer{
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
state用来标识当前是否锁空闲。0表示空闲,>0表示有线程占用锁。如果这个时候有新线程进入,则会进入等待队列。等待队列使用链表结构形式存储。AQS里记录链表的head指针和tail指针。Node节点是AQS一个内部类。除了链表结构需要的前驱(prev)和后继(next)属性外。还有一个thread用来记录当前线程,waitStatus用来记录当前线程的等待状态。
waitStatus的可选值有:
DEFAULT | 0 | 默认值 |
SIGNAL | -1 | 当前节点的后续节点被park阻塞了。当前节点释放锁后需要unpark唤醒其后继节点 |
CONDITION | -2 | |
CANCELLED | 1 | 该节点不需要获取锁,已经取消,可以出队队列操作过程中会不断修改Node节点的这几个属性。 |
开始以ReentrantLock为例进行源码阅读。ReentrantLock里有两个实现类FairSync和NonfairSync继承自AQS对获取锁和释放锁方法进行了实现。类结构如下
class ReentrantLock implements Lock
{
private final Sync sync;
class Sync extends AbstractQueuedSynchronizer;
class NonfairSync extends Sync;
class FairSync extends Sync
}
公平锁与非公平锁
FairSync是公平锁,NonfairSync是非公平锁。ReentrantLock默认使用非公平锁,这里也从非公平锁开始看。
公平锁&非公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。
ReentrantLock使用lock()加锁,unlock()方法解锁。先从加锁开始。
加锁
加锁会调用lock()方法。然后lock调用Sync的lock()方法
调用NonfairSync.lock()方法
static final class NonfairSync extends Sync {
/**
*
* 2、先获取锁,就是调用compareAndSetState方法成功即标识拿到锁。
* 否则调用acquire方法。acquire是AbstractQueuedSynchronizer类中实现方法
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**
* AbstractQueuedSynchronizer的acquire方法if首先会执行该方法
*/
protected final boolean tryAcquire(int acquires) {
//nonfairTryAcquire方法在Sync中实现
return nonfairTryAcquire(acquires);
}
}
compareAndSetState()方法是AQS基类里的final方法,使用CAS方式来修改state的值。如果修改成功则获取锁成功,然后设置当前占用锁的线程为当前线程。(后面可重入使用)。如果设置state失败,则进入else调用acquire()方法再次尝试获取锁。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire()方法是AQS基类的方法,首先调用tryAcquire()方法,tryAcquire方法AQS中没有实现,是Sync里具体实现,非公平Sync会调用nonfairTryAcquire()方法。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//state是0表示当前没有线程暂用,再次CAS尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/**
*没有获取成功,但是当前占用锁的线程和当前线程是同一个线程.
* 那么锁信号量state累加赋值,实现了锁的可重入。
* 多次加锁要对应相同次数的解锁
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//还是没有获取到锁
return false;
}
如果还是没有获取到锁,会继续执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。首先来看addWaiter方法。就是线程入队操作。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
//tail不为空,设置当前节点的前置节点为tail
node.prev = pred;
//设置整个AQS的tail节点为当前节点
if (compareAndSetTail(pred, node)) {
//设置前置节点的下一节点为当前节点
pred.next = node;
//到这里 tail 和node建立了双向链表。然后更新AQS的tail为当前node
return node;
}
}
//tail为空,进入初始化队列操作
enq(node);
return node;
}
/**
enq方法和addWaiter方法主要逻辑差不多,只有tail为空,队列中没有元素的时候
执行到enq进行必要的初始化
*/
private Node enq(final Node node) {
/**
* 入队操作
* 死循环
* 第一次:tail节点是个null,进if 生成 head = tail = new Node() 空节点
* 第二次:进else ,, Node() <-> node (tail) 第二次可能存在并发,如果compareAndSetTail失败会执行多次
*/
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置node的前置节点为第一次的空节点
node.prev = t;
//设置AQS的tail为当前节点
if (compareAndSetTail(t, node)) {
//设置tail的下一节点为当前节点,完成双向连表设置
t.next = node;
return t;
}
}
}
}
这里第一次tail为空的时候进入到enq方法,enq方法第一次循环创建了一个空node节点然后,head和tail都指向该节点。第二次将队列tail和tail建立指向,入队。然后将整个队列的tail指向该节点。
如果有T3线程加入排队,当前节点的前线修改为tail,设置当前队列的tail指向当前node,设置原来tail的后续为当前节点,完成node3的入队
继续跟随方法调用栈,addWaiter返回node,然后执行acquireQueued(node, arg))方法。来看acquireQueued方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//自旋
final Node p = node.predecessor();
//前序节点是head节点,进行获取锁操作
if (p == head && tryAcquire(arg)) {
//获取锁成功,当前节点出队
setHead(node);
//释放原head节点对当前节点的引用
p.next = null; // help GC
//修改标识为获取锁成功
failed = false;
//返回当前线程未被阻塞
return interrupted;
}
/**
* 获取锁失败,执行shouldParkAfterFailedAcquire判断是否当前线程需要进行park阻塞。防止不必要的空悬
* 返回true,则执行parkAndCheckInterrupt方法park当前线程,返回当前线程被终端
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果获取锁成功,取消正在进行的acquire操作,将当前节点status置为CANCELLED
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
acquireQueued方法这里一直死循环自旋操作。
拿到当前node的前序节点,判断前序节点是不是head节点
1、是head节点。再次调用tryAcquire尝试获取锁操作,如果获取锁成功,设置当前节点为head节点,解除引用关系。
当前节点的(thread、prev)置空,当前节点的前序节点的next置空
这样Node1节点相当于出队了,没有任何引用,可以进行垃圾回收
2、前序节点不是头节点或者获取锁失败。就要调用shouldParkAfterFailedAcquire方法判断当前线程是否需要park阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前置节点waitStatus是SIGNAL 需要park当前线程
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {//前置节点是CANCELLED的
/**
* 循环向前查找,跳过所有CANCELLED的node,剔除队列(链表取消指向,当前节点指向找到第一个不是canceled的,然后将该节点的next指向当前节点)
* 原理是 node1<-> node2(CANCELD)<-> node3(CANCELD)<->node(current)
* 变成 node1<-> node(current)
* 不挂起,进入下一次自旋
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前置节点waitStatus是默认值或PROPAGATE,修改前序节点为SIGNAL。 进入下一次自旋,下一次再来判断前序就是SIGNAL状态,就需要park当前线程了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
判断当前节点需不需要park的条件是其前序节点waitStatus是SIGNAL状态。
前序节点状态
case1、SIGNAL 当前节点需要park
case2、CANCELLED
如果其前序节点是CANCELLED取消状态,则继续递归的往前找前序的前序节点,直到找到不是取消状态的节点作为当前节点的前序进行连线。然后中间取消的节点相当于直接移除队列。这里过滤掉取消状态的线程节点后也不挂起,进入下一次自旋判断。
case3、其它状态
可能为初始状态,修改前序节点状态为SIGNAL,下次自旋时候判断就需要park当前线程了。
这里以上面队列中的T3为例,首先其prex节点是Node2对应T2不是head节点。就会进入是否需要挂起当前线程的判断方法。然后其prex节点Node2的waitStatus状态是初始值0,上面的第3种情况。会设置其prex的等待状态值为SIGNAL(-1)。第一次自旋结束。线程没有被挂起。继续回到acquireQueued方法开始第二次自旋。这个时候其prex节点Node2还不是head节点,但是其waitStatus状态已经由上次改为SIGNAL了。所以这一次自旋T3线程就需要被park阻塞挂起了。parkAndCheckInterrupt()方法会被执行。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
当前节点的自旋会修改其prex节点的waitStatus为SIGNAL。当前节点需不需要被park又依赖其prex节点的WaitStatus是不是SIGNAL。所以队列中的线程基本上经过两次自旋后都会被park挂起。然后等待被其prex线程唤醒。减少不必要的CPU资源消耗。
当T2节点拿到锁时自己会变成head节点
解锁
解锁要从unlock()方法开始。unlock会调用AOS的release方法,release首先尝试调用Sync实现的tryRelease方法。
//AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁
Node h = head;
//判断head节点不为空且head的waitStatus不为初始状态0
//为什么要判断head的waitStatus?
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒head线程的后继节点
return true;
}
return false;
}
//唤醒线程,node参数是head
private void unparkSuccessor(Node node) {
//判断head的waitStatus是否小于0,否则修改为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
*找到当前head节点的后继节点,判断不是null且waitStatus不是CANCELED,否则一直找
* 注意这里for循环是从tail往前找,一直找到最前一个符合条件的
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒当前节点对应线程
}
//Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//判断拿锁线程时当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//本次锁释放后,state回位0,所有锁都已释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
锁的释放要做两件基本事情。
1、释放锁资源(修改state状态)
释放AQS的state为拿锁时锁定的acquire量,如果state变为0,则表示当前锁空闲,可以唤醒下一个等待线程。
2、唤醒其next等待线程。
唤醒线程从队列的head开始出队,
唤醒head的后继为什么要判断head的waitStatus不等于0?
因为前面等待入队的时候当前线程自选需要park阻塞的前提时其prex节点waitStatus是SIGNAL(-1)。如果head节点waitStatus状态是0初始值,则其next对应线程还未进行park挂起操作。这个时候对其unpark会报错。
为什么找符合条件的node从tail开始往前找?
因为前面入队addWaiter时候双向链表建立prex,next关系时候不是原子操作。先设置的prev后设置的next,所以这里也从后往前找,先拿node.prev。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
总结
获取锁:多线程会竞争获取AQS中state资源。获取到的线程拿到锁。获取不到的进入等待队列。等待队列是双向链表格式,初始head节点是一个空Node对象,每个Node存有当前线程引用,还保有一个waitStatus状态值,用来标识当前线程状态。放入等待队列的线程会不断自旋去获取锁,可以调用aquire方法获取锁的条件是当前线程的前驱节点是head节点,因此公平锁一般还是先进先出。但是一般两次获取不到自己就会被挂起。是否需要挂起判断依据是当前节点的前驱waitStatus是不是SIGNAL状态。如果获取锁成功,当前队列head节点会指向该节点,然后解除其与原prex节点也就是原head节点的链接(原head节点出队),原head节点没有任何引用,所有引用属性都会值空,可以被回收。
释放锁:释放锁做两件事情,第一归还acquire的state数量。从队列head开始找下一个需要被唤醒的等待线程。释放完锁后当前线程虽然解锁但是还未完成出队,线程的出队在其next拿到锁后逻辑处理。
一个线程可以多次获取锁,对state累加每次的acquire量。实现锁的重入。多次加锁要对应相同次数解锁。
等待线程队列的维护基本上在AQS基类都已实现。大部分方法是final或private,一般继承AQS重写tryAcquire和tryRelease方法定义state修改的acquire量即可。
锁的实现类型
ReentrantLock
公平锁实现:新来线程要先判断当前队列是否有等待队列,如果有当前线程只能入队等待,不能尝试获取锁(修改state)。
非公平锁:新来的线程不管当前队列是否有数据,上来就先搞一次获取锁操作,获取不到再入队等待。
释放锁都是同样的实现,减去锁住的state量。
CountDownLatch
共享锁:CountDownLatch latch = new CountDownLatch(5);指定state数量,然后每个线程执行latch.countDown()释放一次锁state-1。主线程latch.await()会自旋加入等待队列被唤醒。当countDown()多次state置为0时,执行唤醒主线程方法。
Semaphore
共享锁:初始指定固定数量的共享资源,每个线程来到后先获取共享资源,成功后继续执行,否则进入等待队列。共享资源数为0的时候新来的线程获取通行证就会失败,必须等待其它有锁线程释放公共资源后才能再获取同行证。达到并发数控制的目的。
QA:
1、非公平锁,如果新来的线程抢占了刚被队列中unpark的等待线程的锁。当前线程运行结束,怎么唤醒其他线程,当前线程未入队。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放的时候都是从head开始找的,这个不会有问题,下次还是能找到他。
2、什么时候等待线程变成CANCELLED状态?