首页 > 其他分享 >一文聊透 Flink 的 Temporal Join

一文聊透 Flink 的 Temporal Join

时间:2024-03-23 14:29:20浏览次数:26  
标签:时态 Join Temporal Flink 聊透 版本 Table

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

坦率地讲,Flink 在 Temporal Join 以及与其密切相关的 Temporal Table (时态表)等概念上是有一些混乱的,可能是历史原因造成的,所以给理解这部分的知识带来了不少麻烦。本文希望把所有与 Temporal Join 相关的概念进行一次统一梳理,用更规范的知识结构把 Flink 的 Temporal Join 介绍清楚。为了表述准确和简洁,我们先约定一些概念,为了区分 join 的两张表,我们根据习惯这样来命名:

  • 主动关联的表:事实表 / 左表 / left input / probe side
  • 被关联的表:维表 / 右表 / right input / build side

本着实用原则,我们先把本文总结出的 Temporal Join 知识用以下表格呈现出来,后文着力要做的就是这这张表解释清楚:

场景 & 需求简单描述从 时态表 的角度解释从 Temporal Join 的角度解释基于 Temporal Table DDL 实现基于 Temporal Table Function 实现
事实表想关联自己的记录所代表的事件在发生时(事件时间)维表中对应的当时的数据当时对当时Join 时态表 (版本表)上对应时期的某个版本基于 事件时间 的 Temporal Join支持支持
事实表想关联维表当前最新的数据当时对最新Join 时态表 (版本表)上的最新版本基于 处理时间 的 Temporal Join不支持(曾经支持,后因语义问题和Bug在新版本中移除)支持(同样有语义问题,出于兼容考虑保留,但并未在 1.17.1 上测试成功)

1. 两个基础概念


要搞清楚 Tmporal Join 就必须得先弄清楚:时态表 (Temporal Table) 和 时态表函数(Temporal Table Function)这两个基础概念。总得来说,你可以认为:时态表 (Temporal Table) 和 时态表函数(Temporal Table Function)用各自的方式实现了对一张表的版本回溯(与 Hudi 中的 Time Travel 性质相同),但它们之间是没有什么直接关系的,是两种并行机制,时态表函数是一个 UDTF,它能模拟时态表的功能,使得通过这个 UDTF 访问一张表时就像是在访问一张时态表。以下是对两个概念的详细介绍。

1.1. 时态表(Temporal Table)

《Flink:动态表 / 时态表 / 版本表 / 普通表 概念区别澄清》一文中,我们特别澄清过 时态表(Temporal Table)的概念,这里再简述一遍:

时态表:是一张数据会随时间变化的表(任何表都会随时间变化,这里特指在流上),在 Flink 流上的表其实都是时态表,它们都会伴随输入数据发生改变,所以广义的时态表概念并没有特别之处,广义的时态表就是 Flink 流上的动态表,反之依然,即:广义时态表 <=> 动态表。

时态表真正作为一个独立概念被提出来是因为它的两个细分种类:版本表 和 普通表,特别是 版本表,实际上,我们发现很多文章(也包括官方文档),使用 “时态表”(Temporal Table)这个称谓时,其实指得就是 “版本表”(Versioned Table),这一点一定要根据上下文加以区分,可能是因为 “普通表” 这个概念确实没有特别之处,人们几乎不会单独引用和讨论它,而时态表这个称谓中的“时态”二字要比“版本”二字更形象,所以人们慢慢就开始使用 “时态表” 来指代 “版本表”。

我们用如下的结构将时态表的相关概念整理一下:


时态表(Temporal Table)== 动态表(Dynamic Table),但概念侧重点和上下文不同

​ ├── 版本表:如果一张时态表能保存记录的历史版本,并可以回溯(读取历史上的某个版本),这就是一张版本表

​ └── 普通表:如果一张时态表只保存当前最新记录,无历史版本,这就是一张普通表


1.1.1 版本表 (Versioned Table)


版本表:是一张能记录数据变更历史并能访问(读取)过去某个版本的表。版本表的概念其实并不新奇, Apache Hudi 中的表其实也是典型的版本表,支持 Time Travel。不管在什么系统里,实现一张版本表都必须得满足以下两个基本条件:

  • 记录必须要有一个主键(用于标识记录的唯一性)
  • 记录必须要有一个标记记录本身的时间戳(用于标识同一条记录在时间线上的唯一性,即版本)

因为这两个条件是在逻辑上实现版本回溯(Time Travel)所必须的。所以,对于 Flink 来说,定义一张 版本表 就需要同时指定 PRIMARY KEYWATERMARK (用于标记事件时间字段)两个字段,同样的,对于 Hudi 来说,要定义一张 Hudi 表就得同时指定 hoodie.datasource.write.recordkey.fieldprecombine.field 两个字段,这不是巧合,而是必然。

