首页 > 其他分享 >2023-12-22-flink-cdc使用

2023-12-22-flink-cdc使用

时间:2024-03-22 17:33:51浏览次数:23  
标签:12 22 cdc jsonObject flink import apache org com

应用场景、上手体验

应用场景

Flink CDC (Change Data Capture) 是一种用于捕获和处理数据源中的变化的流处理技术。它可以实时地将数据源中的增量更新捕获到流处理作业中,使得作业可以实时响应数据变化。以下是 Flink CDC 的一些常见应用场景:

  • 数据仓库和实时分析:Flink CDC 可以用于从事务型数据库或消息队列中捕获变化,并将这些变化实时地传输到数据仓库或实时分析系统中。这可以帮助实时分析、报表生成、指标计算等业务在数据更新时立即得到更新的结果,带来实时性和洞察力。

  • 实时ETL和数据同步:Flink CDC 可以实时地捕获源数据变化,并将其转换成目标数据模型,然后将这些转换后的数据输送到其他系统或存储位置,实现实时ETL(Extract, Transform, Load)或数据同步的功能。这种能力可以在不中断服务的情况下对数据进行实时转换、整合和迁移。

  • 反应式应用程序:Flink CDC 可以用于构建反应式应用程序,即根据数据源中的实时变化来实时响应和处理数据。这对于实时监测、告警系统、实时推荐等具有快速响应时间要求的应用非常有用。

  • 数据集成和流数据处理:Flink CDC 可以捕获不同数据源中的变化,并将其转化为流数据进行实时处理。这为数据集成、变换和处理提供了一个强大的工具,使得企业可以更高效地利用和组织分散在不同数据源中的数据。

  • 增量更新索引和搜索引擎:Flink CDC 可以捕获关系数据库中的变化,并将这些变化应用于搜索引擎或索引系统,以保持索引和数据的同步更新。这对于实时搜索、内容检索和全文搜索等场景非常有用。

总之,Flink CDC 提供了将数据源中的变更实时捕获到流处理作业中的能力,使得企业可以实时地处理和响应数据变化。这为实时分析、实时ETL、反应式应用程序等多个场景提供了强大的支持。

上手体验

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.airtu</groupId>
  <artifactId>airtu-cdc</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
   <flink-version>1.17.0</flink-version>
  </properties>

  <dependencies>

    <!--        flink connector 基础包-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <!--        CDC mysql 源-->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-sql-connector-mysql-cdc</artifactId>
      <version>2.4.2</version>
    </dependency>

    <!--        Flink Steam流处理-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <!--        flink java客户端-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <!--        开启webui支持,默认是8081,默认没有开启-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <!--        Flink Table API和SQL API使得在Flink中进行数据处理变得更加简单和高效
    通过使用Table API和SQL API,可以像使用传统的关系型数据库一样,通过编写SQL语句或者使用类似于
    Java的API进行数据处理和分析-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.11</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>2.0.6</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.75</version>
    </dependency>

    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-core</artifactId>
      <version>5.8.23</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>com.airtu.saas.FlinkCDC</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>3.6.0</version>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

定义原始数据解析器

该类是将原始的JSON数据,挑选出有用的部分组装成信息的JSON

package com.airtu.saas;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private String host;
    private int port;
    
    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void deserialize(SourceRecord record, Collector<String> out){

        //打印原始的数据
        System.out.println(record);

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("host",host);
        jsonObject.put("port",port);
        jsonObject.put("file", record.sourceOffset().get("file"));
        jsonObject.put("pos", record.sourceOffset().get("pos"));
        jsonObject.put("ts_sec", record.sourceOffset().get("ts_sec"));

        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.put("db", name[1]);
        jsonObject.put("table", name[2]);

        Struct value = ((Struct) record.value());
        String operatorType = value.getString("op");
        jsonObject.put("operator_type", operatorType);

        // insert update
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            jsonObject.put("after", parseRecord(after));
        }
        // update & delete
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct source = value.getStruct("before");
            jsonObject.put("before", parseRecord(source));
        }
        jsonObject.put("parse_time", System.currentTimeMillis() / 1000);

        out.collect(jsonObject.toString());
    }

    private JSONObject parseRecord(Struct after) {
        JSONObject jsonObject = new JSONObject();
        for (Field field : after.schema().fields()) {
            Object o = after.get(field.name());
            jsonObject.put(field.name(),o);
        }

        return jsonObject;
    }

    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

测试代码

package com.airtu.saas;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;


public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //启动webui,绑定本地web-ui端口号
        Configuration configuration=new Configuration();
        configuration.setInteger(RestOptions.PORT,8081);

        //获取配置文件
        Properties properties = new Properties();
        String currentDir = System.getProperty("user.dir");
        String configFilePath = currentDir + File.separator + "application.properties";
        File file = new File(configFilePath);
        if (!file.exists()){
            ClassLoader classLoader = FlinkCDC.class.getClassLoader();
            URL resourceUrl = classLoader.getResource("application.properties");
            configFilePath = resourceUrl.getPath();
        }

        System.out.println("配置文件:" + configFilePath);
        try (FileInputStream configFile = new FileInputStream(configFilePath)) {
            properties.load(configFile);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        Object syncTime = properties.get("syncTime");
        DateTime dateTime = DateUtil.parse((String) syncTime, "yyyy-MM-dd HH:mm:ss");

        //获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);//设置并行度为1


        //通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("192.168.0.215")
                .port(33306)
                .username("root")
                .password("123456")
                .databaseList("test2")
                .tableList("test2.tb_user")
                .deserializer(new CommonStringDebeziumDeserializationSchema("192.168.0.215",33306)) //设置序列化器
                .startupOptions(StartupOptions.timestamp(dateTime.getTime())) //可以选择从头开始、最新的、指定时间戳位置
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        dataStreamSource.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {

                //打印出重新组装的JSON数据
                System.out.println("json->: "+value);
                super.invoke(value, context);
            }
        });

        //启动任务
        env.execute("FlinkCDC");

    }

}


