首页 > 数据库 >基于 Flink SQL 和 Paimon 构建流式湖仓新方案

基于 Flink SQL 和 Paimon 构建流式湖仓新方案

时间:2023-12-24 18:37:26浏览次数:45  
标签:Flink Hive 湖仓 流式 SQL Paimon 数据 流读

本文整理自阿里云智能开源表存储负责人,Founder of Paimon,Flink PMC 成员李劲松在云栖大会开源大数据专场的分享。本篇内容主要分为三部分:

  1. 数据分析架构演进
  2. 介绍 Apache Paimon
  3. Flink + Paimon 流式湖仓

一、数据分析架构演进

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_大数据

目前,数据分析架构正在从 Hive 到 Lakehouse 的演变。传统数仓包括 Hive、Hadoop 正在往湖、Lakehouse 架构上演进,Lakehouse 架构包括 Presto、Spark、OSS,湖格式 (Delta、Hudi、Iceberg) 等等架构,这是现在比较大的趋势。Lakehouse 架构包含了诸多新能力。

首先 OSS 比起传统的 HDFS 有了更加弹性、更加计算存储分离的能力。而且 OSS 还有热冷存储分离能力,数据可以归档到冷存,你会发现它的冷存储非常便宜,给了你存储的灵活性。

再往上会发现这些湖格式有着一些好处。具体是哪些好处呢?

  1. 第一点操作方便,湖格式有 ACID、Time Travel、Schema Evolution,这些可以让你有更好的管控能力。
  2. 第二个可能查询更快,比如说 plan 阶段会耗时更短,Hive 在超大数据量、超多文件的时候会有一些查询的问题。所以湖格式在这方面也会解决得更好。

上面的两个好处不一定能打动公司的决策人,其实也不是每家公司都在升级或者都已经升级,其中一个大的原因是大家虽然说 Hive 老了,但它还是能再战一战的,因为前面这两个好处不一定对于每家公司都是刚需。大量的公司都还是继续用 Hive,也许底下的存储换成 OSS (或者 OSS-HDFS) ,但还是老的 Hive 那套。

举例来说,现在已经有了运行稳定的火车,现在可以把它升级一下,增加餐车,装潢一遍,切分成更多节更灵活,但是需要升级为新的一套架构,你愿意冒着风险升级吗?但是如果能升级成高铁动车呢?

所以我要介绍左边第三个好处。Lakehouse 可以做到时效性更好。

  1. 时效性更好不一定是所有业务都需要更好的时效性,都要从天到达分钟级,而是你可以选择其中某些数据进行实时化升级,还可以选择某些时间进行实时化,主流数据仍然是离线状态。
  2. 时效性更好可能会给你的一些业务带来真正的改变,甚至说对于你的架构能带来大幅的简化,让整个数仓更稳定。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_后端_02

时效性在计算领域的领头羊是 Apache Flink。刚才说提升时效性是 Lakehouse 下一步的发展重点,现在要做的就是把 Streaming 计算标准技术也就是 Apache Flink 带到 Lakehouse 架构当中。

所以前几年我们也做有很多相关的探索,包括在 Iceberg 和 Hudi 上的投入,都成功地把 Flink 和 Iceberg 的对接、和 Hudi 的对接打磨出来。但是可能打磨得效果也没有那么好,如果大家用过 Flink + Iceberg 或者 Flink + Hudi 可能也有一些吐槽。关键问题在于,Iceberg 和 Hudi 都是面向 Spark、面向离线而生的数据湖技术,与实时和 Flink 有着不太好的匹配。

所以我们研发了新型数据湖格式 Apache Paimon,它是一个流式数据湖格式。我们分析一下数据湖四剑客有什么样的历史和初衷。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_大数据_03

Apache Iceberg 和 Delta Lake,他们其实是对传统Hive格式的一种升级。本质上还是面向 Append 数据的处理,在离线数仓 T+1 的分析上比起 Hive 更有优势和更方便的使用,更多还是面向传统的离线处理。

Apache Hudi 其实是在 Hive 的基础上提供增量更新的能力,这是它的初衷。它的基础架构还是面向全增量合并的方式,Flink 的集成不如 Spark,一些功能只在 Spark 有,Flink 没有。

Apache Paimon 是从 Flink 社区中孵化出来的,面向流设计的数据湖,目的就是支持大规模更新和真正的流读。