此外,从构成 版本表 的数据结构上看,能同时满足上述两个特征的数据往往都是 changelog 型的数据,典型的代表是数据库的 CDC 数据(例如 debezium-json 格式的数据)。当然,这不是说只有 changelog 型的数据才能构建版本表,构建版本表的充分必要条件还是上述两点。

以下是通过 Flink SQL 定义一张版本的示例,如 SQL 中的注释,关键点就是:有 PRIMARY KEY,有 WATERMARK ,这两个配置和它们对应的字段决定了这是一张版本表。同样的,呼应前文的解释,在大多数文章中,所谓的 “时态表”(Temporal Table)其实通常指的都是下面这样的 “版本表”(Versioned Table),本文后文会沿用官方文档使用的说法 “Temporal Table DDL” 指得也是下面这种表的定义方式,它其实是张“版本表”

-- 定义一张版本表
-- 只有同时定义了主键和事件时间字段的表才是一张版本表
-- 通过 CDC 技术从数据库采集的 changelog 数据是构成版本表的数据“典型”数据
-- 但并不是说:版本表的数据一定是 changelog 型的数据,只要满足有主键和事件时间字段数据,就可以定义为版本表
CREATE TABLE product_changelog (
  product_id STRING,
  product_name STRING,
  product_price DECIMAL(10, 4),
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY(product_id) NOT ENFORCED,      -- 版本表特征(1) 定义主键
  WATERMARK FOR update_time AS update_time   -- 版本表特征(2) 定义事件时间字段(通过 watermark 定义事件时间)              
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'debezium-json'
);

1.1.2 普通表 (英文文档已无此概念)


普通表 确实没有任何可强调的,不像版本表中的 PRIMARY KEY WATERMARK 必要配置,它没有任何“额外”特征。我们还注意到,在 英文版的 Flink 文档 中直接移除了普通表的介绍。

不过,我们还是要强调一点:“普通表” 其实并不普通,这要看跟谁比,普通表毕竟是流上的一张动态表,具备动态表的所有特征,只是不能像版本表那样进行 Time Travel。

最后,关于普通表常会产生的一个误解是:认为 “基于处理时间的 Temporal Join” 关联的是一张普通表,从 “基于处理时间的 Temporal Join” 的关联效果上看确实像是在关联一张普通表,因为它关联到的总是表中的当前版本,但这种理解是错误的,“基于处理时间的 Temporal Join” 关联的必定是一张版本表,之一点从 Temporal Table Function 的定义方法上就能判定:Table.createTemporalTableFunction(“时间属性列”, “主键列”),实际上,所有的 Temporal Join,不管是 DDL 还是 Function 方式实现,关联的表(维表)都必须是版本表,只不过,“基于处理时间的 Temporal Join” 总是关联最新的版本,不会关联过去的某个版本,所以才会让人误解它关联的是一张普通表。

1.2. 时态表函数(Temporal Table Function)


其实,我们应该先介绍 时态表函数(Temporal Table Function),因为,它是早于 时态表(Temporal Table)之前就有的机制,是早期 Flink 中实现“类版本表”功能和基于事件时间或处理时间进行 Temporal Join 的维一手段:

Flink 很早就支持一种名为 时态表函数 JOIN 的关联操作,它允许用户对一个自定义表函数 UDTF 执行关联操作。换句话说,UDTF 可以返回一张虚拟表,它可以是从外部系统实时查到的,也可以是动态生成的,非常灵活。在没有上述各种复杂 JOIN 的上古年代,这是为数不多表关联方法。引用自:《Flink SQL 双表 JOIN 介绍与原理简析》

“时态表函数”(Temporal Table Function)和 “时态表”(Temporal Table)并没有太大的关系,两者是完全不同的实现机制,Temporal Table Function 是以 UDTF 的形式去模拟“时态表”的行为,所以,我们把它与 Temporal Table DDL 放在一起讨论是因为它们是实现 Temporal Join 的两种方式!

2. Temporal Join - 流式 Join 的特有问题


现在,我们来介绍 Temporal Join,很奇怪的一点是:Flink 的官方文档并没有正式解释过 “Temporal Join” 这个概念,很过文章直接把 Temporal Join 给翻译成了 Temporal Table Join,这多少是有些问题的,准确地说,Temporal Table Join 只是实现 Temporal Join 的一种方式(另一种是方式是 Temporal Table Function),并没有解释这个概念本身。

Temporal Join 其实可以直译为 “时态 Join”,它是在流上 Join 数据(两动态表之间) 所特有的,在批处理上 Join 数据并不存在 “时态” 概念,而在流上的表是一直在变化的,流式 Join 作为一个连续查询(Continuous Query)也是持续在进行中的,这就会面临一个问题:当被关联的表发生了变更,join 输出的结果应该如何应对呢?是取变更前还是变更后的数据呢?流处理引擎应该能根据用户需要选择是关联当时的数据还是现在最新的数据,这就是 Temporal Join(时态 Join)所要解决的问题

