首页 > 编程语言 >Flink之状态编程

Flink之状态编程

时间:2022-10-27 11:12:40浏览次数:39  
标签:状态 flink Task 重启 Flink apache import 编程

状态编程是Flink最出色的功能没有之一

一、什么是状态?

在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作).

那些需要记住多个事件信息的操作就是有状态的.

流式计算分为无状态计算和有状态计算两种情况

无状态计算:无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告

有状态计算:有状态的计算则会基于多个事件输出结果。以下是一些例子。例如,计算过去一小时的平均水位,就是有状态的计算。所有用于复杂事件处理的状态机。

二、需要状态的场景:

去重

数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。

检测

检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。

聚合

对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况

更新机器学习模型

在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

三、Flink的Failover(故障转移机制)

   Job 的重启:每次运行,默认都是新的Job,没法实现
   Task的重启:
             有个算子在某个Task,这个Task抛出了异常,Flink可以采取failover(故障转移机制)
             重新找插槽,重新运行Task  可以实现,但不能保存原有状态

 

注意:Flink默认开启了故障时不重启策略,我们使用故障转移机制时需要将其关闭,不然会出现如下报错

 Recovery is suppressed by NoRestartBackoffTimeStrategy

设置故障转移机制

 //设置故障转移,第一个参数最多重试重启次数,第二个参数两次重启次数的时间间隔 
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));

代码实现

 package net.cyan.state;
 ​
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.redis.RedisSink;
 ​
 import java.util.ArrayList;
 import java.util.List;
 ​
 ​
 /**
 ​
 把收到的每个字符串都存到一个list集合
   并且希望达到当程序挂掉时,状态可以自动恢复
 ​
   Job 的重启:每次运行,默认都是新的Job,没法实现
   Task的重启:
             有个算子在某个Task,这个Task抛出了异常,Flink可以采取failover(故障转移机制)
             重新找插槽,重新运行Task 可以实现
  */
 public class Demo1_test {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //设置故障转移,第一个参数最多重试重启次数,第二个参数两次重启次数的时间间隔
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
 ​
           env.socketTextStream("hadoop103", 9999)
                          .map(new MyMapFunction())
                  .addSink(new SinkFunction<String>() {
                       @Override
                       public void invoke(String value, Context context) throws Exception {
                           //模拟出现异常
                           if (value.contains("x")){
                              throw  new RuntimeException("出异常了");
                          }
                           System.out.println(value);
                      }
                  });
 ​
         try {
             //启动执行环境
             env.execute();
        } catch (Exception e) {
             e.printStackTrace();
        }
 ​
 ​
    }
     public static class MyMapFunction  implements MapFunction<String,String>{
         private List<String> list=new ArrayList<>();
 ​
         @Override
         public String map(String value) throws Exception {
             list.add(value);
             return list.toString();
        }
    }
 }

故障转移机制受限于用户所设置的重启次数,一旦达到最大重启次数后将不再进行重启而是直接err,而且状态会丢失。

我们可以通过另一种方法达到无限的Task重启次数以及状态持久化保存,开启CheckPoint

 //开启checkpoint,每间隔2秒持久化到磁盘一次,可以实现无限重启
 env.enableCheckpointing(2000);
 //设置持久化路径,此路径不设置会默认保存在idea文件目录下
 env.getCheckpointConfig().setCheckpointStorage("file:///d:/ck");

开启CheckPoint可以让我们能够无限次的重启Task这样来足以应对流式计算,但如何在Task重启后恢复之前存档的状态呢?

那就是使用Flink提供的编程状态Managed State

Flink中的状态分类

Flink包括两种基本类型的状态Managed StateRaw State

  Raw State
状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩 用户自己管理
状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等 字节数组: byte[]
使用场景 绝大数Flink算子 所有算子

使用状态编程需要实现CheckpointedFunction接口,重写它的两个方法

