目录
一、时间属性
- Flink SQL支持以下两种时间属性。实时计算可以基于这两种时间属性对数据进行窗口聚合。
Event Time:您提供的事件时间(通常是数据的最原始的创建时间),Event Time一定是您提供在Schema里的数据。
Processing Time:对事件进行处理的本地系统时间。
二、窗口
1、累计窗口 CUMULATE(time_attr, interval)
定义一个滚动窗口。滚动窗口把行分配到有固定持续时间(interval)的不重叠的连续窗口。比如,5分钟的滚动窗口以5分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
- 1)处理时间的累计窗口
SET 'execution.checkpointing.interval' = '3s';
set 'sql-client.execution.result-mode' = 'tableau';
set pipeline.operator-chaining = true;
CREATE TABLE read_oracle_test4 (
NAME STRING,
AGE bigint,
UPDATETIME TIMESTAMP(3), --事件时间
proctime as PROCTIME() , --处理时间
watermark for UPDATETIME as UPDATETIME - interval '5' minutes --水位线
) with (....);
CREATE TABLE print_table_data(
people string, --人群种类
num bigint, --数量
window_start TIMESTAMP(3), --窗口开始时间
window_end TIMESTAMP(3) --窗口结束时间
)
WITH (
'connector' = 'print'
);
--累积窗口 cumulate
--统计到目前为止的人群种类的数量
insert into print_table_data
select case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end people, count(1) as num,window_start, window_end
from
table(
cumulate(table read_oracle_test4, descriptor(proctime), interval '1' minutes, interval '1' day)
)
group by case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end, window_start, window_end
;
上面用处理时间去滚动统计,窗口大小是1天,窗口步长是1分钟,每分钟统计到目前为止的人群种类的数量
青年人,50,2024-01-01 00:00,2024-01-01 17:20
青年人,50,2024-01-01 00:00,2024-01-01 17:21
--此时进来了一条数据:张三,12,2024-01-01 17:21:23
青年人,51,2024-01-01 00:00,2024-01-01 17:22
青年人,51,2024-01-01 00:00,2024-01-01 17:23
- 2)事件时间的累计窗口
使用事件时间作为窗口入参时,必须设置水位线,此处的水位线设置为延迟5分钟
SET 'execution.checkpointing.interval' = '3s';
set 'sql-client.execution.result-mode' = 'tableau';
set pipeline.operator-chaining = true;
CREATE TABLE read_oracle_test4 (
NAME STRING,
AGE bigint,
UPDATETIME TIMESTAMP(3), --事件时间
proctime as PROCTIME() , --处理时间
watermark for UPDATETIME as UPDATETIME - interval '5' minutes --水位线
) with (....);
CREATE TABLE print_table_data(
people string, --人群种类
num bigint, --数量
window_start TIMESTAMP(3), --窗口开始时间
window_end TIMESTAMP(3), --窗口结束时间
current_time TIMESTAMP --当前系统时间
)
WITH (
'connector' = 'print'
);
insert into print_table_data
select case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end people, count(1) as num,window_start, window_end,current_timestamp current_time
from
table(
cumulate(table read_oracle_test4, descriptor(UPDATETIME), interval '1' minutes, interval '1' day)
)
group by case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end, window_start, window_end
;
上面用事件时间去滚动统计,窗口大小是1天,窗口步长是1分钟,每分钟统计到目前为止的人群种类的数量
和处理时间的区别是,当前系统时间是17:27时,才开始计算17:21~17:22之间的数据,也是就说,flink把最新窗口的结束时间延迟了
所以当前统计出来的数据,实际上是5分钟前的人群数量
青年人,50,2024-01-01 00:00,2024-01-01 17:19,2024-01-01 17:24
--此时进来了一条延迟数据:张三,12,2024-01-01 17:21:23
青年人,50,2024-01-01 00:00,2024-01-01 17:20,2024-01-01 17:25
青年人,50,2024-01-01 00:00,2024-01-01 17:21,2024-01-01 17:26
--直到这个窗口结束时才开始计算
青年人,51,2024-01-01 00:00,2024-01-01 17:22,2024-01-01 17:27
--此时进来了一条延迟数据:张三,12,2024-01-01 17:26:23
青年人,51,2024-01-01 00:00,2024-01-01 17:23,2024-01-01 17:28
2、滚动窗口 TUMBLE(time_attr, interval)
--滚动窗口 TUMBLE
--统计每分钟内人群种类的数量
insert into print_table_data
select case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end people,
count(1) as num,
window_start,
window_end
from
table(
TUMBLE(table read_oracle_test4, descriptor(UPDATETIME), interval '1' minutes)
)
group by case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end, window_start,
window_end
;
3、滑动窗口 HOP(time_attr, interval)
--滑动窗口 HOP
--每分钟统计近10分钟内人群种类的数量
insert into print_table_data
select case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end people,
count(1) as num,
window_start,
window_end
from
table(
HOP(table read_oracle_test4, descriptor(UPDATETIME), interval '1' minutes,interval '10' minutes)
)
group by case when AGE<20 then '青年人'
when AGE>50 then '老年人'
else '中年人' end, window_start,
window_end
;
标签:01,窗口,17,--,flink,2024,window
From: https://www.cnblogs.com/nthforsth/p/18008309