首页 > 数据库 >debezium+kafka实现sqlserver数据同步(debezium-connector-sqlserver)

debezium+kafka实现sqlserver数据同步(debezium-connector-sqlserver)

时间:2024-06-17 16:54:28浏览次数:14  
标签:optional sqlserver kafka connector field type debezium

SELECT CASE WHEN dss.[status]=4 THEN 1 ELSE 0 END AS isRunning FROM [#db].sys.dm_server_services dss WHERE dss.[servicename] LIKE N'SQL Server Agent (%'

1.情景展示

在企业当中,往往会存在不同数据库之间的表的数据需要保持一致的情况(数据同步)。

如何将A库a表的数据同步至B库a表当中呢?(包含:新增、修改和删除)

往往不仅仅需要保持数据的一致性,还要保证数据的即时性,即:A库a表的数据发生变化后,B库a表也能立刻同步变化。

实时保持两表数据的一致性。

如何实现?

2.具体分析

要想及时知晓A库a表的数据变化,我们需要读取数据库的操作日志,从日志当中提取a表的操作日志。(至少要包含:新增、修改和删除)。

以sqlserver为例,就是读取sqlserver的变更日志,而读取日志提取a表操作记录,我们可以通过组件来完成,无需自己手动解析日志。

3.解决方案

debezium-connector-sqlserver插件,可以很好的完成这个任务。

Debezium是一个开源的数据库事件捕捉和发布平台,旨在提供可靠的实时数据流。它基于分布式日志(如Apache Kafka)来捕获并传输数据库的变更事件,从而实现高效的数据同步和分发。通过使用Debezium的sqlserver连接器,可以轻松地将sqlserver数据库中的DML操作(包括INSERT、UPDATE、DELETE)的变更事件提取出来,并以实时的方式推送到Kafka消息队列中。

由于debezium-connector-sqlserver插件是结合kafka来实现的,我们自然需要用到kafka。

debezium-connector-sqlserver插件最终实现的效果是:监听sqlserver的变更日志,实时捕获sqlserver数据变更记录,并将变化的数据发布到kafka的主题当中。

Debezium的sqlserver连接器捕获并记录sqlserver服务器上数据库中发生的行级更改,还包括在连接器运行时添加的表。可以通过Connector配置,为指定的schema和表获取更改事件,或者忽略、屏蔽或截断特定列中的值等过滤操作。

Debezium通过使用本地LogMiner数据库包或XStream API从sqlserver获取更改事件。

在开始之前,我们需要先了解一下kafka connect,通过它我们可以将其它系统与kakfa进行连接,完成主题的发布与订阅。

具体就是:通过该服务,我们可以使用REST API的方式调用kafka服务器来完成消息的发布与订阅。

更多关于kafka connect的用法,见文末推荐。

使用debezium官方提供的source connector,部署到apache kafka connect中,debezium的connector捕获到源数据库数据更新,发送到kafka集群中。

插件下载

地址:https://debezium.io/releases/

点击debezium的最新版本(More info)。

支持的版本号说明:

点击“installation guide”。

2.5.0.Final版本地址:https://debezium.io/documentation/reference/2.5/install.html

connector插件列表展示如下。

mysql connector 2.5.0版插件地址:https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.5.0.Final/debezium-connector-mysql-2.5.0.Final-plugin.tar.gz

oracle connector 2.5.0版插件地址:https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.5.0.Final/debezium-connector-oracle-2.5.0.Final-plugin.tar.gz

SQLserver connector 2.5.0版插件地址:https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.5.0.Final/debezium-connector-sqlserver-2.5.0.Final-plugin.tar.gz

插件源码地址:https://github.com/debezium/debezium

插件安装

下载成功后,进行解压。

来到KAFKA_HOME目录下,创建一个plugins目录。

并将刚才解压的插件移到plugins目录下。

重命名在插件后面添加版本号。

参数说明

说明文档:

https://debezium.io/documentation/reference/2.5/connectors/sqlserver.html

这里我只讲自己用的的参数,其余参数说明及用法,可通过上述说明文档自行查阅。

具体的数据结构,下面有。 

name属性:代表的是连接器的名称,该名称具有唯一性!(名字随便起,但必须唯一)。

名字最好能让人望文生义,如:debezium-connector-source-sqlserver-73,这一看就知道:创建的是Source Connector,用的插件是:debezium-connector-oracle。

connector.class属性:Source Connector的实现类,在这里我们需要填:io.debezium.connector.oracle.SqlServerConnector(它是debezium-connector-oracle的源连接器)。

database.hostname属性:数据库服务器所属IP,如:127.0.0.1。

database.port属性:数据库端口号,如:1521。

database.user属性:数据库用户名,如:test。

database.password属性:数据库密码,如:123456。

database.server.id属性:给数据库设置id,该值具有唯一性,默认取值范围:[5400,6400],数据类型:int。

database.names属性:要监控的数据库名称,如:test。

database.encrypt属性:通过jdbc连接sqlserver使用ssl加密,默认值为true。

需要将此值设为false。

不关闭的话,报错如下:com.microsoft.sqlserver.jdbc.SQLServerException: “Encrypt”属性设置为“true”且 “trustServerCertificate”属性设置为“false”,但驱动程序无法使用安全套接字层 (SSL) 加密与 SQL Server 建立安全连接。

database.sqlserver.agent.status.query属性:查询sqlserver代理的运行状态的sql语句,默认使用的sql语句为:

SELECT CASE WHEN dss.[status]=4 THEN 1 ELSE 0 END AS isRunning FROM [#db].sys.dm_server_services dss WHERE dss.[servicename] LIKE N'SQL Server Agent (%'

debezium-connector-sqlserver-2.5.0.Final.jar\io\debezium\connector\sqlserver\SqlServerConnection.class中的定义的AGENT_STATUS_QUERY的默认值是上面这个sql。

sqlserver代理服务是英文状态时,用这个sql查询自然没有问题;但是,如果安装的代理服务用的中文的话,那就查不到了,查不到就会报错。

因为在sqlserver当中,要想使用cdc必须开启代理服务,debezium-connector-sqlserver插件检测不到这个服务,自然就不工作了。

所以,我们需要将查询sqlserver代理的运行状态的sql语句指定成下面这个就可以了。

SELECT CASE WHEN dss.[status]=4 THEN 1 ELSE 0 END AS isRunning FROM [#db].sys.dm_server_services dss WHERE dss.[servicename] LIKE N'SQL Server Agent (%' OR dss.[servicename] LIKE N'SQL Server 代理 (%'

代理服务名称无论是中文还是英文,都能查得到。

schema.include.list属性:数据库架构,默认值:无。

这里需要设置成dbo。

在sqlserver中,默认使用的架构为dbo(database owner)。

tasks.max属性:要运行的任务数,默认值:1。该值需要<=database.names的值。

table.include.list属性:要捕获数据变更记录的表名称列表,可以同时监控多张表。构成:schemaName.tableName,多个使用逗号隔开,如:dbo.t_patient。

schema.history.internal.store.only.captured.tables.ddl属性:默认值:false,代表的含义是:debezium会将被监控数据库下所有的表的变更记录,都进行捕获。

而实际上,我们只需捕获table.include.list里面设置的表,而不是所有的表。需将此值设为:true。

column.include.list属性:可以设置只捕获表中的部分字段变更记录,可以监控多张表的部分字段变更记录。构成:schemaName.tableName.columnName,多个请使用逗号隔开,如:dbo.T_PATIENT_ZS.CREATE_TIME,dbo.T_PATIENT_ZS.ID。

schema.history.internal.kafka.topic属性:连接器将在其中存储数据库模式历史记录的Kafka主题的全名,如:schema-history-test-63。

schema.history.internal.kafka.bootstrap.servers属性:kafka服务器地址,如:localhost:9092。

event.processing.failure.handling.mode属性:默认值:fail,表示的是:schema的处理模式。可选值:[fail,warn,ignore]。我们选择warn,而不是直接报错,更容易发现问题。

topic.prefix属性:将要发布的主题的名称前缀,该值具有唯一性(kafka会根据此主题前缀来生成主题名称。消费者需要根据topic名称来订阅数据)。

errors.log.enable属性:默认值:false。是否输出错误日志,可将值设为true。

skipped.operations属性:默认值:t。不需要监控的操作,可选值:c(insert/create),u(update),d(delete),t(truncate),none。

snapshot.mode属性:快照模式,默认值:initial,可选值:[initial,initial_only,when_needed,never,schema_only,schema_only_recovery]。

initial(默认)(初始全量,后续增量):连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。

initial_only(只全量,不增量):连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。

schema_only(只增量,不全量):连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录。 

4.运行

准备工作

启动服务

启动zookeeper,启动kafka,启动kafka connect。

如果没有安装kafka或者不知道怎么启动,具体操作见文末推荐。

日志处理

第一步:数据库开启CDC

用账户密码登录要监控的表所在的数据库,并执行以下命令。

EXEC sys.sp_cdc_enable_db

开启成功后,该库下模式里面会多出一个“cdc”。(默认只有dbo)

第二步:为表开启CDC

在sqlserver中,只为库开启cdc,是不行的。

我们还要给表开启cdc。

exec sys.sp_cdc_enable_table 
@source_schema='dbo',
@source_name='要监控的表名',
@role_name=null,
@capture_instance=DEFAULT

说明:

要监控的表,需要我们使用上述命令,一个一个的开,虽然很麻烦,但别无它法。

开启成功后,我们可以打开cdc模式。

以上两张表,就是当前库下开启cdc的表,当然,源表表名并不是如此。

我们也可以使用sql来查询当前库下已经开启cdc的表有哪些。

SELECT
	t.name AS TableName,
	s.name AS SchemaName
FROM
	sys.tables t
INNER JOIN 
    sys.schemas s ON
	t.schema_id = s.schema_id
WHERE
	t.is_tracked_by_cdc = 1;

开启代理服务

在sqlserver当中,要想使用cdc功能,必须启动代理服务

发布主题

接口地址:

http://localhost:8083/connectors

请求数据:

{
	"name": "debezium-connector-source-oracle-73",
	"config": {
		"connector.class": "io.debezium.connector.oracle.OracleConnector",
		"database.user": "test",
		"database.dbname": "orcl",
		"database.server.id": 73,
		"tasks.max": 1,
		"database.url": "jdbc:oracle:thin:@192.168.0.1:1521/orcl",
		"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
		"event.processing.failure.handling.mode": "warn",
		"column.include.list": "test.T_PATIENT_ZS.CREATE_TIME,test.T_PATIENT_ZS.ID,test.T_PATIENT_ZS.JZ_TIME,test.T_PATIENT_ZS.PATIENT_ID,test.T_PATIENT_ZS.UPDATE_TIME,test.T_PATIENT_ZS.ZS_ID",
		"log.mining.strategy": "online_catalog",
		"database.port": "1521",
		"schema.history.internal.store.only.captured.tables.ddl": true,
		"topic.prefix": "topic-orcl-73",
		"decimal.handling.mode": "string",
		"schema.history.internal.kafka.topic": "schema-history-orcl-73",
		"database.hostname": "192.168.0.1",
		"database.password": "test123",
		"table.include.list": "test.T_PATIENT_ZS",
		"skipped.operations": "none",
		"errors.log.enable": true,
		"snapshot.mode": "initial"
	}
}

响应数据:

{
	"name": "debezium-connector-source-oracle-73",
	"type": "source",
	"config": {
		"connector.class": "io.debezium.connector.oracle.OracleConnector",
		"database.user": "test",
		"database.dbname": "orcl",
		"database.server.id": "73",
		"tasks.max": "1",
		"database.url": "jdbc:oracle:thin:@192.168.0.1:1521/orcl",
		"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
		"event.processing.failure.handling.mode": "warn",
		"column.include.list": "test.T_PATIENT_ZS.CREATE_TIME,test.T_PATIENT_ZS.ID,test.T_PATIENT_ZS.JZ_TIME,test.T_PATIENT_ZS.PATIENT_ID,test.T_PATIENT_ZS.UPDATE_TIME,test.T_PATIENT_ZS.ZS_ID",
		"log.mining.strategy": "online_catalog",
		"database.port": "1521",
		"schema.history.internal.store.only.captured.tables.ddl": "true",
		"topic.prefix": "topic-orcl-73",
		"decimal.handling.mode": "string",
		"schema.history.internal.kafka.topic": "schema-history-orcl-73",
		"database.hostname": "192.168.0.1",
		"database.password": "test123",
		"table.include.list": "test.T_PATIENT_ZS",
		"skipped.operations": "none",
		"errors.log.enable": "true",
		"snapshot.mode": "initial",
		"name": "debezium-connector-source-oracle-73"
	},
	"tasks": [{
		"connector": "debezium-connector-source-oracle-73",
		"task": 0
	}]
}

主题发布成功后,debezium将会自动捕获sqlserver数据变更日志,并将数据推送到kafka当中。

查询主题

主题的生成条件说明:

debezium source connector启动之后,当指定监控的表数据发生变更时,它才会创建主题(也就是往kafka当中推送数据)。

 

如果处于监控中的表一直没有数据变更,那这个主题就一直不会产生。

具体就是:

当snapshot.mode=schema_only时,source connector启动后,不会立马创建主题,而是直到源表数据发生变化时,才会创建主题(也就是:debezium-sqlserver插件通过读取日志,监控到该表发生了变化,会自动将变化的数据推送到kafka上,这时,主题自然就生成了)。

当snapshot.mode=initial时,source connector启动后,如果源表已经有数据,它会立马创建主题;

反之,如果源表一条数据都没有,不会立马创建主题,直到源表有新增数据时,才会创建主题。 

主题生成规则:

一张表对应一个主题。

sqlserver主题构成:topic.prefix.userName.tableName(oracle的用户名统一会变成大写)

这里t_patient表对应的主题是:topic-orcl-73.TEST.T_PATIENT_ZS。

查看已发布的主题

请求地址:

http://localhost:8083/connectors/debezium-connector-source-oracle-73/topics

响应数据: 

{"debezium-connector-source-oracle-73":{"topics":["topic-orcl-73.TEST.T_PATIENT_ZS","topic-orcl-73"]}}

消费数据

通过产生的主题消费数据。

切换到KAFKA_HOME\bin\windows目录下,输入cmd,按Enter键打开cmd窗口。

输入以下命令并执行

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-orcl-73.TEST.T_PATIENT_ZS  --from-beginning

执行结果如下:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"PATIENT_ID"},{"type":"string","optional":true,"field":"ZS_ID"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"CREATE_TIME"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"UPDATE_TIME"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"JZ_TIME"},{"type":"string","optional":false,"field":"ID"}],"optional":true,"name":"topic-orcl-73.TEST.T_PATIENT_ZS.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"PATIENT_ID"},{"type":"string","optional":true,"field":"ZS_ID"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"CREATE_TIME"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"UPDATE_TIME"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"JZ_TIME"},{"type":"string","optional":false,"field":"ID"}],"optional":true,"name":"topic-orcl-73.TEST.T_PATIENT_ZS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int64","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"topic-orcl-73.TEST.T_PATIENT_ZS.Envelope","version":1},"payload":{"before":null,"after":{"PATIENT_ID":"1","ZS_ID":"1","CREATE_TIME":1704326400000,"UPDATE_TIME":1704326400000,"JZ_TIME":1702826697000000,"ID":"1"},"source":{"version":"2.5.0.Final","connector":"oracle","name":"topic-orcl-73","ts_ms":1704339457000,"snapshot":"false","db":"ORCL","sequence":null,"schema":"TEST","table":"T_PATIENT_ZS","txId":"0b001c0067ad8900","scn":"17183616021516","commit_scn":"17183616021532","lcr_position":null,"rs_id":"0x00852f.000cb4f2.0010","ssn":0,"redo_thread":1,"user_name":"TEST"},"op":"c","ts_ms":1704339461968,"transaction":null}}

