首页 > 其他分享 >10分钟了解Flink Watermark水印

10分钟了解Flink Watermark水印

时间:2024-02-11 11:01:49浏览次数:46  
标签:10 窗口 Watermark Flink 水印 时间 事件 延迟

在上一篇中,介绍了Flink里时间的概念和窗口计算,在实际生产过程中,由于网络等原因,许多数据会延迟到达窗口,这种情况Flink如何处理?Watermark登场,本文从这几点进行介绍:水印的概念、水印如何计算、允许延迟和侧道输出、水印生成策略、案例及代码。

1、一个小例子

讲解概念前,我先举个例子。比如工厂的生产线有一批货物要发出,每个货物上都有一个生产时间的标记,司机在门口等待货物,他每天9:00出发,只要他看到最新过来的货物上的时间是9:00,那他立马就出发。

但是久而久之他发现,有些货物会延迟到达,比如9:00的货物已经到达,忽然他又看到一个8:59的货物到达了,为了能够一次性运送更多的货物,他决定继续多等5分钟,即:如果9:05的货物到达后,他就立马出发,不再等待了。

这样的话,即使有延迟到达的货物,只要它们能在9:05分之前到达,那这部分货物也会被发出。

2、水印的概念

我们来思考一个场景,比如,对于窗口[12:00-12:10),事件时间为12:04的数据,由于网络原因,到达Flink的时间是12:11。此时窗口已经关闭了,该数据将不属于任何窗口,最终这个数据会丢失。

所以,为了保证计算结果的正确性,需要让窗口等待延迟数据到达后再进行计算,但是也不能无限期地等待下去,必须有一种机制来确定何时触发窗口计算,这种机制就是水印(Watermark)。

水印是一种用于衡量事件时间进度的机制,其表示某个时刻(事件时间)以前的数据将不再产生,因此水印指的是一个时间点。水印作为数据流的一部分流动,并带有时间戳t。t表示该流中不应再有时间戳小于等于t的元素(即时间戳早于或等于水印的事件)。

如下图,显示了带有时间戳和嵌入式水印的事件流,事件是按顺序排列的,这意味着水印只是流中的周期性标记。

水印对于乱序流至关重要,如下图,其中事件不是按其时间戳排序的。通常,水印是数据流中一个点的声明,表示水印之前的所有事件都应该到达。一旦水印到达,算子则认为某个时间周期内的所有事件已经被收到,不会再有更多符合条件的事件了。

3、水印如何计算

计算水印需要提前指定一个允许最大延迟时间的参数。

水印 = 进入Flink的当前最大事件时间(比如上面例子中的9:05分到达的货物) ‒ 允许最大延迟时间(比如上面例子中的司机多等待的5分钟)

当水印 >= 窗口结束时间时,立即触发窗口计算,计算完毕后发射出计算结果并销毁窗口,否则窗口将一直等待。

所以,窗口触发计算的规则是:进入Flink的当前最大事件时间 >= 窗口结束时间+允许最大延迟时间。可见,设置水印后会改变窗口的触发计算规则。

例子:假设有一个[9:00~9:10)的窗口,设置的允许最大延迟时间为3分钟,当事件时间戳为9:11的事件到达时(说明有些数据可能已经延迟了,我在多等一会儿),由于该事件时间是进入Flink的当前最大事件时间,因此Watermark = 9:11‒3(分钟)= 9:08。此时水印在窗口内部不会触发窗口计算,窗口继续等待延迟数据。如下图:

接下来当事件时间戳为9:15的事件到达时,由于该事件时间是进入Flink的当前最大事件时间,因此Watermark = 9:15‒3(分钟)= 9:12。此时水印在窗口外部,满足窗口触发计算的规则:Watermark >= 窗口结束时间,因此窗口会立即触发计算,计算完毕后发射出计算结果并销毁窗口。

水印机制可以在一定程度上解决数据延迟到达问题,但不能完全解决。因为有些数据延迟太多了,这部分数据Flink默认丢弃掉。为了保证数据不丢失,Flink提供了允许延迟(AllowedLateness)和侧道输出机制(Side Output)。注意:这里的允许延迟,和水印的延迟时间不是一个概念,这里的允许延迟是水印之后的延迟。

