首页 > 其他分享 >使用debezium-connector-jdbc组件完成数据同步(io.debezium.connector.jdbc.JdbcSinkConnector)

使用debezium-connector-jdbc组件完成数据同步(io.debezium.connector.jdbc.JdbcSinkConnector)

时间:2024-01-26 19:24:08浏览次数:40  
标签:插件 jdbc connector io 2.5 debezium

1.情景展示

在网络上几乎找不到关于debezium-connector-jdbc插件的博客文章,基本上都在吹io.confluent.connect.jdbc.JdbcSinkConnector,由于一开始对数据同步插件并不了解,导致自己走了不少弯路。

生产数据组件:debezium-connector-mysql、debezium-connector-oracle等数据库组件,通过Source Connector完成了将表数据至kafka的推送工作。

消费数据组件:confluentinc-kafka-connect-jdbc、debezium-connector-jdbc等jdbc组件,通过Sink Connector拉取kafka数据推送到数据库当中。

如果你用的是debezium的官方组件来捕获表数据的变更记录的话,千万不要使用confluentinc-kafka-connect-jdbc插件,而应该使用debezium-connector-jdbc插件。

debezium-connector-jdbc插件可以和debezium提供的debezium-connector-mysql、debezium-connector-oracle-2.5.0等数据库组件,几乎实现了数据的无缝对接。

前面我们已经实现了将表数据到kafka的推送,下面来说如何将这些数据从kafka读出来并推送到数据库当中。

2.准备工作

插件下载

https://debezium.io/releases/

这个页面会展示当前debezium的最新版本,一般情况下,我们直接采用最新版就可以了。

以2.5版本为例,进行举例说明:

我们点击“More info”按钮,会跳转到此版本详情页:https://debezium.io/releases/2.5/

首先,映入眼帘的是:运行此插件所需的java版本,kafka版本,以及其所支持的数据库类型、版本号和驱动版本。

往下走,看到的是:Documentation

也就是说明文档,点击“Documentation”按钮,将会跳转到当前版本对应的说明文档页:https://debezium.io/documentation/reference/2.5/

然后找到:Getting Started-->点击"Installation",会跳转到插件安装界面:https://debezium.io/documentation/reference/2.5/install.html

debezium插件列表如下:

mysql插件:https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.5.0.Final/debezium-connector-mysql-2.5.0.Final-plugin.tar.gz

oracle插件:https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.5.0.Final/debezium-connector-oracle-2.5.0.Final-plugin.tar.gz

SQLserver插件:https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.5.0.Final/debezium-connector-sqlserver-2.5.0.Final-plugin.tar.gz

jdbc插件:https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/$2.5.0.Final/debezium-connector-jdbc-2.5.0.Final-plugin.tar.gz

插件下载说明:

当你发现插件下载失败的时候,需要检查下载地址当中是否存在$,如果存在将其删掉,才是正确的地址。

如上面的jdbc插件,由于多了一个$,导致下载失败,我们把它去掉再下载就可以了:https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.5.0.Final/debezium-connector-jdbc-2.5.0.Final-plugin.tar.gz

插件用法参数说明

点击不同的数据库,将会跳转到对应的参数说明页。

mysql:https://debezium.io/documentation/reference/2.5/connectors/mysql.html

oracle:https://debezium.io/documentation/reference/2.5/connectors/oracle.html

jdbc:https://debezium.io/documentation/reference/2.5/connectors/jdbc.html

如何下载历史插件版本?

在说明文档页,我们点击切换说明文档的版本号,就能看到历史版本信息。

以2.0进行举例说明

如何下载2.0版的插件呢?

我们点击“2.0”,将会切换到2.0版的说明页:https://debezium.io/documentation/reference/2.0/index.html

我们点击"Mysql Connector plugin archive",将会自动下载debezium-mysql-2.0.1.Final,下载地址为:https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.0.1.Final/debezium-connector-mysql-2.0.1.Final-plugin.tar.gz

插件安装

下载成功后,进行解压。

