首页 > 数据库 >SeaTunnel引擎下的SQL Server CDC解决方案:构建高效数据管道

SeaTunnel引擎下的SQL Server CDC解决方案:构建高效数据管道

时间:2023-11-29 10:01:07浏览次数:44  
标签:选项 SeaTunnel CDC column SqlServer Server mode 分片

file

在快速发展的数据驱动时代,实时数据处理已经成为企业决策和运营的关键因素。特别是在处理来自各种数据源的信息时,如何确保数据的及时、准确和高效同步变得尤为重要。本文着重介绍了如何利用 SqlServer CDC 源连接器在 SeaTunnel 框架下实现 SQL Server 到其他数据系统的实时数据同步,这对于希望提升数据处理能力和实时数据分析的企业来说,具有重要的实践意义。

SQL Server CDC

SqlServer CDC 源连接器

支持 SQL Server 版本

  • 服务器:2019(或更高版本,仅供参考)

支持引擎

SeaTunnel Zeta<br/> Flink <br/>

主要特性

描述

SqlServer CDC 连接器允许从 SqlServer 数据库读取快照数据和增量数据。本文档描述了如何设置 SqlServer CDC 连接器以在 SqlServer 数据库上运行 SQL 查询。

支持的数据源信息

数据源 支持的版本 驱动 URL Maven
SqlServer <li>服务器:2019(或更高版本,仅供参考)</li> com.microsoft.sqlserver.jdbc.SQLServerDriver jdbc:sqlserver://localhost:1433;databaseName=column_type_test 下载

安装 Jdbc 驱动

请下载并将 SqlServer 驱动放在 ${SEATUNNEL_HOME}/lib/ 目录下。例如:cp mssql-jdbc-xxx.jar ${SEATUNNEL_HOME}/lib/

数据类型映射

SQL Server 数据类型 SeaTunnel 数据类型
CHAR<br/>VARCHAR<br/>NCHAR<br/>NVARCHAR<br/>STRUCT<br/>CLOB<br/>LONGVARCHAR<br/>LONGNVARCHAR<br/> STRING
BLOB BYTES
INTEGER INT
SMALLINT<br/>TINYINT<br/> SMALLINT
BIGINT BIGINT
FLOAT<br/>REAL<br/> FLOAT
DOUBLE DOUBLE
NUMERIC<br/>DECIMAL(column.length(), column.scale().orElse(0))<br/> DECIMAL(column.length(), column.scale().orElse(0))
TIMESTAMP TIMESTAMP
DATE DATE
TIME TIME
BOOLEAN <br/>BIT<br/> BOOLEAN

源选项

