首页 > 其他分享 >Flink 窗口计算

Flink 窗口计算

时间:2024-06-20 16:00:39浏览次数:32  
标签:Flink 窗口 watermark flink 计算 org apache import

Flink 窗口计算

1. 背景

在当今大数据时代,实时数据处理的需求日益增长,Flink 的窗口计算在这一领域中发挥着至关重要的作用。
窗口计算使得我们能够将无界的数据流切分成有意义的片段,从而进行特定时间段内的数据聚合和分析。它适用于众多场景,比如实时监控系统中对一段时间内的关键指标进行统计,金融交易中对特定窗口内的交易数据进行分析等。
与其他流式计算相比,Flink 窗口计算展现出了显著的优点。它提供了高度灵活的窗口定义方式,能够满足各种复杂的业务需求。无论是基于时间的滚动窗口、滑动窗口,还是基于数据驱动的窗口,Flink 都能轻松应对。同时,Flink 在处理大规模数据时具有出色的性能和稳定性,能够高效地处理高速流入的数据,确保计算结果的及时性和准确性。

2. Watermark

在 Flink 窗口计算中,还有一个关键概念与之紧密相关,那就是 watermark。watermark 对于处理乱序数据以及确保窗口计算的准确性起到了至关重要的作用。它就像是一个进度指示器,帮助我们在面对数据流的不确定性时,依然能够精确地进行窗口相关的操作和分析。

Watermark 是用来标记 Event-Time 的前进过程,表示较早的事件已经全部到达。

对于 顺序事件流1 比较好理解,W(10) 表示 10 之前的数据都已经到达。
对于 有界乱序事件流2,我们通常认为乱序事件流并不能完全乱序,需要在一定的时间限定内,这个时候我们可以指定 maxOutOfOrderness,表示可以容忍的乱序时间,当 11 到达以后,会有 W(7),表示 7 之前的数据都已经到达,当 24 到达以后,会有 W(20),表示 20 之前的数据都已经到达,这个时候如果 19 再来,就会被认为是无效的数据。

image.png
在 flink 程序中,通常一个算子会有多个并行度,他们之间 watermark 的传递入上图所示。

通过上面的介绍,我们知道了 watermark 是做什么的,那么他是怎么产生的?为什么每隔几个事件才产生一个 watermark?
这些都与 Watermark Generator 有关,我们可以通过 assignTimestampsAndWatermarks 指定要使用的 Watermark Generator

import cn.hutool.core.date.DateUtil;
import java.time.Duration;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketDemo {

    public static void main(String[] args) throws Exception {
        // nc -lk 12345
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从 1.12 开始, 默认 TimeCharacteristic 是 EventTime
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 12345);
        // watermark 产生周期, 默认就是 200ms, 一般不需要做修改
        // env.getConfig().setAutoWatermarkInterval(200);
        dataStreamSource.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    // yyyyMMddHHmmss test message
                    String[] ss = StringUtils.split(element, StringUtils.SPACE);
                    return DateUtil.parse(ss[0]).getTime();
                }));
        dataStreamSource.print();

        env.execute("demo");
    }

}

从上面的示例可以看到,从 flink 1.12 起,为我们提供了以下几种周期发送 watermark 的 Watermark Generator
他们是根据 ExecutionConfig.getAutoWatermarkInterval() 来决定时间,默认是 200 毫秒,也就是说,默认 200 毫秒可能会产生一个新的 watermark
对于顺序事件使用 WatermarkStrategy.forMonotonousTimestamps()
对于乱序事件 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))

3. Watermark 与 Window 之间的关系

window 通过 watermark 来判断是否需要计算窗口的数据,我们可以通过设置 watermark 的生成策略,来处理 window 中的乱序数据,示例如下

设置5秒的滚动窗口
按照顺序流的处理思路,当第四条数据 (20240618164205) 到达,这个时候就会触发窗口 [20240618164200, 20240618164205) 开始计算,那么迟到的第六条数据 (20240618164203) 就不会被统计到窗口中;

如果我们使用乱序事件流的 watermark 生成器,设置 maxOutOfOrderness = 3秒,那么只有当第七条数据 (20240618164209) 来到的时候,经过计算 9-1-3=5,watermark 变成 20240618164205,这个时候才会触发窗口 [20240618164200, 20240618164205) 开始计算,迟到的第六条数据 (20240618164203) 仍然可以被统计到窗口中。

