27. Flink SQL SELECT (Group Aggregation, Over Aggregation, and Window Join): Introduction and Detailed Examples (5)

Published: 2023-09-27 12:32:55



Table of Contents

  • Flink series articles
  • I. Group Aggregation
  • 1. COUNT example
  • 2. GROUP BY aggregation example
  • 3. DISTINCT aggregation
  • 4. GROUPING SETS
  • 1) ROLLUP
  • 2) CUBE
  • 5. HAVING
  • II. Over Aggregation
  • 1. Syntax
  • 1) ORDER BY
  • 2) PARTITION BY
  • 3) Range definitions
  • 4) RANGE intervals
  • 5) ROW intervals
  • 6) WINDOW
  • 2. Example
  • III. Window Join
  • 1. INNER/LEFT/RIGHT/FULL OUTER
  • 2. SEMI (IN/EXISTS)
  • 3. ANTI (NOT IN/EXISTS)
  • 4. Limitations
  • 1) Limitations on the JOIN clause
  • 2) Input limitations of windowing TVFs
  • 3) Limitation on a Window Join that directly follows windowing TVFs



This article introduces Flink's group aggregation, OVER aggregation, and window join operations, the limitations in the current version, and concrete runnable examples.
This article assumes that Flink and Kafka clusters are available and working.
The article has three parts, covering group aggregation, OVER aggregation, and window join; each topic is illustrated with verified examples.
The examples in this article were run on Flink 1.17.

I. Group Aggregation

Like most data systems, Apache Flink supports aggregate functions, both built-in and user-defined. User-defined functions must be registered in a catalog before use.

An aggregate function computes a single result from multiple input rows. For example, there are aggregates that compute COUNT, SUM, AVG (average), MAX (maximum), and MIN (minimum) over a set of rows.

The data source used below is the orders3 table; its schema and data are:

Flink SQL> desc orders3;
+----------+-----------------------------+-------+-----+---------------+-----------+
|     name |                        type |  null | key |        extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
|       id |                      STRING |  TRUE |     |               |           |
|     u_id |                      STRING |  TRUE |     |               |           |
|     item |                      STRING |  TRUE |     |               |           |
|    price |              DECIMAL(32, 2) |  TRUE |     |               |           |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |           |
+----------+-----------------------------+-------+-----+---------------+-----------+

Flink SQL> select * from orders3;
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| op |                             id |                           u_id |                           item |                              price |                proctime |
+----+--------------------------------+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| +I |                              1 |                              5 |                           alan |                              18.00 | 2023-09-21 08:12:58.654 |
| +I |                              2 |                              3 |                       alanchan |                              23.80 | 2023-09-21 08:12:58.654 |
| +I |                              3 |                              1 |                           chan |                              32.66 | 2023-09-21 08:12:58.654 |
| +I |                              4 |                              3 |                         iphone |                              22.88 | 2023-09-21 08:12:58.654 |
| +I |                              5 |                              6 |                            usb |                              26.80 | 2023-09-21 08:12:58.654 |
| +I |                              6 |                              3 |                             hd |                             100.80 | 2023-09-21 08:12:58.655 |
| +I |                              7 |                              1 |                             tv |                            4002.88 | 2023-09-21 08:12:58.655 |

1. COUNT example

SELECT COUNT(*) FROM orders3;

Flink SQL> select count(*) from orders3;
+----+----------------------+
| op |               EXPR$0 |
+----+----------------------+
| +I |                    1 |
| -U |                    1 |
| +U |                    2 |
| -U |                    2 |
| +U |                    3 |
| -U |                    3 |
| +U |                    4 |
| -U |                    4 |
| +U |                    5 |
| -U |                    5 |
| +U |                    6 |
| -U |                    6 |
| +U |                    7 |

2. GROUP BY aggregation example

For streaming queries, it is important to understand that Flink runs continuous queries that never terminate. Instead, continuous queries update their result table according to the updates on their input tables. For the query above, Flink outputs an updated count every time a new row is inserted into the orders3 table.

Apache Flink supports the standard GROUP BY clause for aggregating data.

SELECT u_id,COUNT(*)
FROM orders3
GROUP BY u_id;

Flink SQL> SELECT u_id,COUNT(*)
> FROM orders3
> GROUP BY u_id;
+----+--------------------------------+----------------------+
| op |                           u_id |               EXPR$1 |
+----+--------------------------------+----------------------+
| +I |                              5 |                    1 |
| +I |                              3 |                    1 |
| +I |                              1 |                    1 |
| -U |                              3 |                    1 |
| +U |                              3 |                    2 |
| +I |                              6 |                    1 |
| -U |                              3 |                    2 |
| +U |                              3 |                    3 |
| -U |                              1 |                    1 |
| +U |                              1 |                    2 |
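The +I / -U / +U markers in the transcripts above are changelog records: a continuous GROUP BY query retracts its previous result (-U) and emits the updated one (+U) whenever a new row arrives for an existing group. A minimal sketch of this behavior in plain Python (illustrative only, not Flink's API):

```python
def continuous_group_count(rows, key_fn):
    """Simulate a continuous GROUP BY COUNT(*): for every arriving row,
    emit a changelog of inserts (+I) and retract/update pairs (-U/+U)."""
    counts = {}
    changelog = []
    for row in rows:
        k = key_fn(row)
        if k not in counts:
            counts[k] = 1
            changelog.append(("+I", k, 1))          # first row of a new group
        else:
            changelog.append(("-U", k, counts[k]))  # retract the old result
            counts[k] += 1
            changelog.append(("+U", k, counts[k]))  # emit the updated result
    return changelog

# the first four orders3 rows, keyed by u_id
log = continuous_group_count(
    [("1", "5"), ("2", "3"), ("3", "1"), ("4", "3")],
    key_fn=lambda r: r[1],
)
```

Feeding the first four orders3 rows through this sketch reproduces the beginning of the GROUP BY transcript above.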

For streaming queries, the state required to compute the query result can grow infinitely. The state size depends on the number of groups as well as the number and type of the aggregate functions. For example, MIN/MAX are heavy on state size while COUNT is cheap. You can provide a query configuration with an appropriate state time-to-live (TTL), e.g. via table.exec.state.ttl, to prevent excessive state size. Note that this may affect the correctness of the query results. For TTL configuration, see the article: 43、Flink之Hive 读写及详细验证示例

Apache Flink provides a set of performance-tuning options for group aggregation; see 45、Flink之性能调优介绍及示例, which describes the tuning directions in detail with examples.

3. DISTINCT aggregation

A DISTINCT aggregation removes duplicate values before aggregating. The following example counts the number of distinct u_id values in the orders3 table, rather than the total number of rows.

Flink SQL> SELECT COUNT(DISTINCT u_id) FROM orders3;
+----+----------------------+
| op |               EXPR$0 |
+----+----------------------+
| +I |                    1 |
| -U |                    1 |
| +U |                    2 |
| -U |                    2 |
| +U |                    3 |
| -U |                    3 |
| +U |                    4 |


For streaming queries, the state required to compute the query result can grow infinitely. The state size mostly depends on the number of distinct rows and how long groups are maintained; short-lived group-by windows are not a problem. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this may affect the correctness of the query results. For TTL configuration, see the article: 43、Flink之Hive 读写及详细验证示例

4. GROUPING SETS

Grouping sets allow more complex grouping operations than those describable by a standard GROUP BY. Rows are grouped separately for each specified grouping set, and aggregates are computed for each group just as for simple GROUP BY clauses.

For more about GROUPING SETS, see the article: 27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)

  • Example
Flink SQL> SELECT supplier_id, rating, COUNT(*) AS total
> FROM (VALUES
>     ('supplier1', 'product1', 4),
>     ('supplier1', 'product2', 3),
>     ('supplier2', 'product3', 3),
>     ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());
+----+--------------------------------+-------------+----------------------+
| op |                    supplier_id |      rating |                total |
+----+--------------------------------+-------------+----------------------+
| +I |                      supplier1 |           4 |                    1 |
| +I |                      supplier1 |      <NULL> |                    1 |
| +I |                         <NULL> |      <NULL> |                    1 |
| +I |                      supplier1 |           3 |                    1 |
| -U |                      supplier1 |      <NULL> |                    1 |
| +U |                      supplier1 |      <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                    1 |
| +U |                         <NULL> |      <NULL> |                    2 |
| +I |                      supplier2 |           3 |                    1 |
| +I |                      supplier2 |      <NULL> |                    1 |
| -U |                         <NULL> |      <NULL> |                    2 |
| +U |                         <NULL> |      <NULL> |                    3 |
| +I |                      supplier2 |           4 |                    1 |
| -U |                      supplier2 |      <NULL> |                    1 |
| +U |                      supplier2 |      <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                    3 |
| +U |                         <NULL> |      <NULL> |                    4 |
+----+--------------------------------+-------------+----------------------+

Each sublist of GROUPING SETS may specify zero or more columns or expressions, and is interpreted the same way as if it were used directly in the GROUP BY clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows are present.

References to the grouping columns or expressions are replaced by NULL values in result rows for those grouping sets in which the columns do not appear.
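These semantics can be sketched in plain Python (illustrative only; grouping_sets_count is a hypothetical helper, not a Flink API): the same input is grouped once per grouping set, and grouping columns absent from a set are filled with None, Flink's NULL:

```python
from collections import Counter

def grouping_sets_count(rows, all_keys, grouping_sets):
    """COUNT(*) per grouping set; grouping columns absent from a set
    are replaced by None (Flink's NULL) in the result rows."""
    results = []
    for gset in grouping_sets:
        counts = Counter(
            tuple(row[k] if k in gset else None for k in all_keys)
            for row in rows
        )
        results.extend((key, n) for key, n in counts.items())
    return results

rows = [
    {"supplier_id": "supplier1", "rating": 4},
    {"supplier_id": "supplier1", "rating": 3},
    {"supplier_id": "supplier2", "rating": 3},
    {"supplier_id": "supplier2", "rating": 4},
]
final = grouping_sets_count(
    rows,
    ("supplier_id", "rating"),
    [("supplier_id", "rating"), ("supplier_id",), ()],
)
```

The final counts match the last +U rows of the transcript above: four rows total for the empty grouping set and two per supplier for the (supplier_id) set.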

For streaming queries, the state required to compute the query result can grow infinitely. The state size depends on the number of grouping sets and the type of the aggregate functions. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this may affect the correctness of the query results. For TTL configuration, see the article: 43、Flink之Hive 读写及详细验证示例

1) ROLLUP

ROLLUP is a shorthand notation for specifying a common type of grouping set. It stands for the given list of expressions plus all prefixes of the list, including the empty list.

The following two queries are equivalent.

-- GROUPING SETS
SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())

-- ROLLUP 
SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating);

-- verification
Flink SQL> SELECT supplier_id, rating, COUNT(*) AS total
> FROM (VALUES
>     ('supplier1', 'product1', 4),
>     ('supplier1', 'product2', 3),
>     ('supplier2', 'product3', 3),
>     ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());
+----+--------------------------------+-------------+----------------------+
| op |                    supplier_id |      rating |                total |
+----+--------------------------------+-------------+----------------------+
| +I |                      supplier1 |           4 |                    1 |
| +I |                      supplier1 |      <NULL> |                    1 |
| +I |                         <NULL> |      <NULL> |                    1 |
| +I |                      supplier1 |           3 |                    1 |
| -U |                      supplier1 |      <NULL> |                    1 |
| +U |                      supplier1 |      <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                    1 |
| +U |                         <NULL> |      <NULL> |                    2 |
| +I |                      supplier2 |           3 |                    1 |
| +I |                      supplier2 |      <NULL> |                    1 |
| -U |                         <NULL> |      <NULL> |                    2 |
| +U |                         <NULL> |      <NULL> |                    3 |
| +I |                      supplier2 |           4 |                    1 |
| -U |                      supplier2 |      <NULL> |                    1 |
| +U |                      supplier2 |      <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                    3 |
| +U |                         <NULL> |      <NULL> |                    4 |
+----+--------------------------------+-------------+----------------------+

Flink SQL> SELECT supplier_id, rating, COUNT(*)
> FROM (VALUES
>     ('supplier1', 'product1', 4),
>     ('supplier1', 'product2', 3),
>     ('supplier2', 'product3', 3),
>     ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY ROLLUP (supplier_id, rating);
+----+--------------------------------+-------------+----------------------+
| op |                    supplier_id |      rating |               EXPR$2 |
+----+--------------------------------+-------------+----------------------+
| +I |                      supplier1 |           4 |                    1 |
| +I |                      supplier1 |      <NULL> |                    1 |
| +I |                         <NULL> |      <NULL> |                    1 |
| +I |                      supplier1 |           3 |                    1 |
| -U |                      supplier1 |      <NULL> |                    1 |
| +U |                      supplier1 |      <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                    1 |
| +U |                         <NULL> |      <NULL> |                    2 |
| +I |                      supplier2 |           3 |                    1 |
| +I |                      supplier2 |      <NULL> |                    1 |
| -U |                         <NULL> |      <NULL> |                    2 |
| +U |                         <NULL> |      <NULL> |                    3 |
| +I |                      supplier2 |           4 |                    1 |
| -U |                      supplier2 |      <NULL> |                    1 |
| +U |                      supplier2 |      <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                    3 |
| +U |                         <NULL> |      <NULL> |                    4 |
+----+--------------------------------+-------------+----------------------+
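The expansion that ROLLUP performs can be sketched in plain Python (illustrative only): every prefix of the column list, down to the empty list, becomes a grouping set:

```python
def rollup(cols):
    """Expand ROLLUP(c1, ..., cn) into the equivalent GROUPING SETS:
    all prefixes of the column list, including the empty list."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]

# ROLLUP (supplier_id, rating) expands to the grouping sets used above
sets = rollup(["supplier_id", "rating"])
```

The result is exactly GROUPING SETS ((supplier_id, rating), (supplier_id), ()), which is why the two transcripts above are identical.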

2) CUBE

CUBE is a shorthand notation for specifying a common type of grouping set. It stands for the given list and all of its possible subsets, i.e. the power set.

The following two queries are equivalent.

-- cube
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id);

-- GROUPING SETS
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
    ( supplier_id, product_id, rating ),
    ( supplier_id, product_id         ),
    ( supplier_id,             rating ),
    ( supplier_id                     ),
    (              product_id, rating ),
    (              product_id         ),
    (                          rating ),
    (                                 )
);

-- verification
Flink SQL> SELECT supplier_id, rating, product_id, COUNT(*)
> FROM (VALUES
>     ('supplier1', 'product1', 4),
>     ('supplier1', 'product2', 3),
>     ('supplier2', 'product3', 3),
>     ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY CUBE (supplier_id, rating, product_id);
+----+--------------------------------+-------------+--------------------------------+----------------------+
| op |                    supplier_id |      rating |                     product_id |               EXPR$3 |
+----+--------------------------------+-------------+--------------------------------+----------------------+
| +I |                      supplier1 |           4 |                       product1 |                    1 |
| +I |                      supplier1 |      <NULL> |                       product1 |                    1 |
| +I |                      supplier1 |           4 |                         <NULL> |                    1 |
| +I |                      supplier1 |      <NULL> |                         <NULL> |                    1 |
| +I |                         <NULL> |           4 |                       product1 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product1 |                    1 |
| +I |                         <NULL> |           4 |                         <NULL> |                    1 |
| +I |                         <NULL> |      <NULL> |                         <NULL> |                    1 |
| +I |                      supplier1 |           3 |                       product2 |                    1 |
| +I |                      supplier1 |      <NULL> |                       product2 |                    1 |
| +I |                      supplier1 |           3 |                         <NULL> |                    1 |
| -U |                      supplier1 |      <NULL> |                         <NULL> |                    1 |
| +U |                      supplier1 |      <NULL> |                         <NULL> |                    2 |
| +I |                         <NULL> |           3 |                       product2 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product2 |                    1 |
| +I |                         <NULL> |           3 |                         <NULL> |                    1 |
| -U |                         <NULL> |      <NULL> |                         <NULL> |                    1 |
| +U |                         <NULL> |      <NULL> |                         <NULL> |                    2 |
| +I |                      supplier2 |           3 |                       product3 |                    1 |
| +I |                      supplier2 |      <NULL> |                       product3 |                    1 |
| +I |                      supplier2 |           3 |                         <NULL> |                    1 |
| +I |                      supplier2 |      <NULL> |                         <NULL> |                    1 |
| +I |                         <NULL> |           3 |                       product3 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product3 |                    1 |
| -U |                         <NULL> |           3 |                         <NULL> |                    1 |
| +U |                         <NULL> |           3 |                         <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                         <NULL> |                    2 |
| +U |                         <NULL> |      <NULL> |                         <NULL> |                    3 |
| +I |                      supplier2 |           4 |                       product4 |                    1 |
| +I |                      supplier2 |      <NULL> |                       product4 |                    1 |
| +I |                      supplier2 |           4 |                         <NULL> |                    1 |
| -U |                      supplier2 |      <NULL> |                         <NULL> |                    1 |
| +U |                      supplier2 |      <NULL> |                         <NULL> |                    2 |
| +I |                         <NULL> |           4 |                       product4 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product4 |                    1 |
| -U |                         <NULL> |           4 |                         <NULL> |                    1 |
| +U |                         <NULL> |           4 |                         <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                         <NULL> |                    3 |
| +U |                         <NULL> |      <NULL> |                         <NULL> |                    4 |
+----+--------------------------------+-------------+--------------------------------+----------------------+

Flink SQL> SELECT supplier_id, rating, product_id, COUNT(*)
> FROM (VALUES
>     ('supplier1', 'product1', 4),
>     ('supplier1', 'product2', 3),
>     ('supplier2', 'product3', 3),
>     ('supplier2', 'product4', 4))
> AS Products(supplier_id, product_id, rating)
> GROUP BY GROUPING SETS (
>     ( supplier_id, product_id, rating ),
>     ( supplier_id, product_id         ),
>     ( supplier_id,             rating ),
>     ( supplier_id                     ),
>     (              product_id, rating ),
>     (              product_id         ),
>     (                          rating ),
>     (                                 )
> );
+----+--------------------------------+-------------+--------------------------------+----------------------+
| op |                    supplier_id |      rating |                     product_id |               EXPR$3 |
+----+--------------------------------+-------------+--------------------------------+----------------------+
| +I |                      supplier1 |           4 |                       product1 |                    1 |
| +I |                      supplier1 |      <NULL> |                       product1 |                    1 |
| +I |                      supplier1 |           4 |                         <NULL> |                    1 |
| +I |                      supplier1 |      <NULL> |                         <NULL> |                    1 |
| +I |                         <NULL> |           4 |                       product1 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product1 |                    1 |
| +I |                         <NULL> |           4 |                         <NULL> |                    1 |
| +I |                         <NULL> |      <NULL> |                         <NULL> |                    1 |
| +I |                      supplier1 |           3 |                       product2 |                    1 |
| +I |                      supplier1 |      <NULL> |                       product2 |                    1 |
| +I |                      supplier1 |           3 |                         <NULL> |                    1 |
| -U |                      supplier1 |      <NULL> |                         <NULL> |                    1 |
| +U |                      supplier1 |      <NULL> |                         <NULL> |                    2 |
| +I |                         <NULL> |           3 |                       product2 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product2 |                    1 |
| +I |                         <NULL> |           3 |                         <NULL> |                    1 |
| -U |                         <NULL> |      <NULL> |                         <NULL> |                    1 |
| +U |                         <NULL> |      <NULL> |                         <NULL> |                    2 |
| +I |                      supplier2 |           3 |                       product3 |                    1 |
| +I |                      supplier2 |      <NULL> |                       product3 |                    1 |
| +I |                      supplier2 |           3 |                         <NULL> |                    1 |
| +I |                      supplier2 |      <NULL> |                         <NULL> |                    1 |
| +I |                         <NULL> |           3 |                       product3 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product3 |                    1 |
| -U |                         <NULL> |           3 |                         <NULL> |                    1 |
| +U |                         <NULL> |           3 |                         <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                         <NULL> |                    2 |
| +U |                         <NULL> |      <NULL> |                         <NULL> |                    3 |
| +I |                      supplier2 |           4 |                       product4 |                    1 |
| +I |                      supplier2 |      <NULL> |                       product4 |                    1 |
| +I |                      supplier2 |           4 |                         <NULL> |                    1 |
| -U |                      supplier2 |      <NULL> |                         <NULL> |                    1 |
| +U |                      supplier2 |      <NULL> |                         <NULL> |                    2 |
| +I |                         <NULL> |           4 |                       product4 |                    1 |
| +I |                         <NULL> |      <NULL> |                       product4 |                    1 |
| -U |                         <NULL> |           4 |                         <NULL> |                    1 |
| +U |                         <NULL> |           4 |                         <NULL> |                    2 |
| -U |                         <NULL> |      <NULL> |                         <NULL> |                    3 |
| +U |                         <NULL> |      <NULL> |                         <NULL> |                    4 |
+----+--------------------------------+-------------+--------------------------------+----------------------+
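The expansion that CUBE performs, i.e. the power set, can be sketched in plain Python (illustrative only):

```python
from itertools import combinations

def cube(cols):
    """Expand CUBE(c1, ..., cn) into the equivalent GROUPING SETS:
    all 2**n subsets of the column list (the power set)."""
    return [subset
            for r in range(len(cols), -1, -1)
            for subset in combinations(cols, r)]

# CUBE (supplier_id, rating, product_id) yields the 8 grouping sets above
sets = cube(["supplier_id", "rating", "product_id"])
```

With three columns this yields the eight grouping sets listed in the equivalent GROUPING SETS query above.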

5. HAVING

HAVING eliminates group rows that do not satisfy its condition. HAVING differs from WHERE: WHERE filters individual rows before GROUP BY, while HAVING filters the group rows created by GROUP BY. Each column referenced in the condition must unambiguously reference a grouping column, unless it appears within an aggregate function.

SELECT SUM(price)
FROM orders3
GROUP BY u_id
HAVING SUM(price) > 100;

------------- compare the results with and without HAVING
--- without HAVING
Flink SQL> SELECT SUM(price)
> FROM orders3
> GROUP BY u_id;
+----+------------------------------------------+
| op |                                   EXPR$0 |
+----+------------------------------------------+
| +I |                                    18.00 |
| +I |                                    23.80 |
| +I |                                    32.66 |
| -U |                                    23.80 |
| +U |                                    46.68 |
| +I |                                    26.80 |
| -U |                                    46.68 |
| +U |                                   147.48 |
| -U |                                    32.66 |
| +U |                                  4035.54 |

--- with HAVING
Flink SQL> SELECT SUM(price)
> FROM orders3
> GROUP BY u_id
> HAVING SUM(price) > 100;
+----+------------------------------------------+
| op |                                   EXPR$0 |
+----+------------------------------------------+
| +U |                                   147.48 |
| +U |                                  4035.54 |

The presence of HAVING turns a query into a grouped query even if there is no GROUP BY clause. This is the same as what happens when a query contains aggregate functions but no GROUP BY clause: the query considers all selected rows to form a single group, and the SELECT list and HAVING clause can only reference table columns from within aggregate functions. Such a query emits a single row if the HAVING condition is true, and zero rows otherwise.

II. Over Aggregation

OVER aggregation computes an aggregated value for every input row over an ordered range of rows. In contrast to GROUP BY aggregation, OVER aggregation does not reduce the number of result rows to a single row per group; instead it produces one aggregated value for every input row.

1. Syntax

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

You can define multiple OVER window aggregations in a SELECT clause. However, for streaming queries, the OVER windows of all aggregations must be identical due to a current limitation.

1) ORDER BY

OVER windows are defined on an ordered sequence of rows. Since tables have no inherent order, the ORDER BY clause is mandatory. For streaming queries, Flink currently (as of Flink 1.17) only supports OVER windows that are defined with an ascending time-attribute order; other orderings are not supported.

2) PARTITION BY

OVER windows can be defined on a partitioned table. In the presence of a PARTITION BY clause, the aggregate for each input row is computed only over the rows of its own partition.

3) Range definitions

The range definition specifies how many rows are included in the aggregate. It is defined with a BETWEEN clause that sets a lower and an upper boundary; all rows between these boundaries are included in the aggregate. Flink only supports CURRENT ROW as the upper boundary.

There are two options for defining the range: ROWS intervals and RANGE intervals.

4) RANGE intervals

A RANGE interval is defined on the values of the ORDER BY column, which in Flink's case is always a time attribute. The following RANGE interval includes in the aggregate all rows whose time attribute is at most 30 minutes less than that of the current row.

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
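The RANGE-interval semantics can be sketched in plain Python (illustrative only, with plain seconds standing in for the time attribute):

```python
def range_interval_sum(rows, preceding_seconds):
    """SUM(v) OVER (ORDER BY t RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW):
    rows is a list of (event_seconds, value) sorted by event_seconds."""
    return [
        (t, sum(v2 for t2, v2 in rows if t - preceding_seconds <= t2 <= t))
        for t, _ in rows
    ]

# a 30-minute (1800 s) RANGE interval over three timestamped values:
# the third row is more than 30 minutes after the first two, so it sums alone
result = range_interval_sum([(0, 1.0), (600, 2.0), (2500, 3.0)], 1800)
```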

5) ROW intervals

A ROWS interval is a count-based interval. It defines exactly how many rows are included in the aggregate. The following ROWS interval includes the current row and the 10 rows preceding it (11 rows in total) in the aggregate.

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
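The ROWS-interval semantics can be sketched in plain Python (illustrative only):

```python
def rows_interval_sum(values, preceding):
    """SUM(v) OVER (ORDER BY ... ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW):
    each result sums the current value and up to <preceding> values before it."""
    return [sum(values[max(0, i - preceding):i + 1]) for i in range(len(values))]

# with 2 PRECEDING, each aggregate covers at most 3 rows
result = rows_interval_sum([1, 2, 3, 4], preceding=2)
```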

6) WINDOW

The WINDOW clause can be used to define an OVER window outside of the SELECT clause. It can make a query more readable and also allows the window definition to be reused for multiple aggregations.

  • Example using the WINDOW clause
SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW);

Flink SQL> SELECT id, proctime, price,
>   SUM(price) OVER w AS sum_amount,
>   AVG(price) OVER w AS avg_amount
> FROM orders3
> WINDOW w AS (
>   PARTITION BY u_id
>   ORDER BY proctime
>   RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW);
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+------------------------------------------+
| op |                             id |                proctime |                              price |                               sum_amount |                               avg_amount |
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+------------------------------------------+
| +I |                              1 | 2023-09-21 09:34:16.920 |                              18.00 |                                    18.00 |                                18.000000 |
| +I |                              2 | 2023-09-21 09:34:16.920 |                              23.80 |                                    23.80 |                                23.800000 |
| +I |                              3 | 2023-09-21 09:34:16.921 |                              32.66 |                                  4035.54 |                              2017.770000 |
| +I |                              7 | 2023-09-21 09:34:16.921 |                            4002.88 |                                  4035.54 |                              2017.770000 |
| +I |                              5 | 2023-09-21 09:34:16.921 |                              26.80 |                                    26.80 |                                26.800000 |
| +I |                              4 | 2023-09-21 09:34:16.921 |                              22.88 |                                   147.48 |                                49.160000 |
| +I |                              6 | 2023-09-21 09:34:16.921 |                             100.80 |                                   147.48 |                                49.160000 |

2. Example

The following query computes, for every order, the sum of the amounts of all orders from the same user that were received within one hour before the current order.

Note that the example uses proctime; it is only meant to validate the syntax and may not be accurate enough for real business semantics.

Flink SQL> SELECT id, proctime, price,
>   SUM(price) OVER (
>     PARTITION BY u_id
>     ORDER BY proctime
>     RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>   ) AS one_hour_prod_amount_sum
> FROM orders3;
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+
| op |                             id |                proctime |                              price |                 one_hour_prod_amount_sum |
+----+--------------------------------+-------------------------+------------------------------------+------------------------------------------+
| +I |                              1 | 2023-09-21 09:12:11.005 |                              18.00 |                                    18.00 |
| +I |                              3 | 2023-09-21 09:12:11.006 |                              32.66 |                                  4035.54 |
| +I |                              7 | 2023-09-21 09:12:11.006 |                            4002.88 |                                  4035.54 |
| +I |                              5 | 2023-09-21 09:12:11.006 |                              26.80 |                                    26.80 |
| +I |                              2 | 2023-09-21 09:12:11.006 |                              23.80 |                                   147.48 |
| +I |                              4 | 2023-09-21 09:12:11.006 |                              22.88 |                                   147.48 |
| +I |                              6 | 2023-09-21 09:12:11.006 |                             100.80 |                                   147.48 |
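The RANGE-interval semantics above (per partition, sum over all rows whose time lies within one hour before the current row, peers with the same timestamp included) can be cross-checked with a small Python sketch. The `rows` data and the `over_range_sum` helper below are hypothetical stand-ins for the orders3 stream; this illustrates the window semantics only, not Flink's implementation:

```python
from collections import defaultdict

def over_range_sum(rows, hours=1):
    """For each input row, emit the sum of price over rows of the same
    partition (u_id) whose time lies in [t - hours, t] -- the semantics of
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW."""
    seen = defaultdict(list)          # u_id -> [(time, price), ...] seen so far
    out = []
    for u_id, t, price in rows:       # rows must arrive ordered by time
        seen[u_id].append((t, price))
        lower = t - hours * 3600
        window_sum = sum(p for ts, p in seen[u_id] if lower <= ts <= t)
        out.append((u_id, t, price, round(window_sum, 2)))
    return out

# toy data: (u_id, epoch_seconds, price)
rows = [("u1", 0, 18.00), ("u1", 1800, 23.80), ("u1", 7200, 26.80)]
for r in over_range_sum(rows):
    print(r)
```

The second row sums the first two prices (both within one hour), while the third row at t=7200s falls outside the one-hour range of the earlier rows and starts a fresh sum, just as id 5 does in the output above.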

三、Window Join

A window join adds the dimension of time into the join conditions themselves. In doing so, the window join joins the elements of two streams that share a common key and are in the same window. The semantics of window joins are the same as DataStream window joins.

For streaming queries, unlike other joins on continuous tables, window joins do not emit intermediate results but only emit final results at the end of the window. Moreover, window joins purge all intermediate state when it is no longer needed.

Usually, window joins are used together with windowing TVFs. Besides that, a window join can follow other operations based on windowing TVFs, such as window aggregation, window Top-N, and window join.

Currently (as of Flink 1.17), a window join requires the join on condition to contain window starts equality of both input tables and window ends equality of both input tables.

Window joins support INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN.

1、INNER/LEFT/RIGHT/FULL OUTER

The syntax of an INNER/LEFT/RIGHT/FULL OUTER window join statement is shown below.

SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...

The syntaxes of INNER/LEFT/RIGHT/FULL OUTER window joins are very similar to each other, so only a FULL OUTER JOIN example is shown here. When performing a window join, all elements with a common key and a common tumbling window are joined together. The example below applies the window join to a tumbling window TVF: by scoping the join's time region to fixed five-minute intervals, the data set is sliced into two distinct windows, [9:35, 9:40) and [9:40, 9:45). For instance, rows L3 and R2 share num = 3 but cannot be joined together because they fall into different windows.
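The tumbling-window assignment used by this example can be sketched in Python. The `tumble_window` helper below is hypothetical, not a Flink API; it aligns a timestamp to its window the same way a TUMBLE TVF with a 5-minute size does (assuming the local timezone offset is a whole multiple of 5 minutes):

```python
from datetime import datetime, timedelta

def tumble_window(ts: datetime, size: timedelta):
    """Return (window_start, window_end) of the tumbling window containing ts."""
    epoch = ts.timestamp()
    start = epoch - epoch % size.total_seconds()   # align down to the window size
    window_start = datetime.fromtimestamp(start)
    return window_start, window_start + size

five_minutes = timedelta(minutes=5)
l3 = tumble_window(datetime(2023, 9, 22, 9, 42, 18), five_minutes)  # row L3
r2 = tumble_window(datetime(2023, 9, 22, 9, 37, 18), five_minutes)  # row R2
print(l3[0], r2[0])  # different window starts: L3 and R2 can never join
```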

------ 1. Table structure
CREATE TABLE leftTable (
    `id`    STRING,
     num   STRING,
     row_time  TIMESTAMP(3),
     WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'leftTable_topic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

CREATE TABLE rightTable (
    `id`    STRING,
     num   STRING,
     row_time  TIMESTAMP(3),
     WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'rightTable_topic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
------ 2. Table data
Flink SQL> select * from leftTable;
+----+--------------------------------+--------------------------------+-------------------------+
| op |                             id |                            num |                row_time |
+----+--------------------------------+--------------------------------+-------------------------+
| +I |                             L1 |                              1 | 2023-09-22 09:37:18.000 |
| +I |                             L2 |                              2 | 2023-09-22 09:38:18.000 |
| +I |                             L3 |                              3 | 2023-09-22 09:42:18.000 |

Flink SQL> select * from rightTable;
+----+--------------------------------+--------------------------------+-------------------------+
| op |                             id |                            num |                row_time |
+----+--------------------------------+--------------------------------+-------------------------+
| +I |                             R1 |                              2 | 2023-09-22 09:39:18.000 |
| +I |                             R2 |                              3 | 2023-09-22 09:37:18.000 |
| +I |                             R3 |                              4 | 2023-09-22 09:43:18.000 |

------ 3. FULL JOIN verification
Flink SQL> SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, COALESCE(L.window_start, R.window_start) as window_start, COALESCE(L.window_end, R.window_end) as window_end
> FROM ( SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) L
> FULL JOIN ( SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) R 
> ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+
| op |                          L_Num |                           L_Id |                          R_Num |                           R_Id |            window_start |              window_end |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+
| +I |                              1 |                             L1 |                         <NULL> |                         <NULL> | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 |
| +I |                              2 |                             L2 |                              2 |                             R1 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 |
| +I |                         <NULL> |                         <NULL> |                              3 |                             R2 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 |
| +I |                              3 |                             L3 |                         <NULL> |                         <NULL> | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 |
| +I |                         <NULL> |                         <NULL> |                              4 |                             R3 | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 |
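As a cross-check of the FULL OUTER result above, the same join can be simulated in plain Python on in-memory copies of the two tables. All names below (`window_start`, `window_full_join`, `dt`) are hypothetical helpers, not Flink APIs; `None` stands in for SQL NULL:

```python
from datetime import datetime

def window_start(ts, size_s=300):
    """Align a timestamp to its 5-minute tumbling window start."""
    e = ts.timestamp()
    return datetime.fromtimestamp(e - e % size_s)

def window_full_join(left, right):
    """FULL OUTER join of (id, num, time) rows on (num, window_start)."""
    lk, rk = {}, {}
    for rid, num, t in left:
        lk.setdefault((num, window_start(t)), []).append(rid)
    for rid, num, t in right:
        rk.setdefault((num, window_start(t)), []).append(rid)
    out = []
    for key in sorted(lk.keys() | rk.keys(), key=lambda k: (k[1], k[0])):
        for li in lk.get(key, [None]):        # None stands for SQL NULL
            for ri in rk.get(key, [None]):
                out.append((li, ri, key[1]))
    return out

dt = lambda h, m, s: datetime(2023, 9, 22, h, m, s)
left  = [("L1", "1", dt(9, 37, 18)), ("L2", "2", dt(9, 38, 18)), ("L3", "3", dt(9, 42, 18))]
right = [("R1", "2", dt(9, 39, 18)), ("R2", "3", dt(9, 37, 18)), ("R3", "4", dt(9, 43, 18))]
for row in window_full_join(left, right):
    print(row)
```

Only L2/R1 pair up (same num, same [9:35, 9:40) window); every other row is emitted with a NULL on the opposite side, matching the five rows of the SQL result.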

2、SEMI(IN/EXISTS)

A semi window join returns a row from the left side if there is at least one matching row on the right side within the common window.

The leftTable and rightTable data are the same as in the previous example.

  • Example
---- IN
Flink SQL> SELECT *
>            FROM (
>                SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>            ) L WHERE L.num IN (
>              SELECT num FROM (   
>                SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>              ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                            num |                row_time |            window_start |              window_end |             window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                             L2 |                              2 | 2023-09-22 09:38:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |

---- EXISTS
Flink SQL> SELECT *
>            FROM (
>                SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>            ) L WHERE EXISTS (
>              SELECT * FROM (
>                SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>              ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                            num |                row_time |            window_start |              window_end |             window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                             L2 |                              2 | 2023-09-22 09:38:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |
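The SEMI semantics can be sketched the same way in Python. `window_semi_join` below is a hypothetical helper operating on in-memory copies of the leftTable/rightTable rows, not a Flink API:

```python
from datetime import datetime

def window_start(ts, size_s=300):
    """Align a timestamp to its 5-minute tumbling window start."""
    e = ts.timestamp()
    return datetime.fromtimestamp(e - e % size_s)

def window_semi_join(left, right):
    """SEMI join: keep each left row that has at least one right-side match
    on (num, window_start) -- the behaviour of IN / EXISTS above."""
    right_keys = {(num, window_start(t)) for _, num, t in right}
    return [rid for rid, num, t in left if (num, window_start(t)) in right_keys]

dt = lambda h, m, s: datetime(2023, 9, 22, h, m, s)
left  = [("L1", "1", dt(9, 37, 18)), ("L2", "2", dt(9, 38, 18)), ("L3", "3", dt(9, 42, 18))]
right = [("R1", "2", dt(9, 39, 18)), ("R2", "3", dt(9, 37, 18)), ("R3", "4", dt(9, 43, 18))]
print(window_semi_join(left, right))  # only L2 finds a match in its own window
```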

3、ANTI(NOT IN/EXISTS)

An anti window join is the opposite of an inner window join: it contains all of the un-joined rows within each common window.

The leftTable and rightTable data are the same as in the previous example.

  • Example
---- NOT IN
Flink SQL> SELECT *
>            FROM (
>                SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>            ) L WHERE L.num NOT IN (
>              SELECT num FROM (   
>                SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>              ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                            num |                row_time |            window_start |              window_end |             window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                             L1 |                              1 | 2023-09-22 09:37:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |
| +I |                             L3 |                              3 | 2023-09-22 09:42:18.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 | 2023-09-22 09:44:59.999 |

---- NOT EXISTS
Flink SQL> SELECT *
>            FROM (
>                SELECT * FROM TABLE(TUMBLE(TABLE leftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>            ) L WHERE NOT EXISTS (
>              SELECT * FROM (
>                SELECT * FROM TABLE(TUMBLE(TABLE rightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
>              ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                            num |                row_time |            window_start |              window_end |             window_time |
+----+--------------------------------+--------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                             L1 |                              1 | 2023-09-22 09:37:18.000 | 2023-09-22 09:35:00.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:39:59.999 |
| +I |                             L3 |                              3 | 2023-09-22 09:42:18.000 | 2023-09-22 09:40:00.000 | 2023-09-22 09:45:00.000 | 2023-09-22 09:44:59.999 |
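Likewise for ANTI: keep every left row that finds no match within its own window. `window_anti_join` below is a hypothetical helper over in-memory copies of the rows (the toy data contains no NULL nums, so the usual NOT IN null caveat does not arise here):

```python
from datetime import datetime

def window_start(ts, size_s=300):
    """Align a timestamp to its 5-minute tumbling window start."""
    e = ts.timestamp()
    return datetime.fromtimestamp(e - e % size_s)

def window_anti_join(left, right):
    """ANTI join: keep each left row that has NO right-side match
    on (num, window_start) -- the behaviour of NOT IN / NOT EXISTS above."""
    right_keys = {(num, window_start(t)) for _, num, t in right}
    return [rid for rid, num, t in left if (num, window_start(t)) not in right_keys]

dt = lambda h, m, s: datetime(2023, 9, 22, h, m, s)
left  = [("L1", "1", dt(9, 37, 18)), ("L2", "2", dt(9, 38, 18)), ("L3", "3", dt(9, 42, 18))]
right = [("R1", "2", dt(9, 39, 18)), ("R2", "3", dt(9, 37, 18)), ("R3", "4", dt(9, 43, 18))]
print(window_anti_join(left, right))  # L1 and L3 have no match in their windows
```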

4、Limitations

1)、Limitation on the join on clause

Currently (as of Flink 1.17), the window join requires the join on condition to contain window starts equality of the input tables and window ends equality of the input tables. In the future, the join on clause could be simplified to contain only window start equality if the windowing TVF is TUMBLE or HOP.

2)、Limitation on windowing TVFs of inputs

Currently (as of Flink 1.17), the windowing TVF must be the same for the left and right inputs. This could be extended in the future, for example, joining tumbling windows with hopping windows of the same window size.

3)、Limitation on Window Join which follows after Windowing TVFs directly

Currently (as of Flink 1.17), if a window join follows directly after a windowing TVF, the windowing TVF has to use Tumble windows, Hop windows or Cumulate windows, rather than Session windows.


From: https://blog.51cto.com/alanchan2win/7623133