application.properties

syncTime=2023-12-22 14:00:00

执行结果

SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/Users/applemima1234/Desktop/a/lib/logback-classic-1.2.11.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
配置文件:/Users/applemima1234/Desktop/a/application.properties
十二月 22, 2023 5:24:47 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
警告: Binary log position adjusted from 0 to 4
十二月 22, 2023 5:24:47 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.0.215:33306 at /4 (sid:5642, cid:62998)
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1703228128, file=mysql-bin.000003, pos=8068, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test2.tb_user', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{name=oo,age=25},source=Struct{version=1.6.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1703228128000,db=test2,table=tb_user,server_id=1,file=mysql-bin.000003,pos=8204,row=0},op=c,ts_ms=1703237088440}, valueSchema=Schema{mysql_binlog_source.test2.tb_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
json->: {"ts_sec":1703228128,"file":"mysql-bin.000003","port":33306,"pos":8068,"parse_time":1703237088,"host":"192.168.0.215","after":{"name":"oo","age":25},"db":"test2","table":"tb_user","operator_type":"c"}

标签:12,22,cdc,jsonObject,flink,import,apache,org,com
From: https://www.cnblogs.com/sylvesterzhang/p/18089908

相关文章

  • 2023-12-5-logstash和filebeat使用
    应用场景、组件介绍、logstash启动、filebeat启动应用场景分布式场景中,不同服务器的服务日志集中收集管理,方便排查问题组件介绍logstash日志收集器,将接受到的日志存储到ES中fielbeat日志解析器,将日志解析后通过网络发送给日志收集器logstash启动下载https://www.elastic......
  • 2021-10-22-go语言基础
    概述、变量、常量、运算符和函数、导包、指针、defer、数组、切片、map、type使用、面向对象、反射、chanel、协程、json操作、随机数、网络编程、读取文件、beego概述1特性:自动垃圾回收更丰富的内置类型函数多返回值错误处理匿名函数和闭包类型和接口并发......
  • 2020-6-22-MySQL高级
    数据库引擎对比、索引、SQL语句的性能分析、优化、其他数据库引擎对比MyISAMInnoDB外键不支持支持事务不支持支持行表锁表锁行锁缓存只缓存索引索引、数据都缓存表空间小大关注点性能事务索引1索引分类单值索引:一个索引只包含单个......
  • 2020-8-12-Spring-Security
    资源访问控制方式、认证流程、授权流程、快速开始、授权案例、自定义登陆登出页面、会话管理、基于方法的授权资源访问控制方式基于角色的访问控制Role-BasedAccessControl基于资源的访问控制(常用)Resource-BasedAccessControl认证流程UsernamePasswordAuthenticatio......
  • 浮木云学习日志(12)---表格设计(二)
    上次做了一个包括表格的页面设计,个人认为关于表格设计的内容介绍比较笼统,可能没办法直接对这个组件进行透彻的了解,本次我就来详细介绍一下如何通过高级表格设计玩转表格设计。感兴趣的小伙伴可以去他们官网https://www.fumucloud.com/一起体验一下这个高级表格组件。本次记录我们......
  • 20240322每日一题题解
    20240322每日一题题解Problem输入\(n\)个不大于\(10^5\)的正整数。要求全部储存在数组中,去除掉不是质数的数字,依次输出剩余的质数。第一行输入一个正整数\(n\),表示整数个数。第二行输入\(n\)个正整数\(a_i\),以空格隔开。输出一行,依次输出\(a_i\)中剩余的质数,以空格......
  • mysql使用mysqldump.exe导出为sql脚本,进行导入时出现ERROR 1227 (42000) at line 18:
    mysql使用mysqldump.exe导出为sql脚本,进行导入时出现ERROR1227(42000)atline18:Accessdenied;youneed(atleastoneof)theSUPERorSYSTEM_VARIABLES_ADMINprivilege(s)forthisoperation。Warning:ApartialdumpfromaserverthathasGTIDswillbydefaul......
  • 「ABC221D」 Online games
    题意给\(n\)组整数\(a_i\)和\(b_i\),表示有一个人在\([a_i,a_i+b_i)\)登录。求\(\forallk\in[1,n]\),有\(k\)个玩家登录的天数。分析很明显的差分,但是因为\(a_i,b_i\le10^9\),不能直接开差分数组。注意到\(n\le2\times10^5\),所以可以用pair数组代替差分数组,......
  • 「ABC124D」 Handstand
    题意给一个长度为\(n\)的01串\(s\),可以至多进行\(k\)次操作,每次操作可以把任意子串取反,求操作后最长的连续1串长度。分析\(n\)的范围“友好”地告诉我们最大\(O(n\logn)\)。最开始想的是把每一块分出来跑dp,然后发现写不出来\(O(n)\)的式子。(去世)想了一会后注......
  • Windows server 系统 AD域、加入域——无法加入 Windows Server 2012 R2 或 Windows 7
    目录前言症状原因解决方案更多信息前言       大家好,我是你可知这世上再难遇我 创作者,最近工作中做项目的时候,做Windows服务器数据迁移,前期得加入前服务器的AD域控中,经过一天从早到晚,最终在下班后找到解决方法,现在分享一下我当时最后的解决问题的方式!......