1.情景展示
在网络上几乎找不到关于debezium-connector-jdbc插件的博客文章,基本上都在吹io.confluent.connect.jdbc.JdbcSinkConnector,由于一开始对数据同步插件并不了解,导致自己走了不少弯路。
生产数据组件:debezium-connector-mysql、debezium-connector-oracle等数据库组件,通过Source Connector完成了将表数据至kafka的推送工作。
消费数据组件:confluentinc-kafka-connect-jdbc、debezium-connector-jdbc等jdbc组件,通过Sink Connector拉取kafka数据推送到数据库当中。
如果你用的是debezium的官方组件来捕获表数据的变更记录的话,千万不要使用confluentinc-kafka-connect-jdbc插件,而应该使用debezium-connector-jdbc插件。
debezium-connector-jdbc插件可以和debezium提供的debezium-connector-mysql、debezium-connector-oracle-2.5.0等数据库组件,几乎实现了数据的无缝对接。
前面我们已经实现了将表数据到kafka的推送,下面来说如何将这些数据从kafka读出来并推送到数据库当中。
2.准备工作
插件下载
这个页面会展示当前debezium的最新版本,一般情况下,我们直接采用最新版就可以了。
以2.5版本为例,进行举例说明:
我们点击“More info”按钮,会跳转到此版本详情页:https://debezium.io/releases/2.5/
首先,映入眼帘的是:运行此插件所需的java版本,kafka版本,以及其所支持的数据库类型、版本号和驱动版本。
往下走,看到的是:Documentation
也就是说明文档,点击“Documentation”按钮,将会跳转到当前版本对应的说明文档页:https://debezium.io/documentation/reference/2.5/
然后找到:Getting Started-->点击"Installation",会跳转到插件安装界面:https://debezium.io/documentation/reference/2.5/install.html
debezium插件列表如下:
jdbc插件:https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/$2.5.0.Final/debezium-connector-jdbc-2.5.0.Final-plugin.tar.gz
插件下载说明:
当你发现插件下载失败的时候,需要检查下载地址当中是否存在$,如果存在将其删掉,才是正确的地址。
如上面的jdbc插件,由于多了一个$,导致下载失败,我们把它去掉再下载就可以了:https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.5.0.Final/debezium-connector-jdbc-2.5.0.Final-plugin.tar.gz
插件用法参数说明
点击不同的数据库,将会跳转到对应的参数说明页。
mysql:https://debezium.io/documentation/reference/2.5/connectors/mysql.html
oracle:https://debezium.io/documentation/reference/2.5/connectors/oracle.html
jdbc:https://debezium.io/documentation/reference/2.5/connectors/jdbc.html
如何下载历史插件版本?
在说明文档页,我们点击切换说明文档的版本号,就能看到历史版本信息。
以2.0进行举例说明
如何下载2.0版的插件呢?
我们点击“2.0”,将会切换到2.0版的说明页:https://debezium.io/documentation/reference/2.0/index.html
我们点击"Mysql Connector plugin archive",将会自动下载debezium-mysql-2.0.1.Final,下载地址为:https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.0.1.Final/debezium-connector-mysql-2.0.1.Final-plugin.tar.gz
插件安装
下载成功后,进行解压。
来到KAFKA_HOME目录下,创建一个plugins目录。
并将刚才解压的插件移到plugins目录下。
最好把版本号也加上。
另外的话,jdbc插件的版本号最好和其余数据库插件的版本号保持一致。
参数说明
说明文档:https://debezium.io/documentation/reference/2.5/connectors/jdbc.html
2.5.0版本常用参数说明
具体的数据结构,下面有。
name属性:代表的是连接器的名称,该名称具有唯一性!(名字随便起,但必须唯一)。
名字最好能让人望文生义,如:debezium-connector-sink-mysql-63-sourceTableName,这一看就知道:
创建的是Sink Connector,用的插件是:debezium-connector-jdbc,源库是mysql以及源表表名。
tasks.max属性:此连接器创建的最大任务数,默认值为1(MySQL 连接器始终使用单个任务,因此不使用此值),数据类型:int。
connector.class属性:Sink Connector的实现类,在这里我们需要填:io.debezium.connector.jdbc.JdbcSinkConnector(它是debezium-connector-jdbc的sink连接器)。
connection.url属性:数据库连接地址,如:jdbc:mysql://192.168.0.1:3306/scott?useUnicode=true&characterEncoding=utf8&allowPublicKeyRetrieval=true&useTimezone=true&serverTimezone=Asia/Shanghai。
connection.username属性:数据库用户名,如:scott。
connection.password属性:数据库密码,如:123456。
topics或topics.regex属性:将要发布的主题的名称前缀,该值具有唯一性(kafka会根据此主题前缀来生成主题名称。消费者需要根据topic名称来订阅数据)。
table.name.format属性:待接收数据的表(目标表)。
field.include.list属性:待同步的表字段。格式:topics.regex:fieldName,多个使用逗号隔开。
说明1:可以设置只同步部分字段(指定几个字段,就同步几个字段),如果不带此参数,将默认自动同步全部字段。
说明2:目标表不存在,如果需要其自动创建的话,必须设置此属性。
说明3:表字段名称区分大小写(必须和源表字段名称一模一样)。
insert.mode属性:插入模式,默认值:insert,可选值:[insert, upsert, update],这里我们需要设为:upsert。
upsert代表的含义是:如果主键不存在,则连接器执行 INSERT 操作;如果主键存在,则连接器执行 UPDATE 操作。
当使用upsert模式时,必须指定primary.key.mode和primary.key.fields。
primary.key.mode属性:主键模式,默认值:none,可选值:[none,kafka,record_key,record_value],这里我们需要设为:record_key。
当其值设为kafka时,属性schema.evolution的值不能为basic。
delete.enabled属性:是否将null记录值视为删除,默认值:false。当为true时,primary.key.mode的值必须指定为:record_key。
这里,我们需要将其设为:true。
primary.key.fields属性:主键字段,多个字段使用逗号分割(也就是:支撑联合主键)。
说明1:主键字段名称区分大小写(必须和源表字段名称一模一样)。
说明2:源表与目标表的主键必须保持一致。
说明3:它的值依赖于属性primary.key.mode(根据源表变更记录捕获的数据存到kafka当中,所以从kafka取数据的时候,它是根据源表记录进行查找的)。
truncate.enabled属性:当出现truncate操作时,是否进行同步(清空表数据),默认值:false。
schema.evolution属性:是否同步表结构更新(当目标表不存在时,会自动创建),默认值:none,可选值:[none, basic]。
none:仅支持DML(insert、update和delete操作);basic:DML+DDL。
说明:如果需要该插件进行自动建表操作,该值必须设为:basic。
errors.log.enable属性:是否显示错误日志,默认值:false。
errors.log.include.messages属性:错误日志是否包含错误信息,默认值:false。
dialect.sqlserver.identity.insert属性:是否允许为SQLSERVER表中的标识列插入显式值,默认值:false。
3.运行
准备工作
启动服务
启动zookeeper,启动kafka,启动kafka connect。
查看所有参数配置(可忽略)
http://localhost:8083/connector-plugins/io.debezium.connector.jdbc.JdbcSinkConnector/config
6.拓展
当目标库目标表不存在时,自动建表
写在最后
哪位大佬如若发现文章存在纰漏之处或需要补充更多内容,欢迎留言!!!
相关推荐:
- 个人主页
- debezium+kafka实现mysql数据同步(debezium-connector-mysql)
- debezium+kafka实现oracle数据同步(debezium-connector-oracle)
- kafka 自定义开发Sink Connector组件(兼容mysql和oracle)
- Kafka Connect用法