1、CyclicBarrier的介绍
CyclicBarrier 被称为栅栏,允许一组线程相互等待,直到这一组线程都准备完毕,放行,程序方可继续执行。
就好像做摩天轮,游乐园规定,至少有9个游客乘坐摩天轮,管理员才可以启动摩天轮,游客数和管理员少一个条件,摩天轮都不会启动。
2、CyclicBarrier的使用
根据上面摩天轮的案例,程序代码如下:
1 import java.util.concurrent.BrokenBarrierException; 2 import java.util.concurrent.CyclicBarrier; 3 4 public class TestCyclicBarrier { 5 6 public static void main(String[] args) throws Exception { 7 int parties = 10; 8 CyclicBarrier barrier = new CyclicBarrier(parties, () -> { 9 System.out.println("======== 启动摩天轮.... ========"); 10 }); 11 12 for (int i = 1; i <= parties - 1; i++) { 13 final int current = i; 14 new Thread(() -> { 15 System.out.println("编号为 " + current + " 的游客已准备就绪"); 16 try { 17 if (current < 9) { 18 System.out.println("sorry,游客数不足,无法启动摩天轮"); 19 } else { 20 System.out.println("OK,游客数已达标,准备启动摩天轮"); 21 } 22 barrier.await(); 23 24 System.out.println("编号为 " + current + " 的游客尖叫"); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } catch (BrokenBarrierException e) { 28 e.printStackTrace(); 29 } 30 }).start(); 31 } 32 33 System.out.println("管理员已到位"); 34 barrier.await(); 35 36 } 37 }
执行结果如下:
3、CyclicBarrier的源码分析
与CountDownLatch、Semaphore直接基于AQS实现不同,CyclicBarrier 是基于 ReentrantLock + ConditionObject 实现的,间接基于AQS实现的。有关ConditionObject的分析,参考:
3.1、CyclicBarrier概览
Generation,静态内部类,持有布尔类型的属性broken,默认为false,只有在重置方法reset()、执行出现异常或中断调用breakBarrier() ,属性会被设置为true。
nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。
breakBarrier() 任务执行中断、异常、被重置,将Generation中的布尔类型属性设置为true,将Waiter队列中的线程转移到AQS队列中,待执行完unlock方法后,唤醒AQS队列中的挂起线程。
await() :CyclicBarrier的核心方法,计数器递减处理。
3.2、构造函数
构造参数重载,最终调用的是CyclicBarrier(int, Runnable),详情如下:
1 public CyclicBarrier(int parties) { 2 this(parties, null); 3 } 4 5 public CyclicBarrier(int parties, Runnable barrierAction) { 6 // 参数合法性校验 7 if (parties <= 0) throw new IllegalArgumentException(); 8 // final修饰,所有线程执行完成归为或重置时 使用 9 this.parties = parties; 10 // 在await方法中计数值,表示还有多少线程待执行await 11 this.count = parties; 12 // 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程 13 this.barrierCommand = barrierAction; 14 }
3.3、CyclicBarrier属性
3.4、核心方法分析
1、await() - 源码分析
在CyclicBarrier中,await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法);await(timout, unit)表示等待timeout时间后,指定数量的线程未准备就绪,抛出TimeoutException超时异常。
CyclicBarrier#await 详情如下:
1 // 执行没有超时时间的await 2 public int await() throws InterruptedException, BrokenBarrierException { 3 try { 4 // 执行dowait() 5 return dowait(false, 0L); 6 } catch (TimeoutException toe) { 7 throw new Error(toe); 8 } 9 } 10 11 // 执行有超时时间的await 12 public int await(long timeout, TimeUnit unit) 13 throws InterruptedException, 14 BrokenBarrierException, 15 TimeoutException { 16 return dowait(true, unit.toNanos(timeout)); 17 }
await最终调用dowait()方法,CyclicBarrier#dowait 详情如下:
1 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { 2 // 获取锁对象 3 final ReentrantLock lock = this.lock; 4 // 加锁 5 lock.lock(); 6 try { 7 // 获取generation对象 8 final Generation g = generation; 9 10 // 这组线程中在执行过程中是否异常、超时、中断、重置 11 if (g.broken) 12 throw new BrokenBarrierException(); 13 14 // 这组线程被中断,重置标识与计数值, 15 // 将Waiter队列中的线程转移到AQS队列,抛出InterruptedException 16 if (Thread.interrupted()) { 17 breakBarrier(); 18 throw new InterruptedException(); 19 } 20 21 // 计数值 - 1 22 int index = --count; 23 // 这组线程都已准备就绪 24 if (index == 0) { 25 // 执行结果标识 26 boolean ranAction = false; 27 try { 28 // 若使用2个参数的有参构造,就传入了自实现任务,index == 0,先执行CyclicBarrier有参的任务 29 // 此处设计与 FutureTask 构造参数设计类似 30 final Runnable command = barrierCommand; 31 if (command != null) 32 // 执行任务 33 command.run(); 34 // 执行完成,设置为true 35 ranAction = true; 36 // CyclicBarrier属性归位 37 nextGeneration(); 38 return 0; 39 } finally { 40 // 执行过程中出现问题 41 if (!ranAction) 42 // 重置标识与计数值,将Waiter队列中的线程转移到AQS队列 43 breakBarrier(); 44 } 45 } 46 47 // -- 之后,count不为0,表示还有线程在等待 48 // 自旋 直到被中断、超时、异常、count = 0 49 for (;;) { 50 try { 51 // 未设置超时时间 52 if (!timed) 53 // 挂起线程,将线程转移到 Condition 队列 54 trip.await(); 55 // 未达到等待时间 56 else if (nanos > 0L) 57 // 挂起线程,并返回剩余等待时间 58 nanos = trip.awaitNanos(nanos); 59 } catch (InterruptedException ie) { 60 // 中断异常 61 if (g == generation && ! g.broken) { 62 breakBarrier(); 63 throw ie; 64 } else { 65 // 线程中断 66 Thread.currentThread().interrupt(); 67 } 68 } 69 70 // 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常 71 if (g.broken) 72 throw new BrokenBarrierException(); 73 74 if (g != generation) 75 return index; 76 77 // 超时,抛出异常TimeoutException 78 if (timed && nanos <= 0L) { 79 breakBarrier(); 80 throw new TimeoutException(); 81 } 82 } 83 } finally { 84 // 释放锁资源 85 lock.unlock(); 86 } 87 }
2、breakBarrier() - 结束CyclicBarrier的执行
1 // 结束CyclicBarrier的执行 2 private void breakBarrier() { 3 // 设置线程执行过程中是否异常、中断、重置标识 4 generation.broken = true; 5 // 重置计数值 6 count = parties; 7 // 将Condition队列中的Node转移到AQS队列中,等到执行完unlock,AQS队列中的挂起线程会被唤醒 8 // 有后继节点的,设置ws = -1; 9 // 无后继节点的,设置ws = 0 10 trip.signalAll(); 11 }
3、reset() - 重置CyclicBarrier
1 // 重置CyclicBarrier 2 public void reset() { 3 // 获取锁对象 4 final ReentrantLock lock = this.lock; 5 // 加锁 6 lock.lock(); 7 try { 8 // 设置当前generation属性,并将Waiter队列中线程转移到AQS队列 9 breakBarrier(); 10 // 重置generation 属性、计数值 11 nextGeneration(); 12 } finally { 13 // 释放锁 14 lock.unlock(); 15 } 16 }
4、nextGeneration() - CyclicBarrier归位
与reset()不同,nextGeneration()是在任务执行完成后,对 CyclicBarrier 做归位,不会设置线程执行异常、超时、中断标识。
1 private void nextGeneration() { 2 // 将Waiter队列中线程转移到AQS队列 3 trip.signalAll(); 4 // 计数值、generation 归位 5 count = parties; 6 generation = new Generation(); 7 }
4、总结
CyclicBarrier基于 ReentrantLock + ConditionObject实现,CyclicBarrier的构造函数中必须指定parties,同时对象generation,内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。
parties是初始待执行线程数,在构造函数中会将parties赋给计数值count,每当一个线程执行await(),count就会减1。
当count被减为0时,代表所有线程都准备就绪,此时判断构造函数是否初始化了barrierCommand属性,若对barrierCommand属性做了赋值,优先执行barrierCommand任务;
barrierCommand任务执行完成,再将Waiter队列中的线程转移到AQS队列中,执行完unlock,唤醒AQS队列中的线程;计数值count、generation归位。
标签:分析,执行,队列,await,generation,源码,线程,CyclicBarrier From: https://www.cnblogs.com/RunningSnails/p/17375944.html