首页 > 其他分享 >DolohinScheduler 分布式任务调度框架 代码流程分解

DolohinScheduler 分布式任务调度框架 代码流程分解

时间:2023-05-22 13:44:52浏览次数:55  
标签:ProcessInstance WorkflowExecuteRunnable processInstance 任务 实例 DolohinScheduler 

一、DS-API模块

- 执行工作流



	
- 定时任务执行
	
	更新 schedule 参数
			
	-/schedule 新增 schedule 参数 做了什么事?
				将schedule参数 用ScheduleParam类进行解析
				
				有效性校验,而后解析保存到 t_ds_schedules 表内,更新 t_ds_process_definition 表
			

	-/online 任务上线 做了什么事?
			0、校验状态
			1、更新 t_ds_schedules 表状态为 上线状态
			2、调用QuartzScheduler.insertOrUpdateScheduleTask 即QZ定时任务API
				- 接受定时任务参数,并交由QZ处理
				- 设置 ProcessScheduleTask 任务,执行业务逻辑方法
					查询当前工作流的配置,然后往 Command 表内 存一条命令
				
				
	-/offline
			0、校验状态
			1、更新 t_ds_schedules 表状态为 下线状态
			2、调用QuartzScheduler.deleteScheduleTask 删除QZ内的定时任务
			

二、DS-MASTER模块


	1、master节点启动时,启动一个守护进程
		- 执行run方法
	this.masterSchedulerBootstrap.start()
		
		- 调用自己run
		
			MasterSchedulerBootstrap.java 中
			-1.1 命令转换为执行过程实例,循环遍历当前工作流的要执行的所有命令
			List<ProcessInstance> processInstances = command2ProcessInstance(commands);
				
				- 处理命令方法
				ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
					
					- 构建ProcessInstance实例
					ProcessInstance processInstance = constructProcessInstance(command, host);
					
						
						根据command和参数构建进程实例
						processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
						
							- 设置全局参数
							processInstance.setGlobalParams(globalParams);
						
					//插入工作流实例,ID自动生成
					processInstanceDao.upsertProcessInstance(processInstance);
						
					- 删除命令
					deleteCommandWithCheck(command.getId());
			
			
			-1.2 所有的实例执行实例 processInstances ,循环
			
				- 	循环创建执行器 Execute 执行器的实例(本质是一个线程)
					WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,processService,processInstanceDao,
                                nettyExecutorManager,processAlertManager,masterConfig,stateWheelExecuteThread,curingGlobalParamsService);
				-	将创建的执行器线程缓存到master的map中	*****重要 下一步使用
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
				- 	向工作流时间队列内添加开始工作流事件     *****重要 下一步使用
					workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,processInstance.getId()));
					
		
		//消耗缓存中的 工作流事件、工作流线程实例 
		- workflowEventLooper.start();
		
			WorkflowEventLooper.java 中执行run方法
				- 从事件队列中取出一个事件
				workflowEvent = workflowEventQueue.poolEvent();
				- 处理工作流事件
				workflowEventHandler.handleWorkflowEvent(workflowEvent);
		
					- WorkflowStartEventHandler.java文件中
					
					- 根据事件获取工作流实例的ID 从全局的工作流缓存中 拿到 可执行的线程实例
					WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId())
					
					- 根据工作流线程实例获取 工作流实例
					ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
					
					- 执行工作流 workflowExecuteRunnable::call
					CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
						
						*****重要*****
							
						WorkflowExecuteRunnable.java文件中
						
						- 构建工作流节点的Dag图
						buildFlowDag();
													
						- 初始化任务队列
						initTaskQueue();
						
						- 提交任务节点
						submitPostNode(null);
							submitPostNode(String parentNodeCode) 
								WorkflowExecuteRunnable.submitStandByTask() #文件中遍历所有的任务实例 然后转换为任务
									Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
											boolean submit = taskProcessor.action(TaskAction.SUBMIT);
												BaseTaskProcessor.action
													BaseTaskProcessor.submit();
														submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance
															submitTask(ProcessInstance processInstance, TaskInstance taskInstance)
																submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance)
																	saveTaskInstance(TaskInstance taskInstance) #保存任务实例
		
		
		
		
		
		
	2、this.eventExecuteService.start() 事件执行
	
		- 启动一个执行器线程来处理 所有的工作流事件线程
		
			- 遍历所有的事件线程 this.processInstanceExecCacheManager.getAll()
				- 循环执行所有的事件
				workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
				
					- ListenableFuture<?> future = this.submitListenable(workflowExecuteThread::handleEvents);
						
						- 实际任务执行 WorkflowExecuteRunnable.java   *****重要
						
						WorkflowExecuteRunnable.handleEvents()
						
							- 从事件集合中取出一个事件
							stateEvent = this.stateEvents.peek();
							
							- 处理当前取出的事件
							stateEventHandler.handleStateEvent(this, stateEvent)
							
								
								- 将事件包装成任务,并执行任务
								workflowExecuteRunnable.taskFinished(task);
							
							
							
							
							- 处理成功就从集合中移除该事件
							this.stateEvents.remove(stateEvent);
						
							
						- WorkflowExecuteRunnable *****重要类 里面包含 

