Flink CDC in Practice
There are a lot of CDC products out there. I have used Canal and Flink CDC (which embeds Debezium); of the two, Flink CDC is the more powerful and feature-rich option, although it still has plenty of pitfalls. It iterates quickly, and because it rides on Flink as a distributed compute framework, it processes data in a distributed way.
1. Canal
You install a server and write the client yourself; some adapters are provided, but I previously built a custom client component.
2. Flink CDC
Official documentation:
https://ververica.github.io/flink-cdc-connectors/master/
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tableapi/
CDC development is mostly writing SQL (Flink SQL): with Flink's many connectors you can quickly sink the synchronized data wherever it needs to go. It really is convenient, and checkpoints give you exactly-once processing guarantees (which is great).
Flink SQL is easy to pick up, but it has one big pitfall: once you run many jobs, they consume a lot of database connections and read the same binlog repeatedly, because every job declares its own per-table source, as sketched below.
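As a minimal sketch of that per-table pattern (the table name mysql_user_source is made up here, the connection values are placeholders, and the columns simply reuse the fields from the examples further down), each Flink SQL job declares its own mysql-cdc source table, and every such table opens its own database connection and binlog reader:
CREATE TABLE mysql_user_source (
id BIGINT,
agent_id BIGINT,
usable_amount DECIMAL(20, 4),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'XXXX',
'port' = '3306',
'username' = 'XXXX',
'password' = 'XXXX',
'database-name' = 'XXXX',
'table-name' = 'XXXX'
)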
Community article: Alibaba's internal solution is likewise source merging and whole-database synchronization. (Alibaba Cloud's Flink CDC source merging / whole-database sync is not open source, because they sell a dedicated data-synchronization service.)
https://flink-learning.org.cn/article/detail/da710dd3cdfb9b430af405725ad27784
Tencent Cloud addresses it with source merging/reuse:
When the Flink CDC connector is used for data synchronization, every table needs its own database connection; in multi-table and whole-database sync scenarios this puts heavy pressure on the database instance. Oceanus introduced a multi-source reuse optimization to solve this.
https://cloud.tencent.com/document/product/849/76650
3. Source merging without buying a cloud service
Versions: Flink 1.14.5, Flink CDC 2.1
Approach reference:
http://www.dlink.top/docs/extend/practice_guide/cdc_kafka_multi_source_merge
Flink Table API Kafka connector, Debezium format:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/debezium/
For the JSON structure, refer to the Debezium documentation.
4. Code implementation
- DataStream job
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
/**
*
* @Date 2022/8/26 16:04
*/
public class TestCdcKafka {
public static void main(String[] args) throws Exception {
//1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// enable checkpoint
String checkpointDir = "file:///D:/checkpoint";
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
//1.1 Configure checkpointing mode & state backend
//omitted in the original post; an illustrative sketch follows
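//Illustrative sketch of typical settings (assumed values, not from the original post):
//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // needs org.apache.flink.streaming.api.CheckpointingMode
//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//env.getCheckpointConfig().setCheckpointTimeout(60000);
//env.setStateBackend(new HashMapStateBackend()); // needs org.apache.flink.runtime.state.hashmap.HashMapStateBackend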
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("")
.port(3306)
.username("")
.password("")
.databaseList("XXSX").tableList()
//.databaseList()
//.tableList() //这个注释,就是多库同步
.deserializer(new CustomerDeserialization()) //这里需要自定义序列化格式
//.deserializer(new StringDebeziumDeserializationSchema()) //默认是这个序列化格式
.startupOptions(StartupOptions.latest())
.build();
//2. Read the change stream through the Flink CDC MySqlSource
DataStreamSource<String> streamSource = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "MySQL Source");
//3. Print the records and write them to Kafka
streamSource.print();
String sinkTopic = "testcdc";
streamSource.addSink(getKafkaProducer("XXX:9092",sinkTopic));
//4. Launch the job
env.execute("FlinkCDC");
}
//Kafka producer (this 3-argument constructor gives at-least-once delivery; see the exactly-once sketch below)
public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
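The three-argument FlinkKafkaProducer above gives at-least-once delivery. A minimal sketch of the exactly-once variant follows; this is my own assumption rather than the original author's code, and the 15-minute transaction timeout is only an illustrative value that stays within the Kafka broker's default transaction.max.timeout.ms:
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExactlyOnceKafkaSink {
//Kafka producer with transactional, exactly-once delivery (requires checkpointing to be enabled)
public static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", brokers);
//must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
props.setProperty("transaction.timeout.ms", "900000");
KafkaSerializationSchema<String> schema = (element, timestamp) ->
new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
return new FlinkKafkaProducer<>(topic, schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
}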
- Deserialization schema
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
/**
* @Author
* @Date 2022/8/26 16:14
*/
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. Create the JSON object that will hold the final record
JSONObject result = new JSONObject();
//2. Extract the database and table name from the record topic and store them under "source"
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
JSONObject source = new JSONObject();
source.put("db",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3. Extract the "before" row image
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4. Extract the "after" row image
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5. Map the operation type (CREATE / UPDATE / DELETE / READ) to the single-letter Debezium "op" codes
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type) || "insert".equals(type)) {
type = "c";
} else if ("update".equals(type)) {
type = "u";
} else if ("delete".equals(type)) {
type = "d";
} else if ("read".equals(type)) {
type = "r"; //snapshot read
}
//6. Assemble the fields into the result JSON
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7. Emit the serialized record
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
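For reference, an update to a row of a hypothetical table t_user would make this deserializer emit a string roughly like the one below (all values are invented):
{"source":{"db":"XXSX","table":"t_user"},"before":{"id":1,"agent_id":10,"usable_amount":100.0000},"after":{"id":1,"agent_id":10,"usable_amount":95.5000},"op":"u"}
This shape is close enough to the Debezium envelope for the debezium-json format below to read it; note that the origin_ts and event_time metadata columns in the Kafka table will stay NULL because this deserializer does not emit the ts_ms fields.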
- Flink SQL
- |
CREATE TABLE KafkaTable (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
`id` BIGINT,
`agent_id` BIGINT,
`usable_amount` decimal(20,4)
) WITH (
'connector' = 'kafka',
'topic' = 'testcdc',
'properties.bootstrap.servers' = 'XXXXX:9092',
'properties.group.id' = 'testGroup11',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
)
- |
CREATE TABLE t_user_copy(
id BIGINT,
agent_id BIGINT,
usable_amount decimal(20,4),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://XXXXX:9200',
'index' = 'user_cdc1'
)
- SELECT * FROM KafkaTable WHERE origin_database = 'XXXX' AND origin_table = 'XXXX'
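The statement that actually moves the filtered rows into the Elasticsearch sink is an INSERT into the sink table; a sketch with the placeholders kept:
- |
INSERT INTO t_user_copy
SELECT id, agent_id, usable_amount
FROM KafkaTable
WHERE origin_database = 'XXXX' AND origin_table = 'XXXX'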
Bonus
Real-time data warehouse
Flink CDC + Kafka + Doris: the cost is modest compared with standing up the whole Hadoop stack.
Doris documentation (you work with it much like MySQL):
https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/binlog-load-manual
Data warehouse articles about Doris:
https://blog.csdn.net/weixin_46141936/article/details/121846412
https://blog.csdn.net/weixin_43320999/article/details/111599512
https://blog.csdn.net/dajiangtai007/article/details/123501210
New Oriental case study:
https://blog.csdn.net/m0_54252387/article/details/125739846
https://cloud.tencent.com/developer/article/1925453?from=article.detail.1807913
Data warehouse:
https://cloud.tencent.com/developer/article/1938194
https://segmentfault.com/a/1190000040686141
https://www.daimajiaoliu.com/daima/7b7448559360801
https://blog.csdn.net/qq_37067752/article/details/107474369
https://tech.meituan.com/2020/04/09/doris-in-meituan-waimai.html
https://developer.aliyun.com/article/985042