首页 > 其他分享 >flink窗口

flink窗口

时间:2024-02-05 16:34:43浏览次数:45  
标签:01 窗口 17 -- flink 2024 window

目录

一、时间属性

  • Flink SQL支持以下两种时间属性。实时计算可以基于这两种时间属性对数据进行窗口聚合。
    Event Time:您提供的事件时间(通常是数据的最原始的创建时间),Event Time一定是您提供在Schema里的数据。
    Processing Time:对事件进行处理的本地系统时间。

二、窗口

1、累计窗口 CUMULATE(time_attr, interval)

定义一个滚动窗口。滚动窗口把行分配到有固定持续时间(interval)的不重叠的连续窗口。比如,5分钟的滚动窗口以5分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

  • 1)处理时间的累计窗口
SET 'execution.checkpointing.interval' = '3s';
set 'sql-client.execution.result-mode' = 'tableau';
set pipeline.operator-chaining = true;

CREATE TABLE read_oracle_test4 (
	NAME STRING, 
	AGE bigint, 
	UPDATETIME TIMESTAMP(3),  --事件时间
    proctime as PROCTIME() ,  --处理时间
	watermark for UPDATETIME as UPDATETIME - interval '5' minutes --水位线
) with (....);


CREATE  TABLE print_table_data(
people string,    --人群种类
num bigint,      --数量
window_start TIMESTAMP(3),  --窗口开始时间
window_end TIMESTAMP(3)    --窗口结束时间
)
WITH (
 'connector' = 'print'
);



--累积窗口 cumulate
--统计到目前为止的人群种类的数量
insert into print_table_data
select case when AGE<20 then '青年人'
    when AGE>50 then '老年人'
    else '中年人' end people, count(1) as num,window_start, window_end
from
    table(
        cumulate(table read_oracle_test4, descriptor(proctime), interval '1' minutes, interval '1' day)
        )
group by case when AGE<20 then '青年人'
    when AGE>50 then '老年人'
    else '中年人' end, window_start, window_end
;

上面用处理时间去滚动统计,窗口大小是1天,窗口步长是1分钟,每分钟统计到目前为止的人群种类的数量

青年人,50,2024-01-01 00:00,2024-01-01 17:20
青年人,50,2024-01-01 00:00,2024-01-01 17:21
--此时进来了一条数据:张三,12,2024-01-01 17:21:23
青年人,51,2024-01-01 00:00,2024-01-01 17:22
青年人,51,2024-01-01 00:00,2024-01-01 17:23
  • 2)事件时间的累计窗口
    使用事件时间作为窗口入参时,必须设置水位线,此处的水位线设置为延迟5分钟
SET 'execution.checkpointing.interval' = '3s';
set 'sql-client.execution.result-mode' = 'tableau';
set pipeline.operator-chaining = true;

CREATE TABLE read_oracle_test4 (
	NAME STRING, 
	AGE bigint, 
	UPDATETIME TIMESTAMP(3),  --事件时间
    proctime as PROCTIME() ,  --处理时间
	watermark for UPDATETIME as UPDATETIME - interval '5' minutes --水位线
) with (....);


CREATE  TABLE print_table_data(
people string,    --人群种类
num bigint,      --数量
window_start TIMESTAMP(3),  --窗口开始时间
window_end TIMESTAMP(3),    --窗口结束时间
current_time TIMESTAMP      --当前系统时间
)
WITH (
 'connector' = 'print'
);

insert into print_table_data
select case when AGE<20 then '青年人'
    when AGE>50 then '老年人'
    else '中年人' end people, count(1) as num,window_start, window_end,current_timestamp current_time 
from
    table(
        cumulate(table read_oracle_test4, descriptor(UPDATETIME), interval '1' minutes, interval '1' day)
        )
group by case when AGE<20 then '青年人'
    when AGE>50 then '老年人'
    else '中年人' end, window_start, window_end
;

上面用事件时间去滚动统计,窗口大小是1天,窗口步长是1分钟,每分钟统计到目前为止的人群种类的数量
和处理时间的区别是,当前系统时间是17:27时,才开始计算17:21~17:22之间的数据,也是就说,flink把最新窗口的结束时间延迟了
所以当前统计出来的数据,实际上是5分钟前的人群数量

青年人,50,2024-01-01 00:00,2024-01-01 17:19,2024-01-01 17:24
--此时进来了一条延迟数据:张三,12,2024-01-01 17:21:23
青年人,50,2024-01-01 00:00,2024-01-01 17:20,2024-01-01 17:25
青年人,50,2024-01-01 00:00,2024-01-01 17:21,2024-01-01 17:26
--直到这个窗口结束时才开始计算
青年人,51,2024-01-01 00:00,2024-01-01 17:22,2024-01-01 17:27
--此时进来了一条延迟数据:张三,12,2024-01-01 17:26:23
青年人,51,2024-01-01 00:00,2024-01-01 17:23,2024-01-01 17:28

