首页 > 编程语言 >Dating Java8系列之并行数据处理

Dating Java8系列之并行数据处理

时间:2024-01-12 23:44:06浏览次数:32  
标签:end Long start 任务 线程 phoneList 数据处理 Dating Java8

翎野君/文

 

图片

 

图片

分支合并框架

 

分支合并框架介绍

分支/合并框架的目的是以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型。

要定义RecursiveTask,只需实现它唯一的抽象方法compute:

protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:

if (任务足够小或不可分) {   顺序计算该任务} else {     将任务分成两个子任务     递归调用本方法,拆分每个子任务,等待所有子任务完成     合并每个子任务的结果}

图片

 

图片

使用分支合并框架的例子

 

执行递增求和任务

public class CalculatorSumTask extends RecursiveTask<Long> {
// 开始的值 private Long start;
// 结束的值 private Long end;
// 阈值,即结束fork的条件 private static final Long THRESHOLD = 10000L;
public CalculatorSumTask(Long start, Long end) { this.start = start; this.end = end; }
/** * 求 start - end 之间的所有数的和 * * @return */ @Override protected Long compute() {
// 数的间隔 long length = end - start;
// 任务足够小或不可分 if (length <= THRESHOLD) { long sum = 0L; // 顺序计算该任务 for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long middle = (start + end) / 2; // 将任务分成两个子任务 CalculatorSumTask leftTask = new CalculatorSumTask(start, middle); leftTask.fork(); CalculatorSumTask rightTask = new CalculatorSumTask(middle + 1, end); rightTask.fork();
// 合并每个子任务的结果 return leftTask.join() + rightTask.join(); } }}

当把CalculatorSumTask任务传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务compute方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把,要求和的数组分成两半,分给两个新的CalculatorSumTask,而它们也由ForkJoinPool安排执行。

因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步细分的条件(本例中是求和的项目数小于等于10000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。

图片

 

模拟发送短信

public class SendSmsTask extends RecursiveAction {
private List<String> phoneList;
private String smsText;
private static final Integer THRESHOLD = 1000;
public SendSmsTask(List<String> phoneList, String smsText) { this.phoneList = phoneList; this.smsText = smsText; }
private void send(String phoneNumber, String smsText) { try { Thread.sleep(10); System.out.println("发送:" + phoneNumber + " 内容:" + smsText); } catch (InterruptedException e) { e.printStackTrace(); } }
/** * 对一批手机号进行发短信处理 */ @Override protected void compute() { int batchSize = phoneList.size(); if (batchSize <= 1000) { for (String number : phoneList) { send(number, smsText); } } else { // 拆的逻辑和如何拆分由自己来定义 int middle = phoneList.size() / 2;
List<String> leftList = phoneList.subList(0, middle); List<String> rightList = phoneList.subList(middle, phoneList.size());
SendSmsTask leftTask = new SendSmsTask(leftList,smsText); SendSmsTask rightTask = new SendSmsTask(rightList,smsText);
leftTask.fork(); rightTask.fork();
// 下面这两步骤可以省略 leftTask.join(); rightTask.join(); } }}

 

图片

工作窃取算法

 

算法介绍

分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每个子任务所花的时间可能天差地别。

分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。

  • 每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。

  • 基于前面所述的原因,某个线程可能已经完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。

  • 这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。

  • 这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。

  • 这就是为什么要划分成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。

图片

相关代码

public class ParallelMain {
public static void main(String[] args) { Long start = Instant.now().toEpochMilli(); // 求0-10000000所有数据的和 Long sum = 0L; for (long i = 0; i <= 10000000000L; i++) { sum += i; } System.out.println(sum); Long end = Instant.now().toEpochMilli(); System.out.println("for循环单线程执行 耗费时间:" + (end - start));
// 获取Jvm最大的可用线程数 System.out.println(Runtime.getRuntime().availableProcessors());
Long start1 = Instant.now().toEpochMilli();
// 要处理的数据量的太小 ForkJoinPool forkJoinPool = new ForkJoinPool(); Long sum1 = forkJoinPool.invoke(new CalculatorSumTask(0L,10000000000L)); System.out.println(sum1);
Long end1 = Instant.now().toEpochMilli(); System.out.println("fork join 并行执行 耗费时间:" + (end1 - start1));
Long start2 = Instant.now().toEpochMilli();
Long sum2 = LongStream.rangeClosed(0L,10000000000L).parallel().reduce(0L,Long::sum); System.out.println(sum2);
Long end2 = Instant.now().toEpochMilli(); System.out.println("java8 parallel 并行执行 耗费时间:" + (end2 - start2)); }}

 

图片

小结

 

  • 分支/合并框架使用递归的方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。

  • 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。

  • 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是反直觉的,因此一定要测量,确保你并没有把程序变得更慢。

  • 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。

 

作者:给我馍馍
博客:https://www.cnblogs.com/givemomo/

 

本篇文章如有帮助到您,请给「给我馍馍」点个赞,感谢您的支持。

标签:end,Long,start,任务,线程,phoneList,数据处理,Dating,Java8
From: https://www.cnblogs.com/givemomo/p/17961799

相关文章

  • 工程监测领域振弦采集仪的数据处理与分析方法探讨
    工程监测领域振弦采集仪的数据处理与分析方法探讨在工程监测领域,振弦采集仪是常用的一种设备,用于测量和记录结构物的振动数据。数据处理和分析是使用振弦采集仪得到的数据的重要环节,可以帮助工程师了解结构物的振动特性,评估结构的安全性,以及检测结构的健康状况。下面是关于振弦采......
  • 工程监测领域振弦采集仪的数据处理与分析方法探讨
    工程监测领域振弦采集仪的数据处理与分析方法探讨在工程监测领域,振弦采集仪是常用的一种设备,用于测量和记录结构物的振动数据。数据处理和分析是使用振弦采集仪得到的数据的重要环节,可以帮助工程师了解结构物的振动特性,评估结构的安全性,以及检测结构的健康状况。下面是关于振弦采集......
  • java8日期时间格式化DateTimeFormatter多个格式
    原文地址:datetimeformatter.ofpatternmultipleformats-掘金DateTimeFormatter 是一个用于日期时间格式化和解析的类。使用 ofPattern 方法可以创建一个格式化器,该方法接受一个日期时间格式的字符串作为参数。如果您需要在同一个 DateTimeFormatter 对象中支持多种不同的......
  • 低代码开发中的文件上传与数据处理:实战指南
    在当今的信息化时代,数据已成为企业的重要资产。为了更好地管理和利用这些数据,许多企业开始采用表单上传组件来导入和处理数据。通过使用表单上传组件,用户可以方便地将文件上传至系统中,然后进行后续的数据处理和分析。这种方式的优点在于,用户无需掌握复杂的编程技术,即可完成数据导入......
  • 数据可扩展性与实时数据处理:技术与案例
    1.背景介绍数据可扩展性和实时数据处理是当今数据科学和人工智能领域的关键技术。随着数据规模的不断增长,传统的数据处理方法已经无法满足需求。因此,数据可扩展性技术成为了必须研究的领域。同时,随着互联网的普及和人们对实时信息的需求不断增加,实时数据处理技术也成为了关键技术。......
  • 数据中台的数据处理及应用说明
    科技飞速发展的时代,企业信息化建设会越来越完善,越来越体系化,当今数据时代背景下更加强调、重视数据的价值,以数据说话,通过数据为企业提升渠道转化率、改善企业产品、实现精准运营,为企业打造自助模式的数据分析成果,以数据驱动决策。数据中台把数据统一之后,会形成标准数据,再进行存储,形......
  • 大数据处理:高性能计算在数据挑战中的应用
    1.背景介绍大数据处理是指利用计算机科学和应用技术来处理和分析大规模、高速、多源、不确定性和复杂性高的数据集。大数据处理的核心挑战是如何在有限的时间和资源内,高效地处理和分析这些复杂的数据。高性能计算(HighPerformanceComputing,HPC)是一种计算机科学技术,旨在解决那些......
  • java8中object转list
    Java8中Object转List的实现概述在Java8中,我们可以使用StreamAPI将一个Object对象转换为List集合。本文将介绍如何使用StreamAPI实现此功能,并提供相应的示例代码。实现步骤下面是实现"Java8中Object转List"的步骤,我们可以使用以下表格形式展示:步骤描述1创建一个Obj......
  • java8找集合中最小的
    Java8找集合中最小的简介在Java编程中,我们经常需要在一个集合中寻找最小的元素。在Java8中,我们可以使用StreamAPI来实现这个功能。StreamAPI是Java8中引入的一个强大的功能,它可以让我们以一种更简洁、更易读的方式处理集合数据。本文将介绍如何使用Java8的StreamAPI来找到一个......
  • Java8 原子类 AtomicInteger 源码阅读
    AtomicInteger 是用 CAS(Compre And Swap,乐观锁)构造的一个 原子类。1. CAS CAS(CompareandSwap)比较并替换,CAS是实现乐观锁的一个重要操作。CAS是一个硬件指令,保证是原子操作,Java中通过UnSafe来实现。详细可一下我的这篇博文:传送。CAS 的基本步骤:执行函数CAS(V,E,N......