首页 > 其他分享 >Flink CDC 写 StarRocks

Flink CDC 写 StarRocks

时间:2024-03-08 10:46:56浏览次数:31  
标签:flink StarRocks get Flink parameterTool CDC source withProperty sink

Flink 版本:1.17.1
CDC 版本:2.3.0
StarRocks 版本:2.5.8

前言

最近需要实时同步几个 Mysql 表到 StarRocks,薅出之前写的 Demo 代码,简单改造了一下,加了个配置文件,可以通过修改配置文件指定 source、sink 表,这样就不用讲表名什么的写死到代码里面。

再利用 flink session 模式,把一堆任务放到一个 session(表的数据增量并不大),利用不太多的资源,同步一堆表。

DDL

StarRocks 的 flink connect,需要先在 StarRocks 上建表,直接基于 Mysql 表,把字段和类型拿出来,加上表类型、分桶和properties 就行


CREATE TABLE `ads_circle_guest_task_result` (
  `sub_task_id` varchar(128) COMMENT '子任务id',
  `task_id` bigint(20)  COMMENT '任务id',

-- 字段忽略

  `created_at` datetime  COMMENT '子任务创建时间',
  `created_by` varchar(64)  COMMENT '创建人',
  `created_time` datetime  COMMENT '子任务创建时间',
  `updated_time` datetime  COMMENT '子任务修改时间',
  `finish_time` datetime  COMMENT '任务完成时间',
  `updated_at` datetime   COMMENT '子任务修改时间',
  `updated_by` varchar(64)  COMMENT '更新人',
  `dr` tinyint(1)  COMMENT '逻辑删除字段'
)
ENGINE=OLAP
PRIMARY KEY(sub_task_id)
DISTRIBUTED BY HASH(sub_task_id) BUCKETS 8
PROPERTIES(
    "replication_num" = "1"
);

配置文件

用配置文件生成任务,所以任务名做成参数,在配置文件中传入,直接使用表名作为 任务名

将 Flink Cdc 需要的参数使用 source.xxx 的方式放在配置文件中

将 Flink 的 StarRocks connector 需要的参数使用 sink.xxx 的方式放在配置文件中

job_name=ads_circle_guest_task_result
## source  mysql
source.host:localhost
source.port:3306
source.user:root
source.pass:123456
source.database:test
source.table_list:test.ads_circle_guest_task_result
source.time_zone:Asia/Shanghai
# init  latest
source.startup_option:LATEST
source.startup_option_time:2024-03-07 00:00:00
## sink starrocks
sink.jdbc-url=jdbc:mysql://localhost:9030
sink.load-url=localhost:18030
sink.username=root
sink.password=123456
sink.database-name=test
sink.table-name=ads_circle_guest_task_result
sink.batch=64000
sink.interval=5000

Source

Flink cdc Source,参数都通过参数的方式传进来

// cdc source
val source = MySqlSource.builder[String]()
  .hostname(parameterTool.get("source.host"))
  .port(parameterTool.get("source.port").toInt)
  .username(parameterTool.get("source.user"))
  .password(parameterTool.get("source.pass"))
  .databaseList(parameterTool.get("source.database"))
  .tableList(parameterTool.get("source.table_list"))
  .serverTimeZone(parameterTool.get("source.time_zone"))
  // 包含 schema change
  .includeSchemaChanges(false)
  .debeziumProperties(prop)
  //      .startupOptions(StartupOptions.latest())
  .startupOptions(startupOption)
  .deserializer(new DdlDebeziumDeserializationSchema(parameterTool.get("source.host"), parameterTool.get("source.port").toInt))
  .build()


Sink

Source 输出的数据是自定义 反序列化器输出的嵌套 Json,使用 operator_type 标识数据的修改类型:c 创建,u 修改,d 删除,r 读,输出的数据是 json 类型,c u r 类型都使用 after ,delete 使用 before


