首页 > 其他分享 >异源数据同步 → DataX 同步启动后如何手动终止?

异源数据同步 → DataX 同步启动后如何手动终止?

时间:2024-11-05 08:49:03浏览次数:1  
标签:同步 process pid 终止 异源 DataX 进程

开心一刻

刚刚和老婆吵架,气到不行,想离婚
女儿突然站出来劝解道:难道你们就不能打一顿孩子消消气,非要闹离婚吗?
我和老婆同时看向女儿,各自挽起了衣袖
女儿补充道:弟弟那么小,打他,他又不会记仇

开心一刻

需求背景

项目基于 DataX 来实现异源之间的数据离线同步,我对 Datax 进行了一些梳理与改造

异构数据源同步之数据同步 → datax 改造,有点意思
异构数据源同步之数据同步 → datax 再改造,开始触及源码
异构数据源同步之数据同步 → DataX 使用细节
异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密
异源数据同步 → DataX 为什么要支持 kafka?
异源数据同步 → 如何获取 DataX 已同步数据量?

本以为离线同步告一段落,不会再有新的需求,可打脸来的非常快,产品经理很快找到我,说了如下一段话

昨天我在测试开发环境试用了一下离线同步功能,很好的实现了我提的需求,给你点赞!
但是使用过程中我遇到个情况,有张的表的数据量很大,一开始我没关注其数据量,所以配置了全量同步,启动同步后迟迟没有同步完成,我才意识到表的数据量非常大,一查才知道 2 亿多条数据,我想终止同步却发现没有地方可以进行终止操作
所以需要加个功能:同步中的任务可以进行终止操作

这话术算是被产品经理给玩明白了,先对我进行肯定,然后指出使用中的痛点,针对该痛点提出新的功能,让我一点反驳的余地都没有;作为一个讲道理的开发人员,面对一个很合理的需求,我们还是很乐意接受的,你们说是不是?

需求一接,问题就来了

如何终止同步

思考这个问题之前,我们先来回顾下 DataX 的启动;还记得我们是怎么集成 DataX 的吗,异构数据源同步之数据同步 → datax 再改造,开始触及源码 中有说明,新增 qsl-datax-hook 模块,该模块中通过命令

Process process = Runtime.getRuntime().exec(realCommand);
realCommand 就是启动 DataX 的 java 命令,类似

java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job job.json

来启动 DataX,也就是给 DataX 单独启动一个 java 进程;那么如何停止 DataX,思路是不是就有了?问题是不是就转换成了

如何终止 java 进程

终止进程

如何终止进程,这个我相信你们都会

Linux:kill -9 pid
Win:cmd.exe /c taskkill /PID pid /F /T

但这有个前提,需要知道 DataX 的 java 进程的 pid,而 JDK8 中 Process 的方法如下

Process方法

是没有提供获取 pid 的方法,在不调整 JDK 版本的情况下,我们如何获取 DataX 进程的 pid?不同的操作系统获取方式不一样,我们分别对 LinuxWin 进行实现

  1. Linux

    实现就比较简单了,仅仅基于 JDK 就可以实现

    Field field = process.getClass().getDeclaredField("pid");
    field.setAccessible(true);
    int pid = field.getInt(process);
    

    通过反射获取 process 实现类的成员变量 pid 的值;这段代码,你们应该都能看懂吧

  2. Win

    Win 系统下,则需要依赖第三方工具 oshi

    <dependency>
        <groupId>com.github.oshi</groupId>
        <artifactId>oshi-core</artifactId>
        <version>6.6.5</version>
    </dependency>
    

    获取 pid 实现如下

    Field field = process.getClass().getDeclaredField("handle");
    field.setAccessible(true);
    long handle = field.getLong(process);
    WinNT.HANDLE winntHandle = new WinNT.HANDLE();
    winntHandle.setPointer(Pointer.createConstant(handle));
    int pid = Kernel32.INSTANCE.GetProcessId(winntHandle);
    

    同样用到了反射,还用到了 oshi 提供的方法

合并起来即得到获取 pid 的方法

/**
 * 获取进程ID
 * @param process 进程
 * @return 进程id,-1表示获取失败
 * @author 青石路
 */
public static int getProcessId(Process process) {
    int pid = NULL_PROCESS_ID;
    Field field;
    if (Platform.isWindows()) {
        try {
            field = process.getClass().getDeclaredField("handle");
            field.setAccessible(true);
            long handle = field.getLong(process);
            WinNT.HANDLE winntHandle = new WinNT.HANDLE();
            winntHandle.setPointer(Pointer.createConstant(handle));
            pid = Kernel32.INSTANCE.GetProcessId(winntHandle);
        } catch (Exception e) {
            LOGGER.error("获取进程id失败,异常信息:", e);
        }
    } else if (Platform.isLinux() || Platform.isAIX()) {
        try {
            field = process.getClass().getDeclaredField("pid");
            field.setAccessible(true);
            pid = field.getInt(process);
        } catch (Exception e) {
            LOGGER.error("获取进程id失败,异常信息:", e);
        }
    }
    LOGGER.info("进程id={}", pid);
    return pid;
}

