SQLServer CDC Connector

Posted: 2024-06-19 22:31:43 | Views: 29
Tags: name, database, CDC, SQLServer, Connector, snapshot, table

The SQLServer CDC connector reads snapshot data and incremental data from a SQLServer database. This document describes how to set up the SQLServer CDC connector to run SQL queries against SQLServer databases.

Dependencies #
To set up the SQLServer CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.

Maven dependency #

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-sqlserver-cdc</artifactId>
  <version>3.1.1</version>
</dependency>

SQL Client JAR #
Download link is available only for stable releases.

Download flink-sql-connector-sqlserver-cdc-3.1.0.jar and put it under <FLINK_HOME>/lib/.

Note: Refer to flink-sql-connector-sqlserver-cdc; more released versions will be available in the Maven Central repository.

Setup SQLServer Database #
A SQL Server administrator must enable change data capture on the source tables that you want to capture. The database must already be enabled for CDC. To enable CDC on a table, a SQL Server administrator runs the stored procedure sys.sp_cdc_enable_table for the table.

Prerequisites:

- CDC is enabled on the SQL Server database.
- The SQL Server Agent is running.
- You are a member of the db_owner fixed database role for the database.

Procedure:

Connect to the SQL Server database with a database management tool such as SQL Server Management Studio.
Run the following SQL statement to enable CDC on the table.
USE MyDB
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo', -- Specifies the schema of the source table.
@source_name = N'MyTable', -- Specifies the name of the table that you want to capture.
@role_name = N'MyRole', -- Specifies a role MyRole to which you can add users to whom you want to grant SELECT permission on the captured columns of the source table. Users in the sysadmin or db_owner role also have access to the specified change tables. Set the value of @role_name to NULL, to allow only members in the sysadmin or db_owner to have full access to captured information.
@filegroup_name = N'MyDB_CT',-- Specifies the filegroup where SQL Server places the change table for the captured table. The named filegroup must already exist. It is best not to locate change tables in the same filegroup that you use for source tables.
@supports_net_changes = 0
GO
Verify that the user has access to the CDC table:
-- The following example runs the stored procedure sys.sp_cdc_help_change_data_capture on the database MyDB:
USE MyDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
The query returns configuration information for each table in the database that is enabled for CDC and that contains change data that the caller is authorized to access. If the result is empty, verify that the user has privileges to access both the capture instance and the CDC tables.

How to create a SQLServer CDC table #
A SQLServer CDC table can be defined as follows:

-- register a SqlServer table 'orders' in Flink SQL
CREATE TABLE orders (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = 'localhost',
'port' = '1433',
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'table-name' = 'dbo.orders'
);

-- read snapshot and change events from the orders table
SELECT * FROM orders;
Connector Options #
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify what connector to use, here should be 'sqlserver-cdc'. |
| hostname | required | (none) | String | IP address or hostname of the SQLServer database. |
| username | required | (none) | String | Username to use when connecting to the SQLServer database. |
| password | required | (none) | String | Password to use when connecting to the SQLServer database. |
| database-name | required | (none) | String | Database name of the SQLServer database to monitor. |
| table-name | required | (none) | String | Table name of the SQLServer database to monitor, e.g.: "db1.table1" |
| port | optional | 1433 | Integer | Integer port number of the SQLServer database. |
| server-time-zone | optional | UTC | String | The session time zone in the database server, e.g. "Asia/Shanghai". |
| scan.incremental.snapshot.enabled | optional | true | Boolean | Whether to enable parallel snapshot reading. |
| chunk-meta.group.size | optional | 1000 | Integer | The group size of chunk meta. If the meta size exceeds the group size, the meta is divided into multiple groups. |
| chunk-key.even-distribution.factor.lower-bound | optional | 0.05d | Double | The lower bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. Table chunks are computed with the even-distribution optimization when the data distribution is even, and a splitting query is issued when it is uneven. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount. |
| chunk-key.even-distribution.factor.upper-bound | optional | 1000.0d | Double | The upper bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. Table chunks are computed with the even-distribution optimization when the data distribution is even, and a splitting query is issued when it is uneven. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount. |
| debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from SQLServer. For example: 'debezium.snapshot.mode' = 'initial_only'. See more about Debezium's SQLServer Connector properties. |
| scan.incremental.close-idle-reader.enabled | optional | false | Boolean | Whether to close idle readers at the end of the snapshot phase. This requires Flink 1.14 or later with 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' set to true. From Flink 1.15 onward that option defaults to true, so 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true' does not need to be configured explicitly. |
| scan.incremental.snapshot.chunk.key-column | optional | (none) | String | The chunk key of the table snapshot. Captured tables are split into multiple chunks by a chunk key when the snapshot of the table is read. By default, the chunk key is the first column of the primary key. This column must be a column of the primary key. |
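The distribution factor formula from the chunk-key options can be worked through in a few lines. The following is a minimal Java sketch, not the connector's implementation: the class and method names are hypothetical, and treating both bounds as inclusive is an assumption made only for illustration.

```java
/** Hypothetical sketch of the distribution-factor check; not part of the connector. */
final class DistributionFactorSketch {

    /** Computes (MAX(id) - MIN(id) + 1) / rowCount, as given in the option description. */
    static double distributionFactor(long minId, long maxId, long rowCount) {
        if (rowCount <= 0) {
            throw new IllegalArgumentException("rowCount must be positive");
        }
        // Convert to double before subtracting so extreme long values cannot overflow.
        return ((double) maxId - (double) minId + 1.0) / rowCount;
    }

    /** Assumption: the table counts as evenly distributed when the factor lies within both bounds. */
    static boolean isEvenlyDistributed(double factor, double lowerBound, double upperBound) {
        return factor >= lowerBound && factor <= upperBound;
    }

    public static void main(String[] args) {
        double lower = 0.05;    // default of chunk-key.even-distribution.factor.lower-bound
        double upper = 1000.0;  // default of chunk-key.even-distribution.factor.upper-bound

        double dense = distributionFactor(1, 1_000, 1_000);       // ids 1..1000, 1000 rows
        double sparse = distributionFactor(1, 10_000_000, 100);   // 100 rows spread over 10M ids

        System.out.println(dense + " even=" + isEvenlyDistributed(dense, lower, upper));
        System.out.println(sparse + " even=" + isEvenlyDistributed(sparse, lower, upper));
    }
}
```

With the default bounds, the dense table (factor 1.0) falls inside the range, while the sparse table (factor 100000.0) falls outside it and would be split by query instead.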
Available Metadata #
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

| Key | DataType | Description |
|---|---|---|
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| schema_name | STRING NOT NULL | Name of the schema that contains the row. |
| database_name | STRING NOT NULL | Name of the database that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change was made in the database. If the record is read from the snapshot of the table instead of the change stream, the value is always 0. |
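Because op_ts is always 0 for snapshot records, a consumer can use it to tell snapshot rows from change-stream rows. The following is a minimal Java sketch under one assumption: that the value 0 surfaces as epoch millisecond 0 (1970-01-01T00:00:00Z) once the column is read as a java.time.Instant. The class and method names are hypothetical and not part of the connector.

```java
import java.time.Instant;

/** Hypothetical helper; not part of the connector. */
final class OpTsSketch {

    /** Returns true when op_ts carries the snapshot placeholder (assumed to be epoch millisecond 0). */
    static boolean isSnapshotRecord(Instant opTs) {
        return opTs.toEpochMilli() == 0L;
    }

    public static void main(String[] args) {
        // A record read during the snapshot phase: op_ts is 0.
        System.out.println(isSnapshotRecord(Instant.EPOCH)); // true
        // A record read from the change stream: op_ts is the change time.
        System.out.println(isSnapshotRecord(Instant.parse("2024-06-19T22:31:43Z"))); // false
    }
}
```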
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3)
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'table-name' = 'dbo.products'
);

Limitation #
Can’t perform checkpoint during scanning snapshot of tables #
While the snapshot of database tables is being scanned there is no recoverable position, so checkpoints can’t be performed. To avoid performing them, the SqlServer CDC source keeps the checkpoint waiting until it times out. A timed-out checkpoint is recognized as a failed checkpoint, which by default triggers a failover of the Flink job. So if the database table is large, it is recommended to add the following Flink configurations to avoid failover caused by timed-out checkpoints:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

Features #
Exactly-Once Processing #
The SQLServer CDC connector is a Flink Source connector that reads the database snapshot first and then continues to read change events with exactly-once processing, even when failures happen. Please read How the connector works.

Startup Reading Position #
The config option scan.startup.mode specifies the startup mode for the SQLServer CDC consumer. The valid enumerations are:

- initial (default): Takes a snapshot of the structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables.
- initial-only: Takes a snapshot of structure and data like initial, but does not transition into streaming changes once the snapshot has completed.
- latest-offset: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics.

Note: The scan.startup.mode option relies on Debezium’s snapshot.mode configuration, so do not use the two together. If you specify both scan.startup.mode and debezium.snapshot.mode in the table DDL, scan.startup.mode may not take effect.
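The note above can be turned into a simple pre-flight check on a table's WITH options. The following is a minimal Java sketch, assuming the options are available as a plain map of strings; the class and method names are hypothetical, and the connector itself is not known to perform this check.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical pre-flight check; not part of the connector. */
final class StartupOptionCheck {
    static final String SCAN_STARTUP_MODE = "scan.startup.mode";
    static final String DEBEZIUM_SNAPSHOT_MODE = "debezium.snapshot.mode";

    /** Returns true when both options are set, the combination the note warns against. */
    static boolean hasConflictingStartupOptions(Map<String, String> tableOptions) {
        return tableOptions.containsKey(SCAN_STARTUP_MODE)
                && tableOptions.containsKey(DEBEZIUM_SNAPSHOT_MODE);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "sqlserver-cdc");
        options.put(SCAN_STARTUP_MODE, "latest-offset");
        System.out.println(hasConflictingStartupOptions(options)); // false

        // Adding the Debezium pass-through option creates the conflict.
        options.put(DEBEZIUM_SNAPSHOT_MODE, "initial_only");
        System.out.println(hasConflictingStartupOptions(options)); // true
    }
}
```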

Single Thread Reading #
The SQLServer CDC source can’t read in parallel, because only one task can receive change events.

DataStream Source #
The SQLServer CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.sqlserver.SqlServerSource;

public class SqlServerSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
                .hostname("localhost")
                .port(1433)
                .database("sqlserver") // monitor sqlserver database
                .tableList("dbo.products") // monitor products table
                .username("sa")
                .password("Password!")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
The SQLServer CDC incremental connector (after 2.4.0) can be used as the following shows:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class SqlServerIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        SqlServerIncrementalSource<String> sqlServerSource =
                new SqlServerSourceBuilder<String>()
                        .hostname("localhost")
                        .port(1433)
                        .databaseList("inventory") // monitor inventory database
                        .tableList("dbo.products") // monitor products table
                        .username("sa")
                        .password("Password!")
                        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                        .startupOptions(StartupOptions.initial())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        // set the source parallelism to 2
        env.fromSource(
                        sqlServerSource,
                        WatermarkStrategy.noWatermarks(),
                        "SqlServerIncrementalSource")
                .setParallelism(2)
                .print()
                .setParallelism(1);

        env.execute("Print SqlServer Snapshot + Change Stream");
    }
}
Data Type Mapping #
| SQLServer type | Flink SQL type |
|---|---|
| char(n) | CHAR(n) |
| varchar(n), nvarchar(n), nchar(n) | VARCHAR(n) |
| text, ntext, xml | STRING |
| decimal(p, s), money, smallmoney | DECIMAL(p, s) |
| numeric | NUMERIC |
| float, real | DOUBLE |
| bit | BOOLEAN |
| int | INT |
| tinyint | SMALLINT |
| smallint | SMALLINT |
| bigint | BIGINT |
| date | DATE |
| time(n) | TIME(n) |
| datetime2, datetime, smalldatetime | TIMESTAMP(n) |
| datetimeoffset | TIMESTAMP_LTZ(3) |

From: https://www.cnblogs.com/huft/p/18257659
