首页 > 其他分享 >聊聊Flink必知必会(四)

聊聊Flink必知必会(四)

时间:2023-06-16 21:00:22浏览次数:35  
标签:WatermarkStrategy return 必知 Flink 水印 时间 事件 聊聊

概述

Flink Streaming API借鉴了谷歌数据流模型(Google Data Flow Model),它的流API支持不同的时间概念。Flink明确支持以下3个不同的时间概念。

Flink明确支持以下3个不同的时间概念。
(1)事件时间:事件发生的时间,由产生(或存储)事件的设备记录。

(2)接入时间:Flink在接入事件时记录的时间戳。

(3)处理时间:管道中特定操作符处理事件的时间。

image

支持事件时间的流处理器需要一种方法来度量事件时间的进度。在Flink中测量事件时间进展的机制是水印(watermark)。水印是一种特殊类型的事件,是告诉系统事件时间进度的一种方式。水印流是数据流的一部分,并带有时间戳t。水印(t)声明事件时间已经到达该流中的时间t,这意味着时间戳t′≤t(时间戳更早或等于水印的事件)的流中不应该有更多的元素。

Flink中水印的处理

水印的时间戳

时间t的水印标记了数据流中的一个位置,并断言此时的流在时间t之前已经完成。为了执行基于事件时间的事件处理,Flink需要知道与每个事件相关联的时间,它还需要包含水印的流。水印就是系统事件时间的时钟。水印触发是基于事件时间的计时器的触发。

事件流的类型有两种,一个是顺序的,一个是无序的。先看顺序场景下,水印的排列。

image

对于无序流,水印是至关重要的,其中事件不是按照它们的时间戳排序的。

例如,当操作符接收到w(11)这条水印时,可以认为时间戳小于或等于11的数据已经到达,此时可以触发计算。同样,当接收到w(17)这条水印时,可以认为时间戳小于或等于17的数据已经到达,此时可以触发计算。

image

可以看出,水印的时间戳是单调递增的,时间戳为t的水印意味着所有后续记录的时间戳将大于t。一般来讲,水印是一种声明,在流中的那个点之前,即在某个时间戳之前的所有事件都应该已经到达。

水印是在源函数处或直接在源函数之后生成的。源函数的每个并行子任务通常可以独立地生成水印。这些水印定义了特定并行源处的事件时间。

水印的生成

Flink提供了用于处理事件时间、时间戳和水印的API。

为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过TimestampAssigner从元素中的某个字段访问/提取时间戳实现的。

Flink提供了两种方式创建水印。

1.使用WatermarkStrategy上的静态辅助方法实现公共水印策略:

image

2.实现WatermarkStrategy接口,自定义TimestampAssigner与WatermarkGenerator捆绑在一起:


@Public
public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

    @Override
    default TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return new RecordTimestampAssigner<>();
    }

    @Experimental
    default WatermarkAlignmentParams getAlignmentParameters() {
        return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
    }

    default WatermarkStrategy<T> withTimestampAssigner(
            TimestampAssignerSupplier<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
    }

    default WatermarkStrategy<T> withTimestampAssigner(
            SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(
                this, TimestampAssignerSupplier.of(timestampAssigner));
    }

    default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
    }

    @Experimental
    default WatermarkStrategy<T> withWatermarkAlignment(
            String watermarkGroup, Duration maxAllowedWatermarkDrift) {
        return withWatermarkAlignment(
                watermarkGroup,
                maxAllowedWatermarkDrift,
                WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
    }

    @Experimental
    default WatermarkStrategy<T> withWatermarkAlignment(
            String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        return new WatermarksWithWatermarkAlignment<T>(
                this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
    }

    static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }

    static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
        return generatorSupplier::createWatermarkGenerator;
    }

    static <T> WatermarkStrategy<T> noWatermarks() {
        return (ctx) -> new NoWatermarksGenerator<>();
    }
}

这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是createWatermarkGenerator方法。

所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个 WatermarkGenerator。

@Public
public interface WatermarkGenerator<T> {

	/**
	 * Called for every event, allows the watermark generator to examine and remember the
	 * event timestamps, or to emit a watermark based on the event itself.
	 */
	void onEvent(T event, long eventTimestamp, WatermarkOutput output);

