首页 > 其他分享 >DataX二次开发详解-Kingbasees86Reader、Kingbasees86Writer插件

DataX二次开发详解-Kingbasees86Reader、Kingbasees86Writer插件

时间:2024-04-23 16:58:05浏览次数:32  
标签:插件 plugin Kingbasees86Writer alibaba datax reader 二次开发 kingbasees86reader com

 

一、前提

国产数据库的崛起元年,不得不提人大金仓(Kingbase)、南大通用数据库(Gbase)、达梦数据库(DM)、华为数据库(GaussDB)、阿里数据库(Oceanbase)等,此文章介绍采用datax作为同步人大金仓Kingbase86数据库的工具。目前github上的datax版本功能仅支持Kingbase82系列产品。而项目上如果要用Kingbase86版本作为数据库,所以要对Datax源码进行二次开发,自己构建Kingbasees86Reader和Kingbasees86Writer插件。

二、实施

Kingbase的背景不赘述,同样基于JDBC协议进行远程连接数据库并执行相应的SQL语句将数据从KingbaseES库中SELECT出来,以前玩过Datax工具的同学可以简单把Kingbase理解成Mysql的同步脚本。

2.1 Kingbasees86Reader插件开发

目前Kingbasees86Reader支持大部分KingbaseES类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出Kingbasees86Reader针对KingbaseES类型转换列表:

DataX内部类型 KingbaseES数据类型
Long bigint, bigserial, integer, smallint, serial
Double double precision, money, numeric, real
String varchar, char, text, bit, inet
Date date, time, timestamp
Boolean bool
Bytes bytea

 

2.1.1 配置样例

下面是一个从KingbaseES数据库中同步抽取数据到本地作业的展示脚本

{
    "job": {
        "setting": {
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它.
                 "byte": 1048576
            },
            //出错限制
                "errorLimit": {
                //出错的record条数上限,当大于该值即报错。
                "record": 0,
                //出错的record百分比上限 1.0表示100%,0.02表示2%
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "kingbasees86reader",
                    "parameter": {
                        // 数据库连接用户名
                        "username": "xx",
                        // 数据库连接密码
                        "password": "xx",
                        "column": [
                            "id","name"
                        ],
                        //切分主键
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:kingbase86://host:port/database"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    //writer类型
                    "name": "streamwriter",
                    //是否打印内容
                    "parameter": {
                        "print":true,
                    }
                }
            }
        ]
    }
}

  

纯净版(验证可执行通过)

