首页 > 数据库 >5分钟搞定 关系型数据库 到 Flink 数据同步

5分钟搞定 关系型数据库 到 Flink 数据同步

时间:2022-08-30 15:45:34浏览次数:85  
标签:搞定 同步 CloudCanal 数据库 Flink Kafka MySQL 数据

简述

实时数据处理领域中,使用 Flink 方式,除了从日志服务订阅埋点数据外,总离不开从关系型数据库订阅并处理相关业务数据,这时就需要监测并捕获数据库增量数据,将变更按发生的顺序写入到消息中间件以供计算(或消费)。
本文主要介绍如何通过 CloudCanal 快速构建一条高效稳定运行的 MySQL -> Kafka -> Flink 数据同步链路。

技术点

兼容多种常见消息结构

CloudCanal 目前支持 Debezium Envelope (新增)CanalAliyun DTS Avro 等多种流行消息结构,对数据下游消费比较友好。
本次对 Debezium Envelope 消息格式的支持,我们采用了一种轻量的方式做到完全兼容,充分利用 CloudCanal 增量组件,扩展数据序列化器 (EnvelopDeserialize),得到 Envelop 消息并发送到 Kafka 中。
其中 Envelop 的消息结构分为 PayloadSchema 两部分

  • Payload:存储具体数据
  • Schema:定义 Payload 的解析格式 (默认关闭)
{
  "payload":{
    "after":{
      "column_1":"3",
      ...
    },
    "before":null,
    "op":"c",
    "source":{
      "db":"kafka_test",
      "table":"new_table"
      "pos":110341861,
      "ts_ms":1659614884026,
      ...
    },
    "ts_ms":1659614884026
  },
  "schema":{
    "fields":[
      {
        "field":"after",
        "fields":[
          {
            "field":"column_1",
            "isPK":true,
            "jdbType":4,
            "type":"int(11)"
          },
          ...
        ],
        "type":"struct"
      },
      ...
    ],
    "type":"struct"
  }
}

高度可视化的CDC

CDC 工具如 FlinkCDCMaxwellDebezium ... 各有特色,CloudCanal 相对这些产品,最大的特点是高度可视化,自动化,下表针对目标端为Kafka 的 CDC 简要做了一些对比。

CloudCanal FlinkCDC Maxwell
产品化 完备 基础
同步对象配置 可视化 代码 配置文件
封装格式 多种常用格式 自定义 JSON
高可用
数据初始化(snapshot) 实例级 实例级 单表
源端支持 ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL... ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL... MySQL

CloudCanal 在平衡性能的基础上,提供多种关系型数据源的同步,以及反向同步;提供便捷的可视化操作、轻巧的数据源添加、轻便的参数配置;
提供多种常见的消息格式,仅仅通过鼠标点击,就可以使用其他 CDC 的消息格式的传输,让数据处理变的异常的快捷、方便。
其中经过我们在相同环境的测试下, CloudCanal 在高写入的 MySQL 场景中,处理数据的效率表现的很出色,后续我们会继续对 CloudCanal 进行优化,提升整体的性能。
综上,相比与类似的 CDC 产品来说,CloudCanal 简单轻巧并集成一体化的操作占据了很大的优势。

Flink 流式计算中不仅要订阅日志服务器的日志埋点信息,同样需要业务数据库中的信息,通过 CDC 工具订阅数据,能减少查询对业务数据库产生的压力还能以流的形式传输,方便与日志服务器中的数据进行关联处理。
实际开发中,可以将业务数据库中的信息提取过滤之后动态的放入 Hbase 中作为维度数据,方便相关联的宽表进行关联查询;
也可以对数据进行开窗、分组、聚合,同样也可以下沉到其他的 Kafka 消费者组中,实现数据的分层。
image.png

操作示例

前置条件

  • 本例使用 Envelop 消息格式,关系型数据库 MySQL 为示例,展示 MySQL 对接 Flink 的 Demo
  • 登陆 CloudCanal SaaS版,使用参见快速上手文档
  • 准备好 1 个 MySQL 实例,1 个 Kafka 实例(本例使用自己搭建的 MySQL 5.6,阿里云 Kafka 2.2)
  • 准备好 Flink 消费端程序,配置好相关信息:flink-demo 下载
  • 登录 CloudCanal 平台,添加 Kafka,MySQL

截屏2022-08-17 17.12.13.png

  • Kafka 自定义一个主题 topic_1,并创建一条 MySQL -> Kafka 链路作为增量数据来源

任务创建

  • 首先配置 **FlinkDemo 程序的 **阿里云 Kafka 相关信息

截屏2022-08-17 17.09.12.png

  • 运行 FlinkDemo 程序,等待消费 MySQL 同步 Kafka 的数据(程序不要关闭)