代码如下

 package net.cyan.state;
 ​
 import net.cyan.POJO.MyUtil;
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.redis.RedisSink;
 ​
 import java.util.ArrayList;
 import java.util.List;
 ​
 ​
 /**
 ​
 把收到的每个字符串都存到一个list集合
   并且希望达到当程序挂掉时,状态可以自动恢复
 ​
   Job 的重启:每次运行,默认都是新的Job,没法实现
   Task的重启:
             有个算子在某个Task,这个Task抛出了异常,Flink可以采取failover(故障转移机制)
             重新找插槽,重新运行Task 可以实现
  */
 public class Demo1_test {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
         //设置故障转移,第一个参数最多重试重启次数,第二个参数两次重启次数的时间间隔
         //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
         //开启checkpoint,每间隔2秒持久化到磁盘一次,可以实现无限重启
         env.enableCheckpointing(2000);
         //设置持久化路径
         env.getCheckpointConfig().setCheckpointStorage("file:///e:/ck");
 ​
           env.socketTextStream("hadoop103", 9999)
                          .map(new MyMapFunction())
                  .addSink(new SinkFunction<String>() {
                       @Override
                       public void invoke(String value, Context context) throws Exception {
                           //模拟出现异常
                           if (value.contains("x")){
                              throw  new RuntimeException("出异常了");
                          }
                           System.out.println(value);
                      }
                  });
 ​
         try {
             //启动执行环境
             env.execute();
        } catch (Exception e) {
             e.printStackTrace();
        }
 ​
 ​
    }
     public static class MyMapFunction  implements MapFunction<String,String>, CheckpointedFunction {
         private List<String> list=new ArrayList<>();
         private ListState<String> acc;
 ​
         @Override
         public String map(String value) throws Exception {
             list.add(value);
             return list.toString();
        }
 ​
         @Override
         //快照状态,根据自定义的时间间隔进行存档
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
             //进行周期性的存档
             //获取初始化的装填,然后周期性的存储
             acc.update(list);//覆盖写
 ​
 ​
        }
 ​
         @Override
         //初始化状态,每次Task重启后运行一次
         public void initializeState(FunctionInitializationContext context) throws Exception {
             //从状态仓库中获取一个list状态
            acc = context.getOperatorStateStore().getListState(new ListStateDescriptor<String>("acc", String.class));
             //状态中存储着信息,要取出来放入自定义状态集合中
             Iterable<String> strings = acc.get();
             strings.forEach(s->list.add(s));
 ​
 ​
        }
    }
 }

当我们重启Task后数据恢复是两个并行度各两个元素,如果我们希望重启后每个并行度都有全部的元素就可以这样设置

  //从状态仓库中获取一个list状态
 acc = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<String>("acc", String.class));

标签:状态,flink,Task,重启,Flink,apache,import,编程
From: https://www.cnblogs.com/CYan521/p/16831500.html

相关文章

  • java多线程编程详细入门教程
    ##1、概念?线程是jvm调度的最小单元,也叫做轻量级进程,进程是由线程组成,线程拥有私有的程序技术器以及栈,并且能够访问堆中的共享资源。这里提出一个问题,为什么要用多......
  • java并发编程(java并发编程实战过时了吗)
    Java如何进行并发多连接socket编程呢?println("hasreceive。。。。");out。flush();if(str。equals("end"))break;client。close();catch(IOExceptionex)finally......
  • 安卓之按钮状态的保存
    1:首先定义全局变量。作为按钮状态的一个保存//rawData是否上传的全局状态publicstaticbooleanrawDataStatus=false;2:点击按钮时,在onClick方法中·实现按钮点击的开关......
  • TypeScript 复习与进阶三部曲 (2) – 把 TypeScript 当编程语言使用
    前言上一篇,我们提到,TypeScript进阶有3个阶段. 第一阶段是"把TypeScript当强类型语言使用",我们已经介绍完了. 第二阶段是"把TypeScript当编程语言使用"......
  • Vue 组件化编程
    1.1模块与组件、模块化与组件化1.1.1模块理解:向外提供特定功能的​​js​​​程序,一般就是一个​​js​​文件为什么:​​js​​文件很多很复杂作用:复用​​js​​​,简......
  • javascript编程单线程之异步模式Asynchronous
    异步模式Asynchronous不会等待这个任务结束才开始执行下一个任务,开启之后立即执行下一个任务,后续逻辑一般会通过回调函数的方式定义,异步模式对js非常重要,没有异步任务单线......
  • javascript编程单线程之异步模式Asynchronous
    异步模式Asynchronous不会等待这个任务结束才开始执行下一个任务,开启之后立即执行下一个任务,后续逻辑一般会通过回调函数的方式定义,异步模式对js非常重要,没有异步任务单......
  • Vue 组件化编程
    1.1模块与组件、模块化与组件化1.1.1模块理解:向外提供特定功能的js程序,一般就是一个js文件为什么:js文件很多很复杂作用:复用js,简化js的编写,提高js运行效率......
  • Python进阶篇04-面向对象编程
    面向对象编程面向对象编程和面向过程编程的区别:类和实例类:抽象的、用于创建实例的基础模板,类里面可以定义这个类所拥有的基础的属性。实例:根据类而创建的具体的对象,实......
  • C++模板元编程实战 电子书 pdf
    作者:李伟出版社:人民邮电出版社副标题:一个深度学习框架的初步实现 链接:C++模板元编程实战  《C++模板元编程实战:一个深度学习框架的初步实现》以一个深度学......