首页 > 其他分享 >一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

时间:2023-06-27 23:57:50浏览次数:46  
标签:flink cdc Flink CDC 开源 mysql apache org

@

目录

概述

定义

flink-cdc-connectors 官网 https://github.com/ververica/flink-cdc-connectors 源码release最新版本2.4.0

flink-cdc-connectors 文档地址 https://ververica.github.io/flink-cdc-connectors/master/

flink-cdc-connectors 源码地址 https://github.com/ververica/flink-cdc-connectors

CDC Connectors for Apache Flink 是Apache Flink的一组源连接器,使用更改数据捕获(CDC)从不同的数据库摄取更改,其集成了Debezium作为捕获数据变化的引擎,因此它可以充分利用Debezium的能力。

Flink CDC是由Flink社区开发的flink-cdc-connectors 的source组件,基于数据库日志的 Change Data Caputre 技术,实现了从 MySQL、PostgreSQL 等数据库全量和增量的一体化读取能力,并借助 Flink 优秀的管道能力和丰富的上下游生态,支持捕获多种数据库的变更,并将这些变更实时同步到下游存储。

什么是CDC?

这里也简单说明下,CDC为三个英文Change Data Capture(变更数据捕获)的缩写,核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其它服务进行订阅及消费。

CDC的分类

CDC主要分为基于查询的CDC和基于binlog的CDC,两者之间区别主要如下:

image-20230626155037373

特性

  • 支持读取数据库快照,即使发生故障,也只进行一次处理,继续读取事务日志。
  • 数据流API的CDC连接器,用户可以在单个作业中消费多个数据库和表上的更改,而无需部署Debezium和Kafka。
  • 用于表/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源来监视单个表上的更改。

应用场景

  • 数据分发,将一个数据源分发给多个下游,常用于业务解耦、微服务。
  • 数据集成,将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。
  • 数据迁移,常用于数据库备份、容灾等。

支持数据源

CDC Connectors for Apache Flink支持从多种数据库到Flink摄取快照数据和实时更改,然后转换和下沉到各种下游系统

image-20230626154109523

支撑数据源包括如下:

image-20230626160745676

实战

这里以MySQL作为数据源为例,通过flink-connector-mysql-cdc实现数据变更获取,先准备MySQL环境,这里复用前面<<实时采集MySQL数据之轻量工具Maxwell实操>>的文章环境,数据库有两个my_maxwell_01,my_maxwell_02,每个数据库都有相同account和product表。pom文件引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itxs.flink</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink.cdc.version>2.4.0</flink.cdc.version>
        <mysql.client.version>8.0.29</mysql.client.version>
        <fastjson.version>1.2.83</fastjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.client.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

创建DataStreamDemo.java,

package cn.itxs.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamDemo {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("mysqlserver")
                .port(3306)
                .databaseList("my_maxwell_01,my_maxwell_02")
                .tableList("my_maxwell_01.*,my_maxwell_02.product")
                .username("root")
                .password("12345678")
                .deserializer(new JsonDebeziumDeserializationSchema()) // 将SourceRecord转换为JSON字符串
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启checkpoint
        env.enableCheckpointing(3000);

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // 设置平行度为4
                .setParallelism(4)
                .print().setParallelism(1); // 对sink打印使用并行性1来保持消息顺序

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

由于上面flink的依赖配置provided,因此在IDEA中启动的话需要勾选下面标红的选项

image-20230627165925042

启动程序,查看日志可以看到从mysql读取目前全量的数据,my_maxwell_02也只读取product表数据

image-20230627170148732

修改两个库的表后可以看到相应修改信息,其中也确认my_maxwell_02的account没有读取变更数据。

image-20230627170435984

{"before":{"id":7,"name":"李丹","age":44},"after":{"id":7,"name":"李丹","age":48},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856595000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"account","server_id":1,"gtid":null,"file":"binlog.000025","pos":2798,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856598620,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856605000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3140,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856608748,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856628000,"snapshot":"false","db":"my_maxwell_02","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3486,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856631643,"transaction":null}

打包后放到集群上,执行

bin/flink run -m hadoop1:8081 -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar 

image-20230627185543185

可以看到的日志也成功输出表的全量的日志和刚才修改增量数据

image-20230627185508643

如果需要断点续传可以使用状态后端存储来实现

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop111:9000/checkpoints/flink/cdc");
        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

FlinkSQL方式代码示例

创建SqlDemo.java文件

package cn.itxs.cdc;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE account (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'mysqlserver',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '12345678',\n" +
                " 'database-name' = 'my_maxwell_01',\n" +
                " 'table-name' = 'account'\n" +
                ");");

        Table table = tableEnv.sqlQuery("select * from account");
        DataStream<Row> rowDataStream = tableEnv.toChangelogStream(table);
        rowDataStream.print("account_binlog====");
        env.execute();
    }
}

