首页 > 其他分享 >聊聊流式数据湖Paimon(三)

聊聊流式数据湖Paimon(三)

时间:2023-12-25 17:13:13浏览次数:42  
标签:记录 分区 bucket 流式 聊聊 Paimon order Append

概述

如果表没有定义主键,则默认情况下它是仅追加 表类型(Append Only Table)。 根据桶(Bucket)的定义,我们有两种不同的仅追加模式:"Append For Scalable Table"和"Append For Queue";两种模式支持不同的场景,提供不同的功能。
只能向表中插入一条完整的记录。 不支持删除或更新,并且不能定义主键。 此类表适合 不需要更新的用例(例如日志数据同步)。

Append 场景特指"无主键"的场景,比如日志数据的记录,不具有直接Upsert更新的能力。

Append For Scalable Table

其支持的功能如下:

  1. 支持批读批写 INSERT OVERWRITE
  2. 支持流读流写 自动合并小文件
  3. 支持湖存储特性 ACID、Time Travel
  4. order与z-order排序

Definition

通过在表属性中定义 'bucket' = '-1',可以为此表分配特殊模式(我们称之为"unaware-bucket 模式")。 在这种模式下,一切都不同了。 我们已经没有了桶的概念,也不保证流式读取的顺序。 我们将此表视为批量离线表(尽管我们仍然可以流式读写)。 所有记录都会进入一个目录(为了兼容性,我们将它们放在bucket-0中),并且我们不再维护顺序。 由于我们没有桶的概念,所以我们不会再按桶对输入记录进行混洗,这将加快插入速度。
使用此模式,可以将 Hive 表替换为 Lake 表。
image.png

Compaction

在unaware-bucket模式下,我们不在writer中进行压缩,而是使用Compact Coordinator扫描小文件并将压缩任务提交给Compact Worker。 这样,我们就可以轻松地对一个简单的数据目录进行并行压缩。 在流模式下,如果在flink中运行insert sql,拓扑将是这样的:
image.png
它会尽力压缩小文件,但是当一个分区中的单个小文件长时间保留并且没有新文件添加到该分区时,压缩协调器会将其从内存中删除以减少内存使用。 重新启动作业后,它将扫描小文件并将其再次添加到内存中。 控制紧凑行为的选项与 Append For Qeueue 完全相同。 如果将 write-only 设置为 true,Compact Coordinator 和 Compact Worker 将从拓扑中删除。
自动压缩仅在 Flink 引擎流模式下支持。还可以通过 paimon 中的 flink 操作在 flink 中启动压缩作业,并通过 set write-only 禁用所有其他压缩。

Sort Compact

每个分区中的数据乱序会导致选择缓慢,压缩可能会减慢插入速度。 将插入作业设置为只写是一个不错的选择,并且在每个分区数据完成后,触发分区排序压缩操作。

Streaming Source

Unaware-bucket模式 Append Only Table 支持流式读写,但不再保证顺序。 你不能把它看作一个队列,而是一个有bin的湖。每次提交都会生成一个新的binbin存储记录 来读取增量,但是一个 bin 中的记录会流向它们想要的任何地方,并且我们以任何可能的顺序获取它们。 在Append For Queue模式下,记录不存储在bin中,而是存储在record pipe中。 记录 存储,我们可以通过读取新的存储记录 来读取增量,但是一个 bin 中的记录会流向它们想要的任何地方,并且我们以任何可能的顺序获取它们。 在Append For Queue模式下,记录不存储在bin中,而是存储在record pipe中。

bin:储物箱

Streaming Multiple Partitions Write

由于Paimon-sink需要处理的写入任务数量为:数据写入的分区数量 * 每个分区的桶数量。 因此,我们需要尽量控制每个paimon-sink任务的写任务数量,使其分布在合理的范围内。 如果每个sink-task处理过多的写任务,不仅会导致小文件过多的问题,还可能导致内存不足的错误。
另外,写入失败会引入孤儿文件,这无疑增加了维护paimon的成本。 我们需要尽可能避免这个问题。
对于启用自动合并的 flink-jobs,我们建议尝试按照以下公式来调整 paimon-sink 的并行度(这不仅仅适用于append-only-tables,它实际上适用于大多数场景):

(N*B)/P < 100   (This value needs to be adjusted according to the actual situation)
N(the number of partitions to which the data is written)
B(bucket number)
P(parallelism of paimon-sink)
100 (This is an empirically derived threshold,For flink-jobs with auto-merge disabled, this value can be reduced.
However, please note that you are only transferring part of the work to the user-compaction-job, you still have to deal with the problem in essence,
the amount of work you have to deal with has not been reduced, and the user-compaction-job still needs to be adjusted according to the above formula.)

还可以将 write-buffer-spillable 设置为 true,writer 可以将记录溢出到磁盘。 这可以尽可能地减少小文件。要使用此选项,的 flink 集群需要有一定大小的本地磁盘。 这对于那些在 k8s 上使用 flink 的人来说尤其重要。
对于仅追加表,您可以为仅追加表设置 write-buffer-for-append 选项。 将此参数设置为true,writer将使用Segment Pool缓存记录以避免OOM。

Example

以下是创建Append-Only表并指定bucket key的示例。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT
) WITH (
  'bucket' = '-1'
);

Append For Queue

其支持的功能如下:

  1. 严格保证顺序,可以带消息队列
  2. 支持Watermark且对齐
  3. 自动合并小文件
  4. 支持Consumer-ID (类似Group-ID)

Definition

在这种模式下,可以将append-only table看成是一个由bucket分隔的队列。 同一个桶中的每条记录都是严格排序的,流式读取会严格按照写入的顺序将记录传输到下游。 使用此模式,不需要进行特殊配置,所有数据都会以队列的形式放入一个桶中。还可以定义bucketbucket-key以实现更大的并行性和分散数据。
image.png

