首页 > 数据库 >Python——脚本实现datax全量同步mysql到hive

Python——脚本实现datax全量同步mysql到hive

时间:2024-10-22 19:50:02浏览次数:8  
标签:string database Python hive source json 全量 mysql table

文章目录


前言

在我们构建离线数仓时或者迁移数据时,通常选用sqoop和datax等工具进行操作,sqoop和datax各有优点,datax优点也很明显,基于内存,所以速度上很快,那么在进行全量同步时编写json文件是一项很繁琐的事,是否可以编写脚本来把繁琐事来简单化,接下来我将分享这样一个mysql全量同步到hive自动生成json文件的python脚本。


一、展示脚本

# coding=utf-8
import json
import getopt
import os
import sys
import pymysql

# MySQL 相关配置,需根据实际情况作出修改
mysql_host = "XXXXXX"
mysql_port = "XXXX"
mysql_user = "XXX"
mysql_passwd = "XXXXXX"

# HDFS NameNode 相关配置,需根据实际情况作出修改
hdfs_nn_host = "XXXXXX"
hdfs_nn_port = "XXXX"

# 生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/XXX/XXX/XXX"


def get_connection():
    return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

二、使用准备

1、安装python环境

这里我安装的是python3环境

sudo yum install -y python3

2、安装EPEL

EPEL(Extra Packages for Enterprise Linux)是一个由 Fedora Special Interest Group 维护的软件仓库,提供了大量在官方 RHEL 或 CentOS 软件仓库中没有的软件包。当你在 CentOS 或 RHEL 系统上需要安装一些不在官方软件仓库中的软件时,通常会先安装epel - release

sudo yum install -y epel-release

3、安装脚本执行需要的第三方模块

pip3 install pymysql
pip3 install cryptography

这里可能由于斑纹问题cryptography安装不上去更新一下pip和setuptools

pip3 install --upgrade pip
pip3 install --upgrade setuptools

重新安装cryptography

pip3 install cryptography

三、脚本使用方法

1、配置脚本

首先根据自己服务器修改脚本相关配置

2、创建.py文件

vim /xxx/xxx/xxx/gen_import_config.py

3、执行脚本

python3 /脚本路径/gen_import_config.py -d 数据库名 -t 表名

4、测试生成json文件是否可用

datax.py -p"-Dtargetdir=/表在hdfs存放路径" /生成的json文件路径

执行时首先要确保targetdir目标地址在hdfs上存在,如果没有需要创建后再次执行

标签:string,database,Python,hive,source,json,全量,mysql,table
From: https://blog.csdn.net/qq_68076599/article/details/143166317

相关文章

  • Python停车场车位识别
    程序示例精选Python停车场车位识别如需安装运行环境或远程调试,见文章底部个人QQ名片,由专业技术人员远程协助!前言这篇博客针对《Python停车场车位识别》编写代码,代码整洁,规则,易读。学习与应用推荐首选。文章目录一、所需工具软件二、使用步骤       1.......
  • python 轻松实现公司内部音视频会议
     一些公司内部会议系统价格比较昂贵,而且经常出现问题,为了保证公司内部数据泄密问题,可以自己开发一个内部视频会议软件。会议窗口如下开发语言        python3.9功能描述:        python实现公司内部音视频会议、收发文件实现代码(简易版)     ......
  • Python学习的自我理解和想法(19)
    #1024程序员节|征文#学的是b站的课程(千锋教育),跟老师写程序,不是自创的代码!今天是学Python的第19天,学的内容是面向对象。开学了,时间不多,写得不多,见谅。目录1.面向对象的三大特性(1).封装(2).继承(3).多态2.继承(1).简单使用(2).有构造函数的继承1.继承父类的构造方法......
  • COP3502 P2: RLE with Images Python
    COP3502P2:RLEwithImagesPythonOverviewInthisprojectstudentswilldeveloproutinestoencodeanddecodedataforimagesusingrun-lengthencodingRLE).Studentswillimplementencodinganddecodingofrawdata,conversionbetweendataandstring......
  • Python基础学习目录
    Python学习目录Python自动化第一周Python自动化第二周Python文件的操作Python函数的进阶Python装饰器Python函数基础Python深浅copyPython迭代器、生成器Python推导式Python内置函数及匿名函数Python递归及二分查找算法Python面向对象(基础篇)Pytho......
  • 004 Python数据类型
    1#int可以将纯整数构成的字符串转换成整型,若包含其它非整数符号则会报错2s='123'3res=int(s)4print(res,type(res))56#s='12.3'7#res=int(s)8#print(res,type(s))910#十进制与其它进制之间的相互转换11#十进制转其它进制12print......
  • [Python] Selenium监控网络请求
      Selenium监控网络有两种方式,第一种使用代理,第二张是使用CDP(ChromeDevToolsProtocol)协议,下面直接进入主题分别介绍如何使用代理和CDP协议监控网络请求。  一、使用Selenium-Wire设置代理拦截处理请求。  Selenium-Wire是基于Selenium开发的抓包工具,基本使用方式如下:fr......
  • python第六章课后习题
    点击查看代码print("学号:2023310143028")点击查看代码defprim(graph,start):num_nodes=len(graph)visited=[False]*num_nodesmin_heap=[(0,start,-1)]mst_cost=0mst_edges=[]whilemin_heap:......
  • Python 数据分析与可视化有什么区别
    在当今的数据驱动时代,Python已成为数据分析和数据可视化的重要工具。尽管这两个领域经常在数据科学项目中相互交织,但它们在功能和目的上存在本质区别。本文旨在详细探讨Python在数据分析和数据可视化方面的差异,包括它们的定义、使用的主要库、应用场景以及在实际项目中的作用。通......
  • python第四章课后习题
    点击查看代码importnumpyasnpimportcvxpyascpx=cp.Variable(6,pos=True)obj=cp.Minimize(x[5])a1=np.array([0.025,0.015,0.055,0.026])a2=np.array([0.05,0.27,0.19,0.185,0.185])a3=np.array([1,1.01,1.02,1.045,1.065])k=0.05;kk=[];qq=[]whil......