标签:ProcessInstance,WorkflowExecuteRunnable,processInstance,任务,实例,DolohinScheduler,
From: https://www.cnblogs.com/iullor/p/17420394.html

相关文章

  • 单体框架、分布式框架、微服务框架
     单体框架、分布式框架、微服务框架1.单体框架1.概念将业务功能集中在一个项目中开发,打包部署。(意思就是说,所有的功能在一个项目中进行实现,不用管复杂的架构设计,只需要创建一个项目,有功能就往这个项目里面加代码就ok了)2.优点1、框架简单,不需要搞复杂的框架设计2、部署成......
  • 聊聊Seata分布式解决方案AT模式的实现原理
    什么是Seata分布式事务解决方案Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案。AT模式AT模式目前来看是Seata框架独有的一种模式,其它的分布式框架上并没有此种模式......
  • 性能测试-JMeter分布式测试及其详细步骤
    性能测试概要性能测试是软件测试中的一种,它可以衡量系统的稳定性、扩展性、可靠性、速度和资源使用。它可以发现性能瓶颈,确保能满足业务需求。很多系统都需要做性能测试,如Web应用、数据库和操作系统等。性能测试种类非常多,有些概念也很相近:LoadTestingBaselineTestingSmok......
  • 分布式和集群部署架构设计
    一、微服务部署架构设计方案1、先采用微服务架构设计,将工程拆分成多个模块(通过接口彼此调用,降低代码的耦合度)2、后采用分布式工作方式将拆分开的各个模块分别部署到多台服务器上(通过缩短单个任务的执行时间来提升效率)3、再采用集群物理方式将各个模块部署到多台服务器上(通过提......
  • 分布式系统关键技术:流量与数据调度
    1、流量调度与服务治理的关系服务治理时内部系统的事,流量调度可以是内部的,更是外部接入层的事。服务治理时数据中心的事,而流量调度要做的好,应该是数据中心之外的事,也就是我们常说的边缘计算或者CDN。 2、流量调度的主要功能和关键技术流量调度系统应该主要具备的功能:依据系......
  • 分布式事务一致性与本地消息表
    我个人比较推崇本地消息表模式来实现最终一致性。首先本地消息表的设计不仅可以解决事务一致性的问题,对于消息队列常见问题中的消息丢失与消息幂等其实都是可以通过本地消息表来解决;其带来的好处是多重的。什么是分布式事务一致性大白话就是对数据源进行拆分后,多库多机器的多数......
  • 分布式系统关键技术:服务调度
    1、服务关键程度服务关键程度,主要是梳理和定义服务的重要程度。这不是使用技术可以完成的,而是需要细致地管理对业务的理解,才能定义出架构中各个服务的重要程度。 2、服务依赖关系梳理出服务间的依赖关系,这点也非常重要。我们常说,“没有依赖,就没有伤害”。这句话的意思就是说......
  • java-raft框架之atomix进行分布式管理
    共识算法在一个分布式的系统中,管理各个节点的一致性(共识)一直是个很有难度的问题。在近几十年的发展中,于1990年诞生的Paxos算法是其中最为经典的代表,并一统江湖数几十载。如著名的zookeeper、chubby都是基于Paxos算法的经典应用。不过Paxos算法的复杂度和难以理解性,也吸引了一些大佬......
  • 分布式事务理解
    分布式系统:是一个硬件或软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行通信和协调的系统。分布式事务:是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。根据分布式系统CAP定理和base理论可知,分布式系统只......
  • 银行核心系统如何选型分布式数据库(含6大落地要点验证)
    银行核心系统如何选型分布式数据库(含6大落地要点验证)dbaplus社群 2022-06-24 投诉阅读数:854来自专栏:数据库全能进阶共38篇a​​本文根据洪烨老师在〖deeplus直播:金融业数据库转型与国产化改造〗线上分享演讲内容整理而成。(文末有回放及PPT获取方式,不要错过)  随......