前言
本文是作者的第一篇文章,目的就是可以分享自己个人的一些技术上的心得体会以及找寻志同道合的人来共同讨论技术。
个人学习难免会有一些理解上的错误,所以写博客也是为了记录和反思自己的学习过程,进一步加深对技术的理解和掌握。希望通过这篇博客,能够帮助到一些和我一样在技术道路上不断探索的朋友,也期待能收到大家的建议和意见,共同进步。
文章目录
一、AQS概述
谈到并发,那么就不得不提到AQS(AbstractQueuedSynchronizer)了,它是Java并发包的一个核心类,可以用于实现各种同步器提供了一个框架,例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。
其核心思想就是:如果被请求的共享资源空闲,那么就把请求获取资源的线程设置为有效的工作线程,然后将共享资源的标志设置为锁定状态标识已被获取。而如果请求的共享资源不空闲,即它已被其他线程占据,那么我们就需要通过FIFO队列来实现一套线程阻塞等待以及唤醒分配的机制。
所以接下来,我会从底层源码来分析AQS的实现。
二、AQS底层结构
2.1 AQS底层基本变量
我们先看一下AQS底层基本变量:
private transient volatile Node head; // 头结点
private transient volatile Node tail; // 尾节点
private volatile int state; // 表示共享资源(可以理解为是否获取到锁的标志)
private transient Thread exclusiveOwnerThread; // 表示当前占据锁的线程
AQS的底层有一个java volatile int state
变量,用来表示共享资源的获取情况,为0时代表着没有线程获取过此资源,而等它大于0时,则表示有线程正在获取着资源。其底层还维护了一个基于CLH队列实现的FIFO阻塞队列,用来解决多线程竞争资源被阻塞的情况。
tips:
1、由于AQS支持可重入机制,那么这就意味着一个线程可以多次获取同一个锁而不会被阻塞,而state则可以理解为两种状态。即,state为0是表示没有线程拿到锁,而当state为n时(n >= 1),表示线程拿到锁,n为重入次数。
2、为什么是volatile?
2.1 因为是并发过程,所以必然出现线程安全问题,所以在这里通过volatile关键字可以解决这个问题,
2.2 volatile可以保证可见性。毕竟volatile修饰的变量无论读写都要去主存操作。即,每当volatile修饰的变量被某个线程修改的时候,会将此变量刷新到主存,而其他线程无论是读还是写此变量也是需要去主存读取的,因此哪怕是在并发环境下,此变量的新值对所有的线程都是最新版本的,不存在线程获取值不一样的情况。
2.2 Node节点结构
然后再让我们看一下Node节点:
abstract static class Node {
volatile Node prev; // 表示前驱指针
volatile Node next; // 表示后继指针
Thread waiter; // 表示竞争锁失败需要被挂起的线程
volatile int status; // 表示Node在队列中的状态
volatile int waitStatus; // 节点的等待状态
/* 下列四个变量表示Node在队列中的状态 */
static final int CANCELLED = 1; // 表示线程获取锁请求已经取消(线程等待超时或被中断~)
static final int SIGNAL = -1; // 表示线程需要被唤醒,等待资源释放
static final int CONDITION = -2; // 表示节点在条件队列中,等待某个条件的满足
static final int PROPAGATE = -3; // 在共享模式下,表示后续节点需要被唤醒并继续执行
}
2.3 FIFO队列
最后我们再看一下FIFO队列结构图:
PS:先简单过一下上面的结构,具体在文中后续讨论
三、源码分析
文中准备以ReentrantLock 里面的加锁、解锁源码来进行分析。版本:jdk1.8
3.1 lock
3.1.1 lock
跟踪源码,发现当我们进入 lock()方法时,它会进行如下逻辑:
public void lock() {
// sync是Sync的变量,而Sync则是一个抽象类,其中NonfairSync以及FairSync都继承了Sync类
// 而sync是公平锁还是非公平锁取决于你传入的fair值是否为true
// sync = fair ? new FairSync() : new NonfairSync();
// 而非公平锁和公平锁所对应实现的lock方法不同
sync.lock();
}
那我们接着来看一下sync.lock()
方法【此处非公平锁和公平锁的逻辑不同】。
先看非公平锁的代码:
final void lock() {
// 因为是非公平锁,所以可以看到,我们直接尝试CAS将当前的State从0置为1
// 不管你三七二十一,如果State置为1了那么我们拿到锁了,如果没有置为1,那么说明没拿到
if (compareAndSetState(0, 1))
// 如果置为1,那么就会把AQS中占据锁的线程设置为当前请求的此线程
setExclusiveOwnerThread(Thread.currentThread());
// 如果没置为1,那么要老老实实的执行下面逻辑
else
acquire(1);
}
// 将占据锁的线程设置为当前请求的此线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
然后我们再看公平锁的代码:
final void lock() {
// 因为我们是公平锁,所以为了实现公平,要去执行acquire中的逻辑
acquire(1);
}
这里其实可以看出,我们非公平锁和公平锁一开始的加锁逻辑就是:一个直接去CAS操作,如果失败了才去进入acquire,执行它的逻辑,而另外一个则是一开始就直接进入到acquire,执行它的逻辑。
3.1.2 acquire
之后我们再看一下acquire()
方法。
// 共同调用的acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire()
方法中一共有四个方法,下面我们分别进行解释。
3.1.2.1 tryAcquire
首先我们研究tryAcquire
,这个方法其实也分公平锁和非公平锁,它们的区别就是有无hasQueuedPredecessors
方法,差别其实不是特别的大。
首先我们看一下非公平锁的代码:
protected final boolean tryAcquire(int acquires) {
// 调用一下非公平锁的尝试加锁逻辑
return nonfairTryAcquire(acquires);
}
// 非公平锁的尝试加锁逻辑
final boolean nonfairTryAcquire(int acquires) {
// 将current赋值为进入此方法的当前线程
final Thread current = Thread.currentThread();
// 获取state的状态,也就是判断目前这个是否现在被其他线程占据
int c = getState();
// 如果为0,证明此时没有线程正在持有锁,那么我们就尝试加锁喽
if (c == 0) {
// 尝试CAS将state置为1,从而加锁
if (compareAndSetState(0, acquires)) {
// 加锁成功,将AQS中占据锁的线程设置为当前请求的此线程,返回true
setExclusiveOwnerThread(current);
return true;
}
}
// current == getExclusiveOwnerThread() :判断持有锁的线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
// 进入这里说明持有锁的线程就是当前线程,那么证明重入了~
// 所以 nextc = c + acquires; 即为 ’state + 1‘ 操作
int nextc = c + acquires;
// 判断是否越界
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置state
setState(nextc);
// 返回true
return true;
}
// 如果到了这里,那么证明这个线程没有获取到资源,那么就返回false。
return false;
}
然后我们再看一下非公平锁的代码:
其实可以看到我们公平锁的区别就是if中的逻辑判断多了一个方法,如下
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires))
protected final boolean tryAcquire(int acquires) {
// 将current赋值为进入此方法的当前线程
final Thread current = Thread.currentThread();
// 获取state的状态,也就是判断目前这个是否现在被其他线程占据
int c = getState();
// 如果为0,证明此时没有线程正在持有锁,那么我们就尝试加锁喽
if (c == 0) {
/* 【注意!区别来了】
先去调用hasQueuedPredecessors方法,检查是否有检查是否有等待队列
如果有的话,那么返回true,那么也可以直接调到else if逻辑了
*/
// 因为是 短路与 操作,左侧的整体表达式为false,那么右侧的表达式将不会被计算了嘛~
// 之后左侧整体为true,那么执行CAS操作,即,尝试CAS将state置为1,从而加锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 加锁成功,将AQS中占据锁的线程设置为当前请求的此线程,返回true
setExclusiveOwnerThread(current);
return true;
}
}
// current == getExclusiveOwnerThread() :判断持有锁的线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
// 进入这里说明持有锁的线程就是当前线程,那么证明重入了~
// 所以 nextc = c + acquires; 即为 ’state + 1‘ 操作
int nextc = c + acquires;
// 判断是否越界
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置state
setState(nextc);
// 返回true
return true;
}
// 如果到了这里,那么证明这个线程没有获取到资源,那么就返回false。
return false;
}
/* 什么时候为false?
1:h == t 的时候返回false,此时说明队列中没有其他节点,当前线程不可能是排在队列中的第一个线程
2:((s = h.next) == null || s.thread != Thread.currentThread());
2.1: s = h.next: 用于暂存 head.next 节点,所以之后s代表着h.next
2.2: 当s == null,说明头结点的下一个节点为null,
即队列中只有头结点和尾结点,那么当前线程不是队列中的第一个线程,返回false。
2.3: s.thread != Thread.currentThread():
如果头结点的下一个节点不是当前线程,说明当前线程有前驱线程,返回false。
ps:false作用其实就是为了公平,一旦返回false,那么我们就会不去进行”争抢“,而是回去排队。
*/
public final boolean hasQueuedPredecessors() {
Node t = tail; // 尾结点
Node h = head; // 头结点
Node s; // 局部变量,用于下面来暂存 head.next 节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
tips:
所以对于公平锁因为它要保证公平(比如避免线程饥饿),所以线程需要按照请求锁的顺序依次获得锁。而对于非公平锁它又不管你线程是否饥饿,所以在非公平锁的逻辑之中,线程会不按请求顺序获得锁,允许“插队”。
3.1.2.2 addWaiter
一旦tryAcquire
方法返回false,那么就应该addWaiter
方法出厂了!
addWaiter内心OS:嘿嘿嘿,终于到我了,让你们看看我的厉害
我们来看看addWaiter
源码:
private Node addWaiter(Node mode) {
// 将当前线程赋值封装进Node节点,等待排队 (实际上就是获取锁失败的线程)
Node node = new Node(Thread.currentThread(), mode);
// 将tail赋值给pre
Node pred = tail;
// 如果pred不为空[当前尾节点],那么就证明阻塞队列中有节点,执行里面的逻辑
// 因为队列未初始化的时候会直接进入到下面当enq()方法执行初始化~
if (pred != null) {
// 下面的逻辑其实就是将node节点变为新的尾结点~
// 1. 将node的前驱指针指向当前队列尾结点pred
node.prev = pred;
/* 2.
尝试CAS操作将pred指向node【pred = node】,成功返回true,失败返回false。
为什么CAS?
因为可能有很多个线程同时进入到这个判断内,所以需要保证并发的安全性。
即,如果在此期间其他线程修改了尾节点(pred 已不再是当前的尾节点),
该操作将返回 false,并且不会执行更新。
*/
if (compareAndSetTail(pred, node)) {
// 3. 将pred的后继指针指向node,最后返回node节点。
pred.next = node;
return node;
}
}
/* 进入enq()有两种情况
1、AQS中阻塞队列未空时,要去进行 初始化操作。
2、compareAndSetTail(pred, node) 为false的那些节点。
*/
enq(node);
return node;
}
private Node enq(final Node node)
// 循环插入~
for (;;) {
Node t = tail;
if (t == null) { // t为空表示着队列没Node,进行初始化
// CAS操作将head指向新创建的Node
if (compareAndSetHead(new Node()))
// tail 指向 head
tail = head;
} else { // 队列有Node,进行添加到队列的操作
// 和上方if (pred != null) 中的逻辑一样,我就不写了hhhh
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
tips:
所以compareAndSetTail()
确保尾节点的更新是原子的,从而可以解决多线程并发导致的安全问题。
3.1.2.3 acquireQueued
因为这个方法里的逻辑很复杂,所以我把里面的一些方法拆开来分析。
首先我们看acquireQueued
,这个方法其实就是在同步队列中获取锁资源。
final boolean acquireQueued(final Node node, int arg) {
// 获取锁标志:之前获取锁失败了,所以现在为true
boolean failed = true;
try {
// 标记等待的时候中是否被中断
boolean interrupted = false;
// 死循环,只有当 获取锁成功 or 被中断 才跳出循环
for (;;) {
// 获取当前线程节点的前继结点
final Node p = node.predecessor();
// 前继结点是头结点 并且 尝试获取锁成功,那么就进入里面的逻辑
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头结点
setHead(node);
// 将原头节点的next指针置空
p.next = null; // help GC
// 现在获取锁了,所以锁标志为false了,然后返回打断状态。
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire:决定是否挂起线程
// parkAndCheckInterrupt:挂起
// 然后设置打断标记为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获取锁失败,那么就不获取了(因为它的心碎了...)
if (failed)
cancelAcquire(node);
}
}
然后我们来看predecessor
、shouldParkAfterFailedAcquire
以及parkAndCheckInterrupt
方法
// 获取当前节点的前驱节点,为空的话抛异常,否则返回。
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 当在获取锁失败后,决定当前节点是否应该被挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 因为后面节点需要前继结点唤醒,而前继结点的waitStatus则是由后续节点来决定,所以需要
// 拿到前驱节点的等待状态
int ws = pred.waitStatus;
// 前驱节点状态为SIGNAL,表示其后继节点(当前节点)可以被挂起,返回true
if (ws == Node.SIGNAL)
return true;
// 此时ws = CANCELLED = 1。 【四种状态中CANCELLED = 1,详见2.2 Node节点结构】
// 表示前继节点获取锁的请求取消了,所以要遍历前驱节点链表。
if (ws > 0) {
// 一直找一直找,直到找到第一个没有取消的节点,也就是这个节点有没有取消获取锁
// 那么把当前节点插入这个节点后面,而这个过程中那些可能会出现的 CANCELLED 节点可以GC
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 这个时候那么就是除了SIGNAL、CANCELLED之外的状态,也就是等待唤醒。
// 所以这个时候CAS修改状态为SIGNAL,此时当前节点可以保证前继节点是有效的
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false,表示不应该被挂
return false;
}
// 节点挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
最后我们再看看finally代码块中的cancelAcquire
方法,到了这个时候,获取锁辣么多次都失败,那么就不获取了(因为它的心碎了…)
// 取消AQS中排队的node
private void cancelAcquire(Node node) {
// 如果节点为空,那么就忽略喽
if (node == null)
return;
// 将节点线程置为null
node.thread = null;
// 一直往前变量,找到有效节点(即,找到没有取消获取锁首位节点)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取首位有效节点的后继结点,后面有用~
Node predNext = pred.next;
// 设置node状态为取消
node.waitStatus = Node.CANCELLED;
// 当前节点是尾结点,那么就把tail节点替换为它前面的那个节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
/* 3个条件
1.如果不是头结点 &&
2. (它的waitStatus为SIGNAL 或者 waitStatus并且CAS成功置为SIGNAL) &&
3. 并且前驱节点的线程不为空
*/
// node的前驱节点不是head(两种可能:处于队列中间 or 上面CAS操作失败)
if (pred != head &&
// 获取pre节点状态,并判断是否为-1。是-1的话那么继续下一个判断,
// 否则 如果ws <= 0那么我就将改为ws改为-1。(因为可能会并发操作嘛,)
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
/* 判断上一个节点的线程是否不为null,两种情况。
1、因为可能并发情况,导致前一个节点设置为null...
2、因为可能并发情况,导致node变为头结点,因为头结点其实就是个虚拟节点,用来辅助的
*/
pred.thread != null) {
/* 到了这一步的时候,那么前驱节点一定是有效节点
所以其实前面的哪些判断就是为了避免后面节点无法被唤醒
(因为不知道自己后面后没有节点,有节点那么前面做的都很值得,没节点也没事,因为唤醒的时候也会判断嘛hhh)
*/
Node next = node.next;
// 后继节点不为空,并且waitStatus<=0(表示着它是有效节点)
if (next != null && next.waitStatus <= 0)
// 然后把前驱节点的后继结点替换为当前节点的后继结点(其实就是把自己从队列中去掉了)
compareAndSetNext(pred, predNext, next);
} else {
// 前面的条件不满足(其实到这里就代表:node节点是head的后继结点)
unparkSuccessor(node);
}
// next 指向自己,帮助GC。(因为自己也得从队列中移除)
node.next = node; // help GC
}
}
其实这个方法就是取消节点嘛,所以肯定该有的取消步骤你都得有,那么都需要什么呢?
首先肯定跟node节点有关啊(看下图)。
流程小总结:
- 先把线程置为null:
node.thread = null
- 往前找到第一个有效节点作为当前节点的新prev
- 设置节点状态为撤销:
node.waitStatus = Node.CANCELLED
- 把
CANCELLED
的节点排除队列中(node:当前节点 不懂的可以画个图哦)
a. node位于队列尾部:那么肯定就是将node前面第一个有效节点的next指向空,然后node的next指针指向自己,GC一下。(此处不留爷,自有留爷处~)
b. node位于head的后继节点:这样的话,就把head节点的next指向node的后继节点,然后node后继结点的pre指向head呗。(就是head节点和node的后继结点一块玩了,不带node了)
c. 节点处于中间位置:就是它前后的节点连接起来(就是node的前驱、后继结点一块玩了,不带node了)
3.1.2.4 selfInterrupt
因为上面文字有点多,怕你们忘记,所以补个之前代码(我真贴心~hhhhh)
public final void acquire(int arg) {
// tryAcquire:尝试获取锁
// addWaiter:把获取线程失败的节点加到队列中
// acquireQueued:抢救一次,看看能不能获取锁。直到失败或者中断呗。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 其实就是中断当前线程的作用。
// 因为 我获取锁失败了,然后又把它添加进队列设置了中断了。所以不去中断还能干啥,对吧。
selfInterrupt();
}
// 中断当前线程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
好了,至此,lock方法介绍完毕,接下来让我们看看unlock方法。
ps:好累啊,呜呜呜,求大家给个关注,点赞以及收藏。hhhhhhhhhhh,谢谢啦~
3.2 unlock
对于解锁的话,就没有公平锁和公平锁之分啦,所以马上就写完喽,加油,大家!
跟踪源码,我们可以看见当进入unlock
方法时,会进入如下逻辑:
public void unlock() {
// 调用release方法,因为有可重入特性,所以每次传1
sync.release(1);
}
接下来我们看看release
方法。
public final boolean release(int arg) {
// 尝试进行释放锁,如果释放锁成功返回true,否则返回false。
// 我们先看下面的tryRelease方法。
if (tryRelease(arg)) {
// 执行完tryRelease之后,先将h设置为head
Node h = head;
// 如果有头结点,队列中有数据,那么去执行unparkSuccessor()吧
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 获取此时state次数【之前state-1】
int c = getState() - releases;
// 判断当前的线程与获取锁的线程是否相同,不同抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否可以释放锁的标志
boolean free = false;
// 如果c为0,那么state为0,即可以释放锁
if (c == 0) {
// 设置可以释放锁
free = true;
// 把此时获取锁的线程置null
setExclusiveOwnerThread(null);
}
// 设置state为减1之后的值
setState(c);
// 返回是否释放锁的标志
return free;
}
public final boolean release(int arg) {
// 尝试进行释放锁,如果释放锁成功返回true,否则返回false。
// 我们先看下面的tryRelease方法。
if (tryRelease(arg)) {
// 执行完tryRelease之后,先将h设置为head
Node h = head;
// 如果有头结点,队列中有数据【因为waitStatus不为0】,那么执行unparkSuccessor()
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 获取此时state次数【之前state-1】
int c = getState() - releases;
// 判断当前的线程与获取锁的线程是否相同,不同抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否可以释放锁的标志
boolean free = false;
// 如果c为0,那么state为0,即此时满足释放锁的条件
if (c == 0) {
// free置为true
free = true;
// 把此时获取锁的线程置null
setExclusiveOwnerThread(null);
}
// 设置state为减1之后的值
setState(c);
// 返回是否释放锁的标志
return free;
}
// 唤醒~
private void unparkSuccessor(Node node) {
// 获取节点waitStatus
int ws = node.waitStatus;
// 如果小于0,先CAS将其Status改为0【还是因为并发,可能会很多线程去唤醒后续节点,所以修改】
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到头结点的下一个节点(第一个排队的节点)
Node s = node.next;
// 如果s是null或者节点无效的,执行里面逻辑
if (s == null || s.waitStatus > 0) {
// 既然你无效的话,我也不管你是为null还是说撤销了,那么我直接置为null
s = null;
// 从尾结点一直往前遍历,找到距离head最近的有效节点,然后赋值给s
// (然后进行下面的if判断,唤醒这个最近的有效节点[waitStatus <= 0] )
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// s不等于空(有节点在排队),进行唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
四、写在最后
恭喜你!你已经看到这里啦,希望本篇文章能够对你的学习有所帮助!
如果可以,希望你可以给作者一个点赞、一个收藏或是一个评论来给予我鼓励。当然,最好能有关注啦哈哈哈哈,下篇文章,我们不见不散!
【阿涛wink.jpg】