Flink State Programming
1. Keyed State
1.1 Value State (ValueState)
1.1.1 Definition
The state holds a single value. ValueState is itself an interface, defined in the Flink source as follows:
public interface ValueState<T> extends State {
    T value() throws IOException;
    void update(T value) throws IOException;
}
1.1.2 Example
Use ValueState together with a timer to emit each user's PV (page view) count every 10 seconds.
package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class ValueStateDS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.addSource(new EventWithWatermarkSource());
        SingleOutputStreamOperator<String> result = stream.keyBy(t -> t.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    // Two states: the current PV count, and the pending timer's timestamp
                    private ValueState<Long> valueState;
                    private ValueState<Long> timerTsState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("value-state", Long.class));
                        timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerTs", Long.class));
                    }

                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> collector) throws Exception {
                        // Increment the PV count for the current key
                        Long count = valueState.value();
                        if (count == null) {
                            valueState.update(1L);
                        } else {
                            valueState.update(count + 1);
                        }
                        // Register a timer 10 s ahead, but only if none is pending
                        if (timerTsState.value() == null) {
                            ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
                            timerTsState.update(value.timestamp + 10 * 1000L);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("time: " + new Timestamp(timestamp) + " -> user: " + ctx.getCurrentKey() + " pv: " + valueState.value());
                        // Clear the timer state so the next element registers a new timer
                        timerTsState.clear();
                    }
                });
        result.print(">>>>");
        env.execute();
    }
}
1.2 List State (ListState)
1.2.1 Definition
Organizes the data to be kept as a list. The ListState interface likewise takes a type parameter T, the type of the list elements. ListState offers a set of methods for working with the state, used much like an ordinary List; the sketch below shows the core calls.
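A minimal sketch of the ListState API, assuming it runs inside a keyed rich function ("buffer" and the values are hypothetical; Arrays is java.util.Arrays):

// In open(): create the state handle from a descriptor
ListState<String> bufferState = getRuntimeContext().getListState(
        new ListStateDescriptor<>("buffer", Types.STRING));

// In the processing method:
bufferState.add("a");                         // append one element
bufferState.addAll(Arrays.asList("b", "c"));  // append several elements
bufferState.update(Arrays.asList("d"));       // replace the whole list
for (String s : bufferState.get()) {          // iterate the current contents
    // ...
}
bufferState.clear();                          // drop the list for the current key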
1.2.2 Example
Use ListState to implement a SQL-style join of two streams.
package com.hpsk.flink.state;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class ListStateDS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.fromElements(
                Tuple3.of("a", "stream-1", 1000L),
                Tuple3.of("b", "stream-1", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                }));
        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.fromElements(
                Tuple3.of("a", "stream-2", 3000L),
                Tuple3.of("b", "stream-2", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                }));
        stream1.keyBy(r -> r.f0)
                .connect(stream2.keyBy(r -> r.f0))
                .process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    // One buffer per side; every element is kept so it can be
                    // joined with all future elements of the other side
                    private ListState<Tuple3<String, String, Long>> listState1;
                    private ListState<Tuple3<String, String, Long>> listState2;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        listState1 = getRuntimeContext().getListState(new ListStateDescriptor<>("listState1", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));
                        listState2 = getRuntimeContext().getListState(new ListStateDescriptor<>("listState2", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));
                    }

                    @Override
                    public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> out) throws Exception {
                        // Buffer the left element, then join it with everything buffered on the right
                        listState1.add(left);
                        for (Tuple3<String, String, Long> right : listState2.get()) {
                            out.collect(left + " => " + right);
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> out) throws Exception {
                        // Buffer the right element, then join it with everything buffered on the left
                        listState2.add(right);
                        for (Tuple3<String, String, Long> left : listState1.get()) {
                            out.collect(left + " => " + right);
                        }
                    }
                    // Note: both lists grow without bound; a real job would add state TTL or explicit cleanup
                })
                .print();
        env.execute();
    }
}
1.3 Map State (MapState)
1.3.1 Definition
Map state behaves like Java's HashMap: it stores a set of key-value pairs, and a value can be read or updated by its key. The sketch below shows the core calls.
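A minimal sketch of the MapState API, again assuming a keyed rich function (names are hypothetical):

// In open(): create the state handle from a descriptor
MapState<String, Long> countMap = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("counts", String.class, Long.class));

// In the processing method:
countMap.put("k", 1L);                   // insert or overwrite
Long v = countMap.get("k");              // null if absent
boolean hit = countMap.contains("k");
countMap.remove("k");
for (Map.Entry<String, Long> e : countMap.entries()) {  // also keys() / values()
    // ...
}
countMap.clear();                        // drop the whole map for the current key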
1.3.2 Example
Use KeyedProcessFunction to emulate a tumbling window.
package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

// Emulate a tumbling window with KeyedProcessFunction
public class MapStateDS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> inputDS = env.addSource(new EventWithWatermarkSource());
        SingleOutputStreamOperator<String> result = inputDS
                .keyBy(t -> t.url)
                .process(new FakeWindowResult(10000L));
        result.print(">>>");
        env.execute();
    }

    private static class FakeWindowResult extends KeyedProcessFunction<String, Event, String> {
        // Window length in milliseconds
        private final Long windowSize;
        // Map state holding the PV count per window: (window start -> count)
        private MapState<Long, Long> windowPvMapState;

        public FakeWindowResult(Long windowSize) {
            this.windowSize = windowSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapState", Types.LONG, Types.LONG));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<String> collector) throws Exception {
            // Assign each element to a window by flooring its timestamp,
            // e.g. windowSize = 10000: timestamp 23456 -> window [20000, 30000)
            long windowStart = event.timestamp / windowSize * windowSize;
            long windowEnd = windowStart + windowSize;
            // Register a timer at windowEnd - 1 to trigger the window computation
            ctx.timerService().registerEventTimeTimer(windowEnd - 1);
            // Update the PV count for this window
            if (windowPvMapState.contains(windowStart)) {
                Long pv = windowPvMapState.get(windowStart);
                windowPvMapState.put(windowStart, pv + 1);
            } else {
                windowPvMapState.put(windowStart, 1L);
            }
        }

        // Timer fired: emit the PV count for the finished window
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            long windowEnd = timestamp + 1;
            long windowStart = windowEnd - windowSize;
            long pv = windowPvMapState.get(windowStart);
            out.collect("url: [" + ctx.getCurrentKey()
                    + "] count: " + pv
                    + ", window: [" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd) + "]");
            // Simulate window disposal by removing this window's entry from the map
            windowPvMapState.remove(windowStart);
        }
    }
}
1.4 Reducing State (ReducingState)
1.4.1 Definition
Combines each value added to the state with the value already stored, using a built-in ReduceFunction, so the state always holds a single reduced result. The sketch below shows the semantics.
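A minimal sketch, assuming a keyed rich function: add() immediately folds the new value into the stored one via the ReduceFunction, and get() returns the current result ("sum" is a hypothetical state name).

// In open(): a reducing state that keeps a running sum per key
ReducingState<Long> sumState = getRuntimeContext().getReducingState(
        new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));

// In the processing method:
sumState.add(3L);          // state becomes 3
sumState.add(4L);          // state becomes 3 + 4 = 7
Long sum = sumState.get(); // 7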
1.4.2 Example
For each user's click stream, compute the average timestamp of every 5 events.
package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class ReducingStateDS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> inputDS = env.addSource(new EventWithWatermarkSource());
        inputDS.print("input ");
        SingleOutputStreamOperator<String> result = inputDS
                .keyBy(t -> t.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {
                    // Reducing state accumulating (first event, sum of timestamps)
                    private ReducingState<Tuple2<Event, Long>> reducingState;
                    // Value state holding the current visit count for this user
                    private ValueState<Long> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("countState", Long.class));
                        reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>(
                                "reducing-state",
                                new ReduceFunction<Tuple2<Event, Long>>() {
                                    @Override
                                    public Tuple2<Event, Long> reduce(Tuple2<Event, Long> t1, Tuple2<Event, Long> t2) throws Exception {
                                        // Keep the first event, accumulate the timestamps
                                        return Tuple2.of(t1.f0, t1.f1 + t2.f1);
                                    }
                                },
                                // The type info must describe Tuple2<Event, Long>; the original
                                // Types.TUPLE(Types.LONG, Types.LONG) did not match.
                                // Assumes Event is a valid Flink POJO.
                                Types.TUPLE(Types.POJO(Event.class), Types.LONG)
                        ));
                    }

                    @Override
                    public void flatMap(Event event, Collector<String> collector) throws Exception {
                        Long count = countState.value();
                        if (count == null) {
                            count = 1L;
                        } else {
                            count++;
                        }
                        // Update the visit count
                        countState.update(count);
                        // Fold this event's timestamp into the running sum
                        reducingState.add(Tuple2.of(event, event.timestamp));
                        if (count == 5) {
                            // Every 5 visits, emit the average timestamp
                            collector.collect(event.user + " average timestamp: " + new Timestamp(reducingState.get().f1 / count));
                            // Reset both states
                            countState.clear();
                            reducingState.clear();
                        }
                    }
                });
        result.print("output ");
        env.execute();
    }
}
1.5 Aggregating State (AggregatingState)
1.5.1 Definition
Holds a single value that is the aggregate of all elements added to the state. Unlike ReducingState, the aggregation is defined by an AggregateFunction, so the input, accumulator, and output types may all differ; the sketch below illustrates the three type parameters.
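A minimal sketch of AggregateFunction<IN, ACC, OUT>, here averaging Long values (IN = Long, ACC = Tuple2<Long, Long> holding (sum, count), OUT = Double; "avg" is a hypothetical state name):

// In open(): create the aggregating state from a descriptor
AggregatingState<Long, Double> avgState = getRuntimeContext().getAggregatingState(
        new AggregatingStateDescriptor<>("avg",
                new AggregateFunction<Long, Tuple2<Long, Long>, Double>() {
                    @Override
                    public Tuple2<Long, Long> createAccumulator() {
                        return Tuple2.of(0L, 0L);
                    }
                    @Override
                    public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
                        return Tuple2.of(acc.f0 + value, acc.f1 + 1);
                    }
                    @Override
                    public Double getResult(Tuple2<Long, Long> acc) {
                        return (double) acc.f0 / acc.f1;
                    }
                    @Override
                    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                    }
                },
                Types.TUPLE(Types.LONG, Types.LONG)));

// In the processing method:
avgState.add(10L);           // IN values go in
Double avg = avgState.get(); // the OUT value comes back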
1.5.2 Example
Same task as in 1.4.2: for each user's click stream, compute the average timestamp of every 5 events, this time with AggregatingState.
package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class AggregatingStateDS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> inputDS = env.addSource(new EventWithWatermarkSource());
        inputDS.print("input ");
        SingleOutputStreamOperator<String> result = inputDS
                .keyBy(t -> t.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {
                    // Aggregating state computing the average timestamp
                    private AggregatingState<Event, Long> aggregatingState;
                    // Value state holding the current visit count for this user
                    private ValueState<Long> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("countState", Long.class));
                        aggregatingState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
                                        "avg-ts",
                                        new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
                                            @Override
                                            public Tuple2<Long, Long> createAccumulator() {
                                                return Tuple2.of(0L, 0L);
                                            }

                                            @Override
                                            public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> accumulator) {
                                                // Accumulator holds (sum of timestamps, element count)
                                                return Tuple2.of(accumulator.f0 + event.timestamp, accumulator.f1 + 1);
                                            }

                                            @Override
                                            public Long getResult(Tuple2<Long, Long> accumulator) {
                                                return accumulator.f0 / accumulator.f1;
                                            }

                                            @Override
                                            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                                                // Merge two accumulators (the original returned null here)
                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                            }
                                        },
                                        Types.TUPLE(Types.LONG, Types.LONG)));
                    }

                    @Override
                    public void flatMap(Event event, Collector<String> out) throws Exception {
                        Long count = countState.value();
                        if (count == null) {
                            count = 1L;
                        } else {
                            count++;
                        }
                        countState.update(count);
                        aggregatingState.add(event);
                        if (count == 5) {
                            out.collect(event.user + " average timestamp: " + new Timestamp(aggregatingState.get()));
                            countState.clear();
                            // Also reset the aggregate so each output covers exactly 5 events
                            // (the original cleared only countState)
                            aggregatingState.clear();
                        }
                    }
                });
        result.print("output ");
        env.execute();
    }
}
2. Broadcast State (BroadcastState)
2.1 Definition
Sometimes every parallel subtask of an operator should keep the same "global" state, used for shared configuration or rules. All data in all partitions then reads the very same state, as if the state had been "broadcast" to every partition; this special kind of operator state is called broadcast state (BroadcastState). The sketch below shows the basic wiring.
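A minimal wiring sketch (ruleStream and dataStream are hypothetical String streams). Note that the same MapStateDescriptor is passed both to broadcast() and to getBroadcastState(), and that the state is writable only on the broadcast side:

MapStateDescriptor<String, String> ruleDesc =
        new MapStateDescriptor<>("rules", String.class, String.class);

BroadcastStream<String> ruleBroadcast = ruleStream.broadcast(ruleDesc);

dataStream.connect(ruleBroadcast)
        .process(new BroadcastProcessFunction<String, String, String>() {
            @Override
            public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                ctx.getBroadcastState(ruleDesc).put("current", rule); // writable view
            }

            @Override
            public void processElement(String data, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                String rule = ctx.getBroadcastState(ruleDesc).get("current"); // read-only view
                out.collect(data + " handled with rule: " + rule);
            }
        });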
2.2 Example
Simulate dynamically configured dimension-table creation in a real-time data warehouse.
package com.hpsk.flink.function;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Simulate dynamically configured dimension-table creation in a real-time data warehouse
public class BroadcastProcessFunctionDS {
    public static void main(String[] args) throws Exception {
        // 1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Config stream: the dimension-table configuration
        SingleOutputStreamOperator<String> tableConfigStream = env.fromElements(
                "table1,createTable",
                "table2,createTable",
                "table3,createTable");
        // 3. Main stream: live change data from the business database
        SingleOutputStreamOperator<Tuple2<String, String>> mysqlTableStream = env.socketTextStream("hadoop102", 8888)
                .map(line -> Tuple2.of(line.split(",")[0], line.split(",")[1]))
                .returns(Types.TUPLE(Types.STRING, Types.STRING));
        // 4. Turn the config stream into a broadcast stream
        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, String.class);
        BroadcastStream<String> broadcast = tableConfigStream.broadcast(mapStateDescriptor);
        // 5. Connect the main stream with the broadcast stream, then process the
        //    connected stream, routing main-stream records by the broadcast config
        BroadcastConnectedStream<Tuple2<String, String>, String> connectedStream = mysqlTableStream.connect(broadcast);
        SingleOutputStreamOperator<String> result = connectedStream.process(new MyBroadcastProcessFunction(mapStateDescriptor));
        // 6. Print the result and execute
        result.print("output ");
        env.execute();
    }

    public static class MyBroadcastProcessFunction extends BroadcastProcessFunction<Tuple2<String, String>, String, String> {
        private final MapStateDescriptor<String, String> mapStateDescriptor;

        public MyBroadcastProcessFunction(MapStateDescriptor<String, String> mapStateDescriptor) {
            this.mapStateDescriptor = mapStateDescriptor;
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
            // Broadcast side: parse "table,action" and store it in the (writable) broadcast state
            BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
            String[] split = value.split(",");
            broadcastState.put(split[0].trim(), split[1].trim());
        }

        @Override
        public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // Main side: look up the (read-only) broadcast state by table name
            ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
            String table = value.f0;
            String create = broadcastState.get(table);
            if (create != null) {
                out.collect(table + " is a configured dimension table; create it in Phoenix -> action: " + create + ", data: " + value.f1);
            } else {
                out.collect(table + " is a plain business table; skip table creation");
            }
        }
    }
}