首页 > 其他分享 >flink定时器使用问题

flink定时器使用问题

时间:2024-01-31 19:46:38浏览次数:27  
标签:定时器 org flink time 使用 apache import public

flink定时器使用问题

        flink定时器的使用,需要涉及flink time、water mark、keyStream、keyState等概述,尽管关于flink time和water mark的文章烂大街,但还是有必要先简单介绍一下,有助于解释下面flink定时器使用遇到的问题。

时间模型

        flink在streaming程序中支持三种不同的时间模型

  1. event time:事件发生时间。根据事件时间处理,可能需要等待一定时间的延迟事件和处理无序事件,事件时间常常跟处理时间操作一起使用。基于事件时间处理的优势在于,无论是在处理实时的数据还是重新处理历史的数据,基于事件时间创建的流计算应用都能保证结果是一样的。
  2. ingestion time:进入flink的时间(source operator分配的时间)。不能处理任何无序事件或者延迟事件,优点是程序无需指定如何产生水印。
  3. processing time:flink执行window操作的时间。处理时间最简单,有最好的性能和最低的延迟,缺点是无法处理事件乱序问题。

        底层实现其实就两种:event time和processing time,ingestion time也算是processing time的一种,同一个事件的时间先后顺序:event time、ingestion time、processing time。

  • event time
    • event time
  • processing time
    • ingestion time
    • processing time