4、允许延迟和侧道输出

允许延迟机制与水印不同,允许延迟并不会延迟触发窗口计算,而是触发窗口计算之后不会立马销毁窗口,会在一段时间内继续保留计算状态。

超过允许延迟时间的数据,Flink会将其放入侧道输出。侧道输出可以将数据收集起来,根据系统自身业务单独处理或存放于指定位置。

allowedLateness(lateness: Time):设置允许的延迟时间。

sideOutputLateData(outputTag: OutputTag[T]):将延迟到达的数据保存到outputTag对象中。

5、水印生成策略

我们可以针对每个事件生成水印,但是由于每个水印都会在下游做一些计算,因此过多的水印会降低程序性能。这就需要一种策略来规定Flink程序什么时候可以开始生成水印。

在Flink DataStream中使用assignTimestampsAndWatermarks方法用于生成水印。其作用是给数据流中的元素分配时间戳(Flink需要知道每个元素的事件时间),并生成水印以标记事件时间进度。

水印策略分为内置水印策略和自定义水印策略:

1、周期性水印策略

周期性地产生水印,默认周期时间是200毫秒。意思是,每隔200毫秒系统开始生成水印,其生成的规则为:水印 = 进入Flink的当前最大事件时间 ‒ 允许的最大延迟时间。

2、单调递增水印策略

水印是周期产生的,紧紧跟随数据中的最新时间戳。该策略实际上使用的就是周期性水印策略,只是将允许的最大延迟时间设置为0,即在周期性水印策略的基础上去掉了允许的最大延迟时间。WatermarkStrategy接口中已经内置了用于创建单调递增水印策略的静态方法forMonotonousTimestamps()。

3、无水印水印策略

该策略创建不生成任何水印的水印策略。该策略在纯基于处理时间的流处理的场景中可能很有用。WatermarkStrategy.noWatermarks()。

4、自定义水印策略

Flink内置的水印策略可以满足大部分应用场景,如果自定义水印策略需要实现WatermarkStrategy接口。

6、案例及代码

1、水印例子

比如,在控制台输入数据的事件时间和数据,通过自定义的水印策略,允许延迟2S的数据进入窗口计算。

代码如下:

// 比如输入:1000,a   2000,a  3000,b
DataStream<Tuple2<String, Integer>> windowCountStream = textStream
        // 水印策略,对于过来的事件时间上,可以延迟2秒
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((event, timestamp) ->
                                Long.parseLong(event.split(",")[0])))
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] splits = value.split(",");
                return Tuple2.of(splits[1], 1);
            }
        })
        .keyBy(value -> value.f0)
        // 滚动5分钟的窗口
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum(1);

运行结果如下图:

具体代码地址:

https://github.com/yclxiao/flink-blog/blob/main/src/main/java/top/mangod/flinkblog/demo004/WatermarkSocketWindowWordCount.java

2、延迟数据和侧道输出

继续使用上面的例子,如果数据再水印之外,又延迟到达,再通过侧道输出出去。

代码如下:

private static final OutputTag<Tuple2<String, Integer>> lateEventsTag =
            new OutputTag<Tuple2<String, Integer>>("late-events") {
            };
// 比如输入:1000,a   2000,a  3000,b
SingleOutputStreamOperator<Tuple2<String, Integer>> windowCountStream = textStream
        // 水印策略,对于过来的事件时间上,可以延迟2秒
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((event, timestamp) ->
                                Long.parseLong(event.split(",")[0])))
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] splits = value.split(",");
                return Tuple2.of(splits[1], 1);
            }
        })
        .keyBy(value -> value.f0)
        // 滚动5分钟的窗口
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(2))
        .sideOutputLateData(lateEventsTag)
        .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                out.collect(input.iterator().next());
            }
        });

运行结果如下图:

具体代码地址:

https://github.com/yclxiao/flink-blog/blob/main/src/main/java/top/mangod/flinkblog/demo004/LateEventsSocketWindowWordCount.java

