首页 > 数据库 >SeaTunnel JDBC DB2 Sink Connector支持的工作原理,快来学习吧!

SeaTunnel JDBC DB2 Sink Connector支持的工作原理,快来学习吧!

时间:2024-04-19 16:23:17浏览次数:24  
标签:DB2 SeaTunnel jdbc 数据库 Connector JDBC SQL db2

file
DB2是IBM的一款关系型数据库管理系统,JDBC DB2 Source Connector是一个用于通过JDBC读取外部数据源数据的连接器。Apache SeaTunnel如何支持JDBC DB2 Sink Connector?请参考本文档。

支持引擎

Spark

Flink

SeaTunnel Zeta

主要功能

使用 Xa 事务 来确保 精确一次性。因此,只支持对支持 Xa 事务 的数据库进行 精确一次性 操作。您可以设置 is_exactly_once=true 来启用它。

描述

通过 JDBC 写入数据。支持批处理模式和流式模式,支持并发写入,支持精确一次性语义(使用 XA 事务保证)。

支持的数据源信息

数据源 支持的版本 驱动程序 URL Maven
DB2 不同的依赖版本有不同的驱动程序 com.ibm.db2.jdbc.app.DB2Driver jdbc:db2://127.0.0.1:50000/dbname 下载

数据库依赖

请下载与 'Maven' 相对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录中

例如,对于 DB2 数据源:cp db2-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

file

Sink 选项

名称 类型 必填 默认值 描述
url 字符串 - JDBC 连接的 URL。例如:jdbc:db2://127.0.0.1:50000/dbname
driver 字符串 - 用于连接到远程数据源的 JDBC 类名,如果使用 DB2,则值为 com.ibm.db2.jdbc.app.DB2Driver
user 字符串 - 连接实例的用户名
password 字符串 - 连接实例的密码
query 字符串 - 使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 具有更高的优先级。
database 字符串 - 使用此 databasetable-name 自动生成 SQL,并接收上游输入数据写入数据库。此选项与 query 互斥,并具有更高的优先级。
table 字符串 - 使用数据库和此表名自动生成 SQL,接收上游输入数据写入数据库。此选项与 query 互斥,并具有更高的优先级。
primary_keys 数组 - 此选项用于支持自动生成 SQL 时的 insertdeleteupdate 操作。
support_upsert_by_query_primary_key_exist 布尔 false 根据查询主键是否存在选择使用 INSERT SQL、UPDATE SQL 处理更新事件(INSERT、UPDATE_AFTER)。此配置仅在数据库不支持 upsert 语法时使用。请注意,此方法性能较低。
connection_check_timeout_sec 整数 30 用于等待验证连接的数据库操作完成的时间(以秒为单位)。
max_retries 整数 0 提交失败(executeBatch)的重试次数。
batch_size 整数 1000 用于批处理写入,当缓冲记录数量达到 batch_size 或时间达到 batch_interval_ms 时,数据将刷新到数据库。
batch_interval_ms 整数 1000 用于批处理写入,当缓冲记录数量达到 batch_size 或时间达到 batch_interval_ms 时,数据将刷新到数据库。
is_exactly_once 布尔 false 是否启用精确一次性语义,将使用 XA 事务。如果启用,需要设置 xa_data_source_class_name
generate_sink_sql 布尔 false 基于要写入的数据库表自动生成 SQL 语句。
xa_data_source_class_name 字符串 - 数据库驱动程序的 XA 数据源类名,例如,DB2 为 com.db2.cj.jdbc.Db2XADataSource。其他数据源请参考附录。
max_commit_attempts 整数 3 事务提交失败的重试次数。
transaction_timeout_sec 整数 -1 事务打开后的超时时间,默认为 -1(永不超时)。请注意,设置超时可能会影响精确一次性语义。
auto_commit 布尔 true 默认启用自动事务提交。
common-options - Sink 插件的通用参数,请参考 Sink Common Options 获取详细信息。

提示

如果未设置 partition_column,则将以单一并发方式运行;如果设置了 partition_column,则根据任务的并发度并行执行。

任务示例

简单示例:

该示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并发送到 JDBC Sink。FakeSource 生成总共 16 行数据(row.num=16),每行有两个字段,name(字符串类型)和 age(整数类型)。最终的目标表是 test_table,在表中也将有 16 行数据。在运行此作业之前,您需要在您的 DB2 中创建数据库 test 和表 test_table。如果您尚未安装和部署 SeaTunnel,请按照 安装 SeaTunnel 中的说明安装和部署 SeaTunnel。然后按照 使用 SeaTunnel 引擎快速入门 中的说明运行此作业。

# 定义运行时环境
env {
  # 您可以在这里设置 Flink 配置
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  # 这是一个示例源插件,仅用于测试和演示源插件功能
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
  # 如果您想要获取更多关于如何配置 SeaTunnel 并查看完整的源插件列表的信息,
  # 请访问 https://seatunnel.apache.org/docs/category/source-v2
}

transform {
  # 如果您想要获取更多关于如何配置 SeaTunnel 并查看完整的转换插件列表的信息,
  # 请访问 https://seatunnel.apache.org/docs/category/transform-v2
}

生成 Sink SQL

