1.情景展示
在企业当中,往往会存在不同数据库之间的表的数据需要保持一致的情况(数据同步)。
如何将A库a表的数据同步至B库a表当中呢?(包含:新增、修改和删除)
往往不仅仅需要保持数据的一致性,还要保证数据的即时性,即:A库a表的数据发生变化后,B库a表也能立刻同步变化。
实时保持两表数据的一致性。
如何实现?
2.具体分析
要想及时知晓A库a表的数据变化,我们需要读取数据库的操作日志,从日志当中提取a表的操作日志。(至少要包含:新增、修改和删除)。
以mysql为例,就是读取mysql的binlog日志,而读取日志提取a表操作记录,我们可以通过组件来完成,无需自己手动解析日志。
3.解决方案
debezium-connector-mysql插件,可以很好的完成这个任务。
Debezium是一个开源的数据库事件捕捉和发布平台,旨在提供可靠的实时数据流。它基于分布式日志(如Apache Kafka)来捕获并传输数据库的变更事件,从而实现高效的数据同步和分发。通过使用Debezium的mysql连接器,可以轻松地将mysql数据库中的DML操作(包括INSERT、UPDATE、DELETE)的变更事件提取出来,并以实时的方式推送到Kafka消息队列中。
由于debezium-connector-mysql插件是结合kafka来实现的,我们自然需要用到kafka。
debezium-connector-mysql插件最终实现的效果是:监听mysql库的binlog日志,实时捕获myql数据变更记录,并将变化的数据发布到kafka的主题当中。
在开始之前,我们需要先了解一下kafka connect,通过它我们可以将其它系统与kakfa进行连接,完成主题的发布与订阅。
具体就是:通过该服务,我们可以使用REST API的方式调用kafka服务器来完成消息的发布与订阅。
更多关于kafka connect的用法,见文末推荐。
使用debezium官方提供的source connector,部署到apache kafka connect中,debezium的connector捕获到源数据库数据更新,发送到kafka集群中。
插件下载
地址:https://www.confluent.io/hub/debezium/debezium-connector-mysql
插件源码地址:https://github.com/debezium/debezium
插件安装
下载成功后,进行解压。
来到KAFKA_HOME目录下,创建一个plugins目录。
并将刚才解压的插件移到plugins目录下。
由于名称里面带有两个debezium,可以去掉一个。
参数说明
或者:https://debezium.io/documentation/reference/2.2/connectors/mysql.html
这里我只讲自己用的的参数,其余参数说明及用法,可通过上述说明文档自行查阅。
具体的数据结构,下面有。
name属性:代表的是连接器的名称,该名称具有唯一性!(名字随便起,但必须唯一)。
名字最好能让人望文生义,如:debezium-connector-source-mysql-63,这一看就知道:创建的是Source Connector,用的插件是:debezium-connector-mysql。
connector.class属性:Source Connector的实现类,在这里我们需要填:io.debezium.connector.mysql.MySqlConnector(它是debezium-connector-mysql的源连接器)。
database.hostname属性:数据库服务器所属IP,如:127.0.0.1。
database.port属性:数据库端口号,如:1521。
database.user属性:数据库用户名,如:scott。
database.password属性:数据库密码,如:123456。
database.server.id属性:给数据库设置id,该值具有唯一性,默认取值范围:[5400,6400],数据类型:int。
database.include.list属性:要监控的数据库名称列表,可以同时监控多个数据库,但通常情况下,我们只监控一个数据库,如:test。
table.include.list属性:要捕获数据变更记录的表名称列表,可以同时监控多张表。构成:databaseName.tableName,多个使用逗号隔开,如:test.t_patient。
schema.history.internal.store.only.captured.tables.ddl属性:默认值:false,代表的含义是:debezium会将被监控数据库下所有的表的变更记录,都进行捕获。
而实际上,我们只需捕获table.include.list里面设置的表,而不是所有的表。需将此值设为:true。
column.include.list属性:可以设置只捕获表中的部分字段变更记录,可以监控多张表的部分字段变更记录。构成:databaseName.tableName.columnName,多个请使用逗号隔开,如:test.t_patient.create_time,test.t_patient.id。
schema.history.internal.kafka.topic属性:连接器将在其中存储数据库模式历史记录的Kafka主题的全名,如:schema-history-test-63。
schema.history.internal.kafka.bootstrap.servers属性:kafka服务器地址,如:localhost:9092。
inconsistent.schema.handling.mode属性:默认值:fail,表示的是:schema的处理模式。可选值:[fail,error,warn,ignore]。我们选择warn,而不是直接报错,更容易发现问题。
topic.prefix属性:将要发布的主题的名称前缀,该值具有唯一性(kafka会根据此主题前缀来生成主题名称。消费者需要根据topic名称来订阅数据)。
snapshot.mode属性:快照模式,默认值:initial,可选值:[initial,initial_only,when_needed,never,schema_only,schema_only_recovery]。
initial(默认)(初始全量,后续增量):连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。
initial_only(只全量,不增量):连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。
schema_only(只增量,不全量):连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录。
4.运行
准备工作
启动服务
启动zookeeper,启动kafka,启动kafka connect。
如果没有安装kafka或者不知道怎么启动,具体操作见文末推荐。
日志授权
发布主题
接口地址:
http://localhost:8083/connectors
请求数据:
{
"name" : "debezium-connector-source-mysql-63",
"config" : {
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.user" : "username",
"database.server.id" : 63,
"schema.history.internal.kafka.bootstrap.servers" : "localhost:9092",
"column.include.list" : "test.t_patient.create_time,test.t_patient.id,test.t_patient.jz_time,test.t_patient.patient_id_no,test.t_patient.patient_name,test.t_patient.patient_sex,test.t_patient.update_time,test.t_patient.zy_time",
"database.port" : "3306",
"schema.history.internal.store.only.captured.tables.ddl" : true,
"inconsistent.schema.handling.mode" : "warn",
"topic.prefix" : "topic-63",
"schema.history.internal.kafka.topic" : "schema-history-63",
"database.hostname" : "192.168.0.1",
"database.password" : "password",
"table.include.list" : "test.t_patient",
"database.include.list" : "test",
"snapshot.mode" : "initial"
}
}
响应数据:
{
"name" : "debezium-connector-source-mysql-63",
"type" : "source",
"config" : {
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.user" : "username",
"database.server.id" : "63",
"schema.history.internal.kafka.bootstrap.servers" : "localhost:9092",
"column.include.list" : "test.t_patient.create_time,test.t_patient.id,test.t_patient.jz_time,test.t_patient.patient_id_no,test.t_patient.patient_name,test.t_patient.patient_sex,test.t_patient.update_time,test.t_patient.zy_time",
"database.port" : "3306",
"schema.history.internal.store.only.captured.tables.ddl" : "true",
"inconsistent.schema.handling.mode" : "warn",
"topic.prefix" : "topic-63",
"schema.history.internal.kafka.topic" : "schema-history-63",
"database.hostname" : "192.168.0.142",
"database.password" : "password",
"table.include.list" : "test.t_patient",
"database.include.list" : "test",
"snapshot.mode" : "initial",
"name" : "debezium-connector-source-mysql-63"
},
"tasks" : [{
"connector" : "debezium-connector-source-mysql-63",
"task" : 0
}
]
}
查看已发布的主题
请求地址:
http://localhost:8083/connectors/debezium-connector-source-mysql-63/topics
响应数据:
{"debezium-connector-source-mysql-63":{"topics":["topic-medi_data_cent-63.medi_data_cent.t_patient","topic-medi_data_cent-63"]}}
消费数据
切换到KAFKA_HOME\bin\windows目录下,输入cmd,按Enter键打开cmd窗口。
输入以下命令并执行
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-63.数据库名.t_patient --from-beginning
执行结果如下:
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"topic-dbName-63.dbName.t_patient.Envelope","version":1},"payload":{"before":null,"after":{"id":1,"patient_name":"Lily","patient_sex":"女","patient_id_no":"123123123","create_time":1701941356000,"update_time":19698,"jz_time":"2023-12-07T15:29:27Z","zy_time":34173000000},"source":{"version":"2.2.1.Final","connector":"mysql","name":"topic-dbName-63","ts_ms":1703893892000,"snapshot":"first","db":"dbName","sequence":null,"table":"t_patient","server_id":0,"gtid":null,"file":"binlog.003274","pos":140774141,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1703843838705,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"topic-dbName-63.dbName.t_patient.Envelope","version":1},"payload":{"before":null,"after":{"id":2,"patient_name":"anny","patient_sex":"男","patient_id_no":"123456789","create_time":1702027779000,"update_time":19699,"jz_time":"2023-12-08T15:29:50Z","zy_time":34194000000},"source":{"version":"2.2.1.Final","connector":"mysql","name":"topic-dbName-63","ts_ms":1703893892000,"snapshot":"true","db":"dbName","sequence":null,"table":"t_patient","server_id":0,"gtid":null,"file":"binlog.003274","pos":140774141,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1703843838722,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"topic-dbName-63.dbName.t_patient.Envelope","version":1},"payload":{"before":null,"after":{"id":3,"patient_name":"张三丰","patient_sex":"男","patient_id_no":"123987345","create_time":1702027860000,"update_time":19699,"jz_time":"2023-12-08T15:31:11Z","zy_time":34275000000},"source":{"version":"2.2.1.Final","connector":"mysql","name":"topic-dbName-63","ts_ms":1703893892000,"snapshot":"true","db":"dbName","sequence":null,"table":"t_patient","server_id":0,"gtid":null,"file":"binlog.003274","pos":140774141,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1703843838723,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"patient_name"},{"type":"string","optional":true,"field":"patient_sex"},{"type":"string","optional":true,"field":"patient_id_no"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"create_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"update_time"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"jz_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"zy_time"}],"optional":true,"name":"topic-dbName-63.dbName.t_patient.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"topic-dbName-63.dbName.t_patient.Envelope","version":1},"payload":{"before":null,"after":{"id":4,"patient_name":"李丽丽","patient_sex":"女","patient_id_no":"126736790","create_time":1702027934000,"update_time":19699,"jz_time":"2023-12-08T15:32:19Z","zy_time":34341000000},"source":{"version":"2.2.1.Final","connector":"mysql","name":"topic-dbName-63","ts_ms":1703893892000,"snapshot":"last","db":"dbName","sequence":null,"table":"t_patient","server_id":0,"gtid":null,"file":"binlog.003274","pos":140774141,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1703843838723,"transaction":null}}
源库源表数据
尚未写完,节后再更。
写在最后
哪位大佬如若发现文章存在纰漏之处或需要补充更多内容,欢迎留言!!!