流和湖的结合难点其实在更新。如果大家对 Flink 比较熟悉,Flink SQL 成功的原因之一是它真正对 Changelog 做出了原生的处理,这个 Changelog 本身就是一种更新。

Iceberg、Hudi、Delta 是因为他们都是面向批处理、Spark 的增量 + 全量的方式。一旦需要涉及到合并就是增量数据和全量数据的一次超大合并。相当有全量 10 TB,增量哪怕 1 GB 也可能会涉及到所有文件的合并,这 10 个 TB 的数据要全部重写一次,然后合并才算完成,合并的代价非常大。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_Flink_04

右边是面向更新的技术,LSM 全名是 Log Structured Merge-Tree,这种格式在实时领域已经被大量的各种数据库应用起来,包括 RocksDB、Clickhouse、Doris、StarRocks 等等。

LSM 带来的变化是每次合并都可能是局部的。每次合并只用按照一定的策略来 merge 数据即可,这种格式能真正在成本、新鲜度和查询延时的三角 trade-off 中可以做到更强,而且在三角当中可以根据不同的参数做不一样的 trade-off 的选择。

二、介绍 Apache Paimon

我们刚刚介绍了演进的过程,需要 Flink + 湖存储来做 Flink Lakehouse,也介绍了难点。第二部分就介绍一下 Apache Paimon。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_大数据_05

Apache Paimon 是什么样的东西?你可以简单认为基础的架构就是湖存储+ LSM 的结合,对于湖存储来说基本的能力是写和读。Apache Paimon 在这个基础上和 Flink 做了更深度的集成,各种 CDC 数据可以通过 Flink CDC 做到 Schema Evolution 和整库同步地把数据同步到 Paimon 中。

也可以通过 Flink、Spark、Hive、宽表合并的方式或者通过批写覆盖的方式写到 Paimon 中,这是基本的 Lakehouse 能力。也可以在后面批读,通过 Flink、Spark、StrarRocks、Trino 做一些分析,也可以这里通过 Flink 来流读 Paimon 里面的数据,流读生成的 Changelog,流读方面的特性,后面我也会介绍。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_数据_06

这是 Paimon 的架构图,这主要是 Paimon 流式一体实时数据湖大致的发展历程。最开始在 2022 年初发现了开源社区技术上的一块缺失,所以在 Flink 社区提出了 Flink Table Store。直到 2023 年 1 月发布了第一个稳定的版本0.3,3 月份进入 Apache 孵化器。今年 9 月份发布了 Paimon 0.5 版本,这是 Paimon 全面成熟的版本,包括 CDC 入湖和 Append 数据处理。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_数据_07

我们也在阿里云上测试 Apache Paimon 和 Hudi 的性能,测试湖存储的 MergeOnRead 的更新性能,可以看到左边是大致是 5 亿条数据入湖,按照类似的配置、相同的索引来入湖,我们来评估 5 亿条入湖需要多少时间。经过测试发现 Paimon 入湖的过程中,吞吐或者耗时能达到 Hudi 的 4 倍,但是查询相同的数据,发现 Paimon 的查询性能是 Hudi 的 10 倍甚至 20 倍,Hudi 还会碰到因内存变小而无法读取的情况。

为什么呢?我们分析到,Hudi MOR 是纯 Append,虽然后台有 Compaction,但是完全不等 Compaction。所以在测试中 Hudi 的 Compaction 只做了一点点,读取的时候性能特别差。

基于这点,我们也做了右边的 benchmark,就是 1 亿条数据的 CopyOnWrite,来测试合并性能,测试 CopyOnWrite 情况下的 Compaction 性能。测试的结果是发现不管是 2 分钟、1 分钟还是 30 秒,Paimon 性能都是大幅领先的,是 12 倍的性能差距。在 30 秒的时候,Hudi 跑不出来,Paimon 还是能比较正常地跑出来。(Checkpoint 到 10s 后,Paimon 也跑不出来了)

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_后端_08

所以回过头来,我希望通过这三句话的关键词来描述 Paimon 能做到什么。

第一,低延时、低成本的流式数据湖。如果你有用过 Hudi,我们希望你替换到 Paimon 之后以 1/3 的资源来运行它。

第二,使用简单、入湖简单、开发效率高。可以轻松地把数据库的数据以 CDC 的方式同步到数据湖 Paimon 中。

第三,与 Flink 集成强大,数据流起来。

三、Flink + Paimon 流式湖仓

