CyclicBarrier源码介绍
循环栅栏, 他的特点是可以循环使⽤,当多个线程都到达同指定点时,再同进执⾏。
测试案例:
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.test();
}
private void test() {
int num = 5;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> this.doTask());
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < num; i++) {
threads.add(new Thread(() -> this.task01(cyclicBarrier), "游客_" + i));
}
threads.forEach(Thread::start);
threads.forEach(this::threadJoin);
System.out.println("=======去下一个地点=======");
threads.clear();
for (int i = 0; i < num; i++) {
threads.add(new Thread(() -> this.task02(cyclicBarrier), "游客_" + i));
}
threads.forEach(Thread::start);
}
private void task01(CyclicBarrier cyclicBarrier) {
randomSleep();
System.out.println(Thread.currentThread().getName() + " 坐进过山车座位,准备中。。。");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 开始大叫。。。");
}
private void task02(CyclicBarrier cyclicBarrier) {
randomSleep();
System.out.println(Thread.currentThread().getName() + " 坐进大摆锤座位,准备中。。。");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 开始大叫。。。");
}
private void doTask() {
System.out.println(" 设备启动。。。");
}
private void randomSleep() {
Random random = new Random();
int i = random.nextInt(5);
try {
Thread.sleep(i * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void threadJoin(Thread thread) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果
游客_1 坐进过山车座位,准备中。。。
游客_3 坐进过山车座位,准备中。。。
游客_0 坐进过山车座位,准备中。。。
游客_2 坐进过山车座位,准备中。。。
游客_4 坐进过山车座位,准备中。。。
设备启动。。。
游客_4 开始大叫。。。
游客_3 开始大叫。。。
游客_0 开始大叫。。。
游客_1 开始大叫。。。
游客_2 开始大叫。。。
=======去下一个地点=======
游客_2 坐进大摆锤座位,准备中。。。
游客_3 坐进大摆锤座位,准备中。。。
游客_4 坐进大摆锤座位,准备中。。。
游客_1 坐进大摆锤座位,准备中。。。
游客_0 坐进大摆锤座位,准备中。。。
设备启动。。。
游客_0 开始大叫。。。
游客_2 开始大叫。。。
游客_3 开始大叫。。。
游客_1 开始大叫。。。
游客_4 开始大叫。。。
常用方法源码剖析
public class CyclicBarrier {
// 这个静态内部类是用来标记是否中断的
private static class Generation {
boolean broken = false;
}
/** CyclicBarrier是基于ReentrantLock实现的互斥操作,以及计数原子性操作 */
private final ReentrantLock lock = new ReentrantLock();
/** 基于当前的Condition实现线程的挂起和唤醒 */
private final Condition trip = lock.newCondition();
/** 记录有参构造传入的屏障数值,不会对这个数值做操作 */
private final int parties;
/* 当屏障数值达到0之后,优先执行当前任务 */
private final Runnable barrierCommand;
/** 初始化默认的Generation,用来标记线程中断情况 */
private Generation generation = new Generation();
//每来一个线程等待,就对count进行--
private int count;
// 在内部传入了parties,屏障点的数值
// 还传入了barrierAction,屏障点的数值达到0,优先执行barrierAction任务
public CyclicBarrier(int parties, Runnable barrierAction) {
// 健壮性判断
if (parties <= 0) throw new IllegalArgumentException();
// 当前类中的属性parties是保存屏障点数值的
this.parties = parties;
// 将parties赋值给属性count,每来一个线程,继续count做-1操作。
this.count = parties;
// 优先执行的任务
this.barrierCommand = barrierAction;
}
// 在CyclicBarrier中,提供了2个await方法
// 第一个是无参的方式,线程要死等,直屏障点数值为0,或者有线程中断
// 第二个是有参方式,传入等待的时间,要么时间到位了,要不就是直屏障点数值为0,或者有线程中断
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
//dowait方法主要包含了线程互相等待的逻辑,以及屏障点数值到达0之后的操作
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
说白了底层维护了一个int类型的count计数器,用来统计所有线程是否都达到同一状态了,到达同一状态之后就一起执行。
标签:Thread,游客,private,int,源码,介绍,new,CyclicBarrier From: https://www.cnblogs.com/dongyaotou/p/18392841