{
    "job": {
        "setting": {
            "speed": {
                 "byte": 1048576
            },
                "errorLimit": {
                 "record": 0,
          "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "kingbasees86reader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": ["id","name"],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "t1"
                                ],
                                "jdbcUrl": [
                                    "jdbc:kingbase8://192.168.12.104:54321/test"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

  

配置一个自定义SQL的数据库同步任务到本地内容的作业:

{
    "job": {
        "setting": {
            "speed": 1048576
        },
        "content": [
            {
                "reader": {
                    "name": "kingbasees86reader",
                    "parameter": {
                        "username": "xx",
                        "password": "xx",
                        "where": "",
                        "connection": [
                            {
                                "querySql": [
                                    "select db_id,on_line_flag from db_info where db_id < 10;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:kingbase86://host:port/database", "jdbc:kingbase86://host:port/database"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}

  

 2.1.2 代码实现

代码架构

 

 代码-package.xml

<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/kingbasees86reader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>kingbasees86reader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/kingbasees86reader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>src/main/libs</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>plugin/reader/kingbasees86reader/libs</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/kingbasees86reader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

  

代码-Constant

package com.alibaba.datax.plugin.reader.kingbasees86reader;

public class Constant {

    public static final int DEFAULT_FETCH_SIZE = 1000;

}

  

代码-Kingbasees86Reader

package com.alibaba.datax.plugin.reader.kingbasees86reader;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;

import java.util.List;

public class Kingbasees86Reader extends Reader {

    private static final DataBaseType DATABASE_TYPE = DataBaseType.KingbaseES86;

    public static class Job extends Reader.Job {

        private Configuration originalConfig;
        private CommonRdbmsReader.Job commonRdbmsReaderMaster;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            int fetchSize = this.originalConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
                    Constant.DEFAULT_FETCH_SIZE);
            if (fetchSize < 1) {
                throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
                        String.format("您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.", fetchSize));
            }
            this.originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, fetchSize);

            this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE);
            this.commonRdbmsReaderMaster.init(this.originalConfig);
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderMaster.split(this.originalConfig, adviceNumber);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderMaster.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderMaster.destroy(this.originalConfig);
        }

    }

    public static class Task extends Reader.Task {

        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderSlave;

        @Override
        public void init() {
            this.readerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId());
            this.commonRdbmsReaderSlave.init(this.readerSliceConfig);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);

            this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderSlave.post(this.readerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderSlave.destroy(this.readerSliceConfig);
        }

    }

}

  

代码-plugin.json

{
  "name": "kingbasees86reader",
  "class": "com.alibaba.datax.plugin.reader.kingbasees86reader.Kingbasees86Reader",
  "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
  "developer": "alibaba"
}

  

代码-plugin_job_template.json

{
  "name": "kingbasees86reader",
  "parameter": {
    "username": "",
    "password": "",
    "connection": [
      {
        "table": [],
        "jdbcUrl": []
      }
    ]
  }
}

  

注意1:在根目录的package.xml文件下添加

		<fileSet>
            <directory>kingbasees86reader/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>

  

注意2:在根目录的pom.xml文件下添加

        <module>kingbasees86reader</module>

  

注意3:在DataBaseType中注册Reader信息

 

 

2.1.3 打包上传

可以在根目录下注释掉不需要的module,加速打包过程。

 将下面的几个文件复制到Kingbase安装目录下对应的plugin文件夹下

 

 2.1.4 KingbaseES创建测试表

注意:需要先启动kingbase Server服务以及检查防火墙是否关闭

启动Kingbase Server服务

cd /opt/Kingbase/ES/V8/Server/bin

./sys_ctl start -D /opt/Kingbase/ES/V8/data

 

 

2.1.5 执行DataX同步脚本进行测试

 2.1.6 可能遇到的问题

Description:[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决 .]. - 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数

 解决办法:

进入DataX安装目录,修改文件datax/conf/core.json

 修改core -> transport -> channel -> speed -> “byte”: 2000000,将单个channel的大小改为2MB即可。

 

 

 

 

 

相关文档

GitHUb:https://github.com/alibaba/DataX/tree/master

GitEE:https://gitee.com/mirrors/DataX/tree/master

阿里云Maven仓库:https://developer.aliyun.com/mvn/search

标签:插件,plugin,Kingbasees86Writer,alibaba,datax,reader,二次开发,kingbasees86reader,com
From: https://www.cnblogs.com/lxzcloud/p/18152829

相关文章

  • Keepass安装使用方法(包含浏览器插件使用方法)
    相关后续阅读:Keepass调用Xshell、SecureCRT、RDP、Putty的方法(一劳永逸版)安装方法:1、安装KeePass-2.56-Setup.exe,选择语言——English2、一路默认后,安装到默认路径:C:\ProgramFiles\KeePassPasswordSafe23、将语言包Chinese_Simplified.lngx解压拷贝到C:\ProgramFiles\Ke......
  • springboot项目找不到符号问题以及模块聚合项目maven插件使用的相关问题 问题如图
    参考:https://www.cnblogs.com/coderxiaobai/p/15005181.html问题:更换maven,清空缓存重新导入依赖依然无效后(1)解决方法:方式一:删除项目中.idea文件夹,重新打开项目,选中jdk版本,重新导入依赖即可。(2)如果不是上述的原因可能是项目是模块聚合项目,原因就是父工程的pom中存在maven插......
  • uniapp使用z-paging插件
    1.通过dcloud插件市场下载,导入Hbuiderx,参考官网:https://z-paging.zxlee.cn/start/install.html#%E9%80%9A%E8%BF%87%E6%8F%92%E4%BB%B6%E5%B8%82%E5%9C%BA%E5%AE%89%E8%A3%852.通过npm下载(uniapp小程序项目发现通过npm下载方式,主包体积比方式1小,所以使用)npminstallz-pa......
  • 这10款VS Code神仙插件,嵌入式程序员必备
    大家好,我是知微!嵌入式软件开发工程师平时可能更多的是使用SourceInsight、Keil、IAR来阅读代码,写代码。VSCode大家都听说过,功能十分强大,而且免费!或许是因为这款软件上手有一定的学习成本,所以有些小伙伴也不想轻易去尝试。知微在这里强烈建议大家去试一试,VSCode提供十分丰富的......
  • 国产良心软件uTools+常用插件
    合集-开发工具(8) 1.Weblogic11g安装部署-winserver篇2023-05-072.给你安利一款国产良心软件uTools2023-05-133.gitee图床不能用了,心态崩了2023-05-164.windows环境下如何优雅搭建ftp服务?2023-05-175.IntelliJIDEA上手这一篇就够了,从入门到上瘾2023-05-226.继copilot之......
  • Java集成系列:高效构建自定义插件
    前言随着软件开发的快速发展和需求的不断增长,开发人员面临着更多的压力和挑战。传统的开发方法需要花费大量的时间和精力,而低代码开发平台的出现为开发人员提供了一种更加高效、快速的开发方式。今天小编就以构建命令插件为例,展示如何使用Java语言高效构建自定义插件。环境准备......
  • 2.5K star 一款插件化&易拓展的即时聊天(IM)平台
    简介Tailchat基于HTML设计,适合任何平台或操作系统,但它仍然无法在Web中提供一些原生支持。所以Tailchat也有客户端提供手机通知、桌面截图等操作系统支持。功能消息支持基本信息,支持如文本/链接/提及/图像/文件等多种信息类型,并支持对任何你想要的信息的附加反应。你......
  • 性能测试——性能测试-linux监控工具——Jmeter插件之ServerAgent服务器性能监控工具
    安装插件1、在Jmeter官网:https://jmeter-plugins.org/wiki/PluginsManager/下载插件管理器Plugins-manager.jar  参考博客地址:https://blog.csdn.net/qq_45664055/article/details/105979481              需要先安装java,设置环境变量: ......
  • mysql连接控制插件connection_control介绍
    原文链接:https://blog.csdn.net/yabingshi_tech/article/details/132718295前言:当连接数据库失败次数过多时,MySQL是否会限制登录呢?数据库服务端应该怎么应对暴力破解呢?本篇文章介绍下MySQL中的连接控制插件,一起来学习下此插件的作用。1.连接控制(connection_control)插件介绍......
  • 致远OA及相关OA系统集成与二次开发
    发现一个名为台部落的繁体字网站,一直采集我的个人博客及个人网站的网站,并且在他自己的网站上面创建了一个跟我名称的账户,并把文章标记为“原創”且没有标注原文链接。实在是无耻至极。在此作出声明,本人的文章的发布地址为lrach.com(龙渊个人博客),发布后一段时间会在博客园发布作为......