首页 > 其他分享 >DataX介绍及应用实例

DataX介绍及应用实例

时间:2023-06-20 09:58:21浏览次数:41  
标签:name 数据源 json 实例 DataX 应用 NULL datax

一、DataX简介

  • DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
  • DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
  • 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

1.1 框架设计

 

  • DataX 本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

1.2 核心架构

https://github.com/alibaba/DataX/blob/master/introduction.md

1.3 开源地址

https://github.com/alibaba/DataX

二、安装DataX

2.1 下载地址

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

2.2 环境安装

DataX依赖于Java和Python环境,所以需要先安装。

1、安装Java环境(查看已安装的Java版本):

具体安装请参考 https://www.cnblogs.com/fengguozhong/p/11843156.html

[root@localhost bin]# java -version
java version "1.8.0_231"
Java(TM) SE Runtime Environment (build 1.8.0_231-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.231-b11, mixed mode)

2、Python安装(查看已安装的python版本,安装过程在此不做详述):

[root@localhost opt]# python
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>>

3、安装DataX环境

直接解压压缩包(绿色解压,无需编译安装):

cd /opt  && mkdir datax
tar -zxvf datax.tar.gz

检查DataX是否安装成功,若出现如下结果表示安装成功。

执行命令python datax.py -r streamreader -w streamwriter

控制台信息内容:

[root@localhost bin]# python datax.py -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

注意事项

*注意:DataX安装完成后,需要删除文件plugin目录下的隐藏文件,这个非常重要,不删除将会影响任务的执行。(此步骤不做,将在后面做也可以)

rm -rf /opt/datax/plugin/*/._*  #删除隐藏文件

三、案例

3.1 实现从stream到stream数据读取

1、开发datax配置文件

  • 方法一:执行此命令获得帮助 python datax.py -r streamreader -w streamwriter
  • 方法二:进入/opt/datax/job目录下, 拷贝一份job.json,命名为stream2stream.json
python datax.py -r streamreader -w streamwriter #方法一
#或者
cp job.json  stream2stream.json  #方法二

stream2stream.json文件内容:

{
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [{
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [{
            "value": "DataX",
            "type": "string"
          },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "1989-06-04 00:00:00",
              "type": "date"
            },
            {
              "value": true,
              "type": "bool"
            },
            {
              "value": "test",
              "type": "bytes"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        }
      }
    }]
  }
}

2、启动datax打印数据,执行任务。

cd /opt/datax/bin/
./datax.py /opt/datax/job/stream2stream.json

执行过程中如遇到配置文件问题,请检查并删除plugin目录下reader 和 writer目录中的隐藏文件,详细说明见: 常见问题 。

3、成功执行,任务结果:

 

3.2 实现从mysql到stream数据读取

需求:从mysql数据库的表中读取数据并打印在控制台。

建库、建表、插入数据:

#创建数据库master_test_db
CREATE DATABASE /*!32312 IF NOT EXISTS*/`master_test_db` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_unicode_ci */;
USE `master_test_db`;

#创建表tb_user
CREATE TABLE `tb_user` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(20) COLLATE utf8_unicode_ci DEFAULT NULL,
  `age` INT(11) DEFAULT NULL,
  `address` VARCHAR(100) COLLATE utf8_unicode_ci DEFAULT NULL,
  `status` INT(2) NOT NULL DEFAULT '0',
  `city` VARCHAR(6) COLLATE utf8_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci CHECKSUM=1 DELAY_KEY_WRITE=1 ROW_FORMAT=DYNAMIC;

#插入记录
INSERT  INTO `tb_user`(`id`,`name`,`age`,`address`,`status`,`city`) VALUES (1,'guozhong',20,'北京市石景山区鲁谷大街',0,'我的老是人是'),(2,'zhang',23,'石家庄市红旗大街42号',1,NULL),(8,'Sam',30,'山西省西安市XXXX1111',1,NULL),(9,'liuli',30,'山西省西安市',0,NULL),(10,'xxxx',30,'xxxx111',0,NULL),(11,'xxxx',30,'山西省西安市',1,NULL),(12,'Sam',30,'山西省西安市XXXX1111',0,NULL),(13,'xxxx',30,'山西省西安市',1,NULL),(14,'Sam11',30,'xxxx111',0,NULL),(15,'Sam11',30,'山西省西安市XXXX1111',0,NULL),(16,'Sam11',30,'xxxx111',0,NULL),(19,'lulu',20,'北京',0,NULL),(20,'tonton',20,'北京',0,NULL);

