一、介绍
在构建数据湖时,也许没有比数据格式存储更具有意义的决定。其结果将对其性能、可用性和兼容性产生直接影响。
通过简单地改变数据的存储格式,我们就可以解锁新的功能,提高整个系统的性能,这很有启发意义。
Apache Hudi
、Apache Iceberg
和 Delta Lake
是目前为数据湖设计的最佳格式。这三种格式都解决了数据湖最迫切的一些问题。
-
原子事务–保证对数据湖的更新或追加操作不会中途失败,产生脏数据。
-
一致的更新–防止在写入过程中读取失败或返回不完整的结果。同时处理潜在的并发写入冲突。
-
数据和元数据的可扩展性–当表增长到数千个分区和数十亿文件的大小时,避免对象存储API和相关元数据的瓶颈。
让我们仔细分析下每种格式在更新性能、并发性和与其他工具的兼容性方面的做法。最后,我们将就某种格式对你的数据湖建设提出最有意义的建议
二、平台兼容
2.1、Hudi
Hudi 最初是由 Uber 开源的,它被设计成支持通过列式数据格式进行增量更新。它支持从多个来源摄取数据,主要是Apache Spark 和 Apache Flink。它还提供了一个基于Spark的实用程序(DeltaStreamer
)来读取外部资源,如Apache Kafka。
支持从 Apache Hive、Apache Impala 和 PrestoDB 读取数据。还有一个专门的工具(HiveSyncTool)可以将Hudi表模式同步到Hive Metastore 中。PS: Hudi 社区也支持 DLA
2.2、iceberg
Iceberg 最初由 Netflix 发布,旨在解决在 S3 上存储大型 Hive-Partitioned
数据集时出现的性能、可扩展性和可管理性挑战。
iceberg 支持 Apache Spark 的读和写,包括Spark的结构化流。Trino (PrestoSQL)也支持读取,但对删除的支持有限。同时支持Apache Flink 的读和写。最后,Iceberg 为 Apache Hive 提供了读支持。
2.3、Delta Lake
Delta Lake是由Databricks(Apache Spark的创建者)作为一个开源项目进行维护,并不奇怪,它与Spark在读写方面都进行了深度集成。
使用Hive的SymlinkTextInputFormat为Presto、AWS Athena、AWS Redshift Spectrum和Snowflake提供读取支持。尽管这需要为每个Delta表分区导出一个symlink.txt文件,而且正如你所说,对于较大的表来说,维护成本变得很高。
三、更新性能与吞吐
对大型、不可变对象的行级更新的支持可以通过几种方式来实现,每种方式都有其独特的关于性能和吞吐量的权衡。
让我们看看每种数据格式对UPSERT操作采用的策略。我们还将谈到与读取性能相关的额外优化。
3.1、Hudi
Hudi 表在处理 UPSERTS 时提供的性能权衡是灵活的(明确的)。两种不同类型的 Hudi 表之间的权衡是不同的。
Copy on Write Table
- 更新完全写在列式拼接文件中,创建新的对象。这增加了写的成本,但将读放大降低到零,使其成为读负载重的理想选择。
Merge on read Table
- 更新立即写入基于行的日志文件中,并定期合并到列式parquet中。有趣的是,查询可以包含最新的日志文件数据(快照视图),也可以不包含(读优化视图),为用户在数据延迟和查询效率之间提供不同的选择性
Hudi 通过利用关键索引进一步优化压缩,有效地跟踪哪些文件包含陈旧的记录。
3.2、Iceberg
随着去年夏天 Spark 3.0 的发布,Iceberg 通过 MERGE INTO 查询支持 upserts。它们使用直接的 Copy On Write 的方式工作,其中包含需要更新记录的文件会立即被重写。
Iceberg 的优势在于包含大量分区的表的读取性能。通过维护 manifest 文件,将对象映射到分区,并保持列级统计,Iceberg避免了昂贵的对象存储目录列表或需要从 Hive 获取分区数据。
此外,Iceberg 的清单允许一个文件同时分配到多个分区。这使得 Iceberg 表能高效地进行分区修剪,并改善了高选择性查询的延迟。
3.3、Delta Lake
在 MERGE 操作过程中,Delta 使用元数据信息的数据跳转,将文件分类为需要插入、更新或删除的数据。然后,它执行这些操作,并将它们作为 "提交 "记录在一个名为 Delta Log 的 JSON 日志文件中。这些日志文件每10次提交就会重写一次,作为Parquet 的 "检查点 "文件,保存整个表的状态,以防止昂贵的日志文件遍历。
为了保持性能,Delta 表需要定期进行压缩处理,将许多小的 Parquet 文件合并成较少的大文件(最佳情况下约1GB,但至少128MB大小)。Databricks 的专有版本 Delta Engine 支持自动压实,该过程会自动触发,并支持其他幕后写优化。
Delta Engine 通过使用 Bloom Filters 提供关键索引、Z-Ordering 以在读取时更好地修剪文件、本地缓存等,进一步提升了其开源版本的性能。
四、并发保证
允许数据表的 in-place 更新意味着要处理并发性问题。
如果有人在更新表的同时读取表,会发生什么?而当多个 writer 同时进行冲突的修改时,又会怎样?
通常数据库通过多版本并发控制(MVCC)来解决这个问题,这种方法利用一个逻辑事务日志,所有的更改都会被追加。
另一种称为优化并发控制(OCC)的方法允许同时发生多个写入,只在最后提交前检查冲突。如果检测到冲突,其中一个事务会被重试,直到成功。
4.1、Hudi
True to form,Hudi 提供 MVCC 和 OCC 并发控制。
Hudi 的 MVCC 意味着所有的写入必须在其中心日志中完全有序。为了提供这种保证,Hudi将写并发量限制为 1,这意味着在一个给定的时间点上,只能有一个writer对一个表进行写。
为了防止这种限制,Hudi 现在提供了 OCC。这个功能需要 Apache Zookeeper 或者 Hive Metastore 来锁定单个文件并提供隔离。
4.2、Iceberg
Iceberg 通过在更新过程中对元数据文件进行原子交换操作来支持优化并发(OCC)。
其工作方式:
- 每次写入都会创建一个新的表 “快照”。
- 然后,写入者会尝试对一个持有当前快照ID的特殊值进行比较和交换(CAS)操作。
- 如果在提交过程中没有其他写入者替换快照,则操作成功。
- 如果在此期间有另一个写入者进行提交,另一个写入者将不得不重试,直到成功。
在分布式文件系统上,如HDFS,这可以在本地完成。对于 S3,需要一个额外的组件来存储指针(目前只支持Hive Metastore)。
4.3、Delta Lake
据 Delta 文档解释,它使用Optimistic Control
来处理并发性,因为大多数数据湖操作都会将数据追加到一个时间排序的分区,不会发生冲突。
在两个进程向Delta Log文件添加提交的情况下,Delta会 "默默地、无缝地 "检查文件变化是否重叠,如果可能的话,会让两个进程都成功。
不过这意味着,底层对象存储需要提供一种方式,当多个写入者开始覆盖对方的日志条目时,要么提供CAS操作,要么提供一种写入失败的方式。
与 Iceberg 类似,这个功能在 HDFS 上是可以开箱即用的,但S3不支持。因此,在 AWS 上的 Delta 不支持从多个Spark集群进行写入,且没有真正的事务性保证。
注意:专有的 Delta Engine 版本支持使用 Databricks 自己管理的外部同步服务器在 S3 上进行多集群写入。
五、如何选择合适的
如果你看到这里,我们已经了解了 Apache Hudi、Delta Lake 和 Apache Iceberg 之间的一些重要的相似之处和不同之处。
现在是时候决定哪种格式对你的用例最有意义了! 我的建议是取决那种情况最适用
5.1、Iceberg 的应用场景
你的主要痛点不是对现有记录的更改,而是在对象存储上管理巨大的表(超过10k个分区)的元数据负担。采用Iceberg将缓解与S3对象列表或Hive Metastore分区枚举相关的性能问题。
相反,对删除和变更的支持还是初步的,而且涉及到数据保留的操作开销。
5.2、Hudi 的应用场景
你使用各种查询引擎,并且需要灵活管理变更的数据集。请注意,支持工具和整体的开发者体验可能较差。尽管有可能,但为实际的大规模生产工作负载安装和调试Hudi也需要一定的操作开销。
如果你正在使用AWS管理服务,如Athena,Glue或EMR - Hudi已经预装和配置,并由AWS支持。
5.3、Delta Lake 的应用场景
你主要是一个Spark商店,并且期望相对较低的写入吞吐量。如果你也已经是Databricks的客户,Delta Engine为读写性能和并发量都带来了显著的改善,对他们的生态系统进行双重开发是很有意义的。
对于其他Apache Spark发行版来说,要明白Delta Lake虽然是开源的,但很可能会一直落后于Delta Engine,以起到产品差异化的作用。
5.4、Integration With lakeFS
如果你想知道 “我可以用 lakeFS 和这些数据格式一起使用吗?”…答案是肯定的。
lakeFS可以与Delta、Iceberg或Hudi中的任何一种共生,提供跨任何数量的表进行分支、合并、回滚等操作的能力。
由于这些格式都是在表级操作,所以对跨多个表的操作没有提供保证。通过 lakeFS,可以在一个孤立的分支上修改多个表,然后将这些修改原子地合并到一个主分支上,实现跨表的一致性
Further Reading
-
ARTICLE: Diving Into Delta Lake: Unpacking the Transaction Log
-
DOCS: Branching Recommendations: Cross Collection Consistency
-
VIDEO: A Thorough Comparison of Delta Lake, Iceberg and Hudi
-
ARTICLE: In-depth Comparison of Delta, Iceberg, and Hudi
-
BLOG: Efficient Upserts into Data Lakes with Databricks Delta
-
ARTICLE: Comparison of Big Data storage layers: Delta vs Apache Hudi vs Apache Iceberg
This article was contributed to by Paul Singman, Developer Advocate at lakeFS.