water mark

        使用event time会有乱序问题,解决时间乱序问题需要依赖于water mark,water mark的生成分两种

  1. 周期性水印:分配时间戳并定期生成水印(这可能依赖于流元素,或者纯粹基于处理时间。
    1. AssignerWithPeriodicWatermarks
    2. AscendingTimestampExtractor:递增时间戳的分配器
    3. BoundedOutOfOrdernessTimestampExtractor:允许固定时间延迟的时间戳分配器
  2. 带断点水印:当某一事件到达需要创建新的water mark时,使用AssignerWithPunctuatedWatermarks。

定时器使用

        flink定时器最常见的使用是配合KeyedProcessFunction使用,在其processElement()方法中注册定时器,onTimer()方法作为Timer触发时的回调逻辑。

        如果是周期性处理,在onTimer()方法内再注册定时器,这样只要有第一个事件进入之后,processElement()注册了定时器,到时间触发onTimer()回调,后面每到onTimer()设置的时间都会继续触发onTimer()回调。

        根据时间特征不同分为两种:

  1. 处理时间——调用Context.timerService().registerProcessingTimeTimer()注册,在系统时间戳达到Timer设定的时间戳时触发调用onTimer()
  2. 事件时间——调用Context.timerService().registerEventTimeTimer()注册;在水印达到或超过Timer设定的时间戳时触发onTimer()

示例代码:

  1. import org.apache.flink.api.common.state.ValueState;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.java.tuple.Tuple;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.TimeCharacteristic;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  9. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  10. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  11. import org.apache.flink.streaming.api.watermark.Watermark;
  12. import org.apache.flink.util.Collector;
  13. import java.util.Arrays;
  14. import java.util.List;
  15. import java.util.Random;
  16. import java.util.concurrent.TimeUnit;
  17. public class TimerApp {
  18. public static class Counter {
  19. private Long lastTime = 0L;
  20. public Long getLastTime() {
  21. return lastTime;
  22. }
  23. public void setLastTime(Long lastTime) {
  24. this.lastTime = lastTime;
  25. }
  26. }
  27. public static void main(String[] args) throws Exception {
  28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29. // 如果不指定EventTime,flink默认使用ProcessingTime
  30. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  31. // 如果使用TimeCharacteristic.ProcessingTime,则需要设置:env.getConfig().setAutoWatermarkInterval(300L);
  32. // env.getConfig().setAutoWatermarkInterval(300L);
  33. env.setParallelism(1)
  34. .addSource(new SourceFunction<Tuple2<String, Long>>() {
  35. private Random random = new Random();
  36. private List<String> names = Arrays.asList("A", "B", "C", "D");
  37. private Boolean isRunning = true;
  38. @Override
  39. public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
  40. while (true) {
  41. int index = random.nextInt(1);
  42. ctx.collect(new Tuple2(names.get(index), 1L){});
  43. // 睡眠1小时,用于模拟长时间没有事件
  44. TimeUnit.SECONDS.sleep(3600);
  45. }
  46. }
  47. @Override
  48. public void cancel() {
  49. isRunning = false;
  50. }
  51. })
  52. .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
  53. private Long maxLateness = 200L;
  54. @Override
  55. public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
  56. return System.currentTimeMillis();
  57. }
  58. // 如果需要强制定时器生效,不管定时时间范围内有没有数据到达,则必须实际这个方法
  59. @Override
  60. public Watermark getCurrentWatermark() {
  61. // return the watermark as current time minus the maximum time lag
  62. return new Watermark(System.currentTimeMillis() - maxLateness);
  63. }
  64. })
  65. .keyBy(0)
  66. .process(new KeyedProcessFunction<Tuple, Tuple2<String, Long>, Tuple2<String, Long>>() {
  67. private ValueState<Counter> state;
  68. private Long INTERVAL = 2000L;
  69. @Override
  70. public void open(Configuration parameters) throws Exception {
  71. state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Counter.class));
  72. }
  73. @Override
  74. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
  75. System.out.println("onTimer:" + timestamp);
  76. Counter counter = state.value();
  77. counter.setLastTime(counter.getLastTime() + INTERVAL);
  78. ctx.timerService().registerEventTimeTimer(counter.getLastTime());
  79. state.value();
  80. }
  81. @Override
  82. public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
  83. System.out.println("processElement:" + ctx.timestamp());
  84. Counter counter = state.value();
  85. if (counter == null) {
  86. counter = new Counter();
  87. counter.setLastTime(System.currentTimeMillis() + INTERVAL);
  88. }
  89. state.update(counter);
  90. ctx.timerService().registerEventTimeTimer(counter.getLastTime());
  91. }
  92. })
  93. .print("处理结果")
  94. ;
  95. env.execute();
  96. }
  97. }
  • 如果不设置:env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),flink默认使用的是TimeCharacteristic.ProcessingTime
  • 如果不设置或者设置了:env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime),必须设置:env.getConfig().setAutoWatermarkInterval(xxx)
  • 如果设置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),则不需要设置:env.getConfig().setAutoWatermarkInterval(xxx)
  • 使用周期性水印,如果没有实现getCurrentWatermark()方法,且长时间没有数据注入,定时器不生效,即使定时器设置的时间到了也不回调onTimer()方法。
  • event time用registerEventTimeTimer注册定时器,processing time用registerProcessingTimeTimer注册定时器,不要混用!
  • 使用ProcessingTime,但注册的时候是EventTime,如果不调用env.getConfig().setAutoWatermarkInterval()方法进行设置,那么定时器也不生效

