Flink 窗口计算
1. 背景
在当今大数据时代,实时数据处理的需求日益增长,Flink 的窗口计算在这一领域中发挥着至关重要的作用。
窗口计算使得我们能够将无界的数据流切分成有意义的片段,从而进行特定时间段内的数据聚合和分析。它适用于众多场景,比如实时监控系统中对一段时间内的关键指标进行统计,金融交易中对特定窗口内的交易数据进行分析等。
与其他流式计算相比,Flink 窗口计算展现出了显著的优点。它提供了高度灵活的窗口定义方式,能够满足各种复杂的业务需求。无论是基于时间的滚动窗口、滑动窗口,还是基于数据驱动的窗口,Flink 都能轻松应对。同时,Flink 在处理大规模数据时具有出色的性能和稳定性,能够高效地处理高速流入的数据,确保计算结果的及时性和准确性。
2. Watermark
在 Flink 窗口计算中,还有一个关键概念与之紧密相关,那就是 watermark。watermark 对于处理乱序数据以及确保窗口计算的准确性起到了至关重要的作用。它就像是一个进度指示器,帮助我们在面对数据流的不确定性时,依然能够精确地进行窗口相关的操作和分析。
Watermark 是用来标记 Event-Time 的前进过程,表示较早的事件已经全部到达。
对于 顺序事件流1 比较好理解,W(10) 表示 10 之前的数据都已经到达。
对于 有界乱序事件流2,我们通常认为乱序事件流并不能完全乱序,需要在一定的时间限定内,这个时候我们可以指定 maxOutOfOrderness,表示可以容忍的乱序时间,当 11 到达以后,会有 W(7),表示 7 之前的数据都已经到达,当 24 到达以后,会有 W(20),表示 20 之前的数据都已经到达,这个时候如果 19 再来,就会被认为是无效的数据。
在 flink 程序中,通常一个算子会有多个并行度,他们之间 watermark 的传递入上图所示。
通过上面的介绍,我们知道了 watermark 是做什么的,那么他是怎么产生的?为什么每隔几个事件才产生一个 watermark?
这些都与 Watermark Generator 有关,我们可以通过 assignTimestampsAndWatermarks 指定要使用的 Watermark Generator
import cn.hutool.core.date.DateUtil;
import java.time.Duration;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SocketDemo {
public static void main(String[] args) throws Exception {
// nc -lk 12345
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从 1.12 开始, 默认 TimeCharacteristic 是 EventTime
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 12345);
// watermark 产生周期, 默认就是 200ms, 一般不需要做修改
// env.getConfig().setAutoWatermarkInterval(200);
dataStreamSource.assignTimestampsAndWatermarks(
WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> {
// yyyyMMddHHmmss test message
String[] ss = StringUtils.split(element, StringUtils.SPACE);
return DateUtil.parse(ss[0]).getTime();
}));
dataStreamSource.print();
env.execute("demo");
}
}
从上面的示例可以看到,从 flink 1.12 起,为我们提供了以下几种周期发送 watermark 的 Watermark Generator
他们是根据 ExecutionConfig.getAutoWatermarkInterval() 来决定时间,默认是 200 毫秒,也就是说,默认 200 毫秒可能会产生一个新的 watermark
对于顺序事件使用 WatermarkStrategy.forMonotonousTimestamps()
对于乱序事件 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
3. Watermark 与 Window 之间的关系
window 通过 watermark 来判断是否需要计算窗口的数据,我们可以通过设置 watermark 的生成策略,来处理 window 中的乱序数据,示例如下
设置5秒的滚动窗口
按照顺序流的处理思路,当第四条数据 (20240618164205) 到达,这个时候就会触发窗口 [20240618164200, 20240618164205) 开始计算,那么迟到的第六条数据 (20240618164203) 就不会被统计到窗口中;
如果我们使用乱序事件流的 watermark 生成器,设置 maxOutOfOrderness = 3秒,那么只有当第七条数据 (20240618164209) 来到的时候,经过计算 9-1-3=5,watermark 变成 20240618164205,这个时候才会触发窗口 [20240618164200, 20240618164205) 开始计算,迟到的第六条数据 (20240618164203) 仍然可以被统计到窗口中。
示例代码:
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@Slf4j
public class SocketDemo {
public static void main(String[] args) throws Exception {
// nc -lk 12345
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从 1.12 开始, 默认 TimeCharacteristic 是 EventTime
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 12345);
// watermark 产生周期, 默认就是 200ms, 一般不需要做修改
// env.getConfig().setAutoWatermarkInterval(200);
dataStreamSource.assignTimestampsAndWatermarks(
WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> {
// yyyyMMddHHmmss test message
String[] ss = StringUtils.split(element, StringUtils.SPACE);
return DateUtil.parse(ss[0]).getTime();
}))
.uid("source")
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String element) throws Exception {
String[] ss = StringUtils.split(element, StringUtils.SPACE);
return Tuple2.of(ss[1], Integer.parseInt(ss[2]));
}
})
.uid("parse_map")
.keyBy(t2 -> t2.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
long watermark = context.currentWatermark();
long processingTime = context.currentProcessingTime();
out.collect(StrUtil.format("process time: {}, watermark: {}, window: {}-{}, data key: {}, data: {}",
DateUtil.formatDateTime(DateUtil.date(processingTime)),
DateUtil.formatDateTime(DateUtil.date(watermark)),
DateUtil.formatDateTime(DateUtil.date(start)),
DateUtil.formatDateTime(DateUtil.date(end)),
s, Lists.newArrayList(elements).stream().map(e -> e.f1.toString()).collect(Collectors.joining("_"))));
}
})
.uid("window_process")
.print();
env.execute("demo");
}
}
输出结果:
6> process time: 2024-06-18 17:11:59, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 张三, data: 1
8> process time: 2024-06-18 17:11:59, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3
示例更丰富的图:
4. Window 窗口计算
Flink Window 分为 Keyed Windows 和 Non-Keyed Windows,Keyed Windows 是对 KeyedStream 使用窗口操作后产生的,KeyedStream 是我们使用 keyBy 算子,对 Stream 按 key 分区后产生的。
Window 的 stream api
// Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
// Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Flink Window 使用必须要有的两个操作
- 使用 WindowAssigner 将数据流中的元素分配到对应的窗口
- 当满足窗口触发条件后,对窗口内的数据使用 Window Function(窗口处理函数) 进行处理,常用的 Window Function 有 reduce、aggregate、process
Flink 每一种窗口就是对应一种 WindowAssigner,从源码中我们可以看到,当我们在指定窗口类型的时候,实际上就是在指定 WindowAssigner。
Flink 支持的窗口类型
关于窗口的详细介绍,可以看官网 https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/windows/
从 stram api 可以看到使用 window/windowAll 算子后,可以指定 trigger、evictor 等,当我们不指定他们时,会使用默认的参数,我们进入到 TumblingEventTimeWindows 类源码,可以到按照事件时间滚动窗口对应的默认 Trigger。
常见的 Trigger
Flink 内置 Window Trigger | 触发频率 | 主要功能 |
---|---|---|
ProcessingTimeTrigger | 一次触发 | 基于 ProcessingTime 触发,当机器时间大于窗口结束时间时触发 |
EventTimeTrigger | 一次触发 | 基于 EventTime,当 Watermark 大于窗口结束时间触发 |
ContinuousProcessingTimeTrigger | 多次触发 | 基于 ProcessTime 的固定时间间隔触发 |
ContinuousEventTimeTrigger | 多次触发 | 基于 EventTime 的固定时间间隔触发 |
CountTrigger | 多次触发 | 基于 Element 的固定条数触发 |
DeltaTrigger | 多次触发 | 基于本次 Element 和上次触发 Trigger 的 Element 做 Delta 计算,超过指定 Threshold 后触发 |
PuringTrigger | 对 Trigger 的封装实现,用于 Trigger 触发后额外清理中间状态数据 | |
NeverTrigger | GlobalWindows 独有的,在内部继承 Trigger 实现,从来不会触发 |
所有基于事件时间的窗口,默认 Trigger 都是 EventTimeTrigger,基于处理时间的窗口都是 ProcessingTimeTrigger,所以在上面3的例子中,最终只输出了一次结果。
Evicator 是用来清除状态中数据的,常见的 Evicator 如下:
Evicator 名称 | 功能描述 |
---|---|
CountEvicator | 保留一定数目的元素,多余的元素按照从前到后的顺序先后清理 |
TimeEvicator | 保留一个时间段的元素,早于这个时间段的元素会被清理 |
DeltaEvicator | 窗口计算时,最近一条 Element 和其他 Element 做 Delta 计算,仅保留 Delta 结果在指定 Threshold 内的 Element |
allowedLateness、sideOutputLateData、getSideOutput 可以放在一起配合使用,允许窗口处理延迟的数据,当我们使用 watermark 以后,可以一定情况处理乱序的数据,但是在开窗的时候同样会存在延迟的数据,这个时候我们可以使用 allowedLateness,允许迟到一定时间的数据继续可以进入窗口,再次触发窗口计算,如果还是超过了 allowedLateness 设置的延迟时间,可以通过 sideOutputLateData,把延迟的数据单独输出到一个流里面,根据业务逻辑做后续的处理。
示例代码:
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
@Slf4j
public class SocketDemo {
public static void main(String[] args) throws Exception {
// nc -lk 12345
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从 1.12 开始, 默认 TimeCharacteristic 是 EventTime
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 12345);
// watermark 产生周期, 默认就是 200ms, 一般不需要做修改
// env.getConfig().setAutoWatermarkInterval(200);
final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data") {
};
SingleOutputStreamOperator<String> resultStream = dataStreamSource.assignTimestampsAndWatermarks(
WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> {
// yyyyMMddHHmmss test message
String[] ss = StringUtils.split(element, StringUtils.SPACE);
return DateUtil.parse(ss[0]).getTime();
}))
.uid("source")
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String element) throws Exception {
String[] ss = StringUtils.split(element, StringUtils.SPACE);
return Tuple2.of(ss[1], Integer.parseInt(ss[2]));
}
})
.uid("parse_map")
.keyBy(t2 -> t2.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateOutputTag)
.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
long watermark = context.currentWatermark();
long processingTime = context.currentProcessingTime();
out.collect(StrUtil.format("process time: {}, watermark: {}, window: {}-{}, data key: {}, data: {}",
DateUtil.formatDateTime(DateUtil.date(processingTime)),
DateUtil.formatDateTime(DateUtil.date(watermark)),
DateUtil.formatDateTime(DateUtil.date(start)),
DateUtil.formatDateTime(DateUtil.date(end)),
s, Lists.newArrayList(elements).stream().map(e -> e.f1.toString()).collect(Collectors.joining("_"))));
}
})
.uid("window_process");
resultStream.print();
resultStream.getSideOutput(lateOutputTag).print();
env.execute("demo");
}
}
// 输入数据
20240618164200 张三 1
20240618164201 李四 1
20240618164204 李四 4
20240618164205 李四 5
20240618164206 张三 6
20240618164203 李四 3
20240618164209 张三 9
// allowedLateness
20240618164202 李四 2
20240618164210 李四 10
20240618164202 李四 2
20240618164211 李四 11
20240618164202 李四 2
// 输出数据
6> process time: 2024-06-19 15:28:31, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 张三, data: 1
8> process time: 2024-06-19 15:28:31, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3
8> process time: 2024-06-19 15:28:36, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3_2
8> process time: 2024-06-19 15:28:45, watermark: 2024-06-18 16:42:06, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3_2_2
8> (李四,2)
可以看到第八和第十条数据,虽然 watermark 已经过了窗口的时间,但是由于设置了 allowedLateness(Time.seconds(3)),仍然进入到了窗口统计范围,触发了窗口计算,最后一条数据超过了 allowedLateness 设置,所以被单独输出到一个流。
标签:Flink,窗口,watermark,flink,计算,org,apache,import From: https://blog.csdn.net/sxsAffable/article/details/139805501