源库源表数据

5.同步数据

这样一来,我们就实现了数据同步的前半部分:抽取数据 push。 

示例:

至于将kafka中存储的数据更新到别的数据库(同步数据 pull),有三种实现方式:

第一种:使用插件io.confluent.connect.jdbc.JdbcSinkConnector;

第二种:自定义开发消费组件;

第三种:使用插件io.debezium.connector.jdbc.JdbcSinkConnector(推荐使用)。

说明:

网上看到的几乎都是使用第一种方式来完成数据同步,这其实是不对的,也有使用第二种方式来完成数据同步的。

但,最好的还是使用第三种方式,因为它可以和io.debezium.connector.mysql.SqlServersConnector插件无缝对接,不需要进行额外的数据转换操作。

另外,关于kafka connect的更多用法,具体见文末推荐。

写在最后

  哪位大佬如若发现文章存在纰漏之处或需要补充更多内容,欢迎留言!!!

 相关推荐:

标签:optional,sqlserver,kafka,connector,field,type,debezium
From: https://www.cnblogs.com/Marydon20170307/p/18252737

相关文章

  • kafka常用命令(详细)
    目录一、KAFKA启停命令1.前台启动2.后台启动3.停止命令二、Topic 相关命令2.1.创建Topic2.2.查询Topic列表2.3.查询Topic详情2.4.增加Topic的partition数2.5.查看topic指定分区offset的最大值或最小值2.6.删除Topic三、消息相关命令3.1.......
  • 使用SpringBoot对接Kafka
    Kafka是什么,以及如何使用SpringBoot对接Kafka一、Kafka与流处理我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为......
  • (高清pdf集合)图灵程序设计丛书:大规模数据处理入门与实战(套装全10册)【图灵出品!一套囊括S
    书:pan.baidu.com/s/1tIHXj9HmIYojAHqje09DTA?pwd=jqso提取码:jqso数据处理基础:介绍数据处理的基本概念、流程和应用场景,帮助读者建立对数据处理的整体认识。SQL语言与应用:详细讲解SQL的语法和用法,包括数据查询、数据操作和数据定义等,以及在实际应用中的最佳实践。Python数据挖......
  • 【Kafka专栏 05】一条消息的完整生命周期:Kafka如何保证消息的顺序消费
    作者名称:夏之以寒作者简介:专注于Java和大数据领域,致力于探索技术的边界,分享前沿的实践和洞见文章专栏:夏之以寒-kafka专栏专栏介绍:本专栏旨在以浅显易懂的方式介绍Kafka的基本概念、核心组件和使用场景,一步步构建起消息队列和流处理的知识体系,无论是对分布式系统感兴趣,还......
  • Spring (58)什么是Spring Kafka
    SpringKafka是一个基于Spring框架的项目,它提供了对ApacheKafka的集成支持。Kafka是一个分布式流媒体平台,专门用于构建实时数据管道和流应用程序。SpringKafka提供了一种简单的抽象来发送和接收消息,使得与Kafka交云进行通讯变得容易。核心概念SpringKafka主......
  • linux环境 kafka3.4.0 刚搭建好(用kraft替代zookeeper) 怎么建个topic测试一下消费有没
        在使用Kafka3.4.0并且用Kraft(KafkaRaft)替代Zookeeper的情况下,步骤会有一些变化。这是因为Kraft模式下Kafka自身管理元数据,而不再依赖Zookeeper。以下是使用Kraft模式的Kafka3.4.0创建topic并进行生产和消费测试的具体步骤:###1.**启动Kaf......
  • sql server日期格式 sqlserver的日期格式
    常用转换格式yyyy-mm-ddThh:mm:ssSELECTCONVERT(VARCHAR(20),GETDATE(),20)→2021-06-2716:58:00yyyy-mm-dd(SELECTCONVERT(VARCHAR(20),GETDATE(),23)→2021-06-27Thh:mm:ssSELECTCONVERT(VARCHAR(20),GETDATE(),24)→17:00:20yyyymmddSELECTCONVERT(VARCHAR(20),GETD......
  • Kafka多维度调优
    优化金字塔应用程序层面框架层面(Broker层面)JVM层面操作系统层面应用程序层面:应当优化业务代码合理使用kafka,合理规划主题,合理规划分区,合理设计数据结构;框架层面:在不改动源码的情况下,从kafka参数配置入手,结合业务体量和运行数据进行调优JVM层面:在出现明显缓慢和可能的内存......
  • Deploy Kafka for Centos 7
    应用介绍Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写,用于处理消费者在网站中的所有动作流数据。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据,这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来......
  • spark读取Kafka数据写入postgreSQL
    Java代码importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.function.VoidFunction;importorg.apache.spark.streaming.api.java.JavaInputDStream;importorg.apache.spark.streaming.api.java.JavaStreamingContext;importorg.apache.spark.stream......