首页 > 其他分享 >多线程同步与任务完成等待机制总结

多线程同步与任务完成等待机制总结

时间:2025-01-14 11:28:08浏览次数:3  
标签:总结 同步 java 汇总 任务 线程 完成 executor 多线程

多线程同步与任务完成等待机制总结

在多线程编程中,合理的同步机制能够有效地协调多个线程之间的执行顺序,确保任务按照预期执行。常见的同步机制包括 CountDownLatchCyclicBarrierCompletableFutureExecutorService.invokeAll()Phaser。接下来,我们将通过具体场景加伪代码示例来介绍这些同步工具的应用。

1. CountDownLatch

CountDownLatch 适用于多个线程执行任务,并在所有线程完成时执行后续操作的场景。

场景:

假设我们有一个数据处理系统,启动时需要从多个数据源加载数据,等待所有数据源加载完成后,再进行数据汇总。

伪代码:
// 初始化 CountDownLatch,等待的数据源数量为 5
CountDownLatch latch = new CountDownLatch(5);

// 启动 5 个线程,从不同的数据源加载数据
for (int i = 0; i < 5; i++) {
    executor.submit(() -> {
        loadDataFromSource();  // 从某个数据源加载数据
        latch.countDown();  // 完成任务,计数器减 1
    });
}

// 等待所有线程完成
latch.await();  // 阻塞直到所有线程执行完毕

// 所有数据加载完成后,执行数据汇总操作
aggregateData();
适用场景:
  • 任务依赖:多个线程执行,等所有线程完成后执行统一操作,如数据处理、汇总。
  • 资源管理:比如等待多个外部资源的加载(例如数据库连接、外部 API)。
示例代码:
java复制代码import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        // 初始化线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        // 初始化 CountDownLatch,等待 5 个数据源加载完成
        CountDownLatch latch = new CountDownLatch(5);

        // 启动 5 个线程,从不同的数据源加载数据
        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                loadDataFromSource();  // 模拟从某个数据源加载数据
                latch.countDown();  // 每个线程完成任务,计数器减 1
            });
        }

        // 等待所有线程完成
        latch.await();  // 阻塞直到所有线程执行完毕

        // 所有数据加载完成后,执行汇总处理
        System.out.println("所有数据加载完成,开始数据汇总...");
        aggregateData();

        // 关闭线程池
        executor.shutdown();
    }

    private static void loadDataFromSource() {
        try {
            // 模拟加载数据的过程
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 数据加载完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void aggregateData() {
        // 数据汇总逻辑
        System.out.println("数据汇总完成");
    }
}
说明:
  • CountDownLatch 用来等待所有线程完成。latch.await() 会阻塞主线程,直到计数器归零。
  • 在所有线程完成任务后,主线程执行数据汇总操作。

2. CyclicBarrier

CyclicBarrier 适用于多个线程在某个同步点上等待,直到所有线程都到达同步点后一起继续执行。

场景:

假设我们有一个任务调度系统,需要定时对多个任务进行处理,并在每个调度周期结束时统一处理结果。

伪代码:
// 初始化 CyclicBarrier,等待 5 个线程
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
    // 所有线程到达屏障点后执行的操作,如汇总处理结果
    processResults();
});

// 启动 5 个线程处理任务
for (int i = 0; i < 5; i++) {
    executor.submit(() -> {
        processTask();  // 处理任务
        barrier.await();  // 等待其他线程到达同步点
    });
}
适用场景:
  • 批处理任务:当多个任务需要并行执行,在每个周期结束时一起汇总。
  • 多阶段任务:多个任务按顺序执行,每个阶段结束时统一处理结果。
