首页 > 数据库 >[数据集成/数据同步] 基于数据库增量日志的数据同步方案 : Flink CDC/Debezium/DataX/Canal/Oracle Goldengate/Kettle/Sqoop

[数据集成/数据同步] 基于数据库增量日志的数据同步方案 : Flink CDC/Debezium/DataX/Canal/Oracle Goldengate/Kettle/Sqoop

时间:2024-10-16 20:11:25浏览次数:6  
标签:customer 同步 CDC 数据库 debezium 数据 id Debezium

1 概述

简述:CDC/增量数据同步

  • CDC 的全称是 Change Data Capture(增量数据捕获)
  • 在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC
  • 我们目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
  • CDC 的技术实现方案
  • 基于查询的 CDC:
  • 优点 : 实现简单,是通过批处理实现的
  • 缺点 : 需要依赖离线调度,不能保证数据强一致性和实时性;
  • 基于日志的 CDC:
  • 缺点 : 实现比较复杂
  • 优点 : 但是可以实时消费日志,流式处理,可保证数据一致性和实时性;

CDC 的优势

  • 如今,大多数公司仍然使用批处理在系统之间同步数据。使用批处理:
  • 数据未立即同步
  • 更多分配的资源用于同步数据库
  • 数据复制仅在指定的批次期间发生
  • 然而,变更数据捕获具有一些优势:
  • 不断跟踪源数据库的变化
  • 即时更新目标数据库
  • 使用流处理来保证即时更改

有了CDC, 不同的数据库就会持续同步 ,批量任务已经成为过去。此外,由于 CDC 仅传输增量更改————因此,降低了传输数据的成本。

方案对比

目前市面上的CDC技术比较多,我们选取了几种主要的开源CDC方案做了对比,总体如下图:

从CDC机制、增量同步、断电续传、全量同步、全量+增量、架构、数据计算、生态这八个方面做了对比。可以看出其中的佼佼者主要是Flink CDC和Oracle OGG以及Debezium;

由于基于查询的CDC方案缺陷明显,这里不作讨论,下面我们对基于日志的CDC方案的优劣来做详细的介绍。

  • Flink CDCFlink CDC是最近几年的新贵,Flink CDC 底层封装了 Debezium,功能比较全面,目前已经迭代到了2.4版本,社区活跃度在几个方案中是最高的;

  • 优点
  • 全、增量一体的分布式数据集成框架;
  • 同步时无需加锁;
  • 吞吐量大,适合海量数据实时同步;
  • 操作简单,SQL即可完成;
  • 具有强大的 transformation 能力,通过 Flink SQL 即可完成ETL 中的数据转换;
  • 有丰富的 Connector,除关系型数据库外,HBase、ClickHouse、TiDB等也支持,而且支持自定义 connector;
  • 缺点:依赖Flink集群,数据量较大时对服务器要求较高;

Oracle OGG

  • Oracle OGG:Oracle OGG 历史比较悠久,最初是设计用来从Oracle迁移数据到其它数据库,或者从其它平台迁移数据到Oracle,随着发展,目前已支持 Mysql、Hadoop、Hive、Kafka 等数据源;

  • 优点:
  • 支持增量和全量同步
  • 支持分布式
  • 高性能
  • 支持数据过滤和转化,是目前主流的实时同步方案之一;
  • 缺点:支持的数据库比较少,像一些MongoDB、TiDB等不支持;

Debezium

  • Debezium
  • Debezium最初设计成一个Kafka ConnectSource Plugin
  • 目前开发者虽致力于将其与Kafka Connect解耦,但当前的代码实现还未变动。

下图引自Debeizum官方文档,可以看到一个Debezium在一个完整CDC系统中的位置。

  • 优点:
  • 支持全量+增量同步;
  • 缺点:
  • 全量同步时会加锁,而且加锁时间不确定,会严重影响业务;
  • 最重要的是跟Kafka等消息中间件强耦合,下游数据要经过Kafka;

Canal

  • Canal:主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

  • 优点:用于单一的MySQL环境做数据同步还不错;

  • 缺点:

  • 缺点较为明显,只支持MySQL的CDC,只支持增量同步,全量需要用DataX或者Sqoop,全量和增量同步割裂;
  • 不支持分布式;

Debezium 平台

什么是 Debezium ?

  • 官网
  • 官方的口号与定位

从数据库流式传输更改。
Debezium 是一个用于变更数据捕获的开源分布式平台。
启动它,将它指向你的数据库,你的应用程序就可以开始响应其他应用程序提交给你的数据库的所有插入、更新和删除操作。
Debezium 耐用且快速,因此您的应用程序可以快速响应,即使出现问题也不会错过事件。

  • Debezium 是一个构建在Apache Kafka之上的 CDC 开源平台。

