首页 > 其他分享 >Flink Watermark详解

Flink Watermark详解

时间:2024-06-07 18:59:06浏览次数:16  
标签:WatermarkStrategy Watermark Flink event 详解 Event 乱序

Watermark 是用于处理流数据中事件时间(event time)乱序情况的重要机制。在流处理中,数据往往不是按照它们实际发生的时间顺序到达的,这可能是由于网络延迟、系统处理延迟或其他因素导致的。为了能够在这种乱序环境中正确地执行基于时间的操作(如时间窗口聚合),Flink 引入了 Watermark 的概念。
Watermark 是一个特殊的标记,它表示“在此时间戳之前的数据应该都已经到达了”。当 Flink 的算子(operator)处理到 Watermark 时,它会认为该 Watermark 时间戳之前的所有数据都已经到达了,并可以安全地关闭或处理任何基于该时间戳的窗口。

概念

  • **定义:**Watermark是一个特殊的时间戳,代表了某个时间点之前的数据理论上应该都已经到达了系统,即“最多允许的延迟”。
  • **作用:**用于处理乱序事件,确保在某个时间窗口内完成所有相关的事件处理。

原理

  • **乱序问题:**在流处理中,由于网络延迟等因素,事件可能会乱序到达。Watermark机制就是用来解决这种乱序问题。
  • **工作原理:**当数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的Watermark,插入到消息流中。Flink operator算子按照时间窗口缓存所有流入的消息,当操作符处理到Watermark时,它会对所有小于该Watermark时间戳的时间窗口的数据进行处理并发送到下一个操作符节点,然后也将Watermark发送到下一个操作符节点。

用途

  • **确保窗口计算的正确性:**Watermark结合窗口机制,可以确保在特定的时间后触发窗口去计算,从而避免因为乱序事件导致的窗口计算错误。
  • **处理延迟数据:**Watermark提供了一个“最多允许的延迟”机制,对于延迟到达的数据,Flink可以根据Watermark来决定是否将其纳入当前窗口的计算。

样例

package com.wfg.flink.example.watermark;

import com.wfg.flink.example.watermark.data.Event;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.time.Instant;

public class FlinkWatermarkDemo {

    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 假设我们有一个数据源,这里使用 fromElements 模拟
        DataStream<Event> eventStream = env.fromElements(
                new Event(1L, Instant.now().minusSeconds(10).toEpochMilli()),
                new Event(2L, Instant.now().minusSeconds(8).toEpochMilli()),
                new Event(3L, Instant.now().minusSeconds(12).toEpochMilli()),
                new Event(4L, Instant.now().minusSeconds(15).toEpochMilli()),
                new Event(5L, Instant.now().minusSeconds(19).toEpochMilli()),
                new Event(6L, Instant.now().minusSeconds(18).toEpochMilli()),
                new Event(7L, Instant.now().minusSeconds(22).toEpochMilli())
        );

        // 定义 Watermark 策略,允许 5 秒的乱序
        WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> {
                    // 从事件中提取时间戳
                    return event.getTimestamp();
//                    timestamp.assignTimestamp(event.getTimestamp());
                });

        // 应用 Watermark 策略,并处理数据流
        DataStream<String> resultStream = eventStream
                .assignTimestampsAndWatermarks(watermarkStrategy)
                // 根据事件 ID 进行分区(这只是一个示例,实际可能根据业务需求分区)
                .keyBy(Event::getId)
                // 接下来可以进行窗口操作、时间聚合等操作
                .map(new MapFunction<Event, String>() {
                    @Override
                    public String map(Event event) throws Exception {
                        return "Event ID: " + event.getId() + ", Timestamp: " + Instant.ofEpochMilli(event.getTimestamp());
                    }
                });

        // 输出结果
        resultStream.print();

        // 执行任务
        env.execute("Flink Watermark Demo");
    }
}

若运行出错,可配置启动环境:–add-opens java.base/java.util=ALL-UNNAMED
数据类

package com.wfg.flink.example.watermark.data;

import lombok.Data;

/**
 * @author wfg
 */
@Data
public class Event {
    private final long id;
    private final long timestamp;

