首页 > 其他分享 >数据抽取平台pydatax介绍--实现和项目使用

数据抽取平台pydatax介绍--实现和项目使用

时间:2024-02-27 10:12:43浏览次数:16  
标签:src 抽取 name -- datax org table pydatax columns

   数据抽取平台pydatax实现过程中,有2个关键点:

    1、是否能在python3中调用执行datax任务,自己测试了一下可以,代码如下:
    这个str1就是配置的shell文件     

try:
   result = os.popen(str1).read()
except Exception as e:
  print(e)

  2、是否能获取datax执行后的信息:用来捕获执行的情况和错误信息

         上面执行后的result就包含了datax的执行信息,对信息进行筛选,就可以获得

   pydatax的表设计 

        在上面的2个关键点解决后,其他问题就比较简单,设计相关的表:

datax_config   datax抽取表的模板配置(源表名,目标表名,模板id,抽取的字段,抽取条件(增量,全量,特殊),抽取时间,执行顺序等)
datax_config_repair   datax的出错修复表,结构和datax_config一样,用于datax出错后,修复数据用
datax_etl_error    datax的etl的报错信息(非异常字符的报错)
datax_json   datax的模板id配置(全量和增量2个模板文件名)
datax_log   datax运行抽取表的执行信息(是否执行完成,抽取行数,速度,读出行数,流量等)
datax_row_error  datax执行中,字段有异常字符的报错信息

 pydatax在项目中使用

       项目1: 直接配置datax的模板json,从oracle 11g抽取到postgresql中,

                     因postgresql中会对"0x"这些异常字符报错,如oracle中字段有这样字段,必须在抽取字段使用:

                    使用 replace(name,chr(0),'\'\'') as name 来代替 以前的字段 name

       项目2: 客户有9个分公司,用的ERP有9套,有9个库,不同版本,抽取的同一个表字段长度有不一样,字段可能有多有少,客户ERP核心分公司ERP几个月后有大版本升级。

                     因项目2中:数据仓库使用的GreePlum,datax的驱动用的是gpdbwriter-v1.0.4-hashdata.jar,该驱动自动删除"0x"非法字符,就不存在该错误

                     不可能写9个抽取json模板,再抽取,只能原有json模板上修改

                     字段长度不同: 取9个库的最大值,作为目标表字段的字段长度

                     字段个数不同:   取其一个核心分公司库表为基础建表,其他8个库表,如果有就保留,没有就字段数据为NULL,每次执行查询取出8个库的字段:                         

# 获取分公司库该表的字段,如对比核心库表字段的缺失,使用null as 字段替换,如果多余则废弃,
# 字段对比以核心库为标准
def get_org_src_columns(src_columns,org_name,tab_name):
    src_columns = src_columns
    # 分公司字段
    org_cols = get_org_cols(org_name,tab_name)
    lst = src_columns.split(",")
    cols1 = (org_cols + ',')
    src_columns1 = (src_columns + ',')
    for i in lst:
      str1 =i.strip() + ','  # 去掉空格,对比使用,字段名+',',这样避免有重复前缀的字段名,导致误判
      if (cols1.find(str(','+str1)) == -1):
        src_columns1 = src_columns1.replace(str(','+str1), ',NULL as ' + str1)
    return src_columns1.rstrip(',')

# 获取分公司库的表的字段用','合并成一个字符串
def get_org_cols(org_name,tab_name):
    conn = ora_conn()
    cur = conn.cursor()
    cols=""
    sql="select WM_CONCAT(COLUMN_NAME) cols from (SELECT  COLUMN_NAME FROM  all_tab_columns WHERE OWNER=upper('"+org_name+"') " \
        "and  table_name =upper('"+tab_name+"') order by COLUMN_ID asc) t ";
    cur.execute(sql)
    datas = cur.fetchall()
    for row in datas:
      cols= str(row[0])
    return cols;

       修改json模板支持同时抽取9个数据库,修改的9个库同时抽取oracle数据到greeplum全量json模板,见下载文件的:oracle_gp_table_df_job.json:  

    src_table_columns=row.get("src_table_column")
    # 其他8家分公司库
    src_table_columns_fz=get_org_src_columns(src_table_columns,"FZ",src_table_name)
    src_table_columns_jcg=get_org_src_columns(src_table_columns,"JCG",src_table_name)
    src_table_columns_ks=get_org_src_columns(src_table_columns,"KS",src_table_name)
    src_table_columns_qzdf=get_org_src_columns(src_table_columns,"QZDF",src_table_name)
    src_table_columns_sdsht=get_org_src_columns(src_table_columns,"SDSHT",src_table_name)
    src_table_columns_wfjx=get_org_src_columns(src_table_columns,"WFJX",src_table_name)
    src_table_columns_wst=get_org_src_columns(src_table_columns,"WST",src_table_name)
    src_table_columns_std=get_org_src_columns(src_table_columns,"STD",src_table_name)


    str1 = "/usr/bin/python /opt/module/datax/bin/datax.py /opt/module/datax/job/json/"+etl_mode+" -p  \" " \
           " -Dsrc_table_name='"+src_table_name+"' " \
           " -Ddes_table_name='"+des_table_name+"' " \
           " -Dsplit_pk_field='"+split_pk_field+"'   " \
           " -Drelation='"+relation+"' " \
           " -Dcondition='"+dcondition+"' " \
           " -Dsrc_table_columns='"+src_table_columns+"' " \
           " -Dsrc_table_columns_fz='" + src_table_columns_fz + "' " \
           " -Dsrc_table_columns_jcg='" + src_table_columns_jcg + "' " \
           " -Dsrc_table_columns_ks='" + src_table_columns_ks + "' " \
           " -Dsrc_table_columns_qzdf='" + src_table_columns_qzdf + "' " \
           " -Dsrc_table_columns_sdsht='" + src_table_columns_sdsht + "' " \
           " -Dsrc_table_columns_wfjx='" + src_table_columns_wfjx + "' " \
           " -Dsrc_table_columns_wst='" + src_table_columns_wst + "' " \
           " -Dsrc_table_columns_std='" + src_table_columns_std + "' " \
           " -Ddes_table_columns='"+des_table_columns+"' \" "

      这样修改后,就可以同时抽取9个库的数据,同时配置时,只需要配置核心库的相关字段等数据即可!  

      说明: 1,该平台没有可视化页面的后台管理系统,如果加上后台管理系统,就更完美,但目前是足够使用的!

      DATAX的GreePlum驱动plugin下载:  

                  https://files.cnblogs.com/files/zping/gpdbwriter.rar?t=1708999240&download=true

     pydatax源码下载地址:

                 https://files.cnblogs.com/files/zping/pydatax.rar?t=1708755764&download=true

 