该示例不需要编写复杂的 SQL 语句,您可以配置数据库名称和表名称,以自动生成要插入的语句。

sink {
    jdbc {
        url = "jdbc:db2://127.0.0.1:50000/dbname"
        driver = "com.ibm.db2.jdbc.app.DB2Driver"
        user = "root"
        password = "123456"
        query = "insert into test_table(name,age) values(?,?)"
        }
  #  如果您想要获取更多关于如何配置 SeaTunnel 并查看完整的接收插件列表的信息,
  #  请访问 https://seatunnel.apache.org/docs/category/sink-v2
}

sink {
    jdbc {
        url = "jdbc:db2://127.0.0.1:50000/dbname"
        driver = "com.ibm.db2.jdbc.app.DB2Driver"
        user = "root"
        password = "123456"
        # 根据数据库表名自动生成 SQL 语句
        generate_sink_sql = true
        database = test
        table = test_table
    }
}

精确一次性:

为了确保精确写入场景,我们保证精确一次性。
sink {
jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"

    max_retries = 0
    user = "root"
    password = "123456"
    query = "insert into test_table(name,age) values(?,?)"

    is_exactly_once = "true"

    xa_data_source_class_name = "com.db2.cj.jdbc.Db2XADataSource"
}

}

本文由 白鲸开源 提供发布支持!

标签:DB2,SeaTunnel,jdbc,数据库,Connector,JDBC,SQL,db2
From: https://www.cnblogs.com/seatunnel/p/18146225

相关文章

  • JDBC数据库汇总Attack研究
    前言针对除Mysql的其它数据库的jdbcattack分析H2RCE介绍H2是一个用Java开发的嵌入式数据库,它本身只是一个类库,即只有一个jar文件,可以直接嵌入到应用项目中。H2主要有如下三个用途:第一个用途,也是最常使用的用途就在于可以同应用程序打包在一起发布,这样可以非常方便地......
  • 当 mysql-connector-java-5 遇上 MySQL8,终究还是错付了 → 门当户对真的很重要!
    开心一刻今天,老婆给我发消息老婆:老公,儿子从隔壁邻居家回来了老婆:是先打还是先洗?我:先洗吧,万一打错人了呢老婆:先洗脸吧,没错就边打边洗起因在我们的固有认知中, mysql-connector-java-5.x.x 连接的是 MySQL5 ,而 mysql-connector-java-8.x.x 连......
  • Spring Data JDBC: 映射无ID列的表
    解决方案在model层中,增加一个ID列,但需要加上@Transient,让其映射时做忽略@Data@Table(name="table_name",schema="you_schema")publicclasstableNameVo{@Id@TransientprivateLongid;@Column("column1")privateStringcolumn......
  • 【安装部署】Apache SeaTunnel 和 Web快速安装详解
    版本说明由于作者目前接触当前最新版本为2.3.4但是官方提供的web版本未1.0.0,不兼容2.3.4,因此这里仍然使用2.3.3版本。可以自定义兼容处理,官方提供了文档:https://mp.weixin.qq.com/s/Al1VmBoOKu2P02sBOTB6DQ因为大部分用户使用SeaTunnelWeb都是基于SeaTunnel-2.3.3版本做的适......
  • java连接ssmsSqlserver数据库 报错信息:com.microsoft.sqlserver.jdbc.SQLServerExce
    解决办法:将官网下载的驱动文件打开,找到如下路径,并复制,粘贴放到jdk的bin目录下......
  • 使用openGauss jdbc 3.0测试国密SM3用户认证
    使用openGaussjdbc3.0测试国密SM3用户认证本文出处:https://www.modb.pro/db/393728openGauss现在支持四种用户认证方式,通过postgresql.conf文件中的参数password_encryption_type确定,认证方式与该参数的对应关系如下表所示:认证方式 参数md5 password_encryption_ty......
  • 宋红康JDBC课程学习记录2
    宋红康JDBC课程学习记录2第3章:使用PreparedStatement实现CRUD操作3.1操作和访问数据库数据库连接被用于向数据库服务器发送命令和SQL语句,并接受数据库服务器返回的结果。其实一个数据库连接就是一个Socket连接。在java.sql包中有3个接口分别定义了对数据库的调用的......
  • java代码将16进制字符串转换为图片,jdbc入库blob字段,解决ORA-01704,PLS-00172,ORA-06550,
    从Oracle导出SQL文件中的insert语句包含blob字段,语句HEXTORAW函数将16进制的字符串入库,由于字符串太长,insert失败下面的代码读取完整的insert语句,将HEXTORAW函数连同16进制的字符串替换为NULL,先将字段置空插入记录,然后使用PreparedStatement对图片文件读流更新入库importorg.......
  • jdbc结合druid连接池访问postgreSQL数据库
    jdbc结合druid连接池访问postgreSQL数据库连接mysql的话也是一个道理,就是把对应的依赖和数据库驱动换一下一.在pom.xml里面加上对应的依赖<!--druid数据源--><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring......
  • java连接Oracle(JDBC)
    packagetext.coming;importjava.sql.*;publicclassdemo{publicstaticvoidmain(String[]args){Connectionct=null;Statementstatement=null;try{Class.forName("oracle.jdbc.driver.OracleDriver");......