首页 > 其他分享 >Flink AggregatingState 实例

Flink AggregatingState 实例

时间:2024-03-06 16:46:55浏览次数:28  
标签:flink 实例 api Flink new AggregatingState org apache import

Flink AggregatingState 实例

AggregatingState介绍

  • AggregatingState需要和AggregateFunction配合使用
  • add()方法添加一个元素,触发AggregateFunction计算
  • get()获取State的值

需求:计算每个设备10秒内的平均温度

  1. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.functions.AggregateFunction;
  4. import org.apache.flink.api.common.state.AggregatingState;
  5. import org.apache.flink.api.common.state.AggregatingStateDescriptor;
  6. import org.apache.flink.api.common.typeinfo.TypeHint;
  7. import org.apache.flink.api.common.typeinfo.TypeInformation;
  8. import org.apache.flink.api.java.functions.KeySelector;
  9. import org.apache.flink.api.java.tuple.Tuple2;
  10. import org.apache.flink.api.java.tuple.Tuple3;
  11. import org.apache.flink.configuration.Configuration;
  12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  15. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  16. import org.apache.flink.util.Collector;
  17. import java.time.Duration;
  18. import java.util.Random;
  19. public class AggregatingStateTest {
  20. public static void main(String[] args) throws Exception {
  21. // 计算每个设备10s内温度的平均值
  22. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  24. env.getConfig().setAutoWatermarkInterval(100l);
  25. DataStreamSource<Tuple3<String, Integer, Long>> tuple3DataStreamSource = env.addSource(new SourceFunction<Tuple3<String, Integer, Long>>() {
  26. boolean flag = true;
  27. @Override
  28. public void run(SourceContext<Tuple3<String, Integer, Long>> ctx) throws Exception {
  29. String[] str = {"水阀1", "水阀2", "水阀3"};
  30. while (flag) {
  31. int i = new Random().nextInt(3);
  32. // 温度
  33. int temperature = new Random().nextInt(100);
  34. Thread.sleep(1000l);
  35. // 设备号、温度、事件时间
  36. ctx.collect(new Tuple3<String, Integer, Long>(str[i], temperature, System.currentTimeMillis()));
  37. }
  38. }
  39. @Override
  40. public void cancel() {
  41. flag = false;
  42. }
  43. });
  44. tuple3DataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  45. .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
  46. @Override
  47. public long extractTimestamp(Tuple3<String, Integer, Long> stringIntegerLongTuple3, long l) {
  48. return stringIntegerLongTuple3.f2;
  49. }
  50. })).keyBy(new KeySelector<Tuple3<String, Integer, Long>, String>() {
  51. @Override
  52. public String getKey(Tuple3<String, Integer, Long> stringIntegerLongTuple3) throws Exception {
  53. return stringIntegerLongTuple3.f0;
  54. }
  55. }).process(new KeyedProcessFunction<String, Tuple3<String, Integer, Long>, String>() {
  56. Long interval = 10 * 1000l;
  57. // <Integer, Double>这个类型是aggregatingState中的输入和输出类型
  58. AggregatingState<Integer, Double> aggregatingState = null;
  59. @Override
  60. public void open(Configuration parameters) throws Exception {
    1. @Override
    2. public void open(Configuration parameters) throws Exception {
    super.open(parameters);
  61. // <Integer, Tuple2<Integer,Integer>, Double>这是输入,中间状态,输出类型。TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){})这个是aggregatingState存储的数据的类型
  62. AggregatingStateDescriptor<Integer, Tuple2<Integer,Integer>, Double> aggregatingStateDescriptor =
  63. new AggregatingStateDescriptor<Integer, Tuple2<Integer,Integer>, Double>("aggregatingState", new MyAggregate(), TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){}));
  64. aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
  65. }
  66. @Override
  67. public void processElement(Tuple3<String, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
  68. // 10s的起始的时间
  69. Long start = ctx.timestamp() - (ctx.timestamp() % interval);
  70. Long timerTimestamp = start + interval;
  71. ctx.timerService().registerEventTimeTimer(timerTimestamp);
  72. aggregatingState.add(value.f1);
  73. }
  74. @Override
  75. public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  76. super.onTimer(timestamp, ctx, out);
  77. Double aDouble = aggregatingState.get();
  78. String str = "[" + ctx.getCurrentKey() + "] " + "十秒内的平均温度为:" + aDouble;
  79. out.collect(str);
  80. }
  81. }).print();
  82. env.execute("aggregatingState");
  83. }
  84. private static class MyAggregate implements AggregateFunction<Integer, Tuple2<Integer,Integer>, Double> {
  85. @Override
  86. public Tuple2<Integer, Integer> createAccumulator() {
  87. // 初始化温度和次数
  88. return new Tuple2<Integer, Integer>(0,0);
  89. }
  90. @Override
  91. public Tuple2<Integer, Integer> add(Integer integer, Tuple2<Integer, Integer> integerIntegerTuple2) {
  92. // 历史温度加上本次温度,次数加1
  93. return new Tuple2<Integer, Integer>(integerIntegerTuple2.f0 + integer, integerIntegerTuple2.f1 +1);
  94. }
  95. @Override
  96. public Double getResult(Tuple2<Integer, Integer> integerIntegerTuple2) {
  97. return Double.valueOf(integerIntegerTuple2.f0 / integerIntegerTuple2.f1);
  98. }
  99. @Override
  100. public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> acc1) {
  101. return new Tuple2<Integer, Integer>(integerIntegerTuple2.f0 + acc1.f0, integerIntegerTuple2.f1 + acc1.f1);
  102. }
  103. }
  104. }

