首页 > 其他分享 >Flink自定义Assigning Timestamps和Watermarks 使用Scal语言

Flink自定义Assigning Timestamps和Watermarks 使用Scal语言

时间:2024-01-16 19:56:52浏览次数:26  
标签:String 自定义 timeWindow Flink val env senv def Scal

Flink自定义Assigning Timestamps和Watermarks 使用Scal语言

为了让event time工作,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这个通常是通过抽取或者访问事件中某些字段的时间戳来获取的。时间戳的分配伴随着水印的生成,告诉系统事件时间中的进度。下面介绍几种自定义事件时间戳方法
1.在数据流源中定义
可以看Flink静态Session Windows这边文章里面有
2.使用DataStream API中的assignAscendingTimestamps来指定时间戳。其中系统默认用此时间戳创建Watermark。注意::数据源任务中的时间戳是递增的,这是很必要的。

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val input=env.fromCollection(List(("a",1L),("b",1L),("b",5L),("b",5L)))
    val timeWindow=input.assignAscendingTimestamps(t=>t._2)
    val result=timeWindow.keyBy(0).timeWindow(Time.milliseconds(4)).sum("_2")
    result.print()
    env.execute()
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

结果:
result
3.实现BoundedOutOfOrdernessTimestampExtractor类

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(90000)
    val input=env.fromCollection(List(("b",1L),("b",2L),("b",3L),("b",4L),("b",5L),("b",6L),("b",7L),("b",8L),("b",9L)))
    val timeWindow=input.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.milliseconds(1)) {
      override def extractTimestamp(element: (String, Long)): Long = element._2
    })
    val result=timeWindow.keyBy(0).timeWindow(Time.milliseconds(4)).sum("_2")
    result.print()
    env.execute()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

结果:
result2
注意:此类有一个参数Time.milliseconds(1),代表最长的时延1ms。可以查看源码
result34实现AssignerWithPeriodicWatermarks接口


  def main(args: Array[String]) {
    val params = ParameterTool.fromArgs(args)
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      senv.getConfig.setAutoWatermarkInterval(900000)
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val text = senv.socketTextStream("localhost", 9999)
      .assignTimestampsAndWatermarks(new TimestampExtractor)
    val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.milliseconds(10))
      .sum(1)
    counts.print
    senv.execute("EventTime processing example")
  }
  class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
    private var currentMaxTimestamp = 0L
    private val maxOutOfOrderness = 3l
    override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
      val timestamp=e.split(",")(1).toLong
     // println( e.split(",")(1).toLong)
      currentMaxTimestamp = Math.max(prevElementTimestamp,timestamp)
      e.split(",")(1).toLong
}
override def getCurrentWatermark(): Watermark = {
  println(currentMaxTimestamp-maxOutOfOrderness)
  new Watermark(currentMaxTimestamp-maxOutOfOrderness)
}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

输入:
在这里插入图片描述
结果:
在这里插入图片描述
在这里插入图片描述
注意:1.窗口触发需要要满足两个条件:1.watermark>=window_end_time,2,此窗口内有数据。
2.同时也说明watermark对window的分段之间没有关系,比如输入(a,13),(a,12),(a,16)都在10ms~20ms窗口内
5.实现AssignerWithPunctuatedWatermarks接口

def main(args: Array[String]) {
// Checking input parameters
val params = ParameterTool.fromArgs(args)

// set up the execution environment
val senv = StreamExecutionEnvironment.getExecutionEnvironment
  senv.getConfig.setAutoWatermarkInterval(900000)
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
  .assignTimestampsAndWatermarks(new TimestampExtractor)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
  .keyBy(0)
  .timeWindow(Time.milliseconds(10))
  .sum(1)
counts.print
senv.execute("EventTime processing example")

}
class TimestampExtractor extends AssignerWithPunctuatedWatermarks[String] with Serializable {

override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long): Watermark = {
  if(lastElement.split(",")(1).toLong%2==0)
    {
      println(extractedTimestamp)
      new Watermark(extractedTimestamp)
    }
  else null
}

override def extractTimestamp(element: String, previousElementTimestamp: Long): Long ={
  element.split(",")(1).toLong
}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

结果:
在这里插入图片描述总结:其中2~4是固定时延间隔指定timestamps和watermark,5是根据事件的特殊条件。
从中可以看出watermark的含义是在固定时延间隔乱序,整体是有序的。

原文链接:https://blog.csdn.net/weixin_42412645/article/details/93378738

标签:String,自定义,timeWindow,Flink,val,env,senv,def,Scal
From: https://www.cnblogs.com/sunny3158/p/17968418

相关文章

  • ES--自定义分词器
    默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器。 elasticsearch中分词器(analyzer)的组成包含三部分:characterfilters:在tokenizer之前对文本进行处理。例如删除字符、替换字符tokenizer:将文......
  • 自动化注册组件和自定义指令
    在我们封装全局组件和封装全局指令后,需要在main.ts中频繁进行全局注册,以下将对这块进行优化一、自动化注册组件我们在进行组件注册时,使用的是下面这种形式:app.component('组件名称',组件)这样我们可以使用循环组件的形式来对组件进行注册1.在components文件夹......
  • 无边框 自定义 wfp 钱包夹
    无边框自定义wfp钱包夹C#.net4.8wpfSqlServer2012消息队列Redis  银柱网-李银柱个人博客http://www.liyinzhu.com......
  • Django 使用swagger自定义自动生成类
    完整代码:https://gitee.com/mom925/django-system之前写的Django配置swagger(https://www.cnblogs.com/moon3496694/p/17657283.html)其实更多还是自己手动的写代码去书写接口文档,我希望它能更加的自动化生成出接口文档,所以我需要自己重写一些函数。安装所需的包,注册app,注册路由参考......
  • 《标签篇》Vue.directives自定义指令v-my
    参考链接:https://www.runoob.com/vue2/vue-custom-directive.html自定义指令除了默认设置的核心指令(v-model和v-show),Vue也允许注册自定义指令。下面我们注册一个全局指令v-focus,该指令的功能是在页面加载时,元素获得焦点:<divid="app"> <p>页面载入时,input元素自......
  • Flink异步IO
    本文讲解Flink用于访问外部数据存储的异步I/OAPI。对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于Future和事件驱动编程的知识。对于异步I/O操作的需求在与外部系统交互(用数据库中的数据扩充流数据)的时候,需要考虑与外部系统的通信延迟对整个流处理应用的影响......
  • delphi firemonkey使用 TListView 自定义列表数据
    设计界面如下把ListView的Item的Appearance为DynamicAppearance,并且把Item改为高度100添加Item代码procedureTForm1.Button1Click(Sender:TObject);varimg:TListItemImage;text1,text2,text3:TListItemText;beginvaritem:=ListView1.Items.Add;text......
  • 自定义注解
    importjava.lang.annotation.Documented;importjava.lang.annotation.ElementType;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;@Target(ElementType.FIELD)@Retention(RetentionPolic......
  • SpringBoot自定义注解实现操作日志记录
    1、增加依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId><version>${spring-version}</version>......
  • 自定义echarts绘制直方图,XY轴互调Demo
    1constcolorList=[2'#4f81bd',3'#c0504d',4'#9bbb59',5'#604a7b',6'#948a54',7'#e46c0b'8];9constdata=[10[10,16,3,'A'],11[16,18,15,&#......