首页 > 数据库 >DataX-阿里开源离线同步工具在Windows上实现Sqlserver到Mysql全量同步和增量同步

DataX-阿里开源离线同步工具在Windows上实现Sqlserver到Mysql全量同步和增量同步

时间:2023-04-23 21:11:58浏览次数:56  
标签:同步 https Windows 数据源 离线 json DataX startTime

场景

Kettle-开源的ETL工具集-实现SqlServer到Mysql表的数据同步并部署在Windows服务器上:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119891674

上面讲过Kettle的使用,下面记录下阿里开源异构数据源同步工具DataX

DataX

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、

HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

https://github.com/alibaba/DataX

 

设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,

DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,

只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

当前使用现状

DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。

目前每天完成同步8w多道作业,每日传输数据量超过300TB

DataX所支持的数据源

https://github.com/alibaba/DataX/wiki/DataX-all-data-channels

下面记录一个具体的示例-从Sqlserver同步数据到Mysql中,且表结构一样。

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi

实现

1、DataX在Windows上的安装

参考官网快速开始文档:

https://github.com/alibaba/DataX/blob/master/userGuid.md

安装并配置好所需要的环境依赖。

 

这里不需要自己编译,所以只配置了jdk1.8以及Python3的环境变量。

按照文档下载地址,下载DataX工具包

https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz

下载之后解压即可。

2、启动并测试stream2stream数据转换

解压之后来到bin目录下,新建创建作业的配置文件stream2stream.json文件

修改json内容为

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}

这是官方提供的模板的示例json文件,用来自检是否成功配置和启动DataX。

然后在bin目录下打开cmd执行

python datax.py  ./stream2stream.json

等待执行完成没有提示报错,但是发现中文乱码

 

DataX命令框中文乱码需要设置编码格式,先在cmd中输入

chcp 65001

然后再执行上面命令

执行过程中的中文输出也不再乱码

 

执行结果也不再乱码

 

3、获取不同数据源转换的json模板。

上面是从stream到stream的数据源转换,如果是其它数据源的json模板如何获取。

DataX提供了获取不同数据源转换的json模板获取的指令

可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

如何获取数据源的名称,比如这里从sqlserver读取,写入到mysql,那么获取json模板的命令:

python datax.py -r sqlserverreader -w mysqlwriter

此时会返回一个sqlserver到mysql的json模板。

这是因为在其源码中目录就是这样叫的。

 

获取说可以直接点击进去里面的doc目录,查看示例的json文件内容

 

 

并且每个配置项的参数也有对应的说明

 

sqlserverreader参数说明

https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md

mysqlwriter参数说明

https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

所以这里新建全量更新的json文件sqlserver2mysqlALL.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                "jdbc:sqlserver://localhost:1433;DatabaseName=数据库名"
                                ],
                                "table": [
                                "表名"
                                ]
                            }
                        ],
                        "password": "改成自己的密码",
                        "username": "用户名",
                        "column": [
                        "checkid",
                        "cardID",
                        "hphm",
                        "startTime",
                        "endTime",
                        "linenumber",
                        "cwgt",
                        "cwgtUL",
                        "cwgtJudge",
                        "cwkc",
                        "cwkcResult",
                        "cwkcUL",
                        "cwkcJudge",
                        "cwkk",
                        "cwkkResult",
                        "cwkkUL",
                        "cwkkJudge",
                        "cwkg",
                        "cwkgResult",
                        "cwkgUL",
                        "cwkgJudge",
                        "wkccJudge",
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                        "checkid",
                        "cardID",
                        "hphm",
                        "startTime",
                        "endTime",
                        "linenumber",
                        "cwgt",
                        "cwgtUL",
                        "cwgtJudge",
                        "cwkc",
                        "cwkcResult",
                        "cwkcUL",
                        "cwkcJudge",
                        "cwkk",
                        "cwkkResult",
                        "cwkkUL",
                        "cwkkJudge",
                        "cwkg",
                        "cwkgResult",
                        "cwkgUL",
                        "cwkgJudge",
                        "wkccJudge",
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/数据库名?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                "表名"
                                ]
                            }
                        ],
                        "password": "密码",
                        "preSql": [
                        "delete from vehicleresult"
                        ],
                        "session": [],
                        "username": "用户名",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