来到KAFKA_HOME目录下,创建一个plugins目录。

并将刚才解压的插件移到plugins目录下。

最好把版本号也加上。

另外的话,jdbc插件的版本号最好和其余数据库插件的版本号保持一致。

参数说明

说明文档:https://debezium.io/documentation/reference/2.5/connectors/jdbc.html

2.5.0版本常用参数说明

具体的数据结构,下面有。 

name属性:代表的是连接器的名称,该名称具有唯一性!(名字随便起,但必须唯一)。

名字最好能让人望文生义,如:debezium-connector-sink-mysql-63-sourceTableName,这一看就知道:

创建的是Sink Connector,用的插件是:debezium-connector-jdbc,源库是mysql以及源表表名。

tasks.max属性:此连接器创建的最大任务数,默认值为1(MySQL 连接器始终使用单个任务,因此不使用此值),数据类型:int。

connector.class属性:Sink Connector的实现类,在这里我们需要填:io.debezium.connector.jdbc.JdbcSinkConnector(它是debezium-connector-jdbc的sink连接器)。

connection.url属性:数据库连接地址,如:jdbc:mysql://192.168.0.1:3306/scott?useUnicode=true&characterEncoding=utf8&allowPublicKeyRetrieval=true&useTimezone=true&serverTimezone=Asia/Shanghai。

connection.username属性:数据库用户名,如:scott。

connection.password属性:数据库密码,如:123456。 

topics或topics.regex属性:将要发布的主题的名称前缀,该值具有唯一性(kafka会根据此主题前缀来生成主题名称。消费者需要根据topic名称来订阅数据)。

table.name.format属性:待接收数据的表(目标表)。

field.include.list属性:待同步的表字段。格式:topics.regex:fieldName,多个使用逗号隔开。

说明1:可以设置只同步部分字段(指定几个字段,就同步几个字段),如果不带此参数,将默认自动同步全部字段。

说明2:目标表不存在,如果需要其自动创建的话,必须设置此属性。

说明3:表字段名称区分大小写(必须和源表字段名称一模一样)。

insert.mode属性:插入模式,默认值:insert,可选值:[insert, upsert, update],这里我们需要设为:upsert。

upsert代表的含义是:如果主键不存在,则连接器执行 INSERT 操作;如果主键存在,则连接器执行 UPDATE 操作。

当使用upsert模式时,必须指定primary.key.mode和primary.key.fields。

primary.key.mode属性:主键模式,默认值:none,可选值:[none,kafka,record_key,record_value],这里我们需要设为:record_key。

当其值设为kafka时,属性schema.evolution的值不能为basic。

delete.enabled属性:是否将null记录值视为删除,默认值:false。当为true时,primary.key.mode的值必须指定为:record_key。

这里,我们需要将其设为:true。

primary.key.fields属性:主键字段,多个字段使用逗号分割(也就是:支撑联合主键)。

说明1:主键字段名称区分大小写(必须和源表字段名称一模一样)。

说明2:源表与目标表的主键必须保持一致。

说明3:它的值依赖于属性primary.key.mode(根据源表变更记录捕获的数据存到kafka当中,所以从kafka取数据的时候,它是根据源表记录进行查找的)。

truncate.enabled属性:当出现truncate操作时,是否进行同步(清空表数据),默认值:false。

schema.evolution属性:是否同步表结构更新(当目标表不存在时,会自动创建),默认值:none,可选值:[none, basic]。

none:仅支持DML(insert、update和delete操作);basic:DML+DDL。

说明:如果需要该插件进行自动建表操作,该值必须设为:basic。

errors.log.enable属性:是否显示错误日志,默认值:false。

errors.log.include.messages属性:错误日志是否包含错误信息,默认值:false。

dialect.sqlserver.identity.insert属性:是否允许为SQLSERVER表中的标识列插入显式值,默认值:false。

3.运行

准备工作

启动服务

启动zookeeper,启动kafka,启动kafka connect。

查看所有参数配置(可忽略)