原文链接:https://blog.csdn.net/qq_35514685/article/details/124351482

标签:flink,实例,api,Flink,new,AggregatingState,org,apache,import
From: https://www.cnblogs.com/sunny3158/p/18056936

相关文章

  • flink 提交yarn 命令 flink run -m yarn-cluster
    flink提交yarn命令flinkrun-myarn-cluster文章目录Flink集群搭建和使用local本地测试flink集群搭建1、standallonecluster提交任务--将代码打包2.flinkonyarn只需要部署一个节点flink启动方式1、yarn-session2、直接提交任务到yarnFlink集群搭建和使用local本地......
  • 实例详解如何构建动态SQL语句
    本文分享自华为云社区《GaussDB数据库SQL系列-动态语句》,作者:Gauss松鼠会小助手2。一、前言在数据库中构建动态SQL语句是指根据不同的条件或参数创建不同的SQL语句。这通常是为了适应不同的业务需求,提高SQL的灵活性和效率。GaussDB数据库是一款具备高性能、高可用性和高扩展性的......
  • 解密Spring中的Bean实例化:推断构造方法(上)
    在Spring中,一个bean需要通过实例化来获取一个对象,而实例化的过程涉及到构造方法的调用。本文将主要探讨简单的构造推断和实例化过程,让我们首先深入了解实例化的步骤。实例化源码protectedBeanWrappercreateBeanInstance(StringbeanName,RootBeanDefinitionmbd,@NullableO......
  • 使用SSH客户端登录Linux实例提示“ssh_exchange_identification: read: Connection re
    产品推荐:1、安全稳定的云服务器租用,2核/2G/5M仅37元,点击抢购>>>;2、高防物理服务器20核/16G/50M/200G防御仅350元,点击抢购>>>3、百度智能建站(五合一网站)仅880元/年,点击抢购>>> 模板建站(PC+手机站)仅480元/年,点击抢购>>>使用SSH客户端登录Linux实例提示“ssh_exchange_identifi......
  • Java 辨析之实例化和初始化
    在面向对象编程中,实例化和初始化是两个相关但不同的概念:实例化(Instantiation):实例化是指创建一个类的新的具体对象的过程。当程序运行时,通过new关键字调用类的构造函数来创建该类的一个实例。例如,在Java中:MyClassmyObject=newMyClass();在这行代码中,newMyClass()就是......
  • 【Flink入门修炼】2-1 Flink 四大基石
    前一章我们对Flink进行了总体的介绍。对Flink是什么、能做什么、入门demo、架构等进行了讲解。本章我们将学习Flink重点概念、核心特性等。本篇对Flink四大基石进行概括介绍,是Flink中非常关键的四个内容。一、四大基石Flink四大基石分别是:Time(时间)、Window(窗口)、St......
  • 【Serverless】云存储新建账号无法创建存储实例解决方案
    ​ 【问题描述】一些开发者想要使用AGC云存储服务,在开通服务后,需要创建一个存储实例,但是在点击创建按钮时,出现了未知错误的报错提示,创建失败。​【解决方案】获取到了开发者的浏览器报错日志后,发现了在创建Bucket时返回了“138012:invokeqmserror”的错误。​​随后在咨询......
  • 实例讲解功能测试框架
    原文链接:https://mp.weixin.qq.com/s/D1Be8Cs76ONdFI7XA1hVZw?open_in_browser=true首先,我们先聊一下软件测试的目的,它到底可以帮我们解决什么问题?软件测试的目的包括:发现缺陷:通过测试,发现软件中存在的各种缺陷、错误和问题,包括功能性、性能、安全性等方面的问题。验证功......
  • 动态代理实现实例
    importjava.lang.reflect.InvocationHandler;importjava.lang.reflect.Proxy;/*@author12817*//***学生接口,能跑,能吃,能写作文。*/interfaceStudent{voideat();voidrun();voidwrite();}/***小韭菜,能跑,能吃,能写作文。*/classOrd......
  • Vue 3.0 应用&组件实例
    #创建一个应用实例每个Vue应用都是通过用 createApp 函数创建一个新的应用实例开始的: constapp=Vue.createApp({/*选项*/})  该应用实例是用来在应用中注册“全局”组件的。我们将在后面的指南中详细讨论,简单的例子: constapp=Vue.createApp({})ap......