首页 > 其他分享 >Fork/Join框架运行原理

Fork/Join框架运行原理

时间:2022-11-04 12:32:10浏览次数:71  
标签:Fork Join 框架 int ForkJoinTask 队列 任务 线程 null


Fork/Join框架的入门可以参考​​Fork/Join框架​​。Fork/Join框架的核心类有两个一个是ForkJoinPool,它主要负责执行任务;一个是ForkJoinTask,主要负责任务的拆分和结果的合并;

ForkJoinPool

它和ThreadPoolExecutor一样也是一个线程池的实现,并且同样实现了Executor和ExecutorServiceie接口,类图如下:

Fork/Join框架运行原理_Fork/Join框架运行原理

核心内部类 WorkQueue

static final class WorkQueue {

// 队列初始容量
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

// 队列最大容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

volatile int qlock; // 1: locked, < 0: terminate; else 0
// 下一次出队的索引位
volatile int base; // index of next slot for poll
// 下一次入队索引位
int top; // index of next slot for push
// 存放任务的容器
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
// 执行当前队列任务的线程
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null

WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}

// 入队
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}

// 初始化扩展扩容ForkJoinTask<?>[]
final ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] oldA = array;
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
do { // emulate poll from old array, push to new array
ForkJoinTask<?> x;
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
int j = ((b & mask) << ASHIFT) + ABASE;
x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
if (x != null &&
U.compareAndSwapObject(oldA, oldj, x, null))
U.putObjectVolatile(a, j, x);
} while (++b != t);
}
return a;
}


// 出队
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
while ((b = base) - top < 0 && (a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (base == b) {
if (t != null) {
if (U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
else if (b + 1 == top) // now empty
break;
}
}
return null;
}
}

​WorkQueue​​主要作用是接受外部提交的任务,并且支持工作窃取。

  • 每一个​​WorkQueue​​​对应一个​​ForkJoinWorkerThread​​来执行队列里面的任务;
  • 使用​​ForkJoinTask<?>[]​​​数组来存储任务,这个数组初始化长度是​​INITIAL_QUEUE_CAPACITY = 1 << 13 = 8192​​​,最大长度为​​MAXIMUM_QUEUE_CAPACITY = 1 << 26 = 67108864​​​,​​ForkJoinTask<?>[]​​是在第一次提交任务的时候初始化;
  • 通过​​base​​​和​​top​​分别来记录下次出队的索引位和下一次入队的索引位。

核心属性

// Instance fields
// 线程池控制位
volatile long ctl; // main pool control
// 线程池状态
volatile int runState; // lockable status
// 工作队列数组
volatile WorkQueue[] workQueues; // main registry

构造函数

private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
  • int parallelism:并发度,默认是CPU核心数
  • ForkJoinWorkerThreadFactory factory:创建工作线程的工厂类
  • UncaughtExceptionHandler handler:异常处理的策略
  • int mode:队列算法标记,0:后进先出,65536:先进先出
  • String workerNamePrefix:工作线程名前缀

## 核心方法

execute()

public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}

只是判断了一下任务是否为null,然后就调用了​​externalPush​​方法。

externalPush()

final void externalPush(ForkJoinTask<?> task) {
// ws工作队列数组;q:存放当前任务的工作队列;m:工作队列数组最后一个索引位
WorkQueue[] ws; WorkQueue q; int m;
// 获取一个随机数
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
// 找到存放当前任务的工作队列q
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
// 获取q队列上的锁
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
// a:q队列中存放任务的数组;am是数组长度;n:数组中的任务数;q:下一个入队的索引位
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
// 判断队列是否满了
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
// 计算存放任务的索引位
int j = ((am & s) << ASHIFT) + ABASE;
// 存放任务
U.putOrderedObject(a, j, task);
// 更新top指针(索引位)
U.putOrderedInt(q, QTOP, s + 1);
// 解锁
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
// 尝试创建或者激活线程
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// 完整版的push,处理一些不常见的情况,比如初始化工作队列数组workQueues,和新的工作队列workQueues[i]
externalSubmit(task);
}
  1. 先判断工作队列数组​​workQueues​​​数组是否为NULL,如果是直接走下面完整版的​​push​​方法
  2. 根据随机数,找到当前任务所需要放入的工作队列​​p=workQueues[i]​
  3. 如果p为NULL直接走下面完整版的​​push​​方法
  4. 获取q队列上的锁
  5. 判断队列是否满了,如果是直接走下面完整版的​​push​​方法
  6. 任务入队
  7. 解锁

​externalSubmit(task)​​​整版的push方法,处理一些不常见的情况,比如初始化和扩容工作队列数组workQueues​​workQueues​​​;新创建工作队列​​workQueues[i]​

createWorker()

