首页 > 其他分享 >使用轻量级 CDC debezium-server-databend 构建实时数据同步

使用轻量级 CDC debezium-server-databend 构建实时数据同步

时间:2023-08-02 11:56:32浏览次数:52  
标签:source Databend default CDC server databend database 轻量级 debezium

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

Debezium Server Databend 是一个基于 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事件流传递最终将数据写入目标数据库 Databend。它提供了一种简单的方式来监视和捕获关系型数据库的变化,并支持将这些变化转换为可消费事件。

使用 Debezium server databend 实现 CDC 无须依赖大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个启动脚本即可开启实时数据同步。

这篇教程将展示如何基于 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。

假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。

接下来的内容将介绍如何使用 Debezium server databend CDC 来实现这个需求,系统的整体架构如下图所示:

准备阶段

准备一台已经安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或者 MacOS 。

准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

debezium-MySQL

docker-compose.yaml

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

Debezium Server Databend

  • Clone 项目: git clone ``https://github.com/databendcloud/debezium-server-databend.git

  • 从项目根目录开始:

    • 构建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package
    • 构建完成后,解压服务器分发包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
    • 进入解压后的文件夹: cd databendDist
    • 创建 application.properties 文件并修改: nano conf/application.properties,将下面的 application.properties 拷贝进去,根据用户实际情况修改相应的配置。
    • 使用提供的脚本运行服务: bash run.sh
    • Debezium Server with Databend 将会启动

同时我们也提供了相应的 Docker image,可以在容器中一键启动:

version: '2.1'
services:
  debezium:
    image: ghcr.io/databendcloud/debezium-server-databend:pr-2
    ports:
      - "8080:8080"
      - "8083:8083"
    volumes:
      - $PWD/conf:/app/conf
      - $PWD/data:/app/data

NOTE: 在容器中启动注意所连接数据库的网络。

Debezium Server Databend Application Properties

本文章使用下面提供的配置,更多的参数说明以及配置可以参考文档

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

准备数据

MySQL 数据库中准备数据

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建数据库 mydb 和表 products,并插入数据:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

在 Databend 中创建 Database

NOTE: 用户可以不必先在 Databend 中创建表,系统检测到后会自动为用户建表。

启动 Debezium Server Databend

bash run.sh

首次启动会进入 init snapshot 模式,通过配置的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中可以看到 MySQL 中的数据已经同步过来了:

同步 Insert 数据

我们继续往 MySQL 中插入 5 条数据:

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");

Debezium server databend 日志:

同时在 Databend 中可以查到 5 条数据已经同步过来了:

同步 Update 数据

配置文件中 debezium.sink.databend.upsert=true ,所以我们也可以处理 Update/Delete 的事件。

在 MySQL 中更新 id=10 的数据:

update products set name="from debezium" where id=10;

在 Databend 中可以查到 id 为 10 的数据已经被更新:

同步 Delete 数据

在配置文件中,有以下的配置,既可开启处理 Delete 事件的能力:

debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事件记录:

  1. 一个包含 "op": "d",其他的行数据以及字段;
  2. 一个tombstones记录,它具有与被删除行相同的键,但值为null。

这两条事件会同时发出,在 Debezium Server Databend 中我们选择对 Delete 数据实行软删除,这就要求我们在 target table 中拥有 __deleted 字段,当 Delete 事件过来的时候我们将该字段置为 TRUE 后插入到目标表。

这样设计的好处是,有些用户想要保留这些数据,但可能未来会想到将其删除,这样就为用户提供了可选的方案,未来想要删除这些数据的时候,只需要 delete from table where __deleted=true 即可。

关于 Debezium 对删除事件的说明以及处理方式,详情可参考文档

在 MySQL 中删除 id=12 的数据:

delete from products where id=12;

在 Databend 中可以观察到 id=12 的值的 __deleted 字段已经被置为 true

环境清理

操作结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

结论