val sink = StarRocksSink.sink(
  // the sink options
  StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", parameterTool.get("sink.jdbc-url"))
    .withProperty("load-url", parameterTool.get("sink.load-url"))
    .withProperty("username", parameterTool.get("sink.username"))
    .withProperty("password", parameterTool.get("sink.password"))
    .withProperty("database-name", parameterTool.get("sink.database-name"))
    .withProperty("table-name", parameterTool.get("sink.table-name"))
    // 自 2.4 版本,支持更新主键模型中的部分列。您可以通过以下两个属性指定需要更新的列。
    // .withProperty("sink.properties.partial_update", "true")
    // .withProperty("sink.properties.columns", "k1,k2,k3")
    .withProperty("sink.properties.format", "json")
    .withProperty("sink.properties.strip_outer_array", "true")
    //        .withProperty("sink.properties.row_delimiter", ROW_SEP)
    //        .withProperty("sink.properties.column_separator", COL_SEP)
    // 设置并行度,多并行度情况下需要考虑如何保证数据有序性
    .withProperty("sink.parallelism", "1")
    .withProperty("sink.version", "v1")
    .withProperty("sink.buffer-flush.max-rows", parameterTool.get("sink.batch"))
    .withProperty("sink.buffer-flush.interval-ms", parameterTool.get("sink.interval"))
    .build())

map

使用 map 算子,调整一下输入的数据格式,在 json 中添加 "__op" 参数,标明数据操作类型,主要是将 删除的数据标记为 1

val map = env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "cdc")
  .map(new RichMapFunction[String, String] {
    var jsonParser: JsonParser = _


    override def open(parameters: Configuration): Unit = {
      jsonParser = new JsonParser()
    }

    override def map(in: String): String = {

      val json = jsonParser.parse(in).getAsJsonObject

      val sqlOperator = json.get("operator_type").getAsString

      var data: JsonObject = null
      var result = ""


      if ("r".equals(sqlOperator) || "u".equals(sqlOperator) || "c".equals(sqlOperator)) {
        // read / create / u

        data = json.get("after").getAsJsonObject
        // add
        data.addProperty("__op", "0")

      } else if ("d".equals(sqlOperator)) {
        //
        data = json.get("before").getAsJsonObject
        data.addProperty("__op", "1")
      }

      if (data != null && !data.isJsonNull) {
        result = data.toString
      }

      result

    }

  })
  .name("map")
  .uid("map")

部署

session

使用 Flink session 启动一个 session,配置 slot 数据为 5

./bin/yarn-session.sh -d -nm cdc -s 4 -jm 2g -tm 4g  # -s 参数要写在前面,不然不生效

任务

使用 flink run 启动任务,使用 -yid 参数指定任务的 application id,最后一个参数指定任务的配置文件

bin/flink run -yd  -yid application_1682501108615_0788 -c com.venn.connector.starrocks.CdcSingleMysqlTableToStarRocks /data/jar/flink-rookie-1.0.jar ../jar/prd/cdc_asset_available_month.properties
bin/flink run -yd  -yid application_1682501108615_0788 -c com.venn.connector.starrocks.CdcSingleMysqlTableToStarRocks /data/jar/flink-rookie-1.0.jar ../jar/prd/cdc_reservation.properties
bin/flink run -yd  -yid application_1682501108615_0788 -c com.venn.connector.starrocks.CdcSingleMysqlTableToStarRocks /data/jar/flink-rookie-1.0.jar ../jar/prd/cdc_ads_circle_guest_task_result.properties
bin/flink run -yd  -yid application_1682501108615_0788 -c com.venn.connector.starrocks.CdcSingleMysqlTableToStarRocks /data/jar/flink-rookie-1.0.jar ../jar/prd/cdc_oc_project_extract_record.properties

flink web overview页面:

flink web 任务详情:

taskmanager gc(任务跑了 16 小时):

同步的表数据量都是千万,但是增量都不太大,预测 1 个 yarn 的 container (4 G内存),可以跑 10 个以上的任务

完整代码参考: github flink-rookie

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文
flink 菜鸟公众号