截屏2022-08-17 17.08.50.png

  • **任务管理 **-> **任务创建 **
  • 测试链接并选择 目标 数据库,**并选择 DebeziumEnvelope 消息格式,和 topic_1 主题 **(在阿里云里提前创建)

截屏2022-08-17 17.08.18.png

  • 选择 数据同步,不勾选 全量数据初始化,其他选项默认

截屏2022-08-17 17.07.46.png

  • 选择需要迁移同步的表 **table1 **和对应的 Kafka 主题 topic_1

截屏2022-08-17 17.07.19.png

持续点击下一步,并创建出数据同步任务。

  • 向 **MySQL 生成数据,MySQL **-> Kafka(topic_1) -> Flink
  • FlinkDemo 接收到 Kafka(topic_1) 数据,下沉到 topic_2 主题,打印并输出;这里 Flink 程序可以做更多的流式计算的操作,FlinkDemo 只是演示了最基本的数据传输案例

截屏2022-08-17 17.10.05.png

常见问题

还支持哪些源端数据源呢?

目前开放 MySQL、Oracle,SQLServer,Postgres,MongoDB 到 Kafka,如果各位有需求,可以在社区反馈给我们。

支持 DDL 消息同步吗?

目前 关系型数据到 kafka 是支持 DDL 消息的同步的,可以将 关系型数据库 DDL 的变化同步到 Kafka 当中。

总结

本文简单介绍了如何使用 CloudCanal  进行 MySQL -> Kafka -> Flink 数据迁移同步。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

加入CloudCanal粉丝群掌握一手消息和获取更多福利,请添加我们小助手微信:suhuayue001

CloudCanal-免费好用的企业级数据同步工具,欢迎品鉴。
了解更多产品可以查看官方网站http://www.clougence.com
CloudCanal社区https://www.askcug.com/

标签:搞定,同步,CloudCanal,数据库,Flink,Kafka,MySQL,数据
From: https://www.cnblogs.com/clougence/p/16639531.html

相关文章

  • 5分钟搞定MySQL/PostgreSQL/Oracle到StarRocks数据迁移同步-CloudCanal实战
    ##简述CloudCanal2.1.0.x版本开始支持StarRocks作为对端的数据迁移同步能力本文通过MySQL->StarRocks的数据迁移同步案例简要介绍这个源端的能力。链路特点:-结......
  • 华为云GaussDB深耕数据库根技术,助力能源行业数字化转型
     近日,以“推进能源数字化共建低碳智能社会”为主题的第十二届能源企业信息化大会在北京举办。华为云数据库首席架构师冯柯在会上分享了华为云GaussDB的创新技术和能源......
  • 吉林大学数据库期末复习
    chaos结构化查询语言中级SQL实体关系模型关系数据库设计应用程序设计和开发事务并发控制恢复系统复习知识点......
  • Oracle数据库导出导入
    1.Oracle数据库导出expdp$username/$passwd@$ORACLE_SIDdirectory=DATA_PUMP_DIRschemas=$schemasdumpfile=$dumpfile.dmplogfile=$dumpfile.log#username数据库......
  • 9. SQL--use:选择数据库
    1.前言如果您的系统中有多个数据库,那么在开始操作之前,您需要先选择一个数据库。sqluse语句用来选择一个已经存在的数据库。2.语法use语句的基本语法如下:usedat......
  • 不同数据库之间导入数据
    如题,因为需要把SqlServer上关于地区的表导入到mysql中.所以在研究了一会儿之后成功通过导出为Excel的形式完成了数据的复制工具:navicat步骤:1:在SqlServer中选择需要......
  • MySQL低配数据库被大量数据导入时KO
    在一个低配MySQL数据库(笔记本电脑虚机环境,虚机配置2CPU/3G内存),在3000万级别的大量数据LOADDATA方式导入时,坚持一小时后,终于被KO了,甚至没写下任何有用的日志,只是在操作界面......
  • SqlServer定时备份数据库
    https://jingyan.baidu.com/article/75ab0bcb0f1a6497864db2ed.html1、打开SqlServer数据库,点击【管理】,右键【维护】,选择维护计划向导  2、输入计划名称,点击【更改......
  • Discuz!X3.2/3.3/3.4程序搬家/数据库修改教程 (2019-06-11 17:07:29)
    路径:/wwwroot/config/config_global.php这个根据你网站安装的路径而定。 打开config_global.php文件修改:$_config['db']['1']['dbpw']='原来密码'; 原来密码......
  • SQL Server 附加数据库时报1813错误的解决方案
    无法打开新数据库'ASR'。CREATEDATABASE中止。文件激活失败。物理文件名称'E:\SqlServer\MSSQL\Data\ASR_log.LDF'可能不正确。无法重新生成日志,原因是数据库关闭时存在......