什么是Fork/Join
Fork/Join 是JUC并发包下的一个并行处理框架,实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。
Fork/Join的运行流程大致如下所示:
需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止,这里体现了分而治之(divide and conquer) 的算法思想。
工作窃取算法
工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。
工作窃取流程如下图所示:
值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。
另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。
Fork/Join 实践
前面说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类ForkJoinTask来实现任务。
ForkJoinTask
ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。
fork()方法:使用线程池中的空闲线程异步提交任务
public final ForkJoinTask<V> fork() {
Thread t;
// ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
// 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// 如果不是则将线程加入队列
// 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
ForkJoinPool.common.externalPush(this);
return this;
}
其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里。
join()方法:等待处理任务的线程处理完毕,获得返回值。
我们看下join()的源码:
public final V join() {
int s;
// doJoin()方法来获取当前任务的执行状态
if ((s = doJoin() & DONE_MASK) != NORMAL)
// 任务异常,抛出异常
reportException(s);
// 任务正常完成,获取返回值
return getRawResult();
}
/**
* doJoin()方法用来返回当前任务的执行状态
**/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
return (s = status) < 0 ? s :
// 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
// tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
// doExec()方法执行任务
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
// 如果是处于顶端并且任务执行完毕,返回结果
tryUnpush(this) && (s = doExec()) < 0 ? s :
// 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
// awaitJoin():使用自旋使任务执行完成,返回结果
wt.pool.awaitJoin(w, this, 0L) :
// 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
externalAwaitDone();
}
我们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,下面是ForkJoinPool.join()的流程图:
RecursiveAction和RecursiveTask
通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveAction和RecursiveTask。
两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask。
此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。
ForkJoinPool
ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。
ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。
我们来大致看下ForkJoinPool的源码:
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
// 任务队列
volatile WorkQueue[] workQueues;
// 线程的运行状态
volatile int runState;
// 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
// 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
static final ForkJoinPool common;
// 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
// 其他构造方法都是源自于此方法
// parallelism: 并行度,
// 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory, // 工作线程工厂
UncaughtExceptionHandler handler, // 拒绝任务的handler
int mode, // 同步模式
String workerNamePrefix) { // 线程名prefix
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
}
WorkQueue:
双端队列,ForkJoinTask存放在这里。
runState:
ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。
当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。
ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。
Fork/Join的使用
上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。
下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:
斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:
1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······
如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。
public class FibonacciTest {
class Fibonacci extends RecursiveTask<Integer> {
int n;
public Fibonacci(int n) {
this.n = n;
}
// 主要的实现逻辑都在compute()里
@Override
protected Integer compute() {
// 这里先假设 n >= 0
if (n <= 1) {
return n;
} else {
// f(n-1)
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
// f(n-2)
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
// f(n) = f(n-1) + f(n-2)
return f1.join() + f2.join();
}
}
}
@Test
public void testFib() throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
long start = System.currentTimeMillis();
Fibonacci fibonacci = new Fibonacci(40);
Future<Integer> future = forkJoinPool.submit(fibonacci);
System.out.println(future.get());
long end = System.currentTimeMillis();
System.out.println(String.format("耗时:%d millis", end - start));
}
}
上面例子的输出:
- CPU核数:4
- 计算结果:102334155
- 耗时:9490 ms
- 需要注意的是,上述计算时间复杂度为O(2^n),随着n的增长计算效率会越来越低,这也是上面的例子中n不敢取太大的原因。
总结
并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率。因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。
如果要计算的任务比较简单,直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。
标签:fork,join,一文,队列,ForkJoinPool,ForkJoinTask,任务,线程,执行 From: https://www.cnblogs.com/GentleJim/p/17520137.html