一. ConditionObject概述
synchronized提供了wait和notify的方法实现线程在持有锁时,可以实现挂起,唤醒的操作。其实ReentrantLock也拥有这个功能,ReentrantLock提供了await和signal方法去实现类似wait和notify的功能。同样的,想执行await或者是signal就必须先持有lock锁的资源。此外,await和signal方法离不开AQS中的一个内部类——ConditionObject,就是它内部提供了如await,signal等方法帮我们实现了线程在持有锁时,可以实现挂起,唤醒的操作。此外,后续的源码讲解需要大家对ReentrantLock源码已经有了研读基础,如果没有的,请务必先看完并发编程笔记一和二。
二. ConditionObject的应用
还记得java并发线程的一个经典应用案例吗,就是开启两个线程,一个线程打印偶数,一个线程打印奇数,最后实现顺序打印1-100的功能。以往我们用synchronized中的wait以及notify方法可以轻松实现,同理,我们使用ConditionObject,也同样的可以实现,如下代码。
package com.js.practice.thread;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author jiangsong
* @classPath com.js.practice.thread
* @datetime 2024/12/9 23:24
* @description
**/
public class PrintNums03 {
public static int i = 1;
public static boolean isEVEN = false;
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition evencondition = lock.newCondition();
Condition oddCondition = lock.newCondition();
new Thread(()->{
while(i < 100) {
lock.lock();
while (isEVEN) {
try {
oddCondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("奇数:" + i++);
isEVEN = true;
evencondition.signal();
lock.unlock();
}
}).start();
new Thread(()->{
while(i < 100) {
lock.lock();
while(!isEVEN) {
try {
evencondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("偶数:" + i++);
isEVEN = false;
oddCondition.signal();
lock.unlock();
}
}).start();
}
}
建议看不明白的同学们可以直接先复制我这个代码,自己去执行看看,验证一下如ReentrantLock也能轻松实现synchronized中wait和notify的效果,然后再自己去写出来,只有实践才能让自己更加深刻的感受和体会。上述代码我们可以看到,想要执行如await或者signal方法,同样需要线程先持有锁,只不过此处持有的时Lock锁。
三. ConditionObject的构建方式和核心属性
由第二章节的应用,我们在通过lock锁对象执行newCondition方法时,本质就是直接new的AQS提供的ConditionObject对象,如下代码。
final ConditionObject newCondition() {
return new ConditionObject();
}
由上,我们还可以得出一个结论,Lock锁可以同时拥有多个ConditionObject对象,并且各个ConditionObject的操作是独立的,互不影响。
接着,再往下看,ConditionObject实际上只有两个核心属性,如下代码块。
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
虽然Node对象有prev和next,但是在ConditionObject中是不会使用这两个属性的,只要在Condition队列中,这两个属性都是null。在ConditionObject中只会使用Node节点的nextWaiter的属性实现单向链表的效果。
四. await方法前置分析
到这里,咱们进行一个从简单再到复杂的拆解讲法。先分析在持有了Lock锁后的线程调用了await方法做了什么,其实很简单,有如下操作:
- 判断线程是否中断,如果中断了,什么都不做。
- 没有中断,就将当前线程封装为Node添加到Condition的单向链表中
- 一次性释放掉锁资源。
- 如果当前线程没有在AQS队列,就正常执行LockSupport.park(this)挂起线程。
接下来,基于我们对于上述操作的了解,一起来看一下它的源码实现,此处只做了await方法的前置分析,为了便于理解,后置分析在讲完signal操作后会再详解,大家不用着急,一步步跟上来。
// await方法的前置分析,只分析到线程挂起
public final void await() throws InterruptedException {
// 先判断线程的中断标记位是否是true
if (Thread.interrupted())
// 如果是true,就没必要执行后续操作挂起了。
throw new InterruptedException();
// 在线程挂起之前,先将当前线程封装为Node,并且添加到Condition队列中
Node node = addConditionWaiter();
// fullyRelease在释放锁资源,一次性将锁资源全部释放,并且保留重入的次数
int savedState = fullyRelease(node);
// 省略一行代码……
// 当前Node是否在AQS队列中?
// 执行fullyRelease方法后,线程就释放锁资源了,如果线程刚刚释放锁资源,其他线程就立即执行了signal方法,
// 此时当前线程就被放到了AQS的队列中,这样一来线程就不需要执行LockSupport.park(this);去挂起线程了
while (!isOnSyncQueue(node)) {
// 如果没有在AQS队列中,正常在Condition单向链表里,正常挂起线程。
LockSupport.park(this);
// 省略部分代码……
}
// 省略部分代码……
}
// 线程挂起先,添加到Condition单向链表的业务~~
private Node addConditionWaiter() {
// 拿到尾节点。
Node t = lastWaiter;
// 如果尾节点有值,并且尾节点的状态不正常,不是-2,尾节点可能要拜拜了~
if (t != null && t.waitStatus != Node.CONDITION) {
// 如果尾节点已经取消了,需要干掉取消的尾节点~
unlinkCancelledWaiters();
// 重新获取lastWaiter
t = lastWaiter;
}
// 构建当前线程的Node,并且状态设置为-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果last节点为null。直接将当前节点设置为firstWaiter
if (t == null)
firstWaiter = node;
else
// 如果last节点不为null,说明有值,就排在lastWaiter的后面
t.nextWaiter = node;
// 把当前节点设置为最后一个节点
lastWaiter = node;
// 返回当前节点
return node;
}
// 干掉取消的尾节点。
private void unlinkCancelledWaiters() {
// 拿到头节点
Node t = firstWaiter;
// 声明一个节点,爱啥啥~~~
Node trail = null;
// 如果t不为null,就正常执行~~
while (t != null) {
// 拿到t的next节点
Node next = t.nextWaiter;
// 如果t的状态不为-2,说明有问题
if (t.waitStatus != Node.CONDITION) {
// t节点的next为null
t.nextWaiter = null;
// 如果trail为null,代表头结点状态就是1,
if (trail == null)
// 将头结点指向next节点
firstWaiter = next;
else
// 如果trail有值,说明不是头结点位置
trail.nextWaiter = next;
// 如果next为null,说明单向链表遍历到最后了,直接结束
if (next == null)
lastWaiter = trail;
}
// 如果t的状态是-2,一切正常
else {
// 临时存储t
trail = t;
}
// t指向之前的next
t = next;
}
}
// 一次性释放锁资源
final int fullyRelease(Node node) {
// 标记位,释放锁资源默认失败!
boolean failed = true;
try {
// 拿到现在state的值
int savedState = getState();
// 一次性释放干净全部锁资源
if (release(savedState)) {
// 释放锁资源失败了么? 没有!
failed = false;
// 返回对应的锁资源信息
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 如果释放锁资源失败,将节点状态设置为取消
node.waitStatus = Node.CANCELLED;
}
}
五. signal方法分析
此处老套路,我将signal操作也分为了如下几个部分:
- 确保执行signal方法的是持有锁的线程
- 脱离Condition的队列
- 将Node状态从-2改为0
- 将Node添加到AQS队列
- 为了避免当前Node无法在AQS队列正常唤醒做了一些判断和操作
接下来,基于我们对于上述操作的理解,来看看它的源码又是怎么实现的呢?
// 线程挂起后,可以基于signal唤醒~
public final void signal() {
// 在ReentrantLock中,如果执行signal的线程没有持有锁资源,直接扔异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 拿到排在Condition首位的Node
Node first = firstWaiter;
// 有Node在排队,才需要唤醒,如果没有,直接告辞~~
if (first != null)
doSignal(first);
}
// 开始唤醒Condition中的Node中的线程
private void doSignal(Node first) {
// 先一波do-while走你~~~
do {
// 获取到第二个节点,并且将第二个节点设置为firstWaiter
if ( (firstWaiter = first.nextWaiter) == null)
// 说明就一个节点在Condition队列中,那么直接将firstWaiter和lastWaiter置位null
lastWaiter = null;
// 如果还有nextWaiter节点,因为当前节点要被唤醒了,脱离整个Condition队列。将nextWaiter置位null
first.nextWaiter = null;
// 如果transferForSignal返回true,一切正常,退出while循环
} while (!transferForSignal(first) &&
// 如果后续节点还有,往后面继续唤醒,如果没有,退出while循环
(first = firstWaiter) != null);
}
// 准备开始唤醒在Condition中排队的Node
final boolean transferForSignal(Node node) {
// 将在Condition队列中的Node的状态从-2,改为0,代表要扔到AQS队列了。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 如果失败了,说明在signal之前应当是线程被中断了,从而被唤醒了。
return false;
// 如果正常的将Node的状态从-2改为0,这是就要将Condition中的这个Node扔到AQS的队列。
// 将当前Node扔到AQS队列,返回的p是当前Node的prev
Node p = enq(node);
// 获取上一个Node的状态
int ws = p.waitStatus;
// 如果ws > 0 ,说明这个Node已经被取消了。
// 如果ws状态不是取消,将prev节点的状态改为-1,。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果prev节点已经取消了,可能会导致当前节点永远无法被唤醒。立即唤醒当前节点,基于acquireQueued方法,
// 让当前节点找到一个正常的prev节点,并挂起线程
// 如果prev节点正常,但是CAS修改prev节点失败了。证明prev节点因为并发原因导致状态改变。还是为了避免当前
// 节点无法被正常唤醒,提前唤醒当前线程,基于acquireQueued方法,让当前节点找到一个正常的prev节点,并挂起线程
LockSupport.unpark(node.thread);
// 返回true
return true;
}
六. await方法后置分析
我将await方法的后置分析,分为了如下几个部分:
- 唤醒之后,要先确认是中断唤醒还是signal唤醒,还是signal唤醒后被中断
- 确保当前线程的Node已经在AQS队列中
- 执行acquireQueued方法,等待锁资源。
- 在获取锁资源后,要确认是否在获取锁资源的阶段被中断过,如果被中断过,并且不是THROW_IE,那就确保interruptMode是REINTERRUPT。
- 确认当前Node已经不在Condition队列中了
- 最终根据interruptMode来决定具体做的事情
- 0:嘛也不做。
- THROW_IE:抛出异常
- REINTERRUPT:执行线程的interrupt方法
接下来,我们再进行源码的研读,如下代码块。
// 现在分析await方法的后半部分
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// 中断模式~
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果线程执行到这,说明现在被唤醒了。
// 线程可以被signal唤醒。(如果是signal唤醒,可以确认线程已经在AQS队列中)
// 线程可以被interrupt唤醒,线程被唤醒后,没有在AQS队列中。
// 如果线程先被signal唤醒,然后线程中断了。。。。(做一些额外处理)
// checkInterruptWhileWaiting可以确认当前中如何唤醒的。
// 返回的值,有三种
// 0:正常signal唤醒,没别的事(不知道Node是否在AQS队列)
// THROW_IE(-1):中断唤醒,并且可以确保在AQS队列
// REINTERRUPT(1):signal唤醒,但是线程被中断了,并且可以确保在AQS队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// Node一定在AQS队列
// 执行acquireQueued,尝试在ReentrantLock中获取锁资源。
// acquireQueued方法返回true:代表线程在AQS队列中挂起时,被中断过
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 如果线程在AQS队列排队时,被中断了,并且不是THROW_IE状态,确保线程的interruptMode是REINTERRUPT
// REINTERRUPT:await不是中断唤醒,但是后续被中断过!!!
interruptMode = REINTERRUPT;
// 如果当前Node还在condition的单向链表中,脱离Condition的单向链表
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 如果interruptMode是0,说明线程在signal后以及持有锁的过程中,没被中断过,什么事都不做!
if (interruptMode != 0)
// 如果不是0~
reportInterruptAfterWait(interruptMode);
}
// 判断当前线程被唤醒的模式,确认interruptMode的值。
private int checkInterruptWhileWaiting(Node node) {
// 判断线程是否中断了。
return Thread.interrupted() ?
// THROW_IE:代表线程是被interrupt唤醒的,需要向上排除异常
// REINTERRUPT:代表线程是signal唤醒的,但是在唤醒之后,被中断了。
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
// 线程是正常的被signal唤醒,并且线程没有中断过。
0;
}
// 判断线程到底是中断唤醒的,还是signal唤醒的!
final boolean transferAfterCancelledWait(Node node) {
// 基于CAS将Node的状态从-2改为0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 说明是中断唤醒的线程。因为CAS成功了。
// 将Node添加到AQS队列中~(如果是中断唤醒的,当前线程同时存在Condition的单向链表以及AQS的队列中)
enq(node);
// 返回true
return true;
}
// 判断当前的Node是否在AQS队列(signal唤醒的,但是可能线程还没放到AQS队列)
// 等到signal方法将线程的Node扔到AQS队列后,再做后续操作
while (!isOnSyncQueue(node))
// 如果没在AQS队列上,那就线程让步,稍等一会,Node放到AQS队列再处理(看CPU)
Thread.yield();
// signal唤醒的,返回false
return false;
}
// 确认Node是否在AQS队列上
final boolean isOnSyncQueue(Node node) {
// 如果线程状态为-2,肯定没在AQS队列
// 如果prev节点的值为null,肯定没在AQS队列
if (node.waitStatus == Node.CONDITION || node.prev == null)
// 返回false
return false;
// 如果节点的next不为null。说明已经在AQS队列上。、
if (node.next != null)
// 确定AQS队列上有!
return true;
// 如果上述判断都没有确认节点在AQS队列上,在AQS队列中寻找一波
return findNodeFromTail(node);
}
// 在AQS队列中找当前节点
private boolean findNodeFromTail(Node node) {
// 拿到尾节点
Node t = tail;
for (;;) {
// tail是否是当前节点,如果是,说明在AQS队列
if (t == node)
// 可以跳出while循环
return true;
// 如果节点为null,AQS队列中没有当前节点
if (t == null)
// 进入while,让步一手
return false;
// t向前引用
t = t.prev;
}
}
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
// 如果是中断唤醒的await,直接抛出异常!
if (interruptMode == THROW_IE)
throw new InterruptedException();
// 如果是REINTERRUPT,signal后被中断过
else if (interruptMode == REINTERRUPT)
// 确认线程的中断标记位是true
// Thread.currentThread().interrupt();
selfInterrupt();
}
七.awaitNanos&signalAll方法分析
awaitNanos:仅仅是在await方法的基础上,做了一内内的改变,整体的逻辑思想都是一样的。挂起线程时,传入要阻塞的时间,时间到了,自动唤醒,走添加到AQS队列的逻辑。
// await指定时间,多了个时间到了自动醒。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// deadline:当前线程最多挂起到什么时间点
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// nanosTimeout的时间小于等于0,直接告辞!!
if (nanosTimeout <= 0L) {
// 正常扔到AQS队列
transferAfterCancelledWait(node);
break;
}
// nanosTimeout的时间大于1000纳秒时,才可以挂起线程
if (nanosTimeout >= spinForTimeoutThreshold)
// 如果大于,正常挂起
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 计算剩余的挂起时间,可能需要重新的走while循环,再次挂起线程
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
// 剩余的挂起时间
return deadline - System.nanoTime();
}
signalAll方法。这个方法一看就懂,之前signal是唤醒1个,这个是全部唤醒,跟signal方法几乎是一模一样的,只是唤醒一个和唤醒多个的区别。
// 以do-while的形式,将Condition单向链表中的所有Node,全部唤醒并扔到AQS队列
private void doSignalAll(Node first) {
// 将头尾都置位null~
lastWaiter = firstWaiter = null;
do {
// 拿到next节点的引用
Node next = first.nextWaiter;
// 断开当前Node的nextWaiter
first.nextWaiter = null;
// 修改Node状态,扔AQS队列,是否唤醒!
transferForSignal(first);
// 指向下一个节点
first = next;
} while (first != null);
}
八. 总结
本次课程主要是围绕AQS中的内部类ConditionObject做了一个深层次源码解析,我们可以不用将每行代码全部读懂,但是希望大家起码要理解它做了些什么,又是怎么做的。只有理解了这部分,后续我们在阅读java.util.concurrent并发工具包的其他源码时才能得心应手,希望大家多看,多写,多去自己体会,最好是可以做到跟我一样,自己去拆解ConditionObject类是什么,又能帮我们做什么,怎么去做的,然后最好还能自己对着源码写下自己的注解。下节课,我将对ReentrantReadWriteLock做源码深度解析,如果有前面课程的基础,尤其是第一节课的同学,那这个ReentrantReadWriteLock的源码解析对于你来说将会很轻松,因为它实际上和ReentrantLock实现大部分一致。
标签:Node,ConditionObject,node,AQS,编程,源码,线程,唤醒,节点 From: https://blog.csdn.net/qq_41191331/article/details/144484445