名称 类型 必需 默认值 描述
username 字符串 - 连接数据库服务器时使用的用户名。
password 字符串 - 连接数据库服务器时使用的密码。
database-names 列表 - 需要监控的数据库名。
table-names 列表 - 表名为模式名和表名的组合(databaseName.schemaName.tableName)。
base-url 字符串 - 必须包含数据库的URL,如 "jdbc:sqlserver://localhost:1433;databaseName=test"。
startup.mode 枚举 INITIAL SqlServer CDC 消费者的可选启动模式,有效枚举为 "initial"、"earliest"、"latest" 和 "specific"。
startup.timestamp 长整型 - 从指定的纪元时间戳(以毫秒为单位)开始。<br/> 注意,当使用 "startup.mode" 选项为 'timestamp' 时,此选项是必需的。
startup.specific-offset.file 字符串 - 从指定的 binlog 文件名开始。<br/>注意,当 "startup.mode" 选项使用 'specific' 时,此选项是必需的。
startup.specific-offset.pos 长整型 - 从指定的 binlog 文件位置开始。<br/>注意,当 "startup.mode" 选项使用 'specific' 时,此选项是必需的。
stop.mode 枚举 NEVER SqlServer CDC 消费者的可选停止模式,有效枚举为 "never"。
stop.timestamp 长整型 - 从指定的纪元时间戳(以毫秒为单位)停止。<br/>注意,当 "stop.mode" 选项使用 'timestamp' 时,此选项是必需的。
stop.specific-offset.file 字符串 - 从指定的 binlog 文件名停止。<br/>注意,当 "stop.mode" 选项使用 'specific' 时,此选项是必需的。
stop.specific-offset.pos 长整型 - 从指定的 binlog 文件位置停止。<br/>注意,当 "stop.mode" 选项使用 'specific' 时,此选项是必需的。
incremental.parallelism 整型 1 增量阶段中并行读取器的数量。
snapshot.split.size 整型 8096 表快照的分割大小(行数),快照期间的表会被分割成多个分片进行读取。
snapshot.fetch.size 整型 1024 读取表快照时每次轮询的最大提取量。
server-time-zone 字符串 UTC 数据库服务器中的会话时区。
connect.timeout 时长 30s 连接器尝试连接到数据库服务器后等待超时的最大时间。
connect.max-retries 整型 3 连接器尝试建立数据库服务器连接的最大重试次数。
connection.pool.size 整型 20 连接池大小。
chunk-key.even-distribution.factor.upper-bound 双精度浮点型 100 分块键分布因子的上界。此因子用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于此上界值(即 (MAX(id) - MIN(id) + 1) / 行数),则表分块将被优化为均匀分布。否则,如果分布因子更大,则表将被认为是不均匀分布的,并且如果估计的分片数超过 sample-sharding.threshold 指定的值,将使用基于抽样的分片策略。默认值为 100.0。
chunk-key.even-distribution.factor.lower-bound 双精度浮点型 0.05 分块键分布因子的下界。此因子用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于此下界值(即 (MAX(id) - MIN(id) + 1) / 行数),则表分块将被优化为均匀分布。否则,如果分布因子更小,则表将被认为是不均匀分布的,并且如果估计的分片数超过 sample-sharding.threshold 指定的值,将使用基于抽样的分片策略。默认值为 0.05。
sample-sharding.threshold 整型 1000 触发抽样分片策略的估计分片数阈值。当分布因子超出 chunk-key.even-distribution.factor.upper-boundchunk-key.even-distribution.factor.lower-bound 指定的范围,并且估计的分片数(计算为近似行数 / 分块大小)超过此阈值时,将使用抽样分片策略。这可以帮助更有效地处理大型数据集。默认值为1000分片。
inverse-sampling.rate 整型 1000 抽样分片策略中使用的抽样率的倒数。例如,如果这个值设置为1000,意味着抽样过程中应用了1/1000的抽样率。这个选项提供了在控制抽样粒度的灵活性,从而影响最终的分片数量。特别是在处理非常大的数据集时,更低的抽样率是首选。默认值为1000。
exactly_once 布尔型 true 启用精确一次语义。
debezium.* 配置 - 将Debezium的属性传递给用于从SqlServer服务器捕获数据变化的Debezium嵌入式引擎。<br/>查看Debezium的SqlServer连接器属性获取更多信息
format 枚举 DEFAULT SqlServer CDC 的可选输出格式,有效枚举为 "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON"。
common-options - 源插件的通用参数,请参考源通用选项获取详细信息。

任务示例

初始读取简单示例

这是一个流模式CDC初始化读取的示例,成功读取表数据后将进行增量读取。以下SQL DDL仅供参考。

env {
  # 在此处设置引擎配置
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.checkpoint.interval = 5000
}