它的主要用途是在事务日志中,记录提交给每个源数据库表的所有行级更改
侦听这些事件的每个应用程序都可以根据增量数据更改执行所需的操作。

  • Debezium 提供了一个连接器库支持多种数据库

例如 MySQL、MongoDB、PostgreSQL 等。

  • 这些连接器可以监视记录数据库更改、并将其发布到 Kafka 等流服务。

  • 此外, 即使我们的应用程序出现故障,Debezium 也会进行监控 。

重新启动后,它将开始消耗上次停止的事件,因此不会丢失任何内容。

Debezium架构

  • 部署 Debezium 取决于我们拥有的基础设施,但更常见的是,我们经常使用 Apache Kafka Connect

  • Kafka Connect 是一个框架,与 Kafka 代理一起作为单独的服务运行。我们用它在 Apache Kafka 和其他系统之间传输数据。

  • 我们还可以定义连接器来将数据传入和传出 Kafka。

下图显示了基于 Debezium 的变更数据捕获管道的不同部分:

  • 首先,在左侧,我们有一个 MySQL 源数据库,我们希望将其数据复制并在目标数据库(如 PostgreSQL 或任何分析数据库)中使用。
  • 其次, Kafka Connect 连接器解析并解释事务日志并将其写入 Kafka 主题。
  • 接下来,Kafka 充当消息代理,将变更集可靠地传输到目标系统。
  • 然后,在右侧,我们有 Kafka 连接器轮询 Kafka 并将更改推送到目标数据库。
  • **Debezium 在其架构中使用 Kafka **,但它还提供其他部署方法来满足我们的基础设施需求。

我们可以将其用作 Debezium 服务器的独立服务器,也可以将其作为库嵌入到我们的应用程序代码中。

我们将在以下部分中看到这些方法。

Debezium 服务器

  • Debezium 提供了一个独立的服务器 来捕获源数据库的更改。它配置为使用 Debezium 源连接器之一。

此外,这些连接器将更改事件发送到各种消息基础设施,例如 Amazon KinesisGoogle Cloud Pub/Sub

嵌入式 Debezium

  • Kafka Connect 在用于部署 Debezium 时提供容错能力和可扩展性。

然而,有时我们的应用程序不需要这种级别的可靠性,并且我们希望最大限度地降低基础设施的成本。

值得庆幸的是, 我们可以通过将 Debezium 引擎嵌入到我们的应用程序中来做到这一点。
完成此操作后,我们必须配置连接器。

案例:基于 Debezium 的 Spring Boot CDC 应用程序

需求及架构

  • 为了使我们的应用程序保持简单,我们将创建一个用于客户管理的 Spring Boot 应用程序。

customer表模型有 ID 、 全名 和 电子邮件 字段。

对于数据访问层,使用 Spring Data JPA

最重要的是,应用程序将运行 Debezium 的嵌入式版本。

想象一下这个应用程序的架构:

首先,Debezium 引擎将跟踪源 MySQL 数据库(来自另一个系统或应用程序)上的 customer 表的事务日志。

其次,每当我们对 customer 表执行插入/更新/删除等数据库操作时,Debezium 连接器都会调用一个服务方法。

最后,根据这些事件,该方法会将 customer 表的数据同步到目标 MySQL 数据库(我们应用程序的主数据库)。

Maven 依赖项

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.4.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.4.2.Final</version>
</dependency>
  • 同样,我们为应用程序将使用的每个 Debezium 连接器添加依赖项。

  • 在我们的例子中,我们将使用 MySQL 连接器:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.4.2.Final</version>
</dependency>

安装数据库

  • 我们可以手动安装和配置我们的数据库。但是,为了加快速度,我们将使用 docker-compose 文件:
version: "3.9"
services:
  # Install Source MySQL DB and setup the Customer database
  mysql-1:
    container_name: source-database
    image: mysql
    ports:
      - 3305:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

  # Install Target MySQL DB and setup the Customer database
  mysql-2:
    container_name: target-database
    image: mysql
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

  • 该文件将在不同端口上运行两个数据库实例。

  • 我们可以使用命令 docker-compose up -d 运行此文件。

建表 : customer

  • 现在,让我们通过运行 SQL 脚本来创建 customer 表:
CREATE TABLE customer
(
    id integer NOT NULL,
    fullname character varying(255),
    email character varying(255),
    CONSTRAINT customer_pkey PRIMARY KEY (id)
);
  • 在本节中,我们将配置 Debezium MySQL 连接器、并了解如何运行嵌入式 Debezium 引擎

配置 Debezium 连接器

  • 为了配置 Debezium MySQL 连接器,我们将创建一个 Debezium 配置 bean:
@Bean
public io.debezium.config.Configuration customerConnector() {
    return io.debezium.config.Configuration.create()
        .with("name", "customer-mysql-connector")
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", "/tmp/offsets.dat")
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", customerDbHost)
        .with("database.port", customerDbPort)
        .with("database.user", customerDbUsername)
        .with("database.password", customerDbPassword)
        .with("database.dbname", customerDbName)
        .with("database.include.list", customerDbName)
        .with("include.schema.changes", "false")
        .with("database.server.id", "10181")
        .with("database.server.name", "customer-mysql-db-server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", "/tmp/dbhistory.dat")
        .build();
}

让我们更详细地检查此配置。

该 bean 中的 create 方法 使用构建器来创建 Properties 对象 。

无论首选连接器如何,此构建器都会设置引擎所需的多个属性。为了跟踪源 MySQL 数据库,我们使用 MySqlConnector 类。

当此连接器运行时,它开始跟踪源中的更改并记录“偏移量”以确定 它从事务日志中处理了多少数据 。

有多种方法可以保存这些偏移量,但在本例中,我们将使用类 FileOffsetBackingStore 在本地文件系统上存储偏移量。

连接器的最后几个参数是 MySQL 数据库属性。

现在我们已经有了配置,我们可以创建我们的引擎了。

配置、运行 Debezium 引擎

  • DebeziumEngine 充当我们的 MySQL 连接器的包装器。让我们使用连接器配置创建引擎:
private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
      .using(customerConnectorConfiguration.asProperties())
      .notifying(this::handleEvent)
      .build();

    this.customerService = customerService;
}

更重要的是,引擎将为每个数据更改调用一个方法 - 在我们的示例中为 handleChangeEvent

在此方法中,首先, 我们将根据 调用 create() 时指定的格式解析每个事件。

然后,我们找到我们进行的操作并调用 CustomerService 在目标数据库上执行创建/更新/删除功能:

private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
    SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
    Struct sourceRecordChangeValue= (Struct) sourceRecord.value();

    if (sourceRecordChangeValue != null) {
        Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

        if(operation != Operation.READ) {
            String record = operation == Operation.DELETE ? BEFORE : AFTER;
            Struct struct = (Struct) sourceRecordChangeValue.get(record);
            Map<String, Object> payload = struct.schema().fields().stream()
              .map(Field::name)
              .filter(fieldName -> struct.get(fieldName) != null)
              .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
              .collect(toMap(Pair::getKey, Pair::getValue));

            this.customerService.replicateData(payload, operation);
        }
    }
}

现在我们已经配置了 DebeziumEngine 对象,让我们使用服务执行器异步启动它:

private final Executor executor = Executors.newSingleThreadExecutor();

@PostConstruct
private void start() {
    this.executor.execute(debeziumEngine);
}

@PreDestroy
private void stop() throws IOException {
    if (this.debeziumEngine != null) {
        this.debeziumEngine.close();
    }
}

运行

要查看我们的代码的实际效果,让我们对源数据库的 customer 表进行一些数据更改。

Step1 插入记录

  • 要将新记录添加到 customer 表中,我们将进入 MySQL shell 并运行:
INSERT INTO customerdb.customer (id, fullname, email) VALUES (1, 'John Doe', '[email protected]')
  • 运行此查询后,我们将看到应用程序的相应输出:
23:57:57.897 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746277000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=703,row=0,thread=19},op=c,ts_ms=1617746277422}'
Hibernate: insert into customer (email, fullname, id) values (?, ?, ?)
23:57:58.095 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: CREATE
  • 最后,我们检查一条新记录是否已插入到我们的目标数据库中:
id  fullname   email
1  John Doe   [email protected]

Step2 更新记录

  • 现在,让我们尝试更新最后插入的客户并检查会发生什么:
UPDATE customerdb.customer t SET t.email = '[email protected]' WHERE t.id = 1
  • 之后,我们将得到与插入相同的输出,除了操作类型更改为“UPDATE”,当然,Hibernate 使用的查询是“更新”查询:
00:08:57.893 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,[email protected]},after=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746937000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1040,row=0,thread=19},op=u,ts_ms=1617746937703}'
Hibernate: update customer set email=?, fullname=? where id=?
00:08:57.938 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: UPDATE
  • 我们可以验证目标数据库中约翰的电子邮件已更改:
id  fullname   email
1  John Doe   [email protected]

Step3 删除记录

现在,我们可以通过执行以下命令删除 客户 表中的条目:

DELETE FROM customerdb.customer WHERE id = 1

同样,这里我们操作发生变化,再次查询:

00:12:16.892 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617747136000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1406,row=0,thread=19},op=d,ts_ms=1617747136640}'
Hibernate: delete from customer where id=?
00:12:16.951 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: DELETE

我们可以验证目标数据库上的数据已被删除:

select * from customerdb.customer where id= 1
0 rows retrieved

Y 推荐文献

X 参考文献

标签:customer,同步,CDC,数据库,debezium,数据,id,Debezium
From: https://www.cnblogs.com/johnnyzen/p/18470609

相关文章

  • 怎么修改网站后台数据?网站后台修改框架图标?
    要修改网站后台的数据或更改框架图标,通常需要访问和操作网站的后端管理系统或直接编辑相关的数据库和前端文件。以下是具体步骤:修改网站后台数据登录后台管理界面:使用管理员账号登录到网站的后台管理系统。定位数据:导航到需要修改的数据模块,如用户管理、文章管理等。......
  • ②C语言数据类型与变量
    数据类型1数据类型概要整形类型描述整数,字符类型描述字符,浮点型类型描述小数。特殊说明:布尔类型需要拥有的头文件(<stdbool.h)布尔类型的变量的取值是:true/false#definebool_Bool#definefalse0#definetrue1实例_Boolflag=true;if(flag)prin......
  • Java毕设项目案例实战II基于Spring Boot的科研项目验收管理系统(开发文档+数据库+源码)
    目录一、前言二、技术介绍三、系统实现四、论文参考五、核心代码六、源码获取全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末一、前言随着科研活动的日益增多,科研项目验收作为......
  • Java毕设项目案例实战II基于Spring Boot的校园资产管理系统(开发文档+数据库+源码)
    目录一、前言二、技术介绍三、系统实现四、论文参考五、核心代码六、源码获取全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末一、前言在信息化技术日益普及的今天,高效、智能的......
  • Java毕设项目案例实战II基于Spring Boot的在线互动学习网站(开发文档+数据库+源码)
    目录一、前言二、技术介绍三、系统实现四、论文参考五、核心代码六、源码获取全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末一、前言随着互联网技术的飞速发展,教育领域也迎来......
  • 数据结构八大排序的java实现
    冒泡排序package排序;importjava.util.Arrays;publicclassBubbleSort{   publicstaticvoidmain(String[]args){      int[]arr={5,7,4,2,0,3,1,6};      sort(arr);      System.out.println(Arrays.toString(arr));   }......
  • 【DBA Part01】国产Linux上安装Oracle进行数据迁移
    内容如下:1.1.生产环境RHEL/OELLinux8+Oracle11gR2安装配置1.2.国产麒麟操作系统+Oracle11gR2安装配置1.3.国产麒麟操作系统+Oracle11gR2RAC集群安装配置1.4.Oracle11gR2迁移到国产麒麟操作系统(单机/RAC)本阶段课程项目需求说明:某单位原有两套核心业务fgeduwx:itpux-com,运......
  • 数据库系统原理——第一章 数据库概述
    @目录1.数据1.1数据的概念1.2数据与信息的关系1.3数据使用2.数据管理3.数据库与数据库管理系统3.1数据库3.2数据库管理系统4.数据库系统4.1数据库系统组成4.2数据库系统的特点4.3数据库系统体系结构4.3.1内部体系结构4.3.2外部体系结构本文首先从数据讲起,然后介绍数据管理、数据......
  • MIMIC-IV v3.1正式发布,修复多个数据问题
    2024年10月11日,广受医学研究者欢迎的MIMIC-IV数据库发布了v3.1版本。本次更新主要针对社区提出的几个问题进行了修复,进一步提升了数据的准确性和一致性。首先,MIMIC-IVv3.1修复了d_labitems表和labevents表中的itemid值不一致问题。在v2.2与v3.0版本之间,部分实验室测量项的ite......
  • 从0到1搭建DeltaLake大数据平台
    1.下载VMWare,安装CentOS9虚拟机2.配置用户,创建目录1.以管理员身份登录,创建Spark用户给Spark使用sudoaddusersparkuser2.修改新用户密码(123456)sudopasswdsparkuser3.给新用户SparkuserSudo权限切换到Root:su-给sparkuser权限:sp......