首页 > 编程语言 >[Flink] Flink源码分析 : BoundedOutOfOrdernessTimestampExtractor

[Flink] Flink源码分析 : BoundedOutOfOrdernessTimestampExtractor

时间:2024-02-02 18:24:37浏览次数:28  
标签:Flink api BoundedOutOfOrdernessTimestampExtractor flink maxOutOfOrderness 源码 imp

0 序言

0.1 缘起

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * window 样例作业
 * @author johnnyzen
 * @reference-doc
 * [1] Flink实战之窗口WindowsAPI使用示例 - Weixin - https://mp.weixin.qq.com/s/Pwv_pufdzi-WfPvXMID6aw
 */
public class SampleWindowJob {

    private final static Logger logger = LoggerFactory.getLogger(SampleWindowJob.class);

    public static void main(String[] args) throws Exception {
        //1. 作业配置
        Configuration jobConfiguration = new Configuration();
        // 设置WebUI绑定的本地端口
        //jobConfiguration.setInteger(RestOptions.PORT, 8081);//"rest.port" / RestOptions.PORT / RestOptions.BIND_PORT
        jobConfiguration.setString(RestOptions.BIND_PORT, "18081");

        //2. 创建流处理的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(jobConfiguration); // StreamExecutionEnvironment.getExecutionEnvironment();
        //声明使用 eventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //3. 使用 StreamExecutionEnvironment 创建 DataStream
        //Source(可以有多个Source)
        //Socket 监听本地端口 8888
        // 接收一个socket文本流
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //4. 数据流的数据处理
        //4.1 输入的活动数据转换
        DataStream<Tuple3<String, Long, Integer>> windowCountDataStream =
                lines.map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String line) throws Exception {
                        String[] words = line.split(" ");
                        return new Tuple3<String, Long, Integer>(words[0], Long.valueOf(words[1]), 1);
                    }
                });

        //4.2 描述 flink 如何获取数据中的 event 时间戳进行判断
        /**
         * maxOutOfOrderness
         * 1. 用于指定element允许滞后(t-t_w,t为 element 的 eventTime , t_w 为前一次 watermark 的时间)的最大时间,在计算窗口数据时,如果超过该值则会被忽略
         */
        Time maxOutOfOrderness = Time.milliseconds(1000);//描述延迟的 watermark 1秒

        DataStream<Tuple3<String, Long, Integer>> textWithEventTimeDataStream =
            windowCountDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Integer>>(maxOutOfOrderness) {
                    /**
                     * 本方法会调用子类的 extractTimestamp 方法抽取时间
                     * 1. 如果该时间 大于 currentMaxTimestamp,则: 更新 currentMaxTimestamp;
                     * 2. getCurrentWatermark 先计算 potentialWM :
                     *      如果 potentialWM >= lastEmittedWatermark , 则: 更新 lastEmittedWatermark
                     *      currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness :
                     *          这里表示 lastEmittedWatermark 太小了,所以差值超过了 maxOutOfOrderness ,因而调大 lastEmittedWatermark )
                     *   最后,返回 Watermark(lastEmittedWatermark)
                     * @param stringLongIntegerTuple3
                     * @return
                     */
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> stringLongIntegerTuple3) {
                        return stringLongIntegerTuple3.f1;
                    }
                }
            ).setParallelism(1);

        //4.3 按 key 分组,keyBy 之后是分到各个分区,再开 window 去处理
        KeyedStream<Tuple3<String, Long, Integer>, Tuple> textKeyStream = textWithEventTimeDataStream.keyBy(0);
        textKeyStream.print("textKey: ");

        //4.4 设置5秒的(会话窗口)活动时间间隔
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> sessionWindowStream = textKeyStream.window( EventTimeSessionWindows.withGap(Time.milliseconds(5000L)) ).sum(2);

        //4.5 调用 Sink (Sink必须调用)
        sessionWindowStream.print("sessionWindow: ").setParallelism(1);

        //5. 启动 (此处若有异常不建议try...catch... 捕获。因为:它会抛给上层flink,flink根据异常来做相应的重启策略等处理)
        env.execute("StreamWordCount");
    }
}
  • flink.version = 1.13.1
  • scala.version = 2.12 / 2.11

1 源码分析

1.1 继承关系 : BoundedOutOfOrdernessTimestampExtractor extends AssignerWithPeriodicWatermarks extends TimestampAssigner

public abstract class org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T>

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T>

public interface TimestampAssigner<T> extends org.apache.flink.api.common.eventtime.TimestampAssigner<T>, Function

1.2 源码分析

  • org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor : flink-streaming-java_2.11-1.13.1.jar
