首页 > 其他分享 >DataX配置文件生成脚本

DataX配置文件生成脚本

时间:2023-12-18 20:48:17浏览次数:35  
标签:脚本 string 配置文件 database get source DataX mysql table

  • 创建文件
cd /opt/software
mkdir gen_import_config.py 
vim gen_import_config.py 
  • gen_import_config.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host = "slave1"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "slave1"
hdfs_nn_port = "9000"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/software/datax/job/import"

def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)

def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall

def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))

def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)

def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)

def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)

if __name__ == '__main__':
    main(sys.argv[1:])
  • 安装依赖
pip install MySQL-python
# 查看
pip list

标签:脚本,string,配置文件,database,get,source,DataX,mysql,table
From: https://www.cnblogs.com/dogleftover/p/17912177.html

相关文章

  • 自动化查找并记录含图片文件夹的Python脚本
    功能介绍此Python脚本用于遍历指定的父目录,自动识别并记录所有包含图片文件(如PNG、JPG、GIF等格式)的子文件夹。脚本运行后,将在父目录下生成一个名为“文件夹名统计”的Excel表格,其中列出了所有含有图片的文件夹名称。这对于整理大量分散在不同子文件夹中的图片文件特别有用,尤其是......
  • shell脚本检测ssl证书有效时间
     [ytx@workshell]$catssl_check.sh#!/bin/bash#设置要检查的域名和端口DOMAIN="www.xxx.com"#域名PORT=443#端口#获取SSL证书信息CERT_INFO=$(openssls_client-connect${DOMAIN}:${PORT}-servername${DOMAIN}-showcerts</dev/null2>/dev......
  • Linux开机启动自定义脚本
    方式一:chkconfig命令首先编写好自启的脚本/etc/init.d/test.sh#!/bin/sh#chkconfig:23451090#创建个文件touch/opt/script/1.txt再给脚本添加上可执行权限:chmod+xtest.sh将脚本添加到开机启动项chkconfig--addtest.sh将脚本设置为自启动chkconf......
  • nginx日志切割脚本
    #!/bin/bash#utf-8#description:nginx滚动切割脚本,按照500M进行滚动切割#---------------------------------------------------------------------log_directory="/export/servers/nginx/logs"#日志文件目录max_size=500#日......
  • springboot配置文件的优先级
     1配置文件不同位置优先级不同 文件路径相对目录级别 classpath:application.ymlresources目录最低-程序员classpath:config/application.yml resources目录下的config目录项目经理file:application.ymljar包所在目录下的config目录运维file:co......
  • datax web采集oracle数据库,能连接无法使用的问题
    链接oracle数据库时,要注意自己数据的连接参数,下边代码中的高亮部分作为参考{"job":{"setting":{"speed":{"channel":3,"byte":1048576},"errorLimit":{"record":0......
  • 标题:Python脚本:将Excel文件拆分成多个工作表
    简介:本博客介绍了一个实用的Python脚本,旨在帮助用户处理和重塑Excel数据。这个脚本允许用户将一个包含多列数据的Excel文件拆分成多个新的Excel文件,每个文件包含10列数据。特别适用于需要将大型数据集分解为更小、更易管理的部分的场合。功能特点:用户友好的交互:脚本通过命令行......
  • Nginx配置文件解读
    Nginx安装完毕后,会产生相应的安装目录,根据前面的安装路径,Nginx的配置文件路径为/usr/local/nginx/conf其中nginx.conf为Nginx的主配置文件这里重点介绍下nginx.conf这个配置文件。Nginx配置文件默认有五个部分组成:分别是main、events、http、server和location其中:main部分设......
  • Monkey工具进行自定义脚本测试
    常规Monkey测试执行的是随机的事件流,但如果只是想让Monkey测试某个特定场景这时候就需要用到自定义脚本了,Monkey支持执行用户自定义脚本的测试,用户只需要按照Monkey脚本的规范编写好脚本,存放到手机上,启动Monkey通过-f参数调用脚本即可。一、Monkey脚本API方法LaunchActivity(pkg_n......
  • DataX-Web增量配置
    一、根据日期进行增量数据抽取1.页面任务配置打开菜单任务管理页面,选择添加任务按下图中5个步骤进行配置1.任务类型选DataX任务2.辅助参数选择时间自增3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更......