Introduction
Data lakes have been built with a desire to democratize data — to allow more and more people, tools, and applications to make use of more and more data. A key capability needed to achieve this is hiding the complexity of underlying data structures and physical data storage from users. The de facto standard for achieving this has been the Hive table format, released by Facebook in 2009, which addresses some of these problems but falls short at data, user, and application scale. So what is the answer? Apache Iceberg.
In this article, we'll go through:
- The definition of a table format, since the concept of a table format has traditionally been implicit, embedded under the broader "Hive" umbrella
- Details of the long-time de facto standard, the Hive table format, including its pros and cons. We'll see how these problems created the need for the definition of an entirely new table format
- How the Apache Iceberg table format was created as a result of this need. We'll also delve into the architectural structure of an Iceberg table, both from the specification point of view and with a step-by-step look under the covers at what happens in an Iceberg table as Create, Read, Update, and Delete (CRUD) operations are performed
- Finally, we'll show how this architecture enables the resulting benefits of this design
What’s a Table Format?
A good way to define a table format is as a way to organize a dataset's files to present them as a single "table". Another, somewhat simpler, definition from a user's perspective is a way to answer the question "what data is in this table?" A single answer to that question allows multiple people, groups, and tools to interact with data in the table at the same time, whether they're writing to the table or reading from it. The primary goal of a table format is to provide the abstraction of a table to people and tools and allow them to efficiently interact with that table's underlying data.
Table formats are nothing new — they've been around since System R, Multics, and Oracle first implemented Edgar Codd's relational model, although "table format" wasn't the term used at the time. These systems provided users the ability to refer to a set of data as a table. The database engine owned and managed laying the dataset's bytes out on disk in the form of files, and addressed the complications that arose, such as the need for transactions.
All interaction with the underlying data, like writing it and reading it, was handled by the database's storage engine. No other engine could interact with the files directly without corrupting the system. This worked fine for quite a while. But in today's big data world, where traditional RDBMSs don't cut it, a single closed engine managing all access to the underlying data isn't feasible.
With That Simple of a Concept, Why Do We Need a New One?
The big data community has learned over time that when trying to meet business requirements at data, user, and application scale, there are a lot of considerations when it comes to presenting datasets as tables for users and tools. Some of the challenges encountered were old ones — ones that RDBMSs had already encountered and solved, but that were arising again because of the different technologies that had to be used when RDBMSs couldn't meet key requirements of the big data world. The business requirements driving the challenges, however, hadn't changed.
Other challenges were new ones, due to the differences in technologies and scale in the big data world. To explain why we truly need a new table format, let's take a look at how the long-time de facto standard table format came to be, the challenges it has faced, and what solutions were attempted to address those challenges.
How Did We Get Here? A Brief History
In 2009, Facebook realized that while Hadoop addressed many of their requirements, such as scale and cost-efficiency, it also had shortcomings they needed to address when it came to improving the democratization of their data for the many users who weren't technical experts:
- Any user who wanted to use the data had to figure out how to fit their question into the MapReduce programming model and then write Java code to implement it.
- There was no metadata defining information about the dataset, like its schema.
To get data into the hands of more of their users and address these shortcomings, they built Hive. To address problem #1, they realized they needed to provide access via a more general-purpose programming model and language that people were familiar with: SQL. They would build Hive to take a user's SQL query and translate it into MapReduce jobs so that users could get their answers.
A requirement arising out of the solution to #1, as well as out of problem #2, was the need to define what a dataset's schema was and how to refer to that dataset as a table in a user's SQL query.
To address this second requirement, the Hive table format was defined (via just 3 bullet points in a white paper and the Java implementation), and it has been the de facto standard ever since.
Let's take a closer look at the Hive table format, a relational layer on top of data lakes designed to democratize analytics to non-technical experts at scale.
The Hive Table Format
In the Hive table format, a table is defined as the entire contents of one or more directories — i.e., effectively an ls of one or more directories. For non-partitioned tables, this is a single directory. For partitioned tables, which are much more common in the real world, the table is composed of many directories — one directory per partition.
The data making up the table is tracked at the directory level, and this tracking is done in the Hive metastore. Partition values are defined via a directory path, in the form /path/to/table/partition_column=partition_value. Below is an example architecture diagram of a Hive table partitioned by columns k1 and k2.
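To make the directory-level tracking concrete, here is a minimal sketch in Python (the table path and partition filter are made up for illustration) of how a Hive-style reader discovers a partitioned table's files: it walks the partition directories, parses partition values out of the paths, and only then sees the files inside each matching directory.

import os

def list_hive_table_files(table_root, partition_filter=None):
    # Discover data files the way the Hive table format implies:
    # list directories and parse partition values from the paths.
    files = []
    for dirpath, _dirnames, filenames in os.walk(table_root):
        rel = os.path.relpath(dirpath, table_root)
        # Partition values are encoded in the path, e.g. .../k1=a/k2=b
        parts = dict(seg.split("=", 1) for seg in rel.split(os.sep) if "=" in seg)
        if partition_filter and not partition_filter(parts):
            continue  # pruning can only skip whole directories; no file-level stats exist
        files.extend(os.path.join(dirpath, f) for f in filenames)
    return files

# e.g., read only partition k1=a of a hypothetical table
print(list_hive_table_files("/path/to/table", lambda p: p.get("k1") == "a"))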
Pros
Given its status as the de facto standard for the past 10 years or so, the Hive table format has obviously provided a set of useful capabilities and benefits:
- It ended up working with basically every processing engine because it was the only table format in town — it's been the de facto standard since the broader adoption of big data.
- Over the years, it evolved and provided mechanisms, such as partitions and buckets, that allowed Hive tables to deliver more efficient access patterns than doing full-table scans for every query.
- It was file-format agnostic, which allowed companies and communities to develop file formats better suited for analytics (e.g., Parquet, ORC) and did not require transformation prior to making the data available in a Hive table (e.g., Avro, CSV/TSV).
- The Hive metastore, storing tables laid out in the Hive table format, provided a single, central answer to "what data is in this table?" for the whole ecosystem of tools that needed to interact with the table, both on the read side and the write side.
- It provided the ability to atomically change data in the table at the whole-partition level, via an atomic swap in the Hive metastore, and therefore enabled a consistent view of the world.
Cons
However, many issues started getting worse and worse when the Hive table format was used at larger data, user, and application scale:
1 Changes to the data are inefficient
Because partitions are stored in a transactional store (the Hive metastore, which is backed by a relational database), you can add and delete partitions in a transactional manner. However, because tracking of the files is done in a filesystem that doesn't provide transactional capabilities, you can't add and delete data at the file level in a transactional way.
The general workaround is to address this at the partition level by replicating the whole partition to a new location behind the scenes, making the updates/deletes while you're replicating the partition, then updating the partition's location in the metastore to the new location.
This method is inefficient, especially when your partitions are large, you're changing a relatively small amount of the data in a partition, and/or you're making changes frequently.
2 There's no way to safely change data in multiple partitions as part of one operation
Because the only transactionally consistent operation you can do to change data is to swap a single partition, you can't change data in multiple partitions at the same time in a consistent way. Even something as simple as adding a file to two partitions can't be done in a transactionally consistent way. So, users see an inconsistent view of the world and end up having problems making the right decisions and trusting the data.
3 In practice, multiple jobs modifying the same dataset isn't a safe operation
There isn't a well-adopted method in the table format for dealing with more than one process/person updating the data at a time. There is one method, but it's so restrictive and problematic that really only Hive adheres to it. This leads either to strict controls on who can write and when, which an organization has to define and coordinate itself, or to multiple processes concurrently changing the data, leading to data loss because the last write wins.
4 All of the directory listings needed for large tables take a long time
Because you don't have a list of what files are in all of your partition directories, you need to go get this list at runtime. Getting the responses for all the directory listings you need generally takes a long time.
Ryan Blue, the creator of Iceberg at Netflix, talks about an example use case where it would take over 9 minutes just to plan the query because of these directory listings.
5 Users have to know the physical layout of the table
If a table is partitioned by when an event occurred, this is often done via multi-level partitioning — first the event's year, then the event's month, then the event's day, and sometimes lower granularity. But when a user is presented with events, the intuitive way to get the events after a certain point in time looks like WHERE event_ts >= '2021-05-10 12:00:00'. In this situation, the query engine does a full table scan, which takes much, much longer than if the available partition pruning were done to limit the data.
This full-table scan happens because there is no mapping from the event's timestamp as the user knows it (2021-05-10 12:00:00) to the physical partitioning scheme (year=2021, then month=05, then day=10).
Instead, all users need to be aware of the partitioning scheme and write their query as WHERE event_ts >= '2021-05-10 12:00:00' AND event_year >= '2021' AND event_month >= '05' AND (event_day >= '10' OR event_month >= '06') (this partition-pruning query gets even more complicated if you were to look at events after May of 2020 instead).
6 Hive table statistics are usually stale
Because table statistics are gathered in an asynchronous, periodic read job, the statistics are often out of date. Furthermore, because gathering these statistics requires an expensive read job with a lot of scanning and computation, these jobs are run infrequently, if ever.
Because of these two aspects, the table statistics in Hive are usually out of date, if they exist at all, resulting in poor plan choices by optimizers, which has led some engines to disregard any stats in Hive altogether.
7 The filesystem layout has poor performance on cloud object storage
Any time you're looking to read some data, cloud object storage (e.g., S3, GCS) architecture dictates that those reads should have as many different prefixes as possible, so they get handled by different nodes in the object store. However, since in the Hive table format all data in a partition has the same prefix, and you generally read all of the data in a partition (or at least all of the Parquet/ORC footers in a partition), these reads all hit the same cloud object storage node, reducing the performance of the read operation.
Translator's note:
Concretely, the main problems are: no ACID guarantees, so reads and writes cannot be isolated; predicate pushdown only at partition granularity; reliance on filesystem list operations to determine which files to scan; and partition columns that must appear explicitly in the query.
- No ACID guarantees: a Hive warehouse is just a collection of files on a filesystem (managed purely as directories), and reads and writes operate directly on those files, with none of the transaction semantics or ACID guarantees common in relational databases, so problems such as dirty reads can occur.
- Partition-granularity predicate pushdown: Hive's file structure can only narrow the files to scan via partitions and buckets; it cannot filter at the file level. So even though Parquet files store min and max values that could be used for further filtering (i.e., predicate pushdown), Hive cannot take advantage of them.
- Filesystem list operations: after Hive determines which partitions and buckets to scan, it has to use filesystem list operations to find the files under each bucket. This operation is O(n) and gets slower as the number of files grows. For object stores like S3 in particular, a single list call takes hundreds of milliseconds and returns at most 1,000 entries, so the performance impact cannot be ignored.
- Queries must specify partitions explicitly: in Hive, a partition has to be declared explicitly as a column of the table, and writes and reads must explicitly specify the partitions they target. Iceberg handles all of this by itself and skips partitions and data that aren't needed. Users can specify partitioning when the table is created, no extra filters are needed to make queries fast, and the table layout can be updated as the data or queries change.
These Problems Get Amplified at Scale — Time for a New Format Altogether
While the above-mentioned issues exist in smaller environments, they get significantly worse at data, user, and application scale. As with many other successful projects in big data's history, it's often tech companies that hit scale problems first and build tools to resolve them. Then, when other organizations experience these same scale problems, they adopt these tools. Most data-driven organizations are already experiencing or starting to deal with these problems now. A few years ago, Netflix was hitting these problems and employing the standard workarounds with mixed success. After dealing with these problems for a long time, they realized that there might be a better way than continuing to implement more of these workarounds. So, they took a step back and thought about the problems that were occurring, the causes of these problems, and the best way to solve them. They realized that more band-aids on the Hive table format was not the solution — a new table format was needed.
So, How Did Netflix Fix These Problems?
Netflix figured out that most of the Hive table format's problems arose from an aspect of it that may appear fairly minor at first but ends up having major consequences: data in the table is tracked at the folder level. They determined that the key to resolving these issues was to track the data in the table at the file level instead. Rather than a table pointing to a directory or a set of directories, they defined a table as a canonical list of files. In fact, they realized that not only could file-level tracking resolve the issues they were hitting with Hive's table format, it could also lay the foundation for achieving their broader set of analytic goals:
Provide an always correct and always consistent view of a table
When making data-driven decisions, those decisions need to be based on trustworthy data. If a report is run while the table is being updated and the report only sees some, but not all, of the changes, that can produce incorrect results, result in wrong or suboptimal decisions, and undermine broader trust in the data within the organization.
Enable faster query planning and execution
As mentioned in #4 above, one of Netflix's use case queries took 9 minutes just to plan, and that was for just 1 week's worth of data. At a minimum, they needed to improve the planning time, as well as overall query execution time, to provide a better user experience and to increase the number of questions a user could ask, so that more decisions could be data-driven.
Provide users with good response times without them having to know the physical layout of the data
As mentioned in #5 above, if the user doesn't query the data in exactly the way the data is partitioned, the query can take much, much longer. You could solve this problem by educating every user and making them do something nonintuitive, but situations like this are almost always better solved in software, for the sake of both user experience and data democratization.
Enable better and safer table evolution
Tables change over time with evolving business requirements, additional scale, and additional sources of data. Since change is inevitable, change management is ideally simplified as much as possible by the table format itself, so that the application layer or data engineering doesn't need to deal with it. If change management is risky, changes don't happen as often as they need to, and business agility and flexibility are therefore reduced.
Accomplish all of the goals above at data, user, and application scale
Let's take a closer look at this new table format, named Iceberg, and how it resolves the problems with Hive's table format, as well as achieves these broader business and analytic goals.
The Iceberg Table Format
Before diving into the format itself: because "Hive" has been a somewhat nebulous and overloaded term that means different things to different people due to its history, let's clearly define what Iceberg is and what it isn't.
An Iceberg Table’s Architecture
Now, let's go through the architecture and specification that enable Iceberg to solve the Hive table format's problems and achieve the goals discussed above, by looking under the covers of an Iceberg table.
Iceberg Components
Now, let's walk through each of the components in the diagram above. As we walk through them, we'll also step through the process a SELECT query takes through these components to read the data in an Iceberg table.
There are 3 layers in the architecture of an Iceberg table:
- The Iceberg catalog
- The metadata layer, which contains metadata files, manifest lists, and manifest files
- The data layer
Iceberg catalog
Anyone reading from a table (let alone tens, hundreds, or thousands of readers) needs to know where to go first — somewhere they can go to find out where to read/write data for a given table. The first step for anyone looking to read the table is to find the location of the current metadata pointer (note that "current metadata pointer" is not an official term, but rather a descriptive one, since there is no official term at this point and there hasn't been push-back on it in the community).
This central place where you go to find the current location of the current metadata pointer is the Iceberg catalog.
The primary requirement for an Iceberg catalog is that it must support atomic operations for updating the current metadata pointer (e.g., HDFS, Hive metastore, Nessie). This is what allows transactions on Iceberg tables to be atomic and provide correctness guarantees.
Within the catalog, there is a reference or pointer for each table to that table's current metadata file. For example, in the diagram shown above, there are 2 metadata files. The value of the table's current metadata pointer in the catalog is the location of the metadata file on the right.
What this data looks like depends on which Iceberg catalog is being used. A few examples:
- With HDFS as the catalog, there's a file called version-hint.text in the table's metadata folder whose contents is the version number of the current metadata file.
- With Hive metastore as the catalog, the table entry in the metastore has a table property which stores the location of the current metadata file.
- With Nessie as the catalog, Nessie stores the location of the current metadata file for the table.
So, when a SELECT query is reading an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location entry of the current metadata file for the table it's looking to read, then opens that file.
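As a concrete illustration of that first step, here is a minimal sketch in Python of resolving the current metadata file when a filesystem location (HDFS-style) is used as the catalog, reading from the local filesystem for simplicity; the table location is the example path used in the metadata samples below.

import os

def current_metadata_file(table_location):
    # With a filesystem catalog, the table's metadata folder contains a
    # version-hint.text file whose contents is the version number of the
    # current metadata file (as described above).
    metadata_dir = os.path.join(table_location, "metadata")
    with open(os.path.join(metadata_dir, "version-hint.text")) as f:
        version = f.read().strip()
    return os.path.join(metadata_dir, f"v{version}.metadata.json")

print(current_metadata_file("/home/hadoop/warehouse/db2/part_table2"))
# e.g. /home/hadoop/warehouse/db2/part_table2/metadata/v3.metadata.json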
Metadata file
As the name implies, metadata files store metadata about a table. This includes information about the table's schema, partition information, snapshots, and which snapshot is the current one.
Here is an example of the full contents of a metadata file: v3.metadata.json
{
"format-version" : 1,
"table-uuid" : "4b96b6e8-9838-48df-a111-ec1ff6422816",
"location" : "/home/hadoop/warehouse/db2/part_table2",
"last-updated-ms" : 1611694436618,
"last-column-id" : 3,
"schema" : {
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : true,
"type" : "int"
}, {
"id" : 2,
"name" : "ts",
"required" : false,
"type" : "timestamptz"
}, {
"id" : 3,
"name" : "message",
"required" : false,
"type" : "string"
} ]
},
"partition-spec" : [ {
"name" : "ts_hour",
"transform" : "hour",
"source-id" : 2,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "ts_hour",
"transform" : "hour",
"source-id" : 2,
"field-id" : 1000
} ]
} ],
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "hadoop"
},
"current-snapshot-id" : 1257424822184505371,
"snapshots" : [ {
"snapshot-id" : 8271497753230544300,
"timestamp-ms" : 1611694406483,
"summary" : {
"operation" : "append",
"spark.app.id" : "application_1611687743277_0002",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "960",
"changed-partition-count" : "1",
"total-records" : "1",
"total-data-files" : "1",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-8271497753230544300-1-d8a778f9-ad19-4e9c-88ff-28f49ec939fa.avro"
},
{
"snapshot-id" : 1257424822184505371,
"parent-snapshot-id" : 8271497753230544300,
"timestamp-ms" : 1611694436618,
"summary" : {
"operation" : "append",
"spark.app.id" : "application_1611687743277_0002",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "973",
"changed-partition-count" : "1",
"total-records" : "2",
"total-data-files" : "2",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro"
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1611694406483,
"snapshot-id" : 8271497753230544300
},
{
"timestamp-ms" : 1611694436618,
"snapshot-id" : 1257424822184505371
} ],
"metadata-log" : [ {
"timestamp-ms" : 1611694097253,
"metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v1.metadata.json"
},
{
"timestamp-ms" : 1611694406483,
"metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v2.metadata.json"
} ]
}
When a SELECT query is reading an Iceberg table and has its current metadata file open, after getting its location from the table's entry in the catalog, the query engine then reads the value of current-snapshot-id. It then uses this value to find that snapshot's entry in the snapshots array, retrieves the value of that snapshot's manifest-list entry, and opens the manifest list that location points to.
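A minimal sketch of that step in Python, assuming the metadata file shown above has been fetched to the local working directory as v3.metadata.json: parse the JSON, look up the current snapshot in the snapshots array, and return its manifest-list location.

import json

def manifest_list_for_current_snapshot(metadata_path):
    # Resolve the current snapshot's manifest list, as a query engine would.
    with open(metadata_path) as f:
        metadata = json.load(f)
    current_id = metadata["current-snapshot-id"]
    snapshot = next(s for s in metadata["snapshots"] if s["snapshot-id"] == current_id)
    return snapshot["manifest-list"]

print(manifest_list_for_current_snapshot("v3.metadata.json"))
# -> .../metadata/snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro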
Manifest list
Another aptly named file, the manifest list is a list of manifest files. The manifest list has information about each manifest file that makes up that snapshot, such as the location of the manifest file, the snapshot it was added as part of, and information about the partitions it belongs to and the lower and upper bounds of the partition columns for the data files it tracks.
Here's an example of the full contents of a manifest list file: snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro (converted to JSON)
{
"manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro",
"manifest_length": 4884,
"partition_spec_id": 0,
"added_snapshot_id": {
"long": 1257424822184505300
},
"added_data_files_count": {
"int": 1
},
"existing_data_files_count": {
"int": 0
},
"deleted_data_files_count": {
"int": 0
},
"partitions": {
"array": [ {
"contains_null": false,
"lower_bound": {
"bytes": "¹Ô\\u0006\\u0000"
},
"upper_bound": {
"bytes": "¹Ô\\u0006\\u0000"
}
} ]
},
"added_rows_count": {
"long": 1
},
"existing_rows_count": {
"long": 0
},
"deleted_rows_count": {
"long": 0
}
}
{
"manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/d8a778f9-ad19-4e9c-88ff-28f49ec939fa-m0.avro",
"manifest_length": 4884,
"partition_spec_id": 0,
"added_snapshot_id": {
"long": 8271497753230544000
},
"added_data_files_count": {
"int": 1
},
"existing_data_files_count": {
"int": 0
},
"deleted_data_files_count": {
"int": 0
},
"partitions": {
"array": [ {
"contains_null": false,
"lower_bound": {
"bytes": "¸Ô\\u0006\\u0000"
},
"upper_bound": {
"bytes": "¸Ô\\u0006\\u0000"
}
} ]
},
"added_rows_count": {
"long": 1
},
"existing_rows_count": {
"long": 0
},
"deleted_rows_count": {
"long": 0
}
}
When a SELECT query is reading an Iceberg table and has the manifest list for the snapshot open, after getting its location from the metadata file, the query engine then reads the value of the manifest_path entries and opens the manifest files. It can also do some optimizations at this stage, like using row counts or filtering data using the partition information.
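A minimal sketch of that step in Python, assuming the manifest list has been fetched locally and using the fastavro library to read the Avro records shown above (in the actual Avro file the union-typed fields come back as plain values rather than the {"long": ...} wrappers of the JSON rendering):

from fastavro import reader

def manifest_entries(manifest_list_path):
    # Return (manifest_path, added_rows_count) for each manifest in the list.
    entries = []
    with open(manifest_list_path, "rb") as f:
        for record in reader(f):
            entries.append((record["manifest_path"], record.get("added_rows_count")))
    return entries

for path, rows in manifest_entries(
        "snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro"):
    print(path, rows)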
Manifest file
Manifest files track data files as well as additional details and statistics about each file. As mentioned earlier, the primary difference that allows Iceberg to address the problems of the Hive table format is tracking data at the file level — manifest files are the boots on the ground that do that.
Each manifest file keeps track of a subset of the data files, for parallelism and reuse efficiency at scale. Manifests contain a lot of useful information that is used to improve efficiency and performance while reading the data from these data files, such as details about partition membership, record counts, and lower and upper bounds of columns. These statistics are written for each manifest's subset of data files during the write operation, and are therefore more likely to exist, be accurate, and be up to date than statistics in Hive.
So as not to throw the baby out with the bathwater, Iceberg is file-format agnostic, so the manifest files also specify the file format of each data file, such as Parquet, ORC, or Avro.
Here's an example of the full contents of a manifest file: eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro (converted to JSON)
{
"status": 1,
"snapshot_id": {
"long": 1257424822184505300
},
"data_file": {
"file_path": "/home/hadoop/warehouse/db2/part_table2/data/ts_hour=2021-01-26-01/00000-6-7c6cf3c0-8090-4f15-a4cc-3a3a562eed7b-00001.parquet",
"file_format": "PARQUET",
"partition": {
"ts_hour": {
"int": 447673
}
},
"record_count": 1,
"file_size_in_bytes": 973,
"block_size_in_bytes": 67108864,
"column_sizes": {
"array": [ {
"key": 1,
"value": 47
},
{
"key": 2,
"value": 57
},
{
"key": 3,
"value": 60
} ]
},
"value_counts": {
"array": [ {
"key": 1,
"value": 1
},
{
"key": 2,
"value": 1
},
{
"key": 3,
"value": 1
} ]
},
"null_value_counts": {
"array": [ {
"key": 1,
"value": 0
},
{
"key": 2,
"value": 0
},
{
"key": 3,
"value": 0
} ]
},
"lower_bounds": {
"array": [ {
"key": 1,
"value": "\\u0002\\u0000\\u0000\\u0000"
},
{
"key": 2,
"value": "\\u0000„ ,ù\\u0005\\u0000"
},
{
"key": 3,
"value": "test message 2"
} ]
},
"upper_bounds": {
"array": [ {
"key": 1,
"value": "\\u0002\\u0000\\u0000\\u0000"
},
{
"key": 2,
"value": "\\u0000„ ,ù\\u0005\\u0000"
},
{
"key": 3,
"value": "test message 2"
} ]
},
"key_metadata": null,
"split_offsets": {
"array": [
4
]
}
}
}
When a SELECT query is reading an Iceberg table and has a manifest file open, after getting its location from the manifest list, the query engine then reads the value of the file_path entry for each data_file object and opens the data files. It can also do some optimizations at this stage, like using row counts or filtering data using the partition or column statistics.
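A minimal sketch of that last hop in Python, again using fastavro and assuming the manifest has been fetched locally: collect each data_file's path and record count, and prune data files whose partition value doesn't match the one being queried (column lower/upper bounds could be used for further pruning, but decoding those byte values is elided here).

from fastavro import reader

def data_files_for_partition(manifest_path, ts_hour):
    # Keep only data files whose ts_hour partition value matches.
    matches = []
    with open(manifest_path, "rb") as f:
        for entry in reader(f):
            data_file = entry["data_file"]
            if data_file["partition"].get("ts_hour") == ts_hour:
                matches.append((data_file["file_path"], data_file["record_count"]))
    return matches

# 447673 is the hours-since-epoch partition value seen in the example manifest above
print(data_files_for_partition("eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro", 447673))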
With this understanding of the different components of an Iceberg table and the path taken by any engine or tool accessing data in an Iceberg table, let's now take a deeper look at what happens under the covers when CRUD operations are performed on an Iceberg table.
Unlike Hive, Iceberg does not track partitions or locate files by listing directories. As the metadata above shows, Iceberg's manifest files record the partition values each data file belongs to, and the manifest list records partition information for each manifest file. In addition, Iceberg's data files also store the values of the partition source columns, which is what makes automatic partition transforms possible.
In short, Iceberg stores partition values directly rather than as string keys, so there is no need to parse keys or URL-encode values as in Hive, and the metadata acts as an index for pruning partitions and selecting data files.
To summarize: every insert, update, or delete creates a set of data files or delete files, along with manifest files that track those data files and record statistics about them (a single manifest file may cover several data files). The manifest files are in turn collected into the manifest list referenced by a snapshot, which also records statistics for each manifest file so that entire manifests can be skipped. Each operation also writes a new copy of the metadata.json file, which aggregates the information about all snapshots and appends the newly created one.
Iceberg implements a three-level, top-down data filtering strategy: partition pruning, file filtering, and row-group filtering.
- Partition pruning: for a partitioned table, the optimizer can automatically extract the partitions that need to be accessed from the partition predicates in the WHERE clause, avoiding a scan of all partitions and reducing I/O. Iceberg supports partitioned tables and hidden partitioning, so it naturally supports partition pruning. As shown above, Iceberg's partition pruning does not depend on the directories the files live in; instead, it uses its manifest files to implement a more sophisticated partitioning scheme and pruning algorithm, known as hidden partitioning. Each snapshot stores the partition-column information of all of its manifest files, and each manifest file stores the partition values of each data file, which is enough to determine which files belong to which partition. The benefits are: 1. no filesystem list operations are needed, since the data files belonging to a partition can be located directly; 2. the way partitions are stored is transparent, so users don't need to specify partitions in their queries and Iceberg applies the partition transforms itself; 3. even if the partitioning is changed later, the existing data does not need to be rewritten.
- File filtering: Iceberg keeps file-level statistics such as min/max values, so the filter conditions in the WHERE clause can be used to decide whether the target data could exist in a given file. Iceberg uses these statistics in its metadata to filter data via predicate pushdown. For comparison, here is how Spark handles predicate pushdown: Spark SQL pushes query filters down toward the storage layer to reduce the amount of data read, but when actually reading and filtering data it delegates the pushdown to the file format's reader. For Parquet files, for example, Spark uses ParquetRecordReader or VectorizedParquetRecordReader (for non-vectorized and vectorized reads, respectively); the filter is passed in when the reader is constructed, and the filtering logic is later invoked by RowGroupFilter, which uses the statistics of the blocks in the file or the stored column metadata to decide whether a block can be skipped (Spark 3.1 added predicate pushdown for Avro, JSON, and CSV). In contrast, Iceberg filters out data files that cannot match the predicate at the snapshot level, based on the statistics in its metadata.
- Row-group filtering: columnar file formats like Parquet also carry statistics such as min/max values and Bloom filters, which can be used to skip irrelevant row groups and reduce the amount of data scanned within a file. Iceberg filters out non-matching row groups at the data-file level. This is similar to what Spark does, but Iceberg, acting as the storage layer, uses Parquet's lower-level ParquetFileReader interface and implements the filtering logic itself. By calling this lower-level API, Iceberg can skip entire row groups directly, further reducing I/O.
CREATE TABLE
First, let’s create a table in our environment.
CREATE TABLE table1 (
order_id BIGINT,
customer_id BIGINT,
order_amount DECIMAL(10, 2),
order_ts TIMESTAMP
)
USING iceberg
PARTITIONED BY ( HOUR(order_ts) );
After this statement is executed, the environment will look like this:
Above, we created a table called table1 in database db1. The table has 4 columns and is partitioned at the hour granularity of the order_ts timestamp column (more on that later).
When the query above is executed, a metadata file with a snapshot s0 is created in the metadata layer (snapshot s0 doesn't point to any manifest lists because no data exists in the table yet). The catalog entry for db1.table1's current metadata pointer is then updated to point to the path of this new metadata file.
INSERT
Now, let’s add some data to the table (albeit, literal values).
INSERT INTO table1 VALUES (
123,
456,
36.17,
'2021-01-26 08:10:23'
);
When we execute this INSERT statement, the following process happens:
- The data, in the form of a Parquet file, is first created – table1/data/order_ts_hour=2021-01-26-08/00000-5-cae2d.parquet
- Then, a manifest file pointing to this data file is created (including the additional details and statistics) – table1/metadata/d8f9-ad19-4e.avro
- Then, a manifest list pointing to this manifest file is created (including the additional details and statistics) – table1/metadata/snap-2938-1-4103.avro
- Then, a new metadata file is created based on the previously current metadata file, with a new snapshot s1 as well as keeping track of the previous snapshot s0, pointing to this manifest list (including the additional details and statistics) – table1/metadata/v2.metadata.json
- Then, the value of the current metadata pointer for db1.table1 is atomically updated in the catalog to point to this new metadata file.
During all of these steps, anyone reading the table would continue to read the first metadata file until the atomic step #5 is complete, meaning that no one using the data would ever see an inconsistent view of the table's state and contents.
MERGE INTO / UPSERT
Now, let’s step through a MERGE INTO / UPSERT operation.
Let's assume we've landed some data into a staging table we created in the background. In this simple example, information is logged each time there's a change to an order, and we want this table to show the most recent details of each order, so we update the order amount if the order ID is already in the table. If we don't have a record of that order yet, we want to insert a record for the new order.
In this example, the stage table includes an update for the order that's already in the table (order_id=123) and a new order that isn't in the table yet, which occurred on January 27, 2021 at 10:21:46.
MERGE INTO table1
USING ( SELECT * FROM table1_stage ) s
ON table1.order_id = s.order_id
WHEN MATCHED THEN
UPDATE SET table1.order_amount = s.order_amount
WHEN NOT MATCHED THEN
INSERT *
When we execute this MERGE INTO statement, the following process happens:
- The read path detailed earlier is followed to determine all records in table1 and table1_stage that have the same order_id.
- The file containing the record with order_id=123 from table1 is read into the query engine's memory (00000-5-cae2d.parquet); order_id=123's record in this memory copy then has its order_amount field updated to reflect the new order_amount of the matching record in table1_stage. This modified copy of the original file is then written to a new Parquet file – table1/data/order_ts_hour=2021-01-26-08/00000-1-aef71.parquet. Even if there were other records in the file that didn't match the order_id update condition, the entire file would still be copied, the one matching record updated as it was copied, and the new file written out — a strategy known as copy-on-write. There is a new data change strategy coming soon in Iceberg, known as merge-on-read, which behaves differently under the covers but still provides the same update and delete functionality.
- The record in table1_stage that didn't match any records in table1 gets written in the form of a new Parquet file, because it belongs to a different partition than the matching record – table1/data/order_ts_hour=2021-01-27-10/00000-3-0fa3a.parquet
- Then, a new manifest file pointing to these two data files is created (including the additional details and statistics) – table1/metadata/0d9a-98fa-77.avro. In this case, the only record in the only data file in snapshot s1 was changed, so there was no reuse of manifest files or data files. Normally this is not the case, and manifest files and data files are reused across snapshots.
- Then, a new manifest list pointing to this manifest file is created (including the additional details and statistics) – table1/metadata/snap-9fa1-3-16c3.avro
- Then, a new metadata file is created based on the previously current metadata file, with a new snapshot s2 as well as keeping track of the previous snapshots s0 and s1, pointing to this manifest list (including the additional details and statistics) – table1/metadata/v3.metadata.json
- Then, the value of the current metadata pointer for db1.table1 is atomically updated in the catalog to point to this new metadata file.
While there are multiple steps to this process, it all happens quickly. One example is where Adobe did some benchmarking and found they could achieve 15 commits per minute.
In the diagram above, we also show that before this MERGE INTO was executed, a background garbage collection job ran to clean up unused metadata files — note that our first metadata file, from snapshot s0 when we created the table, is no longer there. Because each new metadata file also contains the important information needed from previous ones, these can be cleaned up safely. Unused manifest lists, manifest files, and data files can also be cleaned up via garbage collection.
SELECT
Let’s review the SELECT path again, but this time on the Iceberg table we’ve been working on.
SELECT * FROM db1.table1
When this SELECT statement is executed, the following process happens:
- The query engine goes to the Iceberg catalog
- It then retrieves the current metadata file location entry for db1.table1
- It then opens this metadata file and retrieves the entry for the manifest list location of the current snapshot, s2
- It then opens this manifest list, retrieving the location of the only manifest file
- It then opens this manifest file, retrieving the locations of the two data files
- It then reads these data files, and since it's a SELECT *, returns the data back to the client
Hidden Partitioning
Recall that earlier in this post we discussed one of the problems of the Hive table format: users need to know the physical layout of the table in order to avoid very slow queries.
Let's say a user wants to see all records for a single day, say January 26, 2021, so they issue this query:
SELECT *
FROM table1
WHERE order_ts = DATE '2021-01-26'
Recall that when we created the table, we partitioned it at the hour level of the timestamp of when the order first occurred. In Hive, this query generally causes a full table scan.
Let's walk through how Iceberg addresses this problem and gives users the ability to interact with the table in an intuitive way while still achieving good performance and avoiding a full table scan.
When this SELECT statement is executed, the following process happens:
- The query engine goes to the Iceberg catalog.
- It then retrieves the current metadata file location entry for db1.table1.
- It then opens this metadata file and retrieves the entry for the manifest list location of the current snapshot, s2. It also looks up the partition specification in the file and sees that the table is partitioned at the hour level of the order_ts field.
- It then opens this manifest list, retrieving the location of the only manifest file.
- It then opens this manifest file, looking at each data file's entry to compare the partition value the data file belongs to with the one requested by the user's query. The value in this file corresponds to the number of hours since the Unix epoch, which the engine uses to determine that only the events in one of the data files occurred on January 26, 2021 (in other words, between January 26, 2021 at 00:00:00 and January 26, 2021 at 23:59:59). Specifically, the only matching event is the first one we inserted, since it happened on January 26, 2021 at 08:10:23. The other data file's order timestamp was January 27, 2021 at 10:21:46, i.e., not on January 26, 2021, so it didn't match the filter (see the sketch after this list).
- It then reads only the one matching data file, and since it's a SELECT *, it returns the data back to the client.
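To make the hours-since-the-epoch arithmetic concrete, here is a small sketch in pure Python (no Iceberg libraries involved) showing how the hour transform maps the two example timestamps to partition values, and how the single-day filter maps to a range of partition values:

from datetime import datetime, timezone

def hour_transform(ts):
    # Iceberg's hour partition transform: whole hours since the Unix epoch.
    return int(ts.replace(tzinfo=timezone.utc).timestamp()) // 3600

first_order = datetime(2021, 1, 26, 8, 10, 23)    # inserted earlier
second_order = datetime(2021, 1, 27, 10, 21, 46)  # added by the MERGE INTO

# The WHERE order_ts = DATE '2021-01-26' filter covers this partition range:
day_start = hour_transform(datetime(2021, 1, 26, 0, 0, 0))
day_end = hour_transform(datetime(2021, 1, 26, 23, 59, 59))

for label, ts in [("first", first_order), ("second", second_order)]:
    value = hour_transform(ts)
    print(label, value, "matches" if day_start <= value <= day_end else "pruned")
# first 447680 matches; second 447706 pruned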
Time Travel
Another key capability the Iceberg table format enables is something called "time travel."
To keep track of the state of a table over time for compliance, reporting, or reproducibility purposes, data engineering has traditionally needed to write and manage jobs that create and manage copies of the table at certain points in time.
Instead, Iceberg provides the out-of-the-box ability to see what a table looked like at different points in time in the past.
For instance, let's say that today a user needs to see the contents of our table as of January 28, 2021, which (since this is a static text article) we'll say was before the order from January 27 was inserted into the table and before the order from January 26 had its order amount updated via the UPSERT operation we did above. Their query would look like this:
SELECT *
FROM table1 AS OF '2021-01-28 00:00:00'
-- (timestamp is from before UPSERT operation)
When this SELECT statement is executed, the following process happens:
- The query engine goes to the Iceberg catalog
- It then retrieves the current metadata file location entry for db1.table1
- It then opens this metadata file and looks at the entries in the snapshots array (which contains the millisecond Unix epoch time each snapshot was created, and therefore when it became the most current snapshot), determines which snapshot was active as of the requested point in time (January 28, 2021 at midnight), and retrieves the entry for the manifest list location of that snapshot, which is s1 (see the sketch after this list)
- It then opens this manifest list, retrieving the location of the only manifest file
- It then opens this manifest file, retrieving the location of the only data file in snapshot s1
- It then reads this data file, and since it's a SELECT *, returns the data back to the client
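A minimal sketch of step 3 in Python, reusing the metadata JSON structure shown earlier: pick the most recent snapshot whose timestamp-ms is at or before the requested point in time.

import json
from datetime import datetime, timezone

def manifest_list_as_of(metadata_path, as_of):
    # Return the manifest list of the snapshot that was current at `as_of`.
    with open(metadata_path) as f:
        metadata = json.load(f)
    as_of_ms = int(as_of.replace(tzinfo=timezone.utc).timestamp() * 1000)
    candidates = [s for s in metadata["snapshots"] if s["timestamp-ms"] <= as_of_ms]
    snapshot = max(candidates, key=lambda s: s["timestamp-ms"])
    return snapshot["manifest-list"]

print(manifest_list_as_of("v3.metadata.json", datetime(2021, 1, 28, 0, 0, 0)))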
Notice in the file structure in the diagram above that although the old manifest list, manifest file, and data files are not used in the current state of the table, they still exist in the data lake and are available for use.
Of course, while keeping old metadata and data files around provides value in these use cases, at a certain point you'll have metadata and data files that are either no longer accessed or whose access value is outweighed by the cost of keeping them. So, there is an asynchronous background process that cleans up old files, called garbage collection. Garbage collection policies can be configured according to business requirements; it's a trade-off between how much storage you want to use for old files versus how far back in time and at what granularity you want to provide access.
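For reference, engines that integrate with Iceberg expose this cleanup directly; for example, Iceberg's Spark integration documents an expire_snapshots procedure. A hedged sketch follows, assuming the Iceberg runtime and a catalog named my_catalog are already configured for the Spark session (the table name and cutoff timestamp are illustrative):

from pyspark.sql import SparkSession

# Assumes the Iceberg runtime jars and an Iceberg catalog named "my_catalog"
# are already configured for this session (configuration omitted here).
spark = SparkSession.builder.getOrCreate()

spark.sql("""
    CALL my_catalog.system.expire_snapshots(
        table => 'db1.table1',
        older_than => TIMESTAMP '2021-02-01 00:00:00'
    )
""")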
Compaction
Another key capability available as part of Iceberg's design is compaction, which helps balance the write-side and read-side trade-offs.
In Iceberg, compaction is an asynchronous background process that compacts a set of small files into fewer, larger files. Since it's asynchronous and in the background, it has no negative impact on your users. In fact, it's basically a specific kind of normal Iceberg write job whose input and output are the same records, but whose file sizes and attributes are far better suited for analytics after the write job commits its transaction.
Anytime you're working with data, there are trade-offs around what you're looking to achieve, and in general the incentives on the write-side and read-side pull in opposite directions.
On the write-side, you generally want low latency — making the data available as soon as possible, meaning you want to write as soon as you get the record, potentially without even converting it into a columnar format. But if you were to do this for every record, you would end up with one record per file (the most extreme form of the small files problem).
On the read-side, you generally want high throughput — having many records in a single file and in a columnar format, so your data-correlated variable costs (reading the data) outweigh your fixed costs (the overhead of record-keeping, opening each file, etc.). You also generally want up-to-date data, but you pay the cost of that on read operations.
Compaction helps balance the write-side and read-side trade-offs — you can write the data almost as soon as you get it (which at the extreme would be 1 record in row format per file) so readers can see and use it right away, while a background compaction process periodically takes all those small files and combines them into fewer, larger, columnar-format files.
With compaction, your readers continually have 99% of their data in the high-throughput form they want, but still see the most recent 1% of data in the low-latency, low-throughput form. It's also important to note for this use case that the input file format and output file format of compaction jobs can be different file types. A good example of this would be writing Avro from streaming writes, which is then compacted into larger Parquet files for analytics.
Another important note: since Iceberg is not an engine or tool, scheduling/triggering and the actual compaction work are done by other tools and engines that integrate with Iceberg.
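For example, Iceberg's Spark integration documents a rewrite_data_files procedure for exactly this purpose. A hedged sketch, again assuming the Iceberg runtime and a catalog named my_catalog are already configured for the Spark session:

from pyspark.sql import SparkSession

# Assumes the Iceberg runtime and an Iceberg catalog named "my_catalog"
# are already configured for this session (configuration omitted here).
spark = SparkSession.builder.getOrCreate()

# Compact the small files in db1.table1 into fewer, larger files.
spark.sql("CALL my_catalog.system.rewrite_data_files(table => 'db1.table1')")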
Design Benefits of the Format
Now, let's apply what we've gone through so far to the higher-level value the architecture and design provide.
Snapshot isolation for transactions
Reads and writes on Iceberg tables don't interfere with each other. Iceberg provides the ability for concurrent writes via optimistic concurrency control, and all writes are atomic.
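The atomicity comes from the single pointer swap in the catalog described earlier. Here is a minimal sketch of how an optimistic writer behaves, with a toy in-memory catalog standing in for a real one (all names here are illustrative, not Iceberg APIs):

class ConflictError(Exception):
    pass

class ToyCatalog:
    # Stands in for a real catalog: one atomic compare-and-swap per table.
    def __init__(self):
        self.current_metadata = {}

    def swap_pointer(self, table, expected, new):
        # A real catalog (Hive metastore, Nessie, ...) performs this atomically.
        if self.current_metadata.get(table) != expected:
            raise ConflictError("table changed since we started")
        self.current_metadata[table] = new

def commit(catalog, table, write_new_metadata, max_attempts=3):
    # Optimistic concurrency: write new metadata based on the current state,
    # then try to swap the pointer; on conflict, re-read and retry.
    for _ in range(max_attempts):
        base = catalog.current_metadata.get(table)
        new_metadata = write_new_metadata(base)  # writes new files, never mutates old ones
        try:
            catalog.swap_pointer(table, base, new_metadata)
            return new_metadata
        except ConflictError:
            continue  # someone else committed first; rebase and retry
    raise ConflictError("gave up after repeated conflicts")

catalog = ToyCatalog()
commit(catalog, "db1.table1", lambda base: "v1.metadata.json")
print(commit(catalog, "db1.table1", lambda base: "v2.metadata.json"))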
Faster planning and execution
Both of these benefits stem from the fact that you write details about what you've written on the write-path, rather than gathering that information on the read-path.
Because the list of files is written when changes to the table are made, there's no need to do expensive filesystem list operations at runtime, meaning there is far less work to do and far less waiting at runtime.
Because statistics about the data in the files are written on the write-side, the statistics aren't missing, wrong, or out of date, meaning cost-based optimizers can make better decisions about which query plan provides the fastest response time.
Because statistics about the data in the files are tracked at the file level, the statistics aren't as coarse-grained, meaning engines can do more data pruning, process less data, and therefore deliver faster response times.
In the Ryan Blue presentation mentioned earlier in this article, he shares the results of an example use case at Netflix: for a query on a Hive table, it took 9.6 minutes just to plan the query; for the same query on an Iceberg table, it took only 42 seconds to plan and execute the query.
Abstract the physical, expose a logical view
Earlier in this article, we saw that with Hive tables, users often need to know the potentially unintuitive physical layout of the table in order to achieve even decent performance.
Iceberg provides the ability to continually expose a logical view to your users, decoupling the logical interaction point from the physical layout of the data. We saw how incredibly useful this is with capabilities like hidden partitioning and compaction.
Iceberg also provides the ability to transparently evolve your table over time, via schema evolution, partition evolution, and sort order evolution capabilities. More details on these can be found on the Iceberg docs site.
This makes it much easier for data engineering to experiment with different, potentially better, table layouts behind the scenes. Once committed, the changes take effect without users having to change their application code or queries. If an experiment turns out to make things worse, the transaction can be rolled back and users return to the previous experience. Making experimentation safer allows more experiments to be performed, and therefore allows you to find better ways of doing things.
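As a hedged illustration of what this evolution looks like in practice, Iceberg's Spark SQL extensions document DDL along the following lines (the new column and partition field are made up for this example, and the sketch assumes the Iceberg SQL extensions and a catalog named my_catalog are already configured for the Spark session):

from pyspark.sql import SparkSession

# Assumes the Iceberg Spark SQL extensions and an Iceberg catalog named
# "my_catalog" are already configured for this session (configuration omitted).
spark = SparkSession.builder.getOrCreate()

# Schema evolution: add a column; existing data files are not rewritten.
spark.sql("ALTER TABLE my_catalog.db1.table1 ADD COLUMNS (order_status string)")

# Partition evolution: add a partition field; it applies to newly written data,
# while older data keeps its original layout.
spark.sql("ALTER TABLE my_catalog.db1.table1 ADD PARTITION FIELD bucket(16, customer_id)")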
All engines see changes immediately
Because the files making up a table's contents are defined on the write-side, and because all new readers are pointed to the new file list as soon as it changes (via the read flow starting at the catalog), as soon as a writer makes a change to the table, all new queries against the table immediately see the new data.
Event listeners
Iceberg has a framework that allows other services to be notified when an event occurs on an Iceberg table. Currently, this feature is in its early stages, and only an event for when a table is scanned can be emitted. This framework, however, lays the groundwork for future capabilities, such as keeping caches, materialized views, and indexes in sync with the raw data.
Efficiently make smaller updates
Because data is tracked at the file level, smaller updates can be made to the dataset much more efficiently.
References:
https://www.dremio.com/resources/guide/apache-iceberg-an-architectural-look-under-the-covers/
https://www.zhihu.com/question/427612618
https://mp.weixin.qq.com/s?__biz=MzI1MjQ2OTQ3Ng==&mid=2247578118&idx=1&sn=53b54e5f60ed7039e2acd39123879754&chksm=e9e0bb8dde97329b612c6aa8332734d6a234b7888bfb884fe6c5c30bf47ebc7b32638f2e983b&scene=27