启动程序,查看日志可以看到从mysql读取my_maxwell_01库account表的全量的数据,修改表数据也确认读取变更数据。

image-20230627182647771

  • 本人博客网站IT小神 www.itxiaoshen.com

标签:flink,cdc,Flink,CDC,开源,mysql,apache,org
From: https://www.cnblogs.com/itxiaoshen/p/17510226.html

相关文章

  • Databend 开源周报 第 99 期
    Databend是一款现代云数仓。专为弹性和高效设计,为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务:https://app.databend.cn。What'sOnInDatabend探索Databend本周新进展,遇到更贴近你心意的Databend。FlinkCDCApacheFlinkCDC(ChangeDataCapture)是指Ap......
  • Volvo EDI 项目 MySQL 方案开源介绍
    近期为了帮助广大用户更好地使用EDI系统,我们根据以往的项目实施经验,将成熟的EDI项目进行开源。用户安装好知行之桥EDI系统之后,只需要下载我们整理好的示例代码,并放置在知行之桥指定的工作区中,即可开始使用。今天的文章主要为大家介绍VOLVOEDI项目,了解如何获取开源的项目......
  • 跨平台开源远程连接工具rustdesk
    rustdeskhttps://github.com/rustdesk/rustdeskhttps://gitee.com/mirrors/rustdesk......
  • Taurus .Net Core 微服务开源框架:Admin 插件【2】 - 系统环境信息管理
    前言:继上篇:Taurus.NetCore微服务开源框架:Admin插件【1】-微服务节点管理本篇继续介绍下一个内容:1、系统环境信息节点- OS-Environment界面 基本信息如上图,重点的几个参数:1、App-StartTime:应用程序启用时间,可以在更新程序或配置文件后,查看该时间,看应用程序是否更......
  • 11k+ Star 一款更适合中国用户的开源 BI 工具
    在当今数字化时代,数据分析和可视化成为企业决策和发展的重要支撑,很多BI工具昂贵的许可费用,让许多中小型企业用户和个人用户望而却步,开源BI工具的出现,让其成为很多用户进行数据分析展示的首选。目前市面上主流的开源BI产品,例如Metabase和Superset,都是由国外的开发者开发......
  • Taurus .Net Core 微服务开源框架:Admin 插件【1】 - 微服务节点管理
    前言:最近发现NetCore的文章有点少,特来补几篇。上一篇:Taurus.mvc.NetCore微服务开源框架发布V3.1.7:让分布式应用更高效。自上篇之后,期间更新了4个小版本,更新如下:-----V3.1.7.1----------------1、优化:请求头输出【标识主机IP号、进程号】(2023-06-07)2、优化:Gateway负载......
  • 活动打卡报名小程序开源版开发
    活动打卡报名小程序开源版开发活动打卡报名小程序开源版的功能可以包括以下几个方面:活动列表展示:展示所有的活动信息,包括活动名称、时间、地点、报名人数等。活动详情展示:点击活动列表中的某个活动,可以查看该活动的详细信息,包括活动介绍、报名要求、费用等。活动报名:用户可以在活动......
  • 汽车车载电源DCDC 硬件原理图
    国内汽车车载电源DCDC硬件原理图、软件源码和3带上位机调试工具等完整的配套资料。延申科普:这个领域涉及到汽车电子和电源管理技术。汽车车载电源DCDC是一种重要的电子设备,用于将汽车电池的直流电转换为适用于车辆各个部分的电压。它在现代汽车中起着至关重要的作用,为各种电子设......
  • 本地部署开源大模型的完整教程:LangChain + Streamlit+ Llama
    在过去的几个月里,大型语言模型(llm)获得了极大的关注,这些模型创造了令人兴奋的前景,特别是对于从事聊天机器人、个人助理和内容创作的开发人员。大型语言模型(llm)是指能够生成与人类语言非常相似的文本并以自然方式理解提示的机器学习模型。这些模型使用广泛的数据集进行训练,这......
  • 小鹏内部独家Android车载系统开发指南开源了
    众所周知,国内的安卓市场内卷极其严重,原生应用开发可谓是寸步难行,想必大家在最近的面试中能看出,要么面试完全没机会,要么薪资对半砍,一天比一天难......但细看整个Android生态却无比繁荣,手机、平板、电视、音视频等等,特别是在智能汽车+电动汽车的浪潮下,诞生出一大批高薪岗位。车载成为......