第一部分讲了数据架构演进,就是我们为什么要做 Paimon,第二部分介绍 Paimon 能干什么,有哪些集成、优势,性能上表现如何。接下来第三部分就是 Flink + Paimon 怎么构建流式湖仓。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_Hive_09

首先我们看一个大致的图,其实流式湖仓本质还是一个湖仓,湖仓能干什么?最基本的就是批写、批读,能比起传统的 Hive 数仓有更好的优势。在这个基础上要提供一个强大的流式数据更新入湖以及流式数据增量数据的流读,达到全链路的实时化、流批一体化,难点就是流式更新和流读。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_Flink_10

一个最典型的流式湖仓能解决的场景,Hive 上 CDC 数据,也就是从 MySQL、传统数据库的数据、CDC 数据能流到仓或者湖中的链路。这是一个比较陈旧,但是也是大量在企业中被应用的架构图。

你可能在第一次运行的时候或者按需通过全量同步的方式同步到 Hive 全量分区表中,成为一个分区。接下来每天要通过增量同步的方式同步到 Kafka 中,通过定时回流的方式把增量的 CDC 数据同步成 Hive 中的一个增量表。每天晚上同步完后,大概 0 点 10 分的时候就可以做一个增量表和全量表的合并,合并之后形成新的分区就是 MySQL 新一天的全量。

通过这样的技术可以看到它的产出时延是非常高的,至少需要 T+1,并且还要等增量数据和全量数据合并。而且全量增量是割裂的,存储也非常浪费。你可以看到Hive全量表每个分区就是一个全量的数据,你要存 100 天的数据就至少是 100 倍的存储。

第三也是链路非常长,非常复杂,涉及到各种各样好几个技术,在真实的业务场景中非常容易遇到的就是这个产出,哪个组件有问题,数据产出不了,导致后面一系列的离线作业跑不了。所以这里描述的就是三高,时延高、成本高、链路复杂度高。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_大数据_11

切到 Flink+Paimon 的流式 CDC 更新,我们希望把架构做得非常简单,不用 Hive 的分区表,只要定义 Paimon 的主键表,不分区。它的定义就非常像 MySQL 表的定义。

通过 Flink CDC、Flink 作业把 CDC 数据全增量一体到 Paimon 中就够了,就可以实时看到这张表的状态,并且实时地查到这张表。数据被实时的同步,但是离线数仓是需要每天的 View,Paimon 要提供 Tag 技术。今天打了一个 Tag 就记住了今天的状态,每次读到这个 Tag 都是相同的数据,这个状态是不可变的。所以通过 Tag 技术能等同取代 Hive 全量表分区的作用,Flink、Spark 可以通过 Time Travel 的语法访问到 Tag 的数据。

传统的 Hive 表那是分区表,Hive SQL 也没有 Time Travel 的语义,怎么办?在 Paimon 中也提供了 Tag 映射成 Hive 分区表的能力,还是可以在 Hive SQL 中通过分区查询,查询多天的数据。Hive SQL 是完全兼容一行不改的状态来查询到 Paimon 的组件表,所以经过这样的架构改造之后,你可以看到整个数据分钟级实时可见,各整个全增量一体化,存储是复用,比较简单稳定而且一键同步,这里不管是存储成本还是计算成本都可以大幅降低。

存储成本通过 Paimon 的文件复用机制,你会发现打十天的 Tag 其实存储成本只有一两天的全量成本,所以保留 100 天的分区,最后存储成本可以达到 50 倍的节省。

在计算成本上虽然需要维护 24 小时都在跑的流作业,但是你可以通过 Paimon 的异步 Compaction 的方式,尽可能地缩小同步的资源消耗,甚至 Paimon 也提供整库同步的类似功能给到你,可以通过一个作业同步上百张或者几百张表。所以整个链路能做到三低:时延低、成本低和链路复杂度低。

接下来介绍两个流读。大家可能觉得 Paimon 是为实时而生的,更好地流读,其实没有什么实感。包括 Hudi、Iceberg 也能流读,我在这里通过两个机制来说明 Paimon 在数据流读上做了大量的工作。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_Flink_12

第一,Consumer 机制。如果没有这个能力,经常流读的时候碰到非常头疼的东西就是 FileNotFoundException,这个机制是什么样的呢?因为我们在数据产出过程当中,需要不断地产生 Snapshot。太多的 Snapshot 会导致大量的文件、导致数据存储非常地冗余,所以需要有 Snapshot 的清理机制。但是另外流读的作业可不知道这些,万一我正在流读的 Snapshot 被 Snapshot Expiration 给删了,那不就会出现 FileNotFoundException,怎么办?而且更为严重的是,流读作业可能会 Failover,万一它挂了 2 个小时,重新恢复后,它正在流读的 Snapshot 已经被删除了,再也恢复不了。