表记录:

1、开发datax配置文件

执行此命令获得帮助:python datax.py -r mysqlreader -w streamwriter

python datax.py -r mysqlreader -w streamwriter

控制台打印结果为如下,接下来配置该文件即可:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

将配置修改完成的文件命名为 :mysql2stream.json,其文件内容为:

{
    "job": {
        "setting": {
            "speed": {
                "channel": "3"
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "testuser",
                    "password": "******",
                    "column": [
                        "id", "name"
                    ],
                    "where": "",
                    "connection": [{
                        "jdbcUrl": [
                            "jdbc:mysql://192.168.100.1:3306/master_test_db"
                        ],
                        "table": [
                            "tb_user"
                        ]
                    }]
                }
            },
            "writer": {
                "name": "streamwriter",
                "parameter": {
                    "encoding": "UTF-8",
                    "print": true
                }
            }
        }]
    }
}

2、启动datax打印数据,执行任务。

cd /opt/datax/bin/
./datax.py /opt/datax/job/mysql2stream.json

3、成功执行,任务结果:

3.3 实现从mysql到mysql数据同步

1、开发datax配置文件

执行此命令获得帮助:python datax.py -r mysqlreader -w mysqlwriter

也可以参考 datax/plugin/writer/mysqlwriter目录下的plugin_job_template.json,reader同理。

注意:writer中的jdbcUrl是用"",表示只能连接一个库,而reader中的jdbcUrl是[]中括号,表示可以连接多个库。

python datax.py -r mysqlreader -w mysqlwriter

控制台打印结果为如下,接下来配置该文件即可:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", #读取端
                    "parameter": {
                        "column": [],  #需要同步的列(*表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [], #数据库连接信息,可以多个库
                                "table": [] #表
                            }
                        ],
                        "password": "", 
                        "username": "", 
                        "where": "" #筛选条件
                    }
                },
                "writer": {
                    "name": "mysqlwriter", #写入端
                    "parameter": {
                        "column": [], #需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "", #只能有一个库,不使用[]中括号括起来
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": "" # replace,update 或 insert
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "" #并发数
            }
        }
    }
}

2、将配置修改完成的文件命名为 :mysql2mysql.json,其文件内容为:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["name","age","address","status","city"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://47.92.113.122:3306/master_test_db?useUnicode=true&characterEncoding=utf8"],
                                "table": ["tb_user"]
                            }
                        ],
                        "username": "testuser",
                        "password": "******",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["name","age","address","status","city"],
                        "connection": [
                            {

                                "jdbcUrl": "jdbc:mysql://47.92.113.122:3306/master_test_db?useUnicode=true&characterEncoding=utf8",
                                "table": ["tb_user2"]

                            }
                        ],
                        "password": "******",
                        "preSql": [],
                        "session": [],
                        "username": "testuser",
                        "writeMode": "insert",
                        "encoding": "UTF-8",
                        "print":true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
  1. 启动datax打印数据,执行任务。
cd /opt/datax/bin/
./datax.py /opt/datax/job/mysql2mysql.json

3、成功执行,任务结果:

 

四、⚠️常见问题

4.1 执行任务-报配置文件错误

一般部署完DataX之后,首次执行任务会报如下错误:

控制台详细错误内容:

[root@localhost bin]# ./datax.py /opt/datax/job/stream2stream.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2023-06-14 16:29:29.709 [main] WARN  ConfigParser - 插件[streamreader,streamwriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/datax/plugin/reader/._drdsreader/plugin.json]不存在. 请检查您的配置文件.
2023-06-14 16:29:30.713 [main] ERROR Engine -

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/datax/plugin/reader/._drdsreader/plugin.json]不存在. 请检查您的配置文件.
        at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
        at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
        at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
        at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:125)
        at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
        at com.alibaba.datax.core.Engine.entry(Engine.java:137)
        at com.alibaba.datax.core.Engine.main(Engine.java:204)

解决方法:

分别进入 datax/plugin/reader 和datax/plugin/writer目录下,删除这些隐藏文件。

rm -rf ./._*

4.2 mysql同步数据出现乱码问题

在mysql同步完成的结果数据中,中文内容出现乱码的问题,如下:

可以通过设置job.json中 的jdbcUrl 中,数据库连接字符串中设置utf8编码:

jdbc:mysql://192.168.100.1:3306/master_test_d?useUnicode=true&characterEncoding=utf8

 

 

标签:name,数据源,json,实例,DataX,应用,NULL,datax
From: https://www.cnblogs.com/fengguozhong/p/17492747.html

相关文章

  • 京东微前端应用MicroApp,主应用vite-vue3,子应用vite-vue3+pinia
    micro-app官方地址micro-app官方demo地址这篇文章主要是为了记录,本人在使用中遇到的一些问题,供参考资源找不到->本地使用代理,显示nginx转发子应用使用组件插槽或者pinia,路由懒加载报错问题->小项目几个路由不要懒加载,大项目中懒加载的时候不要使用pinia或者组件中不适用......
  • 自然语言处理 Paddle NLP - 信息抽取技术及应用
    1.什么是信息抽取即自动从无结构或半结构的文本中抽取出结构化信息的任务(病历抽取)2.实体抽取3.关系抽取4.事件抽取信息抽取和知识图谱是一个上下游的关系。抽取的结果,可以组装成知识图谱(一种存储知识的结构)医疗、金融、法律,三大行业用得比较多从问诊中抽取信息贷款......
  • 动手开发第一个 SAP Fiori Elements 应用
    本教程的前五篇文章,我们已经为SAPFioriElements的创建做了足够的铺垫。0.迈入SAPFioriElements开发的大门-什么是FioriElements,它和FreestyleUI5开发方式有何区别?1.SAPFioriElements开发环境的搭建和开发准备工作2.在ES5系统注册用户以获得Fiori......
  • Linux环境下I2C应用程序编写
    原文:https://blog.csdn.net/propor/article/details/129667596本文介绍Linux环境下,对I2C设备进行操作。在对I2C总线进行操作时,可采用i2c-tools对I2C进行查看及操作,待通过工具可对I2C进行操作后,再编写程序进行操作。1.i2c-tools的使用1)安装sudoaptinstalli2c-tools2)查询......
  • Linux 操作系统及应用考试题
    Linux操作系统及应用考试题要求:操作题截屏到WORD文档,文件名字为学号+姓名。每道题如能显示,应显示你的用户名,必要的步骤需截图,图片不要太大。考试操作题(100分)。......
  • To ChatGPT:让你更加随意地使用所有ChatGPT应用
    现在其实已经有很多在线的llm服务了,当然也存在许多开源部署方案,但是不知道大家有没有发现一个问题,目前基于ChatGPT开发的应用,都是使用的OpenAI的接口。换句话说,如果没有OpenAI账号,就没有办法使用这些应用。但是其实这些应用并不是强依赖于OpenAI的接口,其他的在线llm服务也是可以的......
  • mix-blend-mode和background-blend-mode应用场景
    mix-blend-mode使多重叠元素的颜色发生混合,包括元素与元素,元素与图片background-blend-mode使得多个背景发生混合,包括背景图与背景图,背景图与背景色isolation:isolate可以创建层叠上下文,就可以阻断mix-blend-mode,使多个元素能分组进行混合,不干扰常用场景1.图片后方的元素......
  • Angular 应用里 NullInjectorError - No provider for XX 错误的一个场景和分析过程
    最近处理客户incident,有个客户从Spartacus4升级到5.2之后,启动Storefront,console遇到了一个错误消息:NullInjectorError-NoproviderforAnonymousConsentTemplatesAdapter!引起这个错误消息的场景有很多,这个incident最后的场景是:以前的module通过loadedfor......
  • 【工程应用八】终极的基于形状匹配方案解决(小模型+预生成模型+无效边缘去除+多尺度+各
      我估摸着这个应该是关于形状匹配或者模版匹配的最后一篇文章了,其实大概是2个多月前这些东西都已经弄完了,只是一直静不下来心整理文章,提醒一点,这篇文章后续可能会有多次修改(但不会重新发文章,而是在后台直接修改或者增加),所以有需要的朋友可以随时重复查看。 这次带来的更新......
  • 2023年十大最受欢迎的Flutter开源应用程序
    原文出处:https://juejin.cn/post/7245170503798538296在移动应用开发领域,Flutter以其跨平台能力和漂亮的用户界面获得了巨大的人气。随着其开发者社区的不断壮大,Flutter生态系统已经见证了众多开源应用程序的诞生。这些开源应用不仅展示了Flutter的多功能性,而且还为开发者提供......