source {
  # 仅用于测试和演示功能的示例源插件
  SqlServer-CDC {
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="initial"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }
  

增量读取简单示例

这是一个增量阅读示例,用于阅读变更数据并打印。

env {
  # 在此处设置引擎配置
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.checkpoint.interval = 5000
}

source {
  # 仅用于测试和演示功能的示例源插件
  SqlServer-CDC {
    # 设置精确一次读取
    exactly_once=true 
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="latest"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }
}

随着数据处理需求的不断增长和实时数据同步的重要性日益凸显,SqlServer CDC 源连接器在 SeaTunnel 生态系统中扮演着至关重要的角色。

通过本文的深入解析,我们希望您能够更好地理解并利用这一强大工具,从而实现数据流的高效、稳定和精准同步。

无论您是数据工程师、系统架构师还是业务分析师,掌握如何在 SeaTunnel 中部署和优化 SQL Server CDC 连接器,都将为您的数据处理能力带来显著提升。

本文由 白鲸开源 提供发布支持!

标签:选项,SeaTunnel,CDC,column,SqlServer,Server,mode,分片
From: https://blog.51cto.com/u_15459354/8610414

相关文章

  • GeoServer API设置WMS服务图层的缓存信息
    importrequestsimportjson#设置缓存时间data=json.dumps({"coverage":{"metadata":{"entry":[{"@key":"cacheAgeMax",&q......
  • element-plus 报错 ResizeObserver loop limit exceeded 解决
    解决方案代码如下:constdebounce=(fn,delay)=>{lettimer=null;returnfunction(){letcontext=this;letargs=arguments;clearTimeout(timer);timer=setTimeout(function(){fn.apply(context,args);},delay);}}......
  • Blazor Server:现代 Web 开发的新视角
    随着Web技术的快速发展,BlazorServer正在成为现代Web开发的一个热门话题。作为一种新兴的Web框架,BlazorServer利用了.NET的强大功能,为开发者提供了一种新的构建互动式Web应用的方式。本文将深入探讨BlazorServer的主要优势和适合用于哪些类型的应用,以及其未来发......
  • json-server的增删改查与基本使用
    1.查看是否安装node2.查看是否安装npm3.安装json-server4.查看json-server的版本5.json-server--watchdb.json6.配置db.json文件下载 apifoxjson-server的增删改查在apifox中创建一个快捷调试直接点击发送,当body中返回下面内容,表示添加成功注意:不需要添加id,他会自......
  • SQL Server Profiler基础使用
     一、简介一个图形界面工具,用于创建和管理跟踪并分析和重播跟踪结果 二、如何打开1、直接打开 2、MicrosoftSQLServerManagementStudio工具栏打开  三、配置跟踪如过滤出本机电脑执行的sql脚本 跟踪属性常规配置   找计算机名称  ......
  • @SpringbootTest报错 javax.websocket.server.ServerContainer not availableJ情况解
    在使用springboot单元测试出现:11:11:10.799[main]ERRORo.s.b.SpringApplication-[reportFailure,870]-Applicationrunfailedorg.springframework.beans.factory.BeanCreationException:Errorcreatingbeanwithname'serverEndpointExporter'definedincla......
  • SQLServer字符串查找(判断字符串是否含中文,数字或字母),并把是否含中文作为条件来执行
    转载自:SQLServer字符串查找(判断字符串是否含中文,数字或字母),并把是否含中文作为条件来执行一些操作-亟待!-博客园(cnblogs.com)从sqlserver中提取数据如何截取字符1、LOCATE(substr,str):返回子串substr在字符串str中第一次出现的位置,如果字符substr在字符串str中不......
  • SQL Server2022安装图文教程
      一:下载 本次安装测试系统环境:    1、官网下载链接https://www.microsoft.com/zh-cn/sql-server/sql-server-downloadsSQLServer下载|Microsoft    2、在下载目录中找到下面这个小的安装包SQL2022-SSEI-Dev.exe,运行开始下载SQLserver;   二:安装SqlServer2022......
  • Windows中使用http-server搭建一个本地服务
    我们在开发中,经常会需要搭建一个本地服务去浏览开发的静态html文件,如果当静态文件中存在一些http、https或者访问文件之类的请求时,直接双击打开html文件是会报错预览不成功的,这时候就需要将静态文件搭建到IIS或者Tomcat环境中去,但是这样需要单独去部署,显得很麻烦,此时我们就可以利......
  • geoserver指定JAVA_HOME路径
    /geoserver/bin/startup.sh修改这个启动文件的配置#GuardagainstmisconfiguredJAVA_HOMEif[-n"${JAVA_HOME:-}"]&&[!-x"${JAVA_HOME}/bin/java"];thenecho"TheJAVA_HOMEenvironmentvariableissetbutJAVA_HOME/bin/java&quo......