Custom Deserialization in FlinkCDC
Basic Usage of FlinkCDC
package com.pzb;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author 海绵先生
 * @Description TODO Using FlinkCDC via both the DataStream API and Flink SQL
 * @date 2023/1/12-20:03
 */
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2.1 Build a SourceFunction with FlinkCDC (DataStream API)
        // Pattern: MySqlSource.<String>builder().[options].build()
        /*DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() // <String> is the output type; the official schema produces String
                .hostname("hadoop111")
                .port(3306)
                .username("root")
                .password("****") // fill in your own password
                .databaseList("cdc_test") // databases to monitor; with only this option set, all of their tables are monitored
                .tableList("cdc_test.user_info") // tables to monitor, written as database.table because databaseList can cover several databases
                .deserializer(new CustomerDeserializationSchema()) // the official deserializer is StringDebeziumDeserializationSchema
                .startupOptions(StartupOptions.initial()) // five startup modes: initial, earliest, latest, specificOffset, timestamp
                .build();
        // Read the data
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        // Print the data
        dataStreamSource.print();*/

        // 2.2 Build the source with Flink SQL
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING PRIMARY KEY NOT ENFORCED, " + // Flink requires primary keys to be declared NOT ENFORCED
                " name STRING, " +
                " sex STRING " +
                " ) WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = 'hadoop111', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '****', " +
                " 'database-name' = 'cdc_test', " +
                " 'table-name' = 'user_info' " +
                ")");

        // 3. Query the data and convert the result to a stream for output
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class); // note the import: org.apache.flink.types.Row
        retractStream.print();

        env.execute("FlinkCDC");
    }
}
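For reference, the retract stream encodes every change as a Tuple2 whose Boolean flag is true for an added row and false for a retracted one, so an UPDATE shows up as two records. Roughly what the printed output looks like (illustrative values, not captured output; the exact Row formatting varies across Flink versions):

(true, [1001, zhangsan, male])     // INSERT
(false, [1003, wangwu, famale])    // UPDATE: old row retracted
(true, [1003, wangwu, male])       // UPDATE: new row added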
Custom Deserialization with the DataStream API
Both the DataStream API and Flink SQL can run FlinkCDC. However, when the DataStream API is used to listen for database changes, the official deserializer emits far more than just the target data. To see the changes at a glance, we need to define our own deserialization.
Here is roughly what the official deserializer produces:
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1673531342, file=mysql-bin.000001, pos=803, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null, key=Struct{id=1003}, keySchema=Schema{mysql_binlog_source.cdc_test.user_info.Key:STRUCT}, value=Struct{before=Struct{id=1003,name=wangwu,sex=famale},after=Struct{id=1003,name=wangwu,sex=male},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1673531342000,db=cdc_test,table=user_info,server_id=1,file=mysql-bin.000001,pos=943,row=0},op=u,ts_ms=1673531338605}, valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
As you can see, the output is quite cluttered, so writing a custom deserializer is well worth it.
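For contrast, here is roughly what the custom schema built below would emit for the same update event (a sketch of the target format using the values from the sample record above, not captured output):

{"db":"cdc_test","tableName":"user_info","before":{"id":"1003","name":"wangwu","sex":"famale"},"after":{"id":"1003","name":"wangwu","sex":"male"},"op":"UPDATE"}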
- Full source:
package com.pzb;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * @author 海绵先生
 * @Description TODO Custom deserialization for DataStream-based CDC
 * @date 2023/1/13-21:26
 */
/* The official String deserializer produces records in the SourceRecord{...} style shown above. */
public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {
    /* TODO First decide on the target data format:
     * {
     *   "db": "",
     *   "tableName": "",
     *   "before": {"id": "1001", "name": "", ...},
     *   "after": {"id": "1001", "name": "", ...},
     *   "op": ""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Create a JSON object to hold the result
        JSONObject result = new JSONObject();

        // Extract the database and table names from the topic,
        // which looks like topic='mysql_binlog_source.cdc_test.user_info'
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\."); // split on `.` (it must be escaped in the regex)
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);

        // Extract the "before" data
        Struct value = (Struct) sourceRecord.value(); // cast needed; note the import: org.apache.kafka.connect.data.Struct
        Struct before = value.getStruct("before"); // fetch the "before" struct
        JSONObject beforeJson = new JSONObject();
        if (before != null) { // "before" can be null (e.g. snapshot reads [op=r] or inserts [op=c]), so check first
            Schema schema = before.schema(); // schema of the "before" struct
            List<Field> fieldList = schema.fields(); // all fields in "before"
            for (Field field : fieldList) {
                // field.name() is the column name; before.get(field) looks up its value
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        // Extract "after" the same way
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        // Extract the operation type (it cannot be read directly off the SourceRecord)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord); // note the import: io.debezium.data.Envelope
        result.put("op", operation);

        // Emit the result
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Output type produced by this schema
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
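To use the schema, plug it into the DataStream source from the first example in place of the official deserializer. A minimal sketch, reusing the same placeholder host and credentials from above:

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
        .hostname("hadoop111")
        .port(3306)
        .username("root")
        .password("****") // fill in your own password
        .databaseList("cdc_test")
        .tableList("cdc_test.user_info")
        .deserializer(new CustomerDeserializationSchema()) // our schema instead of StringDebeziumDeserializationSchema
        .startupOptions(StartupOptions.initial())
        .build();

env.addSource(sourceFunction).print();

Each change now prints as one compact JSON line in the target format sketched earlier.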
The takeaway: before writing a deserializer, be clear about exactly what shape of data you want.