首页 > 编程语言 >ConditionObject源码分析

ConditionObject源码分析

时间:2023-05-06 09:11:43浏览次数:49  
标签:分析 ConditionObject node 队列 Node 源码 线程 唤醒 节点

  ConditionObject是AbstractQueuedSynchronizer(AQS)实现的内部类,类图如下:

 0

1、Condition接口

  ConditionObject实现了Condition接口。先来看看Codition接口。

 

  Codition中主要定义了挂起线程和唤醒线程的接口方法。Condition接口详情如下:

 1 public interface Condition {
 2     
 3     /**
 4      * 自动释放锁资源
 5      * 挂起当前线程,通过方法signal/signalAll、或线程中断,唤醒挂起线程
 6      */
 7     void await() throws InterruptedException;
 8 
 9     /**
10      * 自动释放锁资源
11      * 挂起当前线程,通过方法signal/signalAll、或虚假唤醒,唤醒挂起线程
12      */
13     void awaitUninterruptibly();
14 
15     /**
16      * 自动释放锁资源
17      * 挂起当前线程,通过方法signal/signalAll、或线程中断、或挂起时间(纳秒)已到达、或虚假唤醒的方式,唤醒挂起线程
18      */
19     long awaitNanos(long nanosTimeout) throws InterruptedException;
20 
21     /**
22      * 自动释放锁资源
23      * 挂起当前线程,通过方法signal/signalAll、或线程中断、或挂起时间已到达,唤醒挂起线程
24      */
25     boolean await(long time, TimeUnit unit) throws InterruptedException;
26 
27     /**
28      * 自动释放锁资源
29      * 挂起当前线程,通过方法signal/signalAll、或线程中断、或挂起时间已到达,唤醒挂起线程
30      */
31     boolean awaitUntil(Date deadline) throws InterruptedException;
32 
33     /**
34      * 唤醒一个挂起的线程
35      */
36     void signal();
37 
38     /**
39      * 唤醒所有挂起的线程
40      */
41     void signalAll();
42 }

2、ConditionObject的使用

  ConditionObject使用详情如下:

 1 import java.util.concurrent.TimeUnit;
 2 import java.util.concurrent.locks.Condition;
 3 import java.util.concurrent.locks.ReentrantLock;
 4 
 5 public class TestConditionObject {
 6 
 7     public static void main(String[] args) {
 8         ReentrantLock lock = new ReentrantLock();
 9         Condition condition = lock.newCondition();
10         // 子线程
11         new Thread(() -> {
12             // 子线程加锁
13             lock.lock();
14             System.out.printf("子线程【%s】获取锁", Thread.currentThread().getName());
15             // 休眠3s
16             try {
17                 TimeUnit.SECONDS.sleep(3);
18             } catch (InterruptedException e) {
19                 e.printStackTrace();
20             }
21             // 挂起当前子线程,并释放锁资源
22             try {
23                  System.out.printf("子线程【%s】挂起,并释放锁资源", Thread.currentThread().getName());
24                 condition.await();
25             } catch (InterruptedException e) {
26                 e.printStackTrace();
27             }
28             System.out.printf("子线程【%s】执行结束,并释放锁资源", Thread.currentThread().getName());
29         }).start();
30 
31         // 休眠100ms
32         try {
33             TimeUnit.MILLISECONDS.sleep(100);
34         } catch (InterruptedException e) {
35             e.printStackTrace();
36         }
37 
38         // 主线程获得锁,挂起子线程
39         lock.lock();
40         System.out.println("主线程拿到锁资源,子线程已执行await方法");
41         condition.signal();
42         System.out.println("主线程唤醒了await挂起的子线程");
43         lock.unlock();
44     }
45 }

  通过上述代码执行结果,可推出线程执行示意图如下:

  

   JDK1.5提供的ArrayBlockingQueue源码中的put(E)和take()方法有对ConditionObject的具体的应用。

通过上述示意图可推导出如下结论:

  1、await() 会自动释放锁资源,因为main线程可以获取锁资源

  2、唤醒的线程通过 await() 会获取锁资源,因为await()在释放锁资源之后, 重新获取到锁,后续的unlock()才有意义。

