
DataX: Basic Usage

DataX

DataX is an offline data synchronization tool for heterogeneous data sources.

DataX and Sqoop serve similar purposes; choose the synchronization tool that fits your requirements.

Installing DataX

Download the DataX toolkit:

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
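
A minimal setup sketch (the install path is an assumption, chosen to match the job path used below):

# download and unpack the toolkit
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf datax.tar.gz -C /export/servers/
cd /export/servers/datax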

Run the official sample job (Python must be installed in advance):

# run the following from DataX's bin directory
python datax.py /export/servers/datax/job/job.json

Usage Examples

StreamReader & StreamWriter

bin/datax.py -r streamreader -w streamwriter

The command above prints an empty JSON template for the streamreader and streamwriter plugins.

Output:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": "" //设置要传输多少条数据
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", // 编码格式
                        "print": true // 是否打印输出到控制台
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""// 设置并发度,输出的数据条数 = 并发度 * 设置的传输数据条数
            }
        }
    }
}

Writing a job file from the template

Create stream2stream.json in the job directory:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"zhangsan"
                            },
                            {
                                "type":"string",
                                "value":18
                            }
                        ], 
                        "sliceRecordCount": "10"
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "UTF-8", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Run the job:

python bin/datax.py job/stream2stream.json 

Console output: the ten generated records are printed, followed by DataX's job summary (records read, bytes, error count, elapsed time).

mysql2hdfs

View the official template:

bin/datax.py -r mysqlreader -w hdfswriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [], //需要同步的列名集合,使用JSON数组描述自带信息, *代表所有列
                        "connection": [
                            {
                                "jdbcUrl": [], // jdbcUrl:对数据库的JDBC连接信息,使用JSON数组描述,支持多个连接地址
                                "table": []//需要同步的表,支持多个
                                【"querySql:[]"】//可选项,自定义SQL获取数据,配置后,mysqllreader直接忽略table、column、where
                            }
                        ], 
                        "password": "", //数据库用户名对应的密码
                        "username": "", //数据库用户名
                        "where": ""//也是可选项,筛选条件
                        【"splitPk":""】//也是可选项,数据分片字段,一般是主键,仅支持整型。作用:类似flink里的解决数据倾斜方法。
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], //写入数据的字段,这里的[]不能填写*。其中填写name指定字段名,type指定字段数据类型
                        "compress": "", //hdfs文件压缩类型,默认不填写意味着没有压缩
                        "defaultFS": "", //hdfs文件系统namenode节点地址,格式:hdfs://ip:端口号 一般hadoop2.x的默认端口号为8020,3.x的为9820
                        "fieldDelimiter": "", // 字段分隔符
                        "fileName": "", // 写入文件名
                        "fileType": "", // 文件类型,目前只支持用户配置的"text"或"orc"
                        "path": "", //存储到Hadoop hdfs文件系统的路径信息
                        "writeMode": ""// hdfs写入前数据清理处理模式。是追加(append)还是清空再写(nonConflict)
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

The parts wrapped in 【】 above are my own additions; if you use them, remove the brackets and make sure the surrounding JSON syntax (commas, quotes) stays valid.
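
For illustration only (this is a sketch, not part of the official template), a mysqlreader parameter block in querySql mode could look like the following; the SQL statement is made up, and table, column and where are ignored in this mode:

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "connection": [
            {
                "jdbcUrl": ["jdbc:mysql://hadoop111:3306/datax"],
                "querySql": ["select id, name from student where id > 100"]
            }
        ],
        "username": "root",
        "password": "1234"
    }
}

In plain table mode, splitPk instead sits next to column and where at the parameter level, e.g. "splitPk": "id" (assuming id is an integer primary key).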

Notes on writing from MySQL to HDFS

  • HDFS allows only a single writer per file, i.e. a given file can be written by only one client at a time.

  • If more than one channel is configured, each output file gets a random suffix appended to its name.

  • If one task fails to write, the hdfswriter deletes the files that the other tasks wrote successfully, to keep the target data consistent.

Hands-on example

Create mysql2hdfs.json in the job directory:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop111:3306/datax"
                                ], 
                                "table": [
                                    "student"
                                ]
                            }
                        ], 
                        "password": "1234", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://hadoop111:8020", 
                        "fieldDelimiter": "|", 
                        "fileName": "student.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Run the job:

[root@hadoop111 datax]# python bin/datax.py job/mysql2hdfs.json

Check the result:

On HDFS, the file is created with a random suffix appended to the configured fileName.
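
To confirm, list the target path (the exact suffix differs from run to run):

hdfs dfs -ls / | grep student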

Configuring HDFS HA

Building on the job above, add a "hadoopConfig" parameter to the hdfswriter and point defaultFS at the nameservice; ns, nn1 and nn2 below are just placeholder names:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop111:3306/datax"
                                ], 
                                "table": [
                                    "student"
                                ]
                            }
                        ], 
                        "password": "1234", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://hadoop111:8020", 
                        "hadoopConfig":{ 
                            
 	 						"dfs.nameservices": "ns", 
  							"dfs.ha.namenodes.ns": "nn1,nn2", 
  							"dfs.namenode.rpc-address.ns.nn1": "主机名:端口", 
  							"dfs.namenode.rpc-address.ns.nn2": "主机名:端口", 
  							"dfs.client.failover.proxy.provider.ns": 
							"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" 
                        },
                        "fieldDelimiter": "|", 
                        "fileName": "student.text", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

hdfs2mysql

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader", 
                    "parameter": {
                        "column": ["*"], 
                        "defaultFS": "hdfs://hadoop111:8020", 
                        "encoding": "UTF-8", 
                        "fieldDelimiter": "|", 
                        "fileType": "text", 
                        "path": "/student.txt"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop111:3306/datax", 
                                "table": ["student"]
                            }
                        ], 
                        "password": "1234", 
                        "preSql": [], 
                        "session": [], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
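
Two optional tweaks, shown here only as a sketch with illustrative values: hdfsreader's column can list explicit index/type pairs instead of "*" (which reads every field as a string), and mysqlwriter's preSql can run SQL statements before loading, for example to empty the target table. Only the changed parameters are shown:

"reader": {
    "parameter": {
        "column": [
            { "index": 0, "type": "long" },
            { "index": 1, "type": "string" }
        ]
    }
},
"writer": {
    "parameter": {
        "preSql": ["delete from student"]
    }
}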

oracle2mysql

{ 
    "job": { 
        "content": [ 
            { 
                "reader": { 
                    "name": "oraclereader", 
                    "parameter": { 
                        "column": ["*"], 
                        "connection": [ 
                            { 
                                "jdbcUrl": 
["jdbc:oracle:thin:@hadoop102:1521:orcl"], 
                                "table": ["student"] 
                            } 
                        ], 
                        "password": "000000", 
                        "username": "atguigu" 
                    } 
                }, 
                "writer": { 
                    "name": "mysqlwriter", 
                    "parameter": { 
                        "column": ["*"], 
                        "connection": [ 
                            { 
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/oracle", 
                                "table": ["student"] 
                            } 
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert" 
                    } 
                } 
            } 
        ], 
        "setting": { 
            "speed": { 
                "channel": "1" 
            } 
        } 
    } 
}

oracle2hdfs

{ 
    "job": { 
        "content": [ 
            { 
                "reader": { 
                    "name": "oraclereader",  
                    "parameter": { 
                        "column": ["*"],  
                        "connection": [ 
                            { 
                                "jdbcUrl": 
["jdbc:oracle:thin:@hadoop102:1521:orcl"],  
                                "table": ["student"] 
                            } 
                        ],  
                        "password": "000000",  
                        "username": "atguigu" 
                    } 
                },  
                "writer": { 
                    "name": "hdfswriter",  
                    "parameter": { 
                        "column": [ 
                            { 
                                "name": "id", 
                                "type": "int" 
                            }, 
                            { 
                                "name": "name", 
                                "type": "string" 
                            } 
 
                        ],  
                        "defaultFS": "hdfs://hadoop102:9000",  
                        "fieldDelimiter": "\t",  
                        "fileName": "oracle.txt",  
                        "fileType": "text",  
                        "path": "/",  
                        "writeMode": "append" 
                    } 
                } 
            } 
        ],  
        "setting": { 
            "speed": { 
                "channel": "1" 
            } 
        } 
    } 
}

mongoDB2hdfs

{ 
    "job": { 
        "content": [ 
            { 
                "reader": { 
                    "name": "mongodbreader",  
                    "parameter": { 
                        "address": ["127.0.0.1:27017"],  
                        "collectionName": "atguigu",  
                        "column": [ 
                         	{ 
                         	 	"name":"name", 
                         	 	"type":"string" 
                         	}, 
                         	{ 
                         	 	"name":"url", 
                         	 	"type":"string" 
                         	} 
                        ],  
                        "dbName": "test",  
                    } 
                },  
                "writer": { 
                    "name": "hdfswriter",  
                    "parameter": { 
                        "column": [ 
                         	{ 
                         	 	"name":"name", 
                         	 	"type":"string" 
                         	}, 
                         	{ 
                         	 	"name":"url",                          	 	"type":"string" 
                         	} 
                        ],  
                        "defaultFS": "hdfs://hadoop102:9000",  
                        "fieldDelimiter": "\t",  
                        "fileName": "mongo.txt",  
                        "fileType": "text",  
                        "path": "/",  
                        "writeMode": "append" 
                    } 
                } 
            } 
        ],  
        "setting": { 
            "speed": { 
                "channel": "1" 
            } 
        } 
    } 
} 

mongoDB2mysql

{ 
    "job": { 
        "content": [ 
            { 
                "reader": { 
                    "name": "mongodbreader",  
                    "parameter": { 
                        "address": ["127.0.0.1:27017"],  
                        "collectionName": "atguigu",  
                        "column": [ 
                         	{ 
                         	 	"name":"name", 
                         	 	"type":"string" 
                         	}, 
                         	{ 
                         	 	"name":"url", 
                         	 	"type":"string" 
                         	} 
                        ],  
                        "dbName": "test",  
                    } 
                },  
                "writer": { 
                    "name": "mysqlwriter",  
                    "parameter": { 
                        "column": ["*"],  
                        "connection": [ 
                            { 
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/test",  
                                "table": ["atguigu"] 
                            } 
                        ],  
                        "password": "000000",  
                        "username": "root",  
                        "writeMode": "insert" 
                    } 
                } 
            } 
        ],  
        "setting": { 
            "speed": { 
                "channel": "1" 
            } 
        } 
    } 
} 

sqlServer2hdfs

{ 
    "job": { 
        "content": [ 
            { 
                "reader": { 
                    "name": "sqlserverreader",  
                    "parameter": { 
                        "column": [ 
                             "id", 
                            "name" 
                        ],  
                        "connection": [ 
                            { 
                                "jdbcUrl": [ 
                                    
"jdbc:sqlserver://hadoop2:1433;DatabaseName=datax" 
                                ],  
                                "table": [ 
                                    "student" 
                                ] 
                            } 
                        ],  
                        "username": "root",  
                        "password": "000000" 
                    } 
                },  
                "writer": { 
                    "name": "hdfswriter",  
                    "parameter": { 
                        "column": [ 
                            { 
                                "name": "id", 
                                "type": "int" 
                            }, 
                            { 
                                "name": "name", 
                                "type": "string" 
                            } 
                        ],   
                        "defaultFS": "hdfs://hadoop102:9000",  
                        "fieldDelimiter": "\t",  
                        "fileName": "sqlserver.txt",
                        "fileType": "text",  
                        "path": "/",  
                        "writeMode": "append" 
                    } 
                }
            } 
        ],  
        "setting": { 
            "speed": { 
                "channel": "1" 
            } 
        } 
    } 
} 

sqlServer2mysql

{ 
    "job": { 
        "content": [ 
            { 
                "reader": { 
                    "name": "sqlserverreader",  
                    "parameter": { 
                        "column": [ 
                            "id", 
                            "name" 
                        ],  
                        "connection": [ 
                            { 
                                "jdbcUrl": [ 
                                    
"jdbc:sqlserver://hadoop2:1433;DatabaseName=datax" 
                                ],  
                                "table": [ 
                                    "student" 
                                ] 
                            } 
                        ],  
                        "username": "root",  
                        "password": "000000" 
                    } 
                },  
                "writer": { 
                    "name": "mysqlwriter",  
                    "parameter": { 
                        "column": ["*"],  
                        "connection": [ 
                            { 
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax",  
                                "table": ["student"] 
                            } 
                        ],  
                        "password": "000000",  
                        "username": "root",  
                        "writeMode": "insert" 
                    } 
                }
            } 
        ],  
        "setting": { 
            "speed": { 
                "channel": "1" 
            } 
        } 
    } 
}

Tuning DataX

➢ job.setting.speed.channel: number of concurrent channels (see the sketch after this list)

➢ job.setting.speed.record: global record-rate limit across all channels

➢ job.setting.speed.byte: global byte-rate limit across all channels

➢ core.transport.channel.speed.record: record-rate limit for a single channel

➢ core.transport.channel.speed.byte: byte-rate limit for a single channel
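
A sketch of how the job-level knobs sit in a job file; the numbers are placeholders, not recommendations, and the core.* settings live outside the job file (typically in conf/core.json under the DataX home directory):

"setting": {
    "speed": {
        "channel": 4,        // concurrent channels
        "record": 100000,    // global limit, records per second
        "byte": 10485760     // global limit, bytes per second
    }
}

When a global record or byte limit is set, DataX derives the actual channel count from the ratio of the global limit to the per-channel limit, so the two levels are meant to be tuned together.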

Summary

In essence, using DataX comes down to writing the appropriate JSON job file and then running bin/datax.py with that file to carry out the data transfer.

Whenever you are unsure how to write a job file, run python bin/datax.py -r ... -w ... to print the matching template for that reader/writer pair.
