我们知道,窗口可以将无界流切割成大小有限的“桶”(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在 DataStream API 中提供了对不同类型的窗口进行定义和处理的接口,而在 Table API 和 SQL 中,类似的功能也都可以实现。
1.窗口
1.1分组窗口(Group Window,1.12版本之前)
在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例: TUMBLE(ts, INTERVAL '1' HOUR) 这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL 来定义。在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下:val result = tableEnv.sqlQuery( "SELECT " + "user, " + "TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " + "COUNT(url) AS cnt " + "FROM EventTable " + "GROUP BY " + // 使用窗口和用户名进行分组 "user, " + "TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口 )这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数 COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt;用TUPMBLE_END()函数获取滚动窗口的结束时间,重命名为 endT 提取出来。 分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。
1.2窗口表值函数(Window TVFs,1.13之后)
从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数 目前 Flink 提供了以下几个窗口 TVF:- 滚动窗口(Tumbling Windows);
- 滑动窗口(Hop Windows,跳跃窗口);
- 累积窗口(Cumulate Windows);
- 会话窗口(Session Windows,目前尚未完全支持)。
1.2.1滚动窗口(TUMBLE)
滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。 在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下: TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR) 这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。1.2.2滑动窗口(HOP)
滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大(size)和滑动步长(slide)两个参数。HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS)); 这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。1.2.3累计窗口(COMULATE)
滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。不过在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。 例如,我们按天来统计网站的 PV(Page View,页面浏览量),如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累积窗口”(Cumulate Window)。 累积窗口是窗口 TVF 中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。如图所示,开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用 CUMULATE()函数来定义,具体如下: CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS)) 这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。2.聚合查询(Aggregation)
在 SQL 中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值;比如求和、最大最小值、平均值等等,这种操作叫作聚合(Aggregation)查询。Flink 中的 SQL 是流处理与标准 SQL 结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及 SQL 原生的聚合查询方式。2.1分组聚合
SQL 中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及 COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数: val eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable") 而更多的情况下,我们可以通过 GROUP BY 子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。比如,可以按照用户名进行分组,统计每个用户点击 url 的次数: SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user 这种聚合方式,就叫作“分组聚合”(group aggregation)。从概念上讲,SQL 中的分组聚合可以对应 DataStream API 中 keyBy()之后的聚合转换,它们都是按照某个 key 对数据进行了划分,各自维护状态来进行聚合统计的。在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成 DataStream 打印输出,需要调用 toChangelogStream()。 另外,在持续查询的过程中,由于用于分组的 key 可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,Flink Table API 和 SQL 可以在表环境中配置状态的生存时间(TTL):val tableEnv = ... // 获取表环境的配置 val tableConfig = tableEnv.getConfig(); // 配置状态保持时间 tableConfig.setIdleStateRetention(Duration.ofMinutes(60)) 或者也可以直接设置配置项 table.exec.state.ttl: val tableEnv = ... val configuration = tableEnv.getConfig().getConfiguration() configuration.setString("table.exec.state.ttl", "60 min")这两种方式是等效的。需要注意,配置 TTL 有可能会导致统计结果不准确,这其实是以牺牲正确性为代价换取了资源的释放。 此外,在 Flink SQL 的分组聚合中同样可以使用 DISTINCT 进行去重的聚合处理;可以使用 HAVING 对聚合结果进行条件筛选;还可以使用 GROUPING SETS(分组集)设置多个分组情况分别统计。这些语法跟标准 SQL 中的用法一致,这里就不再详细展开了。 可以看到,分组聚合既是 SQL 原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义
2.2窗口聚合
在流处理中,往往需要将无限数据流划分成有界数据集,这就是所谓的“窗口”。上边已经介绍了窗口的声明方式,这相当于 DataStream API 中的窗口分配器(window assigner),只是明确了窗口的形式以及数据如何分配;而窗口具体的计算处理操作,在DataStream API 中还需要窗口函数(window function)来进行定义。 在 Flink 的 Table API 和 SQL 中,窗口的计算是通过“窗口聚合”(window aggregation)来实现的。与分组聚合类似,窗口聚合也需要调用 SUM()、MAX()、MIN()、COUNT()一类的聚合函数,通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组 key 的一部分定义出来。在 Flink 1.12 版本之前,是直接把窗口自身作为分组 key 放在GROUP BY 之后的,所以也叫“分组窗口聚合”;而 1.13 版本开始使用了“窗口表值函数”(Windowing TVF),窗口本身返回的是就是一张表,所以窗口会出现在 FROM后面,GROUP BY 后面的则是窗口新增的字段 window_start 和 window_end。 比如,我们将分组窗口的聚合,用窗口 TVF 重新实现一下:val result = tableEnv.sqlQuery( "SELECT " + "user, " + "window_end AS endT, " + "COUNT(url) AS cnt " + "FROM TABLE( " + "TUMBLE( TABLE EventTable, " + "DESCRIPTOR(ts), " + "INTERVAL '1' HOUR)) " + "GROUP BY user, window_start, window_end " )这里我们以 ts 作为时间属性字段、基于 EventTable 定义了 1 小时的滚动窗口,希望统计出每小时每个用户点击 url 的次数。用来分组的字段是用户名 user,以及表示窗口的window_start 和 window_end;而 TUMBLE()是表值函数,所以得到的是一个表(Table),我们的聚合查询就是在这个 Table 中进行的。这就是 11.3.3 小节中窗口聚合的实现方式。 Flink SQL 目前提供了滚动窗口 TUMBLE()、滑动窗口 HOP()和累积窗口(CUMULATE)三种表值函数(TVF)。在具体应用中,我们还需要提前定义好时间属性。下面是一段窗口聚合的完整代码,以累积窗口为例:
package com.zhen.flink.table import java.time.Duration import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.Expressions.$ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment /** * @Author FengZhen * @Date 10/11/22 1:57 PM * @Description TODO */ object TimeAndWindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 创建表环境 val tableEnv = StreamTableEnvironment.create(env) // 1.在创建表的DDL中指定时间属性字段 tableEnv.executeSql("CREATE TABLE eventTableTest (" + " uid STRING," + " url STRING," + " ts BIGINT," + " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000))," + // " proc_time AS PROCTIME(), " + //处理时间 " WATERMARK FOR et AS et - INTERVAL '3' SECOND " + ") WITH (" + " 'connector' = 'filesystem'," + " 'path' = '/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/input/clicks.txt', " + " 'format' = 'csv' " + ")") // 2.在将流转换成表的时候指定时间属性字段 // 读取数据源,并分配时间戳、生成水位线 val eventStream = env .fromElements( Event("Alice", "./home", 1000L), Event("Bob", "./cart", 1000L), Event("Alice", "./prod?id=1", 25 * 60 * 1000L), Event("Alice", "./prod?id=4", 55 * 60 * 1000L), Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L), Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L), Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) ) //如果数据为标准的升序数据,直接assignAscendingTimestamps //.assignAscendingTimestamps(_.timestamp) //如果数据为乱序数据,则assignTimestampsAndWatermarks .assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner[Event] { override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp }) ) // 将数据流转换成表,并指定时间属性 // val eventTable = tableEnv.fromDataStream( // eventStream, // $("url"), // $("user").as("uid"), // $("timestamp").as("ts"), // $("et").rowtime() //新增一个字段,表示为当前的时间属性字段 // ) //可以直接将原有字段指定位rowtime val eventTable = tableEnv.fromDataStream( eventStream, $("url"), $("user").as("uid"), $("timestamp").rowtime().as("ts"), // $("proc_time").proctime() // 处理时间 ) /** * 1. * ( * `uid` STRING, * `url` STRING, * `ts` BIGINT, * `et` TIMESTAMP(3) *ROWTIME* AS TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000)), * `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(), * WATERMARK FOR `et`: TIMESTAMP(3) AS `et` - INTERVAL '3' SECOND * ) */ tableEnv.from("eventTableTest").printSchema() /** * 2. * ( * `url` STRING, * `uid` STRING, * `ts` TIMESTAMP(3) *ROWTIME*, * `proc_time` TIMESTAMP_LTZ(3) *PROCTIME* * ) */ eventTable.printSchema() // 3.测试累计窗口 tableEnv.createTemporaryView("eventTable", eventTable) val resultTable = tableEnv.sqlQuery( """ |SELECT | uid, window_end as endT, count(url) as cnt |FROM TABLE( | CUMULATE( | TABLE eventTable, | DESCRIPTOR(ts), | INTERVAL '30' MINUTE, | INTERVAL '1' HOUR | ) |) |GROUP BY uid, window_start, window_end |""".stripMargin) // 转换成流打印输出 val resultDataStream = tableEnv.toDataStream(resultTable) resultDataStream.print("resultDataStream") env.execute() } }
2.3开窗聚合(Over)
在标准 SQL 中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比如说,我们可以以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。 与标准 SQL 中一致,Flink SQL 中的开窗函数也是通过 OVER 子句来实现的,所以有时开窗聚合也叫作“OVER 聚合”(Over Aggregation)。基本语法如下:SELECT <聚合函数> OVER ( [PARTITION BY <字段 1>[, <字段 2>, ...]] ORDER BY <时间属性字段> <开窗范围>), ... FROM ...这里 OVER 关键字前面是一个聚合函数,它会应用在后面 OVER 定义的窗口上。在 OVER子句中主要有以下几个部分:
- PARTITION BY(可选)
- ORDER BY
- 开窗范围
- 范围间隔
- 行间隔
SELECT user, ts, COUNT(url) OVER ( PARTITION BY user ORDER BY ts RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW ) AS cnt FROM EventTable这里我们以 ts 作为时间属性字段,对 EventTable 中的每行数据都选取它之前 1 小时的所有数据进行聚合,统计每个用户访问 url 的总次数,并重命名为 cnt。最终将表中每行的 user,ts 以及扩展出 cnt 提取出来。 可以看到,整个开窗聚合的结果,是对每一行数据都有一个对应的聚合值,因此就像将表中扩展出了一个新的列一样。由于聚合范围上界只能到当前行,新到的数据一般不会影响之前数据的聚合结果,所以结果表只需要不断插入(INSERT)就可以了。执行上面 SQL 得到的结果表,可以用 toDataStream()直接转换成流打印输出。 开窗聚合与窗口聚合(窗口 TVF 聚合)本质上不同,不过也还是有一些相似之处的:它们都是在无界的数据流上划定了一个范围,截取出有限数据集进行聚合统计;这其实都是“窗口”的思路。事实上,在 Table API 中确实就定义了两类窗口:分组窗口(GroupWindow)和开窗窗口(OverWindow);而在 SQL 中,也可以用 WINDOW 子句来在 SELECT 外部单独定义一个 OVER 窗口:
SELECT user, ts, COUNT(url) OVER w AS cnt, MAX(CHAR_LENGTH(url)) OVER w AS max_url FROM EventTable WINDOW w AS ( PARTITION BY user ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)上面的 SQL 中定义了一个选取之前 2 行数据的 OVER 窗口,并重命名为 w;接下来就可以基于它调用多个聚合函数,扩展出更多的列提取出来。比如这里除统计 url 的个数外,还统计了 url 的最大长度:首先用 CHAR_LENGTH()函数计算出 url 的长度,再调用聚合函数 MAX()进行聚合统计。这样,我们就可以方便重复引用定义好的 OVER 窗口了,大大增强了代码的可读性。
2.4 TopN
2.4.1 普通TopN
在 Flink SQL 中,是通过 OVER 聚合和一个条件筛选来实现 Top N 的。具体来说,是通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来;然后再用 WHERE 子句筛选行号小于等于 N 的那些行返回。 基本语法如下:SELECT ... FROM ( SELECT ..., ROW_NUMBER() OVER ( [PARTITION BY <字段 1>[, <字段 1>...]] ORDER BY <排序字段 1> [asc|desc][, <排序字段 2> [asc|desc]...] ) AS row_num FROM ...) WHERE row_num <= N [AND <其它条件>]这里的 OVER 窗口定义与之前的介绍基本一致,目的就是利用 ROW_NUMBER()函数为每一行数据聚合得到一个排序之后的行号。行号重命名为 row_num,并在外层的查询中以row_num <= N 作为条件进行筛选,就可以得到根据排序字段统计的 Top N 结果了。 需要对关键字额外做一些说明:
- WHERE
- PARTITION BY
- ORDER BY
SELECT user, url, ts, row_num FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY user ORDER BY CHAR_LENGTH(url) desc ) AS row_num FROM EventTable) WHERE row_num <= 2这里我们以用户来分组,以访问 url 的字符长度作为排序的字段,降序排列后用聚合统计出每一行的行号,这样就相当于在 EventTable 基础上扩展出了一列 row_num。而后筛选出行号小于等于 2 的所有数据,就得到了每个用户访问的长度最长的两个 url。 需要特别说明的是,这里的 Top N 聚合是一个更新查询。新数据到来后,可能会改变之前数据的排名,所以会有更新(UPDATE)操作。这是 ROW_NUMBER()聚合函数的特性决定的。因此,如果执行上面的 SQL 得到结果表,需要调用 toChangelogStream()才能转换成流打印输出。
2.4.2窗口TopN
除了直接对数据进行 Top N 的选取,我们也可以针对窗口来做 Top N。 例如电商行业,实际应用中往往有这样的需求:统计一段时间内的热门商品。这就需要先开窗口,在窗口中统计每个商品的点击量;然后将统计数据收集起来,按窗口进行分组,并按点击量大小降序排序,选取前 N 个作为结果返回。 我们已经知道,Top N 聚合本质上是一个表聚合函数,这和窗口表值函数(TVF)有天然的联系。尽管如此,想要基于窗口 TVF 实现一个通用的 Top N 聚合函数还是比较麻烦的,目前Flink SQL尚不支持。不过我们同样可以借鉴之前的思路,使用OVER窗口统计行号来实现。 具体来说,可以先做一个窗口聚合,将窗口信息 window_start、window_end 连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。 接下来就可以像一般的 Top N 那样定义 OVER 窗口了,按窗口分组,按点击量排序,用ROW_NUMBER()统计行号并筛选前 N 行就可以得到结果。所以窗口 Top N 的实现就是窗口聚合与 OVER 聚合的结合使用。 下面是一个具体案例的代码实现。由于用户访问事件 Event 中没有商品相关信息,因此我们统计的是每小时内有最多访问行为的用户,取前两名,相当于是一个每小时活跃用户的查询。package com.zhen.flink.table import java.time.Duration import org.apache.flink.streaming.api.scala._ import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.Expressions.$ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment /** * @Author FengZhen * @Date 10/17/22 3:15 PM * @Description 窗口TopN */ object TopNWindowExample { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 创建表环境 val tableEnv = StreamTableEnvironment.create(env) // 读取数据源,并分配时间戳、生成水位线 val eventStream = env .fromElements( Event("Alice", "./home", 1000L), Event("Bob", "./cart", 1000L), Event("Alice", "./prod?id=1", 25 * 60 * 1000L), Event("Alice", "./prod?id=4", 55 * 60 * 1000L), Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L), Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L), Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) ) //如果数据为标准的升序数据,直接assignAscendingTimestamps //.assignAscendingTimestamps(_.timestamp) //如果数据为乱序数据,则assignTimestampsAndWatermarks .assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner[Event] { override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp }) ) // 将数据流转换成表,并指定时间属性 val eventTable = tableEnv.fromDataStream( eventStream, $("url"), $("user").as("uid"), $("timestamp").as("ts"), $("et").rowtime() //新增一个字段,表示为当前的时间属性字段 ) tableEnv.createTemporaryView("eventTable", eventTable) // 窗口TOP N. 选取每小时内活跃度最大的前两个用户 // 1.进行窗口聚合统计,统计每个用户的访问量 val urlCountWindowTable = tableEnv.sqlQuery( """ |SELECT uid, COUNT(url) AS cnt, window_start, window_end |FROM TABLE( | TUMBLE(TABLE eventTable, DESCRIPTOR(et), INTERVAL '1' HOUR) |) |GROUP BY uid, window_start, window_end | |""".stripMargin) tableEnv.createTemporaryView("urlCountWindowTable", urlCountWindowTable) // 2.提取count值最大的前两个用户 val top2ResultTable = tableEnv.sqlQuery( """ |SELECT | window_start, window_end, uid, cnt, row_num |FROM( | SELECT | *, | ROW_NUMBER() OVER( | PARTITION BY window_start, window_end | ORDER BY cnt DESC | ) AS row_num | FROM urlCountWindowTable |) WHERE row_num <= 2 |""".stripMargin) val top2ResultDataStream = tableEnv.toChangelogStream(top2ResultTable) top2ResultDataStream.print() env.execute() } }
+I[1970-01-01T00:00, 1970-01-01T01:00, Alice, 3, 1] +I[1970-01-01T00:00, 1970-01-01T01:00, Bob, 1, 2] +I[1970-01-01T01:00, 1970-01-01T02:00, Cary, 2, 1] +I[1970-01-01T01:00, 1970-01-01T02:00, Bob, 1, 2]
标签:聚合,Flink,ts,window,分组,SQL,Table,窗口 From: https://www.cnblogs.com/EnzoDin/p/16799400.html