注意这里的流程就是从sqlserver中读取指定列的数据,这里的column就是配置的那些列。

然后写入到mysql时需要预先执行一下删除语句,在preSql中配置的

delete from vehicleresult

vehicleresult是表名。然后写入模式是直接插入

然后执行以上json模板的命令

python datax.py  ./sqlserver2mysqlALL.json

即可实现全量更新。

注意事项,两边的数据结构包括类型、长度、是否非空等要保持一致。

比如sqlserver中某个字段不为空,存在空数据,但是mysql中对应字段设置为不为空,在同步时就会认定为脏数据进而同步失败。

上面全量更新结果

 

4、以上命令每执行一次,则进行一次全量更新,所以需要一个定时bat脚本来定时执行命令。

新建bat文件并修改内容为

#设置编码
chcp 65001
@echo off
title "同步数据"
set INTERVAL=15
timeout %INTERVAL%
 
:Again

python datax.py  ./sqlserver2mysqlALL.json

echo %date% %time:~0,8%
 
timeout %INTERVAL%
 
goto Again

以上内容代表每15秒执行一次

python datax.py  ./sqlserver2mysqlALL.json

将此bat放在bin下与json文件同级目录下,双击执行即可。

5、以上是全量更新,如何实现增量更新。

注意这里增量更新有条件限制,首先这里的数据没有删除只会新增和更新,而且更新只会更新当天的数据。

所以这里首先执行以上上面的全量更新,确保第一次对接将数据获取,然后后面用定时任务执行增量更新,只需要

查询和替换当前的数据即可。

另外得保证有日期时间字段,那么在读取数据和写入数据时就可以用where条件限制查询当前的数据。

另外这里的主键并不是自增的int型数据,不然也可以根据自增主键id进行增量更新。

这里的sqlserver是由三方系统提供且无法更改为需要的类型

修改上面的sqlserverreader添加where条件,查询当天的数据

Sqlserver中查询当天的数据

where datediff(day,startTime,getdate())=0

其中startTime为时间字段。

Mysql中查询当天的数据

WHERE DATE(startTime) = CURDATE()

所以修改上面的json文件为

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                "jdbc:sqlserver://localhost:1433;DatabaseName=数据库名"
                                ],
                                "table": [
                                "表名"
                                ]
                            }
                        ],
                        "password": "改成自己的密码",
                        "username": "用户名",
                        "where": "datediff(day,startTime,getdate())=0",
                        "column": [
                        "checkid",
                        "cardID",
                        "hphm",
                        "startTime",
                        "endTime",
                        "linenumber",
                        "cwgt",
                        "cwgtUL",
                        "cwgtJudge",
                        "cwkc",
                        "cwkcResult",
                        "cwkcUL",
                        "cwkcJudge",
                        "cwkk",
                        "cwkkResult",
                        "cwkkUL",
                        "cwkkJudge",
                        "cwkg",
                        "cwkgResult",
                        "cwkgUL",
                        "cwkgJudge",
                        "wkccJudge",
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                        "checkid",
                        "cardID",
                        "hphm",
                        "startTime",
                        "endTime",
                        "linenumber",
                        "cwgt",
                        "cwgtUL",
                        "cwgtJudge",
                        "cwkc",
                        "cwkcResult",
                        "cwkcUL",
                        "cwkcJudge",
                        "cwkk",
                        "cwkkResult",
                        "cwkkUL",
                        "cwkkJudge",
                        "cwkg",
                        "cwkgResult",
                        "cwkgUL",
                        "cwkgJudge",
                        "wkccJudge",
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/数据库名?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                "表名"
                                ]
                            }
                        ],
                        "password": "密码",
                        "preSql": [
                        "delete from 表名 WHERE DATE(startTime) = CURDATE();"
                        ],
                        "session": [],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