    public Event(long id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

WatermarkStrategy

atermarkStrategy是用于处理基于事件时间(event time)的流计算系统中可能出现的数据乱序情况的机制。
Watermark是数据流中的一种特殊数据,由Flink内部周期(可自定义)产生。它的主要作用是指示某个时间点之前的数据已经到达Flink系统,从而允许Flink开始处理这些数据。Watermark的生成策略可以实现数据乱序的兼容。

使用

atermarkStrategy在Flink中有两种主要的使用方式:

  1. 直接在数据源上使用: 这种方式下,WatermarkStrategy会在数据源处被指定,并应用于从数据源读取的数据流。这种方式可以更精准地跟踪Watermark,因为数据源可以利用watermark生成逻辑中有关分片/分区的信息。
  2. 直接在非数据源的操作之后使用: 如果无法直接在数据源上设置WatermarkStrategy,可以在数据流的其他位置(如经过某个操作后)设置。但这种方式通常不如第一种方式精准。

配置

WatermarkStrategy的配置主要涉及到Watermark的生成策略和自动发送周期。例如,可以使用WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))来配置一个允许数据乱序程度不超过20秒的WatermarkStrategy。此外,还可以通过修改Flink的配置文件(如flink-conf.yaml)或调用相关API方法来设置Watermark的自动发送周期。

应用

基于Flink 1.16+版本的Java API,可以使用WatermarkStrategy类配合TimestampAssigner和TimestampExtractor接口来实现Watermark的生成器。具体实现方式可以参考相关文档和示例代码。

详情

WatermarkStrategy 是一个接口,它定义了如何为流中的事件生成 Watermarks。由于 Flink 是一个开源项目,我们可以直接查看其源代码来了解 WatermarkStrategy 的具体实现。
WatermarkStrategy 接口定义在 Flink 的 org.apache.flink.streaming.api.functions.timestamps 包中。这个接口定义了两个方法:

  1. TimestampAssigner createTimestampAssigner(SerializedValue<TypeInformation> typeInfo): 用于创建一个 TimestampAssigner,该 TimestampAssigner 负责为流中的每个元素分配时间戳。
  2. WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context): 用于创建一个 WatermarkGenerator,该 WatermarkGenerator 负责基于流中的元素生成 Watermarks。
// 定义 Watermark 策略,允许 5 秒的乱序
        WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> {
                    // 从事件中提取时间戳
                    return event.getTimestamp();
//                    timestamp.assignTimestamp(event.getTimestamp());
                });

通常,不会直接实现 WatermarkStrategy 接口,而是使用 Flink 提供的静态工厂方法来创建策略。例如,WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness) 方法就是一个常用的策略,它允许一定程度的乱序。

assignTimestampsAndWatermarks

assignTimestampsAndWatermarks 方法是用于为数据流中的事件分配时间戳和 Watermarks 的。这个方法通常与 WatermarkStrategy 一起使用,以定义如何为流中的每个元素分配时间戳以及何时生成 Watermarks。

// ...  
  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 假设有一个名为 eventStream 的 DataStream,其中包含具有时间戳的事件  
DataStream<MyEvent> eventStream = ...; // 获取或创建事件流  
  
// 创建一个 WatermarkStrategy,这里使用了一个允许一定乱序的 BoundedOutOfOrdernessTimestampExtractor  
WatermarkStrategy<MyEvent> watermarkStrategy = WatermarkStrategy  
    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))  
    .withTimestampAssigner(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>() {  
        @Override  
        public long extractTimestamp(MyEvent element) {  
            return element.getTimestamp(); // 假设 MyEvent 有一个 getTimestamp() 方法返回事件的时间戳  
        }  
  
        @Override  
        public long getMaxAllowedLatency(MyEvent element) {  
            return Duration.ofSeconds(10).toMillis(); // 最大允许乱序时间为10秒  
        }  
    });  
  
// 为事件流分配时间戳和 Watermarks  
DataStream<MyEvent> timestampedStream = eventStream.assignTimestampsAndWatermarks(watermarkStrategy);  
  
// 现在可以基于事件时间进行窗口操作或其他时间感知的操作了  
timestampedStream  
    .keyBy(event -> event.getKey()) // 假设 MyEvent 有一个 getKey() 方法  
    .timeWindow(Time.seconds(30)) // 使用基于事件时间的30秒窗口  
    .apply(new WindowFunction<MyEvent, String, String, TimeWindow>() {  
        // ... 实现 WindowFunction  
    })  
    .print(); // 打印结果或其他后续操作  
  
// ...