示例代码:

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

@Slf4j
public class SocketDemo {

    public static void main(String[] args) throws Exception {
        // nc -lk 12345
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从 1.12 开始, 默认 TimeCharacteristic 是 EventTime
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 12345);
        // watermark 产生周期, 默认就是 200ms, 一般不需要做修改
        // env.getConfig().setAutoWatermarkInterval(200);
        dataStreamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((element, recordTimestamp) -> {
                        // yyyyMMddHHmmss test message
                        String[] ss = StringUtils.split(element, StringUtils.SPACE);
                        return DateUtil.parse(ss[0]).getTime();
                    }))
            .uid("source")
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String element) throws Exception {
                    String[] ss = StringUtils.split(element, StringUtils.SPACE);
                    return Tuple2.of(ss[1], Integer.parseInt(ss[2]));
                }
            })
            .uid("parse_map")
            .keyBy(t2 -> t2.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                @Override
                public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
                    long start = context.window().getStart();
                    long end = context.window().getEnd();
                    long watermark = context.currentWatermark();
                    long processingTime = context.currentProcessingTime();
                    out.collect(StrUtil.format("process time: {}, watermark: {}, window: {}-{}, data key: {}, data: {}",
                        DateUtil.formatDateTime(DateUtil.date(processingTime)),
                        DateUtil.formatDateTime(DateUtil.date(watermark)),
                        DateUtil.formatDateTime(DateUtil.date(start)),
                        DateUtil.formatDateTime(DateUtil.date(end)),
                        s, Lists.newArrayList(elements).stream().map(e -> e.f1.toString()).collect(Collectors.joining("_"))));
                }
            })
            .uid("window_process")
            .print();

        env.execute("demo");
    }

}

输出结果:

6> process time: 2024-06-18 17:11:59, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 张三, data: 1
8> process time: 2024-06-18 17:11:59, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3

示例更丰富的图:
image.png

4. Window 窗口计算


Flink Window 分为 Keyed Windows 和 Non-Keyed Windows,Keyed Windows 是对 KeyedStream 使用窗口操作后产生的,KeyedStream 是我们使用 keyBy 算子,对 Stream 按 key 分区后产生的。

Window 的 stream api

// Keyed Windows
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"


// Non-Keyed Windows
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Flink Window 使用必须要有的两个操作

  1. 使用 WindowAssigner 将数据流中的元素分配到对应的窗口
  2. 当满足窗口触发条件后,对窗口内的数据使用 Window Function(窗口处理函数) 进行处理,常用的 Window Function 有 reduce、aggregate、process

Flink 每一种窗口就是对应一种 WindowAssigner,从源码中我们可以看到,当我们在指定窗口类型的时候,实际上就是在指定 WindowAssigner。
image.png
image.png

Flink 支持的窗口类型

关于窗口的详细介绍,可以看官网 https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/windows/

从 stram api 可以看到使用 window/windowAll 算子后,可以指定 trigger、evictor 等,当我们不指定他们时,会使用默认的参数,我们进入到 TumblingEventTimeWindows 类源码,可以到按照事件时间滚动窗口对应的默认 Trigger。
image.png

常见的 Trigger

Flink 内置 Window Trigger触发频率主要功能
ProcessingTimeTrigger一次触发基于 ProcessingTime 触发,当机器时间大于窗口结束时间时触发
EventTimeTrigger一次触发基于 EventTime,当 Watermark 大于窗口结束时间触发
ContinuousProcessingTimeTrigger多次触发基于 ProcessTime 的固定时间间隔触发
ContinuousEventTimeTrigger多次触发基于 EventTime 的固定时间间隔触发
CountTrigger多次触发基于 Element 的固定条数触发
DeltaTrigger多次触发基于本次 Element 和上次触发 Trigger 的 Element 做 Delta 计算,超过指定 Threshold 后触发
PuringTrigger对 Trigger 的封装实现,用于 Trigger 触发后额外清理中间状态数据
NeverTriggerGlobalWindows 独有的,在内部继承 Trigger 实现,从来不会触发

