一、介绍
控制多个线程在某个时刻达到一个共同的屏障点(Barrier Point),然后再同时继续执行。当所有线程都到达屏障点时,屏障就会打开,所有线程可以继续执行,直到下一个屏障点再次等待所有线程到达。
二、特性
1. 可重用
当所有线程到达屏障点后,可以重置屏障,让所有线程再次从屏障点开始执行。
2. 可带参数
可以指定一个Runnable对象,在所有线程都到达屏障点后,先执行这个Runnable对象。
3. 线程同步
直到所有线程都到达屏障点,然后才会继续执行。
4. 灵活性
可以指定屏障点的数量,即到达屏障点的线程数量,也可以在构造函数中指定一个处理器,处理所有到达屏障点的线程。
5. 线程安全
CyclicBarrier是线程安全的,可以安全地用于多线程环境。
三、实现原理
实现原理是基于ReentrantLock和Condition的,它使用一个计数器来记录到达屏障点的线程数量。当一个线程调用await()方法时,它会被阻塞并加入到等待队列中,直到计数器的值达到指定的数量。此时,所有被阻塞的线程都会被唤醒,并继续执行。
CyclicBarrier会在计数器的值达到指定数量后,重新初始化计数器,并调用指定的Runnable对象。同时,它还会使用一个boolean类型的变量来记录屏障是否被打开。
四、应用场景
1. 数据流水线
每个线程处理完数据后,需要等待其他线程完成,才能继续执行下一个阶段。
2. 并行计算
等待所有线程都完成任务后,再汇总结果。
3. 多线程初始化
某些线程需要等待其他线程初始化完成后,才能开始执行。
五、扩展内容
1. CyclicBarrier的reset()方法可以重置屏障,使计数器回到初始值。如果某个线程在等待时调用了reset()方法,所有正在等待的线程都会抛出BrokenBarrierException异常。
2. CyclicBarrier的构造函数中可以指定一个Runnable对象,当计数器值达到设定值时,会自动执行该Runnable对象。
3. CyclicBarrier的await()方法有多个重载版本,其中最常用的是不带参数的await()方法,它会让当前线程等待直到所有线程都到达屏障点。还有一个带参数的await(long timeout, TimeUnit unit)方法,它会让当前线程等待一定的时间,如果在指定的时间内其他线程没有到达屏障点,当前线程会抛出TimeoutException异常。
4. 在Java 8中,CyclicBarrier还新增了getNumberWaiting()和getParties()两个方法,可以分别获取当前正在等待的线程数量和参与屏障的线程总数。
5. CyclicBarrier适用于一组线程需要相互等待,然后一起执行下一步操作的场景,例如多线程计算矩阵和、多线程下载文件后合并等。
六、CyclicBarrier与CountDownLatch区别
1. CyclicBarrier的计数器可以重置,可以用来反复使用;而CountDownLatch的计数器只能使用一次;
2. CyclicBarrier可以让一组线程在达到屏障点之前互相等待,然后一起执行下一步操作;而CountDownLatch只能让等待的线程在计数器为0时同时开始执行下一步操作;
3. CyclicBarrier可以指定一个Runnable对象,在计数器值达到设定值时自动执行;而CountDownLatch没有类似的功能;
如果需要在多个线程之间反复同步,那么CyclicBarrier是一个更好的选择;如果只需要在所有线程都准备好后一起执行下一步操作,那么,CountDownLatch则是更合适的选择。
七、实际应用
1. 案例一
(1) 场景
等待3个线程都到达屏障点后才能继续执行。
(2) 代码
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrierCase1
* CyclicBarrier的简单示例: 等待设置的线程都到达屏障点后才能继续执行。
*
* @author wxy
* @since 2023-04-23
*/
public class CyclicBarrierCase1 {
private static final Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierCase1.class);
public static void main(String[] args) {
int count = 3;
CyclicBarrier barrier = new CyclicBarrier(count, () -> {
LOGGER.info("全部任务完成");
});
for (int index1 = 1; index1 <= count; index1++) {
new Thread(() -> {
try {
for (int index2 = 1; index2 <= 2; index2++) {
LOGGER.info("线程{}执行第{}次任务", Thread.currentThread().getName(), index2);
}
LOGGER.info("线程" + Thread.currentThread().getName() + "等待开始");
barrier.await();
LOGGER.info("线程" + Thread.currentThread().getName() + "等待完毕");
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread" + index1).start();
}
}
}
在这个示例中,使用CyclicBarrier创建了一个屏障,屏障的计数器值为3,表示需要等待3个线程都到达屏障点后才能继续执行。每个线程执行2次任务后,会调用await()方法等待其他线程完成任务。当所有线程都到达屏障点时,会执行指定的Runnable对象,输出"全部任务完成"。输出如下结果:
2. 案例二
(1) 场景
以下是一个使用CyclicBarrier实现多线程计算矩阵和的示例代码
(2) 代码
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CyclicBarrier;
/**
* MatrixSumCase
* CyclicBarrier实现多线程计算矩阵和的示例
*
* @author wxy
* @since 2023-04-23
*/
public class MatrixSumCase {
private static final Logger LOGGER = LoggerFactory.getLogger(MatrixSumCase.class);
/**
* 矩阵大小
*/
private static final int N = 1000;
/**
* 线程数
*/
private static final int THREADS = 10;
/**
* 矩阵
*/
private static final int[][] MATRIX = new int[N][N];
/**
* 行和数组
*/
private static final int[] ROW_SUMS = new int[N];
private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREADS, new Runnable() {
// 所有线程达到屏障点后执行的操作
public void run() {
int sum = 0;
for (int i = 0; i < N; i++) {
sum += ROW_SUMS[i];
}
LOGGER.info("Matrix sum is " + sum);
}
});
public static void main(String[] args) {
// 初始化矩阵
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
MATRIX[i][j] = i + j;
}
}
// 创建多个线程计算矩阵的行和
for (int index = 0; index < THREADS; index++) {
final int threadId = index;
new Thread(new Runnable() {
public void run() {
int start = threadId * (N / THREADS);
int end = (threadId == THREADS - 1) ? N : (threadId + 1) * (N / THREADS);
for (int i = start; i < end; i++) {
int rowSum = 0;
for (int j = 0; j < N; j++) {
rowSum += MATRIX[i][j];
}
ROW_SUMS[i] = rowSum;
}
try {
// 等待其他线程到达屏障点
CYCLIC_BARRIER.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}).start();
}
}
}
上面的示例代码中,首先创建一个NxN的矩阵,并使用多个线程来计算矩阵的每一行的和,将结果保存在一个长度为N的数组中。使用CyclicBarrier来实现多个线程之间的同步,等待所有线程都计算完毕后再执行后续的求和操作。输出如下结果:
标签:CyclicBarrier,Java,int,屏障,线程,多线程,等待 From: https://blog.51cto.com/u_15898747/6223593