CyclicBarrier源码解析
描述:
一个同步帮助,允许一组线程互相等待到达一个共同的屏障点。Cyclicbarrier 在涉及固定大小的线程组的程序中非常有用,这些线程必须偶尔相互等待。这个屏障称为cyclic
,因为它可以在等待的线程被释放后被重用。
A CyclicBarrier
支持一个可选的Runnable
命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此 屏障操作 对更新共享状态很有用。
示例用法:以下是在并行分解设计中使用障碍的示例:
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads)
thread.join();
}
}
这里,每个工作线程处理矩阵的一行,然后等待屏障,直到所有行都被处理。 当处理所有行时,执行提供的Runnable
屏障操作并合并行。 如果合并确定已经找到解决方案,那么done()
将返回true
,并且每个工作人员将终止。
如果屏障操作不依赖于执行方暂停的各方,那么该方可以在释放任何线程时执行该操作。 为了方便这一点,每次调用await()
返回该线程在屏障上的到达索引。 然后,您可以选择哪个线程应该执行屏障操作,例如:
if (barrier.await() == 0) {
// log the completion of this iteration
}
CyclicBarrier
对失败的同步尝试使用all-or-none断裂模型:如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException
(或InterruptedException
)异常离开如果他们也在同一时间被打断)。
内存一致性效果:线程中调用的行动之前, await()
happen-before 行动是屏障操作的一部分,进而发生,之前的动作之后,从相应的成功返回await()
其他线程。
源码:
public class CyclicBarrier {
/**
* 屏障的每次使用都表示为一个生成实例。每当障碍被跳闸或复位时,生成就会改变。
* 使用barrier的线程可能会关联许多代 —— 由于分配锁给等待线程的不确定方式 —— 但一次只能有一个代是活动的( count 适用的那个),其余的都是中断或触发的。
* 如果发生了中断,但没有后续的重置,则不需要活动代。
*/
private static class Generation {
boolean broken = false;
}
/**
* 用于保护屏障入口的锁
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 条件等待,直到跳闸
*/
private final Condition trip = lock.newCondition();
/**
* 参与方数量
*/
private final int parties;
/**
* 跳闸时运行的命令
*/
private final Runnable barrierCommand;
/**
* 当前一代
*/
private Generation generation = new Generation();
/**
* 仍在等待的参与方数。每一代从参与方倒数到 0。
* 它在每一代人中或在被破坏时都会被重置。
*/
private int count;
/**
* 更新障碍旅行状态并唤醒所有人。仅在持有锁时调用。
*/
private void nextGeneration() {
// 上一代的信号完成
trip.signalAll();
// 建立下一代
count = parties;
generation = new Generation();
}
/**
* 设置当前屏障生成为破碎,并唤醒所有人。仅在持有锁时调用。
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
/**
* 主要壁垒代码,涵盖各种政策。
*/
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* 创建一个新的 CyclicBarrier,当给定数量的方(线程)等待它时,它将跳闸,
* 当barrier被跳闸时,它将执行给定的barrier动作,由最后一个进入barrier的线程执行。
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* 创建一个新的 CyclicBarrier ,当给定数量的方(线程)等待它时,它将跳闸,并且在跳闸时不执行预定义的操作。
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* 返回跳过此屏障所需的参与方数。
*/
public int getParties() {
return parties;
}
/**
* 等待,直到所有 parties 都在此屏障上调用了 await 。
*
* 1.如果当前线程不是最后一个到达的,那么出于线程调度的目的,它将被禁用,并处于休眠状态,直到发生以下事情之一:
* 最后一个到了; 或其他线程中断当前线程; 或其他线程中断了其中一个等待线程; 或其他线程在等待barrier时超时; 或其他一些线程在这个barrier上调用reset。
* 2.如果当前线程:在进入此方法时设置其中断状态; 或等待时中断;然后 InterruptedException 被抛出,当前线程的中断状态被清除。
* 3.如果屏障是 reset ,而任何线程正在等待,或如果屏障 isBroken被打破时被调用,或当任何线程正在等待,那么 BrokenBarrierException 将被抛出。
* 4.如果任何线程在等待时被中断,那么所有其他等待线程将抛出 BrokenBarrierException , barrier将处于broken状态。
* 5.如果当前线程是到达的最后一个线程,并且构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。
*
* 如果在barrier操作期间发生异常,则该异常将在当前线程中传播,并且barrier处于断开状态。
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* 等待,直到所有 parties 都在此屏障上调用了 await 。或者指定的等待时间过去。
*
* 1.如果当前线程不是最后一个到达的,那么出于线程调度的目的,它将被禁用,并处于休眠状态,直到发生以下事情之一:
* 最后一个到达; 或指定的超时时间已过; 或一些其他线程中断当前线程; 或其他线程中断了其中一个等待线程; 或其他线程在等待barrier时超时; 或其他一些线程在这个barrier上调用 reset。
* 2.如果当前线程: 在进入此方法时设置其中断状态; 或等待时中断;然后 InterruptedException 被抛出,当前线程的中断状态被清除。
* 3.如果指定的等待时间超过,则抛出 TimeoutException 。如果时间小于或等于零,该方法将根本不等待。
* 4.如果屏障是 reset,而任何线程正在等待,或如果屏障 isBroken被打破时被调用,或当任何线程正在等待,那么 BrokenBarrierException 将被抛出。
* 5.如果任何线程在等待时被中断,那么所有其他等待线程将抛出 BrokenBarrierException , barrier将处于broken状态。
* 6.如果当前线程是到达的最后一个线程,并且构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。
*
* 如果在barrier操作期间发生异常,则该异常将在当前线程中传播,并且barrier处于断开状态。
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/**
* 查询该屏障是否处于损坏状态。
*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
/**
* 将屏障重置为初始状态。
* 如果任何一方当前正在栅栏旁等待,他们将返回一个 BrokenBarrierException。
* 请注意,由于其他原因发生断裂后,重置<em>可能会比较复杂;线程需要以其他方式重新同步,并选择一个来执行重置。
* 可能更可取的是,为后续使用创建一个新的屏障。
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
/**
* 返回当前在屏障处等待的参与方数量。此方法主要用于调试和断言。
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}
设计思路:
主要使用 await() 方法,使各线程处于阻塞状态,直到达到一定的数量,调用 signalAll() 释放所有线程继续执行。往复使用该操作
标签:CyclicBarrier,屏障,lock,private,源码,线程,new,解析,final From: https://www.cnblogs.com/coolyang/p/17126883.html