总结:本文主要讲了Flink Watermark水印的概念和使用。

本篇完结!感谢你的阅读,欢迎点赞 关注 收藏 私信!!!

原文链接:http://www.mangod.top/articles/2023/08/08/1691469007650.htmlhttps://mp.weixin.qq.com/s/WJZVbhxbUJgc79tXB_nclw

标签:10,窗口,Watermark,Flink,水印,时间,事件,延迟
From: https://www.cnblogs.com/mangod/p/18013231

相关文章

  • 5分钟了解Flink状态管理
    什么叫做Flink的有状态计算呢?说白了就是将之前的中间结果暂时存储起来,等待后续的事件数据过来后,可以使用之前的中间结果继续计算。本文主要介绍Flink状态计算和管理、代码示例。1、有状态的计算什么是Flink的有状态的计算。在流式计算过程中将算子的中间结果保存在内存或者文......
  • 一次打通FlinkCDC同步Mysql数据
    业务痛点离开了业务谈技术都是耍流氓。我们来聊聊基于业务的痛点,衍生出来的多种数据同步方案。业务中常见的需要数据同步的场景1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历......
  • 10分钟入门Flink--架构和原理
    相信你读完上一节的《10分钟入门Flink--了解Flink》对Flink已经有初步了解了。这是继第一节之后的Flink入门系列的第二篇,本篇主要内容是是:了解Flink运行模式、Flink调度原理、Flink分区、Flink安装。1、运行模式Flink有多种运行模式,可以运行在一台机器上,称为本地(单机)模式;也可以......
  • 10分钟入门Flink--了解Flink
    Flink入门系列文章主要是为了给想学习Flink的你建立一个大体上的框架,助力快速上手Flink。学习Flink最有效的方式是先入门了解框架和概念,然后边写代码边实践,然后再把官网看一遍。Flink入门分为四篇,第一篇是《了解Flink》,第二篇《架构和原理》,第三篇是《DataStream》,第四篇是《Tabl......
  • 程序员创业踩过的10个坑
    我在之前的文章《程序员如何行稳致远》和《程序员是否适合创业》中跟朋友们提过,程序员要早点积累自己的生产资料,尽早尝试轻创业。但是创业有很多坑,我总结了这些年自己踩过的10个坑,希望对你有帮助。1、产品是什么。创立公司之前一定要想清楚自己要打磨的产品是什么,产品和销售是公......
  • 10分钟入门Flink--安装
    本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、FlinkStandalone搭建、FlinkStandalongHA搭建。演示使用的Flink版本是1.15.4,官方文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/......
  • 10-验证-中文识别点选
    1.获取图片#@课程:爬虫逆向实战课#@讲师:武沛齐#@课件获取:wupeiqi666importreimporttimeimportddddocrimportrequestsfromseleniumimportwebdriverfromselenium.webdriver.common.byimportByfromselenium.webdriver.chrome.serviceimport......
  • 10.使用RestSharps请求WebAPI
    1.请求类publicclassBaseRequest{///<summary>///请求法式///</summary>publicRestSharp.MethodMethod{get;set;}///<summary>///路由///</summary>publicstr......
  • 读千脑智能笔记10_人类智能存在的风险
    1. 人类智能存在的风险1.1. “末日时钟”1.1.1. 核战争引发的大火列为地球毁灭的主要原因1.1.2. 气候变化列为人类自我毁灭的第二大潜在原因1.2. 除非我们刻意加入自私的驱动力、动机或情感,否则智能机器并不会威胁到人类的生存1.2.1. 人类在不远的将来会创造出更多的......
  • 2024/2/10学习进度笔记
    RDD,学名可伸缩的分布式数据集(ResilientDistributedDataset)。是一种对数据集形态的抽象,基于此抽象,使用者可以在集群中执行一系列计算,而不用将中间结果落盘。而这正是之前MR抽象的一个重要痛点,每一个步骤都需要落盘,使得不必要的开销很高。对于分布式系统,容错支持是必不可少的。......