首页 > 编程语言 >CountDownLatch源码分析

CountDownLatch源码分析

时间:2023-04-27 22:14:01浏览次数:41  
标签:分析 count AQS int state 源码 线程 CountDownLatch

1、CountDownLatch介绍

  CountDownLatch让一个或多个线程等待其他线程执行完成后再执行。在创建CountDownLatch对象时,必须指定线程数count,每当一个线程执行完成调用countDown()方法,线程数count减1,当count减到0时,await()方法就不再阻塞。

2、CountDownLatch使用

 1 import java.util.concurrent.CountDownLatch;
 2 
 3 public class TestCountDownLatch {
 4     public static void main(String[] args) {
 5         // 设置初始值
 6         int count = 5;
 7         CountDownLatch cdl = new CountDownLatch(count);
 8         for (int i = 0; i < count; i++) {
 9             Thread thread = new Thread(() -> {
10                 System.out.println(Thread.currentThread().getName());
11                 cdl.countDown();
12             });
13             // 设置当前线程为守护线程
14             thread.setDaemon(true);
15             // 启动线程
16             thread.start();
17         }
18         // 阻塞等待,直到state==0
19         try {
20             cdl.await();
21         } catch (InterruptedException e) {
22             e.printStackTrace();
23         }
24         // 主线程
25         System.out.println(Thread.currentThread().getName());
26     }
27 }

  结合上面的代码示例,为了便于理解,将CountDownLatch的state变化及调用await()、countDown()方法后线程的情况用下图展示,详情如下:

  

3、CountDownLatch源码分析

  CountDownLatch详情如下:

  

3.1、构造函数

  CountDownLatch没有无参构造函数,在有参构造函数中初始化了sync属性。

1 public CountDownLatch(int count) {
2     // count 合法校验
3     if (count < 0) throw new IllegalArgumentException("count < 0");
4     // 初始化sync属性
5     this.sync = new Sync(count);
6 }

3.2、Sync - 队列同步器

 1 // 抽象队列同步器
 2 private static final class Sync extends AbstractQueuedSynchronizer {
 3     private static final long serialVersionUID = 4982264981922014374L;
 4     
 5     // 将 count 赋值给 AQS 的 state 属性 
 6     Sync(int count) {
 7         setState(count);
 8     }
 9     // 获取 AQS 的 state 属性
10     int getCount() {
11         return getState();
12     }
13     // 判断所有线程是否都执行完成, 1 -> 全部执行完成;-1 -> 仍有线程在执行
14     protected int tryAcquireShared(int acquires) {
15         return (getState() == 0) ? 1 : -1;
16     }
17     // 释放锁
18     protected boolean tryReleaseShared(int releases) {
19         // 自旋
20         for (;;) {
21             // 获取 AQS 的 state 
22             int c = getState();
23             // 锁资源已经释放完毕,再次进入,直接返回false,什么也不做
24             if (c == 0)
25                 return false;
26             //  state - 1
27             int nextc = c-1;
28             // CAS 赋值操作
29             if (compareAndSetState(c, nextc))
30                 // 最后一个线程执行完,state = 0 ,返回true。
31                 // countDown() 唤醒等待队列中的其他挂起线程
32                 return nextc == 0;
33         }
34     }
35 }

3.3、await() - 阻塞等待

  CountDownLatch#await(),详情如下:

1 // AQS的state属性不为0, 阻塞
2 public void await() throws InterruptedException {
3     // 调用AQS提供的获取共享锁并允许中断的方法
4     sync.acquireSharedInterruptibly(1);
5 }

  AbstractQueuedSynchronizer#acquireSharedInterruptibly(),详情如下:

 1 // 获取共享锁,并允许其中断
 2 public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
 3     // 线程中断,抛出异常
 4     if (Thread.interrupted())
 5         throw new InterruptedException();
 6     // 获取共享锁,由CountDownLatch实现
 7     if (tryAcquireShared(arg) < 0)
 8         // state > 0,说明有线程在持有锁资源,将当前线程添加到AQS等待队列中
 9         doAcquireSharedInterruptibly(arg);
10 }

  CountDownLatch#Sync#tryAcquireShared(),详情如下:

1 // 获取共享锁
2 protected int tryAcquireShared(int acquires) {
3     // 线程全部执行完成,返回 1;未全部执行完成,返回-1
4     return (getState() == 0) ? 1 : -1;
5 }

  AbstractQueuedSynchronizer#acquireSharedInterruptibly(),详情如下:

 1 // 将当前线程添加到AQS等待队列中
 2 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
 3     // 当前线程封装成Node,添加到AQS等待队列中
 4     final Node node = addWaiter(Node.SHARED);
 5     boolean failed = true;
 6     try {
 7         // 自旋
 8         for (;;) {
 9             // 获取当前线程节点的前驱节点
10             final Node p = node.predecessor();
11             // 前驱节点为等待队列头节点
12             if (p == head) {
13                 // 调用 CountDownLatch 实现的方法
14                 int r = tryAcquireShared(arg);
15                 // 返回值为1,表示 state 为 0 ,所有线程都释放了锁,无其他线程持有锁资源
16                 if (r >= 0) {
17                     // state = 0,将当前线程和后面所有排队的线程都唤醒。
18                     setHeadAndPropagate(node, r);
19                     p.next = null;
20                     failed = false;
21                     return;
22                 }
23             }
24             // *** 线程在此处被挂起,待所有线程释放锁资源后,即state = 0 ,线程被唤醒,再继续往下执行
25             // 挂起获取锁资源失败的线程,并且挂起的线程被中断,抛出InterruptedException异常
26             if (shouldParkAfterFailedAcquire(p, node) &&
27                 parkAndCheckInterrupt())
28                 throw new InterruptedException();
29         }
30     } finally {
31         if (failed)
32             cancelAcquire(node);
33     }
34 }

3.4、countDown() - 释放锁资源

  CountDownLatch#countDown(),详情如下:
1 // countDown方法, 实际上调用了AQS的释放共享锁操作
2 public void countDown() {
3     sync.releaseShared(1);
4 }
  AbstractQueuedSynchronizer#releaseShared(),详情如下:
 1 // AQS提供的释放共享锁方法,CountDownLatch实现了 tryReleaseShared 方法 
 2 public final boolean releaseShared(int arg) {
 3     // 尝试释放锁资源
 4     if (tryReleaseShared(arg)) {
 5         // 没有线程持有锁资源,唤醒等待队列中的其他挂起线程
 6         doReleaseShared();
 7         return true;
 8     }
 9     return false;
10 }

  CountDownLatch#Sync#tryReleaseShared(),详情如下:

 1 protected boolean tryReleaseShared(int releases) {
 2     // 自旋
 3     for (;;) {
 4         // 获取当前持有锁资源的线程数
 5         int c = getState();
 6         // state已为0,返回false,那么再次执行countDown,什么事情也不做
 7         if (c == 0)
 8             return false;
 9         // count - 1 
10         int nextc = c-1;
11         // CAS 完成赋值操作
12         if (compareAndSetState(c, nextc))
13             // 没有线程持有锁资源,返回true
14             return nextc == 0;
15     }
16 }

  AbstractQueuedSynchronizer#doReleaseShared(),详情如下:

 1 // 没有线程持有锁资源的处理
 2 private void doReleaseShared() {
 3     // 自旋
 4     for (;;) {
 5         // 获取等待队列的头节点
 6         Node h = head;
 7         // 等待队列中有挂起线程待唤醒
 8         if (h != null && h != tail) {
 9             int ws = h.waitStatus;
10             // 线程待唤醒
11             if (ws == Node.SIGNAL) {
12                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
13                     continue;        
14                 // 唤醒线程   
15                 unparkSuccessor(h);
16             }
17             // CAS失败
18             else if (ws == 0 &&
19                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
20                 continue;               
21         }
22         // 等待队列头节点被改变,结束循环
23         if (h == head) 
24             break;
25     }
26 }

3.5、总结

  CountDownLatch基于 AQS + CAS 实现,CountDownLatch的构造函数中必须指定count,同时初始继承AQS的内部类Sync,通过Sync对象将count赋值给AQS的state属性,这样就可以基于AQS提供的方法完成CountDownLatch的功能。

  调用countDown()方法,实际上是将AQS中 state 减 1。所有线程执行完成,state 会被修改为 0 ,在countDown()中会唤醒等待队列中挂起的线程。

  调用await()方法,实际上是判断AQS中的 state 是否为 0。state > 0,表示有线程仍在执行,此时await()会阻塞线程。当最后一个线程执行结束,state 变为 0,countDown()唤醒线程后,await()正常执行结束,不再阻塞。

 

标签:分析,count,AQS,int,state,源码,线程,CountDownLatch
From: https://www.cnblogs.com/RunningSnails/p/17360363.html

相关文章

  • vue2源码-十六、异步组件
    异步组件Vue中异步组件的写法有很多,主要用作大的组件异步加载的markdown组件editor组件。就是先渲染一个注释标签,等组件加载完毕,最后再重新渲染forceUpdate(图片懒加载)使用异步组件会配合webpack原理:异步组件默认不会调用Vue.extend()方法所有Ctor上没有cid属性,没有cid属......
  • TK Mall:以技术创新与数据分析驱动的全球跨境电商平台
    2022年,TKMall在新加坡成立,是一家以技术创新和数据分析为驱动力的全球跨境电商平台,该平台凭借其高质量的服务和领先的技术优势,在全球范围内迅速崛起。作为一家新兴的跨境电商平台,TKMall主打移动电商业务,其90%的卖家来自中国,同时也是日韩、东南亚最受TikTok主播青睐的带货选品移动......
  • RCU-3——经典(可抢占)RCU代码分析
    基于Linux-5.10一、相关数据结构1.structrcu_statercu_state用于描述RCU全局状态。structrcu_state{structrcu_nodenode[NUM_RCU_NODES];/*Hierarchy.*/structrcu_node*level[RCU_NUM_LVLS+1];/*Hierarchylevels(+1toshutbogusgccwarning)......
  • RCU-4——不可抢占RCU代码分析
    基于Linux-5.10一、不可抢占RCU1.不可抢占RCU不允许进程在读端临界区被其它进程抢占,使用函数rcu_read_lock_sched()/rcu_read_unlock_sched()标记读临界区。前者禁止内核抢占,后者开启内核抢占。staticinlinevoidrcu_read_lock_sched(void)//include/linux/rcupdate.h......
  • nginx出现504错误的原因分析及解决
    线上环境使用的是nginx代理到后端应用(java),对应用进行了一次压测发现nginx出现大量的504代码,即网关超时(GatewayTime-out)错误。 原因分析:首先504是网关超时错误,通常是nginx将请求代理到后端应用时,后端应用没有在规定的时间返回数据,需要开发检查下应用那块有什么耗时的操作,比如:......
  • ubuntu2004 下源码安装boost
    ubuntu2004下源码安装boosthttps://www.aiuai.cn/aifarm1186.htmlhttps://www.boost.org/users/history/version_1_78_0.htmlhttps://cloud.tencent.com/developer/article/1804511https://stackoverflow.com/questions/12578499/how-to-install-boost-on-ubuntuboost版本在......
  • SpringCloud微服务架构分析说明!
    SpringCloud是一个基于SpringBoot的微服务框架,它提供了一系列的工具和组件,用于构建分布式系统中各个微服务之间的通信和互联,实现服务发现、负载均衡、分布式配置等功能。下面我们来具体解析一下SpringCloud微服务架构。服务注册与发现在微服务架构中,服务的数量非常多,因此需要一个机......
  • Python的OS模块分析文件路径层次信息——获取文件路径、全名(文件名+尾缀)
    Code:importosFilePath="/a/bb/ccc/dddd.png"FolderPath,FullFileName=os.path.split(FilePath)Name,Suffix=os.path.splitext(FullFileName)print("文件路径:{}\n文件全名:{}\n文件名:{}\n文件后缀:{}".format(FolderPath,FullFileName,Name,......
  • app逆向之安卓native层安全逆向分析(六):frida调试跟栈+unidbg补环境大动作
    前言继续跟着龙哥的unidbg学习:SO逆向入门实战教程六:s_白龙~的博客-CSDN博客还是那句,我会借鉴龙哥的文章,以一个初学者的角度,加上自己的理解,把内容丰富一下,尽量做到不在龙哥的基础上画蛇添足,哈哈。感谢观看的朋友分析 首先抓个包看看: 这里面这个sign,就是今天的重点了这个......
  • 直播平台搭建源码,使用EasyExcel实现导入导出功能
    直播平台搭建源码,使用EasyExcel实现导入导出功能使用,添加依赖 <dependencies> <!--https://mvnrepository.com/artifact/com.alibaba/easyexcel--> <dependency> <groupId>com.alibaba</groupId> <artifactId>easyexcel</artifactId> <version>......