	/**
	 * Called periodically, and might emit a new watermark, or not.
	 *
	 * <p>The interval in which this method is called and Watermarks are generated
	 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
	 */
	void onPeriodicEmit(WatermarkOutput output);
}

这个方法简单明了,主要是有两个方法:

  • onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.

  • onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:env.getConfig().setAutoWatermarkInterval(5000L)

标签:WatermarkStrategy,return,必知,Flink,水印,时间,事件,聊聊
From: https://www.cnblogs.com/zhiyong-ITNote/p/17486503.html

相关文章

  • 聊聊Flink的必知必会(三)
    概述在进行流处理时,很多时候想要对流的有界子集进行聚合分析。例如有如下的需求场景:(1)每分钟的页面浏览(PV)次数。(2)每用户每周的会话次数。(3)每分钟每传感器的最高温度。(4)当电商发布一个秒杀活动时,想要每隔10min了解流量数据。对于这些需求的处理,程序需要处理元素组,而......
  • Flink提交任务命令整理
     环境:Flink1.13.6和Flink1.14.4yarn-session模式:--启动yarnseeionbin/yarn-session.sh\-s8\-jm4g\-tm16g\-nmyarn-session-flink\-dyarn-session.sh-jm1g-tm8g-s4-d参数解释:-jm1024表示jobmanager1024M内存-tm1024表示taskmanager......
  • HTTP请求:requests模块基础使用必知必会
    1背景http请求是常见的一种网页协议,我们看到的各种网页,其实都是发送了http请求得到了服务器的响应,从而将数据库中复杂的数据以简单、直观的方式呈现出来,方便大众阅读、使用。而如何发送http请求呢?今天来探讨一下使用requests模块,达到高效、简单的http请求操作。2什么是request......
  • Flink1.13.6 部署踩坑记录
    环境  Hadoop集群是Ambari2.7.5的版本   Flink是1.13.6_2.12的版本问题记录  1.缺少jar包报错:ERRORorg.apache.flink.yarn.cli.FlinkYarnSessionCli[]-ErrorwhilerunningtheFlinksession.java.lang.NoClassDefFoundError:com/sun/jerse......
  • JAVA面试题解惑系列(八)——聊聊基本类型(内置类型)
    关键字:java面试题基本类型intlongbooleanfloatdoublechar作者:臧圩人(zangweiren)基本类型,或者叫做内置类型,是JAVA中不同于类的特殊类型。它们是我们编程中使用最频繁的类型,因此面试题中也总少不了它们的身影,在这篇文章中我们将从面试中常考的几个方面来回顾一......
  • Flink重启策略
    Flink默认重启策略是通过Flink的配置文件设置的flink-conf.yaml,配置参数restart-strategy定义采用的策略。注意:如果启用了checkpoint并且没有显式配置重启策略,会默认使用fixeddelay策略,最大重试次数为Integer.MAX_VALUE。1.固定延迟重启策略固定延迟重启策略是尝试给定次数重......
  • jenkins 自动化部署 flink job
    JenkinsfiledefdeployIp='192.168.1.53'defremote=[:]remote.name=deployIpremote.host=deployIpremote.user='root'remote.password=LCX_PWD_5sremote.allowAnyHosts=truedefgitUrl='http://192.168.1.43:8600/bigda......
  • Flink 的 checkpoint 机制对比 spark 有什么不同和优势?
    sparkstreaming的checkpoint仅仅是针对driver的故障恢复做了数据和元数据的checkpoint。而flink的checkpoint机制要复杂很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。......
  • 聊聊什么是分布式事务
    概述分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上,以上是百度百科的解释。简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要......
  • Redis系列16:聊聊布隆过滤器(原理篇)
    Redis系列1:深刻理解高性能Redis的本质Redis系列2:数据持久化提高可用性Redis系列3:高可用之主从架构Redis系列4:高可用之Sentinel(哨兵模式)Redis系列5:深入分析Cluster集群模式追求性能极致:Redis6.0的多线程模型追求性能极致:客户端缓存带来的革命Redis系列8:Bitmap实现亿万级......