标签:flink,StarRocks,get,Flink,parameterTool,CDC,source,withProperty,sink
From: https://www.cnblogs.com/Springmoon-venn/p/18060439

相关文章

  • flink 中的水位线(Watermark)
    水位线Watermark实时统计使用了flinksql程序,使用flink-TVF表值函数滚动窗口按分钟进行数据聚合操作,消费的kafka数据需要在规定的时间窗口内进行推送数据并消费计算,为了解决处理乱序事件或延迟数据引入了Watermark,用来设置延迟计算时间等待迟到的数据,但不能无限期的等下去,必......
  • flink总结
    基本概念介绍flink的基本处理流程读取数据(source)->各种算子计算处理数据(rdd)-->输出数据(sink)有界流和无界流如果是从文件有限数据的地方读取数据就是有界流,如果是接到kafka或者socket这种地方就是无界流。有状态和无状态算子计算的过程中,是否要保存中间结算结果......
  • 使用 SPL 高效实现 Flink SLS Connector 下推
    作者:潘伟龙(豁朗)背景日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入SLS进行存储、分析;阿里云Flink是阿里云基于ApacheFlink构建的大数据分析平台......
  • 【Flink入门修炼】2-2 Flink State 状态
    什么是状态?状态有什么作用?如果你来设计,对于一个流式服务,如何根据不断输入的数据计算呢?又如何做故障恢复呢?一、为什么要管理状态流计算不像批计算,数据是持续流入的,而不是一个确定的数据集。在进行计算的时候,不可能把之前已经输入的数据全都保存下来,然后再和新数据合并计算。......
  • Flink AggregatingState 实例
    FlinkAggregatingState实例AggregatingState介绍AggregatingState需要和AggregateFunction配合使用add()方法添加一个元素,触发AggregateFunction计算get()获取State的值需求:计算每个设备10秒内的平均温度importorg.apache.flink.api.common.eventtime.SerializableTimesta......
  • flink 提交yarn 命令 flink run -m yarn-cluster
    flink提交yarn命令flinkrun-myarn-cluster文章目录Flink集群搭建和使用local本地测试flink集群搭建1、standallonecluster提交任务--将代码打包2.flinkonyarn只需要部署一个节点flink启动方式1、yarn-session2、直接提交任务到yarnFlink集群搭建和使用local本地......
  • 【Flink入门修炼】2-1 Flink 四大基石
    前一章我们对Flink进行了总体的介绍。对Flink是什么、能做什么、入门demo、架构等进行了讲解。本章我们将学习Flink重点概念、核心特性等。本篇对Flink四大基石进行概括介绍,是Flink中非常关键的四个内容。一、四大基石Flink四大基石分别是:Time(时间)、Window(窗口)、St......
  • 使用debezium实现cdc实时数据同步功能记录
    Debezium是一个用于变更数据捕获的开源分布式平台。能够保证应用程序就可以开始响应其他应用程序提交到您数据库的所有插入、更新和删除操作。Debezium持久、快速,因此即使出现问题,您的应用程序也能快速响应,绝不会错过任何事件。Debezium默认使用Kafka来投递数据,在事务日志中记......
  • Hudi-FlinkSQL导入数据报错:[ERROR] Could not execute SQL statement. Reason: java.l
    问题描述通过FlinkSQL创建Hudi表后,向表中插入数据报错:[ERROR]CouldnotexecuteSQLstatement.Reason:java.lang.ClassNotFoundException:org.apache.hadoop.fs.FSDataInputStream 解决办法向Hudi表中写入数据时,会调用Hadoop的Jar包,但是Flink的lib目录中没有该Jar包。......
  • 在K8S中,etcdctl如何使用?
    在Kubernetes(K8s)中,etcdctl是用于直接与etcd集群交互的命令行工具,主要用于管理和调试etcd存储中的键值对数据。etcd是K8s集群的核心组件之一,它作为一个高可用的分布式键值存储系统,用于保存集群的所有重要配置数据。以下是如何在Kubernetes环境中使用etcdctl的基本......