状态编程是Flink最出色的功能没有之一
一、什么是状态?
在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作).
那些需要记住多个事件信息的操作就是有状态的.
流式计算分为无状态计算和有状态计算两种情况
无状态计算:无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告
有状态计算:有状态的计算则会基于多个事件输出结果。以下是一些例子。例如,计算过去一小时的平均水位,就是有状态的计算。所有用于复杂事件处理的状态机。
二、需要状态的场景:
去重
数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
检测
检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
聚合
对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况
更新机器学习模型
在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。
三、Flink的Failover(故障转移机制)
Job 的重启:每次运行,默认都是新的Job,没法实现
Task的重启:
有个算子在某个Task,这个Task抛出了异常,Flink可以采取failover(故障转移机制)
重新找插槽,重新运行Task 可以实现,但不能保存原有状态
注意:Flink默认开启了故障时不重启策略,我们使用故障转移机制时需要将其关闭,不然会出现如下报错
Recovery is suppressed by NoRestartBackoffTimeStrategy
设置故障转移机制
//设置故障转移,第一个参数最多重试重启次数,第二个参数两次重启次数的时间间隔
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
代码实现
package net.cyan.state;
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import java.util.ArrayList;
import java.util.List;
/**
把收到的每个字符串都存到一个list集合
并且希望达到当程序挂掉时,状态可以自动恢复
Job 的重启:每次运行,默认都是新的Job,没法实现
Task的重启:
有个算子在某个Task,这个Task抛出了异常,Flink可以采取failover(故障转移机制)
重新找插槽,重新运行Task 可以实现
*/
public class Demo1_test {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置故障转移,第一个参数最多重试重启次数,第二个参数两次重启次数的时间间隔
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
env.socketTextStream("hadoop103", 9999)
.map(new MyMapFunction())
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
//模拟出现异常
if (value.contains("x")){
throw new RuntimeException("出异常了");
}
System.out.println(value);
}
});
try {
//启动执行环境
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyMapFunction implements MapFunction<String,String>{
private List<String> list=new ArrayList<>();
@Override
public String map(String value) throws Exception {
list.add(value);
return list.toString();
}
}
}
故障转移机制受限于用户所设置的重启次数,一旦达到最大重启次数后将不再进行重启而是直接err,而且状态会丢失。
我们可以通过另一种方法达到无限的Task重启次数以及状态持久化保存,开启CheckPoint
//开启checkpoint,每间隔2秒持久化到磁盘一次,可以实现无限重启
env.enableCheckpointing(2000);
//设置持久化路径,此路径不设置会默认保存在idea文件目录下
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ck");
开启CheckPoint可以让我们能够无限次的重启Task这样来足以应对流式计算,但如何在Task重启后恢复之前存档的状态呢?
那就是使用Flink提供的编程状态Managed State
Flink中的状态分类
Flink包括两种基本类型的状态Managed State和Raw State
Raw State | ||
---|---|---|
状态管理方式 | Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩 | 用户自己管理 |
状态数据结构 | Flink提供多种常用数据结构, 例如:ListState, MapState等 | 字节数组: byte[] |
使用场景 | 绝大数Flink算子 | 所有算子 |
使用状态编程需要实现CheckpointedFunction接口,重写它的两个方法
代码如下
package net.cyan.state;
import net.cyan.POJO.MyUtil;
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import java.util.ArrayList;
import java.util.List;
/**
把收到的每个字符串都存到一个list集合
并且希望达到当程序挂掉时,状态可以自动恢复
Job 的重启:每次运行,默认都是新的Job,没法实现
Task的重启:
有个算子在某个Task,这个Task抛出了异常,Flink可以采取failover(故障转移机制)
重新找插槽,重新运行Task 可以实现
*/
public class Demo1_test {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
//设置故障转移,第一个参数最多重试重启次数,第二个参数两次重启次数的时间间隔
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
//开启checkpoint,每间隔2秒持久化到磁盘一次,可以实现无限重启
env.enableCheckpointing(2000);
//设置持久化路径
env.getCheckpointConfig().setCheckpointStorage("file:///e:/ck");
env.socketTextStream("hadoop103", 9999)
.map(new MyMapFunction())
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
//模拟出现异常
if (value.contains("x")){
throw new RuntimeException("出异常了");
}
System.out.println(value);
}
});
try {
//启动执行环境
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyMapFunction implements MapFunction<String,String>, CheckpointedFunction {
private List<String> list=new ArrayList<>();
private ListState<String> acc;
@Override
public String map(String value) throws Exception {
list.add(value);
return list.toString();
}
@Override
//快照状态,根据自定义的时间间隔进行存档
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//进行周期性的存档
//获取初始化的装填,然后周期性的存储
acc.update(list);//覆盖写
}
@Override
//初始化状态,每次Task重启后运行一次
public void initializeState(FunctionInitializationContext context) throws Exception {
//从状态仓库中获取一个list状态
acc = context.getOperatorStateStore().getListState(new ListStateDescriptor<String>("acc", String.class));
//状态中存储着信息,要取出来放入自定义状态集合中
Iterable<String> strings = acc.get();
strings.forEach(s->list.add(s));
}
}
}
当我们重启Task后数据恢复是两个并行度各两个元素,如果我们希望重启后每个并行度都有全部的元素就可以这样设置
//从状态仓库中获取一个list状态标签:状态,flink,Task,重启,Flink,apache,import,编程 From: https://www.cnblogs.com/CYan521/p/16831500.html
acc = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<String>("acc", String.class));