首页 > 编程语言 >CyclicBarrier源码分析

CyclicBarrier源码分析

时间:2023-05-06 09:22:22浏览次数:69  
标签:分析 执行 队列 await generation 源码 线程 CyclicBarrier

1、CyclicBarrier的介绍

  CyclicBarrier 被称为栅栏,允许一组线程相互等待,直到这一组线程都准备完毕,放行,程序方可继续执行。

  就好像做摩天轮,游乐园规定,至少有9个游客乘坐摩天轮,管理员才可以启动摩天轮,游客数和管理员少一个条件,摩天轮都不会启动。

2、CyclicBarrier的使用

  根据上面摩天轮的案例,程序代码如下:

 1 import java.util.concurrent.BrokenBarrierException;
 2 import java.util.concurrent.CyclicBarrier;
 3 
 4 public class TestCyclicBarrier {
 5 
 6     public static void main(String[] args) throws Exception {
 7         int parties = 10;
 8         CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
 9             System.out.println("======== 启动摩天轮.... ========");
10         });
11 
12         for (int i = 1; i <= parties - 1; i++) {
13             final int current = i;
14             new Thread(() -> {
15                 System.out.println("编号为 " + current + " 的游客已准备就绪");
16                 try {
17                     if (current < 9) {
18                         System.out.println("sorry,游客数不足,无法启动摩天轮");
19                     } else {
20                         System.out.println("OK,游客数已达标,准备启动摩天轮");
21                     }
22                     barrier.await();
23 
24                     System.out.println("编号为 " + current + " 的游客尖叫");
25                 } catch (InterruptedException e) {
26                     e.printStackTrace();
27                 } catch (BrokenBarrierException e) {
28                     e.printStackTrace();
29                 }
30             }).start();
31         }
32 
33         System.out.println("管理员已到位");
34         barrier.await();
35 
36     }
37 }

  执行结果如下:

  

3、CyclicBarrier的源码分析

  与CountDownLatch、Semaphore直接基于AQS实现不同,CyclicBarrier 是基于 ReentrantLock + ConditionObject 实现的,间接基于AQS实现的。有关ConditionObject的分析,参考:

ConditionObject源码分析

3.1、CyclicBarrier概览

  Generation,静态内部类,持有布尔类型的属性broken,默认为false,只有在重置方法reset()、执行出现异常或中断调用breakBarrier() ,属性会被设置为true。

  nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。

  breakBarrier() 任务执行中断、异常、被重置,将Generation中的布尔类型属性设置为true,将Waiter队列中的线程转移到AQS队列中,待执行完unlock方法后,唤醒AQS队列中的挂起线程。

  await() :CyclicBarrier的核心方法,计数器递减处理。

  

3.2、构造函数

  构造参数重载,最终调用的是CyclicBarrier(int, Runnable),详情如下:

 1 public CyclicBarrier(int parties) {
 2     this(parties, null);
 3 }
 4 
 5 public CyclicBarrier(int parties, Runnable barrierAction) {
 6     // 参数合法性校验
 7     if (parties <= 0) throw new IllegalArgumentException();
 8     // final修饰,所有线程执行完成归为或重置时 使用
 9     this.parties = parties;
10     // 在await方法中计数值,表示还有多少线程待执行await
11     this.count = parties;
12     // 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程
13     this.barrierCommand = barrierAction;
14 }

3.3、CyclicBarrier属性

  

3.4、核心方法分析

1、await() - 源码分析

  在CyclicBarrier中,await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法);await(timout, unit)表示等待timeout时间后,指定数量的线程未准备就绪,抛出TimeoutException超时异常。

CyclicBarrier#await 详情如下:

 1 // 执行没有超时时间的await
 2 public int await() throws InterruptedException, BrokenBarrierException {
 3     try {
 4         // 执行dowait()
 5         return dowait(false, 0L);
 6     } catch (TimeoutException toe) {
 7         throw new Error(toe); 
 8     }
 9 }