以上就是基于轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的全部过程,这种方式不需要依赖 Flink, Kafka 等大型组件,启动和管理非常方便。

标签:source,Databend,default,CDC,server,databend,database,轻量级,debezium
From: https://www.cnblogs.com/databend/p/17600285.html

相关文章

  • SQL语句——“制杖”SQLServer
    时间函数:DATEPART([YEAR],date)//获取时间的年DATEPART([MONTH],date)//获取月CONVERT([TIME],date)//获取时间hh:mm:ssWHEN语句:CASEtableFieldWHENvalueTHENother_tableFieldELSEtableFieldEND//可接多个WH......
  • Windows server 2012 服务器允许多用户同时远程桌面的设置
    错误表现如下方法1.在运行里面(Windows+R)或者右击开始菜单,选择运行,输入“gpedit.msc”命令2.计算机组策略”依次打开计算机配置-->管理模板--->windows组件--->远程桌面服务--->远程桌面会话主机--->连接3.在连接里面找到“限制连接的数量”双击,显示如图,选中“已启用”and我设置......
  • Could not find server 'server name' in sys.servers. SQL Server 2014
    Couldnotfindserver'servername'insys.servers.SQLServer2014  Atfirstcheckoutthatyourlinkedserverisinthelistbythisqueryselectnamefromsys.serversIfitnotexiststhentrytoaddtothelinkedserverEXECsp_addl......
  • Windows 7 & Windows Server 2008 R2 简体中文版下载 (updated Jul 2023)
    Windows7&WindowsServer2008R2简体中文版下载(updatedJul2023)Windows7&WindowsServer2008R2(2023年7月更新)请访问原文链接:https://sysin.org/blog/windows-7/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgWindows7&WindowsServer2008R......
  • Windows Server 2008 R2 OVF, updated Jul 2023 (sysin) - VMware 虚拟机模板
    WindowsServer2008R2OVF,updatedJul2023(sysin)-VMware虚拟机模板WindowsServer2008R2简体中文版OVF,2023年7月更新请访问原文链接:https://sysin.org/blog/windows-server-2008-r2-ovf/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgWindowsSe......
  • Windows Server 2016 OVF, updated Jul 2023 (sysin) - VMware 虚拟机模板
    WindowsServer2016OVF,updatedJul2023(sysin)-VMware虚拟机模板2023年6月版本更新,现在自动运行sysprep,支持ESXiHostClient部署请访问原文链接:https://sysin.org/blog/windows-server-2016-ovf/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.org现在......
  • Windows Server 2016 中文版、英文版下载 (updated Jul 2023)
    WindowsServer2016中文版、英文版下载(updatedJul2023)WindowsServer2016Version1607,2023年7月更新请访问原文链接:https://sysin.org/blog/windows-server-2016/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.org本站将不定期发布官方原版风格月度更新IS......
  • Windows Server 2019 OVF, updated Jul 2023 (sysin) - VMware 虚拟机模板
    WindowsServer2019OVF,updatedJul2023(sysin)-VMware虚拟机模板2023年7月版本更新,现在自动运行sysprep,支持ESXiHostClient部署更新日期:FriJul28202317:12:00GMT+0800,阅读量:6244请访问原文链接:https://sysin.org/blog/windows-server-2019-ovf/,查看最......
  • Windows Server 2019 中文版、英文版下载 (updated Jul 2023)
    WindowsServer2019中文版、英文版下载(updatedJul2023)WindowsServer2019Version1809,2023年7月更新请访问原文链接:https://sysin.org/blog/windows-server-2019/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.org本站将不定期发布官方原版风格月度更新IS......
  • Windows Server 2022 中文版、英文版下载 (updated Jul 2023)
    WindowsServer2022中文版、英文版下载(updatedJul2023)WindowsServer2022正式版,2023年7月更新请访问原文链接:https://sysin.org/blog/windows-server-2022/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.org早期直观体验版本21H2,根据名称预计今年秋季......