所有基于事件时间的窗口,默认 Trigger 都是 EventTimeTrigger,基于处理时间的窗口都是 ProcessingTimeTrigger,所以在上面3的例子中,最终只输出了一次结果。

Evicator 是用来清除状态中数据的,常见的 Evicator 如下:

Evicator 名称功能描述
CountEvicator保留一定数目的元素,多余的元素按照从前到后的顺序先后清理
TimeEvicator保留一个时间段的元素,早于这个时间段的元素会被清理
DeltaEvicator窗口计算时,最近一条 Element 和其他 Element 做 Delta 计算,仅保留 Delta 结果在指定 Threshold 内的 Element

allowedLateness、sideOutputLateData、getSideOutput 可以放在一起配合使用,允许窗口处理延迟的数据,当我们使用 watermark 以后,可以一定情况处理乱序的数据,但是在开窗的时候同样会存在延迟的数据,这个时候我们可以使用 allowedLateness,允许迟到一定时间的数据继续可以进入窗口,再次触发窗口计算,如果还是超过了 allowedLateness 设置的延迟时间,可以通过 sideOutputLateData,把延迟的数据单独输出到一个流里面,根据业务逻辑做后续的处理。

示例代码:

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

@Slf4j
public class SocketDemo {

    public static void main(String[] args) throws Exception {
        // nc -lk 12345
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从 1.12 开始, 默认 TimeCharacteristic 是 EventTime
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 12345);
        // watermark 产生周期, 默认就是 200ms, 一般不需要做修改
        // env.getConfig().setAutoWatermarkInterval(200);

        final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data") {
        };
        SingleOutputStreamOperator<String> resultStream = dataStreamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((element, recordTimestamp) -> {
                        // yyyyMMddHHmmss test message
                        String[] ss = StringUtils.split(element, StringUtils.SPACE);
                        return DateUtil.parse(ss[0]).getTime();
                    }))
            .uid("source")
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String element) throws Exception {
                    String[] ss = StringUtils.split(element, StringUtils.SPACE);
                    return Tuple2.of(ss[1], Integer.parseInt(ss[2]));
                }
            })
            .uid("parse_map")
            .keyBy(t2 -> t2.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .allowedLateness(Time.seconds(3))
            .sideOutputLateData(lateOutputTag)
            .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                @Override
                public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
                    long start = context.window().getStart();
                    long end = context.window().getEnd();
                    long watermark = context.currentWatermark();
                    long processingTime = context.currentProcessingTime();
                    out.collect(StrUtil.format("process time: {}, watermark: {}, window: {}-{}, data key: {}, data: {}",
                        DateUtil.formatDateTime(DateUtil.date(processingTime)),
                        DateUtil.formatDateTime(DateUtil.date(watermark)),
                        DateUtil.formatDateTime(DateUtil.date(start)),
                        DateUtil.formatDateTime(DateUtil.date(end)),
                        s, Lists.newArrayList(elements).stream().map(e -> e.f1.toString()).collect(Collectors.joining("_"))));
                }
            })
            .uid("window_process");

        resultStream.print();
        resultStream.getSideOutput(lateOutputTag).print();

        env.execute("demo");
    }

}

// 输入数据
20240618164200 张三 1
20240618164201 李四 1
20240618164204 李四 4
20240618164205 李四 5
20240618164206 张三 6
20240618164203 李四 3
20240618164209 张三 9

// allowedLateness
20240618164202 李四 2
20240618164210 李四 10
20240618164202 李四 2
20240618164211 李四 11
20240618164202 李四 2


// 输出数据
6> process time: 2024-06-19 15:28:31, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 张三, data: 1
8> process time: 2024-06-19 15:28:31, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3
8> process time: 2024-06-19 15:28:36, watermark: 2024-06-18 16:42:05, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3_2
8> process time: 2024-06-19 15:28:45, watermark: 2024-06-18 16:42:06, window: 2024-06-18 16:42:00-2024-06-18 16:42:05, data key: 李四, data: 1_4_3_2_2
8> (李四,2)