示例代码:

  1. import org.apache.flink.api.common.state.ValueState;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.java.tuple.Tuple;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.TimeCharacteristic;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  9. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  10. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  11. import org.apache.flink.streaming.api.watermark.Watermark;
  12. import org.apache.flink.util.Collector;
  13. import java.util.Arrays;
  14. import java.util.List;
  15. import java.util.Random;
  16. import java.util.concurrent.TimeUnit;
  17. public class TimerApp {
  18. public static class Counter {
  19. private Long lastTime = 0L;
  20. public Long getLastTime() {
  21. return lastTime;
  22. }
  23. public void setLastTime(Long lastTime) {
  24. this.lastTime = lastTime;
  25. }
  26. }
  27. public static void main(String[] args) throws Exception {
  28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29. // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  30. // env.getConfig().setAutoWatermarkInterval(300L);
  31. env.setParallelism(1)
  32. .addSource(new SourceFunction<Tuple2<String, Long>>() {
  33. private Random random = new Random();
  34. private List<String> names = Arrays.asList("A", "B", "C", "D");
  35. private Boolean isRunning = true;
  36. @Override
  37. public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
  38. while (true) {
  39. int index = random.nextInt(1);
  40. ctx.collect(new Tuple2(names.get(index), 1L){});
  41. TimeUnit.SECONDS.sleep(4);
  42. }
  43. }
  44. @Override
  45. public void cancel() {
  46. isRunning = false;
  47. }
  48. })
  49. .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
  50. @Override
  51. public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
  52. return System.currentTimeMillis();
  53. }
  54. @Override
  55. public Watermark getCurrentWatermark() {
  56. // return the watermark as current time minus the maximum time lag
  57. return new Watermark(System.currentTimeMillis() - 0);
  58. }
  59. })
  60. .keyBy(0)
  61. .process(new KeyedProcessFunction<Tuple, Tuple2<String, Long>, Tuple2<String, Long>>() {
  62. private ValueState<Counter> state;
  63. private Long INTERVAL = 2000L;
  64. @Override
  65. public void open(Configuration parameters) throws Exception {
  66. state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Counter.class));
  67. }
  68. @Override
  69. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
  70. System.out.println("onTimer:" + timestamp);
  71. Counter counter = state.value();
  72. counter.setLastTime(counter.getLastTime() + INTERVAL);
  73. ctx.timerService().registerEventTimeTimer(counter.getLastTime());
  74. state.value();
  75. }
  76. @Override
  77. public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
  78. System.out.println("currentWatermark:" + ctx.timerService().currentWatermark());
  79. Counter counter = state.value();
  80. if (counter == null) {
  81. counter = new Counter();
  82. counter.setLastTime(System.currentTimeMillis() + INTERVAL);
  83. }
  84. state.update(counter);
  85. ctx.timerService().registerEventTimeTimer(counter.getLastTime());
  86. }
  87. })
  88. .print("处理结果")
  89. ;
  90. env.execute();
  91. }
  92. }

输出如下

currentWatermark:-9223372036854775808
currentWatermark:-9223372036854775808
currentWatermark:-9223372036854775808
currentWatermark:-9223372036854775808

……

        为什么是-9223372036854775808这个值(Long的最小值)?为什么用ProcessingTime且不设置env.getConfig().setAutoWatermarkInterval()的话,但调用registerEventTimeTimer注册定时器,定时器就不生效呢?跟默认获取water mark的默认时间间隔有莫大干系:

  • ProcessingTime:0ms
  • EventTime:200ms

        具体分析可参考这篇文章:Flink WaterMark的生成以及获取_lvwenyuan_1的博客-CSDN博客,欢迎讨论。

后续

        重新讨论下长时间没有事件与定时器的关系

        以下面简单的代码为例,MyTimerProcess注册和实现定时逻辑,processElement和onTimer方法分别注册定时器,实现固定间隔回调

env.addSource(new MySource())
    .map(new MyMap())
    .assignTimestampsAndWatermarks(new MyWaterMark())
    .keyBy(0)
    .process(new MyTimerProcess())
    .addSink(new MySink())
    ;

对于长时间没有事件的定时器,分两种情况:

  • 来了第一次事件后,长时没有事件进入:只要有一次事件进入之后,后面的定时触发逻辑就会生效。
  • 从始至终都没有事件到达,定时器:只要从开始到现在都没有事件进入,那么后面的定时触发逻辑就不会生效。
原文链接:https://blog.csdn.net/L13763338360/article/details/113357469

标签:定时器,org,flink,time,使用,apache,import,public
From: https://www.cnblogs.com/sunny3158/p/17999950

