一、DS-API模块
- 执行工作流
- 定时任务执行
更新 schedule 参数
-/schedule 新增 schedule 参数 做了什么事?
将schedule参数 用ScheduleParam类进行解析
有效性校验,而后解析保存到 t_ds_schedules 表内,更新 t_ds_process_definition 表
-/online 任务上线 做了什么事?
0、校验状态
1、更新 t_ds_schedules 表状态为 上线状态
2、调用QuartzScheduler.insertOrUpdateScheduleTask 即QZ定时任务API
- 接受定时任务参数,并交由QZ处理
- 设置 ProcessScheduleTask 任务,执行业务逻辑方法
查询当前工作流的配置,然后往 Command 表内 存一条命令
-/offline
0、校验状态
1、更新 t_ds_schedules 表状态为 下线状态
2、调用QuartzScheduler.deleteScheduleTask 删除QZ内的定时任务
二、DS-MASTER模块
1、master节点启动时,启动一个守护进程
- 执行run方法
this.masterSchedulerBootstrap.start()
- 调用自己run
MasterSchedulerBootstrap.java 中
-1.1 命令转换为执行过程实例,循环遍历当前工作流的要执行的所有命令
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
- 处理命令方法
ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
- 构建ProcessInstance实例
ProcessInstance processInstance = constructProcessInstance(command, host);
根据command和参数构建进程实例
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
- 设置全局参数
processInstance.setGlobalParams(globalParams);
//插入工作流实例,ID自动生成
processInstanceDao.upsertProcessInstance(processInstance);
- 删除命令
deleteCommandWithCheck(command.getId());
-1.2 所有的实例执行实例 processInstances ,循环
- 循环创建执行器 Execute 执行器的实例(本质是一个线程)
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,processService,processInstanceDao,
nettyExecutorManager,processAlertManager,masterConfig,stateWheelExecuteThread,curingGlobalParamsService);
- 将创建的执行器线程缓存到master的map中 *****重要 下一步使用
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
- 向工作流时间队列内添加开始工作流事件 *****重要 下一步使用
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,processInstance.getId()));
//消耗缓存中的 工作流事件、工作流线程实例
- workflowEventLooper.start();
WorkflowEventLooper.java 中执行run方法
- 从事件队列中取出一个事件
workflowEvent = workflowEventQueue.poolEvent();
- 处理工作流事件
workflowEventHandler.handleWorkflowEvent(workflowEvent);
- WorkflowStartEventHandler.java文件中
- 根据事件获取工作流实例的ID 从全局的工作流缓存中 拿到 可执行的线程实例
WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId())
- 根据工作流线程实例获取 工作流实例
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
- 执行工作流 workflowExecuteRunnable::call
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
*****重要*****
WorkflowExecuteRunnable.java文件中
- 构建工作流节点的Dag图
buildFlowDag();
- 初始化任务队列
initTaskQueue();
- 提交任务节点
submitPostNode(null);
submitPostNode(String parentNodeCode)
WorkflowExecuteRunnable.submitStandByTask() #文件中遍历所有的任务实例 然后转换为任务
Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
BaseTaskProcessor.action
BaseTaskProcessor.submit();
submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance
submitTask(ProcessInstance processInstance, TaskInstance taskInstance)
submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance)
saveTaskInstance(TaskInstance taskInstance) #保存任务实例
2、this.eventExecuteService.start() 事件执行
- 启动一个执行器线程来处理 所有的工作流事件线程
- 遍历所有的事件线程 this.processInstanceExecCacheManager.getAll()
- 循环执行所有的事件
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
- ListenableFuture<?> future = this.submitListenable(workflowExecuteThread::handleEvents);
- 实际任务执行 WorkflowExecuteRunnable.java *****重要
WorkflowExecuteRunnable.handleEvents()
- 从事件集合中取出一个事件
stateEvent = this.stateEvents.peek();
- 处理当前取出的事件
stateEventHandler.handleStateEvent(this, stateEvent)
- 将事件包装成任务,并执行任务
workflowExecuteRunnable.taskFinished(task);
- 处理成功就从集合中移除该事件
this.stateEvents.remove(stateEvent);
- WorkflowExecuteRunnable *****重要类 里面包含
标签:ProcessInstance,WorkflowExecuteRunnable,processInstance,任务,实例,DolohinScheduler,事
From: https://www.cnblogs.com/iullor/p/17420394.html