State (just understand the principle)
Automatic state management in Flink
The Flink code we have written so far already relies on automatic state management. For example:
send "hello" and you get (hello,1);
send "hello" again and you get (hello,2).
This shows that Flink automatically aggregates the current record with the historical state/result, i.e. the state is managed automatically.
In the vast majority of real-world development we can simply rely on this automatic management.
Only a few special cases require manual state management (we will use it in a later project),
so we first need to learn how state is managed manually.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Desc: demo of a DataStream source based on a Socket
*/
public class SourceDemo03_Socket {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
/*SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
words.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value,1);
}
});*/
//Note: the operation below merges the two steps above into one: split each line into words and emit each word with a count of 1
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(Tuple2.of(word, 1));
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);
//TODO 3.sink
result.print();
//TODO 4.execute
env.execute();
}
}
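To reproduce the (hello,1) / (hello,2) behaviour, start a socket server on node1 before submitting the job, for example with netcat (nc -lk 9999), then type hello twice; the sum operator keeps the per-word count as state across the two records.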
Stateless computation vs. stateful computation
- Stateless computation does not need any historical value, e.g. map:
hello --> (hello,1)
hello --> (hello,1)
- Stateful computation needs the historical value, e.g. sum (see the sketch below):
hello --> (hello,1)
hello --> (hello,2)
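As a minimal, self-contained sketch of this contrast (the class name StatelessVsStatefulDemo, the two-element input and the print labels are illustrative, not part of the original post), the same two "hello" records go through a stateless map and a stateful sum:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StatelessVsStatefulDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("hello", "hello");
        //stateless: map looks at each element in isolation, so both records become (hello,1)
        DataStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //stateful: sum keeps a running total per key as state, so the second record becomes (hello,2)
        DataStream<Tuple2<String, Integer>> counts = ones.keyBy(t -> t.f0).sum(1);
        ones.print("stateless map");
        counts.print("stateful sum");
        env.execute();
    }
}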
State classification
- State
  - ManagedState (recommended in development): automatically managed and optimized by Flink, supports multiple data structures
    - KeyedState: can only be used on a KeyedStream, supports multiple data structures
    - OperatorState: typically used in Sources, supports ListState (a sketch follows the KeyedState demo below)
  - RawState: managed entirely by the user, supports only byte[], can only be used in custom operators
Detailed classification diagram: (figure not included here)
Code demo: ManagedState / KeyedState
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Desc: use ValueState (KeyedState) to track the maximum value per key in the stream; in practice maxBy would be enough
*/
public class StateDemo01_KeyState {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
Tuple2.of("北京", 1L),
Tuple2.of("上海", 2L),
Tuple2.of("北京", 6L),
Tuple2.of("上海", 8L),
Tuple2.of("北京", 3L),
Tuple2.of("上海", 4L)
);
//TODO 2.transformation
//Requirement: compute the maximum value for each city
//In practice, maxBy is enough
DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0).maxBy(1);
//For learning purposes, ValueState (KeyedState) is used here to reimplement what maxBy does under the hood
DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
//-1.Define a state to hold the maximum value
private ValueState<Long> maxValueState;
//-2.Initialize the state
@Override
public void open(Configuration parameters) throws Exception {
//Create the state descriptor
ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxValueState", Long.class);
//Get/initialize the state from the state descriptor
maxValueState = getRuntimeContext().getState(stateDescriptor);
}
//-3.Use the state
@Override
public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
Long currentValue = value.f1;
//Get the state (the previous maximum for this key)
Long historyValue = maxValueState.value();
//Compare the current value with the state
if (historyValue == null || currentValue > historyValue) {
historyValue = currentValue;
//Update the state
maxValueState.update(historyValue);
return Tuple3.of(value.f0, currentValue, historyValue);
} else {
return Tuple3.of(value.f0, currentValue, historyValue);
}
}
});
//TODO 3.sink
//result1.print();
//4> (北京,6)
//1> (上海,8)
result2.print();
//1> (上海,xxx,8)
//4> (北京,xxx,6)
//TODO 4.execute
env.execute();
}
}
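The KeyedState demo above covers ManagedState on a KeyedStream. For the OperatorState branch of the classification, here is a minimal sketch of a custom source that keeps its read offset in a ListState via the CheckpointedFunction interface of the Flink 1.12 API; the class name MyCounterSource, the state name offsetState and the one-element-per-second counter are illustrative assumptions, not part of the original post.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * Desc: illustrative OperatorState sketch - a counter source that checkpoints its offset in a ListState
 */
public class MyCounterSource implements SourceFunction<Long>, CheckpointedFunction {
    private volatile boolean running = true;
    private long offset = 0L;            //the value we want to survive a failure
    private ListState<Long> offsetState; //OperatorState that holds the offset
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        //Get/initialize the ListState from the operator state store
        offsetState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("offsetState", Long.class));
        //On restore, continue from the previously checkpointed offset
        for (Long value : offsetState.get()) {
            offset = value;
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //On every checkpoint, replace the state content with the current offset
        offsetState.clear();
        offsetState.add(offset);
    }
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            //Emit and advance the offset under the checkpoint lock so snapshots see a consistent value
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(offset);
                offset++;
            }
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}
Such a source would be attached with env.addSource(new MyCounterSource()), and checkpointing has to be enabled (env.enableCheckpointing(...)) for snapshotState to be called.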
From: https://www.cnblogs.com/Mr-Sponge/p/17037271.html