首页 > 其他分享 >Flink CDC

Flink CDC

时间:2023-06-05 14:12:30浏览次数:52  
标签:Flink CDC flink env apache org

第1章 CDC简介

1.1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 CDC的种类

CDC主要分为基于查询和基于Binlog两种方式。

基于查询的 CDC:

  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;

  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;

  • 不保障实时性,基于离线调度存在天然的延迟。

基于日志的 CDC:

  • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;

  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;

  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

基于查询的CDC 基于Binlog的CDC
开源产品 Sqoop、DataX、Kafka JDBC Source Canal、Maxwell、Debezium
执行模式 Batch Streaming
是否可以捕获所有数据变化
延迟性 高延迟 低延迟
是否增加数据库压力

Flink CDC (Flink Change Data Capture) 是基于数据库的日志 CDC 技术,实现了全增量一体化读取的数据集成框架。搭配Flink计算框架,Flink CDC 可以高效实现海量数据的实时集成。

img

开源地址:https://github.com/ververica/flink-cdc-connectors

为什么推荐Flink CDC?

https://blog.csdn.net/iusedbelieve/article/details/130460341

Debezium是国外⽤户常⽤的CDC组件,单机对于分布式来说,在数据读取能力的拓展上,没有分布式的更具有优势,在大数据众多的分布式框架中(Hive、Hudi等)Flink CDC 的架构能够很好地接入这些框架。

DataX无法支持增量同步。如果一张Mysql表每天增量的数据是不同天的数据,并且没有办法确定它的产生时间,那么如何将数据同步到数仓是一个值得考虑的问题。DataX支持全表同步,也支持sql查询的方式导入导出,全量同步一定是不可取的,sql查询的方式没有可以确定增量数据的字段的话也不是一个好的增量数据同步方案。

Canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。Canal主要支持了MySQL的Binlog解析,将增量数据写入中间件中(例如kafka,Rocket MQ等),但是无法同步历史数据,因为无法获取到binlog的变更。

Sqoop主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递。Sqoop将导入或导出命令翻译成mapreduce程序来实现,这样的弊端就是Sqoop只能做批量导入,遵循事务的一致性,Mapreduce任务成功则同步成功,失败则全部同步失败。

Apache SeaTunnel是一个当前也非常受欢迎的数据集成同步组件。其可以支持全量和增量,支持流批一体。SeaTunnel的使用是非常简单的,零编写代码,只需要写一个配置文件脚本提交命令即可,同时也使用分布式的架构,可以依托于Flink,Spark以及自身的Zeta引擎的分布式完成一个任务在多个节点上运行。其内部也有类似Flink checkpoint的状态保存机制,用于故障恢复,sink阶段的两阶段提交机制也可以做到精准一次性Excatly-once。对于大部分的场景,SeaTunnel都能完美支持,但是SeaTunnel只能支持简单的数据转换逻辑,对于复杂的数据转换场景,还是需要Flink、Spark任务来完成。

Flink CDC 基本都弥补了以上框架的不足,将数据库的全量和增量数据一体化地同步到消息队列和数据仓库中;也可以用于实时数据集成,将数据库数据实时入湖入仓;无需像其他的CDC工具一样需要在服务器上进行部署,减少了维护成本,链路更少;完美套接Flink程序,CDC获取到的数据流直接对接Flink进行数据加工处理,一套代码即可完成对数据的抽取转换和写出,既可以使用flink的DataStream API完成编码,也可以使用较为上层的FlinkSQL API进行操作。

截止到Flink CDC 2.2 为止,支持的连接器:

img

支持的Flink版本:

img

第2章 FlinkCDC案例实操

2.1 开启MySQL Binlog并重启MySQL

img

2.2 DataStream方式的应用

2.2.1 导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
</dependency>

<!-- 如果不引入 flink-table 相关依赖,则会报错:
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.base.source.reader.RecordEmitter
引入如下依赖可以解决这个问题(引入某些其它的 flink-table 相关依赖也可)
-->

<dependency>
<groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.13.0</version>
</dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2.2.2 编写代码

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * description:
 * Created by 铁盾 on 2022/4/6
 */
public class FlinkCDC_01_DS {
    public static void main(String[] args) throws Exception {
        // TODO 1. 准备流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 2. 开启检查点   Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,
        // 需要从Checkpoint或者Savepoint启动程序
        // 2.1 开启Checkpoint,每隔5秒钟做一次CK  ,并指定CK的一致性语义
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 设置超时时间为 1 分钟
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 2.3 设置两次重启的最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 2.4 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.5 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1L), Time.minutes(1L)
        ));
        // 2.6 设置状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/flinkCDC"
        );
        // 2.7 设置访问HDFS的用户名
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 3. 创建 Flink-MySQL-CDC 的 Source
		// initial:Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
// earliest:Never to perform snapshot on the monitored database tables upon first startup, just read from the beginning of the binlog. This should be used with care, as it is only valid when the binlog is guaranteed to contain the entire history of the database.
// latest:Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
// specificOffset:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
// timestamp:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp.The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .databaseList("gmall_config") // set captured database
                .tableList("gmall_config.t_user") // set captured table
                .username("root")
                .password("000000")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.initial())
                .build();

        // TODO 4.使用CDC Source从MySQL读取数据
        DataStreamSource<String> mysqlDS =
                env.fromSource(
                        mySqlSource,
                        WatermarkStrategy.noWatermarks(),
                        "MysqlSource");

        // TODO 5.打印输出
        mysqlDS.print();

        // TODO 6.执行任务
        env.execute();
    }
}

2.2.3 案例测试

1)打包并上传至Linux

img