示例代码:
java复制代码import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample {

    public static void main(String[] args) throws InterruptedException {
        // 初始化线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 初始化 CyclicBarrier,等待 5 个线程完成
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            // 所有线程到达屏障点后执行的操作
            System.out.println("所有线程到达同步点,开始汇总结果");
            processResults();
        });

        // 启动 5 个线程处理任务
        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                processTask();  // 处理任务
                barrier.await();  // 等待其他线程到达同步点
            });
        }

        // 关闭线程池
        executor.shutdown();
    }

    private static void processTask() {
        try {
            // 模拟处理任务
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 任务处理完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void processResults() {
        // 汇总结果的逻辑
        System.out.println("所有任务处理完成,开始汇总结果");
    }
}
说明:
  • CyclicBarrier 用于多个线程在每个阶段同步。所有线程到达同步点后,主线程执行汇总操作。

3. CompletableFuture

CompletableFuture 用于处理异步任务,能够灵活地组合多个异步任务,并在任务完成时执行后续操作。

场景:

在一个数据同步系统中,需要从多个外部系统异步拉取数据,所有数据拉取完成后进行统一处理。

伪代码:
// 启动多个异步任务从不同系统拉取数据
CompletableFuture<Void>[] futures = new CompletableFuture[5];
for (int i = 0; i < 5; i++) {
    futures[i] = CompletableFuture.runAsync(() -> {
        fetchDataFromSystem();  // 从系统拉取数据
    });
}

// 等待所有异步任务完成
CompletableFuture.allOf(futures).join();

// 所有数据拉取完成后进行汇总处理
processFetchedData();
适用场景:
  • 异步数据处理:当多个任务并发执行,并希望在所有任务完成后执行后续操作。
  • 非阻塞任务:提高并发性,避免线程阻塞。
示例代码:
java复制代码import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureExample {

    public static void main(String[] args) {
        // 初始化线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 启动多个异步任务从不同系统拉取数据
        CompletableFuture<Void>[] futures = new CompletableFuture[5];
        for (int i = 0; i < 5; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                fetchDataFromSystem();  // 异步从系统拉取数据
            }, executor);
        }

        // 等待所有异步任务完成
        CompletableFuture.allOf(futures).join();  // 阻塞,直到所有任务完成

        // 所有数据拉取完成后进行汇总处理
        System.out.println("所有数据拉取完成,开始数据汇总...");
        processFetchedData();

        // 关闭线程池
        executor.shutdown();
    }

    private static void fetchDataFromSystem() {
        try {
            // 模拟从外部系统拉取数据
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 数据拉取完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void processFetchedData() {
        // 数据汇总逻辑
        System.out.println("数据汇总完成");
    }
}
说明:
  • CompletableFuture 可以处理异步任务并在所有任务完成后执行后续操作,适合异步编程。

4. ExecutorService 的 invokeAll() 方法

invokeAll() 提交一组任务并阻塞直到所有任务完成,适用于需要处理多个任务并等待全部任务完成的场景。

场景:

在一个文件处理系统中,需要并行处理多个文件,确保所有文件都处理完再进行汇总操作。

伪代码:
// 创建一组任务
List<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    tasks.add(() -> {
        processFile();  // 处理文件
        return null;
    });
}

// 提交任务并等待所有任务完成
executor.invokeAll(tasks);

// 所有任务完成后执行后续操作,如文件汇总
summarizeProcessedFiles();
适用场景:
  • 任务队列处理:多个任务并行执行,确保所有任务完成后再进行汇总或后续操作。
  • 并发任务管理:适用于已知所有任务并希望统一处理的场景。
