文章目录
1. 引言
通过本文,我们将带领读者《 Apache Paimon Docs - Table with PK》 展开讨论,本文将深入剖析 Paimon 的主键表相关知识,包括其数据结构、桶、表模式、合并引擎以及相关组件等方面,旨在帮助读者全面理解 Paimon 在数据管理中的核心概念与应用,为大数据处理与分析提供有力的技术支持。
通过本文你将了解到:
- Paimon 主键表相关的核心概念及其作用;
- 不同数据结构与组件如何协同工作以实现高效的数据处理;
- 如何根据实际需求选择合适的表模式与合并引擎。
2. 基本概念
2.1 数据结构
主键表:定义了主键的表可以进行插入、更新和删除操作。主键由一组列组成,这些列对每条记录具有唯一性。Paimon
通过对每个桶(bucket
)内的主键进行排序来确保数据有序,从而在应用主键过滤条件时实现高性能。
LSM
树(Log-Structured Merge Tree)
- 数据结构:Paimon 使用 LSM 树作为文件存储的数据结构。LSM 树通过将文件组织成若干排序运行(sorted runs)来管理数据。
具体的LSM介绍可以参考这篇文章:005.精读《B-Tree vs LSM-Tree》。
Sorted Runs
(排序运行)
-
组织方式:
- LSM 树将文件组织成若干排序运行,一个排序运行由一个或多个数据文件组成,每个数据文件仅属于一个排序运行。
- 数据文件中的记录按照主键排序。排序运行内,数据文件的主键范围不重叠。
-
查询过程:
- 不同的排序运行可能有重叠的主键范围,甚至可能包含相同的主键。
- 查询时需要合并所有排序运行,并根据用户指定的合并引擎和记录的时间戳来合并具有相同主键的所有记录。
-
写入过程:
- 新记录首先缓存在内存中,当内存缓冲区满时,所有记录将被排序并刷新到磁盘,创建一个新的排序运行。
Paimon
通过主键排序、分桶结构和LSM
树结构实现高效的数据存储和查询。在每个桶内,主键进行排序以确保数据有序;通过分桶管理数据,提升查询和写入的效率;同时,利用LSM
树有效组织和存储新数据,并通过有序运行管理确保查询性能。
2.2 桶
Bucket
(桶)
- 概念:无分区表或分区表中的分区被进一步细分为多个桶,以提供更高效的查询结构。
- 结构:每个桶目录包含一个 LSM 树及其变更日志文件。
- 分桶方式:桶的范围由记录中一个或多个列的哈希值决定。用户可以通过
bucket-key
选项指定分桶列。如果未指定,将使用主键(如定义)或完整记录作为分桶键。 - 处理并行度:桶是读写的最小存储单元,桶的数量限制了最大处理并行度。桶的数量不宜过大,否则会导致许多小文件并降低读性能。一般推荐每个桶的数据大小在 200MB 到 1GB 之间。
固定桶(Fixed Bucket
)
- 机制:配置一个大于 0 的桶数,使用公式
Math.abs(key_hashcode % numBuckets)
计算记录所归属的桶。 - 可扩展性:只能通过离线过程重新调整桶数。桶数过多会导致小文件过多,过少则会导致写入性能下降。
动态桶(Dynamic Bucket
)
- 默认模式:主键表的默认模式,或通过配置
bucket = '-1'
启用。 - 分配策略:
- 初始数据分配到旧桶,新数据分配到新桶,依据数据到达顺序。
- Paimon 通过索引来确定键-桶的映射关系,并自动扩展桶的数量。
- 配置选项:
dynamic-bucket.target-row-num
:控制单个桶的目标行数。dynamic-bucket.initial-buckets
:控制初始桶的数量。
- 限制:动态桶支持单写作业。不要启动多个作业写入同一分区,否则会导致数据重复问题。
动态桶的模式
-
正常动态桶模式(
Normal Dynamic Bucket Mode
)- 数据跨分区情况:
- 更新不跨分区(无分区或主键包含所有分区字段)时,使用 HASH 索引维护键-桶映射。
- 需要更多内存,约 1 亿条目占用 1GB 内存,但不再活动的分区不占用内存。
- 性能:一般无性能损失,但消耗更多内存。适用于更新频率低的表,可显著提升性能。
- 数据跨分区情况:
-
跨分区 Upsert 动态桶模式(
Cross Partitions Upsert Dynamic Bucket Mode
)- 场景:需要跨分区 Upsert 操作(主键不包含所有分区字段)。
- 机制:直接维护键-分区-桶的映射,使用本地磁盘,通过读取所有现有键初始化索引。
- 合并引擎行为:
- Deduplicate:删除旧分区的数据,插入新分区的数据。
- PartialUpdate & Aggregation:在旧分区插入新数据。
- FirstRow:忽略新数据,如果存在旧值。
- 性能:对于大量数据的表,性能会显著下降,且初始化时间较长。
- 优化:
- 配置 Index TTL(
cross-partition-upsert.index-ttl
)以减少索引和初始化时间,但可能导致数据重复。
- 配置 Index TTL(
分区字段的选择:
- 创建时间(推荐):通常是不可变的,可以自信地作为分区字段并添加到主键中。
- 事件时间:原表中的字段,适用于 CDC 数据(如从 MySQL CDC 同步的表),声明主键包含分区字段可以实现唯一效果。
- CDC 操作时间戳(op_ts):不能定义为分区字段,因为无法知道之前记录的时间戳,需要跨分区 Upsert,消耗更多资源。
通过这些高级机制,Paimon
确保了数据在高并发读写操作中的高效分布和访问,同时保证了系统的灵活性和可扩展性。
2.3 表模式
Paimon 的主键表采用 LSM 树结构,每个分区包含多个桶,每个桶是一个独立的 LSM 树,包含多个文件。根据写入过程中不同的处理方式,Paimon 提供了三种模式:
- MOR(Merge On Read):默认模式,仅进行小范围合并,读取时需要进行数据合并。
- COW(Copy On Write):启用全量合并,每次写入后立即完成合并,读取时不需再合并数据。
- MOW(Merge On Write):启用删除向量文件,写入时生成删除向量文件,读取时直接过滤不必要的行。
Merge On Read(MOR
)
- 特点:读取时合并所有文件,因为文件已排序,需要多路合并,包含主键比较。
- 限制:单一 LSM 树只能单线程读取,读取并行度有限。如果桶中数据量过大,读取性能会下降。
- 写性能:非常好。
- 读性能:不太好。
如果不使用删除向量模式,可以通过配置 compaction.optimization-interval
来优化 MOR 模式的读取性能。查询优化系统表中的结果可以避免合并同一主键的记录,从而提高读取性能。
Implying how often to perform an optimization compaction, this configuration is used to ensure the query timeliness of the read-optimized system table.
Copy On Write(COW
)
- 特点:每次写入都会进行全量合并,不再需要读取时合并,读取性能最高。
- 缺点:写入时需要完全合并,写入放大效应严重。
- 写性能:非常差。
- 读性能:非常好。
ALTER TABLE orders SET ('full-compaction.delta-commits' = '1');
Merge On Write(MOW
)
- 特点:利用 LSM 结构的主键查询能力,写入时生成删除向量文件,读取时直接过滤不必要的行,相当于合并。
- 优点:读取性能好,写入性能好。
- 可见性保证:L0 级别的文件在压缩后才可见,默认情况下压缩是同步进行的,如果开启异步模式,可能会有数据延迟。
ALTER TABLE orders SET ('deletion-vectors.enabled' = 'true');
表模式选择:
- MOR:适用于写入频繁、读取性能不敏感的场景。
- COW:适用于读取需求极高、写入频率较低的场景。
- MOW:适用于均衡读写性能的场景。
3. 合并引擎
3.1 概述
在数据流入 Paimon 时,如果收到两个或多个具有相同主键的记录,Paimon 会将它们合并成一条记录以保持主键的唯一性。用户可以通过指定合并引擎(merge-engine)属性来选择如何合并这些记录。
-
Flink SQL 配置:在 Flink SQL 的 TableConfig 中,将
table.exec.sink.upsert-materialize
设置为NONE
,因为启用upsert-materialize
可能会导致奇怪的行为。 -
乱序处理:当输入数据无序时,建议使用序列字段(Sequence Field)来纠正乱序。
-
默认引擎:去重合并引擎是 Paimon 的默认合并引擎。Paimon 只会保留最新的记录,并丢弃其他具有相同主键的记录。
-
DELETE记录处理:具体来说,如果最新记录是 DELETE 记录,那么所有具有相同主键的记录将被删除。用户可以配置
ignore-delete
参数来忽略 DELETE 记录。
SET table.exec.sink.upsert-materialize = 'NONE';
CREATE TABLE my_table (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'merge-engine' = 'deduplicate',
'ignore-delete' = 'true'
);
3.2 部分更新
通过设置 merge-engine
为 partial-update
,用户可以在多个更新中逐一更新记录的字段,直到最终记录完成。这个过程中,空值不会覆盖现有值。
- 用途:逐步构建完整记录,特别是在初始数据不完整或需要多次更新数据的场景。
- 机制:每次更新记录中只有非空字段会被更新,相同主键的记录会被逐一合并。
- 范例:一条记录可以通过多个部分更新合并成最终的完整记录。
流式查询要求,在流式数据处理中,部分更新合并引擎需要配合查找或全量压缩变更日志生成器使用(input
变更日志生成器也被支持,但仅返回输入记录)。
删除记录处理方案,默认情况下,部分更新不接受删除记录,可以通过配置以下选项进行处理:
- 忽略删除记录:配置
ignore-delete
。 - 删除整行记录:配置
partial-update.remove-record-on-delete
。 - 部分列撤销:配置
sequence-group
。
当进行多流更新时,序列字段可能不能完全解决乱序问题,因为一个流的最新数据可能会覆盖另一个流的序列字段。序列组(Sequence Group
),引入序列组机制以解决部分更新表的多流更新乱序问题。
Sequence Group 是为了解决在数据库部分更新(Partial Update)过程中,如何处理字段之间的关联问题。通过指定某些字段属于同一个序列组,可以确保这些字段在数据更新时具有关联性和一致性。
CREATE TABLE AGG
(
k INT,
a INT,
b INT,
g_1 INT,
c VARCHAR,
g_2 INT,
g_3 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.a.aggregate-function' = 'sum',
'fields.g_1,g_3.sequence-group' = 'a',
'fields.g_2.sequence-group' = 'c');
在 Paimon 的配置中,'fields.g_1,g_3.sequence-group' = 'a'
是用来定义字段的“序列组”属性。具体来说,这条配置表示字段 g_1
和 g_3
与字段 a
之间存在一种关联关系,该关系的行为受字段 a
的变化影响。
- fields.g_1,g_3: 指定字段
g_1
和g_3
。 - sequence-group: 表示这些字段属于同一个序列组。
a
: 指定这个序列组的主字段。
-
字段关联:
g_1
和g_3
字段的更新行为将依据字段a
的变化来进行管理。- 换句话说,当字段
a
更新时,Paimon 将确保g_1
和g_3
也会依据设定的逻辑来更新。
-
更新逻辑:
- 在插入新的数据时,如果主键
k
相同且字段a
更新了,那么g_1
和g_3
也会根据新的插入值进行相应的更新。 - 如果
a
没有变化或本次更新没有定义a
,那么g_1
和g_3
也会保留原值。
- 在插入新的数据时,如果主键
初始数据插入
INSERT INTO AGG
VALUES (1, 1, 1, 1, '1', 1, 1);
- 插入一条记录 (
k: 1
,a: 1
,b: 1
,g_1: 1
,c: '1'
,g_2: 1
,g_3: 1
)。
第一次数据更新
-- g_2 字段不更新
INSERT INTO AGG
VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT), 2);
查询结果:
SELECT *
FROM AGG;
-- 输出 1, 3, 2, 2, '1', 1, 2
分析:
a
从 1 累加到 3(使用 SUM 聚合函数)。g_1
和g_3
均更新为插入行中新的值 2,因为a
发生了变化。
第二次数据更新
-- g_1 和 g_3 字段不更新
INSERT INTO AGG
VALUES (1, 3, 3, 2, '3', 3, 1);
查询结果:
SELECT *
FROM AGG;
-- 输出 1, 6, 3, 2, '3', 3, 2
分析:
a
从 3 累加到 6(使用 SUM 聚合函数)。c
更新为 ‘3’。g_1
和g_3
保持原值(2 和 2),因为本次插入中,字段a
是累加更新而没有新的明确值插入,因此g_1
和g_3
保留原有值。
通过设置 'fields.g_1,g_3.sequence-group' = 'a'
,我们告诉 Paimon 在数据更新时,字段 g_1
和 g_3
应该与字段 a
的变化关联起来。这种机制有助于保持数据更新的一致性和关联性,特别是在处理复杂业务场景时,能够确保数据的正确性和逻辑完整性。
3.3 聚合引擎
Paimon 提供了聚合合并引擎,通过组合多个更新,实时计算出各字段的聚合结果。这个引擎尤其适合用户只关心聚合结果的场景。
- 配置建议:在 Flink SQL TableConfig 中,将
table.exec.sink.upsert-materialize
设置为NONE
。 - 工作机制:对于每个主键,基于指定的聚合函数逐一合并字段值。
举个例子:用户可以为每个非主键字段指定聚合函数。例如,下述表定义将 price
字段按最大值聚合,将 sales
字段按求和聚合:
CREATE TABLE my_table (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);
给定两条输入记录 <1, 23.0, 15>
和 <1, 30.2, 20>
,最终结果将会是 <1, 30.2, 35>
。
聚合函数
以下是 Paimon 当前支持的聚合函数及其适用的数据类型:
- sum:对多个行的值进行求和。支持的数据类型包括 DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, 和 DOUBLE。
- product:对多行值进行乘积计算。支持的数据类型与
sum
类似。 - count:计数符合特定条件的行。通过条件转化为布尔值并进一步转化为数值来实现。
- max:获取最大值。支持多种数据类型如 CHAR, VARCHAR, DECIMAL, 等。
- min:获取最小值,支持数据类型与
max
类似。 - last_value:保留最新导入的值。
- last_non_null_value:保留最新的非空值。
- listagg:将多个字符串值连接成一个字符串。
- bool_and 和 bool_or:分别检查布尔集合中的所有值是否为真,或至少一个值为真。
- first_value 和 first_non_null_value:分别获取数据集中的第一个值或第一个非空值。
- rbm32 和 rbm64:分别聚合多个 32 位和 64 位 RoaringBitmap。
- nested_update:将多行数据收集成一个数组(也称为“嵌套表”)。
- collect:将元素收集到一个数组中,支持去重。
- merge_map:合并输入的映射。
- hll_sketch 和 theta_sketch:使用 HyperLogLog 和 Theta sketch 进行近似去重计数。
使用场景和策略
-
分布式数据处理和分析:
- 使用场景:需要对大量数据进行实时聚合计算,如销售数据汇总、用户行为分析等。
- 策略:指定适当的聚合函数,如
sum
、max
、count
来实时计算所需的聚合结果。
-
复杂的聚合逻辑:
- 使用场景:需要根据多字段顺序进行复杂的聚合操作。
- 策略:定义序列组以及多个排序字段,并为这些字段配置合适的聚合函数,确保数据按照预期的顺序和逻辑进行处理。
-
高效去重和数据质量控制:
- 使用场景:需要进行去重和数据质量控制,如处理实时去重的用户点击、事件日志等。
- 策略:使用
hll_sketch
或theta_sketch
等近似计数方法,选择适合的数据存储和处理算法,达到最优的性能和准确性。
4. 相关组件
4.1 变更日志生成器
Paimon 允许用户通过指定变更日志生成器(changelog-producer
)配置表属性,来控制表文件所产生的变更模式。变更日志生成器可以显著影响压缩性能,不要轻易启用,除非确有必要。
不同类型的变更日志生成器如下:
-
None
- 默认行为:没有额外的变更日志生成器。Paimon 源只能看到跨快照合并的变化,这些变化不能形成完整的变更日志。
- 消费者需求:适合不需要老值的消费者,如数据库系统。Flink 提供内置的 “normalize” 操作符来持久化每个键的值,但这一操作开销很大。
-
Input
- 配置:通过指定
'changelog-producer' = 'input'
,Paimon 写入器依赖其输入作为完整变更日志的源。所有输入记录将保存到单独的变更日志文件中。 - 使用场景:当 Paimon 写入器的输入是完整的变更日志时使用,如来自数据库 CDC 或 Flink 状态计算生成的变更日志。
- 配置:通过指定
-
Lookup
- 配置:通过指定
'changelog-producer' = 'lookup'
,在提交数据写入之前,Paimon 将通过查找生成变更日志。 - 存储和性能调优:可在内存和本地磁盘上缓存数据,有相应的配置选项来调优性能,如
lookup.cache-file-retention
、lookup.cache-max-disk-size
和lookup.cache-max-memory-size
。 - 去重:支持通过
changelog-producer.row-deduplicate
避免生成相同记录的 -U, +U 变更日志。
- 配置:通过指定
-
Full Compaction
- 配置:通过指定
'changelog-producer' = 'full-compaction'
,Paimon 在全量压缩之间比较结果并生成变更日志。 - 延迟:变更日志的延迟受全量压缩频率影响。
- 优化:可通过设置表属性
full-compaction.delta-commits
来调节全量压缩的频率(默认为 1)。 - 适用场景:适用于高延迟场景(如 10 分钟)且资源消耗较低。支持
changelog-producer.row-deduplicate
以避免生成相同记录的 -U, +U 变更日志。
- 配置:通过指定
变更日志生成和使用场景:
-
None:适合无需变更日志的场景,适用于数据库系统和某些 Flink 应用。
-
Input:适合输入即为完整变更日志的场景,如 CDC 数据流。
-
Lookup:适合需要高效生成变更日志但资源消耗可控的场景。
-
Full Compaction:适合高延迟容忍度场景,通过全量压缩生成准确的变更日志。
-
性能优化配置:适当增加 Flink 的
execution.checkpointing.max-concurrent-checkpoints
配置,以提升性能。
4.2 序列字段和行类型字段
在创建表时,可以通过指定 sequence.field
定义用于确定更新顺序的字段,或通过 rowkind.field
确定记录变更日志的类型。
序列字段(Sequence Field
)
-
默认行为:主键表根据输入顺序确定合并顺序,最后输入的记录将最后合并。然而,在分布式计算中,可能会出现数据乱序的情况。
-
配置
sequence.field
:可以使用一个时间字段作为序列字段,以确保按时间顺序合并记录。例如:CREATE TABLE my_table ( pk BIGINT PRIMARY KEY NOT ENFORCED, v1 DOUBLE, v2 BIGINT, update_time TIMESTAMP ) WITH ( 'sequence.field' = 'update_time' );
-
工作机制:
- 记录按
sequence.field
值最大的记录最后合并。 - 如果
sequence.field
值相同,则按输入顺序确定最后合并的记录。 - 支持所有数据类型的字段并可定义多个字段进行比较(如
update_time,flag
)。
- 记录按
-
注意事项:用户定义的序列字段与
first_row
和first_value
等特性可能冲突,导致意外结果。
行类型字段(Row Kind Field
)
- 默认行为:主键表根据输入行确定行类型。
- 配置
rowkind.field
:可通过字段提取行类型。有效的行类型字符串有+I
(插入),-U
(更新前),+U
(更新后)或-D
(删除)。
配置示例 1:使用时间戳字段作为序列字段
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY NOT ENFORCED,
amount DOUBLE,
status STRING,
update_time TIMESTAMP
) WITH (
'sequence.field' = 'update_time'
);
配置示例 2:使用多个字段作为序列字段
CREATE TABLE inventory (
item_id BIGINT PRIMARY KEY NOT ENFORCED,
stock INT,
flag INT,
last_updated TIMESTAMP
) WITH (
'sequence.field' = 'last_updated,flag'
);
配置示例 3:定义行类型字段
CREATE TABLE transactions (
transaction_id BIGINT PRIMARY KEY NOT ENFORCED,
amount DOUBLE,
transaction_type STRING,
row_kind VARCHAR
) WITH (
'rowkind.field' = 'row_kind'
);
4.3 压缩
在 LSM
树中,随着更多记录的写入,排序运行的数量会增加。查询 LSM
树需要合并所有排序运行,过多的排序运行会导致查询性能下降甚至内存不足。因此,必须定期将多个排序运行合并成一个大的排序运行,称为压缩(Compaction
)。
-
解决问题:
- 减少 Level 0 文件,避免查询性能下降。
- 通过
changelog-producer
生成变更日志。 - 为 MOW 模式生成删除向量。
- 快照、标签和分区的过期管理。
-
限制:
- 同一分区的压缩任务只有一个,否则会引发冲突。
- 写入性能几乎总是受到压缩的影响,因此调整是至关重要的。
异步压缩(Asynchronous Compaction)
压缩本质上是异步的,但如果希望它完全异步且不阻塞写入,可以配置最大写入吞吐量,并让压缩以较慢的速度进行。以下策略可用:
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
lookup-wait = false
这种配置在写入高峰期间生成更多文件,并在写入较少的期间逐步合并以优化读取性能。如果多个任务写入同一个表,需要分离压缩任务,可以使用专用的压缩任务。可以在压缩过程中配置记录级过期时间以自动过期记录。相关配置包括:
record-level.expire-time
:记录保留时间。record-level.time-field
:记录级过期的时间字段。record-level.time-field-type
:记录级过期时间字段的类型,如seconds-int
或millis-long
。
全量压缩(Full Compaction)
Paimon 使用 Universal-Compaction。当增量数据过多时,会自动执行全量压缩,以确保读取性能。可以通过配置定期执行全量压缩:
compaction.optimization-interval
:定期执行全量压缩的时间间隔。full-compaction.delta-commits
:在 delta 提交后不断触发全量压缩。
压缩选项(Compaction Options)
-
停止写入的排序运行数量:
- 配置
num-sorted-run.stop-trigger
来设置触发停止写入的排序运行数量阈值。
- 配置
-
触发压缩的排序运行数量:
- 配置
num-sorted-run.compaction-trigger
来设置触发压缩的最小排序运行数量。
- 配置
-
溢出的排序阈值:
- 配置
sort-spill-threshold
来防止过多读者占用过多内存导致内存不足(OOM)。
- 配置
使用场景和策略
-
高效数据压缩和查询性能优化:
- 策略:通过配置适当的压缩阈值和策略,确保在高效写入数据的同时,保持良好的查询性能。
-
异步和专用压缩任务:
- 策略:在写入高峰期间,使用异步压缩和专用压缩任务,减少对写入性能的影响,并在低峰期间优化读取性能。
-
记录过期管理:
- 策略:使用记录级过期时间配置,自动管理历史数据的过期,确保数据的有效性和存储空间的优化。
5. 总结
本文全面阐述了 Apache Paimon 的主键表相关知识。在数据结构方面,介绍了主键表、LSM 树、Sorted Runs 等概念及其特点;桶的概念、分桶方式及不同类型桶的特性;表模式中的 MOR、COW、MOW 模式及其适用场景。合并引擎部分涵盖了去重、部分更新、聚合引擎等类型及其工作机制与应用场景。相关组件中详细讲解了变更日志生成器的不同类型及适用场景、序列字段和行类型字段的作用、压缩的必要性及异步压缩和全量压缩的特点与配置。
标签:精读,记录,Docs,写入,合并,007,Paimon,主键,日志 From: https://blog.csdn.net/jankin6/article/details/144171997