如此,可以将await()方法分成两部分,挂起线程的部分,被唤醒后的部分,流程:await() -> signal() -> await(),下面通过源码来验证我们的推论:

3、ConditionObject源码分析

  ConditionObject的属性信息:

// condition队列中的头节点
private transient Node firstWaiter;
// condition队列中的尾节点
private transient Node lastWaiter;

  挂起线程、唤醒线程整体流程:

  

3.1、ConditionObject的初始化

  ConditionObject是在Lock接口提供的newCondition()方法完成初始化的。以ReentrantLock为例,ReentrantLock#newCondition()详情如下:

1 // 抽象队列同步器
2 private final Sync sync;
3 
4 // 创建ConditionObject对象
5 public Condition newCondition() {
6     return sync.newCondition();
7 }

  Sync#newCondition()详情如下:

final ConditionObject newCondition() {
    return new ConditionObject();
}

  通过初始化方法可以得出结论:一个lock锁中可以有多个Condition对象。且当某个Condition进行操作时,不会影响其他Condition。

3.2、await() - 挂起线程前奏

1、await()挂起线程前奏流程

 

2、await()挂起线程前奏源码分析

  AbstractQueuedSynchronizer#ConditionObject#await() 详情如下:

 1 // 线程中断,挂起线程退出
 2 private static final int REINTERRUPT =  1;
 3 /** Mode meaning to throw InterruptedException on exit from wait */
 4 // 抛出异常,挂起线程退出
 5 private static final int THROW_IE    = -1;
 6 
 7 public final void await() throws InterruptedException {
 8     // 当前线程已中断,抛异常
 9     if (Thread.interrupted())
10         throw new InterruptedException();
11     // 将当前线程封装成Node,并添加到Condition链表
12     Node node = addConditionWaiter();
13     // 释放全部锁资源,并保存线程重入次数
14     int savedState = fullyRelease(node);
15     // 线程中断模式
16     int interruptMode = 0;
17     // 判断当前Node是否在AQS队列中,在fullyRelease方法时释放了全部说资源,若当前线程刚释放锁资源,
18     // 其他线程就执行了signal方法,此时当前线程会被方法哦AQS的队列中,线程不需要再执行LockSupport.park(this);挂起线程
19     while (!isOnSyncQueue(node)) {
20         // 若没有在AQS队列中,在Condition单向链表里,挂起线程。
21         LockSupport.park(this);
22         
23         // ...
24     }
25     // ...
26 }

  添加需要挂起的线程,AbstractQueuedSynchronizer#addConditionWaiter() 详情如下:

 1 // 在condition单向链表中添加需要挂起的线程
 2 private Node addConditionWaiter() {
 3     // 获取尾节点
 4     Node t = lastWaiter;
 5     // 如果队列尾节点的状态不为 -2,将尾节点从队列移除
 6     if (t != null && t.waitStatus != Node.CONDITION) {
 7         unlinkCancelledWaiters();
 8         // 获取最新的尾节点
 9         t = lastWaiter;
10     }
11     // 将当前线程封装为Node,设置状态为-2
12     Node node = new Node(Thread.currentThread(), Node.CONDITION);
13     // 尾节点为null
14     if (t == null)
15         // 当前节点设置为头节点
16         firstWaiter = node;
17     else
18         // 将当前节点设置在lastWaiter的后面
19         t.nextWaiter = node;
20     // 当前节点设置为的尾节点
21     lastWaiter = node;
22     // 返回当前节点
23     return node;
24 }

  移除需要取消的节点,AbstractQueuedSynchronizer#ConditionObject#unlinkCancelledWaiters() 详情如下:

 1 // 移链表中被取消的节点
 2 private void unlinkCancelledWaiters() {
 3     // 获取头节点
 4     Node t = firstWaiter;
 5     Node trail = null;
 6     // t不为null
 7     while (t != null) {
 8         // 获取t的下一节点
 9         Node next = t.nextWaiter;
10         // t节点状态不为-2,需被移除
11         if (t.waitStatus != Node.CONDITION) {
12             // t节点的下一节点设置为null
13             t.nextWaiter = null;
14             // trail为null,说明t是头节点,首次进入循环
15             if (trail == null)
16                 // 将头节点执行next节点
17                 firstWaiter = next;
18             else
19                 // trail有值,表示不是头节点位置,将t的上一节点指向t的下一节点
20                 trail.nextWaiter = next;
21             // 如果next为null,说明单向链表遍历结束
22             if (next == null)
23                 // 将trail作为链表尾节点
24                 lastWaiter = trail;
25         }
26         // t的状态为-2
27         else
28             // 将t赋值给局部变量trail
29             trail = t;
30         // t指向它的下一节点
31         t = next;
32     }
33 }

  释放锁资源,AbstractQueuedSynchronizer#ConditionObject#fullyRelease() 详情如下:

 1 // 释放锁资源
 2 final int fullyRelease(Node node) {
 3     // 释放资源失败标识
 4     boolean failed = true;
 5     try {
 6         // 获取重入次数
 7         int savedState = getState();
 8         // 释放锁资源
 9         if (release(savedState)) {
10             // 更新释放锁资源标识
11             failed = false;
12             // 返回重入次数
13             return savedState;
14         } else {
15             // 释放锁资源失败,抛出异常
16             throw new IllegalMonitorStateException();
17         }
18     } finally {
19         // 释放锁资源失败,当前节点状态设置为取消
20         if (failed)
21             node.waitStatus = Node.CANCELLED;
22     }
23 }

  此处验证了前面的推导,执行await()方法会释放锁资源,同时保存重入次数,方便在获取锁时初始化重入次数。

  当前节点是否在AQS队列中,AbstractQueuedSynchronizer#isOnSyncQueue() 详情如下:
 1 // 判断当前线程节点是否在AQS队列中
 2 final boolean isOnSyncQueue(Node node) {
 3     // 线程节点状态为 -2,表示在condition单向链表中,未在AQS队列
 4     // prev节点值为null,未在AQS队列中
 5     if (node.waitStatus == Node.CONDITION || node.prev == null)
 6         return false;
 7     // 当前线程节点的next不为null,表示在AQS队列中  ???
 8     if (node.next != null) 
 9         return true;
10     
11     // 在AQS队列中寻找当前线程节点
12     return findNodeFromTail(node);
13 }

  在AQS队列中寻找当前线程节点,AbstractQueuedSynchronizer#findNodeFromTail() 详情如下:

 1 // 在AQS队列中寻找当前线程节点
 2 private boolean findNodeFromTail(Node node) {
 3     // 获取AQS队尾节点
 4     Node t = tail;
 5     for (;;) {
 6         // 当前线程节点在AQS队列中存在
 7         if (t == node)
 8             // 返回true
 9             return true;
10         // 遍历结束,AQS队列中没有此节点
11         if (t == null)
12             // 返回false
13             return false;
14         // t的上一节点,继续遍历
15         t = t.prev;
16     }
17 }

