首页 > 其他分享 >【FLINK学习笔记】 FLINK WINDOW(窗口)详解

【FLINK学习笔记】 FLINK WINDOW(窗口)详解

时间:2024-02-19 20:13:16浏览次数:30  
标签:窗口 String FLINK Window stationLog WINDOW 详解 Time 数据

【FLINK学习笔记】 FLINK WINDOW(窗口)详解

一、Window 分类

Global Window 和 和 Keyed Window
在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型,对应的Windows 也会有所不同。

  • Keyed Window:上游数据集如果是 KeyedStream 类型,则调用 DataStream API 的 window()方法,数据会根据 Key 在不同的 Task 实例中并行分别计算,最后得出针对每个 Key 统计的结果。
  • Global Window:如果是 Non-Keyed 类型,则调用 WindowsAll()方法,所有的数据都会在窗口算子中由到一个 Task 中计算,并得到全局统计结果。
//读取文件数据
val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line=>{
var arr =line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)})
//Global Window
data.windowAll(自定义的WindowAssigner)
//Keyed Window
data.keyBy(_.sid)
.window(自定义的WindowAssigner)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Time Window 和 和 Count Window

基于业务数据的方面考虑,Flink 又支持两种类型的窗口,一种是基于时间的窗口叫Time Window。还有一种基于输入数据数量的窗口叫 Count Window

Time Window (时间窗口)

根据不同的业务场景,Time Window 也可以分为三种类型,分别是滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

滚动窗口(Tumbling Window)

滚动窗口是根据固定时间进行切分,且窗口和窗口之间的元素互不重叠。这种类型的窗口的最大特点是比较简单。只需要指定一个窗口长度(window size)。

//每隔5秒统计每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
//.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1) //聚合
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

滑动窗口(Sliding Window)

滑动窗口也是一种比较常见的窗口类型,其特点是在滚动窗口基础之上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。

//每隔3秒计算最近5秒内,每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5),Time.seconds(3))
//.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(3)))
.sum(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

会话窗口(Session Window)

会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是 Session Gap,是指在规定的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果。需要注意的是如果数据一直不间断地进入窗口,也会导致窗口始终不触发的情况。与滑动窗口、滚动窗口不同的是,Session Windows 不需要有固定 windows size 和 slide time,只需要定义 session gap,来规定不活跃数据的时间上限即可。

//3秒内如果没有数据进入,则计算每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
.sum(1)
  • 1
  • 2
  • 3
  • 4
  • 5

二、Window 的 的 API

在以后的实际案例中 Keyed Window 使用最多,所以我们需要掌握 Keyed Window 的算子,在每个窗口算子中包含了 Windows Assigner、Windows Trigger(窗口触发器)、Evictor(数据剔除器)、Lateness(时延设定)、Output Tag(输出标签)以及 Windows Funciton等组成部分,其中 Windows Assigner 和 Windows Funciton 所有窗口算子必须指定的属性,其余的属性都是根据实际情况选择指定。

stream.keyBy(...) // 是Keyed类型数据集
.window(...) //指定窗口分配器类型
[.trigger(...)] //指定触发器类型(可选)
[.evictor(...)] //指定evictor或者不指定(可选)
[.allowedLateness(...)] //指定是否延迟处理数据(可选)
[.sideOutputLateData(...)] //指定Output Lag(可选)
.reduce/aggregate/fold/apply() //指定窗口计算函数
[.getSideOutput(...)] //根据Tag输出数据(可选)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • Windows Assigner:指定窗口的类型,定义如何将数据流分配到一个或多个窗口;
  • Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;
  • Evictor:用于数据剔除;
  • allowedLateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算;
  • Output Tag:标记输出标签,然后在通过 getSideOutput 将窗口中的数据根据标签输出;
  • Windows Funciton:定义窗口上数据处理的逻辑,例如对数据进行 sum 操作。

三、窗口聚合函数

如果定义了 Window Assigner 之后,下一步就可以定义窗口内数据的计算逻辑,这也就是 Window Function 的定义。Flink 中提供了四种类型的 Window Function,分别为ReduceFunction、AggregateFunction 以及 ProcessWindowFunction,(sum 和 max)等。前三种类型的 Window Fucntion 按照计算原理的不同可以分为两类:

  • 一类是增量聚合函数:对应有 ReduceFunction、AggregateFunction;
  • 另一类是全量窗口函数,对应有 ProcessWindowFunction(还有 WindowFunction)。增量聚合函数计算性能较高,占用存储空间少,主要因为基于中间状态的计算结果,窗口中只维护中间结果状态值,不需要缓存原始数据。而全量窗口函数使用的代价相对较高,性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口触发的时候,对所有的原始数据进行汇总计算。

1) ReduceFunction
ReduceFunction 定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合的逻辑,然后输出类型相同的一个结果元素。

//每隔5秒统计每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
  • 1
  • 2
  • 3
  • 4
  • 5

2) AggregateFunction
和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算函数,但 AggregateFunction 在窗口计算上更加通用。AggregateFunction 接口相对ReduceFunction 更加灵活,实现复杂度也相对较高。AggregateFunction 接口中定义了三个需要复写的方法,其中 add()定义数据的添加逻辑,getResult 定义了根据 accumulator 计算结果的逻辑,merge 方法定义合并 accumulator 的逻辑。