Compaction

默认情况下,sink节点会自动进行compaction来控制文件数量。 以下选项控制压缩策略:
image.png

Streaming Source

目前仅 Flink 引擎支持流式源行为。

Streaming Read Order

对于流式读取,记录按以下顺序生成:

  • 对于来自两个不同分区的任意两条记录
    • 如果 scan.plan-sort-partition 设置为 true,则首先生成分区值较小的记录。
    • 否则,将先产生分区创建时间较早的记录。
  • 对于来自同一分区、同一桶的任意两条记录,将首先产生第一条写入的记录。
  • 对于来自同一分区但两个不同桶的任意两条记录,不同的桶由不同的任务处理,它们之间没有顺序保证。
Watermark Definition

定义读取 Paimon 表的watermark:

CREATE TABLE T (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
 TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

还可以启用 Flink Watermark 对齐,这将确保没有源/拆分/分片/分区将其 Watermark 增加得远远超出其他部分:
image.png

Bounded Stream

Streaming Source 也可以是有界的,指定 scan.bounded.watermark 来定义有界流模式的结束条件,流读取将结束,直到遇到更大的 watermark 快照。
快照中的watermark 是由writer生成的,例如,指定kafka源并声明watermark 的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将生成相应的watermark,以便流式读取此Paimon表时可以使用有界watermark的功能。

CREATE TABLE kafka_table (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);

-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;

-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

Example

以下是创建Append-Only表并指定bucket key的示例。

CREATE TABLE MyTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '8',
    'bucket-key' = 'product_id'
);

参考

基于 Apache Paimon 的 Append 表处理
Apache Paimon 实时数据湖 Streaming Lakehouse 的存储底座

标签:记录,分区,bucket,流式,聊聊,Paimon,order,Append
From: https://www.cnblogs.com/zhiyong-ITNote/p/17926519.html

相关文章

  • 聊聊流式数据湖Paimon(二)
    当前的问题ApachePaimon最典型的场景是解决了CDC(ChangeDataCapture)数据的入湖;CDC数据来自数据库。一般来说,分析需求是不会直接查询数据库的。容易对业务造成影响,一般分析需求会查询全表,这可能导致数据库负载过高,影响业务分析性能不太好,业务数据库一般不是列存,查询部......
  • 聊聊流式数据湖Paimon(一)
    翻译自ApachePaimon官方文档概览概述ApachePaimon(incubating)是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。简单来说,Paimon的上游是各个CDC,即changlog数据流;而其自身支持实时sink与search(下沉与查询)changlog数据流......
  • 基于 Flink SQL 和 Paimon 构建流式湖仓新方案
    本文整理自阿里云智能开源表存储负责人,FounderofPaimon,FlinkPMC成员李劲松在云栖大会开源大数据专场的分享。本篇内容主要分为三部分:数据分析架构演进介绍ApachePaimonFlink+Paimon流式湖仓一、数据分析架构演进目前,数据分析架构正在从Hive到Lakehouse的演变。传统数......
  • 聊聊Flink必知必会(七)
    WhatisState虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器),但某些操作会记住多个事件的信息(例如窗口算子)。这些操作称为有状态的(stateful)。有状态操作的一些示例:当应用程序搜索某些事件模式(eventpatterns)时,状态(state)将存储迄今为止遇到的事件序列。......
  • 聊聊Flink必知必会(六)
    Flink是一个分布式系统,需要有效地分配和管理计算资源才能执行流应用程序。它集成了所有常见的集群资源管理器,如HadoopYARN和Kubernetes,但也可以设置为作为一个独立的集群运行,甚至作为一个库。Flink集群的剖析Flink运行时由两种类型的进程组成:一个JobManager和一个或多个taskma......
  • 【流式传输】使用Spring Boot实现ChatGpt流式传输
    引言在ChatGpt火了这么久,他的那种单字单字返回的格式可能让很多朋友感到好奇,在之前我用c#写了一个版本的,同时支持IAsyncEnumerable以及SSE,今天把之前写的Java版本的也发出来,和大家一起学习,有不对的地方,欢迎各位大佬指正。Code我这边用的是JDK21版本,可以看到下......
  • 聊聊如何实现热插拔AOP
    前言之前偶然看到一篇文章利用aop实现热拔插(类似于插件),里面的实现挺好玩。今天我们也来玩一把前置知识Advice:org.aopalliance.aop.Advice“通知”,表示Aspect在特定的Joinpoint采取的操作。包括“around”,“before”and“after等Advice,大体上分为了三类:Befor......
  • 聊聊神经网络的优化算法
    优化算法主要用于调整神经网络中的超参数,使得训练数据集上的损失函数尽可能小。其核心逻辑是通过计算损失函数对参数的梯度(导数)来确定参数更新方向。SGDStochasticGradientDescent(随机梯度下降法):随机梯度下降算法是一种改进的梯度下降方法,它在每次更新参数时,只随机选择一个......
  • 理解 Paimon changelog producer
    介绍目的Chaneglogproducer的主要目的是为了在Paimon表上产生流读的changelog,所以如果只是批读的表是可以不用设置Chaneglogproducer的.一般对于数据库如MySQL来说,当执行的语句涉及数据的修改例如插入、更新、删除时,MySQL会将这些数据变动记录在binlog中。相......
  • 聊聊GLM基座模型的理论知识
    概述大模型有两个流程:预训练和推理。预训练是在某种神经网络模型架构上,导入大规模语料数据,通过一系列的神经网络隐藏层的矩阵计算、微分计算等,输出权重,学习率,模型参数等超参数信息。推理是在预训练的成果上,应用超参数文件,基于预训练结果,根据用户的输入信息,推理预测其行为。G......