2)启动HDFS集群

[atguigu@hadoop102 flink-local]$ start-dfs.sh

3)启动Flink集群

[atguigu@hadoop102 flink-local]$ bin/start-cluster.sh

4)启动程序

[atguigu@hadoop102 flink-local]$ bin/flink run -m hadoop102:8081 -c com.atguigu.cdc.FlinkCDC_01_DS ./gmall-flink-cdc.jar

5)观察taskManager日志,会从头读取表数据

6)给当前的Flink程序创建Savepoint

[atguigu@hadoop102 flink-local]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flinkCDC/save

在WebUI中cancelJob

在MySQL的gmall_config.t_user表中添加、修改或者删除数据**

从Savepoint重启程序

[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.atguigu.cdc.FlinkCDC_01_DS ./gmall-flink-cdc.jar

观察taskManager日志,会从检查点读取表数据

2.3 FlinkSQL 方式的应用

2.3.1 添加依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.12</artifactId>
  <version>1.13.0</version>
</dependency>

2.3.2 代码实现

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * description:
 * Created by 铁盾 on 2022/4/6
 */
public class FlinkCDC_02_SQL {
    public static void main(String[] args) throws Exception {
        // TODO 1. 准备环境
        // 1.1 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1.2 表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. 创建动态表
        tableEnv.executeSql("CREATE TABLE user_info (\n" +
                "id INT,\n" +
                "name STRING,\n" +
                "age INT,\n" +
                "primary key(id) not enforced\n" +
                ") WITH (" +
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'hadoop102'," +
                "'port' = '3306'," +
                "'username' = 'root'," +
                "'password' = '000000'," +
                "'database-name' = 'gmall_config'," +
                "'table-name' = 't_user'" +
                ")");

        tableEnv.executeSql("select * from user_info").print();

        // TODO 3. 执行任务
        env.execute();
    }
}

标签:Flink,CDC,flink,env,apache,org
From: https://www.cnblogs.com/wkfvawl/p/17457634.html

相关文章

  • Flink Table Store 独立孵化启动 ,Apache Paimon 诞生
    2023年3月12日,FlinkTableStore项目顺利通过投票,正式进入Apache软件基金会(ASF)的孵化器,改名为ApachePaimon(incubating)。随着ApacheFlink技术社区的不断成熟和发展,越来越多企业开始利用Flink进行流式数据处理,从而提升数据时效性价值,获取业务实时化效果。与此......
  • Flink实战(七) - Time & Windows编程
    掌握Flink中三种常用的Time处理方式,掌握Flink中滚动窗口以及滑动窗口的使用,了解Flink中的watermark。Flink在流处理工程中支持不同的时间概念。1处理时间(Processingtime)执行相应算子操作的机器的系统时间。当流程序在处理时间运行时,所有基于时间的算子操作(如时间窗口)将使用运行相......
  • Flink中的Window和Time详解
    Window(窗口)Flink认为批处理是流处理的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而Window就是从流处理到批处理的一个桥梁。通常来讲,Window是一种可以把无界数据切割为有界数据块的手段例如,对流中的所有元素进行计数是不可能的,因为通......
  • Flink核心API之DataSet
    DataSetAPIDataSetAPI主要可以分为3块来分析:DataSource、Transformation、Sink。DataSource是程序的数据源输入。Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap、filter等操作。DataSink是程序的输出,它可以把Transformation处理之后的数......
  • Flink核心API之Table API和SQL
    TableAPI&SQL注意:TableAPI和SQL现在还处于活跃开发阶段,还没有完全实现Flink中所有的特性。不是所有的[TableAPI,SQL]和[流,批]的组合都是支持的。TableAPI和SQL的由来:Flink针对标准的流处理和批处理提供了两种关系型API,TableAPI和SQL。TableAPI允许用户以一种很直......
  • Flink核心API之DataStream
    Flink中提供了4种不同层次的API,每种API在简洁和易表达之间有自己的权衡,适用于不同的场景。目前上面3个会用得比较多。低级API(StatefulStreamProcessing):提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在一些复杂事件处理逻辑上。核心API(DataStream/DataSetAP......
  • Flink详解
    什么是FlinkApacheFlink是一个开源的分布式,高性能,高可用,准确的流处理框架。分布式:表示flink程序可以运行在很多台机器上,高性能:表示Flink处理性能比较高高可用:表示flink支持程序的自动重启机制。准确的:表示flink可以保证处理数据的准确性。Flink支持流处理和批处理,虽然我们......
  • Flink安装部署
    Flink集群安装部署Flink支持多种安装部署方式StandaloneONYARNMesos、Kubernetes、AWS…这些安装方式我们主要讲一下standalone和onyarn。如果是一个独立环境的话,可能会用到standalone集群模式。在生产环境下一般还是用onyarn这种模式比较多,因为这样可以综合利用集群......
  • 实时数据治理—当Atlas遇见Flink
    Atlas是Hadoop的数据治理和元数据框架。Atlas是一组可扩展和可扩展的核心基础治理服务,使企业能够有效,高效地满足Hadoop中的合规性要求,并允许与整个企业数据生态系统集成。ApacheAtlas为组织提供了开放的元数据管理和治理功能,以建立其数据资产的目录,对这些资产进行分类和治理,并为数......
  • FLink写入Clickhouse优化
    一、背景ck因为有合并文件操作,适合批量写入。如单条插入则速度太慢二、Flink写入ck优化改为分批插入,代码如下DataStream<Row>stream=...stream.addSink(JdbcSink.sink("INSERTINTOmytable(col1,col2)VALUES(?,?)",(ps,row)->{ps.setString(1,row.ge......