得到的 pid 是不是正确的,我们是不是得验证一下?写个 mainClass

/**
 * mainClass
 * @author 青石路
 */
public class HookMain {

    public static void main(String[] args) throws Exception {
        String command = "";
        if (Platform.isWindows()) {
            command = "ping -n 1000 localhost";
        } else if (Platform.isLinux() || Platform.isAIX()) {
            command = "ping -c 1000 localhost";
        }
        Process process = Runtime.getRuntime().exec(command);
        int processId = ProcessUtil.getProcessId(process);
        System.out.println("ping 进程id = " + processId);
        new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), System.getProperty("sun.jnu.encoding")))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

利用 maven 打包成可执行 jar 包

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.qsl.hook.HookMain</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy-dependencies</id>
                    <phase>package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>target/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

然后执行 jar

java -jar qsl-datax-hook-0.0.1-SNAPSHOT.jar

我们来看下输出结果

  1. Linux

    jar 输出日志如下

    Linux_输出

    我们 ps 下进程

    ps -ef|grep ping
    
    Linux_验证
  2. Win

    jar 输出日志如下

    win_输出

    我们再看下任务管理器的 ping 进程

    win_验证

可以看出,不管是 Linux 还是 Win,得到的 pid 都是正确的;得到 pid 后,终止进程就简单了

/**
 * 终止进程
 * @param pid 进程的PID
 * @return true:成功,false:失败
 */
public static boolean killProcessByPid(int pid) {
    if (NULL_PROCESS_ID == pid) {
        LOGGER.error("pid[{}]异常", pid);
        return false;
    }
    String command = "kill -9 " + pid;
    boolean result;
    if (Platform.isWindows()) {
        command = "cmd.exe /c taskkill /PID " + pid + " /F /T ";
    }
    Process process  = null;
    try {
        process = Runtime.getRuntime().exec(command);
    } catch (IOException e) {
        LOGGER.error("终止进程[pid={}]异常:", pid, e);
        return false;
    }
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
        //杀掉进程
        String line;
        while ((line = reader.readLine()) != null) {
            LOGGER.info(line);
        }
        result = true;
    } catch (Exception e) {
        LOGGER.error("终止进程[pid={}]异常:", pid, e);
        result = false;
    } finally {
        if (!Objects.isNull(process)) {
            process.destroy();
        }
    }
    return result;
}

完整流程应该是

  1. 使用 Runtime.getRuntime().exec(java命令) 启动 DataX,并获取到 Process

    java 命令指的是启动 DataX 的 java 命令,例如

    java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job job.json
    
  2. 通过 ProcessUtil#getProcessId 获取 Process 的 pid,并与同步任务信息绑定进行持久化

    通过任务id 可以查询到对应的 pid

  3. 触发任务 终止,通过任务id找到对应的 pid,通过 ProcessUtil#killProcessByPid 终止进程

    终止了进程也就终止了同步任务

如果 qsl-datax-hook 是单节点,上述处理方案是没有问题的,但生产环境下,qsl-datax-hook 不可能是单节点,肯定是集群部署,那么上述方案就行不通了,为什么呢?我举个例子

假设 qsl-datax-hook 有 2 个节点:A、B,在 A 节点上启动 DataX 同步任务(taskId = 666)并得到对应的 pid = 1488,终止任务 666 的请求被负载均衡到 B 节点,会发生什么情况

  1. B 节点上没有 pid = 1488 进程,那么终止失败,A、B 节点都不受影响
  2. B 节点上有 pid = 1488 进程,这个进程可能是 DataX 同步任务进程,也可能是其他进程,那么这个终止操作就会产生可轻可重的故障了!

然而需要终止的同步任务却还在 A 节点上安然无恙的执行着

所以集群模式下,我们不仅需要将 pid 与任务进行绑定,还需要将任务执行的节点信息也绑定进来,节点信息可以是 节点ID,也可以是 节点IP,只要能唯一标识节点就行;具体实现方案,需要结合具体的负载均衡组件来做设计,由负载均衡组件将任务终止请求分发到正确的节点上,而不能采用常规的负载均衡策略进行分发了;因为负载均衡组件很多,所以实现方案没法统一设计,需要你们结合自己的项目去实现,我相信对你们来说很简单

你懂我意思吧_懂

