标签:执行器 admin 自动检测 time update job JobRegistryHelper Executor
XXL-Job 自动检测执行器周期,执行器上下线的源码分析
XXL-Job 的自动注册和心跳检测代码相当简洁,本文继续分析它的 JobRegistryHelper 类是如何实现的,虽然只有短短的两百多行,但是值得一看。
本文要阐述两个问题:
- XXL-Job 是如何检测下线 Executor 且更新集群信息的?
- XXL-Job 的执行器心跳检测是谁发起的,后台做了什么操作?
JobRegistryHelper 类图结构
JobRegistryHelper 实现上面两个功能,它的类图结构:
拆解该类:
public class JobReqistryHelper (
private static Logger logger= LoggerFactory.getlogger(JobRegistryHelper.class);
private static JobRegistryHelper instance = new JobReqistryHelper();public static JobReqistryHelper getInstance() ( return instance;
private ThreadpoolExecutor reqistryOrRemoveThreadPool = null;
private Thread registryMonitorThread;
private volatile boolean tostop = false;
- 一个单例类。
- 有一个线程池
registryOrRemoveThreadPool,异步处理 Executor 的注册请求,更新 Registry 表的
update_time 字段。
- 有一个轮询线程
registryMonitorThread,轮询
findDead 的结果集删除失联记录,遍历
findAll 存活状态的执行器更新 Group 集群数据的
address_list 字段。
检测下线执行器并更新集群的流程
该类启动了一个轮询线程 registryMonitorThread,它的 run 只做了两件事:
- findDead 查找 90 秒以内未更新过的执行器,视为无效记录,并删除;
- findAll查询有效执行器,以执行器的
app_name 归并到 Group 表中对应集群的
address_list 字段。
有这个线程,控制平台就能自动感应执行器的上下线了,用到的两个 SQL 可以看看:
<select id="findDead" parameterType="java.util.HashMap" resultType="java.lang.Integer" > SELECT t.id FROM xxl_job_registry AS t WHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND) </select> <select id="findAll" parameterType="java.util.HashMap" resultMap="XxlJobRegistry"> SELECT <include refid="Base_Column_List" /> FROM xxl_job_registry AS t WHERE t.update_time <![CDATA[ > ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND) </select> 复制代码
findDead 的条件是 update_time < nowTime - timeout【90秒】,即上次更新时间为 90 秒之前,而执行器端的心跳周期是 30 秒,超过这个时间,视为失联。
findAll 则相反,update_time > nowTime - timeout【90秒】。
值得注意的是,改造成 Oracle 和 Postgre 时,如果不用传递的 nowTime ,而用数据库的 NOW() 或者 sysdate ,那么必须保证数据库服务器的时间和执行器主机时间相差不超过 90 秒,否则 admin 将永远检测不到存活状态的执行器。
执行器注册流程
最初跟踪心跳检测的过程,从 Executor 端找 beat 关键字开始,但这个心跳请求是 admin 端在触发调度 run 之前发起的,并没有涉及到 Registry 表的更新。所以放弃该思路,从 admin 端 DAO 的调用入手。
XxlJobRegistryDao 类操作 Registry 表的 registryUpdate 方法只在一个地方被调用了,就是 JobRegistryHelper 异步提交任务的代码: 继续跟踪 registry ,一路就到了 admin 模块的 API 部分: 它就是 Executor 端向 admin 中心发送的注册请求的 API。我觉得这里定义为 “registry” 不恰当,它的本质是 Executor 端的一个心跳线程,每隔 30 秒就向注册中心发送注册请求。
因此,第二个问题的答案:Executor 端自动向 admin 发送心跳,然后 admin 端会更新注册表 xxl_job_registry 表对应记录的 update_time 字段,这个字段将是 admin 模块判断执行器上下线的依据。
标签:执行器,
admin,
自动检测,
time,
update,
job,
JobRegistryHelper,
Executor
From: https://www.cnblogs.com/muyi-yang/p/17319318.html