这里,我们还要补充很重要的一点: Flink 整个关于 Temporal Join 部分的讨论都是建立在 join 的右表 (right input/build side) 是一张 “版本表” 的前提下展开的,因为不管是 Temporal Table DDL 方式还是 Temporal Table Function 方式,它们都强制要求表必须要同时具备主键和一个时间戳字段,就是要求他们必须是版本表,如果一张表不是版本表,根本无法进行 Temporal Join

2.1 Temporal Join 的两种实现方式


了解了 “时态表”(Temporal Table)和 “时态表函数”(Temporal Table Function)两个基础概念之后,就可以介绍 Temporal Join 的两种实现方式了,因为它们分别就是利用了这两种机制实现的。

回到本节主题,基于上一节的介绍,从实现的方式上看,Flink 其实支持两套风格/方式的 Temporal Join:

  • Temporal Table DDL 配合 FOR SYSTEM_TIME AS OF 关键字( SQL 2011 标准 )
  • Temporal Table Function 配合 LATERAL TABLE 关键字(Flink 早期的实现)

2.2 Temporal Join 的两种时间标准


在前面介绍 Temporal Join 的概念时就提到了一个关键的问题:当被关联的表发生了变更,join 输出的结果应该取变更前还是变更后的数据呢?要想把这个问题解释清楚,必须要展开描述,并且还得结合示例才能解释明白,我们就以官方文档使用的”订单“ 和”汇率表“的 join 场景为例来解释:

需求是将订单的总价换算成统一货币美元进行计价,这就要求将流上的订单表和流上的汇率表进行 Join,汇率表是被关连表,同时会实时更新。此外还应注意到:定单有“下单时间”,汇率表有“变更时间”,两条记录进行 Join 时还有一个“处理时间”,虽然在实时处理场景下,流上的“下单时间” 和 “汇率变更时间” 与当下时间(就是“处理时间”) 不会有很大的差距,但它们毕竟是不同的时间点,那么,现在的问题就是:当我们在流上 join 订单和汇率数据时,在根据币种关联上汇率表后,我们应该选取哪个时间点(哪个版本)上的汇率呢?逻辑上,我们有两种选择:

  • 选择一:根据“下单时间”,找到 “当时” 的汇率进行 join,那这就是 “基于事件时间的 Temporal Join”
  • 选择二:总是简单地 join 汇率表中的”当前“的最新汇率进行 join,那这就是 “基于处理时间的 Temporal Join”

如果仅以”汇率换算“这个场景来说,最合理的选择无疑应该是 “基于事件时间的 Temporal Join”,但是,这种 Temporal Join 显然是有条件的,首先,被关联的表(此处的汇率表)必须是一张版本表,我们前面也介绍过,想到构建一张版本表,大概率你得使用 CDC 数据才行,这是一道门槛,另外,关联表上还得有合理的 ”事件时间“属性,这是第二道门槛,所以,你会发现,“基于事件时间的 Temporal Join” 虽好,但你的表(数据)并不一定能支持你去这样 join.

若条件不允许,使用 “基于处理时间的 Temporal Join” 其实是更普遍的选择,抛开”汇率换算“这个场景,很多的维表其实都是缓慢变化的(不会像汇率这样高频的变更),在这类场景下,流入的数据并不需要追溯维表上对应时刻的版本值,仅仅简单地 join 表中”当前“的数据足以满足需求,这就是为什么大量的维表 Join 使用的都是“基于处理时间的 Temporal Join” ,因为它的实用门槛确实很低:它既不要求被关联表是一张版本表(不用 changelog 数据来构建),也不要求关联表必须要有时间属性。

比 “基于处理时间的 Temporal Join” 门槛更低,也更容易实现的是 Lookup Join,注意,它们是不同的两种 Join 方式,虽然在 Join 的语法上是一样的,关于两者的差别,我们会在后续文章中专门讨论。

3. 组合起来:四种可能的 Temporal Join 方式


既然我们有两种 Temporal Join 的实现方式,又有两种 Join 的时间标准,那么,组合在一起,我们应该有四种 Temporal Join 方式,它们分别是:

  1. 以 Temporal Table DDL 方式实现基于事件时间的 Temporal Join => 最标准,也是最符合通常预期的 Temporal Join 方式,但对数据供给有要求

  2. 以 Temporal Table DDL 方式实现基于处理时间的 Temporal Join => 现已不再支持(曾经支持,后因语义和Bug在新版中移除)

  3. 以 Temporal Table Function 方式实现基于事件时间的 Temporal Join => 与方式 1 相同,但必须在代码中实现,不推荐

  4. 以 Temporal Table Function 方式实现基于处理时间的 Temporal Join => 与方式 2 相同,文档说支持,并未实测成功,刚方式可完全被 Lookup Join 替代,亦无再研究的必要

