1、ForkJoinPool简介
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架。ForkJoinPool是Java中提供了一个线程池,特点是用来执行分治任务。主题思想是将大任务分解为小任务,然后继续将小任务分解,直至能够直接解决为止,然后再依次将任务的结果合并。
ForkJoinPool是一种工作窃取算法的线程池,它特别适用于计算密集型任务,尤其是那些可以递归分解成更小子任务的问题。ForkJoinPool的原理主要基于两个核心概念:fork和join。
- Fork:当一个任务可以分解成多个子任务时,这些子任务会被“fork”出去,即它们会被分配给不同的线程执行。在Java中,这种分解通常通过继承
RecursiveTask
类来实现,子任务通过调用fork()
方法被提交到ForkJoinPool中执行。 - Join:当子任务完成后,主任务需要“join”这些子任务的结果,以便继续执行或返回最终结果。在Java中,这通常通过覆盖
compute()
方法来实现,在这个方法中,你可以获取子任务的结果并将其汇总。
ForkJoinPool的内部实现依赖于工作窃取算法(Work-Stealing Algorithm),这是一种有效的任务调度策略。具体来说,每个ForkJoinPool维护了一个工作队列,队列中的任务按照FIFO(先进先出)的方式被处理。此外,ForkJoinPool还会创建一些内部线程,这些线程会尝试从其他队列中“偷取”任务来执行,从而实现了工作负载的平衡。这种策略能够有效地利用系统资源,尤其是在多核处理器上,可以显著提高并行计算的效率。
ForkJoinPool的适用场景主要是那些可以自然分解成多个子任务的问题,例如大数据处理、图像处理等计算密集型任务。通过合理地分解任务并利用ForkJoinPool的并行处理能力,可以显著提高程序的执行效率。
在处理大文件清单导入任务时,单线程处理方式可能效率低下,尤其是当文件非常大时。Java的ForkJoinPool提供了一种高效的并行处理框架,可以显著提升大文件处理的速度。本文将介绍如何使用ForkJoinPool对大文件清单导入进行优化,并提供详细的Java代码示例。
2、ForkJoinPool原理
ForkJoinPool 是 Java 提供的一种用于并行执行任务的线程池,专为实现分治法(Divide and Conquer)的任务而设计。它的核心理念是将一个大任务拆分成多个小任务,并行执行这些小任务,然后合并结果。
ForkJoinPool是自java7开始,jvm提供的一个用于并行执行的任务框架。其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果。得到最终的结果。其广泛用在java8的stream中。
这个描述实际上比较接近于单机版的map-reduce。都是采用了分治算法,将大的任务拆分到可执行的任务,之后并行执行,最终合并结果集。区别就在于ForkJoin机制可能只能在单个jvm上运行,而map-reduce则是在集群上执行。此外,ForkJoinPool采取工作窃取算法,以避免工作线程由于拆分了任务之后的join等待过程。这样处于空闲的工作线程将从其他工作线程的队列中主动去窃取任务来执行。这里涉及到的两个基本知识点是分治法和工作窃取。
2.1 分治法思想
ForkJoinPool 的工作基于分治法思想:
分割(Fork):将一个大任务递归地拆分成若干个更小的子任务。
执行(Execute):并行地执行这些子任务。
合并(Join):在所有子任务完成后,将它们的结果合并,得到最终结果。
分治法的基本思想是一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。是一种分目标完成的程序算法。简单的问题,可以用二分法来完成。
二分法,就是我们之前在检索的时候经常用到的Binary Search 。这样可以迅速将时间复杂度从O(n)降低到O(log n)。那么对应到ForkJoinPool对问题的处理也如此。
基本原理如下图:
2.2、工作窃取思想
ForkJoinPool 的核心是工作窃取算法(Work-Stealing Algorithm)。
这一算法的主要特点包括:
- 工作队列:每个工作线程都有一个双端队列(Deque)来存储任务。当一个任务被分割成多个子任务时,子任务会被压入工作线程的队列中。
- 窃取任务:如果某个工作线程完成了自己的任务并且队列为空,它会尝试从其他繁忙工作线程的队列末尾窃取任务。这种任务窃取机制有效地平衡了各个工作线程的负载,提高了并行处理效率。
- 任务执行:工作线程从队列头部取任务执行,自底向上地执行子任务(LIFO),从其他队列窃取任务时则从队列尾部取任务(FIFO),以减少任务之间的依赖。
工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。
在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现FIFO。而为了实现工作窃取。
一般我们会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。
示意图如下:
3 ForkJoinPool原理实现
ForkJoinPool线程池,其执行任务的线程对象是ForkJoinWorkerThread子类,任务均被包装为ForkJoinTask的子类.
Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。
3.1 ForkJoinWorkerThread类
Thread子类,其中主要内容有:线程队列创建、销毁、执行
3.1.1 线程队列创建
在构造器中通过ForkJoinPool.registerWorker方法为当前线程关联队列,队列位置为线程池队列数组的奇数位置
3.1.2 线程的销毁
通过ForkJoinPool.deregisterWorker方法进行销毁
3.1.4 线程的运行
run方法内为其主要逻辑,不贴代码了;需要在其线程队列建立后,持有数据还未申请空间之前进行线程执行,否则不做任何处理
回调方法onStart,表示线程开始执行;通过ForkJoinPool.runWorker方法来执行任务;onTermination回调方法接收异常处理;
3.2 ForkJoinTask类
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。
// 创建一个包含parallelism个并行线程的ForkJoinPool
public ForkJoinPool(int parallelism)
//以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool
public ForkJoinPool()
创建ForkJoinPool实例后,可以调用ForkJoinPool的submit(ForkJoinTask task) 或者invoke(ForkJoinTask task) 来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。
RecursiveTask 代表有返回值的任务
RecursiveAction 代表没有返回值的任务。
这些任务通过 fork() 方法进行分割,通过 join() 方法进行合并。
抽象类,实现了Future、Serializable接口;其主要内容:任务异常收集、fork-join执行流程(join也可以是invoke、get等操作,但这里就依据join来讲解)
task有以下几种状态
volatile int status;
static final int DONE_MASK = 0xf0000000;
static final int NORMAL = 0xf0000000;
static final int CANCELLED = 0xc0000000;
static final int EXCEPTIONAL = 0x80000000;
static final int SIGNAL = 0x00010000;
static final int SMASK = 0x0000ffff;
NORMAL:结束状态,正常结束,负数
CANCELLED:结束状态,用户取消,负数
EXCEPTIONAL:结束状态,执行异常,负数
SIGNAL:等待通知执行状态,正数
0 : 起始状态
3.2.1 异常收集
异常数据收集,是根据弱引用机制来处理;弱引用任务节点结构如下:
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { final Throwable ex; ExceptionNode next; final long thrower; final int hashCode; ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next,
ReferenceQueue exceptionTableRefQueue) {
super(task, exceptionTableRefQueue);
this.ex = ex; // 原始异常
this.next = next; // 相同hash的节点指针域
this.thrower = Thread.currentThread().getId(); // 线程标识
this.hashCode = System.identityHashCode(task); // 与对象地址相对应的hash
}
}
弱引用节点相关数据结构
private static final ExceptionNode[] exceptionTable; // 异常数据
private static final ReentrantLock exceptionTableLock; // 异常节点锁
private static final ReferenceQueue exceptionTableRefQueue; // 弱引用回收队列
采用的数组存储,并利用hash进行映射,单链表进行冲突解决;并在需要处理异常时,实时去除已经销毁的task节点异常;常用操作如下:
记录异常:recordExceptionalCompletion方法,在任务未完成的情况才会记录
清除当前节点异常:clearExceptionalCompletion方法
获取异常:getThrowableException,非当前线程异常,需要进行包装转换
清理无效task相关联异常:expungeStaleExceptions静态方法,清除掉回收队列中task所有相关异常节点
3.2.2 fork-join逻辑
fork方法用于向队列中保存任务;偶数任务队列中未依赖于线程,奇数队列为线程私有
public final ForkJoinTask fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
当前在ForkJoinWorkerThread线程中执行,则调用workQueue.push方法存入队列
放入线程池中队列数组中偶数位置的队列中
join方法用于阻塞获取结果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
同样需要根据线程类型判断
状态小于0,也即任务已结束,则直接返回,如果是异常则会抛出异常
未执行时,不是ForkJoinWorkerThread线程内执行,以当前任务实例为锁对象,进行等待(更具体的逻辑在externalAwaitDone方法内分析)
未执行时,ForkJoinWorkerThread线程内执行;如果任务为当前线程队列的顶部(也就是最后一个提交的)且执行后处于结束状态,则返回
线程池内awaitJoin进行等待(其时可能存在窃取其它任务队列进行执行)
externalAwaitDone方法
首先尝试执行,如果满足下面条件,则会执行doExec方法(调用exec()方法进行具体执行)
CountedCompleter任务类型,则common线程池方法externalHelpComplete返回true
其它任务类型,common线程池tryExternalUnpush方法返回true
如果未执行,则通过staus原子操作+synchronized锁,进行等待
3.2.3、RecursiveTask实现
案例:计算1-10000累加之和。
使用RecursiveTask有返回值的任务,编写SumTask
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Long> {
//达到多少不在分段
private int MIN_SUB = 100;
private int start;
private int end;
/**
* 构造SumTask
*
* @param start 开始累加数
* @param end 结束累加数
*/
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end - start < MIN_SUB) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//拆分逻辑
int middle = (end + start) / 2;
SumTask task1 = new SumTask(start, middle);
SumTask task2 = new SumTask(middle + 1, end);
task1.fork();
task2.fork();
//等到子任务做完
long sum1 = task1.join();
long sum2 = task2.join();
sum = sum1 + sum2;
}
return sum;
}
}
测试
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class ForkJoinPoolTest {
/**
* 测试forkjoinpool
*
* @param args 参数
*/
public static void main(String[] args) {
ForkJoinPool excutor = new ForkJoinPool(10);
SumTask task = new SumTask(1, 10000);
try {
ForkJoinTask<Long> future = excutor.submit(task);
System.out.println("result:" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.3 ForkJoinPool类
这里主要有一些常量的意义、队列结构、执行流程、窃取线程思路;
3.3.1 状态成员变量
volatile long ctl;
volatile int runState;
final int config;
ct1,64位,分为4段,每相邻16位为一段
- 高16位,正在处理任务的线程个数;初始化为并行数的负值(构造器中线程的并行线程数,一般来说为能创建的最大线程数)
- 次高16位,线程总数,初始化为并行数的负值
- 次低16位,线程状态,小于0时需要添加新的线程,或者说48位的位置为1时,需要添加线程
- 低16位,空闲线程对应的任务队列在队列数组的索引位置
runState,有下面几种状态,默认态为0
private static final int STARTED = 1;
private static final int STOP = 1 << 1;
private static final int TERMINATED = 1 << 2;
private static final int SHUTDOWN = 1 << 31;
config:低16位代表 并行度(parallelism),高16位:队列模式,默认是后进先出
3.3.2 线程队列
volatile WorkQueue[] workQueues
数组结构,分为线程队列和非线程队列,随机寻找位置进行创建与查找;达到WorkQueue均匀处理,以减少WorkQueue同步开销
volatile int scanState; // 负数:inactive, 非负数:active, 其中奇数代表scanning
int stackPred; // sp = (int)ctl, 前一个队列栈的标示信息,包含版本号、是否激活、以及队列索引
int nsteals; // 窃取的任务数
int hint; // 一个随机数,用来帮助任务窃取,在 helpXXXX()的方法中会用到
int config; // 配置:二进制的低16位代表 在 queue[] 中的索引,高16位:mode可选FIFO_QUEUE(1 << 16)和LIFO_QUEUE(1 << 31),默认是LIFO_QUEUE
volatile int qlock; // 锁定标示位:1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // 任务列表
WorkQueue中数据结构主体:任务数组、任务队列头部、尾部;以及线程操作同步标志,使用原子操作+volatile来实现,-1表示不允许操作了、0表示可以操作、1表示正常操作
因此其方法可以分为线程安全方法、非线程安全方法;线程安全方法用于窃取,非线程安全方法用于线程内任务执行
push方法:队列尾部加入数据,非线程安全
growArray方法:数组扩容,2被扩容,非线程安全
pop方法:从尾部取出数据,原子操作保证线程安全,但不保证成功
pollAt方法:从头部取出数据,原子操作保证线程安全,但不保证成功
poll: 从头部取出数据,原子操作+自旋,保证线程安全
nextLocalTask:根据策略,进行取出数据(根据congfig来进行处理),线程安全
peek:根据出队模式返回队头或者队尾元素,但不取出,非线程安全
tryUnpush:尝试判断是否为队尾任务,线程安全,但结果不一定准确
sharedPush:共享队列(偶数位置的WorkQueue实例),队尾增加数据方法,使用qlock原子操作来实现线程安全,但不保证结果准确,其中队列扩容通过growAndSharedPush方法处理并增加数据
trySharedUnpush:判断任务是否处于队尾,原子操作保证线程安全,不保证结果准确
cancelAll: 取消所有任务
localPopAndExec:从队尾开始执行任务,原子操作+自旋来保证线程安全,存在线程竞争时,则退出,不进行处理
localPollAndExec:从队头开始执行任务,原子操作+自旋来保证线程安全,存在线程竞争时,则退出,不进行处理
runTask:执行窃取任务,并依据出队某事调用localPopAndExec或者localPollAndExec来继续本线程队列任务处理
tryRemoveAndExec:自旋+原子操作,尽可能执行线程私有队列中的任务;非队尾数据,原子操作为EmptyTask
popCC:取出队尾的CountedCompleter任务,原子操作+自旋保证线程安全
pollAndExecCC:取出队头CountedCompleter任务,并执行,原子操作+自旋保证线程安全
3.3.3 调用流程
主要有下面三个流程提交任务流程、线程执行流程、获取结果流程
从类的角度来看
- 线程池提交任务
- ForkJoinTask类的fork
从功能角度来看
- Fork线程内部提交任务
- 非Fork线程提交任务,第一个任务肯定是这种方式
外部提交任务
内部提交任务,直接调用线程私有WorkQueue对象,push方法加入队尾
3.3.4、示例
假设我们有一个包含数百万行记录的文件,需要将其导入数据库。如果采用单线程方式逐行处理,效率会非常低。通过ForkJoinPool,我们可以将文件拆分成若干个小块,并行处理每个小块,从而大大提高导入效率。
下面是一个使用ForkJoinPool对大文件清单导入进行优化的Java示例:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class LargeFileImporter {
private static final int THRESHOLD = 1000; // 每个任务处理的最大行数
public static void main(String[] args) throws IOException {
String filePath = "path/to/largefile.txt";
List<String> lines = readLines(filePath);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ImportTask mainTask = new ImportTask(lines, 0, lines.size());
forkJoinPool.invoke(mainTask);
}
private static List<String> readLines(String filePath) throws IOException {
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
return reader.lines().toList();
}
}
static class ImportTask extends RecursiveTask<Void> {
private List<String> lines;
private int start;
private int end;
public ImportTask(List<String> lines, int start, int end) {
this.lines = lines;
this.start = start;
this.end = end;
}
@Override
protected Void compute() {
if (end - start <= THRESHOLD) {
importLines(lines.subList(start, end));
} else {
int mid = (start + end) / 2;
ImportTask leftTask = new ImportTask(lines, start, mid);
ImportTask rightTask = new ImportTask(lines, mid, end);
invokeAll(leftTask, rightTask);
}
return null;
}
private void importLines(List<String> lines) {
// 这里可以进行实际的导入操作,例如插入数据库
for (String line : lines) {
// Insert into database
System.out.println("Importing: " + line);
}
}
}
}
4、ForkJoinPool应用场景分析
ForkJoinPool 是 Java 7 引入的一种用于并行处理任务的线程池,特别适合分治法的任务。它的设计旨在通过将大任务拆分成小任务并行执行,来提高处理效率。
以下探讨 ForkJoinPool 的优缺点以及适用的应用场景。
4.1 优点
- 高效的任务分割与合并:ForkJoinPool 支持递归地将大任务分割成更小的子任务,并在子任务完成后合并结果。这种分而治之的方式使得处理大规模数据变得更加高效。
- 工作窃取算法:ForkJoinPool 采用工作窃取算法(work-stealing algorithm),当某个工作线程完成任务时,它会从其他繁忙的工作线程那里“窃取”任务。这种机制有效地平衡了各个线程的负载,提高了 CPU 的利用率。
- 适用于多核处理器:ForkJoinPool 专为多核处理器设计,能够充分利用多核优势,显著提升多线程任务的执行效率。
- 简化并行编程:通过 ForkJoinTask 类和其子类 RecursiveTask 和 RecursiveAction,开发者可以更方便地编写并行任务,降低了并行编程的复杂性。
4.2 缺点
- 任务分割的开销:虽然 ForkJoinPool 适合处理大任务,但如果任务过于细小,任务分割和合并的开销可能会超过任务本身的执行时间,反而降低效率。
- 复杂的调试与监控:并行任务调试和监控相较于单线程任务要复杂得多,特别是任务之间的依赖关系和同步问题,可能会导致难以发现和解决的并发错误。
- 内存消耗:ForkJoinPool 在处理大任务时可能会占用大量内存,尤其是在递归深度较大的情况下,每个任务需要存储状态信息,可能会导致堆栈溢出或内存不足的问题。
4.3 应用场景
- 递归算法:ForkJoinPool 非常适合应用于递归算法,例如归并排序、快速排序和斐波那契数列等,这些算法可以通过递归分解任务并行执行。
- 大规模数据处理:对于需要处理大量数据的场景,例如大文件的读取和解析、大型数组或集合的计算等,ForkJoinPool 可以显著提高处理效率。
- 图像处理:在图像处理领域,许多操作(如滤波、变换等)可以分解为对图像块的并行处理,这种场景非常适合使用 ForkJoinPool。
- 复杂计算:对于需要进行复杂计算且计算过程可以分解为独立子任务的场景,ForkJoinPool 提供了一种高效的并行处理方式。
5、总结
通过ForkJoinPool,我们可以有效地将大文件清单导入任务并行化,从而显著提升处理效率。
本文提供的示例展示了如何使用ForkJoinPool进行任务拆分和并行处理,实际应用中可以根据具体需求进行进一步优化和扩展。
标签:队列,ForkJoinPool,对大,int,任务,导入,线程,final From: https://blog.csdn.net/Rookie_CEO/article/details/140599014