作用
让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开放,所有被屏障拦截的线程才会继续运行。
核心方法
- CyclicBarrier(int parties):默认构造方法,parties参数表示屏障拦截的线程数量
- CyclicBarrier(int parties, Runnable barrierAction):barrierAction表示屏障开放的时候会优先执行barrierAction线程,会直接使用最后到达屏障的那个线程来执行barrierAction。
- await():告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞,直到所有线程到达屏障后,屏障开放一起执行
- reset():重置计数器
- getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量
- isBroken()方法用来了解阻塞的线程是否被中断
应用场景
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
示例
package com.xiaolyuh;
import com.alibaba.fastjson.JSON;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier并发工具类
*
* @author yuhao.wang3
* @since 2019/6/27 15:52
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
Random random = new Random();
Map<String, Long> map = new ConcurrentHashMap<>();
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> {
System.out.println(Thread.currentThread().getName() + " 3所有线程到达屏障的时候,优先执行barrierAction线程。。。。。。。");
System.out.println(Thread.currentThread().getName() + " 3" + JSON.toJSONString(map));
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
Thread.sleep(200 + random.nextInt(200));
System.out.println(Thread.currentThread().getName() + " 1等待所有线程到达屏障------------");
map.put(Thread.currentThread().getName(), Thread.currentThread().getId());
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " 2所有线程到达屏障的时候,开始执行业务代码================");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
try {
cyclicBarrier.await();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 主线程完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
输出结果:
Thread-1 1等待所有线程到达屏障------------
Thread-0 1等待所有线程到达屏障------------
Thread-2 1等待所有线程到达屏障------------
Thread-2 3所有线程到达屏障的时候,优先执行barrierAction线程。。。。。。。
Thread-2 3{"Thread-0":13,"Thread-1":14,"Thread-2":15}
Thread-2 2所有线程到达屏障的时候,开始执行业务代码================
Thread-1 2所有线程到达屏障的时候,开始执行业务代码================
Thread-0 2所有线程到达屏障的时候,开始执行业务代码================
main 主线程完成
源码
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
// 使用ReentrantLock来做并发控制
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 验证线程的中断状态(y有作用)
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 计数器减一
int index = --count;
// 计数器减到0需要放行
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 修改放行状态和唤醒左右等待线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 没有设置过期时间就一直等待
trip.await();
else if (nanos > 0L)
// 调用有超时机制的等待方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
参考
《java并发编程的艺术》
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
layering-cache
为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下