//每隔3秒计算最近5秒内,每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
  .keyBy(_._1)
  .timeWindow(Time.seconds(5),Time.seconds(3))
  .aggregate(new AggregateFunction[(String,Int),(String,Long),(String,Long)] {
    override def createAccumulator() = ("",0)
    override def add(in: (String, Int), acc: (String, Long)) = {
      (in._1,acc._2+in._2)
    }
    override def getResult(acc: (String, Long)) = acc
    override def merge(acc: (String, Long), acc1: (String, Long)) = {
      (acc._1,acc1._2+acc._2)
    }
  })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3) ProcessWindowFunction
前面提到的 ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大多数场景,但在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到ProcessWindowsFunction,ProcessWindowsFunction 能够更加灵活地支持基于窗口全部数据元素的结果计算,例如对整个窗口数据排序取 TopN,这样的需要就必须使用ProcessWindowFunction。

//每隔5秒统计每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
  .keyBy(_._1)
  .timeWindow(Time.seconds(5))
  .process(new
      ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String,
      Int)], out: Collector[(String, Int)]): Unit = {
      println("-------")
      out.collect((key,elements.size))
    }
  })
  .print()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
原文链接:https://blog.csdn.net/qq_37023928/article/details/113486971

标签:窗口,String,FLINK,Window,stationLog,WINDOW,详解,Time,数据
From: https://www.cnblogs.com/sunny3158/p/18021852

相关文章

  • 【Flink入门修炼】1-4 Flink 核心概念与架构
    前面几篇文章带大家了解了Flink是什么、能做什么,本篇将带大家了解Flink究竟是如何完成这些的,Flink本身架构是什么样的,让大家先对Flink有整体认知,便于后期理解。一、Flink组件栈Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。Flink......
  • Flink 使用之 TypeInformation 由于泛型类型在运行时会被JVM擦除,所以要指定类型
    Flink使用之TypeInformation由于泛型类型在运行时会被JVM擦除,所以要指定类型Flink使用介绍相关文档目录Flink使用介绍相关文档目录背景本篇从近期遇到的StreamJavaAPI问题,引出TypeInformation的使用。Exceptioninthread"main"org.apache.flink.api.common.functi......
  • flink的分流器-sideoutput Flink 有两种常见的 State类型,分别是:Keyed State (键控状态
    flink的分流器-sideoutputFlink有两种常见的State类型,分别是:KeyedState(键控状态)和OperatorState(算子状态)为了说明侧输出(sideouptut)的作用,浪尖举个例子,比如现在有一篇文章吧,单词长度不一,但是我们想对单词长度小于5的单词进行wordcount操作,同时又想记录下来哪些单词的长度......
  • Qt error: LNK2001: 无法解析的外部符号 "public: virtual struct QMetaObject const
    这个问题总是在编译的不经意间出现,而且一出一大片,很烦。作为新手出了问题可定要在网上找答案,但是总是发现别人的解决方法解决不了自己的问题,唉~在这个问题上大家大家提出的大多数是.h.cpp文件不对应、.h中声明的文件在.cpp文件中没有实现、函数声明在了.cpp文件中等等一......
  • 将nginx、frp、hfs等exe添加为Windows服务
    三个exe当中,hfs是不需要参数的,而nginx、frpc都需要指定配置文件。最终的解决方案,还是将WinSW.EXE放到目标exe相同的目录下,这样可以省去很多的麻烦,至少目前看来,可以省去指定配置文件的具体路径这个操作。  安装为服务也很简单:直接就是install命令;相应的,卸载服务就是uninsta......
  • 最新Burp Suite插件详解
    Burp Suite中的插件BurpSuite中存在多个插件,通过这些插件可以更方便地进行安全测试。插件可以在“BAppStore”(“Extender”→“BAppStore”)中安装,如图3-46所示。   图3-46   下面列举一些常见的BurpSuite插件。 1.Active Scan++ActiveScan++在BurpSuite......
  • Java版Flink(十二)底层函数 API(process function)
    一、概述之前的转化算子是无法访问事件的时间戳信息和水位线watermark,但是,在某些情况下,显得很重要。Flink提供了DataStreamAPI的Low-Level转化算子。比如说可以访问事件时间戳、watermark、以及注册定时器,还可以输出一些特定的事件,比如超时事件等。ProcessFunction用......
  • Flink详解系列之六--窗口机制
    Flink详解系列之六--窗口机制窗口是flink处理无限流的核心,窗口将流拆分为有限大小的“桶”,我们可以在这些桶上进行计算。1、KeyedvsNon-KeyedWindows根据上游数据是否为KeyedStream类型(是否将数据按照某个指定的Key进行分区),将窗口划分为KeyedWindow和Non-KeyedWindow......
  • Modscan32 软件最全使用详解
    软件使用手动连接点击菜单栏”连接设置(Connection)“->”连接(Connect)“,弹出连接配置窗口。在”使用的连接”那里选择:RemotemodbusTCPServer RemoteTELNETServerDirectConnectiontoCOM1DirectConnectiontoCOM2…DirectConnectiontoCOM32备注:”Direct......
  • NSSM - 将exe,dll,jar封装成windows服务的神器
    NSSM(theNon-SuckingServiceManager)是Windows环境下一款免安装的服务管理软件,它可以将应用封装成服务,即将普通exe程序或者dll或者jar包应用,封装成服务使之像windows服务可以设置自动启动等。并且可以监控程序运行状态,程序异常中断后自动启动,实现守护进程的功能。Installingf......