所以 Paimon 在这里提出了 Consumer 机制。Consumer 机制就是在 Paimon 里用了这个机制之后,会在文件系统中记一个进度,当我再读这个 Snapshot,Expiration 就不会删这个 Snapshot,它能保证这个流读的安全,也能做到像类似 Kafka Group Id 流读进度的保存。重启一个作业无状态恢复还是这个进度。所以 Consumer 机制可以说是流读的基本机制。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_Flink_13

第二,Changelog 生成。假设有这样一张 Paimon 的 PK 表,Key 是名字,Value 是 Count,上游在不断地流写,下游在不断地流读。流写可能会同一个组件写相同的数据,比如说先前写的 Jason 是 1,后面又写一个 Jason 是 2。你会发现流读的作业在做一个正确流处理的时候,比如说做一个 sum,sum 结果应该是 2 还是 3,如果没有这个 Changelog 的生成就不知道这是同一个主键,我要先把 Jason -> 1 给 retract 掉,再写 Jason -> 2。所以这里也对我们湖存储本身要表现得像一个数据库生成 Binlog 的方式,下游的流读计算才能更好、更准确。

Changelog 生成有哪些技术呢?在 Flink 实时流计算中,大家如果写过作业的话,也可能写过大量用 State 的方式来去重。但是这样的方式 State 的成本比较高,而且数据会存储多份,一致性也很难保障。或者你可以通过全量合并的方式,比如说 Delta、Hudi、Paimon 都提供了这样的方式,可以在全量合并的时候生成对应的 Changelog,这个可以,但是每次生成 Changelog 都需要全量合并,这个代价也会非常大。

第三,Paimon 这边独有的方式,它有 Changelog-Producer=lookup,因为它是 LSM。LSM 是有点查的能力,所以你可以配置这样一个点查的方式在写入的时候能通过批量高效率的点查生成对应的 Changelog 让下游的流处理能够正确地流处理。

上面两个部分就是 Paimon 的更新和流读。流式湖仓面向流批一体的 Flink 的流批一体。之前是流批一体的计算,现在有了存储以后是流批一体的计算 + 流批一体的存储。

但是,有同学在用阿里云 Serverless Flink 发现没有批的基本能力:调度和工作流?

流式湖仓不仅要解决流的能力,还需要解决批的离线处理能力,批是湖仓的基础,流只是在这个流式湖仓中真正的流可能只有 10%、20%,并不是整个湖仓的全部。所以 Flink 的流批一体离不开 Flink 的真正批处理。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_Flink_14

大家也可以看到流式湖仓的图里,可能需要 4 个步骤来处理数据。

  1. 第一步是一键入湖,通过 Flink CTAS/CDAS 一键入湖。
  2. 第二步里面 Pipeline 全链路实时化是流起来的,所以需要我对存储有流读流写的能力。
  3. 第三步就是这些数据全都是可以通过开放分析引擎来分析到数据。
  4. 第四步就是湖仓本质的东西批读批写,在产品上需要的东西基本上就是调度、工作流。

大家期待已久,阿里云 Serverless Flink 也正式迎来了产品上的调度和工作流的能力,能让你在 Serverless Flink 达到真正的完整批处理链路的能力。

接下来我就想通过一个准实时流式湖仓的案例,是电商的数据分析。通过 Flink 实时入湖入到 ODS 层 Paimon 表,通过流式流起来流到 DWD,再流到 DWM,再到 DWS,这样一整套完整的流式湖仓。

基于 Flink SQL 和 Paimon 构建流式湖仓新方案_数据_15

开源大数据专场回放观看地址:yunqi.aliyun.com/2023/subfor…

《基于 Flink SQL 和 Paimon 构建流式湖仓新方案》Demo 演示见开源大数据专场回放视频 01:52:42 - 01:59:00 时间段。

Serverless Flink 不只有流 ETL 的能力,现在也有一个比较完善的批处理方式,以前可能是流在一个开发平台,批在一个开发平台,非常地割裂,现在能做到的是整个开发平台都可以在 Serverless Flink 上,整个计算引擎可以是 Flink Unified 的,而且底下的存储都是 Unified 的一套 Paimon 存储,完成离线处理以及实时处理或者准实时处理的能力,能达到从开发到计算和存储的完整 Unified 方案。批处理的版本即将发布,大家有需要可以联系我们提前试用。