但实际情况是:方式 2 目前已不再支持;方式 3 与方式 1 相同,但必须在代码中实现,不推荐;方式 4 在 Flink 1.17.1 上并未测通,原因待查。所以,在 Temporal Join 这一块,最实用的是 ”以 Temporal Table DDL 方式实现基于事件时间的 Temporal Join“,外部维表 Join 可以使用 Lookup Join。

4. 代码演示


接下来,我们针对上述 4 种 Temporal Join 方式,我们逐一给出示例代码:

标签:时态,Join,Temporal,Flink,聊透,版本,Table
From: https://blog.csdn.net/bluishglc/article/details/136707197

相关文章

  • 2023-12-22-flink-cdc使用
    应用场景、上手体验应用场景FlinkCDC(ChangeDataCapture)是一种用于捕获和处理数据源中的变化的流处理技术。它可以实时地将数据源中的增量更新捕获到流处理作业中,使得作业可以实时响应数据变化。以下是FlinkCDC的一些常见应用场景:数据仓库和实时分析:FlinkCDC可以......
  • Flink实战之Flink乱序场景汇总
    目录一数据乱序场景1数据源乱序2ETL造成乱序二Flink处理乱序数据方案1Watermark和EventTime模式2提前创建保序任务3使用事务性Sink保证下游数据时序三结语       在数据处理领域,无论离线批处理领域还是实时流处理领域,数据时序性对于最终数据的......
  • Flink 自定义 ClickHouse Table Connector 的简单实现
    本次实现基于Flink1.18版本,具体的对象之间的关系可以先参考官网的图:先拿官网上的Socket示例来说一下实现过程:首先编写SocketDynamicTableFactory实现DynamicTableSourceFactory接口。在SocketDynamicTableFactory中会返回SocketDynamicTableSource,同时返回实现......
  • 【Flink】Flink 使用 JobListener 监听 任务状态
    1.概述flink程序运行起来,需要获取flink的任务状态,我们有几种方式获取。使用yarn命令或者,查看任务状态使用flink指标监控查看flink任务状态使用JobListener监听任务状态下面我们来主要介绍如何使用JobListener监听任务状态。2.使用JobListener的使用很简单,我们只需......
  • 中电金信:技术实践|Flink维度表关联方案解析
    ​导语:Flink是一个对有界和无界数据流进行状态计算的分布式处理引擎和框架,主要用来处理流式数据。它既可以处理有界的批量数据集,也可以处理无界的实时流数据,为批处理和流处理提供了统一编程模型。 维度表可以看作是用户来分析数据的窗口,它区别于事实表业务真实发生的数据,通常......
  • Flink实时写Hudi报NumberFormatException异常
    Flink实时写Hudi报NumberFormatException异常问题描述在Flink项目中,针对Hudi表xxxx_table的bucket_write操作由于java.lang.NumberFormatException异常而从运行状态切换到失败状态。异常信息显示在解析字符串"ddd7a1ec"为整数时出现了问题。报错如下:bucket_write:......
  • 【快捷部署】002_Flink
    Flink一键安装(本地模式)install-flink.sh脚本内容#!/bin/bash####变量###执行脚本的当前目录mydir=$(cd"$(dirname"$0")";pwd)echo$mydir#flink安装目录flink=/flink#检查点目录cp=$flink/checkpoints/#保留点目录sp=$flink/savepoints/#tasknumber数量ta......
  • flink部署模式和运行模式
    flink部署模式部署模式:flink里面的计算程序运行的方式sessionsession模式一个flink集群可以跑多个计算任务,资源共享session模式下集群是提前启动的,然后向flink集群提交jobper-job(高版本已经不推荐了)per-job模式下,一个集群只跑一个计算任务,资源独立,集群的启动是跟随......
  • centos7安装flink
    local模式环境说明,flink需要jdk,并且flin.2k1.17,需要的是jdk11,jdk17不行,实测jdk1.8也行下载flink包wgethttps://dlcdn.apache.org解压#解压tar-zxvfflink-1.17.2-bin-scala_2.12.tgz#进入flink目录cdflink-1.17.2修改配置文件viconf/flink-conf.yaml#允......
  • Flink CDC简介-flinkcdc-jian-jie
    FlinkCDC官方文档什么是FlinkCDC¶FlinkCDCConnectors是ApacheFlink的一组源连接器,使用变更数据捕获(CDC)从不同数据库中获取变更。FlinkCDCConnectors集成Debezium作为捕获数据变化的引擎。所以它可以充分发挥Debezium的能力。详细了解Debezium是什么。支......