在​​externalSubmit(task)​​​方法中我们创建了工作队列后,我们需要将工作队列里面的工作线程启动起来,然后处理工作队列里面的任务,最终​​externalSubmit(task)​​​方法会调用​​createWorker()​​方法创建工作线程,并启动线程。

private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 创建工作线程,并将调用registerWorker方法将工作线程和工作队列做绑定
if (fac != null && (wt = fac.newThread(this)) != null) {
// 启动工程线程
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
// 最后解绑工作线程和工作队列的关系
deregisterWorker(wt, ex);
return false;
}

runWorker()

​ForkJoinWorkerThread.run()​​​最终会调用​​ForkJoinPool.runWorker()​​方法,来循环的执行队列里面的任务。

final void runWorker(WorkQueue w) {
// 分配队列中存储任务的数据
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
// 自旋,执行队列中的任务
for (ForkJoinTask<?> t;;) {
// 获取任务
if ((t = scan(w, r)) != null)
// 执行任务
w.runTask(t);
// 等待任务
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}

这个方法最终回去调用到​​ForkJoinTask.exec()​​​方法,进而调用到子类​​RecursiveTask​​​和​​RecursiveAction​​​的​​compute()​​方法,任务执行。

ForkJoinTask

fork()

当我们调用ForkJoinTask的fork方法时,程序会将任务放到队列里面去,然后异步地执行这个任务。代码如下:

public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}

push方法把当前任务存放到工作队列中的ForkJoinTask数组中,代码如下:

final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}

join()

Join方法的主要作用是阻塞当前线程并等待获取结果,代码如下:

public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}

首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。

  • 如果任务状态是已完成,则直接返回任务结果。
  • 如果任务状态是被取消,则直接抛出CancellationException。
  • 如果任务状态是抛出异常,则直接抛出对应的异常。

让我们再来分析一下doJoin()方法的实现代码。

private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
// 执行任务
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
// 阻塞非工作线程,直到工作线程执行完毕
externalAwaitDone();
}
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}

在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

参考

《java并发编程的艺术》

源码

​https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases​

spring-boot-student-concurrent 工程



标签:Fork,Join,框架,int,ForkJoinTask,队列,任务,线程,null
From: https://blog.51cto.com/u_15861563/5823589

相关文章

  • 异步框架tornado下使用pyppeteer将动态html转化为pdf
    项目背景:云上服务器存储html,前端通过传递给后端html_url,由后端服务器获取html文件进行渲染,生成pdf,然后将pdf上传云上服务器。 使用的框架/库:tornado/pyppeteer/......
  • spring框架略解一( 3.2.13 )
    一、功能  二、结构   1.DataAccess/Integration(数据访问/集成)数据访问/集成层包括JDBC、ORM、OXM、JMS和Transactions模块,具体介绍如下。JDBC模块:......
  • 在 uniapp 项目上使用 uView UI框架
    下载uView插件官方下载地址......
  • 图解教程P1-c#和.net框架-2022年11月3日
    1.net之前VB,C,C++.WIN32.API,MFC微软基础类库,com组件对象模型。2.2002发布.net第一个版本面向对象,多平台,行业标准,安全性3..net框架构成执行环境CLRcommentlan......
  • 如何看待阿里开源的Rax框架?开源
    作者:果大链接:https://www.zhihu.com/question/54738521/answer/146132135来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。WhyRax?从201......
  • 05-分布式计算框架
    目录​​一,MapReduce​​​​1,简介​​​​2,原理​​​​2.1基本概念​​​​2.2程序执行过程​​​​2.3作业运行模式​​​​二,Spark​​​​1,简介​​​​1.1背景​......
  • 如何写一个给自己的框架写一个优雅的Java Config模块
    发博词工作时间长一点的工程师,平时的“工作生活”中肯定免不了写一些大大小小的框架和中间件。最近在看SpringSecurity的源码,由于这个框架要解决的问题比较多、复杂且零碎,......
  • pytest框架学习随想
    前言:最近一段时间在学习pytest框架,在学习过程中也遇到了很多的问题,简单跟大家分享一下1.什么是单元测试框架:(1).单元测试框架:指在软件开发当中,针对软件的最小单位(......
  • NCF(NeuCharFramework)框架的使用 当前所用框架版本(0.3.1-beta3)
    1、官网介绍:NCF-NeuCharFramework|NCF文档2、下载NCF框架代码:https://github.com/NeuCharFramework/NCF3、运行NCF框架用vs2022打开下载的NCF项目NCF\src\back-en......
  • 【OkHttp框架】OkHttp框架原理与参数配置
    一、参考资料​​Okhttp的线程池|黑白了落夜的博客​​​​OKhttpClient简单使用总结「建议收藏」-腾讯云开发者社区-腾讯云​​......