Flink中的时间和窗口
Flink中的时间语义
-
处理时间(Processing Time)
处理时间就是指处理操作的机器的系统时间
-
事件时间(Event Time)
事件时间是指每个事件在对应的设备上发生的事件,也就是数据生成的时间。
水位线
水位线是基于事件时间提出的概念,了解水位线之前需先了解事件时间和窗口的关系。
在事件时间语义下,窗口处理数据是基于数据的时间戳,相当于自定义了一个逻辑时钟。这个时钟的时间不会自动流逝,它的时间进展,就是靠新到的数据的时间戳来推动的。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。它插入数据流中的位置,就是在某个数据到来之后,从这个数据中提取时间戳,作为当前水位线的时间戳。
水位线生成策略
-
用DataStream调用.assignTimestampsAndWatermarks()方法
-
.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是“水位线生成策略”。
-
WatermarkStrategy这个接口是一个生成水位线策略的抽象,可以灵活实现自己的需求,Flink也提供了内置的水位线生成器,通过调用WatermarkStrategy的静态辅助方法来创建,它们都是周期性生成水位线的,分别对应处理有序流和乱序流场景。
-
有序流:WatermarkStrategy.forMonotonousTimestamps()
stream.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forMontonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Event>(){ @Override public long extractTimestamp(Event element, long recordTimestamp){ return element.timestamp; } }) )
-
乱序流:WatermarkStrategy.forBoundedOutOfOrderness()
stream.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner<Event>(){ @Override public long extractTimestamp(Event element, long recordTimestamp){ return element.timestamp; } }) )
-
水位线的传递
在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发送给下游任务的水位线可能并不相同,此时下游任务应以较小水位线作为当前时间。
水位线的默认计算公式:水位线 = 观察到的最大事件时间 - 最大延迟时间 - 1毫秒
假如设置延迟时间为2秒,那么0-10秒的窗口会在时间戳为12的数据到来之后,才真正关闭计算输出结果。
窗口
窗口的分类
-
按驱动类型分类:
-
时间窗口(Time Window)
时间窗口以时间点来定义窗口的开始和结束,截取出某一段时间的数据。到达结束时间后,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。时间窗口的范围是左闭右开的区间[start, end)。
-
计数窗口(Count Window)
计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算输出结果并关闭窗口。Flink内部没有对应的类表示计数窗口,底层通过“全局窗口”(Global Window)来实现。
-
-
按窗口分配数据的规则分类
- 滚动窗口(Tumbling Windows)
- 滑动窗口(Sliding Windows)
- 会话窗口(Session Windows)
- 全局窗口(Global Windows)
窗口API使用
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行,相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>)
.window(<window assigner>) # .window()方法传入一个窗口分配器,指明窗口类型
.aggregate(<window function>) # 窗口函数包括增量聚合函数和全窗口函数
窗口分配器(Window Assigners)
-
滚动处理时间窗口
# TUmblingProcessingTimeWindows类,静态方法.of()中传入Time类型的参数size,表示窗口大小 stream.keyBy(...) .window(TUmblingProcessingTimeWindows.of(Time.seconds(5)))
-
滑动处理时间窗口
窗口分配器由类SlidingProcessingTimeWindows提供,同样调用.of()方法,传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口和滑动步长。
-
处理时间会话窗口
窗口分配器由类ProcessingTimeSessionWindows提供,调用静态方法.withGap()或.withDynamicGap()。传入Time类型参数size,表示会话的超时时间。
-
滚动事件时间窗口
窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理时间窗口一致。
-
滑动事件时间窗口(SlidingEventTimeWindows)
-
事件时间会话窗口(EventTimeSessionWindows)
窗口函数
增量聚合函数
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction
-
归约函数(ReduceFunction)
基于WindowedStream调用.reduce()方法,然后传入ReduceFunction的实现类作为参数。
ReduceFunction中需要重写一个reduce方法,它的两个参数代表输入的两个元素,中间聚合的状态、输出的结果和输入的数据类型需保持一致。
-
聚合函数(AggregateFunction)
基于WindowedStream调用.aggregate()方法,传入AggregateFunction的实现类作为参数。
AggregateFunction中取消了类型一致的限制,输入数据、中间状态、输出结果三者类型都可以不同。
AggregateFunction源码:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable{ # 创建一个累加器,为聚合创建一个初始状态,每个聚合任务只会调用一次。 ACC createAccumulator(); # 将输入的元素添加到累加器中,基于聚合状态,对新数据进行进一步聚合的过程。 # 方法传入两个参数:当前新到数据value,当前的累加器accumulator;返回一个新的累加器值。 # 每条数据到来都会调用此方法。 Acc add(IN value, ACC accumulator); # 从累加器中提取聚合的输出结果。 OUT getResult(ACC accumulator); # 合并两个累加器,将合并后的状态作为一个累加器返回。此方法只在需要合并窗口的场景下调用,如会话窗口。 ACC merge(ACC a, ACC b); }
全窗口函数
与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候在取出数据进行计算。全窗口函数也有两种:WindowFunction和ProcessWindowFunction。
-
窗口函数(WindowFunction)
基于WindowedStream调用.apply()方法,传入一个WindowFunctin的实现类。(实际应用少)
stream.keyBy(<key selector>) .window(<window assigner>) .apply(new MyWindowFunction()); # WindowFunction源码 public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function{ # 当窗口到达结束时间需要触发计算时,调用apply方法。 # 从input集合中取出窗口收集的数据,结合key和window信息,通过收集器(Collector)输出结果。 void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out); }
-
处理窗口函数(ProcessWindowFunction)
基于WindowedStream调用.process()方法,传入一个ProcessWindowFunction的实现类。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象不仅能获取窗口信息,还可以访问当前时间和状态信息。包括处理时间(processing time)和事件时间水位线(event time watermark)
new ProcessWindowFunction<IN, OUT, KEY, TimeWindow>() { # 重写process()方法 @Override public void process(String key, Context context, Iterable<IN> iterable, Collector<OUT> collector) throws Exception { } }
-
增量聚合和全窗口函数结合使用
全窗口函数只是把数据收集缓存起来,并没有处理,到窗口要关闭、输出结果的时候,再遍历所有数据依次计算,得到最终结果。如果两者结合使用,采用增量聚合的方式,每个数据到来时做一次聚合,更新状态,到了要输出结果的时候,只要将当前状态直接拿出来即可。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,比全窗口聚合更加高效、输出更加实时。
用法:
在调用WindowStream的.reduce()和.aggregate()方法时,第一个参数传入一个ReduceFunction或AggregateFunction进行增量聚合;第二个参数传入WindowFunction或者ProcessWindowFunction全窗口函数。