3.3、signal() - 唤醒线程

1、signal() 唤醒线程流程

  

2、signal() 唤醒线程源码分析

  AbstractQueuedSynchronizer#ConditionObject#signal() 详情如下:

 1 // 唤醒线程
 2 public final void signal() {
 3     // 当前线程没有持有锁资源,抛出异常
 4     if (!isHeldExclusively())
 5         throw new IllegalMonitorStateException();
 6     // 获取condition单向链表中的头节点
 7     Node first = firstWaiter;
 8     // 头节点不为空,有Node节点在等待
 9     if (first != null)
10         // 唤醒头节点
11         doSignal(first);
12 }

  AbstractQueuedSynchronizer#ConditionObject#doSignal() 详情如下:

 1 // 唤醒Condition中的Node中的线程
 2 private void doSignal(Node first) {
 3     do {
 4         // 获取第二个节点,并将第二个节点设置为头节点
 5         // 若condition队列中就一个节点,则将队列的首、尾节点设置为null
 6         if ( (firstWaiter = first.nextWaiter) == null)
 7             lastWaiter = null;
 8         // 若condition队列中不止一个节点,还有nextWaiter节点,由于当前节点要被唤醒了,从Condition队列移走
 9         // 需要将当前节点的nextWaiter置位null
10         first.nextWaiter = null;
11     // 将condition队列中的节点移动到AQS队列中
12     } while (!transferForSignal(first) &&
13              // 如果first后还有节点,继续唤醒,如果没有,退出while循环
14              (first = firstWaiter) != null);
15 }

  AbstractQueuedSynchronizer#transferForSignal 详情如下:

 1 // 唤醒在condition中排队的Node
 2 final boolean transferForSignal(Node node) {
 3     
 4     // 将在Condition队列中的Node的状态从-2,改为0,代表要扔到AQS队列了。
 5     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 6         // CAS失败了,说明在signal之前线程被中断了,从而被唤醒了
 7         return false;
 8 
 9     // 如果正常的将Node的状态从-2改为0,表示要将Condition中的Node扔到AQS的队列了。
10     // 将当前Node扔到AQS队列,返回的p是当前Node的prev
11     Node p = enq(node);
12     // 获取当前Node上一个Node的状态
13     int ws = p.waitStatus;
14     // 若ws > 0 ,说明这个Node已经被取消了。
15     // 若ws状态不是取消,将prev节点的状态改为-1,。
16     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
17         // 1、ws > 0 ,prev节点被取消了
18         // 2、prev节点状态正常,但CAS更新prev节点失败
19         // 上述两种场景,可能导致当前节点无法被唤醒。JDK的处理是:立即唤醒当前节点,基于 await() 中的acquireQueued方法,让当前节点找到一个正常的prev节点,并挂起线程
20         LockSupport.unpark(node.thread);
21     // 返回true
22     return true;
23 }

