Flink-CDC

flink-cdc overview
    Flink CDC connectors capture row-level changes (inserts, updates, deletes) from databases and feed them into Flink; the MySQL connector is built on top of Debezium.
    Documentation: https://ververica.github.io/flink-cdc-connectors/master/content/about.html#
Dependencies
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.17.Final</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.4.13</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.4.13</version>
        <type>pom</type>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>0.20.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.4.13</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.4.13</version>
    </dependency>
    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr4-runtime</artifactId>
        <version>4.7</version>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.4</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.1.2</version>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.13.6</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4.9</version>
    </dependency>
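
    The entries above reference ${flink.version} without defining it. A minimal properties sketch, assuming Flink 1.13.6 (the version pinned explicitly for flink-csv and flink-connector-base above):

    <properties>
        <!-- assumed value; keep it consistent with the explicit 1.13.6 entries above -->
        <flink.version>1.13.6</flink.version>
    </properties>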
API approach
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema
    import com.ververica.cdc.connectors.mysql.source.MySqlSource
    import com.ververica.cdc.connectors.mysql.table.StartupOptions
    import org.apache.flink.api.common.eventtime.WatermarkStrategy
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val mySqlSource:MySqlSource[String] = MySqlSource.builder[String]()
      .hostname("114.116.44.117")
      .port(3306)
      .databaseList("test")  // databases to monitor; multiple databases can be listed
      .tableList("test.student")  // tables to monitor, qualified as database.table; multiple tables can be listed
      .username("root")
      .password("123456")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .startupOptions(StartupOptions.latest())   // start from the latest binlog offset: only changes made after the job starts are captured
      .build()
    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks[String](), "mysql-cdc-source").print()
    env.execute("flink cdc")
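
    The 2.x source can resume from checkpoints (see "Changes in 2.X" below), but the job above never enables them. A minimal sketch, assuming a 5-second interval and exactly-once mode (both values are illustrative):

    import org.apache.flink.streaming.api.CheckpointingMode
    // enable checkpointing so the source can resume the snapshot/binlog position after a failure
    env.enableCheckpointing(5000)  // interval in ms; assumed value
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.setParallelism(4)  // assumed; parallel snapshot reading only helps with parallelism > 1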
SQL approach
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.types.Row

    val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment  // streaming execution environment
    val streamsettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamtableenv = StreamTableEnvironment.create(sourceTestEnv,streamsettings)

    // no custom deserializer is needed: the SQL connector handles deserialization itself
    streamtableenv.executeSql(""" CREATE TABLE user_info (
         id INT NOT NULL,
         name STRING,
         score  DECIMAL(10,3),
         PRIMARY KEY(id) NOT ENFORCED
        ) WITH (
         'connector' = 'mysql-cdc',
         'hostname' = '114.116.44.117',
         'scan.startup.mode'='latest-offset',
         'port' = '3306',
         'username' = 'root',
         'password' = '123456',
         'database-name' = 'test',
         'table-name' = 'student'
        )
        """.stripMargin)
    val table = streamtableenv.sqlQuery("select * from user_info")

    streamtableenv.toRetractStream[Row](table).print()  // (true, row) = add, (false, row) = retract
    sourceTestEnv.execute("flink cdc")
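
    The changelog can also be written out entirely in SQL instead of converting to a retract stream. A minimal sketch, assuming the built-in print connector; executeSql with INSERT INTO submits its own job, so it needs no extra execute() call:

    streamtableenv.executeSql(""" CREATE TABLE user_info_print (
         id INT,
         name STRING,
         score DECIMAL(10,3)
        ) WITH (
         'connector' = 'print'
        )
        """)
    streamtableenv.executeSql("INSERT INTO user_info_print SELECT * FROM user_info")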
Helper commands
    cd /opt/mysql/mysql/bin
    ./mysql -u root -p'123456'
    use test;

    delete from student;
    insert into student values(1,'zhangsan',1);
    insert into student values(2,'lisi',2);
    insert into student values(3,'wangwu',3);
    insert into student values(4,'sunliu',4);
    insert into student values(5,'liuqi',5);
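
    The statements above assume the student table already exists. A hypothetical definition matching the Flink DDL earlier (column names and types assumed from that DDL; the VARCHAR length is arbitrary):

    CREATE TABLE student (
        id INT NOT NULL PRIMARY KEY,
        name VARCHAR(50),
        score DECIMAL(10,3)
    );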
Changes in 2.X
    CDC 1.X pain points
    1. Debezium guarantees snapshot consistency by taking table locks, and DBAs rarely grant lock privileges
    2. Snapshot reading only supports a single degree of parallelism
    3. The full-snapshot phase does not support checkpoints

    CDC 2.X
    The snapshot is split by primary key into chunks, which enables parallel reading (a simplified illustration follows)
    Chunk reading, chunk reporting, chunk assignment, then incremental (binlog) reading
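
    A toy illustration of the split (not the connector's actual algorithm): given the primary-key range and a chunk size, the snapshot is cut into half-open key ranges that parallel readers can scan independently:

    // split the key range [min, max] into half-open chunks of `size` keys each
    def splitChunks(min: Long, max: Long, size: Long): Seq[(Long, Long)] =
      (min to max by size).map(lo => (lo, math.min(lo + size, max + 1)))

    println(splitChunks(1, 10, 4))  // prints Vector((1,5), (5,9), (9,11))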
