多线程同步与任务完成等待机制总结
在多线程编程中,合理的同步机制能够有效地协调多个线程之间的执行顺序,确保任务按照预期执行。常见的同步机制包括 CountDownLatch
、CyclicBarrier
、CompletableFuture
、ExecutorService.invokeAll()
和 Phaser
。接下来,我们将通过具体场景加伪代码示例来介绍这些同步工具的应用。
1. CountDownLatch
CountDownLatch
适用于多个线程执行任务,并在所有线程完成时执行后续操作的场景。
场景:
假设我们有一个数据处理系统,启动时需要从多个数据源加载数据,等待所有数据源加载完成后,再进行数据汇总。
伪代码:
// 初始化 CountDownLatch,等待的数据源数量为 5
CountDownLatch latch = new CountDownLatch(5);
// 启动 5 个线程,从不同的数据源加载数据
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
loadDataFromSource(); // 从某个数据源加载数据
latch.countDown(); // 完成任务,计数器减 1
});
}
// 等待所有线程完成
latch.await(); // 阻塞直到所有线程执行完毕
// 所有数据加载完成后,执行数据汇总操作
aggregateData();
适用场景:
- 任务依赖:多个线程执行,等所有线程完成后执行统一操作,如数据处理、汇总。
- 资源管理:比如等待多个外部资源的加载(例如数据库连接、外部 API)。
示例代码:
java复制代码import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 初始化线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 初始化 CountDownLatch,等待 5 个数据源加载完成
CountDownLatch latch = new CountDownLatch(5);
// 启动 5 个线程,从不同的数据源加载数据
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
loadDataFromSource(); // 模拟从某个数据源加载数据
latch.countDown(); // 每个线程完成任务,计数器减 1
});
}
// 等待所有线程完成
latch.await(); // 阻塞直到所有线程执行完毕
// 所有数据加载完成后,执行汇总处理
System.out.println("所有数据加载完成,开始数据汇总...");
aggregateData();
// 关闭线程池
executor.shutdown();
}
private static void loadDataFromSource() {
try {
// 模拟加载数据的过程
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 数据加载完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void aggregateData() {
// 数据汇总逻辑
System.out.println("数据汇总完成");
}
}
说明:
- CountDownLatch 用来等待所有线程完成。
latch.await()
会阻塞主线程,直到计数器归零。 - 在所有线程完成任务后,主线程执行数据汇总操作。
2. CyclicBarrier
CyclicBarrier
适用于多个线程在某个同步点上等待,直到所有线程都到达同步点后一起继续执行。
场景:
假设我们有一个任务调度系统,需要定时对多个任务进行处理,并在每个调度周期结束时统一处理结果。
伪代码:
// 初始化 CyclicBarrier,等待 5 个线程
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
// 所有线程到达屏障点后执行的操作,如汇总处理结果
processResults();
});
// 启动 5 个线程处理任务
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
processTask(); // 处理任务
barrier.await(); // 等待其他线程到达同步点
});
}
适用场景:
- 批处理任务:当多个任务需要并行执行,在每个周期结束时一起汇总。
- 多阶段任务:多个任务按顺序执行,每个阶段结束时统一处理结果。
示例代码:
java复制代码import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
public static void main(String[] args) throws InterruptedException {
// 初始化线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 初始化 CyclicBarrier,等待 5 个线程完成
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
// 所有线程到达屏障点后执行的操作
System.out.println("所有线程到达同步点,开始汇总结果");
processResults();
});
// 启动 5 个线程处理任务
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
processTask(); // 处理任务
barrier.await(); // 等待其他线程到达同步点
});
}
// 关闭线程池
executor.shutdown();
}
private static void processTask() {
try {
// 模拟处理任务
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 任务处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void processResults() {
// 汇总结果的逻辑
System.out.println("所有任务处理完成,开始汇总结果");
}
}
说明:
- CyclicBarrier 用于多个线程在每个阶段同步。所有线程到达同步点后,主线程执行汇总操作。
3. CompletableFuture
CompletableFuture
用于处理异步任务,能够灵活地组合多个异步任务,并在任务完成时执行后续操作。
场景:
在一个数据同步系统中,需要从多个外部系统异步拉取数据,所有数据拉取完成后进行统一处理。
伪代码:
// 启动多个异步任务从不同系统拉取数据
CompletableFuture<Void>[] futures = new CompletableFuture[5];
for (int i = 0; i < 5; i++) {
futures[i] = CompletableFuture.runAsync(() -> {
fetchDataFromSystem(); // 从系统拉取数据
});
}
// 等待所有异步任务完成
CompletableFuture.allOf(futures).join();
// 所有数据拉取完成后进行汇总处理
processFetchedData();
适用场景:
- 异步数据处理:当多个任务并发执行,并希望在所有任务完成后执行后续操作。
- 非阻塞任务:提高并发性,避免线程阻塞。
示例代码:
java复制代码import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureExample {
public static void main(String[] args) {
// 初始化线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 启动多个异步任务从不同系统拉取数据
CompletableFuture<Void>[] futures = new CompletableFuture[5];
for (int i = 0; i < 5; i++) {
futures[i] = CompletableFuture.runAsync(() -> {
fetchDataFromSystem(); // 异步从系统拉取数据
}, executor);
}
// 等待所有异步任务完成
CompletableFuture.allOf(futures).join(); // 阻塞,直到所有任务完成
// 所有数据拉取完成后进行汇总处理
System.out.println("所有数据拉取完成,开始数据汇总...");
processFetchedData();
// 关闭线程池
executor.shutdown();
}
private static void fetchDataFromSystem() {
try {
// 模拟从外部系统拉取数据
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 数据拉取完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void processFetchedData() {
// 数据汇总逻辑
System.out.println("数据汇总完成");
}
}
说明:
- CompletableFuture 可以处理异步任务并在所有任务完成后执行后续操作,适合异步编程。
4. ExecutorService 的 invokeAll() 方法
invokeAll()
提交一组任务并阻塞直到所有任务完成,适用于需要处理多个任务并等待全部任务完成的场景。
场景:
在一个文件处理系统中,需要并行处理多个文件,确保所有文件都处理完再进行汇总操作。
伪代码:
// 创建一组任务
List<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
tasks.add(() -> {
processFile(); // 处理文件
return null;
});
}
// 提交任务并等待所有任务完成
executor.invokeAll(tasks);
// 所有任务完成后执行后续操作,如文件汇总
summarizeProcessedFiles();
适用场景:
- 任务队列处理:多个任务并行执行,确保所有任务完成后再进行汇总或后续操作。
- 并发任务管理:适用于已知所有任务并希望统一处理的场景。
示例代码:
java复制代码import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceInvokeAllExample {
public static void main(String[] args) throws InterruptedException {
// 初始化线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 创建一组任务
List<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
tasks.add(() -> {
processFile(); // 处理文件
return null;
});
}
// 提交任务并等待所有任务完成
executor.invokeAll(tasks);
// 所有任务完成后执行后续操作
System.out.println("所有文件处理完成,开始汇总...");
summarizeProcessedFiles();
// 关闭线程池
executor.shutdown();
}
private static void processFile() {
try {
// 模拟文件处理
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 文件处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void summarizeProcessedFiles() {
// 文件汇总逻辑
System.out.println("文件汇总完成");
}
}
说明:
- ExecutorService.invokeAll() 用于并行处理多个任务,并确保所有任务完成后执行后续操作。
5. Phaser
Phaser
提供了一种更加灵活的同步机制,支持多个阶段的任务同步,线程数也可以动态增加或减少。
场景:
在一个多阶段数据处理系统中,需要多个线程在每个阶段完成后同步进行下一阶段操作。
伪代码:
// 初始化 Phaser,指定等待线程数量为 5
Phaser phaser = new Phaser(5);
// 启动多个线程处理数据
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
processStage1Data(); // 处理阶段 1 数据
phaser.arriveAndAwaitAdvance(); // 等待其他线程完成阶段 1
processStage2Data(); // 处理阶段 2 数据
phaser.arriveAndAwaitAdvance(); // 等待其他线程完成阶段 2
processStage3Data(); // 处理阶段 3 数据
phaser.arriveAndDeregister(); // 完成最后一个阶段,注销线程
});
}
// 等待所有线程完成所有阶段
phaser.awaitAdvance(phaser.getPhase());
适用场景:
- 多阶段任务同步:在多个阶段中执行任务,确保所有线程完成一个阶段后进入下一阶段。
- 线程数动态变化:适合线程数在不同阶段动态变化的场景。
示例代码:
java复制代码import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
// 初始化线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 初始化 Phaser,指定等待线程数量为 5
Phaser phaser = new Phaser(5);
// 启动多个线程处理任务
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
processStage1Data(); // 处理阶段 1 数据
phaser.arriveAndAwaitAdvance(); // 等待其他线程完成阶段 1
processStage2Data(); // 处理阶段 2 数据
phaser.arriveAndAwaitAdvance(); // 等待其他线程完成阶段 2
processStage3Data(); // 处理阶段 3 数据
phaser.arriveAndDeregister(); // 完成最后一个阶段,注销线程
});
}
// 等待所有线程完成所有阶段
phaser.awaitAdvance(phaser.getPhase());
// 关闭线程池
executor.shutdown();
}
private static void processStage1Data() {
try {
// 模拟阶段 1 数据处理
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 阶段 1 数据处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void processStage2Data() {
try {
// 模拟阶段 2 数据处理
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 阶段 2 数据处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void processStage3Data() {
try {
// 模拟阶段 3 数据处理
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 阶段 3 数据处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
说明:
- Phaser 允许多个阶段的同步,在每个阶段完成时,线程等待并同步进入下一个阶段,适合需要多阶段任务的场景。
总结
在 Java 中,不同的同步工具有不同的适用场景。根据项目的具体需求,选择合适的同步机制能提高并发性能并避免线程安全问题。
CountDownLatch
:用于等待所有线程完成,适合任务依赖的场景。CyclicBarrier
:适用于多线程到达某个同步点后一起继续执行,常用于周期性任务。CompletableFuture
:适用于处理异步任务,能灵活地组合任务。ExecutorService.invokeAll()
:适用于确保一组任务完成后执行汇总操作。Phaser
:适用于多阶段任务,并且支持线程动态变化的场景。
选择合适的同步工具能够提升并发效率,确保任务按预期执行。
标签:总结,同步,java,汇总,任务,线程,完成,executor,多线程 From: https://blog.csdn.net/Liu_Downloads/article/details/145134673