Starting a DataX data sync from Java: how to terminate/stop/interrupt a running sync task:
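1. In DataX's core module, locate the class ProcessInnerScheduler.java and keep a reference to its taskGroupContainerExecutorService object.
When a sync task needs to be interrupted, retrieve that object and call its shutdownNow() method.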
2. Modify the code as follows:
@Override
public void startAllTaskGroup(List<Configuration> configurations) {
    this.taskGroupContainerExecutorService = Executors.newFixedThreadPool(configurations.size());
    for (Configuration taskGroupConfiguration : configurations) {
        TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
        this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        // ***** Custom code: record taskGroupContainerExecutorService, keyed by job ID, so the task can be terminated later *****
        ThreadPoolUtil.dataxTaskExecutorServiceMap.put(taskGroupConfiguration.getInt(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID),
                this.taskGroupContainerExecutorService);
    }
    // Accept no new task groups; the ones already submitted keep running.
    this.taskGroupContainerExecutorService.shutdown();
}
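The modified method relies on a shared map in ThreadPoolUtil whose declaration is not shown here. A minimal sketch of such a holder follows, assuming a ConcurrentHashMap keyed by job ID; the class name ExecutorRegistrySketch, the field EXECUTORS, and the register method are hypothetical stand-ins, not part of DataX. A concurrent map is a reasonable choice because registration happens on the scheduler thread while cancellation is typically triggered from another thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for the ThreadPoolUtil holder; not part of DataX. */
class ExecutorRegistrySketch {
    // Thread-safe map from job ID to the pool running that job's task groups.
    static final ConcurrentMap<Integer, ExecutorService> EXECUTORS = new ConcurrentHashMap<>();

    /** Records the pool for a job so it can be looked up and cancelled later. */
    static void register(int jobId, ExecutorService pool) {
        EXECUTORS.put(jobId, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        register(1, pool);
        System.out.println("registered: " + EXECUTORS.containsKey(1));
        pool.shutdown(); // release the demo pool so the JVM can exit
    }
}
```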
3. To terminate a DataX sync task, use code like the following:
/**
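 * Terminates a DataX sync task.
 *
 * @param taskId ID of the DataX job to terminate
 * @return true if no exception occurred (including when no executor is registered for taskId), false otherwise
 */
public static boolean cancelDataxTaskExecutorService(Integer taskId) {
    boolean cancelResult = false;
    try {
        ExecutorService executorService = dataxTaskExecutorServiceMap.get(taskId);
        if (executorService != null) {
            // shutdownNow() interrupts the worker threads running the task groups.
            executorService.shutdownNow();
            dataxTaskExecutorServiceMap.remove(taskId);
            LogCacheUtil.setLogList(taskId, "Sync task forcibly terminated.");
        }
        cancelResult = true;
    } catch (Exception e) {
        LogCacheUtil.setLogList(taskId, "Failed to forcibly terminate sync task: " + e);
        e.printStackTrace();
    }
    return cancelResult;
}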
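Note that shutdownNow() only makes a best-effort attempt to stop running tasks: typical implementations cancel them via Thread.interrupt(), so a task that never responds to interruption may keep running. The sketch below illustrates this with plain JDK classes, under the assumption that the worker blocks in an interruptible call; the class ShutdownNowSketch and the method runAndCancel are hypothetical names, and the sleeping task is only a stand-in for a real sync job. Calling awaitTermination afterwards is one way to confirm that the workers actually stopped.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of cancelling a running task with shutdownNow(). */
class ShutdownNowSketch {
    /** Starts a long-running task, cancels it, and reports whether the pool terminated. */
    static boolean runAndCancel() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                // Stand-in for a long-running sync; sleep() responds to interruption.
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the task end.
                Thread.currentThread().interrupt();
            }
        });
        started.await();     // wait until the task is actually running
        pool.shutdownNow();  // interrupts the worker thread
        // Wait a bounded time for the worker to exit.
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated: " + runAndCancel());
    }
}
```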