首页 > 其他分享 >FlinkCDC的自定义反序列化

FlinkCDC的自定义反序列化

时间:2023-01-15 15:22:24浏览次数:62  
标签:自定义 cdc org FlinkCDC mysql apache import 序列化 before

FlinkCDC的自定义反序列化

FlinkCDC的简单使用方法

package com.pzb;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author 海绵先生
 * @Description TODO DataStream流和Flink SQL两种方式应用FlinkCDC
 * @date 2023/1/12-20:03
 */
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1.获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.1通过FlinkCDC构建SourceFunction
        /*
        * 格式:MySqlSource.<String>builder().[中间各参数].build()
        * */
        /*DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()//<String>为最终返回类型,官方提供的返回类型为String
                .hostname("hadoop111")
                .port(3306)
                .username("root")
                .password("****")//填写自己的密码
                .databaseList("cdc_test")// 监控的数据库,若只选了数据库参数,则监控所有表
                .tableList("cdc_test.user_info")//监控那张表,格式:数据库名.表名。因为databaseList参数是可以监控多个数据库的
                .deserializer(new CustomerDeserializationSchema())//官方反序列化类:StringDebeziumDeserializationSchema
                .startupOptions(StartupOptions.initial())//startupOptions参数有5种方式:initial、earliest、latest、specificOffset、timestamp
                .build();

        // 获取数据
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //打印数据
        dataStreamSource.print();*/

        //2.2通过FlinkSQL构建SourceFunction
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING PRIMARY KEY, " +
                " name STRING, " +
                " sex STRING " +
                " ) WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = 'hadoop111', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '****', " +
                " 'database-name' = 'cdc_test', " +
                " 'table-name' = 'user_info' " +
                ")");
        //3.查询数据并转换为流输出
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);//注意导包:org.apache.flink.types.Row
        retractStream.print();

        env.execute("FlinkCDC");
    }
}

DataStreamAPI下的自定义反序列化

DataStreamAPIFlinkSQL 都可以进行FlinkCDC操作,但是用DataStreamAPI进行CDC监听数据库,通过官方的反序列化操作得到的不只有目标数据,为了能够更直观看到数据的变化,因此需要我们自己自定义反序列化操作。

简单来看下用官方反序列化类得到的数据样式:

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1673531342, file=mysql-bin.000001, pos=803, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null, key=Struct{id=1003}, keySchema=Schema{mysql_binlog_source.cdc_test.user_info.Key:STRUCT}, value=Struct{before=Struct{id=1003,name=wangwu,sex=famale},after=Struct{id=1003,name=wangwu,sex=male},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1673531342000,db=cdc_test,table=user_info,server_id=1,file=mysql-bin.000001,pos=943,row=0},op=u,ts_ms=1673531338605}, valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

可以看到数据非常的杂,因此自定义反序列化操作还是很有必要的

  • 实战源码:
package com.pzb;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;


/**
 * @author 海绵先生
 * @Description TODO 关于DataStream方式的CDC,自定义反序列化
 * @date 2023/1/13-21:26
 */