http://localhost:8083/connector-plugins/io.debezium.connector.jdbc.JdbcSinkConnector/config

 

 

 

 

6.拓展

当目标库目标表不存在时,自动建表

 

 

写在最后

  哪位大佬如若发现文章存在纰漏之处或需要补充更多内容,欢迎留言!!!

 相关推荐:

标签:插件,jdbc,connector,io,2.5,debezium
From: https://www.cnblogs.com/Marydon20170307/p/17990522

相关文章

  • The artifact mysql:mysql-connector-java:jar:8.0.33 has been relocated to com.mys
    Theartifactmysql:mysql-connector-java:jar:8.0.33hasbeenrelocatedtocom.mysql:mysql-connector-j:jar:8.0.33:MySQLConnector/Jartifactsmovedtoreverse-DNScompliantMaven2+coordinates.1.异常信息Theartifactmysql:mysql-connector-java:jar:8.0.33hasb......
  • AngusTester 和 JMeter JDBC 测试结果比较
    本次测试主要目的是对比AngusTester和JMeter对JDBC协议性能表现。测试环境以下测试AngusTester和JMeter使用相同环境。注意:本次测试是在同一台PC机上进行,如果想测试特定配置下数据库的准确性能,需要将测试机和MySQL服务器分开部署,或者使用更高配置的测试服务器。软......
  • Spring的JdbcTemplate使用教程
    什么是JdbcTemplate?Spring框架对JDBC进行封装,使用JdbcTemplate方便实现对数据库操作。准备工作引入jdbcTemplate的相关依赖:案例实操创建jdbc.properties文件,配置数据库信息jdbc.driver=com.mysql.cj.jdbc.Driverjdbc.url=jdbc:mysql://localhost:3306/dbtest1?serv......
  • 从SQL到Java数据类型映射的JDBC规范
    SQL类型Java类型CHARjava.lang.StringVARCHARjava.lang.StringLONGVARCHARjava.lang.StringNUMERICjava.math.BigDecimalDECIMALjava.math.BigDecimalBITbooleanTINYINTbyteSMALLINTshortINTEGERint......
  • elasticsearch学习笔记2 - logstash jdbc 同步MYSQL到ES
    logstash是一个管道工具input输入output输出filter过滤条件咋们初级先了解到这些再说比较优秀的教程文档作为基础知识需要了解:https://blog.csdn.net/qq_19283249/article/details/130839158?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522170564710516800215......
  • ShardingSphere-JDBC学习
    springBoot 引入maven<dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-spring-boot-starter</artifactId><version>4.0.0-RC1<......
  • jmeter使用jdbc连接SQL server,执行SQL报错处理
    前置环境参数:jdk-8u391-windows-x64,驱动:sqljdbc4.jar备注:这是解决后的截图,将就用问题一:使用jmeter5.5,使用jdbc连接SQLserver,执行SQL报错处理,如下图 报错信息:java.lang.UnsupportedClassVersionError:com/microsoft/sqlserver/jdbc/SQLServerDriverhasbeencompiledby......
  • spring boot 3.2.1 dremio jdbc jprofiler 集成
    jprofiler可以直接与idea集成,对于分析一些实际需要debug但是不好复现的问题还是比较方便的,以下是一个简单的与dremio集成的,springboot使用了3.2(jdk需要17)同时也会包含一些启动说明安装idea插件直接plugins的市场中搜索安装就可以了,之后就是配置了idea启动配置因......
  • dremio jdbc 访问最好使用链接池工具
    昨天在碰到一个dremiojdbc比较奇怪的问题,按照了标准的jdbc操作(建立链接,创建Statement,处理数据,关闭Statement,关闭连接)当进行多次执行(多次建立连接操作)发现dremio有一个操作异常,造成数据表创建有问题(事务不完整)参考代码 privatestaticvoidv3(){tr......
  • Spring-jdbc
    1引入相关依赖<dependencies><!--springjdbcSpring持久化层支持jar包--><dependency><groupId>org.springframework</groupId><artifactId>spring-jdbc</artifactId><versi......