首页 > 其他分享 >Flink-时间语义

Flink-时间语义

时间:2024-06-06 14:33:44浏览次数:25  
标签:触发 watermark Flink 语义 element 时间 Watermark 事件

1时间语义

flink种设计时间的不同概念:

  • 1 Event Time:事件时间,指代事件创建的时间,指代数据中的时间错带指代事件时间,Flink通过时间戳分配器访问事件时间
  • 2 Ingestion Time: 摄入时间:指代数据进入Flink的时间
  • 3 Processing Time:进程时间:数据执行算子的处理时间

1 EventTime 的引入:

在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的时间属性

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2 Watermark

1 watermark概述:

1 在eventTime事件时间中,Flink接收事件的数据不是严格按照事件时间进行排序,会出现乱序,需要watermark进行处理乱序的一种机制

2 一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

2 watermark的理论知识

Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime- 延迟时长,

也就是说,Watermark是基于数据携带的时间戳生成的,

一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。

由于event time是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

图解案例:

乱序流的Watermarker如下图所示:(Watermark设置为2)

上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,

时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,

窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2

Watermark就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有数据都会收入窗中。

只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。

3 理论小结

watermark是用来处理按照事件时间出现乱序的一种机制

4 Watermark引入

dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(1000)) {
  @Override
public long extractTimestamp(element: SensorReading): Long = {
    return element.getTimestamp() * 1000L;//获取事件数据的时间戳作为事件时间
  }
} );

我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。

Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 在添加数据源时候设置时间戳以及watermark
DataStream<SensorReading> dataStream = env.addSource(new SensorSource())
        .assignTimestampsAndWatermarks(new MyAssigner());// 这里就可以自定义事件语义的时间戳

MyAssigner有两种类型(分配时间戳的接口)

  • AssignerWithPeriodicWatermarks
  • AssignerWithPunctuatedWatermarks

以上两个接口都继承自TimestampAssigner。

watermark生产的时间间隔,怎么周期性生成watermark设置

可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。

// 每隔5秒产生一个watermark env.getConfig.setAutoWatermarkInterval(5000);

以上代码解析:

产生watermark的逻辑:每隔5秒钟,

Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。

如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。

这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。

周期性获取时间戳的例子:自定义周期性时间戳分配器

// 自定义周期性时间戳分配器
public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading>{
    private Long bound = 60 * 1000L;    // 延迟一分钟
    private Long maxTs = Long.MIN_VALUE;    // 当前最大时间戳

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxTs - bound);
    }

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        maxTs = Math.max(maxTs, element.getTimestamp());
        return element.getTime

不周期性:自定义时间戳:

如果数据是单调递增:AscendingTimestampExtractor,这个类会直接使用数据的时间戳生成watermark。

代码如下:

DataStream<SensorReading> dataStream = …

dataStream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<SensorReading>() {
    @Override
    public long extractAscendingTimestamp(SensorReading element) {
        return element.getTimestamp() * 1000;
    }
});

乱序数据流,如果能大致估算出最大延迟时间,则使用 BoundedOutOfOrdernessTimestampExtractor(Time.seconds(1)),

这个可以根据事件的时间戳减去1S,作为时间戳

DataStream<SensorReading> dataStream = …

dataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {
    @Override
    public long extractTimestamp(SensorReading element) {
        return element.getTimestamp() * 1000L;
    }
});

标签:触发,watermark,Flink,语义,element,时间,Watermark,事件
From: https://blog.csdn.net/zengjun_csdn/article/details/139499000

相关文章

  • 多线程、队列、装饰器统计时间
    """一个列表中有100个url地址(每个请求0.5秒),设计一个程序,获取列表的url地址使用5个线程去发送这100个请求,计算出总共请求的时间"""importqueueimporttimefrommultiprocessing.poolimportThreadPooldefdownload(q:queue.Queue):whilenotq.empty():......
  • 【Web API DOM10】日期(时间)对象
    一:实例化1获取系统当前时间即创建日期对象constdate=newDate()console.log(date)2024年6月5日周三 2获取指定的时间以获取2025年6月29日为例constdate=newDate('2025-6-29')console.log(date)二:日期对象方法1使用场景:日期对象返回数据如上图,无法直接使......
  • JCR一区级 | Matlab实现TCN-GRU-MATT时间卷积门控循环单元多特征分类预测
    JCR一区级|Matlab实现TCN-GRU-MATT时间卷积门控循环单元多特征分类预测目录JCR一区级|Matlab实现TCN-GRU-MATT时间卷积门控循环单元多特征分类预测分类效果基本介绍程序设计参考资料分类效果基本介绍1.Matlab实现TCN-GRU-MATT时间卷积门控循环单元......
  • 存储引擎及特点、约束条件、严格模式、基本字段类型(整型、浮点型、字符串、日期时间
    【一】存储引擎在平常我们处理的文件格式有很多,并且针对不同的文件格式会有对应不同的存储方式和处理机制针对不同的数据应该有对应不同的处理机制存储引擎就是不同的处理机制。#查看所有引擎showengines;四种主要的存储引擎(1)Innodb引擎是MySQL5.5版本之后的默认存......
  • Prophet在R语言中进行时间序列数据预测
    原文链接:http://tecdat.cn/?p=7327原文出处:拓端数据部落公众号 您将学习如何使用Prophet(在R中)解决一个常见问题:预测公司明年的每日订单。  数据准备与探索Prophet最适合每日数据以及至少一年的历史数据。我们将使用SQL处理每天要预测的数据:  `select``date,......
  • 杭州出租车行驶轨迹数据空间时间可视化分析|附代码数据
    原文链接:http://tecdat.cn/?p=7324最近我们被客户要求撰写关于出租车的研究报告,包括一些图形和统计输出城市化带来的道路拥堵、出行耗时长等交通问题给交管部门带来了巨大的挑战▼通过安装在出租车上的GPS设备,可以采集到大量的轨迹数据,从而帮助我们分析人们出行信息,达到优化交......
  • POSTGRESQL中时间戳的奥秘timestamptz
    哈喽,大家好,我是木头左!一、前言在日常的数据库操作中,经常会遇到各种时间相关的数据类型,如DATE、TIME、TIMESTAMP等。这些时间类型的处理方式和精度差异,直接影响到对数据的查询和分析结果。今天,就来深入探讨一下POSTGRESQL中的两个重要时间戳类型:timestamp和timestamptz,看看它们......
  • Redis-3-过期时间淘汰策略与内存淘汰策略
    目录1.Redis过期时间淘汰策略1.1惰性删除1.2定期删除1.3主动扫描2.Redis内存淘汰策略2.1最大内存配置2.2LRU最近最少使用2.2.1传统LRU2.2.2Redis中的LRU2.2.3LRU的缺点2.3访问频率最低2.3.1传统LFU2.3.2Redis的LFU2.3.2.1时间衰减函数2.3.2.2热度值函数2.3.2.3总结2......
  • 算法的时间复杂度和空间复杂度
    目录1.算法效率1.1如何衡量一个算法的好坏1.2算法的复杂度1.3复杂度在校招中的考察2.时间复杂度2.1时间复杂度的概念2.2大O的渐进表示法2.3常见时间复杂度计算举例实例1:实例2:实例3:实例4:实例5:实例6:实例7:实例8:3.空间复杂度实例1:实例2:实例3:4.常见复杂度对比1.......
  • kafka-生产者事务-数据传递语义&事务介绍&事务消息发送
    文章目录1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽kafkadebug日志logback.xml3.7、引入spring-kafka依赖3.8、控制......