在之前三期的实时湖仓系列文章中,我们从业务侧、产品侧、应用侧等几个方向,为大家介绍了实时湖仓方方面面的内容,包括实时湖仓对于企业数字化布局的重要性以及如何进行实时湖仓的落地实践等。
本文将从纯技术的角度,为大家解析实时湖仓的存储原理以及生态选型,为企业建设实时湖仓给出技术方面的参考意见。
实时湖仓能解决什么问题?
大部分人可能都会有这样一个疑问,企业为什么要引入实时湖仓?
如下图所示,引入实时湖仓可以降低运维难度,实现低成本统一存储、中间状态可查,以及提升开发效率。
实时湖仓能够在低成本存储的同时,极大降低数据指标的时延,从传统的 T+1 的时延,降低到到分钟级。
实时湖仓解决方案,利用湖存储的特性和 Flink 的流批计算能力,统一存储和计算,解决业务对数据时效性高要求的需求。
实时湖仓存储原理
下文将从大数据常用技术和大数据存储常用理论两个方面为大家解析实时湖仓的存储原理。
大数据常用技术分析
Hive 事务表
Hive 是基于 Hadoop 的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在 Hadoop 中的大规模数据的机制。Hive 数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供 SQL 查询功能,能将 SQL 语句转变成 MapReduce 任务来执行。
ACID 事务表中会包含三类文件,分别是 base、delta 以及 delete。
Hbase——LSM-Tree模型
LSM-Tree,即日志结构合并树(Log-Structured Merge-Tree)是 Google BigTable 和 HBase 的基本存储算法。HBase 是基于 LSM-Tree 模型实现的,所有的数据写入操作都首先会顺序写到日志 HLog,再写入 Memstore。当 MemStore 中数据达到指定大小之后再将这些数据批量写入磁盘,生成一个新的 HFile 文件,这个动作叫 Memstore flush。
LSM 树架构有如下几个非常明显的优势:
· 这种写入方式将一次随机 IO 写入转换成一个顺序 IO 写入(HLog 顺序写入)加上一次内存写入(MemStore 写入),使得写入性能得到极大提升
· HFile 中 KeyValue 数据需要按照 Key 排序,排序之后可以在文件级别根据有序的 Key 建立索引树,极大提升数据读取效率,然而 HDFS 本身只允许顺序读写,不能更新,因此需要数据在落盘生成 HFile 之前就完成排序工作,MemStore 就是 KeyValue 数据排序的实际执行者
· MemStore 作为一个缓存级的存储组件,总是缓存着最近写入的数据,对于很多业务来说,最新写入的数据被读取的概率会更大,最典型的比如时序数据,80%的请求都会落到最近一天的数据上,实际上对于某些场景,新写入的数据存储在 MemStore 对读取性能的提升至关重要
· 在数据写入 HFile 之前,可以在内存中对 KeyValue 数据进行很多更高级的优化。比如,如果业务数据保留版本仅设置为1,在业务更新比较频繁的场景下,MemStore 中可能会存储某些数据的多个版本,这样,MemStore 在将数据写入 HFile 之前实际上可以丢弃老版本数据,仅保留最新版本数据
MemStore 使用数据结构 ConcurrentSkipListMap 来实际存储 KeyValue,优点是能够非常友好地支持大规模并发写入,同时跳跃表本身是有序存储的,这有利于数据有序落盘,以及有利于提升 MemStore 中的 KeyValue 查找性能。
KeyValue 写入 MemStore 并不会每次都随机在堆上创建一个内存对象,然后再放到 ConcurrentSkipListMap 中,这会带来非常严重的内存碎片,进而可能频繁触发 Full GC。HBase 使用 MemStore-Local Allocation Buffer(MSLAB) 机制预先申请一个大的(2M)的 Chunk 内存,写入的 KeyValue 会进行一次封装,顺序拷贝这个 Chunk 中。
这样,MemStore 中的数据从内存 flush 到硬盘的时候,JVM 内存留下来的就不再是小的无法使用的内存碎片,而是大的可用的内存片段。基于这样的设计思路,MemStore 的写入流程可以表述为以下3步:
· 检查当前可用的 Chunk 是否写满,如果写满,重新申请一个2M的 Chunk
· 将当前 KeyValue 在内存中重新构建,在可用 Chunk 的指定 offset 处申请内存,创建一个新的 KeyValue 对象
· 将新创建的 KeyValue 对象写入 ConcurrentSkipListMap 中
Hbase在实现中,是把整个内存在一定阈值后,flush 到 disk 中,形成一个 file。这个 file 的存储也就是一个小的B+树,因为 Hbase 一般是部署在 HDFS 上,HDFS 不支持对文件的 update 操作,所以 Hbase 整体内存 flush,而不是和磁盘中的小树 merge update。内存 flush 到磁盘上的小树,定期也会合并成一个大树,整体上 Hbase 就是用了 LSM-Tree 的思路。
因为小树先写到内存中,为了防止内存数据丢失,写内存的同时需要暂时持久化到磁盘,对应了 HBase 的 HLog(WAL)和 MemStore。
MemStore 上的树达到一定大小之后,需要 flush 到 HRegion 磁盘中(一般是 Hadoop DataNode),这样 MemStore 就变成了 DataNode 上的磁盘文件 StoreFile,定期 HRegionServer 对 DataNode 的数据做 merge 操作。彻底删除无效空间,多棵小树在这个时机合并成大树,来增强读性能。
Hudi
● Hudi 表格式
Hudi 表类型定义了如何在 DFS 上对数据进行索引和布局,以及如何在此类组织上实现上述操作和时间轴活动(即如何写入数据)。同样,查询类型定义了底层数据如何暴露给查询(即如何读取数据)。
· Table Types:
1)Copy on Write : 使用列式存储来存储数据(parquet), 通过在写入期间执行同步合并来简单地更新和重现文件
2)Merge on Read : 使用列式存储(parquet)+ 行式文件(avro)组合存储数据,更新记录到增量文件(avro)中,然后进行同步或异步压缩来生成新版本的列式文件
· 查询类型
1)Snapshot Queries: 在此视图上的查询可以查看给定提交或压缩操作时表的最新快照。对于读时合并表(MOR表) 该视图通过动态合并最新文件切片的基本文件(例如 parquet)和增量文件(例如 avro)来提供近实时数据集(几分钟的延迟);对于写时复制表(COW 表),它提供了现有 parquet 表的插入式替换,同时提供了插入/删除和其他写侧功能
2)Incremental Queries: 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据,该视图有效地提供了更改流,来支持增量数据管道
3)Read Optimized Queries: 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照,该视图仅将最新文件切片中的基本/列文件暴露给查询,并保证与非 Hudi 列式数据集相比,具有相同的列式查询性能
● Hudi 索引
Hudi 通过索引机制,将给定的 hoodie key(record key + partition path)一致映射到 file id,从而提供高效的上载功能。一旦记录的第一个版本被写入文件, record key 和 file group/file id 之间的映射就永远不会改变。简而言之,映射的 file group 包含一组记录的所有版本。
对于 "写入即复制 "表来说,这样就不需要 join 整个数据集来确定要重写哪些文件,从而实现了快速的上载/删除操作。
对于 "读取合并 "表,这种设计允许 Hudi 对任何给定基础文件需要合并的记录数量进行约束。具体来说,一个给定的基础文件只需要针对属于该基础文件的记录更新进行合并。相比之下,没有索引组件的设计(如 Apache Hive ACID)最终可能需要针对所有传入的更新/删除记录合并所有基础文件。
目前,Hudi 支持以下索引类型,在 Spark 引擎上默认为 SIMPLE,在 Flink 和 Java 引擎上默认为 INMEMORY。
· BLOOM:采用由 record keys 构建的 Bloom 过滤器,也可选择使用 record keys 范围修剪候选文件
· GLOBAL_BLOOM:使用由 record keys 构建的 Bloom 过滤器,也可选择使用 record keys 范围剪切候选文件,表中的所有分区都会强制执行 key 唯一性
· SIMPLE(Spark 引擎默认): Spark 引擎的默认索引类型,根据从存储表中提取的 key 对输入记录执行精益 join,在分区内强制执行 key 的唯一性
· GLOBAL_SIMPLE: 根据从存储表中提取的 key 对输入记录执行精简 join,在表的所有分区中强制执行 key 的唯一性
· HBASE:管理外部 Apache HBase 表中的索引映射
· INMEMORY(Flink 和 Java 的默认值): 在 Spark 和 Java 引擎中使用内存 hashmap,在 Flink 中使用 Flink 内存 state 索引
· BUCKET:使用桶 hashing 定位包含记录的文件组,尤其适用于大规模数据量的表,使用 hoodie.index.bucket.engine 选择桶引擎类型,即如何生成桶
1)SIMPLE(默认):每个分区的文件组使用固定数量的桶,这些桶不能收缩或扩展,这适用于 COW 表和 MOR 表。由于桶的数量不能改变,而且桶和文件组之间设计了一对一的映射,因此这种索引可能不太适合高度倾斜的分区
2)CONSISTENT_HASHING:支持桶的动态数量和桶的大小调整,以适当调整每个桶的大小。这解决了潜在的数据倾斜问题,即数据量大的分区可以动态调整大小,以拥有多个大小合理的桶,而 SIMPLE 桶引擎类型中每个分区的桶数量是固定的,这只适用于 MOR 表
· RECORD_INDEX:在 Hudi 元数据表中保存 record keys 到位置映射的索引,记录索引是全局索引,可确保表中所有分区的密钥唯一性,支持分片,以实现极高的规模,可以扩展 HoodieIndex 来实现自定义索引
Iceberg
Iceberg 在 V1 的格式中定义了如何使用不可变类型的文件(Parquet、ORC、AVRO)来管理大型分析型的表,包括元数据文件、属性、数据类型、表的模式,分区信息,以及如何写入与读取。
而在 V2 的格式中,在 V1 的基础上增加了如何通过这些类型的表实现行级别的更新与删除功能。其最主要的改变是引入了 delete file 记录需要删除的行数据,这样可以在不重写原有(数据)文件的前提下,实现行数据的更新与删除。
Paimon
● Paimon 表格式
得益于 LSM 数据结构的追加写能力,Paimon 在大规模的更新数据输入的场景中提供了出色的性能。
Paimon 创新地结合了 湖存储 + LSM + 列式格式(ORC, Parquet),为湖存储带来大规模实时更新能力。Paimon 的 LSM 的文件组织结构如下:
· 高性能更新:LSM 的 Minor Compaction,保障写入的性能和稳定性
· 高性能合并:LSM 的有序合并效率非常高
· 高性能查询:LSM 的 基本有序性,保障查询可以基于主键做文件的 Skipping
LSM 是一个面向写友好的格式,它在写入的时候可以看到整个流程,但它不用理解具体的流程。大致的思路是,写入发生在 Flink Sink 中,当检查点到达时,它会对内存中的数据进行排序,并将记录刷新到 Level0 文件中。
得益于 LSM 这种原生异步的 Minor Compaction,它可以通过异步 Compaction 落到最下层,也可以在上层就发生一些 Minor 的 Compaction 和 Minor 的合并,这样压缩之后它可以保持 LSM 不会有太多的 level。保证了读取 merge read 的性能,且不会带来很大的写放大。
另外,Flink Sink 会自动清理过期的快照和文件,还可以配置分区的清理策略。所以整个 Paimon 提供了吞吐大的 Append 写,消耗低的局部 Compaction,全自动的清理以及有序的合并。所以它的写吞吐很大,merge read 不会太慢。
● Paimon Changelog Producers
Changelog-Producer 配置 Changelog 生产的模式:None,Input,Lookup, Full Compaction。
· None:最适合数据库系统等消费者
· Input:适合完整的 change log,database CDC
· Lookup:没有完整的 change log,又不想用 normalized operator,commit 前执行
· Full Compaction:比 Lookup 时延更长,通常是多个 commit
大数据存储中的常用理论
磁盘存储
在介绍大数据存储的常用理论之前,先讲讲磁盘存储。
· 磁道移动到对应的磁道,这是由马达控制的机械动作,一般为10ms左右(取决于磁头位置和目标磁道的距离),这叫做寻道时间
· 等待对应的扇区旋转到磁头位置(磁头是不动的),按现在主流磁盘转速7200转/分钟,旋转一周需要8.33ms,这叫等待时间
· 对应扇区在磁头旋转而过,数据就被读写完成了,一般一个磁道又63个扇区,一个扇区掠过磁头的时间为 8.33ms/63=0.13ms,我们叫它传输时间
· 存取时间 = 寻道时间(t1) + 等待时间(t2) + 传输时间(t3)
● 希捷硬盘
7200转、256MB缓存、SAS 12Gbps接口,最大持续传输速度524MB/s,随机4K QD16读取为304 IOPS、随机4K QD16写入最大448 IOPS,平均延迟4.16ms,平均运行功耗12W左右。
● 固态硬盘
在固态硬盘中,4K 是最小的读写单元固态硬盘。例如,如果我们需要写入 2K 的数据,我们实际上必须写入 4K;如果我们需要写入 13K 的数据,就必须写入 16K 的数据(这里不考虑写入放大)。
对于固态硬盘,顺序读的速度仍然能达到随机读的3倍左右。但是随机写还是顺序写,差别不大。
行式存储/列式存储
● 行式存储
· 数据是按行存储的
· 没有建立索引的查询将消耗很大的 IO
· 建立索引和视图需要花费一定的物理空间和时间资源
· 面对大量的查询,复杂的查询数据库必须被大量膨胀才能满足性能需求
● 列式存储
· 数据是按列存储的,每一列单独存放
· 只访问查询涉及的列大量降低系统 IO
· 数据类型一致,数据特征相似高效的压缩
行式存储和列式存储的对比如下:
Merge-On-Read & Copy On Write
● Merge-On-Read (MOR)
· 最适合频繁写入/更新的表
· 使用读取时合并时,不会重写文件,而是将更改写入新文件
· 当读取数据时,更改将应用或合并到原始数据文件中,以在处理过程中形成数据的新状态
● Copy-On-Write (COW)
· 写入时复制(COW),最适合频繁读取、不频繁写入/更新或大批量更新的表
· 使用 COW 时,当对某一行或多行进行删除或更新时,包含这些行的数据文件会被复制,但新版本包含更新的行,这会导致写入速度变慢,取决于有多少数据文件必须重写,可能会导致并发写入发生冲突,并可能超过重试次数而失败
· 如果要更新大量记录,COW 是理想的选择,但是,如果只更新几行,则仍需重写整个数据文件,这就使得小规模或频繁的更改代价高昂
· 在读取方面,COW 是理想的选择,因为读取不需要额外的数据处理,读取查询有很大的文件可以读取,吞吐量很高
Merge-On-Read 和 Copy On Write 的对比如下:
WAL&LSM
在 RDBMS 中我们需要B+树(或者广义地说,索引),一句话总结,减少寻道时间。在存储系统中广泛使用的 HDD 是磁性介质+机械旋转的,这就使得其顺序访问较快而随机访问较慢。使用B+树组织数据可以较好地利用 HDD 的这种特点,其本质是多路平衡查找树。
B+树最大的性能问题是会产生大量的随机 IO,随着新数据的插入,叶子节点会慢慢分裂,逻辑上连续的叶子节点在物理上往往不连续,甚至分离的很远,但做范围查询时,会产生大量读随机 IO。
为了克服B+树的弱点,引入了 LSM 树的概念,即 Log-Structured Merge-Trees。
Delta Store
基本思想是牺牲写入性能,换取更高的读性能。当获取 CDC 数据后,通过主键索引,可以定位到这条记录原来所在的位置、文件(或者 Block),然后在这个 Block 旁边放一个 Delta Store,用来保存对于这个 Block 的增量修改。这个 Delta Store 里保存的不是主键而是数据记录在 Block 里的行号(RowId)。
查询时读到原始 Block,然后根据 RowId 与 Delta 数据进行合并更新并返回最新数据给上层引擎。由于 Delta 数据是按行号组织的,与 Merge-on-Read 按照 Key 进行合并比,查询性能好很多。不过这种方式会破坏二级索引,因为对 Block 做修改后,他的索引相当于失效了,想要在更新后再维护索引复杂度会很高。
这种方式写入性能差(要查询索引,定位原数据),但读取的性能好很多。另外因为引入了主键索引和 Delta Store,复杂性也较高。
Delete-and-Insert
思路也是牺牲部分写性能,极大地优化读的性能。原理也是引入主键索引,通过主键索引定位原来这条记录所在位置,找到记录后只需要给这条记录打个删除标记(Delete Bitmap),表示这条记录是被删除的,然后所有其它 update 记录可以当成新增数据插入到新的 Block 里面。
这样的好处是读取时直接把所有的 Block 都可以并行加载,然后只需要根据 Delete Bitmap 标记过滤已经删除的记录。
StarRocks 新的支持实时更新的 Primary Key 表就是用到这个 Delete+Insert 的方式实现的,另外还有阿里云的 ADB 和 Hologres 也用到在这种方式。
数据湖的一致性
● Flink 二阶段提交
● 对比 MySQL 二阶段提交
因为最开始 MySQL 里并没有 InnoDB 引擎。MySQL 自带的引擎是 MyISAM,但是 MyISAM 没有 crash-safe 的能力,binlog 日志只能用于归档,而 InnoDB 是另一个公司以插件形式引入 MySQL 的。既然只依靠 binlog 是没有 crash-safe 能力的,所以 InnoDB 使用另外一套日志系统,也就是 redo log 来实现 crash-safe 能力。
假设执行一条 SQL 语句:update T set c=c+1 where ID=2;
· 时刻 A Crash
也就是写入 redo log 处于 prepare 阶段之后,写 binlog 之前,发生了崩溃(crash),由于此时 binlog 还没写,redo log 也还没提交,所以崩溃恢复的时候,这个事务会回滚。
· 时刻 B Crash
看一下崩溃恢复时的判断规则:
1)如果 redo log 里面的事务是 完整的,也就是已经有了 commit 标识,则直接提交
2)如果 redo log 里面的事务只有完整的 prepare,则判断对应的事务 binlog 是否存在并完整:
a. 如果是,则提交事务;
b. 否则,回滚事务。
这里,时刻 B 发生 crash 对应的就是 2(a) 的情况,崩溃恢复过程中事务会被提交。
实时湖仓生态选型
在介绍实时湖仓生态选型之前,先看看流计算 + 湖存储的历史和发展。
· Storm : 流计算 += 不准确的实时预处理
· Spark : 流计算 += Mini-Batch 预处理
· Flink + HBase/Redis/Mysql : 流计算 += 准确的实时预处理
· Flink + OLAP : 流计算 += 实时数仓,预处理和成本的权衡,高性能 OLAP 带来了一定的灵活度
· Flink + 数据湖 : 流计算 += 离线数仓部分实时化
· Flink + 流式数据湖 : 流计算 += CDC 流式增量计算,解决更多痛点
· 未来 : Streaming Lakehouse,通用的 Lakehouse 架构
计算
● Spark(Structured Streaming)、Flink
Spark 流计算国外用的比较多,Spark RDD/Dataset 相关 API;Flink 流计算国内生态社区比较完善,绝大部分需求可以直接使用 SQL 完成。
●Trino、Doris、StarRocks
· Trino
1)优点:纯 Java 项目;代码质量较好;Hive 优化细节处理比较完善
2)缺点:执行性能比向量化引擎低;联邦查询较慢(缺少自己的元数据管理)
· Doris/StarRocks
1)优点:向量化引擎执行效率高;国内社区活跃;功能迭代快
2)缺点:C++ 门槛较高,遇到问题难排查
存储
根据计算选择存储,有如下推荐:
● Flink + Paimon/Iceberg
Paimon 早期是 Flink 内部存储,叫作 Table Store,后来进入Apache 孵化,改名为 Paimon。
● Spark + Hudi
Hudi 是从 Spark 解耦出来的,很多功能是和 Spark 绑定的,Spark 支持的最好,有很多存储过程
● Doris/StarRocks 自带的存储
元数据
● Hive MetaStore
· 传统数仓,用户非常多
· 存储有局限性:不同计算引擎的逻辑类型和 Hive 不一致,例如 timestamp 类型属性只能以 key value 的形式存储到 Hive 表的 Properties 里面
· Hive metastore 瓶颈:高并发请求元数据时,经常 CPU 打满
● Hudi MetaServer
Hudi 里面开发了一个可以存储 Hudi 元数据的服务,服务主要基于 Thrift 协议去开发,通过 MyBatis 保存元数据到 MySQL 中。
● HDFS/OSS
元数据存储在 HDFS 或者在对象存储上面:.hoodie / .iceberg(metadata) / .paimon(metadata)
● 二次开发
基于 Spring 生态去开发 Web 服务,元数据存储在 MySQL 中。
湖仓治理
EasyLake
袋鼠云 EasyLake 湖表治理功能支持数据文件治理,支持快照文件治理,支持 Hudi MOR 增量文件合并,将小文件数量控制在一定的范围内,提升治理效率。
具体实践请点击该链接:《如何构建新一代实时湖仓?袋鼠云基于数据湖的探索升级之路》
Amoro
Amoro 为用户、平台和产品构建湖原生数据仓库和架构。
Dremio
基于 Apache Parquet、Apache Iceberg 和 Apache Arrow 等社区驱动标准的开放数据湖仓一体,使组织能够使用一流的处理引擎并消除供应商锁定。
本文根据《实时湖仓实践五讲第四期》直播内容总结而来,感兴趣的朋友们可点击链接观看直播回放视频及免费获取直播课件。