首页 > 其他分享 >flink cdc 使用

flink cdc 使用

时间:2022-08-27 11:13:27浏览次数:81  
标签:cdc org flink https 使用 import com

目前 cdc 产品 非常多 ,目前我使用canal ,flink cdc (集成 debezium) 二者 对比相对来说 flink cdc 更加强大,功能很多 但是 有很多 坑,迭代速度很快,借助flink 分布式计算框架,分布式处理 数据。

1. canal

装个服务端,客户端自己写,当然也提供了一些适配器,我之前是定制 客户端写的组件。

https://github.com/alibaba/canal

官方文档

https://ververica.github.io/flink-cdc-connectors/master/

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tableapi/

cdc 开发主要就是写 sql (flink sql),借助 flink 各种连接器 ,快速同步数据 sink 到各个地方,确实使用 方便 借助 checkpoint 可以 保证 事务操作 的精确 一次 操作(这个叼)。

flinksql 上手很容易 但是有个大坑,就是 如果 job 多了 很耗 数据库连接 和多次 重复读 bin日志 。

社区文章,阿里的内部解决 也是 合源 合 整库同步。(阿里云 flink cdc 合源,整库同步这种技术未开源,因为他有专门 数据同步服务)

https://flink-learning.org.cn/article/detail/da710dd3cdfb9b430af405725ad27784

腾讯 云 通过 和 source 解决

目前使用 Flink CDC Connector 做数据同步时,每个表都需要建立一个数据库连接,在多表、整库同步等场景下,对数据库实例的压力非常大,Oceanus 引入了多 source 复用的优化来解决这种问题。

https://cloud.tencent.com/document/product/849/76650

3. 不买 云服务 合源 解决方案

flink 1.45 ,flink cdc 2.1

思路 参考

http://www.dlink.top/docs/extend/practice_guide/cdc_kafka_multi_source_merge

flink table kafka

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/debezium/

json 解析 参考 debezium 官网

https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium

4 .代码 实现

  • datastream
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

/**
 * 
 * @Date 2022/8/26 16:04
 */
public class TestCdcKafka {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        String checkpointDir = "file:///D:/checkpoint";

        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
        //1.1 设置 CK&状态后端
        //略

        MySqlSource<String> mySqlSource =  MySqlSource.<String>builder()
                .hostname("")
                .port(3306)
                .username("")
                .password("")
                .databaseList("XXSX").tableList()
                //.databaseList()
                //.tableList() //这个注释,就是多库同步
         
                .deserializer(new CustomerDeserialization()) //这里需要自定义序列化格式
                //.deserializer(new StringDebeziumDeserializationSchema()) //默认是这个序列化格式
                .startupOptions(StartupOptions.latest())
                .build();

        //2.通过 FlinkCDC 构建 SourceFunction 并读取数据

        DataStreamSource<String> streamSource = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "MySQL Source");

        //3.打印数据并将数据写入 Kafka
        streamSource.print();
        String sinkTopic = "testcdc";
        streamSource.addSink(getKafkaProducer("XXX:9092",sinkTopic));

        //4.启动任务
        env.execute("FlinkCDC");



    }

    //kafka 生产者
    public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }
}
  • 序列化
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
/**
 * @Author 
 * @Date 2022/8/26 16:14
 */
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1.创建 JSON 对象用于存储最终数据
        JSONObject result = new JSONObject();

        //2.获取库名&表名放入 source
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        JSONObject source = new JSONObject();
        source.put("db",database);
        source.put("table",tableName);

        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5.获取操作类型  CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("insert".equals(type)) {
            type = "c";
        }
        if ("update".equals(type)) {
            type = "u";
        }
        if ("delete".equals(type)) {
            type = "d";
        }
        if ("create".equals(type)) {
            type = "c";
        }

        //6.将字段写入 JSON 对象
        result.put("source", source);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("op", type);

        //7.输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
  • flink sql
- |
  CREATE TABLE KafkaTable (
   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  `id` BIGINT,
  `agent_id` BIGINT,
  `usable_amount` decimal(20,4)
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'testcdc',
  'properties.bootstrap.servers' = 'XXXXX:9092',
  'properties.group.id' = 'testGroup11',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
  )
- |
  CREATE TABLE t_user_copy(
  id BIGINT,
  agent_id BIGINT,
  usable_amount decimal(20,4),
  PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://XXXXX:9200',
  'index' = 'user_cdc1'
  )