3.4、await() - 挂起线程终章

1、await()挂起线程终章流程

  

2、await()挂起线程终章源码分析

  await() 中线程唤醒后的处理,AbstractQueuedSynchronizer#ConditionObject#await() 详情如下:

 1 // 线程中断,挂起线程退出
 2 private static final int REINTERRUPT =  1;
 3 /** Mode meaning to throw InterruptedException on exit from wait */
 4 // 抛出异常,挂起线程退出
 5 private static final int THROW_IE    = -1;
 6 
 7 public final void await() throws InterruptedException {
 8     if (Thread.interrupted())
 9         throw new InterruptedException();
10     Node node = addConditionWaiter();
11     int savedState = fullyRelease(node);
12     // 中断模式
13     int interruptMode = 0;
14     while (!isOnSyncQueue(node)) {
15         LockSupport.park(this);
16         
17          // 如果线程执行到这,说明现在被唤醒了,确定线程以何种方式唤醒
18          // 如果线程不是通过signal唤醒,退出循环
19         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
20             break;
21     }
22     // Node在AQS队列,尝试在ReentrantLock中获取锁资源
23 
24     // acquireQueued方法返回true:代表线程在AQS队列中挂起时,被中断过
25     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)       
26         // REINTERRUPT:表示await不是被中断唤醒,但是后续被中断过
27         interruptMode = REINTERRUPT;
28     // 如果当前Node还在condition的单向链表中,将当前Node从Condition的单向链表清除
29     if (node.nextWaiter != null) 
30         unlinkCancelledWaiters();
31     // 如果interruptMode是0,说明线程在signal后以及持有锁的过程中,没被中断过,无需处理
32     // 如果interruptMode不是0
33     if (interruptMode != 0)
34         reportInterruptAfterWait(interruptMode);
35 }

  判断线程以何种方式唤醒, AbstractQueuedSynchronizer#ConditionObject#checkInterruptWhileWaiting() 详情如下:

 1 /**
 2  * 判断线程以何种方式唤醒:
 3  *   0:正常signal唤醒,并且线程未被中断
 4  *   THROW_IE(-1):中断唤醒,被interrupt唤醒的
 5  *   REINTERRUPT(1):signal唤醒后,线程被中断了
 6  */
 7 private int checkInterruptWhileWaiting(Node node) {
 8     return
 9         // 线程是否被中断过 
10         Thread.interrupted() ?
11         // 线程被中断过,判断是中断唤醒的,还是signal唤醒后又被中断了
12         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
13         // signal唤醒
14         0;
15 }

  判断线程是中断唤醒还是signal唤醒后被中断,AbstractQueuedSynchronizer#transferAfterCancelledWait(),详情如下:

 1 // 判断线程是中断唤醒的还是被signal唤醒后被线程中断了
 2 final boolean transferAfterCancelledWait(Node node) {
 3     // CAS 将 Node 状态由 -2 更新为 0 
 4     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
 5         // CAS 成功,说明线程是中断唤醒的,将Node添加进AQS队列中
 6         // 中断唤醒:未通过signal流程移除condition单向链表的当前线程节点,在AQS队列和condition单向链表中同时存在
 7         enq(node);
 8         // 返回中断唤醒处理标识
 9         return true;
10     }
11     // signal唤醒的,当前线程还为被放到AQS队列中,等待signal方法将线程节点添加到AQS队列中
12     while (!isOnSyncQueue(node))
13         // 当前线程节点没在AQS队列上,线程让步,等待一会,直到Node放到AQS队列再处理
14         Thread.yield();
15     // 返回中断唤醒处理标识
16     return false;
17 }

  尝试获取锁资源,AbstractQueuedSynchronizer#acquireQueued() 详情如下:

 1 // 尝试获取锁资源
 2 final boolean acquireQueued(final Node node, int arg) {
 3     // 获取锁资源失败标识
 4     boolean failed = true;
 5     try {
 6         // 线程中断标识
 7         boolean interrupted = false;
 8         for (;;) {
 9             // 获取当前线程节点的前一节点
10             final Node p = node.predecessor();
11             // 当前线程节点为AQS队列的首个节点,并且尝试获取锁资源成功
12             if (p == head && tryAcquire(arg)) {
13                 // 将当前线程节点设置为AQS队列头节点
14                 setHead(node);
15                 // 当前线程节点的next设置为null
16                 p.next = null;
17                 // 设置获取锁资源标识
18                 failed = false;
19                 // 返回中断标识
20                 return interrupted;
21             }
22             // 获取锁资源失败,线程挂起,并在挂起过程中被中断过
23             if (shouldParkAfterFailedAcquire(p, node) &&
24                 parkAndCheckInterrupt())
25                 // 中断标识设置为true
26                 interrupted = true;
27         }
28     } finally {
29         // 获取锁资源成功,取消试图获取锁资源的当前节点
30         if (failed)
31             cancelAcquire(node);
32     }

  此处验证了前面的推导,执行await()方法会释放锁资源,当线程被唤醒后,通过之前记录了的锁重入次数重新获取锁资源。

  线程挂起后,被中断唤醒的处理,AbstractQueuedSynchronizer#ConditionObject#reportInterruptAfterWait() 详情如下:
 1 // 在线程挂起后,被中断唤醒的处理
 2 private void reportInterruptAfterWait(int interruptMode)
 3     throws InterruptedException {
 4     // 抛出异常
 5     if (interruptMode == THROW_IE)
 6         throw new InterruptedException();
 7     // 中断处理
 8     else if (interruptMode == REINTERRUPT)
 9         selfInterrupt();
10 }

4、总结

ConditionObject提供了Lock中挂起线程、唤醒线程的方法。

  await() 将当前线程封装成Node节点,添加到condition队列中,同时释放当前线程持有锁资源,记录重入次数,挂起线程;

  signal() 唤醒线程,将condition队列中的Node移动到AQS队列中,等待执行;

  await() 获取在AQS队列中的Node,通过之前记录的锁重入次数,尝试获取锁资源。

 

标签:分析,ConditionObject,node,队列,Node,源码,线程,唤醒,节点
From: https://www.cnblogs.com/RunningSnails/p/17375905.html

相关文章

  • TCP的三次握手和四次挥手分析
    一、tcp报文格式主要关注的字段为:源端口号(SourcePort),目的端口号(DestinationPort)序列号seq(SequenceNumber)确认号ack(AcknowledgmentNumber)标志位:ACK,SYN,FIN二、三次握手客户端将TCP报文标志位SYN置为1,随机产生一个序号值seq=x,发送给服务端。发送完毕后,客户端进入SYN_......
  • java基于springboot+vue的校园新闻网站、校园新闻管理系统,附源码+数据库+文档+PPT,适合
    1、项目介绍校园新闻网站的主要使用者分为管理员和用户,实现功能包括管理员:首页、个人中心、用户管理、新闻类型管理、校园新闻管理、留言板管理、论坛交流、系统管理,用户前台:首页、校园新闻、论坛交流、留言反馈、个人中心、后台管理等功能。由于本网站的功能模块设计比较全面,所......
  • 事后诸葛亮分析
    作业要求目标事后诸葛亮分析作业要求作业要求目录设想与目标计划资源变更管理设计/实现测试与发布总结团队成员角色与贡献设想与目标我们的软件要解决什么问题?是否定义得很清楚?是否对典型用户和典型场景有清晰的描述?此项目是基于商户和仓库两个部门之间联系,记录......
  • R语言决策树、随机森林、逻辑回归临床决策分析NIPPV疗效和交叉验证
    全文链接:http://tecdat.cn/?p=32295原文出处:拓端数据部落公众号临床决策(clinical decision making)是医务人员在临床实践过程中,根据国内外医学科研的最新进展,不断提出新方案,与传统方案进行比较后,取其最优者付诸实施,从而提高疾病诊治水平的过程。在临床医疗实践中,许多事件......
  • 事后诸葛亮分析
    这个作业属于哪个课程2023软件工程-双学位作业要求团队作业6——复审与事后分析项目团队下岗工人在就业队目录1.事后诸葛亮分析1.1设想和目标1.2计划1.3资源1.4变更管理1.5设计/实现1.6测试1.7总结2.8照片2.团队成员在Alpha阶段的角色和具体贡献1.事后诸葛亮分......
  • 95计费版赛题 赛题分析+代码
    95计费版赛题赛题分析+代码1.1描述1.2术语解释1.3数学描述1.3.1约束1.4目标2.1简单总结题目节点可以分为边缘节点和客户节点,边缘节点的各个时刻的分配流量的从小到大排序的前95%作为成本显然,每个节点的后5%是可以白嫖的,因此需要增加白嫖的流量题目为组合优化......
  • java基于springboot+vue的垃圾分类管理系统,附源码+文档+PPT+数据库
    1、项目介绍垃圾分类网站的主要使用者分为管理员和用户、垃圾分类管理员,实现功能包括管理员:首页、个人中心、用户管理、垃圾分类管理员管理、垃圾分类管理、垃圾类型管理、垃圾图谱管理、系统管理,垃圾分类管理员;首页、个人中心、用户管理、垃圾分类管理员管理、垃圾分类管理、垃......
  • 以京东为例,分析优惠价格叠加规则
      一、平行优惠计算原则 1、什么是“平行式门槛计算规则”平行式门槛计算规则,即每一层级优惠都直接根据商品的单品基准价来计算是否符合门槛,店铺/平台促销、优惠券类优惠之间是并列关系,只要单品基准价或单品基准价总和(即各商品单品基准价的总和)满足各层级优惠门槛,则可......
  • MASA MinimalAPI源码解析:为什么我们只写了一个app.MapGet,却生成了三个接口
    源码解析:为什么我们只写了一个app.MapGet,却生成了三个接口1.ServiceBase1.AutoMapRoute源码如下:AutoMapRoute自动创建map路由,MinimalAPI会根据service中的方法,创建对应的api接口。比如上文的一个方法:publicasyncTask<WeatherForecast[]>PostWeather(){re......
  • 《安富莱嵌入式周报》第311期:300V可调节全隔离USB PD电源,开源交流负载分析仪,CANFD Tra
    周报汇总地址:http://www.armbbs.cn/forum.php?mod=forumdisplay&fid=12&filter=typeid&typeid=104 视频版:https://www.bilibili.com/video/BV1Hh4y1H7dR1、运行速度1Hz木头材料晶体管https://liu.se/en/news-item/varldens-forsta-tratransistor研究人员设计并测试了第......