相关文章

  • flink状态编程
    flink状态编程简单记录一下最近工作中常用的flink状态flink中可以创建不同类型的状态,如键控状态(KeyedState)和操作符状态(OperatorState)等。状态管理是在流处理的整个过程中保持状态的一种能力,它让我们能够在复杂的事件处理和流转换中保留重要的状态信息,例如:聚合结果、过滤条件......
  • SpringMVC之异常处理器的使用
    SpringMVC的异常处理器是处理控制器方法执行过程出现的异常。SpringMVC提供了一个处理异常的接口HandlerExceptionResolver。HandlerExceptionResolver接口有两个实现类:DefaultHandlerExceptionResolver实现类和SimpleMappingExceptionResolver实现类。DefaultHandlerExcepti......
  • Windows 10 11 安全加固 仅供参考,请查阅资料清楚后使用
    WindowsRegistryEditorVersion5.00;设置密码策略[HKEY_LOCAL_MACHINE\SECURITY\Policies\PasswordPolicy]"MinimumPasswordLength"=dword:00000008"MaximumPasswordAge"=dword:00000030"PasswordComplexity"=dword:00000001"PasswordHi......
  • 数据库MySQL8.0.29安装与备份||了解和掌握MySQL的安装和简单使用和备份数据
    内容:了解和掌握MySQL的安装和简单使用:(1) 了解安装MySQL的软硬件环境和安装方法;(2) 熟悉MySQL的相关基本使用;(3) 熟悉MySQL的构成和相关工具;(4) 通过MySQL的使用来理解数据库系统的基本概念。要求:1. 在微机上安装MySQL数据库系统,为后续实验搭建实验环境,提供前期准备;2. 完成实......
  • mybatis的代码生成器generate的使用
    三步1、打开idea的插件管理,添加mybatisPlus 2、连接数据库  3、找到对应的表  下面红色圈内的内容需要注意,比如module是你想把代码生成在哪个模块,其次是package就是想在哪个目录下,然后就是主键自增方式和生成的哪些类。之后就看下生成的类是否有问题即可,一般是......
  • 使用 PyQt5(PySide2)+SQLAlchemy 做一个登录注册页(一)
    使用PyQt5(PySide2)+SQLAlchemy做一个登录注册页(一)本文将介绍自己用PyQt5+SQLAlchemy做的一个登录注册页,使用邮箱接收验证码,本文介绍是前后端未分离的实现方式,后续将出一个前后端分离的,你可以将PyQt5改为PySide2以获得更宽松的开源协议本文由于涉及到的代码较多,将会是......
  • C++ 使用单调时钟按一定时间间隔执行任务
    使用condition_variable实现定时执行任务遇到一个开发任务,需要按一定的时间间隔执行任务,本来是一个简单的功能,直接使用condition_variable就可以了最开始是直接使用condition_variable实现的定时触发机制,代码的大致实现类似于:#include<condition_variable>#include<chrono......
  • 同时使用300万个GPT是什么体验?ChatGPT新上线@功能
    据ChatGPT特邀灰度用户介绍,ChatGPT内测推出了“GPT@Mentions”功能,允许用户使用“@”标签+GPTs的名称来内联GPT商店中的任意一款GPTs,实现在同一对话窗口中与多个定制的GPT模型交互,就像将多个高级智能代理集成到您的私人助手中,每个代理都擅长解决特定的问题。目前GPT商店超300万个G......
  • [word] Word for iPad日常使用体验及吐槽分享
    现在平板上写Word文章也可以有不错的体验。当iPadOS来了后,微软进一步增强了Word的功能。现在在平板上使用Word打字顺得很,加上云盘的历史记录保存,加上WORD丰富的第三方插件:比如手写数学公式,再加上iPadOS原生的实体键盘五笔加持,加上ApplePencil的手写标注,再加上iPad的便携性,使得现在......
  • 记录: OpenAI中转代理API接口服务的使用
    由于OpenAI提供服务的地区列表里没有China,因此想要方便使用OpenAIAPI的话就需要用到中转服务。本文介绍的iDataRiver平台便提供这样的API,且比官方OpenAI还要便宜,其文档地址入口为https://docs.idatariver.com/zh支持模型如何统计消费的token量token是大语言模型处理信息......