- select * FROM KafkaTable where origin_database='XXXX' and origin_table = 'XXXX'
彩蛋

实时数仓

flink cdc + kafka + doris 成本不高,相比 Hadoop 生态 那一套下来

doris 官网,像 使用 mysql 一样

https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/binlog-load-manual

数仓文章 关于 doris

https://blog.csdn.net/weixin_46141936/article/details/121846412

https://blog.csdn.net/weixin_43320999/article/details/111599512

https://blog.csdn.net/dajiangtai007/article/details/123501210

新东方

https://blog.csdn.net/m0_54252387/article/details/125739846

https://cloud.tencent.com/developer/article/1925453?from=article.detail.1807913

数仓

https://cloud.tencent.com/developer/article/1938194

https://segmentfault.com/a/1190000040686141

https://www.daimajiaoliu.com/daima/7b7448559360801

https://blog.csdn.net/qq_37067752/article/details/107474369

https://tech.meituan.com/2020/04/09/doris-in-meituan-waimai.html

https://developer.aliyun.com/article/985042

https://yangshibiao.blog.csdn.net/article/details/118687344?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-118687344-blog-124237872.pc_relevant_multi_platform_whitelistv1_exp2&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-118687344-blog-124237872.pc_relevant_multi_platform_whitelistv1_e

标签:cdc,org,flink,https,使用,import,com
From: https://www.cnblogs.com/lyc88/p/16629996.html

相关文章

  • add_argument()方法基本参数使用
    selenium做web自动化时我们想要通过get打开一个页面之前就设置好一些基本参数,需要通过add_argument()方法来设置,下面以一个简单的不展示窗口为例。option=webdriver.C......
  • 【AGC】如何使用认证服务与云数据库处理用户信息
    ​使用场景华为AGC认证服务可以为应用快速构建安全可靠的用户认证系统,可以实现多种方式关联认证登录。而如何处理这些多种登录方式的用户信息,例如在应用中发布一个活动,哪......
  • node 使用 pm2-logrotate 分割pm2日志 && 停止 pm2-logroatate
    使用pm2-logrotate解决pm2日志体积过大,进行分割 什么是pm2-logrotate?pm2-logrotate是一个pm2的插件,可以对pm2日志进行管理,所以它的运行需要依靠pm2 安装:注:该命令......
  • 78、使用Jenkins Docker 部署SpringBoot项目
    1、centOS安装Docker1、更新软件源:yumupdate2、卸载旧版本:yumremovedockerdocker-commondocker-selinuxdocker-engine3、安装软件包:yuminstall-yyum-utils......
  • JS 中var声明的缺点 以及解决方法 let 和 const(声明常量)的使用
    1、允许重复变量的声明,导致数据被覆盖vara=199;此处省略一千行代码;vara=200;1002行代码并不会报错,且初始的a变量被覆盖,造成了我们的阅读障碍。2、变量提升......
  • 10个快速入门Query函数使用的Pandas的查询示例
    转载:https://mp.weixin.qq.com/s/TJStQDtUfOOXtb__cpivDgpandas.的query函数为我们提供了一种编写查询过滤条件更简单的方法,特别是在的查询条件很多的时候,在本文中整理了1......
  • 3个Pandas高频使用函数
    转载:https://mp.weixin.qq.com/s/6FECFHIkyItYnmQ37Vn_SQ 大家好,我是Peter~本文主要是给大家介绍3个Pandas日常高频使用函数:apply+agg+transform。模拟数据模拟......
  • webxone使用
    webxone使用主窗口 uses wxoExec;procedureTfMain.FormCreate(Sender:TObject);beginAlign:=alNone;Position:=poDesigned;WindowState:=wsNormal;......
  • QQuickImageProvider的使用
    一、概述QQuickImageProvider用于在QML应用程序中提供高级图像的加载功能。(在c++中提供图像路径,编辑等底层数据交互逻辑,在qml端调用显示)它允许QML中的图像被:使用QPixmap......
  • Seatunnel超高性能分布式数据集成平台使用体会
    @目录概述定义使用场景特点工作流程连接器转换为何选择SeaTunnel安装下载配置文件部署模式入门示例启动脚本配置文件使用参数示例Kafka进Kafka出的ETL示例FlinkRun传递参......