首页 > 其他分享 >SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决

SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决

时间:2023-04-19 14:56:07浏览次数:32  
标签:-- flink SchemaRegestry kafka 链路 schema

0、前提知识储备

Conflurent公司的SchemaRegestry组件的基本了解和使用

一、背景:

0.组件版本

flink:1.14

1.链路调整情况

原先链路:oracle-->OGG-->kafka-->flink-->数据库\湖\仓

实现链路:oracle-->OGG-->kafka(搭配conflurent公司的SchemaRegestry组件使用)-->flink-->数据库\湖\仓

2.链路调整缘由:

原链路中的kafka存储的数据格式是avro,每次源端oracle表做schema变更时,下游的相关程序都需要做停程序处理,费时费力,对运维不友好。

预期链路加入SchemaRegestry组件,它天然对avro格式数据支持,并且可以实现动态调整源端schema但不会要求程序手动停止。可以有效解耦链路的上下游,更加灵活,减少运维操作。

3.约束条件

目前项目组负责OGG和kafka的日常维护,flink程序及后续链路由其他项目组(包括我们自己的项目组)独立开发。因为SchemaRegestry组件是由我们负责引入的,在kafka之后链路上的项目组都需要做相应变更,所以需要我们项目组出一个样例代码(此代码逻辑已经实现并全链路跑通)。为使对下游代码改造的影响最小,要求个项目组在flink消费者程序中引入我们编写的反序列化代码,而不是实现自定义的SourceFunction。

二、目前困境

1.通过上面的背景可知,下游在改造时仍需要引入我们自定义的反序列化类,虽然这个类也是对flink原生类的一种具体实现。但是这种方式仍然不太友好,但是当前原生API不支持,找了官网和社区也没有发现有效的解决办法,大家都是在原生基础上按需进行封装。

2. 1.14版本的flink官网中存在AvroDeserializationSchema.forGeneric(...)这种方式,但是需要传入静态schema。目前schema是不确定的,需要根据消费信息中存储的id值去拿到对应的schema版本,这样也存在一个问题:初始化未进行消费时是无法拿到schema的,并且你也无法去解析消费信息对象去动态拿到id值进而拿到对应的schema。这样就成了一个死结。

三、目前的解决办法

1.对原生的反序列化类KafkaDeserializationSchema<GenericRecord>进行封装,实现只传入schemaregestryURL不需要传入schema就可以进行反序列化的操作,将schema变更和消费程序进行解耦。

四、未来优化方向

1.在引入SchemaRegestry组件后,优化flink消费kafka中的avro格式数据的方法,解耦schema的变更和程序运行之间的联系,确保flink消费程序可以实现:在初始化链路中无对应topic的数据时不会报错;当链路中存在积压数据的情况下,依然可以进行schema的变更,程序按照kafka的offset值顺序消费(先消费旧数据再消费新数据),并且在新旧数据的连接处可以自动实现schema的转换。

2.按照上述第一条的描述,可能需要程序在消费每条数据时拿到消息自带的id号并和缓存的schema进行比对,比对成功则反序列化数据;对比不成功则按照新id值重新获取schema文件并进行缓存,再重复上面的步骤。这样可能消费速率会比较慢,影响整个链路运行性能,具体影响多少需要进行仔细测试才能知道。

标签:--,flink,SchemaRegestry,kafka,链路,schema
From: https://www.cnblogs.com/jia-tong/p/17330239.html

相关文章

  • 构建私有仓库 flink image 镜像
    准备flink-*.tar.gz可以自己编译,或者下载编译cd${flink-root}mvninstall-DskipTests#编译成功后,压缩成tar.gzcd./flink-dist/target/tar-cvfflink-1.18-SNAPSHOT-bin.tar.gz./flink-1.18-SNAPSHOT-bin下载准备gosudocker-entrypoint.sh需要使用到gosu......
  • performance_schema 笔记(二)——配置详解
    提前预警:这一篇巨长。。。做好心理准备。。。删除了书里重复说明和过于复杂的一些解释,完整版请参考原书《MySQL性能优化金字塔法则》 零、基本概念instruments:生产者,用于采集MySQL中各种各样的操作产生的事件信息,可以称为监控采集配置项consumers:消费者,用于存储来自instruments......
  • performance_schema 笔记(一)—— 简介与快速入门
    系列文章参考自《MySQL性能优化金字塔法则》,删除了书里重复说明和过于复杂的一些解释,完整版请参考原书。第一篇将简单介绍performance_schema是什么、有什么用、用法快速入门,它由哪些表组成以及这些表的用途。 一、performance_schema简介performanceschema是运行在较低级别的......
  • flink
    指定TM内存模型的方式整个TM内存模型可以通过三种方式来指定   通过指定taskmanager.memory.task.heap.size和taskmanager.memory.managed.size来确定   通过指定taskmanager.memory.flink.size也就是TotalFlinkMemory大小   通过指定*taskmanager.memory.p......
  • Flink零基础学习笔记(一):基础概念
    一、ApacheFlink的定义、架构和原理ApacheFlink是一个分布式大数据处理引擎,可以对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据以内存速度进行快速计算。接下来我们介绍一下这些关键词的意义。处理无界和有界数据任何数据都......
  • 记一次Flink遇到性能瓶颈
    前言这周的主要时间花在Flink上面,做了一个简单的从文本文件中读取数据,然后存入数据库的例子,能够正常的实现功能,但是遇到个问题,我有四台机器,自己搭建了一个standalone的集群,不论我把并行度设置多少,跑起来的耗时都非常接近,实在是百思不得其解。机器多似乎并不能帮助它。把过程记录......
  • xml schema
    1、介绍xmlschema可描述xml文档的结构,是基于xml的dtd替代者。XMLSchema可针对未来的需求进行扩展XMLSchema更完善,功能更强大XMLSchema基于XML编写XMLSchema支持数据类型XMLSchema支持命名空间参考:https://www.runoob.com/schema/schema-tutorial.html2、sche......
  • 分布式计算技术(下):Impala、Apache Flink、星环Slipstream
    实时计算的发展历史只有十几年,它与基于数据库的计算模型有本质区别,实时计算是固定的计算任务加上流动的数据,而数据库大多是固定的数据和流动的计算任务,因此实时计算平台对数据抽象、延时性、容错性、数据语义等的要求与数据库明显不同,面向实时计算的数据架构也就发展起来。本篇我......
  • chatpgt-flinkcdc从mysql到kafka再到mysql
    flinkcdcmysql到kafkaimportorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apach......
  • Web开发|XML Schema (XSD) 学习
    采用XSD建立教师、学生之间的关系。假设有两个实体“学生”和“教师”,一个学生可以有多个老师,一个老师也可以有多个学生,二者为多对多的关系。在XSD中描述多对多的关系可以使用XML元素的引用和复杂类型的组合,使用中间表(或连接表)来表示多对多关系。首先,我们定义“学生”和“教师”......