首页 > 其他分享 >ETL之apache hop数据增量同步功能

ETL之apache hop数据增量同步功能

时间:2023-08-21 13:22:19浏览次数:44  
标签:同步 CDC 数据库 示例 db MyTable hop apache ETL

ETL增量数据抽取CDC

概念:Change Data Capture,变化的数据捕获,也称:【增量数据抽取】(名词解释)

CDC是一种实现数据的增量抽取解决方案,是实现【ETL整体解决方案】中的一项子方案/子问题。(对CDC的定位)

如何捕获变化的数据是增量抽取的关键,对捕获方法一般有2点要求:

  • 准确性:能够将业务系统中的变化数据准确地捕获到;
  • 性能:尽量减少对业务系统造成太大的压力,影响现有业务。

2 CDC 常见解决方案
按CDC方案的任一操作是否对数据源系统产生影响(性能、功能等),分为:【侵入式CDC】、【非侵入式CDC】
按CDC方案所抽取的数据与数据源系统的变化数据是否在规定时间内同步,分为:【同步CDC】、【异步CDC】

一、侵入式

1、基于触发器
创建中间表,编写触发器或者在后端服务插入增删改的操作记录

2、基于时间戳
区分插入操作和更新操作:只有当源系统包含了插入时间戳和更新时间戳两个字段,才能区别插入和更新,否则无法区分。
删除记录的操作:不能捕获到删除操作,除非是逻辑删除,即记录没有真的删除,只是做了逻辑上的标志。
多次更新检测:如果在一次同步周期内,数据被更新了多次,只能同步最后一次更新操作,中间的更新操作都丢失了。
实时能力:时间戳和基于序列的数据抽取一般适用于批量操作,不适合于实时场景下的数据加载。

二、非侵入式

3、基于快照
1基于快照的CDC可检测到插入、更新和删除的数据 (相比基于时间戳的CDC的优点)
2需要大量存储空间来保存快照
4、基于日志
源数据库会把每个插入、更新、删除操作记录到日志里。
通过分析已经发生的事件提交(commit)的日志记录来得到增量数据信息,有一定的时间延迟。
【特点】复杂、异步、非侵入式

参考文档:

https://zhuanlan.zhihu.com/p/362471672

https://www.cnblogs.com/johnnyzen/p/12781942.html

基于以上的几种增量同步方式优缺点,采用第一种基于触发器方式
本文中的示例是源数据库Sql Server 数据库的数据同步到目标数据库MySql数据库中,被同步的源数据库为Test,Schema 为innovator,表名为MyTable

Workflow

​ 流程示意图

一、在数据库层面上的系列操作

表的创建SQL如下

CREATE TABLE [innovator].[MyTable] (
  [Id] char(32) COLLATE Chinese_PRC_CI_AS  NOT NULL  PRIMARY KEY ,
  [Name] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [CreatedTime] datetime  NOT NULL,
)  
1、建立SQL Server中间数据库temp_db,需要同步的所有表放在dbo下
2、创建需要同步的中间表,比如 ,中间表的表名addOrEdit_MyTable,字段和源表一样
-- 只复制基础表结构不复制索引触发器
SELECT * INTO temp_db.dbo.addOrEdit_MyTable FROM Test.innovator.MyTable WHERE 1 = 0;
3、创建需要同步删除的中间表Table_Delete
CREATE TABLE [dbo].[Table_Delete] (
  [Id] char(32) COLLATE Chinese_PRC_CI_AS  NOT NULL  PRIMARY KEY,
  [TableName] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [DeletedTime] datetime  NOT NULL,
)  
4、在源数据库Test里的MyTable表里创建两个触发器

(1) 插入修改触发器

CREATE TRIGGER [innovator].[insertUpdateTrigger_MyTable]
ON [innovator].[MyTable]
WITH EXECUTE AS CALLER
FOR INSERT, UPDATE
AS
BEGIN
  -- Type the SQL Here.
  --检查插入或更新的数据在A表中是否存在,有则更新,无则添加
	if EXISTS(select 1 from temp_db.dbo.addOrEdit_MyTable as A,inserted as B where A.id=B.id)
		 UPDATE  A set 
			 A.Name=B.Name,A.CreatedTime=B.CreatedTime
		 from temp_db.dbo.addOrEdit_MyTable A join inserted B on A.id= B.id
	else
		 insert into temp_db.dbo.addOrEdit_MyTable select * from inserted
