CyclicBarrier的源码分析
与CountDownLatch、Semaphore直接基于AQS实现不同,CyclicBarrier 是基于 ReentrantLock + ConditionObject 实现的,间接基于AQS实现的。
CyclicBarrier内部结构
- Generation,静态内部类,持有布尔类型的属性broken,默认为false,只有在重置方法reset()、执行出现异常或中断调用breakBarrier() ,属性会被设置为true。
- nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。
- breakBarrier() 任务执行中断、异常、被重置,将Generation中的布尔类型属性设置为true,将Waiter队列中的线程转移到AQS队列中,待执行完unlock方法后,唤醒AQS队列中的挂起线程。
- await() :CyclicBarrier的核心方法,计数器递减处理。
构造函数
构造参数重载,最终调用的是CyclicBarrier(int, Runnable),详情如下:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
// 参数合法性校验
if (parties <= 0) throw new IllegalArgumentException();
// final修饰,所有线程执行完成归为或重置时 使用
this.parties = parties;
// 在await方法中计数值,表示还有多少线程待执行await
this.count = parties;
// 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程
this.barrierCommand = barrierAction;
}
CyclicBarrier属性
核心方法源码分析
await()
在CyclicBarrier中,await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法);await(timout, unit)表示等待timeout时间后,指定数量的线程未准备就绪,抛出TimeoutException超时异常。
CyclicBarrier#await 详情如下:
// 执行没有超时时间的await
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 执行dowait()
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
// 执行有超时时间的await
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
await最终调用dowait()方法,CyclicBarrier#dowait 详情如下:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
// 获取锁对象
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取generation对象
final Generation g = generation;
// 这组线程中在执行过程中是否异常、超时、中断、重置
if (g.broken)
throw new BrokenBarrierException();
// 这组线程被中断,重置标识与计数值,
// 将Waiter队列中的线程转移到AQS队列,抛出InterruptedException
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 计数值 - 1
int index = --count;
// 这组线程都已准备就绪
if (index == 0) {
// 执行结果标识
boolean ranAction = false;
try {
// 若使用2个参数的有参构造,就传入了自实现任务,index == 0,先执行CyclicBarrier有参的任务
// 此处设计与 FutureTask 构造参数设计类似
final Runnable command = barrierCommand;
if (command != null)
// 执行任务
command.run();
// 执行完成,设置为true
ranAction = true;
// CyclicBarrier属性归位
nextGeneration();
return 0;
} finally {
// 执行过程中出现问题
if (!ranAction)
// 重置标识与计数值,将Waiter队列中的线程转移到AQS队列
breakBarrier();
}
}
// -- 之后,count不为0,表示还有线程在等待
// 自旋 直到被中断、超时、异常、count = 0
for (;;) {
try {
// 未设置超时时间
if (!timed)
// 挂起线程,将线程转移到 Condition 队列
trip.await();
// 未达到等待时间
else if (nanos > 0L)
// 挂起线程,并返回剩余等待时间
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 中断异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 线程中断
Thread.currentThread().interrupt();
}
}
// 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
// 超时,抛出异常TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁资源
lock.unlock();
}
}
breakBarrier() - 结束CyclicBarrier的执行
// 结束CyclicBarrier的执行
private void breakBarrier() {
// 设置线程执行过程中是否异常、中断、重置标识
generation.broken = true;
// 重置计数值
count = parties;
// 将Condition队列中的Node转移到AQS队列中,等到执行完unlock,AQS队列中的挂起线程会被唤醒
// 有后继节点的,设置ws = -1;
// 无后继节点的,设置ws = 0
trip.signalAll();
}
reset() - 重置CyclicBarrier
// 重置CyclicBarrier
public void reset() {
// 获取锁对象
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 设置当前generation属性,并将Waiter队列中线程转移到AQS队列
breakBarrier();
// 重置generation 属性、计数值
nextGeneration();
} finally {
// 释放锁
lock.unlock();
}
}
nextGeneration() - CyclicBarrier归位
private void nextGeneration() {
// 将Waiter队列中线程转移到AQS队列
trip.signalAll();
// 计数值、generation 归位
count = parties;
generation = new Generation();
}
总结
CyclicBarrier基于 ReentrantLock + ConditionObject实现,CyclicBarrier的构造函数中必须指定parties,同时对象generation,内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。
parties是初始待执行线程数,在构造函数中会将parties赋给计数值count,每当一个线程执行await(),count就会减1。
当count被减为0时,代表所有线程都准备就绪,此时判断构造函数是否初始化了barrierCommand属性,若对barrierCommand属性做了赋值,优先执行barrierCommand任务;
barrierCommand任务执行完成,再将Waiter队列中的线程转移到AQS队列中,执行完unlock,唤醒AQS队列中的线程;计数值count、generation归位。
标签:分析,AQS,队列,await,generation,源码,线程,CyclicBarrier From: https://blog.csdn.net/qq_36324341/article/details/142299861