DataX
DataX is an offline data synchronization tool for heterogeneous data sources.
Choose between DataX and Sqoop according to the requirements of the job at hand.
DataX Installation
Download the DataX toolkit:
http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
Run the official sample job (Python must be installed beforehand):
# run the following from DataX's bin directory
python datax.py /export/servers/datax/job/job.json
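A minimal install-and-verify sketch, assuming the toolkit is unpacked under /export/servers as in the command above (adjust paths to your environment):
# download and unpack the toolkit
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf datax.tar.gz -C /export/servers/
# run the bundled self-check job from the DataX home directory
cd /export/servers/datax
python bin/datax.py job/job.json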
Usage Examples
StreamReader & StreamWriter
bin/datax.py -r streamreader -w streamwriter
Running this command prints an empty JSON template for StreamReader & StreamWriter.
Output:
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.
Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [],
"sliceRecordCount": "" //设置要传输多少条数据
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "", // 编码格式
"print": true // 是否打印输出到控制台
}
}
}
],
"setting": {
"speed": {
"channel": ""// 设置并发度,输出的数据条数 = 并发度 * 设置的传输数据条数
}
}
}
}
Write a configuration file based on the template
Create a stream2stream.json file in the job directory
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type":"string",
"value":"zhangsan"
},
{
"type":"string",
"value":18
}
],
"sliceRecordCount": "10"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Run the job:
python bin/datax.py job/stream2stream.json
Printed output: with print set to true, the configured column values (zhangsan and 18) are written to the console 10 times.
Job statistics: DataX finishes by printing a summary that includes the total number of records read and written and the elapsed time.
mysql2hdfs
View the official template:
bin/datax.py -r mysqlreader -w hdfswriter
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [], //需要同步的列名集合,使用JSON数组描述自带信息, *代表所有列
"connection": [
{
"jdbcUrl": [], // jdbcUrl:对数据库的JDBC连接信息,使用JSON数组描述,支持多个连接地址
"table": []//需要同步的表,支持多个
【"querySql:[]"】//可选项,自定义SQL获取数据,配置后,mysqllreader直接忽略table、column、where
}
],
"password": "", //数据库用户名对应的密码
"username": "", //数据库用户名
"where": ""//也是可选项,筛选条件
【"splitPk":""】//也是可选项,数据分片字段,一般是主键,仅支持整型。作用:类似flink里的解决数据倾斜方法。
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [], //写入数据的字段,这里的[]不能填写*。其中填写name指定字段名,type指定字段数据类型
"compress": "", //hdfs文件压缩类型,默认不填写意味着没有压缩
"defaultFS": "", //hdfs文件系统namenode节点地址,格式:hdfs://ip:端口号 一般hadoop2.x的默认端口号为8020,3.x的为9820
"fieldDelimiter": "", // 字段分隔符
"fileName": "", // 写入文件名
"fileType": "", // 文件类型,目前只支持用户配置的"text"或"orc"
"path": "", //存储到Hadoop hdfs文件系统的路径信息
"writeMode": ""// hdfs写入前数据清理处理模式。是追加(append)还是清空再写(nonConflict)
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
The parts wrapped in 【】 above were added by me; if you want to use them, remove the 【】 and adjust the surrounding formatting (commas, etc.).
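For example, a mysqlreader block using querySql instead of table/column might look roughly like the sketch below; the SQL statement is purely illustrative, and the connection details reuse the placeholders from the hands-on example that follows:
"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "root",
        "password": "1234",
        "connection": [
            {
                "jdbcUrl": ["jdbc:mysql://hadoop111:3306/datax"],
                "querySql": ["select id, name from student where id >= 3"]
            }
        ]
    }
}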
Notes on writing from MySQL to HDFS
- HDFS allows only a single writer per file, i.e. each file can be written by only one writer at a time.
- If multiple channels are configured, a suffix (.xxx) is appended to each written file name.
- If one task fails to write, DataX deletes the files that the other tasks have written successfully, so that the data stays consistent.
Hands-on example
Create a mysql2hdfs.json file in the job directory
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop111:3306/datax"
],
"table": [
"student"
]
}
],
"password": "1234",
"username": "root",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"compress": "",
"defaultFS": "hdfs://hadoop111:8020",
"fieldDelimiter": "|",
"fileName": "student.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Run the job:
[root@hadoop111 datax]# python bin/datax.py job/mysql2hdfs.json
Check the result:
A suffix is automatically appended to the file name.
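To verify, you can list the target directory with the HDFS client; the written file appears with a randomly generated suffix appended to the configured fileName (the exact suffix format may vary by version):
# list the target path; expect something like /student.txt__<random-suffix>
hdfs dfs -ls /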
Configuring HDFS HA
Taking the example above, you only need to add a "hadoopConfig" parameter to the JSON; ns, nn1, and nn2 are just placeholder names.
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop111:3306/datax"
],
"table": [
"student"
]
}
],
"password": "1234",
"username": "root",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"compress": "",
"defaultFS": "hdfs://hadoop111:8020",
"hadoopConfig":{
"dfs.nameservices": "ns",
"dfs.ha.namenodes.ns": "nn1,nn2",
"dfs.namenode.rpc-address.ns.nn1": "主机名:端口",
"dfs.namenode.rpc-address.ns.nn2": "主机名:端口",
"dfs.client.failover.proxy.provider.ns":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"fieldDelimiter": "|",
"fileName": "student.text",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
hdfs2mysql
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": ["*"],
"defaultFS": "hdfs://hadoop111:8020",
"encoding": "UTF-8",
"fieldDelimiter": "|",
"fileType": "text",
"path": "/student.txt"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop111:3306/datax",
"table": ["student"]
}
],
"password": "1234",
"preSql": [],
"session": [],
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
oracle2mysql
{
"job": {
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl":
["jdbc:oracle:thin:@hadoop102:1521:orcl"],
"table": ["student"]
}
],
"password": "000000",
"username": "atguigu"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/oracle",
"table": ["student"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
oracle2hdfs
{
"job": {
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl":
["jdbc:oracle:thin:@hadoop102:1521:orcl"],
"table": ["student"]
}
],
"password": "000000",
"username": "atguigu"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "oracle.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
mongoDB2hdfs
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:27017"],
"collectionName": "atguigu",
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"dbName": "test",
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url", "type":"string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "mongo.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
mongoDB2mysql
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:27017"],
"collectionName": "atguigu",
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"dbName": "test",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/test",
"table": ["atguigu"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
sqlServer2hdfs
{
"job": {
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:sqlserver://hadoop2:1433;DatabaseName=datax"
],
"table": [
"student"
]
}
],
"username": "root",
"password": "000000"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "sqlserver.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
sqlServer2mysql
{
"job": {
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:sqlserver://hadoop2:1433;DatabaseName=datax"
],
"table": [
"student"
]
}
],
"username": "root",
"password": "000000"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/datax",
"table": ["student"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
DataX Tuning
➢ job.setting.speed.channel: number of concurrent channels
➢ job.setting.speed.record: global record-rate limit across channels
➢ job.setting.speed.byte: global byte-rate limit across channels
➢ core.transport.channel.speed.record: record-rate limit for a single channel
➢ core.transport.channel.speed.byte: byte-rate limit for a single channel
These settings can be combined in a job configuration, as shown in the sketch below.
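A minimal sketch, assuming the numeric limits below are placeholder values and that your DataX version accepts per-job overrides of the core.* settings (otherwise they can be set globally in conf/core.json):
{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 10000,
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "record": 40000,
                "byte": 4194304,
                "channel": 4
            }
        },
        "content": [ ... ]
    }
}
When both the global and per-channel limits are set, the effective channel count is typically derived as global limit / per-channel limit, so the placeholder values above would also cap concurrency at 4.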
Summary
Using DataX essentially comes down to writing the appropriate JSON file and then running bin/datax.py with that file to carry out the data transfer.
Whenever you are unsure how to write a JSON file, you can run python bin/datax.py -r <reader> -w <writer> to print the corresponding template, as in the example below.
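For instance, assuming the standard DataX directory layout, you can list the bundled plugins and then print a template for any reader/writer pair:
# list the reader and writer plugins shipped with this installation
ls plugin/reader plugin/writer
# print an empty configuration template for a chosen pair
python bin/datax.py -r hdfsreader -w mysqlwriter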