10 
11 // 执行有超时时间的await
12 public int await(long timeout, TimeUnit unit)
13     throws InterruptedException,
14            BrokenBarrierException,
15            TimeoutException {
16     return dowait(true, unit.toNanos(timeout));
17 }

  await最终调用dowait()方法,CyclicBarrier#dowait 详情如下:

 1 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
 2     // 获取锁对象
 3     final ReentrantLock lock = this.lock;
 4     // 加锁
 5     lock.lock();
 6     try {
 7         // 获取generation对象
 8         final Generation g = generation;
 9         
10         // 这组线程中在执行过程中是否异常、超时、中断、重置
11         if (g.broken)
12             throw new BrokenBarrierException();
13         
14         // 这组线程被中断,重置标识与计数值,
15         //     将Waiter队列中的线程转移到AQS队列,抛出InterruptedException
16         if (Thread.interrupted()) {
17             breakBarrier();
18             throw new InterruptedException();
19         }
20         
21         // 计数值 - 1
22         int index = --count;
23         // 这组线程都已准备就绪
24         if (index == 0) { 
25             // 执行结果标识
26             boolean ranAction = false;
27             try {
28                 // 若使用2个参数的有参构造,就传入了自实现任务,index == 0,先执行CyclicBarrier有参的任务
29                 //     此处设计与 FutureTask 构造参数设计类似
30                 final Runnable command = barrierCommand;
31                 if (command != null)
32                     // 执行任务
33                     command.run();
34                 // 执行完成,设置为true
35                 ranAction = true;
36                 // CyclicBarrier属性归位
37                 nextGeneration();
38                 return 0;
39             } finally {
40                 // 执行过程中出现问题
41                 if (!ranAction)
42                     // 重置标识与计数值,将Waiter队列中的线程转移到AQS队列
43                     breakBarrier();
44             }
45         }
46 
47         // -- 之后,count不为0,表示还有线程在等待
48         // 自旋 直到被中断、超时、异常、count = 0
49         for (;;) {
50             try {
51                 // 未设置超时时间
52                 if (!timed)
53                     // 挂起线程,将线程转移到 Condition 队列
54                     trip.await();
55                 // 未达到等待时间
56                 else if (nanos > 0L)
57                     // 挂起线程,并返回剩余等待时间
58                     nanos = trip.awaitNanos(nanos);
59             } catch (InterruptedException ie) {
60                 // 中断异常
61                 if (g == generation && ! g.broken) {
62                     breakBarrier();
63                     throw ie;
64                 } else {
65                     // 线程中断
66                     Thread.currentThread().interrupt();
67                 }
68             }
69             
70             // 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常
71             if (g.broken)
72                 throw new BrokenBarrierException();
73             
74             if (g != generation)
75                 return index;
76             
77             // 超时,抛出异常TimeoutException
78             if (timed && nanos <= 0L) {
79                 breakBarrier();
80                 throw new TimeoutException();
81             }
82         }
83     } finally {
84         // 释放锁资源
85         lock.unlock();
86     }
87 }

2、breakBarrier() - 结束CyclicBarrier的执行

 1 // 结束CyclicBarrier的执行
 2 private void breakBarrier() {
 3     // 设置线程执行过程中是否异常、中断、重置标识
 4     generation.broken = true;
 5     // 重置计数值
 6     count = parties;
 7     // 将Condition队列中的Node转移到AQS队列中,等到执行完unlock,AQS队列中的挂起线程会被唤醒
 8     // 有后继节点的,设置ws = -1;
 9     // 无后继节点的,设置ws = 0
10     trip.signalAll();
11 }

3、reset() - 重置CyclicBarrier

 1 // 重置CyclicBarrier
 2 public void reset() {
 3     // 获取锁对象
 4     final ReentrantLock lock = this.lock;
 5     // 加锁
 6     lock.lock();
 7     try {
 8         // 设置当前generation属性,并将Waiter队列中线程转移到AQS队列
 9         breakBarrier();  
10         // 重置generation 属性、计数值
11         nextGeneration();
12     } finally {
13         // 释放锁
14         lock.unlock();
15     }
16 }

4、nextGeneration() - CyclicBarrier归位

  与reset()不同,nextGeneration()是在任务执行完成后,对 CyclicBarrier 做归位,不会设置线程执行异常、超时、中断标识。

1 private void nextGeneration() {
2     // 将Waiter队列中线程转移到AQS队列
3     trip.signalAll();
4     // 计数值、generation 归位
5     count = parties;
6     generation = new Generation();
7 }

4、总结

  CyclicBarrier基于 ReentrantLock + ConditionObject实现,CyclicBarrier的构造函数中必须指定parties,同时对象generation,内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。

  parties是初始待执行线程数,在构造函数中会将parties赋给计数值count,每当一个线程执行await(),count就会减1。

  当count被减为0时,代表所有线程都准备就绪,此时判断构造函数是否初始化了barrierCommand属性,若对barrierCommand属性做了赋值,优先执行barrierCommand任务;

  barrierCommand任务执行完成,再将Waiter队列中的线程转移到AQS队列中,执行完unlock,唤醒AQS队列中的线程;计数值count、generation归位。

 

