CyclicBarrier使用场景
应用实例
CyclicBarrier源代码分析
流程分析总结
1 CyclicBarrier使用场景
当有已知数量的线程需要在某一点同时执行时,先到达执行点的线程会进入等待,直到全部线程都到达执行点时,则会同时执行。
例如:有若干个线程,比如说有五个线程,需要它们都到达了某一个点之后才能开始一起执行,也就是说假如其中只有四个线程到达了这个点,还差一个线程没到达,此时这四个线程都会进入等待状态,直到第五个线程也到达了这个点之后,这五个线程才开始一起进行执行状态。
2 应用实例
public class Test { public static void main(String[] args){ CyclicBarrier cyclicBarrier = new CyclicBarrier(2); for (int i = 0; i < 2; i++) { new Thread(() -> { try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "正在执行"); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
输出
Thread-1正在执行 Thread-0正在执行
两个线程都到达执行点后,屏障消除,两个线程同时执行。
public class Test { public static void main(String[] args){ CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> { System.out.println("线程全部准备就绪。"); }); for (int i = 0; i < 2; i++) { new Thread(() -> { try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "正在执行"); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
输出
线程全部准备就绪。 Thread-1正在执行 Thread-0正在执行
两个线程都到达执行点后,执行Rnnable汇总操作后,屏障消除,两个线程同时执行。
3 CyclicBarrier源代码分析
1 构造函数
可输入的参数:parties参与同时执行的线程个数,barrierAction当线程数量满足后执行的汇总操作。
parties不能<=0否则直接抛出异常,parties一旦赋值不会改变,表示屏障开启后无论进行了多少代,参与的线程数量不可改变。
count变量会随着参与线程的增加而减少,直到为0后表示屏障可以打开,当屏障进入下一代时,count会重新赋值成parties。
public CyclicBarrier(int parties) { this(parties, null); }
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
2 成员变量
//内部持有broken表示屏障是否被破坏,如果为true则表示屏障已经被破坏。
private static class Generation { boolean broken = false; }
//为了保证CyclicBarrier的安全,每个进入屏障的线程都需要加锁操作。 private final ReentrantLock lock = new ReentrantLock();
//当屏障不可以打开时,线程进入等待。 private final Condition trip = lock.newCondition();
//参与同时执行的线程个数。 private final int parties;
//线程数量满足后可选择执行的汇总操作。 private final Runnable barrierCommand;
//CyclicBarrier对象被创建后会初始化一次,此后每次屏障打开后,新一代的屏障也会刷新这个变量。 private Generation generation = new Generation();
//参与线程计数。 private int count;
3 核心方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L);//不指定线程最大等待时间 } catch (TimeoutException toe) { throw new Error(toe); } }
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout));//指定线程最大等待时间 }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock;
//上锁 lock.lock(); try {
//每一个线程到达后,首先判断屏障是否被破坏了,如果已经被破坏就直接抛出异常。 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); //如果线程中断了,也会破坏屏障 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //如果index==0表示这已经是最后一个线程了 int index = --count; if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand;
//执行可选的Runnable if (command != null) command.run(); ranAction = true;
//此方法唤醒所有等待线程,并重新创建Generation进入下一代,所以正常唤醒下,成员变量generation与局部变量g不是同一个对象。 nextGeneration();
//返回0表示所有线程均已被唤醒执行,但并不是都已经执行完毕,仅仅是一个计数标记。 return 0; } finally {
//如果汇总Runnable抛出了异常则也属于破坏了屏障 if (!ranAction) breakBarrier(); } } for (;;) { try {
//线程进入等待,或者进入一定时间的等待。 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {
//如果线程捕获了中断异常屏障没有被破坏则触发屏障破坏。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //如果屏障已经被破坏则重置线程中断信号。
Thread.currentThread().interrupt(); } } //线程被唤醒,但屏障已经被破坏,调用过breakBarrier();抛出异常。 if (g.broken) throw new BrokenBarrierException(); //正常情况下,线程被唤醒,此时已经进入下一代,返回index,它标记了后续还有几个线程正在执行。 if (g != generation) return index; //线程被唤醒了,但是当前线程剩余的等待时间已经没有了,属于超时等待的线程被强制唤醒,也会抛出异常。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally {
//释放锁。 lock.unlock(); } }
//调用此方法表示屏障已经被破坏,唤醒所有等待的线程。
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
//调用此方法表示屏障成功打开,唤醒等待线程,进入下一代。
private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
4 流程分析总结
1 初始化CyclicBarrier的成员变量,包括Lock,Condition,parties,count,Generation,Runnable。
2 当线程调用await()方法时,判断是否已经是计数器是否已经归零,如果是则首先执行Runnable。
3 CyclicBarrier进入下一代,唤醒所有等待线程,重置count为parties并且创建新的Generation实例。
4 如果计数器没有归零,则通过Condition中的await()阻塞线程。
5 计数器归零的情况下线程被唤醒,到达屏障的线程继续执行自己。
6 Runnable线程异常,等待线程被中断,线程等待超时,此等异常情况线程被唤醒则抛出异常。
标签:CyclicBarrier,屏障,线程,parties,new,执行,循环 From: https://www.cnblogs.com/zumengjie/p/17208772.html