WatermarkStrategy 使用了 BoundedOutOfOrdernessTimestampExtractor,它允许一定程度的数据乱序(在这个例子中是10秒)。extractTimestamp 方法用于为事件分配时间戳,而 getMaxAllowedLatency 方法定义了乱序时间的上限。然后,我们使用 assignTimestampsAndWatermarks 方法将这个策略应用到事件流上,从而得到一个带有时间戳和 Watermarks 的新流,可以在其上执行基于事件时间的操作。

标签:WatermarkStrategy,Watermark,Flink,event,详解,Event,乱序
From: https://blog.csdn.net/mqiqe/article/details/139421607

相关文章

  • Asp .Net Core 系列:详解鉴权(身份验证)以及实现 Cookie、JWT、自定义三种鉴权 (含源码解
    什么是鉴权(身份验证)?https://learn.microsoft.com/zh-cn/aspnet/core/security/authentication/?view=aspnetcore-8.0定义鉴权,又称身份验证,是确定用户身份的过程。它验证用户提供的凭据(如用户名和密码)是否有效,并据此确认用户是否具备访问系统的权利。过程用户向系统提供......
  • 机器学习策略篇:详解进行误差分析(Carrying out error analysis)
    从一个例子开始讲吧。假设正在调试猫分类器,然后取得了90%准确率,相当于10%错误,,开发集上做到这样,这离希望的目标还有很远。也许的队员看了一下算法分类出错的例子,注意到算法将一些狗分类为猫,看看这两只狗,它们看起来是有点像猫,至少乍一看是。所以也许的队友给一个建议,如何针对狗的......
  • linux系统-umask详解
    转自:https://blog.csdn.net/kld230/article/details/134508978  umask(userfile-creatiopnmodemask)是linux中的一个命令,用于为用户文件创建权限掩码,语法“umask[-S][权限掩码]”;其中,“权限掩码”是由3个八进制的数字所组成,将现有的存取权限减掉权限掩码后,即可产生建立文......
  • 打造高效视频融合平台:基于GB28181和Ehome等多协议接入的EasyCVR方案详解
    EasyCVR视频融合/汇聚云平台基于“云-边-端”一体化架构,部署轻量简单、功能灵活多样,平台可支持多协议(GAT1400/GB28181/RTSP/Onvif/海康SDK/Ehome/大华SDK/RTMP推流等)、多类型设备接入(IPC/NVR/监控平台),在视频能力上,可实现视频直播、录像、回放、检索、云存储、告警上报、语音对讲......
  • Http协议详解之三次握手
    HTTP的三次握手在计算机网络中,HTTP(HyperTextTransferProtocol,超文本传输协议)是用于在客户端和服务器之间传输超文本的协议。尽管HTTP本身是一个无状态的应用层协议,但它通常依赖于TCP(TransmissionControlProtocol,传输控制协议)来确保数据的可靠传输。TCP是一种面向连接的......
  • 【Linux驱动设备开发详解】11.内存与I/O访问
    1.内存管理单元高性能处理器一般会提供一个内存管理单元(MMU),用于辅助操作系统尽心修改内存管理,提供虚拟地址和物理地址的映射、内存访问权限保护和Cache缓存控制等硬件支持。1.1MMU基本概念1.1.1概念含义1.TLB(TranslationLookasideBuffer):旁路转换缓存,TLB是MMU的核心......
  • AI助手:Agent工作流程与应用场景详解
    引言智能体(Agent)是一种在特定环境中自主行动、感知环境、做出决策并与其他智能体或人类进行交互的计算机程序或实体。它们具备自主性、反应性、社交性和适应性等特点,能够根据环境的变化调整自己的行为,以达到预设的目标。本文将详细拆解智能体从提示词接收、LLM大模型理解识别、知......
  • 【百万字详解Redis】集群部署
    文章目录Redis集群部署......
  • Shell脚本语言用法详解(超详细~)
    Shell目录Shell一、Shell是什么?二、Shell怎么使用?1.变量变量的命名和赋值变量类型变量的作用域只读变量删除变量环境变量系统预定义变量变量的使用特殊变量和位置参数2.运算符3.条件判断4.流程控制if判断case语句for循环while循环5.读取控制台输入6.函数系统函数自定......
  • 一口气搞懂Flink Metrics监控指标和性能优化,全靠这33张图和7千字
    https://www.51cto.com/article/684249.html flink中值得监控的几个指标背景为了维持flink的正常运行,对flink的日常监控就变得很重要,本文我们就来看一下flink中要监控的几个重要的指标重要的监控指标1.算子的处理速度的指标:numRecordsInPerSecond/numRecordsOutPerSecond,......