标签:src,抽取,name,--,datax,org,table,pydatax,columns
From: https://www.cnblogs.com/zping/p/18008554

相关文章

  • Leetcode 53. 最大子数组和
    题目给你一个整数数组nums,请你找出一个具有最大和的连续子数组(子数组最少包含一个元素),返回其最大和。子数组是数组中的一个连续部分。输入输出样例输入:nums=[-2,1,-3,4,-1,2,1,-5,4]输出:6解释:连续子数组[4,-1,2,1]的和最大,为6。输入:nums=[1]输出:1输入:nums......
  • C#使用sapnoc类库SAPRFC接口调用
     使用NuGet引用sapnoc程序包来进行RFC接口的连接,类库框架为framework,暂不支持.netCore框架,会提示冲突。    编写SapRfcHelper帮助类供接口的连接调用using SAP.Middleware.Connector;using System;using System.Collections.Generic;using ......
  • 关于单片机的地址总线和数据总线
    1.一般来说内存容量是指地址总线还是数据总线单片机的容量通常指2.单片机所说的8位,16位,32位指的是什么,是地址总线的长度还是数据总线在单片机中,通常所说的"8位","16位","32位"指的是数据总线的宽度,即一次可以传输的数据位数。这决定了单片机一次可以处理的数据量大小。例如,一个......
  • abc305_f (构造实现)
    首先考虑正常的怎么做,就是一遍dfs,是\(O(n)\)的,然而这题在到达每一个点时都要输出它的下一个点才能到达下一个点,同时看到这个\(2n\)觉得不对劲,自然想到走过去走回来,耗2n代码实现还是有点东西的,走过去好搞,但走回来时怎么办呢。我们想到dfs是一个栈,所以在要退出时输出就可以了#inc......
  • ABC283E (dp思想)
    难度1这题看到一点思路也没有,但是看到最小操作数想到二分,dp和贪心,二分答案的check显然不会,贪心不会。发现对于一行,前面的\(i-2\)是不会影响的,这一行也不会影响后面的\(i+2\)行,所以是dp。考虑如何设计状态因为\(i-1\)和\(i+1\)行都会影响,所以设计出来一个dp[i][0/1][0/1][0/1]的东......
  • python3的json数据库-TinyDB
    无意间看到TinyDB这个词汇,就去查了一下,就发现了它的官方网站这里然后就是按照他说的步骤去做。第1步安装  pip3installtinydb 安装成功后,创建一个文件名字叫做 test.py,输入下面的代码:fromtinydbimportTinyDB,Query#创建数据库对象db=TinyDB('db.json')#......
  • 多线程系列(十) -ReadWriteLock用法详解
    一、摘要在上篇文章中,我们讲到ReentrantLock可以保证了只有一个线程能执行加锁的代码。但是有些时候,这种保护显的有点过头,比如下面这个方法,它仅仅就是只读取数据,不修改数据,它实际上允许多个线程同时调用的。publicclassCounter{privatefinalLocklock=newReentra......
  • 机器学习策略篇:详解单一数字评估指标(Single number evaluation metric)
    单一数字评估指标无论是调整超参数,或者是尝试不同的学习算法,或者在搭建机器学习系统时尝试不同手段,会发现,如果有一个单实数评估指标,进展会快得多,它可以快速告诉,新尝试的手段比之前的手段好还是差。所以当团队开始进行机器学习项目时,经常推荐他们为问题设置一个单实数评估指标。......
  • P10156(dp思想)
    难度2也是比较有意思的一道题。首先发现每个小团体独立,所以对于每个小团体分开直接暴力dp,dp[i][j]表示当前小团体做到第i人,走了j人,然后O(n)转移,加上部分分喜提50pts。为什么要O(n)转移呢,因为我要枚举匹配的两个人然后算贡献。但是对于这种带绝对值的贡献,我们一般都要把绝对值拆掉......
  • 【13.0】JavaScript之流程控制
    【一】if判断【1】语法//if-elseif(条件){条件成立执行的代码块}else{条件不成立时执行的代码块}//if-elseif-elseif(条件){条件成立执行的代码块}elseif(条件){条件成立执行的代码块}else{条件不成立时执行的代码块}//()条件{}执行的代码块【2】if~e......