本文主要介绍在之家广告业务系统中运用任务编排治理工具的场景及其可以解决的问题,讲解任务编排框架的核心要点,以及向大家演示一个任务编排框架的基本结构,让大家对任务编排工具增强业务开发效率,提高研发质量有更深刻的理解。
1.背景
我们开始以下面的实际业务场景代入任务编排工具的使用。
广告引擎流量接入层中一个用户流量请求过来,系统会对流量进行一系列的验证,数据加工处理最终会将用户流量转化为广告引擎可召回的广告请求,具体过程如下:
从图 1 左侧图中可以看到,实际一个广告流量在真正发起广告请求时会经历反作弊,请求染色, 搜索分词,IP分词,用户画像等5个任务步骤的层层加工处理,同时第一个任务是前置条件,后面四个任务过程是可以并行处理,任务间彼此无依赖,最终这五个步骤完毕后会统一进行任务结果的聚合,也就是达到异步请求聚合的节点。
上面的任务流翻译成任务节点顺序就变成了上图 1 中右侧的内容了,针对这种一个事情有多个任务可以并发执行并最终汇聚结果的情况,是很典型的异步并发编程实践;编程实现中可以很方便使用 Java 语言 JUC 中自定义线程池 + CompletableFuture + 线程计数器CountDownLatch 来并发执行任务,计数器等待所有任务完成后汇总结果。
但如果我们业务场景再变复杂一些,比如拿部门电商平台商品详情页面展示接口数据汇聚过程举例。商详页面需要分别从商品产品侧,商品类别侧获取数据,而每一侧的数据又需要划分成不同的任务分别向不同的地方来获取数据,最后将获得数据的分类聚合的业务场景,我用一个抽象图描述下这个任务过程图如下:
图2
这个任务拆解图与上个场景中的内容来看最大的区别是多了一组异步并发任务。如果继续采用原编程方法,具体的过程代码结构简洁示例如下:
// 并行处理任务 Product 的任务 (例如:TaskA 下的任务列表)
@Resource
List<ProductTask> productTasks;
// 依赖于Item的 任务 (例如:TaskB 下的任务列表)
@Resource
List<ItemTask> itemTasks;
public void testFuture(HttpServletRequest httpServletRequest) {
DataContext dataContext = new DataContext();
dataContext.setHttpServletRequest(httpServletRequest);
// 第一个并发编程控制环节
List<Future> product = new ArrayList<>();
for (Task asyncTask : productTasks) {
Future<?> submit = threadPoolExecutor.submit(() -> {
asyncTask.task(dataContext, null);
});
product.add(submit);
}
for (Future future : product) {
try {
future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 第二个并发编程控制环节
List<Future> item = new ArrayList<>();
for (Task asyncTask : itemTasks) {
Future<?> submit = threadPoolExecutor.submit(() -> {
asyncTask.task(dataContext, null);
});
item.add(submit);
}
for (Future future : item) {
try {
future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 当前 2 个环节都执行完成后,下面可以进行的结果汇总过程
}
1.1
存在的问题分析
●存在两个多线程异步并发编程代码相似的处理流程,如果业务场景再复杂一些,比如有 3 个或是 4 个并发场景呢?或是某一个并发场景中存在任务上下依赖的情况,可以想象在这个过程中,运行多线程编程的代码也会变成特别臃肿,多线程处理那套模板会出现多次,且无法清晰地看清任务的流程。编程人员针对这种复杂的业务流程要处理两大类的事务,一类是多线程技术运用对任务流程的控制,一类是业务本身的逻辑实现。
●两个多线程异步并发编程代码段,比如 TaskA, TaskB 包含的子任务都实现了多线程异步并发编程。但 TaskA 与 TaskB 之间是无法并发编程的。需要 TaskA 那段完成之后,才能顺序执行 TaskB 那段。也就是 TaskB 这一整块需要等待 TaskA 整个任务的完成。但实际组装整个商详页面数据,这两个任务间是可以不用彼此等待的, 所以会产生等待时间影响系统整体 QPS。
所以遇到业务复杂的场景,我们会考虑采用分而治之的思想把业务拆解成一个个子任务块来执行,并会使用多线程技术来提效实现整个业务流转。此时多线程编程技术的运用也会变得复杂,在实际任务间存在依赖关系,并发限制,超时和错误处理,多阶段任务等情况需要一并解决的场景下,单纯地采用传统的多线程编程技术(CompletableFuture)会让编程体验下降很多,任务间,数据间的依赖并不清晰,后续维护时需要针对业务中的任务块进行更换、顺序调整或者由串行改并行时需要增加很多重构工作。
不同业务场景中的任务拆分之后,其组合的的场景也是丰富的,很多时候是串行&并行并存的场景,比如如下几个例子:
图3
所以基于上述痛点分析后,如果能有一种多线程编程技术封装,能将多个任务按照依赖关系和执行顺序组合起来的技术,这样业务编程人员在实际业务实现中能更加专注于业务本身,无需关注多线程代码结构的维护,任务执行的细节,而且还能提高开发效率和业务系统的可维护性,那将会对研发体验与研发质量都将有质的提升。
而这就是任务编排技术的范畴。
2.任务编排介绍
任务编排是一种将多个任务按照依赖关系和执行顺序组合起来的技术。可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖,复杂一点的编排之后就能形成一个工作流。
在业务系统开发中,一些业务复杂的场景需要同时处理多个任务,这些任务之间可能有先后顺序和依赖关系,此时通过任务编排技术可以很方便地组合这些任务,满足业务需求。
与传统多线程编程相比,任务编排的最大优点是易于调度和控制,因为任务编排框架通常会提供一组方法接口来处理任务之间的依赖关系和执行顺序,编程人员只需要根据任务的性质和优先级来描述整个任务流程,简化了管理和调度的复杂性。
相反,对于只利用多线程去处理任务的方式,程序拥有复杂的线程控制逻辑,修改各个线程共享数据的方法,有大量重复的多线程编程代码,这将增加编程难度和程序的不稳定性。
此外,使用任务编排的模型,可以根据任务流程进行系统分析,提出资源高峰并进行优化,从而提高系统效率。
总之,这些业务开发场景都需要用到任务编排,使用任务编排可以使得业务开发人员更加专注于业务本身,无需关注任务执行的细节,提高开发效率,改善编程体验以及增强业务系统的可维护性。
3.任务编排框架核心探究
在遇到业务复杂的场景需要同时处理多个任务的场景下,我们需要优先考虑实现一个任务编排框架来支撑原子任务按业务流来进行任务流转,优化线程之间的交互,确保任务按正确的顺序执行,从而避免进入复杂多线程编程的泥潭。那我们现在可以思考如何实现一个支持任务编排工作流的框架,这个事项中会有哪些思考呢?
以下是我列举要实现一个任务编排框架需要思考的问题:
1. 任务间的依赖关系如何维护,用什么方式可以存储这些原子任务间的彼此关系?
2. 任务间的信息如何传递,任务参数如何透传,任务结果如何传递给下一个任务?
3. 任务流程始何跑起来,如何起到控制作用;先执行什么,后执行什么,如何实现任务间并行执行,又是如何控制并发执行任务的前后顺序的?
4. 任务运行中出现了异常怎么感知,怎么处理,会对整个流程有什么影响?
下面我们针对这4类问问整体进行一个个拆解,并配以流程图及示例代码来讲解具体的框架设计,以及简洁明了的解释为何这样设计;在开始介绍前我们先有以下的前提
●前提一:该任务编排框架定位于业务系统开发过程中,用于替代传统多线程编程流程控制那个环节,目标是应用框架工具后,业务开发人员更加专注于业务本身,无需关注任务并行执行,任务流程控制的细节。这些场景有别于我们熟知且常见的大数据分析平台中的任务编排场景。
●前提二:我们已经对业务系统的原子性任务己经有了完善合理的拆分,并成功的梳理出任务间的上下依赖关系。
●前提三:这将是一个小而精的轻量级任务编排,任务并发执行的流程控制框架,你可以理解其就是对常用 JUC 工具包的二次包装,是可以提取成一种工具类,也可以抽象成一种任务编排并发执行平台。后续内容将向大家讲解流程编排这个环节的核心设计。
3.1
整体架构
图4
整个框架需要的角色相对较少,主要包含任务启动器,任务流程解析器,任务流程处理器,任务总线(TaskContext)以及业务任务块组成。
任务启动器扮演入口角色,负责整个流程的启动,是与其它系统模块交互的门户。
任务流程解析器记录着所有业务任务块间的前后依赖关系,可以理解是一堆配置,这部分配置信息可以由多种数据容器来承载。
任务流程处理器是这个任务编排的心脏,其负责从任务流程解析器中解读出编排好的任务间关系,并统一采用多线程并发编程技术来并发执行这些任务,需要对任务进行前后执行顺序进行控制,并需要对在运行中任务出现了异常进行感知。
业务任务块中是真正业务功能实现者,其声明多个回调方法让任务流程处理器在适当的时机进行调用,从而来完成业务需求的功能,最终达到业务需求目录。
任务总线会贯穿整体任务编排并发执行过程的始终,用于传递任务启动时传入的任务参数,任务配置等信息。
当所有任务执行完毕后,所有任务的执行结果会汇总,用于最终的业务返回。
3.2
任务启动入口
任务编排的启动者,启动类 TaskScheduling 是业务方调用某一个任务编排的统一入口,其负责后续的任务流的启动。
图5
该启动入口主要是给业务方指定任务启动时的环境参数,该类是整体框架的门面。整个任务编排过程全部被其封装在内部,通过提供要求的输入参数来完成编排好的任务启动。具体该类的声明示例代码如下:
public class TaskScheduling {
/**
* 用户自定义线程池
*/
private static ExecutorService executorService;
/**
* 任务编排启动入口
* @param taskContext 任务执行上下文对象,存放的有任务参数
* @param executorService 用户指定的线程池
* @param nodes 编排好任务节点列表,启动时可以只传入顶节点
* @param timeout 任务编排整体超时时长限制
* @return 返回每一个任务的结果
*/
public static Map<String, SchedulingNode> start(TaskContext taskContext, ExecutorService executorService, List<SchedulingNode> nodes,long timeout) {
//定义一个map,存放所有的node,key为node唯一name,value是该node,流程结束之后可以从value中获取node的结果
ConcurrentHashMap<String, SchedulingNode> resultNode = new ConcurrentHashMap<>();
//线程池确认
TaskScheduling.executorService = executorService;
//编排好的流程正式启动
CompletableFuture[] count = new CompletableFuture[nodes.size()];
for (int i = 0; i < nodes.size(); i++) {
SchedulingNode node = nodes.get(i);
count[i] = CompletableFuture.runAsync(()->{
node.execute(executorService,node, timeout, resultNode, taskContext);
},executorService);
}
// 等待编排好的流程全部结束
try {
CompletableFuture.allOf(count).get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// 异常的一些处理
}
return resultNode;
}
3.3
任务流程解析器
任务流程解析器模块是整体任务编排框架重要的的模块,主要有2大职责,分别是:
●按照某种数据结构提供数据容器来保存每一个被编排的任务,并能维护好任务间彼此的关系
●提供任务关系解析引擎,方便后续流程读取任务依赖的规则
能承载任务编排工作流信息的数据容器简单可以使用父子两个列表来实现,也可以使用较通用的 DAG 有向无环图这种数据结构。这里简单提下 DAG ,具体的代码落地存储图可以使用邻接矩阵和邻接表这两种数据结构,这两种示例图如下:
图6
以邻接矩阵数据结构为例:比如节点1之后有 2 个子节点分别是节点 2 和节点 4 那么可以采用邻接矩阵中 Array[1][2], Array[1][4]标识为 1 即可。当整个矩阵的点按有向无环关系标记完成后,这个矩阵内的二维空间值就维护出了任务节点间的依赖,这样一组任务编排关系就可以落地了。
此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但邻接矩阵能更快地判断连通性,所以代码实现上,我们会选择邻接矩阵,这样在判断两点之间是否有边更方便点。具体 Java 代码实现推荐大家看下开源工程 powerJob 中 DAG工作流的代码。
在初始实现一个任务编排框架时,往往会先使用简单易理解的容器来实现,下面我向大家示例采用父子两个列表的方式来承载任务编排工作流信息。
/**
* 依赖的父节点
*/
protected List<SchedulingNode> fatherHandler = new ArrayList<>();
/**
* 绑定下游的子节点
*/
protected List<SchedulingNode> sonHandler = new ArrayList<>();
public SchedulingNode<T, V> setSonHandler(List<SchedulingNode> nodes){
this.sonHandler = nodes;
return this;
}
public SchedulingNode<T, V> setFatherHandler(List<SchedulingNode> nodes){
this.fatherHandler = nodes;
return this;
}
public SchedulingNode<T, V> setSonHandler(SchedulingNode...nodes){
ArrayList<SchedulingNode> nodeList = new ArrayList<>();
for (SchedulingNode node : nodes){
nodeList.add(node);
}
return setSonHandler(nodeList);
}
public void setFatherHandler(SchedulingNode...nodes){
ArrayList<SchedulingNode> nodeList = new ArrayList<>();
for (SchedulingNode node : nodes){
nodeList.add(node);
}
setFatherHandler(nodeList);
}
分别使用 fatherHandler ,sonHandler 这两个 List 容器来保存任务编排过程中每一个任务节点的依赖父节点,绑定下游子节点的信息。可以采用如下的组织方式来维护一组任务节点流程。
// 任务节点间互相关联代码示例。
node1.setSonHandler(node2,node3);
node2.setSonHandler(node5).setFatherHandler(node1);
node3.setSonHandler(node4).setFatherHandler(node1);
node4.setSonHandler(node5).setFatherHandler(node3);
node5.setFatherHandler(node2,node4);
图7
上述代码的组装就实现了在内存中存储图片中对任务节点的依赖关系,完成了一次任务间工作流程关系的编排;有了任务的编排关系,就可以使用线程将整个任务流行起来,我们正式开始进入任务流程处理的环节。
3.4
任务流程处理器
图8
任务流程处理器是每一个业务节点的流程运行,流程间控制的心脏,其运行时通用属性与功能抽象如上图所示,每一个流程在执行时都要关注处理自身的节点属性随流程的推进行状态的变更。而这一个环节是每一个节点运行必须进行的,所以我们可以把这个重复性,统一的流程环节抽出来形成一个通用的流程处理器,后面的节点要想拥有该流程处理能力,只继要继承该类即可。
该类的定义结构整体如下:
@Data
public abstract class SchedulingNode<T,V> {
/**
* 节点的名字 默认使用类名
*/
protected String taskName = this.getClass().getSimpleName();
/**
* 节点执行结果状态
*/
ResultState status = ResultState.DEFAULT;
/**
* 将来要处理的任务节点的param参数
*/
protected T param; /**
* 节点状态
*/
private static final int FINISH = 1;
private static final int ERROR = 2;
private static final int WORKING = 3;
private static final int INIT = 0;
/**
* 节点状态标记位
*/
private final AtomicInteger state = new AtomicInteger(0);
/**
* 默认节点执行结果
*/
private volatile WorkResult<V> workResult = WorkResult.defaultResult(getTaskName());
/**
* 任务执行方法
* @param executorService
* @param fromNode 来源于哪个父节点
* @param remainTime 执行该节点剩余的时间
* @param allParamUseNodes 全局所有的节点
* @param taskContext 上下文
*/
public void execute(ExecutorService executorService, SchedulingNode fromNode, long remainTime,
ConcurrentHashMap<String, SchedulingNode<T,V>> allParamUseNodes,
TaskContext taskContext) {
// 重复可复用的具体流程控制逻辑代码
...
}
}
任务流程处理器 SchedulingNode 这个抽象类中做的重要的事通俗来说就是遍历任务编排流中的各个节点,然后提交给线索程中去执行就可以,过程中需要控制节点的流转的顺序以及一些节点状态保存,结果的保存。上述代码中的 execute 方法就是该环节的入口,所有的结果存入在 allParamUseNodes map容器中,任务节点的状态都是自身节点的属性控制。整体环节的流程图分享如下:
图9
从流程图中可以看到,整体是一条顺序执行的流程,这里任务编排框架最主要的目标是要把每一个任务流中的节点执行完,为了实现任务间按编排好的顺序与依赖的方向执行,增加了很多环节的控制。控制的方向采用逆向处理的手段,优先假设自己节点前有依赖,节点后有下游,需要双方面关注他们的状态,只有自身达到可执行的时机才会执行自己的功能,否则都是优先处理子节点,而实际调用子节点的方法最终也会执行这个 execute 方法入口,只是传入的节点参数已做了变更。这一点类似不断遍历迭代的思想,来完成整个流程的运行。
现在回到最初提到的思考的问题3:任务流程始何跑起来,如何起到控制作用?实际从上述的流程图中也能看到答案,就是本案例中采用到判断节点的执行状态来判断的,因为这是跟实际业务需要有关的,任务编排了就是需要任务执行的过程有先有后,这样才能满足后一个任务依赖前一个任务的执行才可以的要求,不然大家都是并行执行的。所以任务本身会有一个非常重要的任务状态,任务执行结果状态这2个属性,通过流的推进不断判断这些状态是否达到满足下个节点执行的前提条件从而起到了控制的作用。
这一个块的流程推进,状态判断的伪代码如下:
/*如果前方有依赖,存在两种情况,以图7中的编排任务流举例说明
一种是前面只有一个依赖。即 node2 -> node5
一种是前面有多个依赖。node2 node4 -> node5。需要node2、node4都完成了才能轮到node5。
*/
//只有一个依赖
if (fatherHandler.size() == 1) {
doDependsOneJob(fromNode, taskContext);
runSonHandler(taskContext,executorService, remainTime, now);
} else {
//有多个依赖时
doDependsJobs(executorService, fromNode, fatherHandler, now, remainTime, taskContext);
}
// 任务执行前,先做状态的判断,以起到控制的作用。
private void doDependsOneJob(SchedulingNode dependWrapper, TaskContext taskContext) {
if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
workResult = timeoutResult();
fastFail(INIT, null);
} else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
fastFail(INIT, null);
} else {
//前面任务正常完毕了,该自己了
runSelf(taskContext);
}
}
3.5
业务节点
业务节点是最终的任务执行,用于执行具体的业务耗时代码,所有的业务逻辑都在这里类别中实现,一个编排的任务流中有多少个节点,那么就会有多少个业务节点实例。
业务节点为了复用业务执行器的控制逻辑,所以业务节点都会继承任务流程处理器,业务节点需要声明一些回调函数,用于任务执行过程中的回调钩子,比如声明出业务任务的核心法,用于任务流程处理器调用执行真正的业务内容,又比如当任务执行成功,或是失败时是否需要回调告知给业务方,自行用于状态的感知与告警处理等。
业务节点的伪代码如下:
@Component
public class Node1 extends SchedulingNode<String,String> {
@Resource
private SampleService sampleService;
@Override
public Object task(TaskContext taskContext, String param) {
// 具体的业务代码逻辑实现
// eg: db操作,http操作等
sampleService.findItemById(...);
...
}
@Override
public void onSuccess(TaskSupport support) {
// 节点任务执行成功时的回调
...
}
@Override
public void onFail(TaskSupport support) {
// 节点任务执行失败时的回调
...
}
}
这样我们基本完成了这个任务编排框架的设计,最后可以如下来进行示例图中的任务编排以及执行:
// 任务节点间互相关联代码示例。
node1.setSonHandler(node2,node3);
node2.setSonHandler(node5).setFatherHandler(node1);
node3.setSonHandler(node4).setFatherHandler(node1);
node4.setSonHandler(node5).setFatherHandler(node3);
node5.setFatherHandler(node2,node4);
// 设置上下文
TaskContext taskContext = new TaskContext();
Map<String, SchedulingNode> results = TaskScheduling.start(5000L, taskContext, node1);
...
4.总结
任务编排框架是一种用于管理和协调多个任务执行的技术,一些业务复杂的场景需要同时处理多个任务,这些任务之间可能有先后顺序和依赖关系,此时通过任务编排技术可以很方便地组合这些任务,满足业务需求。使用任务编排可以使得业务开发人员更加专注于业务本身,无需关注任务执行的细节,提高开发效率,改善编程体验以及增强业务系统的可维护性,可扩展性。
本文分别从引入任务编排的场景以及如何设计一个任务编排框架两个方面进行了阐述,核心是想与大家分享实现任务编排这一个工具类的框架需要思考的点,以及该框架是如何起到在任务并发执行的情况下进行流程控制的作用,并以流程图的方式向大家展示了编排框架最核心的任务流转控制过程。
本文在探究任务编排框架实现原理过程中,也参考了目前开源项目的实现,主要是 Gobrs-Async, async_QY-master ,在此向该开源项目及开源作者表示感谢,谢谢他们的无私分享和贡献,同时我也想向大家分享这些项目,阅读项目源码,学习项目的设计思想并运用到实际的项目中来 ; 优秀的开源项目链接见文章末尾的参考列表。
在文章的最后,我也思考了本文中描述的框架可以有以下的优化点:
-
任务编排的过程可以由图形界面来承载,采用业务通用的 DGA 有向无环图的数据结构来表述任务间的依赖关系 -
可以对任务线程池进行监控,来了解系统的负载,资源分配情况 -
可以引入动态线程程技术,不同的任务编排技术可以进行不同的配置。 -
可以有统一的日志追踪组件,记录任务整个生命周期的 debug 日志,方便排查任务异常的原因及问题定位。 -
可以将编排框架抽象成一个任务编排平台,来承接更多的业务线的业务场景,减少重复造轮子。 -
可以将这个编排框架模块载入到现有的定时任务调度平台,将任务编排整合到任务调度平台中,针对有业务关联的多个任务进行依赖整合,使用任务的调度顺序,时间更合理,同时也减少任务调度平台的任务个数,减轻任务调度压力, 减少任务间的互斥性等待等任务管理相关的复杂度设计。 -
业务关联的任务编排成任务流,可以通过图形的方式展示出业务间任务组合全貌,任务间可以轻松进行前后流程的修改。能方便后续业务的调整,应对业务的变更具有更强的扩展性。
参考
开源项目:async_QY-master
开源项目:asyncTool
开源项目:Gobrs-Async
开源项目:powerjob
作者|刘元军