此时再用bat脚本定时执行即可,定时时间自行修改上面的15参数。

然后再新增一条今天的数据测试同步效果。

将上面增量更新的json命名为sqlserver2mysqlAdd.json

 

 

标签:同步,https,Windows,数据源,离线,json,DataX,startTime
From: https://www.cnblogs.com/badaoliumangqizhi/p/17347764.html

相关文章

  • 【MAUI Blazor踩坑日记】2.关于Windows上的相机问题
    前言MAUI中Windows上,调用MediaPicker.Default.CapturePhotoAsync()并不能启动相机拍照。关于这个问题可以查看https://github.com/dotnet/maui/issues/7660,https://github.com/dotnet/maui/pull/13220,好消息是已经修复了,坏消息是.net8修复了,而且还没发布.所以目前怎么办,http......
  • m基于simulink和S函数实现SVPWM永磁同步电机双PI转矩脉动控制系统仿真
    1.算法仿真效果matlab2022a仿真结果如下:     2.算法涉及理论知识概要        永磁同步电机(PMSM)基本结构为定子、转子和端盖。其中转子磁路结构是永磁同步电机(PMSM)与其它电机最主要的区别,其在很大程度上决定了永磁同步电机(PMSM)的实际性能指标[12,13,14]......
  • VS2019离线下载安装
    移步https://learn.microsoft.com/en-us/visualstudio/install/create-a-network-installation-of-visual-studio?view=vs-2019#download-the-visual-studio-bootstrapper-to-create-the-network-layout安装步骤......
  • windows11安装adb步骤
    1.在官网上下载adb工具下载网址:https://adbdownload.com/ 2.下载的一个安装包,解压到你想放置的文件夹目录,如下图所示(需记住安装包的位置)3,右击电脑-》属性-》高级系统设置-》环境变量-》系统变量,找到path;4.双击path后,点击新建,把刚才的文件夹地址黏贴上面 5.验证环境是否......
  • Linux作为rsync的服务端,Windows作为rsync的客户端
    服务端:centos7     172.16.106.199客户端:Windows10  172.16.106.143 服务端配置:创建服务器要同步数据的目录/tmp/rsyncmkdir/tmp/rsync设置权限为700chmod700 /tmp/rsync/ 编辑配置文件/etc/rsyncd.conf (修改该文件 除了修改监听端口跟ip需要重启,修......
  • windows10移动U盘安装介质
    一、问题引入一般重装系统都是通过PE系统工具,但是大部分PE系统工具会夹带广告和垃圾软件。这时需要一个官方引导安装Windows10的介质,本文简单介绍官方安装介质的使用。二、解决过程......
  • 基于Canal实现MySQL 8.0 数据库数据同步
    前言服务器说明主机名称操作系统说明192.168.11.82Ubuntu22.04主库所在服务器192.168.11.28OracleLinuxServer8.7从库所在服务器版本说明MySQL版本:MySQL_8.0.32Canal版本:Canal_1.1.7//我的canal安装部署在192.168.11.82上,当然你也可以部......
  • Cmd输入python会打开 Windows 应用商店 解决方法
    当我在CMD中输入Python时,它会打开Windows应用商店让我下载Python3.7。这个问题今天无缘无故地开始了。我没有更改或下载有关Python的任何内容,并且已经尝试重新安装Python,并且Path环境变量是正确的。Answers使用Windows搜索栏查找“管理应用执行别名”。Pytho......
  • vue 3.0 windows node切换
    '"bash"'不是内部或外部命令,也不是可运行的程序或批处理文件。https://blog.csdn.net/cnds123321/article/details/121257762超级管理运行cmdC:\ProgramFiles\nodejs>gnvmlistUsage:gnvm[flags]gnvm[command]AvailableCommands:config......
  • Windows10安装Transmission,并使用Web远程访问教程
    安装Transmission从官方路径下载Transmission安装包 https://transmissionbt.com/download/在组件安装界面,需要把Daemon和Webinterface,同步安装上。下载配置transmission-web-control3.访问https://github.com/ronggang/transmission-web-control ,下载最新源码包......