关于 Paimon

  1. 微信公众号:Apache Paimon ,了解行业实践与最新动态
  2. 官网:paimon.apache.org/ 查询文档和关注项目

标签:Flink,Hive,湖仓,流式,SQL,Paimon,数据,流读
From: https://blog.51cto.com/u_14286418/8956368

相关文章

  • mysql 表注释查询
    驼峰函数CREATEFUNCTION`underlineToCamel`(paramStringVARCHAR(200))RETURNSvarchar(200)CHARSETutf8DETERMINISTICbeginsetparamString=LOWER(paramString);setparamString=replace(paramString,'_a'......
  • Flink on Yarn安装部署
    引言ApacheFlink是一款用于大规模数据处理和分析的分布式流处理框架,它提供了高性能、容错性和灵活性,广泛应用于实时数据处理和批处理场景。Flink的核心特性包括事件驱动、状态管理、窗口操作等,使其成为处理实时和离线数据的理想选择。本文档将引导您在YARN(YetAnotherReso......
  • SQL分页
    DECLARE@sUserNoNVARCHAR(20),@sBillNoNVARCHAR(50),@sStartDateDATETIME,@sEndDateDATETIME,@iPageSizeINT=20,@iPageIndexINT=1;IFISNULL(@iPageIndex,0)<1SET@iPageIndex=1;iFISNULL(@iPageSize,0)<1SET@iPageSize=50;SET@sEndDate=@sEndD......
  • 无涯教程-PostgreSQL - Limit语句
    PostgreSQLLIMIT子句用于限制SELECT语句返回的数量。LimitClause-语法带LIMIT子句的SELECT语句的基本语法如下-SELECTcolumn1,column2,columnNFROMtable_nameLIMIT[noofrows]以下是LIMIT子句与OFFSET子句一起使用时的语法-SELECTcolumn1,column2,columnN......
  • 写原生SQL和使用ORM框架相比优缺点
    一、写原生SQL1.1优点理论上来说更可控,想怎么写就怎么写。相对来说开发起来可能更快速1.2缺点问题不好排查,可能会耗费更多时间切换数据库,sql需要重新调整(各个数据库sql写法存在区别)如果是数据结构中某个字段有修改,所有使用到当前数据库字段的地方都需要修改(特别是通过......
  • 无涯教程-PostgreSQL - 更新数据(Update)
    PostgreSQLUPDATE查询用于修改表中的现有记录,您可以将WHERE子句与UPDATE查询一起使用来更新选定的行,否则,将更新所有行。使用WHERE子句的UPDATE查询的基本语法如下-UPDATEtable_nameSETcolumn1=value1,column2=value2....,columnN=valueNWHERE[condition];您可以使用......
  • 无涯教程-PostgreSQL - AND&OR语句
    PostgreSQL的AND和OR运算符用于组合多个条件以缩小PostgreSQL语句中的选定数据。AND运算符AND运算符允许PostgreSQL语句的WHERE子句中存在多个条件,使用AND运算符时,当所有条件都为真时才通过。如,仅当condition1和condition2均为true时,[condition1]AND[condition2]才为tr......
  • MySQL 5.7.36安装
    文档课题:MySQL5.7.36安装系统:rhel7.964位安装包:mysql-5.7.36-el7-x86_64.tar.gz1、安装1.1、创建目录和用户[root@leo-mysql01~]#mkdir-p/mysql/data[root@leo-mysql01~]#mkdir-p/mysql/binlog[root@leo-mysql01~]#mkdir-p/opt/mysql[root@leo-mysql01~]#......
  • 无涯教程-PostgreSQL - 表达式
    表达式(Expressions)是一个或多个值,运算符以及计算为一个值的PostgresSQL函数的组合。PostgreSQLEXPRESSIONS类似于公式,它们以查询语言编写,您还可以用于查询数据库以获取特定的数据集。考虑一下SELECT语句的基本语法,如下所示:SELECTcolumn1,column2,columnNFROMtable_na......
  • mysql8.0 OCP 105
    105、Choosefour.YoumuststoreconnectionparametersforconnectingaLinux-basedMySQLclienttoaremoteWindows-basedMySQLserverlisteningonport3309.您必须存储连接参数,以便将基于linux的MySQL客户端连接到侦听端口3309的基于Windows的远程MySQL服务器。Wh......