标签:分析,执行,队列,await,generation,源码,线程,CyclicBarrier
From: https://www.cnblogs.com/RunningSnails/p/17375944.html

相关文章

  • ConditionObject源码分析
    ConditionObject是AbstractQueuedSynchronizer(AQS)实现的内部类,类图如下: 1、Condition接口ConditionObject实现了Condition接口。先来看看Codition接口。 Codition中主要定义了挂起线程和唤醒线程的接口方法。Condition接口详情如下:1publicinterfaceCon......
  • TCP的三次握手和四次挥手分析
    一、tcp报文格式主要关注的字段为:源端口号(SourcePort),目的端口号(DestinationPort)序列号seq(SequenceNumber)确认号ack(AcknowledgmentNumber)标志位:ACK,SYN,FIN二、三次握手客户端将TCP报文标志位SYN置为1,随机产生一个序号值seq=x,发送给服务端。发送完毕后,客户端进入SYN_......
  • java基于springboot+vue的校园新闻网站、校园新闻管理系统,附源码+数据库+文档+PPT,适合
    1、项目介绍校园新闻网站的主要使用者分为管理员和用户,实现功能包括管理员:首页、个人中心、用户管理、新闻类型管理、校园新闻管理、留言板管理、论坛交流、系统管理,用户前台:首页、校园新闻、论坛交流、留言反馈、个人中心、后台管理等功能。由于本网站的功能模块设计比较全面,所......
  • 事后诸葛亮分析
    作业要求目标事后诸葛亮分析作业要求作业要求目录设想与目标计划资源变更管理设计/实现测试与发布总结团队成员角色与贡献设想与目标我们的软件要解决什么问题?是否定义得很清楚?是否对典型用户和典型场景有清晰的描述?此项目是基于商户和仓库两个部门之间联系,记录......
  • R语言决策树、随机森林、逻辑回归临床决策分析NIPPV疗效和交叉验证
    全文链接:http://tecdat.cn/?p=32295原文出处:拓端数据部落公众号临床决策(clinical decision making)是医务人员在临床实践过程中,根据国内外医学科研的最新进展,不断提出新方案,与传统方案进行比较后,取其最优者付诸实施,从而提高疾病诊治水平的过程。在临床医疗实践中,许多事件......
  • 事后诸葛亮分析
    这个作业属于哪个课程2023软件工程-双学位作业要求团队作业6——复审与事后分析项目团队下岗工人在就业队目录1.事后诸葛亮分析1.1设想和目标1.2计划1.3资源1.4变更管理1.5设计/实现1.6测试1.7总结2.8照片2.团队成员在Alpha阶段的角色和具体贡献1.事后诸葛亮分......
  • 95计费版赛题 赛题分析+代码
    95计费版赛题赛题分析+代码1.1描述1.2术语解释1.3数学描述1.3.1约束1.4目标2.1简单总结题目节点可以分为边缘节点和客户节点,边缘节点的各个时刻的分配流量的从小到大排序的前95%作为成本显然,每个节点的后5%是可以白嫖的,因此需要增加白嫖的流量题目为组合优化......
  • java基于springboot+vue的垃圾分类管理系统,附源码+文档+PPT+数据库
    1、项目介绍垃圾分类网站的主要使用者分为管理员和用户、垃圾分类管理员,实现功能包括管理员:首页、个人中心、用户管理、垃圾分类管理员管理、垃圾分类管理、垃圾类型管理、垃圾图谱管理、系统管理,垃圾分类管理员;首页、个人中心、用户管理、垃圾分类管理员管理、垃圾分类管理、垃......
  • 以京东为例,分析优惠价格叠加规则
      一、平行优惠计算原则 1、什么是“平行式门槛计算规则”平行式门槛计算规则,即每一层级优惠都直接根据商品的单品基准价来计算是否符合门槛,店铺/平台促销、优惠券类优惠之间是并列关系,只要单品基准价或单品基准价总和(即各商品单品基准价的总和)满足各层级优惠门槛,则可......
  • MASA MinimalAPI源码解析:为什么我们只写了一个app.MapGet,却生成了三个接口
    源码解析:为什么我们只写了一个app.MapGet,却生成了三个接口1.ServiceBase1.AutoMapRoute源码如下:AutoMapRoute自动创建map路由,MinimalAPI会根据service中的方法,创建对应的api接口。比如上文的一个方法:publicasyncTask<WeatherForecast[]>PostWeather(){re......