/*官方默认String类型的数据样式
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1673531342, file=mysql-bin.000001, pos=803, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null, key=Struct{id=1003}, keySchema=Schema{mysql_binlog_source.cdc_test.user_info.Key:STRUCT}, value=Struct{before=Struct{id=1003,name=wangwu,sex=famale},after=Struct{id=1003,name=wangwu,sex=male},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1673531342000,db=cdc_test,table=user_info,server_id=1,file=mysql-bin.000001,pos=943,row=0},op=u,ts_ms=1673531338605}, valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
* */
public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {

    /*TODO 明确自己想要的数据格式
    * (
    *   "db":"",
    *   "tableName":"",
    *   "before":{"id":"1001","name":""...},
    *   "after":{"id":"1001","name":""...},
    *   "op":""
    * )
    * */

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //创建JSON 对象用于封装结果数据
        JSONObject result = new JSONObject();

        //获取库名&表名
        String topic = sourceRecord.topic();//根据上面样式通过sourceRecord.键值,获取对应值
        //获取结果:topic='mysql_binlog_source.cdc_test.user_info'
        String[] fields = topic.split("\\.");//安装`.`进行分割(.需要转义)

        //添加对应的库名和表名键值对
        result.put("db",fields[1]);
        result.put("tableName",fields[2]);

        //获取before数据
        Struct value = (Struct) sourceRecord.value();//需要进行强转下,注意导的是:org.apache.kafka.connect.data.Struct 这个包
        Struct before = value.getStruct("before");//通过指定before,获取before字段的数据
        JSONObject beforeJson = new JSONObject();
        if (before != null){// before字段是有可能为空的(比如读取[op=r]、插入操作[op=c]...),所以要进行判断
            Schema schema = before.schema();//获取before 的schema信息
            List<Field> fieldList = schema.fields();//获取before里的全部字段

            for (Field field : fieldList){
                // 通过field.name()获取对应的字段名,before.get(field)根据字段名,获取对应的值
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before",beforeJson);//把before信息添加进去

        //同理获取after
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null){
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList){
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after",afterJson);

        //获取操作类型(operation不能直接通过sourceRecord获取)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);//注意导包:io.debezium.data.Envelope
        //将operation信息添加进去
        result.put("op",operation);

        //输出数据
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // 返回类型
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

在进行反序列化前,要弄明白自己想要的到底是什么样的数据

标签:自定义,cdc,org,FlinkCDC,mysql,apache,import,序列化,before
From: https://www.cnblogs.com/Mr-Sponge/p/17053547.html

相关文章

  • flex 4.6 自定义Datagrid添加checkbox\图片等功能
    flex4.6中的 spark 的Datagrid相比于之前的mx有很大不同实现自定义<s:itemRenderer>详见代码~<s:DataGridid="dg"left="14"right="10"top="35"bottom="9"d......
  • 微信小程序自定义字体
    微信小程序自定义字体  解决方法 //加载字体wx.loadFontFace({global:true,family:'cl',......
  • python教程6--自定义函数,数据类型转换,解方程
    本文主要讲解点如下:简单函数数据类型转换空函数自定义绝对值函数自定义函数检查参数类型函数返回多个值求解ax2+bx+c=0的根具体代码如下:'函数相关'__author__='mo......
  • 自定义用户登录验证
    1.自定义用户登录验证把自带的登录逻辑改写以及界面的改写1.1UserDetailServiceImpl@ServicepublicclassUserDetailServiceImplimplementsUserDetailsService{......
  • arcgis api for 自定义zoom
    1.需求自定义UI,实现对地图的zoom操作,在view缩放的时候,带动画效果2.分析问题UI视图一般情况,可能大部分初学者会使用以下代码对zoom进行操作,这个方法是可以放大缩小,但是......
  • C++ 序列化和反序列化
    序列化1、背景1、在TCP的连接上,它传输数据的基本形式就是二进制流,也就是一段一段的1和0。2、在一般编程语言或者网络框架提供的API中,传输数据的基本形式是字节,也就是Byte......
  • iisexpress 绑定自定义域名
    1、项目根目录找到    2、添加绑定域名  3、host映射  4、以管理员身份运行vs,以管理员身份运行vs,以管理员身份运行vs,重要的事情说三遍。。。不然域名无......
  • 序列化与反序列化
    O:4:"site":3:{s:3:"url";s:13:"www.baidu.com";s:4:"name";N;s:5:"title";s:4:"test";}//析构函数:结束运行结束后会调用此魔术方法//__tostring:当我们直接要输出......
  • JS_6_自定义对象
    JS中万事万物皆对象,灵活! 对象:可以调用不存在的属性方法(自动扩充),值为undefined。可以直接新增属性方法。创建自定义对象://创建一个自定义对象var对象名......
  • JS_5_自定义类
    JS万事万物皆对象,灵活!可以调用不存在的属性和方法。(即为undefined。)  创建一个类:格式:function类名(形参){this.属性名=形参;...this.属性名......