END

(2) 删除触发器

CREATE TRIGGER [innovator].[deleteTrigger_MyTable]
ON [innovator].[MyTable]
WITH EXECUTE AS CALLER
FOR DELETE
AS
BEGIN
  -- Type the SQL Here.
	insert into temp_db.dbo.Table_Delete select Id ,'MyTable' as TableName, GETDATE() as DeletedTime from deleted;
END

(3) 触发器图片参考

5、创建Mysql目标数据库 Test_Mysql,字符集选择为utf8mb4
 -- CREATE DATABASE Test_Mysql DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_chinese_ci;
 CREATE DATABASE IF NOT EXISTS Test_Mysql DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci;
6、在Test_Mysql数据库里创建表MyTable
DROP TABLE IF EXISTS `MyTable`;
CREATE TABLE `MyTable`  (
  `Id` char(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `Name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `CreatedTime` datetime(0) NOT NULL,
  PRIMARY KEY (`ID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
7、可能存在触发器中不能访问temp_db数据库,解决如下:
-- 1、查询所有数据库信息
SELECT name, database_id, is_trustworthy_on FROM sys.databases
-- 2、修改 SQL Server 的实例是否信任该数据库以及其中的内容(注意:必须是 sysadmin 固定服务器角色的成员才能设置此选项。)
ALTER  DATABASE  temp_db set TRUSTWORTHY ON 
-- 3、查询系统所有用户
SELECT name FROM Sysusers
-- 4、给用户innovator_regular授权访问temp_db数据库
ALTER AUTHORIZATION ON DATABASE::temp_db TO innovator_regular

二、在apache hop上编写工作流和管道

这里偷点懒,不想再写一遍这些管道和工作流,拿之前写的截个图

1、编写源表数据第一次全量同步到目标表的管道,命名为Init_MyTable(比如下面示意图中的Init_HPART_)

(1) Table input就是源数据库Sql Server里的表,Insert/update 里就是被插入修改同步操作的目标MySql表

(2) Table input 示例图

把上图的示例sql改为下面的

SELECT
 Id,
 Name,
 CONVERT(VARCHAR(100),CreatedTime,21) AS CreatedTime
 FROM innovator.MyTable

(3) Insert/update 示例图

2、编写源表数据第一次全量同步到目标表的初始化工作流,命名为Init_Wrokflow(比如下面示意图中的InitDataTable_Wrokflow) (只执行一次)

3、编写增量同步添加修改操作的管道,命名为AddOrEdit_MyTable(比如下面示意图中的PR_AddOrEdit)

(1) Table input 示例图

把上图的示例sql改为下面的

SELECT
 Id,
 Name,
 CONVERT(VARCHAR(100),CreatedTime,21) AS CreatedTime
 FROM dbo.addOrEdit_MyTable

(2) Insert/update 示例图

(3) Delete 示例图

4、编写增量同步删除操作的管道,命名为Delete_MyTable(比如下面示意图中的PR_Delete)

(1) Table input 示例图

这里Sql改为

SELECT Id,TableName,DeletedTime FROM dbo.Table_Delete where TableName='MyTable'

(2) Delete 示例图

(3) Delete2 示例图

5、编写增量同步的工作流,命名为MyTable_Workflow(比如下面示意图中的PR_workflow)(定时执行)

其中 PR_AddOrEdit.hpl就是步骤3中的同步添加修改操作的管道,PR_Delete.hpl就是步骤四中的增量同步删除操作的管道

(1) Start定时执行示意图

(2) 工作流并行执行各个管道任务示意图

以上是基于触发器模式增量数据同步的hop web设计、建模,下一步需要在生产环境中执行设计的工作流/管道文件

Hop Web ,Hop Gui fat client 是帮助数据工程师通过可视化方式设计数据清洗流程的。

Hop run是本地命令行,来执行设计好的数据清洗流程的。

Hop server是管理和执行本地或远程的数据清洗流程的

也可以使用Apache Airflow等非自带工具,来执行。

标签:同步,CDC,数据库,示例,db,MyTable,hop,apache,ETL
From: https://www.cnblogs.com/hudean/p/17645488.html

相关文章

  • Photoshop2023(Beta) PS AI版本安装爱国使用教程
    所需准备creative-cloudAdobe-GenP开始什么是creative-cloud你可以把它当成苹果的AppStore或者安卓的PlayStore,这是Adobe自家的应该程序商店,商城,资源管理中心,可以下载Adobe的所有软件,也能购买相关服务。下载creative-cloud官网地址:https://creativecloud.adobe.com/app......
  • 成为Apache项目贡献者
    1、有4个主要的地方了解Apache项目:Apacheself-Jira:issues.apache.org/jira/secure/MyJiraHome.jspa邮件列表,该形式被广泛利用,尽量每天都抽出时间看一下在讨论什么: lists.apache.org项目官网,如shiro.apache.com,一般官网项目的资料更新比较快Apache官网:https://www.apache.o......
  • Kioptrix: Level 1 (#1) 古老的Apache Samba VULN
    0×01Vulnhub靶机渗透总结之Kioptrix:Level1(#1)......
  • Photoshop 2023完美激活版本
    Photoshop2023内置激活版是一款多合一的创意工具,从社交媒体贴子到修饰相片,设计横幅到精美网站,日常影像编辑到重新创造,轻松塑造艺术灵感,激发创意,自由探索,轻松设计,您可以自由灵活的修饰编辑您的照片,您可以获得一个精美、高质量的图片结果,创建属于您的新的艺术风格,更多智能工具让您......
  • PhotoShop 2023下载-功能强大的图片编辑软件 系列软件
    AdobePhotoshop,简称“PS”,是一个由Adobe公司开发和发行的图像处理软件。它可以编辑和合成多个图层中的位图,支持图层遮罩、图像合成。除了位图之外,它还具有编辑或渲染文本、矢量图形、3D图形和视频,并且PS还支持外部插件来拓展其功能。其中,PhotoshopCC2019于2018年10月15日发布。......
  • Adobe Photoshop官方软件Photoshop 2022正式版下载 系列软件
    Photoshop2022v23.0.2.101是由Adobe公司最新推出的高效、专业、实用的图像处理软件,同时该软件主要是以其强悍的编辑和调整、绘图等功能得到广泛的应用,其中还有各种图片的调整和图画绘制以及图像的修复、调色等一系列的工具都是数不胜数,使用范围也是非常的广,我们从照片修饰到海报......
  • 解决小程序报错 getLocation:fail the api need to be declared in the requiredPriva
    一、unipp项目打开uniapp项目的配置文件manifest.json,选择“源码视图”。/*小程序特有相关*/"mp-weixin":{"appid":"你的开发者id","setting":{"urlCheck":true,"es6":true,"postcss":......
  • apache开启php的伪静态模式,出现No input file specified
    Thinkphp教程中提供的APACHE伪静态模式出现Noinputfilespecified,打开.htaccess在RewriteRule后面的index.php教程后面添加一个“?”完整代码如下.htaccessRewriteEngineonRewriteCond$1!^(index.php|images|robots.txt)RewriteRule^(.*)$/index.php?/$1[QSA,PT,L......
  • springboot 打包 时报错:The specified user settings file does not exist: F:\opt\
     解决方法:  ......
  • Apache SeaTunnel社区迎来新Committer!
    采访&编辑|DebraChen个人简介姓名:马骋原公司:恒生电子GitHubID:rewerma个人擅长研究领域:java中间件、微服务、大数据等您为社区提交了什么贡献?具体方案可以描述一下吗?为SeatTunnel提交SQLTransformplugin的PR,通过SQL解析器生成物理执行计划,自建函数库执行数据转换......