2、滚动窗口 TUMBLE(time_attr, interval)

--滚动窗口 TUMBLE
--统计每分钟内人群种类的数量
insert into print_table_data
select case when AGE<20 then '青年人'
       when AGE>50 then '老年人'
     else '中年人' end people, 
     count(1) as num,
     window_start, 
     window_end
 from
     table(
         TUMBLE(table read_oracle_test4, descriptor(UPDATETIME), interval '1' minutes)
         )
 group by case when AGE<20 then '青年人'
     when AGE>50 then '老年人'
    else '中年人' end, window_start, 
     window_end
 ;

3、滑动窗口 HOP(time_attr, interval)


--滑动窗口 HOP
--每分钟统计近10分钟内人群种类的数量
insert into print_table_data
select case when AGE<20 then '青年人'
      when AGE>50 then '老年人'
    else '中年人' end people, 
    count(1) as num,
    window_start, 
    window_end
from
    table(
        HOP(table read_oracle_test4, descriptor(UPDATETIME), interval '1' minutes,interval '10' minutes)
        )
group by case when AGE<20 then '青年人'
    when AGE>50 then '老年人'
    else '中年人' end, window_start, 
    window_end
;

标签:01,窗口,17,--,flink,2024,window
From: https://www.cnblogs.com/nthforsth/p/18008309

相关文章

  • 问题:在TCP的拥塞控制中,什么时候会使拥塞窗口重置为1?
    问题:在TCP的拥塞控制中,什么时候会使拥塞窗口重置为1?A:发生拥塞时;B:拥塞窗口超过慢开始门限时;C:分组超时时;D:慢开始门限重置为拥塞窗口的一半时参考答案如图所示......
  • 深入解析 Flink CDC 增量快照读取机制
    深入解析FlinkCDC增量快照读取机制一、Flink-CDC1.x痛点FlinkCDC1.x使用Debezium引擎集成来实现数据采集,支持全量加增量模式,确保数据的一致性。然而,这种集成存在一些痛点需要注意:一致性通过加锁保证:在保证数据一致性时,Debezium需要对读取的库或表加锁。全局锁可能导致数......
  • 汇编-窗口基本过程
     .386.modelflat,stdcalloptioncasemap:none.stack4096ExitProcessPROTO,dwExitCode:DWORDincludewindows.incincludegdi32.incincludelibgdi32.libincludeuser32.incincludelibuser32.libincludekernel32.incincludelibkernel32.lib.data?......
  • flink随笔
    内存设置Flink总内存,建议在独立部署模式下使用taskmanager.memory.flink.size jobmanager.memory.flink.size进程总内存,建议在容器化部署模式(Kubernetes、Yarn或Mesos)下使用taskmanager.memory.process.size jobmanager.memory.process.size不建议同时设置进程总内存......
  • 图像显示窗口名支持中文 使用cv2.imshow()显示图像默认是不支持中文名称的窗口的
    图像显示窗口名支持中文   使用cv2.imshow()显示图像默认是不支持中文名称的窗口的,如果你的窗口名参数中包含中文将会显示为乱码。这是由于在OpenCV-Python包中,imshow函数的窗口标题是gbk编码,而Python3默认UTF-8编码。因而窗口标题包含中文时,会显示乱码。    解决这个问......
  • pygame学习(一)——pygame库的导包、初始化、窗口的设置、打印文字
    导语 pygame是一个跨平台Python库(pygamenews),专门用来开发游戏。pygame主要为开发、设计2D电子游戏而生,提供图像模块(image)、声音模块(mixer)、输入/输出(鼠标、键盘、显示屏)模块等。使用pygame,理论上可以开发设计市面上所有的2D类型游戏。优点:pygame免费、开源,支持多种操作系统,具有......
  • NCC跳转UClient窗口变空白
    NCC跳转UClient窗口变空白解决方法:1、找到配置文件:目录:nchome\ierp\sf\文件:sysconfig.xml2、修改后的文件内容:<?xmlversion='1.0'encoding='UTF-8'?><nc.login.vo.SystemConfig> <MaxLoginFailure>5</MaxLoginFailure> <LoginUIType>stan......
  • [Flink] Flink源码分析 : BoundedOutOfOrdernessTimestampExtractor
    0序言0.1缘起importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.configuration.......
  • [Flink] Flink Job之Web UI
    0序言在本地电脑开发、调试,或集群环境下运行FlinkJob时,需要利用WebUI观测作业内部的运行情况。WEBUI,对我们观测Flink作业的总体运行情况(系统负载)、快速定位和解决问题,至关重要。全文基于如下版本演示:scala.version=2.11/2.12;flink.version=1.13.11FlinkJob......
  • Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
    1概况本文展示如何使用FlinkCDC+Iceberg+Doris构建实时湖仓一体的联邦查询分析,Doris1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。2系统架构我们整理架构图如下,   1.首先我们从Mysq......