示例代码:
java复制代码import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceInvokeAllExample {

    public static void main(String[] args) throws InterruptedException {
        // 初始化线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 创建一组任务
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            tasks.add(() -> {
                processFile();  // 处理文件
                return null;
            });
        }

        // 提交任务并等待所有任务完成
        executor.invokeAll(tasks);

        // 所有任务完成后执行后续操作
        System.out.println("所有文件处理完成,开始汇总...");
        summarizeProcessedFiles();

        // 关闭线程池
        executor.shutdown();
    }

    private static void processFile() {
        try {
            // 模拟文件处理
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 文件处理完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void summarizeProcessedFiles() {
        // 文件汇总逻辑
        System.out.println("文件汇总完成");
    }
}
说明:
  • ExecutorService.invokeAll() 用于并行处理多个任务,并确保所有任务完成后执行后续操作。

5. Phaser

Phaser 提供了一种更加灵活的同步机制,支持多个阶段的任务同步,线程数也可以动态增加或减少。

场景:

在一个多阶段数据处理系统中,需要多个线程在每个阶段完成后同步进行下一阶段操作。

伪代码:
// 初始化 Phaser,指定等待线程数量为 5
Phaser phaser = new Phaser(5);

// 启动多个线程处理数据
for (int i = 0; i < 5; i++) {
    executor.submit(() -> {
        processStage1Data();  // 处理阶段 1 数据
        phaser.arriveAndAwaitAdvance();  // 等待其他线程完成阶段 1

        processStage2Data();  // 处理阶段 2 数据
        phaser.arriveAndAwaitAdvance();  // 等待其他线程完成阶段 2

        processStage3Data();  // 处理阶段 3 数据
        phaser.arriveAndDeregister();  // 完成最后一个阶段,注销线程
    });
}

// 等待所有线程完成所有阶段
phaser.awaitAdvance(phaser.getPhase());
适用场景:
  • 多阶段任务同步:在多个阶段中执行任务,确保所有线程完成一个阶段后进入下一阶段。
  • 线程数动态变化:适合线程数在不同阶段动态变化的场景。
示例代码:
java复制代码import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserExample {

    public static void main(String[] args) {
        // 初始化线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 初始化 Phaser,指定等待线程数量为 5
        Phaser phaser = new Phaser(5);

        // 启动多个线程处理任务
        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                processStage1Data();  // 处理阶段 1 数据
                phaser.arriveAndAwaitAdvance();  // 等待其他线程完成阶段 1

                processStage2Data();  // 处理阶段 2 数据
                phaser.arriveAndAwaitAdvance();  // 等待其他线程完成阶段 2

                processStage3Data();  // 处理阶段 3 数据
                phaser.arriveAndDeregister();  // 完成最后一个阶段,注销线程
            });
        }

        // 等待所有线程完成所有阶段
        phaser.awaitAdvance(phaser.getPhase());

        // 关闭线程池
        executor.shutdown();
    }

    private static void processStage1Data() {
        try {
            // 模拟阶段 1 数据处理
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 阶段 1 数据处理完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void processStage2Data() {
        try {
            // 模拟阶段 2 数据处理
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 阶段 2 数据处理完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void processStage3Data() {
        try {
            // 模拟阶段 3 数据处理
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 阶段 3 数据处理完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
说明:
  • Phaser 允许多个阶段的同步,在每个阶段完成时,线程等待并同步进入下一个阶段,适合需要多阶段任务的场景。

总结

在 Java 中,不同的同步工具有不同的适用场景。根据项目的具体需求,选择合适的同步机制能提高并发性能并避免线程安全问题。

  • CountDownLatch:用于等待所有线程完成,适合任务依赖的场景。
  • CyclicBarrier:适用于多线程到达某个同步点后一起继续执行,常用于周期性任务。
  • CompletableFuture:适用于处理异步任务,能灵活地组合任务。
  • ExecutorService.invokeAll():适用于确保一组任务完成后执行汇总操作。
  • Phaser:适用于多阶段任务,并且支持线程动态变化的场景。

选择合适的同步工具能够提升并发效率,确保任务按预期执行。

标签:总结,同步,java,汇总,任务,线程,完成,executor,多线程
From: https://blog.csdn.net/Liu_Downloads/article/details/145134673

相关文章

  • 代码随想录算法训练营总结
            为期2个月的训练营时间,总算是一步一步的顺利结束了,撒花撒花!!!    这个训练营算是我第一次比较系统的进行学习数据结构和算法以及刷力扣,以前总是刷到一半就半途而费了,这次总算是坚持着跟着群里的打卡节奏一步一步的完结了。    对于内容来说,内......
  • C++中线程同步与互斥的4种方式介绍、对比、场景举例
    在C++中,当两个或更多的线程需要访问共享数据时,就会出现线程安全问题。这是因为,如果没有适当的同步机制,一个线程可能在另一个线程还没有完成对数据的修改就开始访问数据,这将导致数据的不一致性和程序的不可预测性。为了解决这个问题,C++提供了多种线程同步和互斥的机制。1.......
  • 基于DVB-T的COFDM+16QAM+LDPC图传通信系统matlab仿真,包括载波同步,定时同步,信道估计
    1.算法仿真效果matlab2022a仿真结果如下(完整代码运行后无水印):   图传测试:  仿真操作步骤可参考程序配套的操作视频。 2.算法涉及理论知识概要       基于DVB-T的COFDM+16QAM+LDPC码通信链路是一种常用的数字视频广播系统,用于实现高效的传输和接收。该......
  • GodoOS 知识库实现文件系统监控与同步机制
    引言在软件开发中,文件系统的实时监控和同步是一项关键任务。无论是为了实现增量备份、日志分析还是数据同步,都需要一种高效且可靠的方法来跟踪文件的更改。本文将深入探讨如何使用Go语言及其fsnotify库实现一个强大的文件系统监控和同步系统,并进一步优化其性能和可靠性。......
  • 飞轮储能系统的建模与Simulink仿真(永磁同步电机作为飞轮驱动电机)
     ......
  • c语言——【linux】多线程编程 (内附练习及代码)
    1:开启一个线程主线程中:使用标准IO,向一个文件中写入任意数据分支线程:使用标准IO,读取该文件中的数据#include<stdio.h>#include<string.h>#include<unistd.h>#include<stdlib.h>#include<sys/types.h>#include<sys/stat.h>#include<fcntl.h>#include<p......
  • 多线程的实现
    1.线程和进程的概述1.1什么是进程?正在执行的程序,称为进程。进程是系统分配资源的单元。现在系统支持多进程的。1.2什么是线程?线程,又称轻量级进程(LightWeightProcess)。线程是进程中的一条执行路径,也是CPU的基本调度单位。若一个程序可同一时间执行多个线程,就是支......
  • 进程、线程,java如何实现多线程任务,Thread里面常用的方法
    1.线程和进程的概述1.1什么是进程正在执行的程序,称为进程。进程是系统分配资源的单元。1.2什么是线程线程又称轻量级的进程,是CPU调度的基本单位,一个进程由一个或多个线程组成进程与进程之间无法共享数据同一个进程的线程与线程之间可以共享数据2.并发与并行的概述并......
  • 深入探讨聚合函数(COUNT, SUM, AVG, MAX, MIN):分析和总结数据的新视野
    title:深入探讨聚合函数(COUNT,SUM,AVG,MAX,MIN):分析和总结数据的新视野date:2025/1/13updated:2025/1/13author:cmdragonexcerpt:在数据分析和数据库管理领域,聚合函数(AggregateFunctions)是获取数据总结和统计信息的关键工具。聚合函数如COUNT、SUM、AVG、M......
  • 深入探讨聚合函数(COUNT, SUM, AVG, MAX, MIN):分析和总结数据的新视野
    title:深入探讨聚合函数(COUNT,SUM,AVG,MAX,MIN):分析和总结数据的新视野date:2025/1/13updated:2025/1/13author:cmdragonexcerpt:在数据分析和数据库管理领域,聚合函数(AggregateFunctions)是获取数据总结和统计信息的关键工具。聚合函数如COUNT、SUM、AVG、M......