Contents
- Flink article series
- I. Group Aggregation
- 1. COUNT example
- 2. GROUP BY aggregation example
- 3. DISTINCT aggregation
- 4. GROUPING SETS
- 1) ROLLUP
- 2) CUBE
- 5. HAVING
- II. Over Aggregation
- 1. Syntax
- 1) ORDER BY
- 2) PARTITION BY
- 3) Range Definitions
- 4) RANGE intervals
- 5) ROWS intervals
- 2. Example
- III. Window Join
- 1. INNER/LEFT/RIGHT/FULL OUTER
- 2. SEMI (IN/EXISTS)
- 3. ANTI (NOT IN/EXISTS)
- 4. Limitations
- 1) Limitation on the join clause
- 2) Limitation on the windowing TVFs of the inputs
- 3) Limitation on window join that directly follows windowing TVFs
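This article introduces Flink's group aggregation, over aggregation, and window join operations, the limitations in the current version, and concrete runnable examples.
It assumes that working Flink and Kafka clusters are available.
It is divided into three parts covering group aggregation, over aggregation, and window join, and each topic is illustrated with examples that have been verified.
The runtime environment is Flink 1.17.
I. Group Aggregation
Like most data systems, Apache Flink supports aggregate functions, both built-in and user-defined. User-defined functions must be registered in a catalog before use.
An aggregate function computes a single result from multiple input rows. For example, there are aggregates that compute the COUNT, SUM, AVG (average), MAX (maximum), and MIN (minimum) over a set of rows.
The examples below use the orders3 table as the data source. Its schema and data are as follows: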
Flink SQL> desc orders3;
+----------+-----------------------------+-------+-----+---------------+-----------+
| name | type | null | key | extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
| id | STRING | TRUE | | | |
| u_id | STRING | TRUE | | | |
| item | STRING | TRUE | | | |
| price | DECIMAL(32, 2) | TRUE | | | |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
+----------+-----------------------------+-------+-----+---------------+-----------+
Flink SQL> select * from orders3;
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| op | id | u_id | item | price | proctime |
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 5 | alan | 18.00 | 2023-09-21 08:12:58.654 |
| +I | 2 | 3 | alanchan | 23.80 | 2023-09-21 08:12:58.654 |
| +I | 3 | 1 | chan | 32.66 | 2023-09-21 08:12:58.654 |
| +I | 4 | 3 | iphone | 22.88 | 2023-09-21 08:12:58.654 |
| +I | 5 | 6 | usb | 26.80 | 2023-09-21 08:12:58.654 |
| +I | 6 | 3 | hd | 100.80 | 2023-09-21 08:12:58.655 |
| +I | 7 | 1 | tv | 4002.88 | 2023-09-21 08:12:58.655 |
1. COUNT example
SELECT COUNT(*) FROM Orders
Flink SQL> select count(*) from orders3;
+----+----------------------+
| op | EXPR$0 |
+----+----------------------+
| +I | 1 |
| -U | 1 |
| +U | 2 |
| -U | 2 |
| +U | 3 |
| -U | 3 |
| +U | 4 |
| -U | 4 |
| +U | 5 |
| -U | 5 |
| +U | 6 |
| -U | 6 |
| +U | 7 |
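2. GROUP BY aggregation example
For streaming queries, it is important to understand that Flink runs continuous queries that never terminate. Instead, they update their result table according to the updates on their input tables. For the COUNT(*) query in the previous section, Flink outputs an updated count each time a new row is inserted into the orders3 table.
Apache Flink supports the standard GROUP BY clause for aggregating data.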
SELECT u_id,COUNT(*)
FROM orders3
GROUP BY u_id;
Flink SQL> SELECT u_id,COUNT(*)
> FROM orders3
> GROUP BY u_id;
+----+--------------------------------+----------------------+
| op | u_id | EXPR$1 |
+----+--------------------------------+----------------------+
| +I | 5 | 1 |
| +I | 3 | 1 |
| +I | 1 | 1 |
| -U | 3 | 1 |
| +U | 3 | 2 |
| +I | 6 | 1 |
| -U | 3 | 2 |
| +U | 3 | 3 |
| -U | 1 | 1 |
| +U | 1 | 2 |
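The +I/-U/+U rows above can be reproduced with a small model of a retracting aggregate. This is only a sketch of the observable behavior, not Flink's implementation; the function name `group_count_changelog` is hypothetical, and the input is the u_id column of orders3 in the arrival order shown earlier.

```python
from collections import defaultdict

def group_count_changelog(keys):
    """Yield (op, key, count) rows the way a retracting GROUP BY COUNT(*) behaves."""
    counts = defaultdict(int)  # state: one counter per group
    for key in keys:
        old = counts[key]
        counts[key] = old + 1
        if old == 0:
            yield ("+I", key, 1)        # first row seen for this group
        else:
            yield ("-U", key, old)      # retract the previous result
            yield ("+U", key, old + 1)  # emit the updated result

# u_id values of orders3 in arrival order (taken from the table above)
u_ids = ["5", "3", "1", "3", "6", "3", "1"]
changelog = list(group_count_changelog(u_ids))
for row in changelog:
    print(row)
```

The state kept here is one counter per distinct key, which is why the number of groups drives state size.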
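For streaming queries, the state required to compute the query result may grow without bound. State size depends on the number of groups and on the number and type of aggregate functions. For example, MIN/MAX are heavy on state size, while COUNT is cheap. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this may affect the correctness of the query result. For TTL configuration, see article 43 of this series, "Flink and Hive: Reading, Writing, and Detailed Verified Examples".
Apache Flink provides a set of performance tuning options for group aggregation; see article 45 of this series, "Flink Performance Tuning: Introduction and Examples", which covers several tuning directions with examples.
3. DISTINCT aggregation
A DISTINCT aggregation removes duplicate values before the aggregate function is applied. The following example counts the number of distinct u_id values in the orders3 table rather than the total number of rows.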
Flink SQL> SELECT COUNT(DISTINCT u_id) FROM orders3;
+----+----------------------+
| op | EXPR$0 |
+----+----------------------+
| +I | 1 |
| -U | 1 |
| +U | 2 |
| -U | 2 |
| +U | 3 |
| -U | 3 |
| +U | 4 |
Flink SQL> select * from orders3;
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| op | id | u_id | item | price | proctime |
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 5 | alan | 18.00 | 2023-09-21 08:12:58.654 |
| +I | 2 | 3 | alanchan | 23.80 | 2023-09-21 08:12:58.654 |
| +I | 3 | 1 | chan | 32.66 | 2023-09-21 08:12:58.654 |
| +I | 4 | 3 | iphone | 22.88 | 2023-09-21 08:12:58.654 |
| +I | 5 | 6 | usb | 26.80 | 2023-09-21 08:12:58.654 |
| +I | 6 | 3 | hd | 100.80 | 2023-09-21 08:12:58.655 |
| +I | 7 | 1 | tv | 4002.88 | 2023-09-21 08:12:58.655 |
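For streaming queries, the state required to compute the query result may grow without bound. State size mostly depends on the number of distinct rows and on how long a group is maintained; short-lived GROUP BY windows are not a problem. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this may affect the correctness of the query result. For TTL configuration, see article 43 of this series, "Flink and Hive: Reading, Writing, and Detailed Verified Examples".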
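To see why the state of a DISTINCT aggregate grows with the number of distinct values, the following sketch models COUNT(DISTINCT u_id) with a plain set as state. It is an illustration under simplifying assumptions, not Flink's implementation; `distinct_count_changelog` is a hypothetical name.

```python
def distinct_count_changelog(values):
    """Yield (op, count) rows for a COUNT(DISTINCT ...) over an insert-only stream."""
    seen = set()  # state: one entry per distinct value, so it grows with cardinality
    for value in values:
        if value in seen:
            continue  # duplicate value: the result is unchanged, nothing is emitted
        old = len(seen)
        seen.add(value)
        if old == 0:
            yield ("+I", 1)
        else:
            yield ("-U", old)
            yield ("+U", old + 1)

# u_id values of orders3 in arrival order (taken from the table above)
u_ids = ["5", "3", "1", "3", "6", "3", "1"]
distinct_changelog = list(distinct_count_changelog(u_ids))
for row in distinct_changelog:
    print(row)
```

Seven input rows produce only four distinct values, so the final count is 4, matching the query output above.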
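4. GROUPING SETS
Grouping sets allow more complex grouping operations than those a standard GROUP BY can describe. Rows are grouped separately by each specified grouping set, and aggregates are computed for each group just as for a simple GROUP BY clause.
For more on grouping sets, see article 27 of this series, "Flink SQL SELECT (Window Aggregation): Introduction and Detailed Examples (4)".
- Example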
Flink SQL> SELECT supplier_id, rating, COUNT(*) AS total
> FROM (VALUES
> ('supplier1', 'product1', 4),
> ('supplier1', 'product2', 3),
> ('supplier2', 'product3', 3),
> ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());
+----+--------------------------------+-------------+----------------------+
| op | supplier_id | rating | total |
+----+--------------------------------+-------------+----------------------+
| +I | supplier1 | 4 | 1 |
| +I | supplier1 | <NULL> | 1 |
| +I | <NULL> | <NULL> | 1 |
| +I | supplier1 | 3 | 1 |
| -U | supplier1 | <NULL> | 1 |
| +U | supplier1 | <NULL> | 2 |
| -U | <NULL> | <NULL> | 1 |
| +U | <NULL> | <NULL> | 2 |
| +I | supplier2 | 3 | 1 |
| +I | supplier2 | <NULL> | 1 |
| -U | <NULL> | <NULL> | 2 |
| +U | <NULL> | <NULL> | 3 |
| +I | supplier2 | 4 | 1 |
| -U | supplier2 | <NULL> | 1 |
| +U | supplier2 | <NULL> | 2 |
| -U | <NULL> | <NULL> | 3 |
| +U | <NULL> | <NULL> | 4 |
+----+--------------------------------+-------------+----------------------+
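Each sublist of GROUPING SETS may specify zero or more columns or expressions and is interpreted the same way as if it were used directly in the GROUP BY clause. An empty grouping set means that all rows are aggregated into a single group, which is output even if there are no input rows.
References to grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.
For streaming queries, the state required to compute the query result may grow without bound. State size depends on the number of grouping sets and the type of aggregate functions. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this may affect the correctness of the query result. For TTL configuration, see article 43 of this series, "Flink and Hive: Reading, Writing, and Detailed Verified Examples".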
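The null-filling rule can be sketched in a few lines: every input row contributes once to each grouping set, and columns that are not part of the set are replaced by None (standing in for SQL NULL). This is an illustrative model only; `grouping_sets_count` is a hypothetical helper, and it does not model the special case of the empty grouping set on empty input.

```python
from collections import Counter

OUTPUT_COLUMNS = ("supplier_id", "rating")

def grouping_sets_count(rows, grouping_sets):
    """COUNT(*) per group for each grouping set; non-grouped columns become None."""
    totals = Counter()
    for row in rows:
        for gset in grouping_sets:
            key = tuple(row[c] if c in gset else None for c in OUTPUT_COLUMNS)
            totals[key] += 1
    return totals

# Same four rows as the VALUES clause in the example above
rows = [
    {"supplier_id": "supplier1", "product_id": "product1", "rating": 4},
    {"supplier_id": "supplier1", "product_id": "product2", "rating": 3},
    {"supplier_id": "supplier2", "product_id": "product3", "rating": 3},
    {"supplier_id": "supplier2", "product_id": "product4", "rating": 4},
]
totals = grouping_sets_count(rows, [("supplier_id", "rating"), ("supplier_id",), ()])
for key, total in totals.items():
    print(key, total)
```

The final counts agree with the last value emitted for each key in the changelog above, for example 4 for the (NULL, NULL) group.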
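1) ROLLUP
ROLLUP is a shorthand notation for specifying a common type of grouping set. It represents the given list of expressions and all prefixes of the list, including the empty list.
The following two queries are equivalent.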
-- GROUPING SETS
SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());
-- ROLLUP
SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating);
--- Verification
Flink SQL> SELECT supplier_id, rating, COUNT(*) AS total
> FROM (VALUES
> ('supplier1', 'product1', 4),
> ('supplier1', 'product2', 3),
> ('supplier2', 'product3', 3),
> ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());
+----+--------------------------------+-------------+----------------------+
| op | supplier_id | rating | total |
+----+--------------------------------+-------------+----------------------+
| +I | supplier1 | 4 | 1 |
| +I | supplier1 | <NULL> | 1 |
| +I | <NULL> | <NULL> | 1 |
| +I | supplier1 | 3 | 1 |
| -U | supplier1 | <NULL> | 1 |
| +U | supplier1 | <NULL> | 2 |
| -U | <NULL> | <NULL> | 1 |
| +U | <NULL> | <NULL> | 2 |
| +I | supplier2 | 3 | 1 |
| +I | supplier2 | <NULL> | 1 |
| -U | <NULL> | <NULL> | 2 |
| +U | <NULL> | <NULL> | 3 |
| +I | supplier2 | 4 | 1 |
| -U | supplier2 | <NULL> | 1 |
| +U | supplier2 | <NULL> | 2 |
| -U | <NULL> | <NULL> | 3 |
| +U | <NULL> | <NULL> | 4 |
+----+--------------------------------+-------------+----------------------+
Flink SQL> SELECT supplier_id, rating, COUNT(*)
> FROM (VALUES
> ('supplier1', 'product1', 4),
> ('supplier1', 'product2', 3),
> ('supplier2', 'product3', 3),
> ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY ROLLUP (supplier_id, rating);
+----+--------------------------------+-------------+----------------------+
| op | supplier_id | rating | EXPR$2 |
+----+--------------------------------+-------------+----------------------+
| +I | supplier1 | 4 | 1 |
| +I | supplier1 | <NULL> | 1 |
| +I | <NULL> | <NULL> | 1 |
| +I | supplier1 | 3 | 1 |
| -U | supplier1 | <NULL> | 1 |
| +U | supplier1 | <NULL> | 2 |
| -U | <NULL> | <NULL> | 1 |
| +U | <NULL> | <NULL> | 2 |
| +I | supplier2 | 3 | 1 |
| +I | supplier2 | <NULL> | 1 |
| -U | <NULL> | <NULL> | 2 |
| +U | <NULL> | <NULL> | 3 |
| +I | supplier2 | 4 | 1 |
| -U | supplier2 | <NULL> | 1 |
| +U | supplier2 | <NULL> | 2 |
| -U | <NULL> | <NULL> | 3 |
| +U | <NULL> | <NULL> | 4 |
+----+--------------------------------+-------------+----------------------+
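The "all prefixes" rule behind ROLLUP is easy to state in code. The helper below is a hypothetical illustration of the expansion, not part of any Flink API.

```python
def rollup(columns):
    """Return the grouping sets that ROLLUP(columns) stands for: every prefix, longest first."""
    return [tuple(columns[:n]) for n in range(len(columns), -1, -1)]

rollup_sets = rollup(["supplier_id", "rating"])
print(rollup_sets)
# [('supplier_id', 'rating'), ('supplier_id',), ()]
```

These are exactly the three grouping sets written out in the GROUPING SETS form of the query.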
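2) CUBE
CUBE is a shorthand notation for specifying a common type of grouping set. It represents the given list and all of its possible subsets, that is, the power set.
The following two queries are equivalent.
-- CUBE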
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id);
-- GROUPING SETS
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
);
-- Verification
Flink SQL> SELECT supplier_id, rating, product_id, COUNT(*)
> FROM (VALUES
> ('supplier1', 'product1', 4),
> ('supplier1', 'product2', 3),
> ('supplier2', 'product3', 3),
> ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY CUBE (supplier_id, rating, product_id);
+----+--------------------------------+-------------+--------------------------------+----------------------+
| op | supplier_id | rating | product_id | EXPR$3 |
+----+--------------------------------+-------------+--------------------------------+----------------------+
| +I | supplier1 | 4 | product1 | 1 |
| +I | supplier1 | <NULL> | product1 | 1 |
| +I | supplier1 | 4 | <NULL> | 1 |
| +I | supplier1 | <NULL> | <NULL> | 1 |
| +I | <NULL> | 4 | product1 | 1 |
| +I | <NULL> | <NULL> | product1 | 1 |
| +I | <NULL> | 4 | <NULL> | 1 |
| +I | <NULL> | <NULL> | <NULL> | 1 |
| +I | supplier1 | 3 | product2 | 1 |
| +I | supplier1 | <NULL> | product2 | 1 |
| +I | supplier1 | 3 | <NULL> | 1 |
| -U | supplier1 | <NULL> | <NULL> | 1 |
| +U | supplier1 | <NULL> | <NULL> | 2 |
| +I | <NULL> | 3 | product2 | 1 |
| +I | <NULL> | <NULL> | product2 | 1 |
| +I | <NULL> | 3 | <NULL> | 1 |
| -U | <NULL> | <NULL> | <NULL> | 1 |
| +U | <NULL> | <NULL> | <NULL> | 2 |
| +I | supplier2 | 3 | product3 | 1 |
| +I | supplier2 | <NULL> | product3 | 1 |
| +I | supplier2 | 3 | <NULL> | 1 |
| +I | supplier2 | <NULL> | <NULL> | 1 |
| +I | <NULL> | 3 | product3 | 1 |
| +I | <NULL> | <NULL> | product3 | 1 |
| -U | <NULL> | 3 | <NULL> | 1 |
| +U | <NULL> | 3 | <NULL> | 2 |
| -U | <NULL> | <NULL> | <NULL> | 2 |
| +U | <NULL> | <NULL> | <NULL> | 3 |
| +I | supplier2 | 4 | product4 | 1 |
| +I | supplier2 | <NULL> | product4 | 1 |
| +I | supplier2 | 4 | <NULL> | 1 |
| -U | supplier2 | <NULL> | <NULL> | 1 |
| +U | supplier2 | <NULL> | <NULL> | 2 |
| +I | <NULL> | 4 | product4 | 1 |
| +I | <NULL> | <NULL> | product4 | 1 |
| -U | <NULL> | 4 | <NULL> | 1 |
| +U | <NULL> | 4 | <NULL> | 2 |
| -U | <NULL> | <NULL> | <NULL> | 3 |
| +U | <NULL> | <NULL> | <NULL> | 4 |
+----+--------------------------------+-------------+--------------------------------+----------------------+
Flink SQL> SELECT supplier_id, rating, product_id, COUNT(*)
> FROM (VALUES
> ('supplier1', 'product1', 4),
> ('supplier1', 'product2', 3),
> ('supplier2', 'product3', 3),
> ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY GROUPING SETS (
> ( supplier_id, product_id, rating ),
> ( supplier_id, product_id ),
> ( supplier_id, rating ),
> ( supplier_id ),
> ( product_id, rating ),
> ( product_id ),
> ( rating ),
> ( )
> );
+----+--------------------------------+-------------+--------------------------------+----------------------+
| op | supplier_id | rating | product_id | EXPR$3 |
+----+--------------------------------+-------------+--------------------------------+----------------------+
| +I | supplier1 | 4 | product1 | 1 |
| +I | supplier1 | <NULL> | product1 | 1 |
| +I | supplier1 | 4 | <NULL> | 1 |
| +I | supplier1 | <NULL> | <NULL> | 1 |
| +I | <NULL> | 4 | product1 | 1 |
| +I | <NULL> | <NULL> | product1 | 1 |
| +I | <NULL> | 4 | <NULL> | 1 |
| +I | <NULL> | <NULL> | <NULL> | 1 |
| +I | supplier1 | 3 | product2 | 1 |
| +I | supplier1 | <NULL> | product2 | 1 |
| +I | supplier1 | 3 | <NULL> | 1 |
| -U | supplier1 | <NULL> | <NULL> | 1 |
| +U | supplier1 | <NULL> | <NULL> | 2 |
| +I | <NULL> | 3 | product2 | 1 |
| +I | <NULL> | <NULL> | product2 | 1 |
| +I | <NULL> | 3 | <NULL> | 1 |
| -U | <NULL> | <NULL> | <NULL> | 1 |
| +U | <NULL> | <NULL> | <NULL> | 2 |
| +I | supplier2 | 3 | product3 | 1 |
| +I | supplier2 | <NULL> | product3 | 1 |
| +I | supplier2 | 3 | <NULL> | 1 |
| +I | supplier2 | <NULL> | <NULL> | 1 |
| +I | <NULL> | 3 | product3 | 1 |
| +I | <NULL> | <NULL> | product3 | 1 |
| -U | <NULL> | 3 | <NULL> | 1 |
| +U | <NULL> | 3 | <NULL> | 2 |
| -U | <NULL> | <NULL> | <NULL> | 2 |
| +U | <NULL> | <NULL> | <NULL> | 3 |
| +I | supplier2 | 4 | product4 | 1 |
| +I | supplier2 | <NULL> | product4 | 1 |
| +I | supplier2 | 4 | <NULL> | 1 |
| -U | supplier2 | <NULL> | <NULL> | 1 |
| +U | supplier2 | <NULL> | <NULL> | 2 |
| +I | <NULL> | 4 | product4 | 1 |
| +I | <NULL> | <NULL> | product4 | 1 |
| -U | <NULL> | 4 | <NULL> | 1 |
| +U | <NULL> | 4 | <NULL> | 2 |
| -U | <NULL> | <NULL> | <NULL> | 3 |
| +U | <NULL> | <NULL> | <NULL> | 4 |
+----+--------------------------------+-------------+--------------------------------+----------------------+
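The power-set expansion behind CUBE can be sketched with `itertools.combinations`. The helper name `cube` is hypothetical and only illustrates which grouping sets the shorthand stands for.

```python
from itertools import combinations

def cube(columns):
    """Return the grouping sets that CUBE(columns) stands for: every subset of the columns."""
    return [subset
            for size in range(len(columns), -1, -1)
            for subset in combinations(columns, size)]

cube_sets = cube(["supplier_id", "product_id", "rating"])
for grouping_set in cube_sets:
    print(grouping_set)
print(len(cube_sets))  # 8, i.e. 2 ** 3
```

With n columns CUBE expands to 2^n grouping sets, so state and output volume grow quickly as columns are added.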
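5. HAVING
HAVING eliminates group rows that do not satisfy the condition. HAVING differs from WHERE: WHERE filters individual rows before GROUP BY, while HAVING filters the group rows created by GROUP BY. Each column referenced in the condition must unambiguously reference a grouping column, unless it appears inside an aggregate function.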
SELECT SUM(price)
FROM orders3
GROUP BY u_id
HAVING SUM(price) > 100;
------------- Compare the results with and without HAVING
--- Without HAVING
Flink SQL> SELECT SUM(price)
> FROM orders3
> GROUP BY u_id;
+----+------------------------------------------+
| op | EXPR$0 |
+----+------------------------------------------+
| +I | 18.00 |
| +I | 23.80 |
| +I | 32.66 |
| -U | 23.80 |
| +U | 46.68 |
| +I | 26.80 |
| -U | 46.68 |
| +U | 147.48 |
| -U | 32.66 |
| +U | 4035.54 |
--- With HAVING
Flink SQL> SELECT SUM(price)
> FROM orders3
> GROUP BY u_id
> HAVING SUM(price) > 100;
+----+------------------------------------------+
| op | EXPR$0 |
+----+------------------------------------------+
| +U | 147.48 |
| +U | 4035.54 |
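The presence of HAVING turns a query into a grouped query even if there is no GROUP BY clause. This is the same as what happens when a query contains aggregate functions but no GROUP BY clause. The query treats all selected rows as forming a single group, and the SELECT list and HAVING clause can only reference table columns from within aggregate functions. Such a query emits a single row if the HAVING condition is true and zero rows if it is not.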
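The difference between filtering before and after grouping can be checked on the orders3 data with a short sketch. It models only the final result, not the changelog, and the variable names are illustrative.

```python
from collections import defaultdict
from decimal import Decimal

# (u_id, price) pairs taken from the orders3 rows shown earlier
orders = [
    ("5", "18.00"), ("3", "23.80"), ("1", "32.66"), ("3", "22.88"),
    ("6", "26.80"), ("3", "100.80"), ("1", "4002.88"),
]

def sum_by_user(rows):
    sums = defaultdict(Decimal)  # Decimal() starts at 0
    for u_id, price in rows:
        sums[u_id] += Decimal(price)
    return dict(sums)

# HAVING SUM(price) > 100: aggregate first, then filter the groups
having_result = {u: s for u, s in sum_by_user(orders).items() if s > Decimal("100")}

# WHERE price > 100: filter the rows first, then aggregate
where_result = sum_by_user([(u, p) for u, p in orders if Decimal(p) > Decimal("100")])

print(having_result)
print(where_result)
```

Users 3 and 1 survive in both cases, but the sums differ (147.48 versus 100.80 for user 3) because WHERE discards the smaller orders before they are added up.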
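II. Over Aggregation
OVER aggregates compute an aggregated value for every input row over a range of ordered rows. In contrast to GROUP BY aggregates, OVER aggregates do not reduce the result to a single row per group. Instead, they produce one aggregated value for every input row.
1. Syntax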
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
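Multiple OVER window aggregates can be defined in a SELECT clause. However, for streaming queries, the OVER windows of all aggregates must be identical because of a current limitation.
1) ORDER BY
OVER windows are defined on an ordered sequence of rows. Since tables have no inherent order, the ORDER BY clause is mandatory. For streaming queries, Flink currently (as of Flink 1.17) only supports OVER windows defined with an ascending time attribute order. Other orderings are not supported.
2) PARTITION BY
OVER windows can be defined on a partitioned table. When a PARTITION BY clause is present, the aggregate for each input row is computed only over the rows of its partition.
3) Range Definitions
The range definition specifies how many rows are included in the aggregate. The range is defined with a BETWEEN clause that gives a lower and an upper boundary. All rows between these boundaries are included in the aggregate. Flink only supports CURRENT ROW as the upper boundary.
There are two ways to define the range: ROWS intervals and RANGE intervals.
4) RANGE intervals
A RANGE interval is defined on the values of the ORDER BY column, which in Flink is always a time attribute. The following RANGE interval specifies that all rows whose time attribute is at most 30 minutes less than that of the current row are included in the aggregate.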
RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
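A minimal sketch of what this RANGE interval selects, for a single partition and with made-up timestamps and values. The helper `range_sum` is hypothetical, and both bounds are treated as inclusive, as in standard SQL.

```python
from datetime import datetime, timedelta

def range_sum(rows, preceding):
    """rows: (timestamp, value) pairs of one partition, ordered by timestamp.

    For each row, sum every value whose timestamp lies in
    [current timestamp - preceding, current timestamp].
    """
    result = []
    for ts, _ in rows:
        lower = ts - preceding
        result.append(sum(v for t, v in rows if lower <= t <= ts))
    return result

t0 = datetime(2023, 9, 21, 9, 0)
rows = [  # illustrative data, not taken from orders3
    (t0, 10),
    (t0 + timedelta(minutes=20), 20),
    (t0 + timedelta(minutes=20), 5),   # same timestamp as the previous row
    (t0 + timedelta(minutes=45), 40),
]
sums = range_sum(rows, timedelta(minutes=30))
print(sums)  # [10, 35, 35, 65]
```

Rows that share a timestamp see each other, so they get the same aggregate; the last row no longer includes the 09:00 row because it is more than 30 minutes older.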
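5) ROWS intervals
A ROWS interval is a count-based interval. It defines exactly how many rows are included in the aggregate. The following ROWS interval specifies that the current row and the 10 rows preceding it (11 rows in total) are included in the aggregate.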
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
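The count-based variant can be sketched as a sliding slice over one ordered partition. The helper `rows_sum` is hypothetical, and the example uses 2 PRECEDING instead of 10 to keep the numbers small.

```python
def rows_sum(values, preceding):
    """Sum the current value and up to `preceding` values before it, for every position."""
    return [sum(values[max(0, i - preceding): i + 1]) for i in range(len(values))]

window_sums = rows_sum([1, 2, 3, 4, 5], preceding=2)
print(window_sums)  # [1, 3, 6, 9, 12]
```

The first rows aggregate fewer than `preceding + 1` values simply because not enough earlier rows exist yet.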
WINDOW
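The WINDOW clause can be used to define an OVER window outside the SELECT clause. It can make queries more readable and also allows a window definition to be reused for multiple aggregates.
- The following examples use the WINDOW clause.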
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW);
Flink SQL> SELECT id, proctime, price,
> SUM(price) OVER w AS sum_amount,
> AVG(price) OVER w AS avg_amount
> FROM orders3
> WINDOW w AS (
> PARTITION BY u_id
> ORDER BY proctime
> RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW);
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+------------------------------------------+
| op | id | proctime | price | sum_amount | avg_amount |
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+------------------------------------------+
| +I | 1 | 2023-09-21 09:34:16.920 | 18.00 | 18.00 | 18.000000 |
| +I | 2 | 2023-09-21 09:34:16.920 | 23.80 | 23.80 | 23.800000 |
| +I | 3 | 2023-09-21 09:34:16.921 | 32.66 | 4035.54 | 2017.770000 |
| +I | 7 | 2023-09-21 09:34:16.921 | 4002.88 | 4035.54 | 2017.770000 |
| +I | 5 | 2023-09-21 09:34:16.921 | 26.80 | 26.80 | 26.800000 |
| +I | 4 | 2023-09-21 09:34:16.921 | 22.88 | 147.48 | 49.160000 |
| +I | 6 | 2023-09-21 09:34:16.921 | 100.80 | 147.48 | 49.160000 |
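2. Example
The following query computes, for every order, the sum of the amounts of all orders from the same user received within one hour before the current order.
Note that it uses proctime purely for illustration; this may not be accurate enough for a real business scenario and is only meant to simulate and verify the behavior.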
Flink SQL> SELECT id, proctime, price,
> SUM(price) OVER (
> PARTITION BY u_id
> ORDER BY proctime
> RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
> ) AS one_hour_prod_amount_sum
> FROM orders3;
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+
| op | id | proctime | price | one_hour_prod_amount_sum |
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+
| +I | 1 | 2023-09-21 09:12:11.005 | 18.00 | 18.00 |
| +I | 3 | 2023-09-21 09:12:11.006 | 32.66 | 4035.54 |
| +I | 7 | 2023-09-21 09:12:11.006 | 4002.88 | 4035.54 |
| +I | 5 | 2023-09-21 09:12:11.006 | 26.80 | 26.80 |
| +I | 2 | 2023-09-21 09:12:11.006 | 23.80 | 147.48 |
| +I | 4 | 2023-09-21 09:12:11.006 | 22.88 | 147.48 |
| +I | 6 | 2023-09-21 09:12:11.006 | 100.80 | 147.48 |
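III. Window Join
A window join adds the dimension of time to the join criteria themselves. In doing so, the window join joins the elements of two streams that share a common key and fall into the same window. The semantics of window join are the same as those of the DataStream window join.
For streaming queries, unlike other joins on continuous tables, a window join does not emit intermediate results; it only emits final results at the end of the window. In addition, a window join purges all intermediate state once it is no longer needed.
Window joins are usually used together with windowing TVFs. A window join can also follow other operations based on windowing TVFs, such as window aggregation, window TopN, and window join.
Currently (as of Flink 1.17), a window join requires the join condition to contain equality on the window starts of the input tables and equality on the window ends of the input tables.
Window join supports INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN.
1. INNER/LEFT/RIGHT/FULL OUTER
The following shows the syntax of the INNER/LEFT/RIGHT/FULL OUTER window join statement.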
SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...
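The syntax of the INNER/LEFT/RIGHT/FULL OUTER window joins is very similar, so only a FULL OUTER JOIN example is given here. When a window join is performed, all elements with a common key and a common tumbling window are joined together. The example uses only the tumbling window TVF. By scoping the time range of the join to fixed five-minute intervals, the data set is cut into two distinct time windows: [9:35, 9:40) and [9:40, 9:45). The L3 and R2 rows share num = 3 but cannot be joined because they fall into different windows.
------ 1. Table schemas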
CREATE TABLE leftTable (
`id` STRING,
num STRING,
row_time TIMESTAMP(3),
WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'leftTable_topic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
CREATE TABLE rightTable (
`id` STRING,
num STRING,
row_time TIMESTAMP(3),
WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'rightTable_topic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
------ 2. Table data
Flink SQL> select * from leftTable;
+----+--------------------------------+--------------------------------+-------------------------+
| op | id | num | row_time |
+----+--------------------------------+--------------------------------+-------------------------+
| +I | L1 | 1 | 2023-09-22 09:37:18.000 |
| +I | L2 | 2 | 2023-09-22 09:38:18.000 |
| +I | L3 | 3 | 2023-09-22 09:42:18.000 |
Flink SQL> select * from rightTable;
+----+--------------------------------+--------------------------------+-------------------------+
| op | id | num | row_time |
+----+--------------------------------+--------------------------------+-------------------------+
| +I | R1 | 2 | 2023-09-22 09:39:18.000 |
| +I | R2 | 3 | 2023-09-22 09:37:18.000 |
| +I | R3 | 4 | 2023-09-22 09:43:18.000 |
------ 3. FULL JOIN verification
Flink SQL> SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, COALESCE(L.window_start, R.window_start) as window_start, COALESCE(L.window_end, R.window_end) as window_end
> FROM ( SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) L
> FULL JOIN ( SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) R
> ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+
| op | L_Num | L_Id | R_Num | R_Id | window_start | window_end |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+
| +I | 1 | L1 | <NULL> | <NULL> | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 |
| +I | 2 | L2 | 2 | R1 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 |
| +I | <NULL> | <NULL> | 3 | R2 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 |
| +I | 3 | L3 | <NULL> | <NULL> | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 |
| +I | <NULL> | <NULL> | 4 | R3 | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 |
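The result above can be reproduced with a small batch-style model: assign each row to its five-minute tumbling window, then full-outer-join on num plus the window. The helpers `tumble` and `full_window_join` are hypothetical, and windows are assumed to be aligned to the Unix epoch, which matches the boundaries shown in the output.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumble(ts, size):
    """Return (window_start, window_end) of the tumbling window that contains ts."""
    start = ts - (ts - EPOCH) % size
    return start, start + size

def full_window_join(left, right, size):
    """Rows are (id, num, row_time); join on equal num and identical window."""
    matched_right, out = set(), []
    for l_id, l_num, l_ts in left:
        l_win, hit = tumble(l_ts, size), False
        for r_id, r_num, r_ts in right:
            if l_num == r_num and l_win == tumble(r_ts, size):
                out.append((l_num, l_id, r_num, r_id, l_win))
                matched_right.add(r_id)
                hit = True
        if not hit:
            out.append((l_num, l_id, None, None, l_win))  # unmatched left row
    for r_id, r_num, r_ts in right:
        if r_id not in matched_right:
            out.append((None, None, r_num, r_id, tumble(r_ts, size)))  # unmatched right row
    return out

def day(h, m, s):
    return datetime(2023, 9, 22, h, m, s)

left = [("L1", "1", day(9, 37, 18)), ("L2", "2", day(9, 38, 18)), ("L3", "3", day(9, 42, 18))]
right = [("R1", "2", day(9, 39, 18)), ("R2", "3", day(9, 37, 18)), ("R3", "4", day(9, 43, 18))]

joined = full_window_join(left, right, timedelta(minutes=5))
for row in joined:
    print(row)
```

Only L2 and R1 match; L3 and R2 both carry num = 3 but land in different windows, so each appears padded with nulls. The row order differs from the Flink output, which is not significant.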
2. SEMI (IN/EXISTS)
A semi window join returns a row from the left side if there is at least one matching row on the right side within the common window.
The leftTable and rightTable data are the same as in the previous example.
- Example
---- IN
Flink SQL> SELECT *
> FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) L WHERE L.num IN (
> SELECT num FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | id | num | row_time | window_start | window_end | window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | L2 | 2 | 2023-09-22 09:38:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |
--- EXISTS
Flink SQL> SELECT *
> FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) L WHERE EXISTS (
> SELECT * FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | id | num | row_time | window_start | window_end | window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | L2 | 2 | 2023-09-22 09:38:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |
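A semi window join only needs to know which (num, window) pairs exist on the right side. The sketch below models that with a set lookup; `tumble` and `semi_window_join` are hypothetical helpers, and windows are assumed to be aligned to the Unix epoch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumble(ts, size):
    """Return (window_start, window_end) of the tumbling window that contains ts."""
    start = ts - (ts - EPOCH) % size
    return start, start + size

def semi_window_join(left, right, size):
    """Keep each left row (id, num, row_time) that has a right row with the same num and window."""
    right_keys = {(num, tumble(ts, size)) for _, num, ts in right}
    return [row for row in left if (row[1], tumble(row[2], size)) in right_keys]

def day(h, m, s):
    return datetime(2023, 9, 22, h, m, s)

left = [("L1", "1", day(9, 37, 18)), ("L2", "2", day(9, 38, 18)), ("L3", "3", day(9, 42, 18))]
right = [("R1", "2", day(9, 39, 18)), ("R2", "3", day(9, 37, 18)), ("R3", "4", day(9, 43, 18))]

semi_rows = semi_window_join(left, right, timedelta(minutes=5))
print([row[0] for row in semi_rows])  # ['L2']
```

Each left row is returned at most once, no matter how many right rows match it, which is what distinguishes a semi join from an inner join.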
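3. ANTI (NOT IN/EXISTS)
Anti window joins are the obverse of the inner window join: they contain all of the unjoined rows within each common window.
The leftTable and rightTable data are the same as in the previous example.
- Example
---- NOT IN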
Flink SQL> SELECT *
> FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) L WHERE L.num NOT IN (
> SELECT num FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | id | num | row_time | window_start | window_end | window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | L1 | 1 | 2023-09-22 09:37:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |
| +I | L3 | 3 | 2023-09-22 09:42:18.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 | 2023-09-22 09:44:59.999 |
---- NOT EXISTS
Flink SQL> SELECT *
> FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) L WHERE NOT EXISTS (
> SELECT * FROM (
> SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
> ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | id | num | row_time | window_start | window_end | window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | L1 | 1 | 2023-09-22 09:37:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |
| +I | L3 | 3 | 2023-09-22 09:42:18.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 | 2023-09-22 09:44:59.999 |
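The anti window join is the complement of the semi join on the left side. The sketch below models the NOT EXISTS form; NOT IN gives the same result on this data, but in general it behaves differently when the subquery returns NULL values. `tumble` and `anti_window_join` are hypothetical helpers, and windows are assumed to be aligned to the Unix epoch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumble(ts, size):
    """Return (window_start, window_end) of the tumbling window that contains ts."""
    start = ts - (ts - EPOCH) % size
    return start, start + size

def anti_window_join(left, right, size):
    """Keep each left row (id, num, row_time) with no right row of the same num and window."""
    right_keys = {(num, tumble(ts, size)) for _, num, ts in right}
    return [row for row in left if (row[1], tumble(row[2], size)) not in right_keys]

def day(h, m, s):
    return datetime(2023, 9, 22, h, m, s)

left = [("L1", "1", day(9, 37, 18)), ("L2", "2", day(9, 38, 18)), ("L3", "3", day(9, 42, 18))]
right = [("R1", "2", day(9, 39, 18)), ("R2", "3", day(9, 37, 18)), ("R3", "4", day(9, 43, 18))]

anti_rows = anti_window_join(left, right, timedelta(minutes=5))
print([row[0] for row in anti_rows])  # ['L1', 'L3']
```

L3 is kept even though R2 has the same num, because R2 belongs to the earlier window.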
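4. Limitations
1) Limitation on the join clause
Currently (as of Flink 1.17), a window join requires the join condition to contain equality on the window starts of the input tables and equality on the window ends of the input tables. In the future, the join ON clause may be simplified to include only the window start equality if the windowing TVF is TUMBLE or HOP.
2) Limitation on the windowing TVFs of the inputs
Currently (as of Flink 1.17), the windowing TVFs of the left and right inputs must be the same. This may be extended in the future, for example, to allow a tumbling window to join a sliding window of the same window size.
3) Limitation on window join that directly follows windowing TVFs
Currently (as of Flink 1.17), if a window join directly follows a windowing TVF, the windowing TVF must be a tumble window, hop window, or cumulate window, not a session window.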