package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. 定义当前时间数据的最大时间戳 */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. 上一次提交的水印时间戳 */
    private long lastEmittedWatermark = Long.MIN_VALUE;

	/**
	 * The (fixed) interval between the maximum seen timestamp seen in the records
	 * and that of the watermark to be emitted.
	 */
    private final long maxOutOfOrderness; //最大乱序度,最大容忍的延时时长

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0L) {
            throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        } else {
            this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
            this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
        }
    }

    public long getMaxOutOfOrdernessInMillis() {
        return this.maxOutOfOrderness;
    }

	/**
	 * Extracts the timestamp from the given element.
	 *
	 * @param element The element that the timestamp is extracted from.
	 * @return The new timestamp.
	 */
    public abstract long extractTimestamp(T var1);

    public final Watermark getCurrentWatermark() {
		// this guarantees that the watermark never goes backwards. (这保证了水印不会倒退)
		// 这个句代码保证了生成的水印是【单调递增】的
        long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
        if (potentialWM >= this.lastEmittedWatermark) {// 当前最大的时间戳减去延时时间 和 上次最后提交的水印时间比较
            this.lastEmittedWatermark = potentialWM;// 保留最大的时间(减去延时时间)作为水印
        }

        return new Watermark(this.lastEmittedWatermark);
    }

    public final long extractTimestamp(T element, long previousElementTimestamp) {
        // 提取业务事件数据中的时间作为 timestamp
        long timestamp = this.extractTimestamp(element);//extractTimestamp 由 业务用户 编程实现
        if (timestamp > this.currentMaxTimestamp) {
            this.currentMaxTimestamp = timestamp;
        }

        return timestamp;
    }
}

  • BoundedOutOfOrdernessTimestampExtractor抽象类实现AssignerWithPeriodicWatermarks接口的extractTimestampgetCurrentWatermark方法,同时声明抽象方法extractAscendingTimestamp供子类实现

  • BoundedOutOfOrdernessTimestampExtractor的构造器接收maxOutOfOrderness参数用于指定element允许滞后

  • t-t_w
  • t为element的eventTime
  • t_w为前一次watermark的时间)的最大时间,在计算窗口数据时,如果超过该值则会被忽略
  • BoundedOutOfOrdernessTimestampExtractor的extractTimestamp方法会调用子类的extractTimestamp方法抽取时间(t1)
  • 如果该时间(t1) 大于 currentMaxTimestamp,则: 更新 currentMaxTimestamp
  • getCurrentWatermark 方法:
  • 先计算potentialWM,如果 potentialWM >= lastEmittedWatermark,则: 更新 lastEmittedWatermark(currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness)

这里表示lastEmittedWatermark太小了所以差值超过了maxOutOfOrderness,因而调大lastEmittedWatermark),最后返回Watermark(lastEmittedWatermark)

X 参考文献

标签:Flink,api,BoundedOutOfOrdernessTimestampExtractor,flink,maxOutOfOrderness,源码,imp
From: https://www.cnblogs.com/johnnyzen/p/18003639

相关文章

  • Pytest 源码解读 [7] - PyTest on pluggy
    之前花了很多篇幅来介绍 pluggy 这个插件框架。核心原因就是因为其实 pytest 是一个完全基于 pluggy 开发的测试框架,这个也可以解释为什么说 pytest 是一个很灵活的测试框架,支持很多插件(https://docs.pytest.org/en/7.0.x/reference/plugin_list.html)。 其实原因就......
  • 在ubuntu16.04下,源码编译安装特定版本的MongoDB PHP扩展
    背景:我的php项目在连接其他mongo库时报:Serveratxxx:27017reportswireversion5,butthisversionoflibmongocrequiresatleast6(MongoDB3.6)原因:本地MongoDBPHP扩展的版本过高解决方法:降低本地PHP扩展MongoDB版本,现在要降到mongodb-1.12.0版本步骤:1:下载......
  • [Flink] Flink Job之Web UI
    0序言在本地电脑开发、调试,或集群环境下运行FlinkJob时,需要利用WebUI观测作业内部的运行情况。WEBUI,对我们观测Flink作业的总体运行情况(系统负载)、快速定位和解决问题,至关重要。全文基于如下版本演示:scala.version=2.11/2.12;flink.version=1.13.11FlinkJob......
  • Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
    1概况本文展示如何使用FlinkCDC+Iceberg+Doris构建实时湖仓一体的联邦查询分析,Doris1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。2系统架构我们整理架构图如下,   1.首先我们从Mysq......
  • Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
    1概况本文展示如何使用FlinkCDC+Iceberg+Doris构建实时湖仓一体的联邦查询分析,Doris1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。2系统架构我们整理架构图如下,   1.首先我们从Mysq......
  • 【设计模式】原型模式——JDK源码中的原型模式
    原型模式在JDK源码中有广泛的应用,想找原型模式的实现也比较容易,只要找Cloneable接口的子类即可。因为JDK源码中Cloneable的子类太多,本文只讨论最典型的几种。ArrayList首先看ArrayList和原型模式有关的代码:publicclassArrayList<E>extendsAbstractList<E>implementsL......
  • 高德司机端趣接单抢单辅助器源码分享下载 -24软件网
    在网约车行业中,司机端抢单是一项关键的操作,直接关系到司机的订单量和收入。有一些开发者或者个体经营者可能尝试通过编写抢单源码辅助器来提高抢单的效率。然而,这样的做法可能会违反平台规定,涉及到技术伦理和法律风险。本文将介绍司机端抢单源码辅助器的技术实现方式,以及可能面临......
  • 搭建高效企业培训平台:教育系统源码开发详解
    为了更好地满足企业培训的需求,许多组织纷纷转向数字化教育,搭建高效的企业培训平台成为当务之急。本篇文章,小编将为您讲解教育系统源码的开发细节,为搭建一个功能强大、灵活高效的企业培训平台提供详尽的指南。 一、教育系统的基础架构1.1数据库设计众所周知,数据库设计是整个平台的......
  • Striped64源码阅读
    目录简介模型代码分析成员变量方法补充ThreadLocalRandomContended注解-解决伪共享问题参考链接本人的源码阅读主要聚焦于类的使用场景,一般只在java层面进行分析,没有深入到一些native方法的实现。并且由于知识储备不完整,很可能出现疏漏甚至是谬误,欢迎指出共同学习本文基于cor......
  • 基于Java+Neo4j开发的知识图谱+全文检索的知识库管理系统(源码分析)
    在数字化高度普及的时代,企事业机关单位在日常工作中会产生大量的文档,例如医院制度汇编,企业知识共享库等。针对这些文档性的东西,手工纸质化去管理是非常消耗工作量的,并且纸质化查阅难,易损耗,所以电子化管理显得尤为重要。【springboot+elasticsearch+neo4j+vue+activiti】实现数字......