可以看到第八和第十条数据,虽然 watermark 已经过了窗口的时间,但是由于设置了 allowedLateness(Time.seconds(3)),仍然进入到了窗口统计范围,触发了窗口计算,最后一条数据超过了 allowedLateness 设置,所以被单独输出到一个流。

标签:Flink,窗口,watermark,flink,计算,org,apache,import
From: https://blog.csdn.net/sxsAffable/article/details/139805501

相关文章

  • 学习笔记:计算机内存管理
    虚拟内存    单片机是没有操作系统的,所以每次写完代码,都需要借助工具把程序烧录进去,这样程序才能跑起来。        单片机的CPU是直接操作内存的「物理地址」。        在这种情况下,要想在内存中同时运行两个程序是不可能的。如果第一个程序在20......
  • 计算机毕业设计flask+python企业公司进销存管理系统
    1、内容和要求:(1)完成以下课题研究内容:①研究进销存系统相关理论、特征。②研究进销存系统背景和意义。③研究进销存系统现实发展前景。④对企业进销存系统进行需求分析。⑤设计和实现新疆世纪金桥企业进销存系统。⑥对设计的进销存系统进行测试。(2)完成......
  • 单细胞测序最好的教程(十五):最具代表性的拟时序计算模型合集
    作者按本章节主要讲解了不基于RNA速率的三种拟时序模型的代表算法,包括扩散时间,Slingshot,Palantir,VIA等,并简单介绍了细胞的状态转移。本教程首发于单细胞最好的中文教程,未经授权许可,禁止转载。全文字数|预计阅读时间:5000|10min——Starlitnightly(星夜)1.背景单细胞测序技......
  • 【计算机网络仿真实验-实验3.1、3.2】交换路由综合实验
    实验3.1交换路由综合实验——作业1一、实验目的运用实验二(可前往博主首页计算机网络专栏下查看)中学到的知识,将这个图中的PC机连接起来组网并分析,本篇涉及代码以截图展示,过于简单的代码及操作不再详细介绍,建议做完实验二中所有实验后再来完成该实验,难度递进,学习过程合理......
  • Matlab r2023a v23.2.0 解锁版安装步骤 (工程计算商业数学软件)
    前言Matlab(矩阵实验室)是全球领先的数学计算软件开发商美国MathWorks公司研发的一款面向科学与工程计算的高级语言的商业数学软件,集算法开发、数据分析、可视化和数值计算于一体的编程环境,其核心是仿真交互式矩阵计算,广泛应用于科学计算、数据分析、算法开发和绘图设计等......
  • Java计算机毕业设计+Vue实习实训管理系统(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在当今社会,实习实训已成为高等教育中不可或缺的一部分,对于学生实践能力和职业素养的提升具有重要意义。然而,传统的实习实训管理方式存在着诸多不便,如......
  • 计算几何【Pick定理】
    Pick定理Pick定理:给定顶点均为整点的简单多边形,皮克定理说明了其面积A{\displaystyleA}A和内部格点数目......
  • 基于springboot实现课程答疑管理系统项目【项目源码+论文说明】计算机毕业设计
    摘要随着信息互联网信息的飞速发展,无纸化作业变成了一种趋势,针对这个问题开发一个专门适应师生交流形式的网站。本文介绍了课程答疑系统的开发全过程。通过分析企业对于课程答疑系统的需求,创建了一个计算机管理课程答疑系统的方案。文章介绍了课程答疑系统的系统分析部分......
  • 基于springboot实现宠物咖啡馆平台管理系统项目【项目源码+论文说明】计算机毕业设计
    摘要随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了基于SpringBoot的宠物咖啡馆平台的设计与实现的开发全过程。通过分析基于SpringBoot的宠物咖啡馆平台的设计与实现管理的不足,创建了一个计算机管理基于SpringBoot的......
  • 【计算机网络仿真】b站湖科大教书匠思科Packet Tracer——实验2 MAC地址,IP地址,ARP协议
    一、实验目的1.掌握计算机网络的寻址问题;2.验证MAC地址与IP地址的关系;3.了解ARP协议的作用。二、实验要求1.使用CiscoPacketTracer仿真平台;2.观看B站湖科大教书匠仿真实验视频,完成对应实验。三、实验内容1.构建网络拓扑;2.配置网络设备;3.跟踪并查看数据包;4.在实......