总结

  1. 任务的启动方式不同,终止方式也会有所不同,如何优雅的终止,是我们需要考虑的重点
  2. 直接杀进程的方式,简单粗暴,但不够优雅,一旦错杀,问题可大可小,如果有其他方式,不建议选择该方式
  3. 适用单节点的终止方式不一定适用于集群,大家设计方案的时候一定要做全方位的考虑
  4. 示例代码:qsl-datax-hook

标签:同步,process,pid,终止,异源,DataX,进程
From: https://www.cnblogs.com/youzhibing/p/18524122

相关文章

  • 中国地质大学(武汉)2024年新生赛(同步赛)
    发现没几个人写这场比赛的题解,顺便给补题的人提供一点思路,故而火速出了这篇(不会都去打区域赛了吧,悲~)A点击查看代码voidsolve(){ intn; cin>>n; cout<<n-1<<'\n';}B模拟题根据题意:一、预约:考虑为0的情况:1.此时读者有书2.读者上次预约时间未超过d天其......
  • 群晖数据自动同步百度云盘
    1、在群晖后台打开应用CloudSync应用(没有需自己安装)。 2、选择百度云 3、这里需要注意下,同步实际上是以下这两个文件夹同步,并不是百度云的根目录,1、群辉nas:/homes/zhanglei/百度网盘2、百度网盘:我的应用数据->CloudSync另外建议:设置中的同步方向修改为仅下载远程更改......
  • 配置数据同步环境v1
    配置数据同步环境v1.01配置Canal+MQ数据同步环境1.1配置Mysql主从同步根据Canal的工作原理,首先需要开启MySQL主从同步。1.在MySQL中需要创建一个用户,并授权进入mysql容器:dockerexec-itmysql/bin/bash--使用命令登录:mysql-uroot-p--创建用户用户名:canal密码:c......
  • 异源数据同步 → 如何获取 DataX 已同步数据量?
    开心一刻今天,表妹问我:哥,我男朋友过两天要生日了,你们男生一般喜欢什么,帮忙推荐个礼物呗我:预算多少表妹:预算300我:20块买条黑丝,剩下280给自己买支口红,你男朋友生日那天你都给自己用上表妹:秒啊,哥我:必须的嘛,你要知道男人最懂男人!前情回顾关于异源数据同步工具DataX,我已经写了好几......
  • 中国地质大学(武汉)2024年新生赛(同步赛)A - E(待完善)
    发现没几个人写这场比赛的题解,顺便给补题的人提供一点思路,故而火速出了这篇(不会都去打区域赛了吧,悲~)A点击查看代码voidsolve(){ intn; cin>>n; cout<<n-1<<'\n';}B模拟题根据题意:一、预约:考虑为0的情况:1.此时读者有书2.读者上次预约时间未超过d天其......
  • luoguP1131 时态同步
    有N个节点构成的电路树,编号为S的的节点为激发器,会产生电流并通过导线往下传递,给出电流在各边上传递递需要的时间w[i][j],可以花1个单位的代价将任意1条边的耗时加1,现要求电流同时到达所有叶子节点,求修改边的最小代价。1<=N<=5E5;1<=w[i][j]<=1E6分析:自下而上dp,对于节点x,先算出以......
  • 数据同步rsync
    数据同步rsyncRsync本地模式和远程模式1.安装yuminstall-yrsync2.命令语法本地模式rsync参数srcdest1.对文件同步[[email protected]]#rsync-avzP/var/log/messages/tmp/sendingincrementalfilelistmessages3,311,285100%84.50MB/s......
  • 【开题报告】基于Springboot+vue信阳市多目的地同步导航系统(程序+源码+论文) 计算机毕
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着城市化进程的加速,信阳市的交通网络日益复杂,市民及游客在出行时面临着路线选择多样化、目的地分散化的挑战。传统的导航方式往往局限于单一目的地......
  • 【牛客训练记录】中国地质大学(武汉)2024年新生赛(同步赛)
    训练情况赛后反思B题大模拟急到红温了,WA了四发,未考虑到部分细节情况A题直接输出\(x-1\)即可。#defineintlonglongusingnamespacestd;voidsolve(){ intx;cin>>x; cout<<x-1; }signedmain(){ //intT;cin>>T;while(T--) solve(); return0;}B题......
  • 【shell脚本】使用 Shell 脚本比较和同步目录:自动化文件管理的利器
    原创日常运维文档在系统管理中,比较两个目录的内容是一项常见任务,尤其在数据备份和服务器维护时,它显得尤为重要。为此,我们可以使用Shell脚本来简化这个过程,实现自动化。下面将对一个名为compare_files.sh的脚本进行详细介绍,该脚本能够比较目录大小并使用rsync检查内容一......