文章目录
- Flink 系列文章
- 一、Window TVF Aggregation
- 1、Windowing TVFs窗口函数
- 1)、TUMBLE滚动窗口示例
- 2)、HOP滑动窗口示例
- 3)、CUMULATE累积窗口示例
- 2、GROUPING SETS分组集介绍及示例
- 1)、ROLLUP介绍及示例
- 2)、CUBE介绍及示例
- 3、Selecting Group Window Start and End Timestamps
- 4、Cascading Window Aggregation(级联窗口聚合)介绍及示例
- 二、Group Window Aggregation
- 1、Group Window Functions
- 2、Time Attributes
- 3、Selecting Group Window Start and End Timestamps
- 4、组窗口聚合示例
本文介绍了Flink 的窗口聚合(TVF函数、分组集的rollup和cube、级联窗口聚合)和不推荐使用的组窗口聚合及具体的示例验证过程。
本文依赖flink和kafka集群能正常使用。
本文的示例是在Flink 1.17版本中验证的。
一、Window TVF Aggregation
窗口聚合在包含“window_start”和“window_end”列的 GROUP BY 子句中定义,该子句应用了 Windowing TVF 的关系。就像使用常规 GROUP BY 子句的查询一样,具有按窗口聚合分组的查询将计算每个组一个结果行。
-----语法
SELECT ...
FROM <windowed_table> -- relation applied windowing TVF
GROUP BY window_start, window_end, ...
----示例
Flink SQL> SELECT window_start, window_end, sum(price)
> FROM TABLE(
> TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL '5' MINUTES))
> GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+------------------------------------------+
| op | window_start | window_end | EXPR$2 |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2023-09-19 10:40:00.000 | 2023-09-19 10:45:00.000 | 1428.02 |
与连续表上的其他聚合不同,窗口聚合不发出中间结果,而只发出最终结果,即窗口末尾的总聚合。此外,窗口聚合在不再需要时会清除所有中间状态。
1、Windowing TVFs窗口函数
Flink 支持 TUMBLE、HOP 和 CUMULATE 类型的窗口聚合。在流式处理模式下,窗口表值函数的时间属性字段必须位于事件或处理时间属性上。有关更多窗口函数信息,请参阅Windowing TVF。在批处理模式下,窗口表值函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。
1)、TUMBLE滚动窗口示例
该示例请参考 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)中的
1)、示例1-使用滚动窗口查询、统计(表不含主键)
示例。
2)、HOP滑动窗口示例
该示例请参考 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)中的
1)、示例1-使用滚动窗口查询、统计(表不含主键)
示例。
3)、CUMULATE累积窗口示例
该示例请参考 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)中的
1)、示例1-使用累积窗口查询、统计
示例。
2、GROUPING SETS分组集介绍及示例
窗口聚合还支持分组集语法。分组集允许比标准 GROUP BY 描述的操作更复杂的分组操作。行按每个指定的分组集单独分组,并为每个组计算聚合,就像简单的 GROUP BY 子句一样。
具有 GROUPING SETS 的窗口聚合要求window_start列和window_end列都必须位于 GROUP BY 子句中,而不是在 GROUPING SETS 子句中。
SELECT window_start, window_end, u_id, SUM(price) as price
FROM TABLE(
TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((u_id), ());
GROUPING SETS 的每个子列表可以指定零个或多个列或表达式,其解释方式与直接在 GROUP BY 子句中使用的方式相同。空分组集意味着所有行都聚合到单个组,即使不存在输入行,也会输出该组。
对分组列或表达式的引用将替换为结果行中的 null 值,用于未显示这些列的分组集。
- 具体示例如下
------1、建表
Flink SQL> CREATE TABLE orders3 (
> `id` STRING,
> u_id STRING,
> item STRING,
> price DECIMAL(32,2),
> proctime as PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'orders3_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
------2、查询表数据
Flink SQL> select * from orders3;
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| op | id | u_id | item | price | proctime |
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 5 | alan | 18.00 | 2023-09-20 10:30:59.135 |
| +I | 2 | 3 | alanchan | 23.80 | 2023-09-20 10:31:13.183 |
| +I | 3 | 1 | chan | 32.66 | 2023-09-20 10:31:19.950 |
| +I | 4 | 3 | iphone | 22.88 | 2023-09-20 10:31:26.222 |
| +I | 5 | 6 | usb | 26.80 | 2023-09-20 10:31:32.627 |
| +I | 6 | 3 | hd | 100.80 | 2023-09-20 10:31:39.398 |
| +I | 7 | 1 | tv | 4002.88 | 2023-09-20 10:31:45.901 |
------3、分组集示例,按照u_id创建组集和空分组集(示例运行需要一点时间才能出结果)
Flink SQL> SELECT window_start, window_end, u_id, SUM(price) as price
> FROM TABLE(
> TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end, GROUPING SETS ((u_id), ());
+----+-------------------------+-------------------------+--------------------------------+------------------------------------------+
| op | window_start | window_end | u_id | price |
+----+-------------------------+-------------------------+--------------------------------+------------------------------------------+
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 5 | 18.00 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 6 | 26.80 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 1 | 4035.54 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 3 | 147.48 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | <NULL> | 4227.82 |
1)、ROLLUP介绍及示例
ROLLUP 是用于指定常见类型的分组集的速记表示法(shorthand notation)。它表示给定的表达式列表和列表的所有前缀,包括空列表。
使用 ROLLUP 的窗口聚合要求window_start列和window_end列都必须位于 GROUP BY 子句中,而不是在 ROLLUP 子句中。
表数据参考本章节中关于orders3的数据
下面两个查询是等效的。
-----ROLLUP
SELECT window_start, window_end, u_id, SUM(price) as price
FROM TABLE(
TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (u_id);
-----GROUPING SETS
SELECT window_start, window_end, u_id, SUM(price) as price
FROM TABLE(
TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((u_id), ());
- ROLLUP和GROUPING SETS 验证结果
-----ROLLUP
Flink SQL> SELECT window_start, window_end, u_id, SUM(price) as price
> FROM TABLE(
> TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end, ROLLUP (u_id);
+----+-------------------------+-------------------------+--------------------------------+------------------------------------------+
| op | window_start | window_end | u_id | price |
+----+-------------------------+-------------------------+--------------------------------+------------------------------------------+
| +I | 2023-09-20 10:50:00.000 | 2023-09-20 11:00:00.000 | 5 | 18.00 |
| +I | 2023-09-20 10:50:00.000 | 2023-09-20 11:00:00.000 | 6 | 26.80 |
| +I | 2023-09-20 10:50:00.000 | 2023-09-20 11:00:00.000 | 1 | 4035.54 |
| +I | 2023-09-20 10:50:00.000 | 2023-09-20 11:00:00.000 | 3 | 147.48 |
| +I | 2023-09-20 10:50:00.000 | 2023-09-20 11:00:00.000 | <NULL> | 4227.82 |
-----GROUPING SETS
Flink SQL> SELECT window_start, window_end, u_id, SUM(price) as price
> FROM TABLE(
> TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end, GROUPING SETS ((u_id), ());
+----+-------------------------+-------------------------+--------------------------------+------------------------------------------+
| op | window_start | window_end | u_id | price |
+----+-------------------------+-------------------------+--------------------------------+------------------------------------------+
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 5 | 18.00 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 6 | 26.80 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 1 | 4035.54 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | 3 | 147.48 |
| +I | 2023-09-20 10:30:00.000 | 2023-09-20 10:40:00.000 | <NULL> | 4227.82 |
2)、CUBE介绍及示例
CUBE 是用于指定常见类型的分组集的速记表示法(shorthand notation )。它表示给定的列表及其所有可能的子集 - 幂集(power set,所谓幂集,就是原集合中所有的子集(包括全集和空集)构成的集族)。
使用 CUBE 进行窗口聚合时,window_start列和window_end列都必须位于 GROUP BY 子句中,而不是 CUBE 子句中。
表数据参考本章节中关于orders3的数据
以下两个查询是等效的。
----cube 写法
SELECT window_start, window_end, item, u_id, SUM(price) as price
FROM TABLE(
TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (u_id, item);
---GROUPING SETS 写法
SELECT window_start, window_end, item, u_id, SUM(price) as price
FROM TABLE(
TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS (
(u_id, item),
(u_id),
(item),
()
)
- CUBE 和 GROUPING SETS 验证
Flink SQL> SELECT window_start, window_end, item, u_id, SUM(price) as price
> FROM TABLE(
> TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end, CUBE (u_id, item);
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+------------------------------------------+
| op | window_start | window_end | item | u_id | price |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+------------------------------------------+
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | alan | 5 | 18.00 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | tv | <NULL> | 4002.88 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | tv | 1 | 4002.88 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | hd | <NULL> | 100.80 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | hd | 3 | 100.80 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | usb | <NULL> | 26.80 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | <NULL> | 6 | 26.80 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | usb | 6 | 26.80 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | iphone | <NULL> | 22.88 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | iphone | 3 | 22.88 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | chan | <NULL> | 32.66 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | <NULL> | 1 | 4035.54 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | chan | 1 | 32.66 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | alanchan | <NULL> | 23.80 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | <NULL> | 3 | 147.48 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | alanchan | 3 | 23.80 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | <NULL> | <NULL> | 4227.82 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | alan | <NULL> | 18.00 |
| +I | 2023-09-20 11:00:00.000 | 2023-09-20 11:10:00.000 | <NULL> | 5 | 18.00 |
Flink SQL> SELECT window_start, window_end, item, u_id, SUM(price) as price
> FROM TABLE(
> TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end, GROUPING SETS (
> (u_id, item),
> (u_id),
> (item),
> ()
> );
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+------------------------------------------+
| op | window_start | window_end | item | u_id | price |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+------------------------------------------+
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | alan | 5 | 18.00 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | tv | <NULL> | 4002.88 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | tv | 1 | 4002.88 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | hd | <NULL> | 100.80 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | hd | 3 | 100.80 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | usb | <NULL> | 26.80 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | <NULL> | 6 | 26.80 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | usb | 6 | 26.80 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | iphone | <NULL> | 22.88 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | iphone | 3 | 22.88 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | chan | <NULL> | 32.66 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | <NULL> | 1 | 4035.54 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | chan | 1 | 32.66 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | alanchan | <NULL> | 23.80 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | <NULL> | 3 | 147.48 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | alanchan | 3 | 23.80 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | <NULL> | <NULL> | 4227.82 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | alan | <NULL> | 18.00 |
| +I | 2023-09-20 11:10:00.000 | 2023-09-20 11:20:00.000 | <NULL> | 5 | 18.00 |
3、Selecting Group Window Start and End Timestamps
可以使用分组的window_start和window_end列选择组窗口的开始和结束时间戳。
4、Cascading Window Aggregation(级联窗口聚合)介绍及示例
window_start列和window_end列是常规时间戳列,而不是时间属性。因此,它们不能用作后续基于时间的操作中的时间属性。为了传播时间属性,您需要另外window_time列添加到 GROUP BY 子句中。window_time是Windowing TVF 生成的第三列,它是分配窗口的时间属性。将window_time添加到 GROUP BY 子句中window_time也可以选择组键(group key)。然后,以下查询可以将此列用于后续基于时间的操作,例如级联窗口聚合和窗口 TopN。
下面显示了一个级联窗口聚合,其中第一个窗口聚合传播第二个窗口聚合的时间属性。
测试表数据使用上述中orders3的表数据。
- 建立视图
-- 5分钟的滚动窗口
CREATE VIEW window1 AS
-- 注意:window start 和 window end 字段是windows TVF内置的且是可选的,如果它们出现在子句中,则需要使用别名,以防止名称与外部窗口 TVF 的window start 和 window end字段冲突
SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price
FROM TABLE(
TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '5' MINUTES))
GROUP BY u_id, window_start, window_end, window_time;
-----------创建视图示例
Flink SQL> CREATE VIEW window1 AS
> SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price
> FROM TABLE(
> TUMBLE(TABLE orders3, DESCRIPTOR(proctime), INTERVAL '5' MINUTES))
> GROUP BY u_id, window_start, window_end, window_time;
- 查询视图
-- 第一个10分钟滚动窗口
SELECT window_start, window_end, SUM(partial_price) as total_price
FROM TABLE(
TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
---------查询视图示例
Flink SQL> SELECT window_start, window_end, SUM(partial_price) as total_price
> FROM TABLE(
> TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+------------------------------------------+
| op | window_start | window_end | total_price |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2023-09-20 15:00:00.000 | 2023-09-20 15:10:00.000 | 4227.82 |
二、Group Window Aggregation
Group Window Aggregation已弃用。建议使用更强大、更有效的Window TVF 聚合。
与Group Window Aggregation相比,Window TVF 聚合具有许多优点,包括:
1、具有性能调整中提到的所有性能优化。
2、支持标准的Grouping set语法。
3、可以在窗口聚合结果后应用Window TopN。
4、等等
组窗口聚合在 SQL 查询的 GROUP BY 子句中定义。就像使用常规 GROUP BY 子句的查询一样,具有包含组窗口(group windows)函数的 GROUP BY 子句的查询会为每个组计算一个结果行。批处理表和流式处理表上的 SQL 支持以下组窗口(group windows)函数。
1、Group Window Functions
2、Time Attributes
在流式处理模式下,组窗口函数的time_attr参数必须引用指定行的处理时间或事件时间的有效时间属性。
在批处理模式下,组窗口函数的time_attr参数必须是 TIMESTAMP 类型的属性。
3、Selecting Group Window Start and End Timestamps
可以使用以下辅助功能选择组窗口的开始和结束时间戳以及时间属性:
必须使用与 GROUP BY 子句中的组窗口(group window)函数完全相同的参数调用辅助函数。
4、组窗口聚合示例
以下示例演示如何使用流式处理表上的组窗口指定 SQL 查询。
----1、表结构
CREATE TABLE orders3 (
`id` STRING,
u_id STRING,
item STRING,
price DECIMAL(32,2),
proctime as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'orders3_topic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
----2、数据
Flink SQL> select * from orders3;
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| op | id | u_id | item | price | proctime |
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 5 | alan | 18.00 | 2023-09-20 15:49:32.377 |
| +I | 2 | 3 | alanchan | 23.80 | 2023-09-20 15:49:32.377 |
| +I | 3 | 1 | chan | 32.66 | 2023-09-20 15:49:32.377 |
| +I | 4 | 3 | iphone | 22.88 | 2023-09-20 15:49:32.377 |
| +I | 5 | 6 | usb | 26.80 | 2023-09-20 15:49:32.377 |
| +I | 6 | 3 | hd | 100.80 | 2023-09-20 15:49:32.377 |
| +I | 7 | 1 | tv | 4002.88 | 2023-09-20 15:49:32.377 |
----3、group window aggregation 示例
Flink SQL> SELECT
> u_id,
> TUMBLE_START(proctime, INTERVAL '5' HOUR) AS wStart,
> SUM(price)
> FROM orders3
> GROUP BY
> TUMBLE(proctime, INTERVAL '5' HOUR),
> u_id;
+----+--------------------------------+-------------------------+------------------------------------------+
| op | u_id | wStart | EXPR$2 |
+----+--------------------------------+-------------------------+------------------------------------------+
| +I | 5 | 2023-09-20 15:00:00.000 | 18.00 |
| +I | 6 | 2023-09-20 15:00:00.000 | 26.80 |
| +I | 1 | 2023-09-20 15:00:00.000 | 4035.54 |
| +I | 3 | 2023-09-20 15:00:00.000 | 147.48 |
以上,介绍了Flink 的窗口聚合(TVF函数、分组集的rollup和cube、级联窗口聚合)和不推荐使用的组窗口聚合及具体的示例验证过程。