【Flink】复函数的使用,时间服务和定时器,值、列表、字典状态变量
文章目录
一 Flink DataStream API
1 复函数
在实际环境这种经常会有这样的需求:在函数处理数据之前,需要做一些初始化的工作;或者需要在处理数据时可以获得函数执行上下文的一些信息,如数据库的连接;以及在处理完数据时做一些清理工作。而 DataStream API 就提供了这样的机制。
DataStream API 提供的所有转换操作函数,都拥有它们的富版本,并且在使用常规函数或者匿名函数的地方来使用富函数。下面就是富函数的一些例子,可以看出,只需要在常规函数的前面加上 Rich 前缀就是富函数了。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
当使用富函数时,可以实现两个额外的方法:
- open() 方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前 open() 会被调用。open() 函数通常用来做一些只需要做一次即可的初始化工作。
- close() 方法是生命周期中的最后一个调用的方法,通常用来做一些清理工作。
另外,getRuntimeContext() 方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,当前子任务的索引,当前子任务的名字。同时还它还包含了访问分区状态的方法。
生命周期和其他信息的演示见以下代码:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.fromElements(1,2,3,4)
.map(new RichMapFunction<Integer, Integer>() {
// 会在map算子之前调用一次
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("声明周期开始...");
System.out.println("当前子任务的索引是:" + getRuntimeContext().getTaskNameWithSubtasks());
}
@Override
public Integer map(Integer integer) throws Exception {
return integer * integer;
}
// 会在map算子之后调用一次
@Override
public void close() throws Exception {
super.close();
System.out.println("生命周期结束...");
}
})
.print();
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
其会针对每个任务管理器的任务槽开启一个声明周期,见下例:
按照数字奇偶性将其发送到不同任务槽中:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(new RichParallelSourceFunction<Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("声明周期开始,子任务索引是:" + getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for(int i = 0; i < 10; i++){
if(i % 2 == getRuntimeContext().getIndexOfThisSubtask()){
ctx.collect(i);
}
}
}
@Override
public void cancel() {
}
})
.setParallelism(2)
.print()
.setParallelism(2);
env.execute();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
2 自定义输出到下游设备
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.fromElements(1,2,3)
.addSink(new SinkFunction<Integer>() {
@Override
public void invoke(Integer value, Context context) throws Exception {
SinkFunction.super.invoke(value,context);
System.out.println(value);
}
});
env.execute();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
二 处理函数
转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API 提供了一系列的 Low-Level 转换算子。可以访问时间戳、水位线以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑 (使用之前的 window 函数和转换算子无法实现)。例如,Flink-SQL (上层的库)就是使用 Process Function (底层的API)实现的,底层的API将Flink所有的功能都暴露了出来。
Flink 提供了 8 个 Process Function:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
1 KeyedProcessFunction的使用
KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素(flatMap和reduce的终极加强版)。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法,用来处理keyBy以后的流。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:
processElement(String s, Context context, Collector<String> collector)
:流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流 (side outputs)。onTimer(long timestamp, OnTimerContext ctx, Collector out)
:本方法是一个回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context参数一样,提供了上下文的一些信息,例如 firing trigger 的时间信息 (事件时间或者处理时间)。
(1)时间服务和定时器
Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:
- currentProcessingTime() 返回当前处理时间currentWatermark() 返回当前水位线的时间戳
- registerProcessingTimeTimer(long time) 会注册当前 key 的 processing time 的 timer。当 processing time 到达定时时间时,触发 timer。
- registerEventTimeTimer(long time) 会注册当前 key 的 event time timer。当水位线大
于等于定时器注册的时间时,触发定时器执行回调函数。 - deleteProcessingTimeTimer(long time) 删除之前注册处理时间定时器。如果没有这个
时间戳的定时器,则不执行。 - deleteEventTimeTimer(long time) 删除之前注册的事件时间定时器,如果没有此时间
戳的定时器,则不执行。
当定时器 timer 触发时,执行回调函数 onTimer()。processElement() 方法和 onTimer()方法是同步(不是异步)方法,这样可以避免并发访问和操作状态。
针对每一个 key 和 timestamp,只能注册一个定期器。也就是说,每一个 key 可以注册多个定时器,但在每一个时间戳只能注册一个定时器。KeyedProcessFunction 默认将所有定时器的时间戳放在一个优先队列中。在 Flink 做检查点操作时,定时器也会被保存到状态后端中。
KeyedProcessFunction简单例子如下:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// windows下监控nc -l -p 9999
env
.socketTextStream("localhost",9999)
// keyBy到同一条流上
.keyBy(r -> 1)
.process(new MyKeyed())
.print();
env.execute();
}
// 第一个为Key的泛型,第二个为输入数据的泛型,第三个为输出数据的泛型
public static class MyKeyed extends KeyedProcessFunction<Integer,String,String> {
@Override
public void processElement(String s, Context context, Collector<String> collector) throws Exception {
// 获取当前机器时间
long ts = context.timerService().currentProcessingTime();
collector.collect("元素:" + s + " 在" + new Timestamp(ts) + " 到达");
// 注册一个10秒钟以后的定时器
long tenSecLater = ts + 10 * 1000;
collector.collect("注册了一个在" + new Timestamp(tenSecLater) + "秒钟后的定时器");
// 注册定时器的语法,注意:注册的是处理时间(机器时间)
context.timerService().registerProcessingTimeTimer(tenSecLater);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect("定时器触发了,触发时间是:" + new Timestamp(timestamp));
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
2状态变量
状态变量的特点:
-
状态变量的可见范围(作用域)是当前的key。
-
状态变量是单例,只能被实例化一次。
为什么是单例:状态变量会被备份到检查点中,假设程序宕机,在重启的时候,会去远程存储的检查点中查找状态变量,找到则恢复,否则创建实例。
-
除以上两点外,可以当做普通的java变量使用。
(1)值状态变量
a 需求一
使用状态变量实现求平均数的功能,并每10秒钟向下游发送一次:
public class Example5 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.addSource(new SourceFunction<Integer>() {
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while (running){
sourceContext.collect(random.nextInt(10));
Thread.sleep(100);
}
}
@Override
public void cancel() {
running = false;
}
})
.keyBy(r -> true)
.process(new KeyedProcessFunction<Boolean, Integer, Double>() {
// 声明一个状态变量作为累加器
private ValueState<Tuple2<Integer,Integer>> valueState;
// 保存定时器的时间戳
private ValueState<Long> timerTs;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 实例化状态变量
valueState = getRuntimeContext().getState(
// 状态描述符
new ValueStateDescriptor<Tuple2<Integer, Integer>>("sun-count", Types.TUPLE(Types.INT,Types.INT))
);
timerTs = getRuntimeContext().getState(
new ValueStateDescriptor<Long>("timer",Types.LONG)
);
}
@Override
public void processElement(Integer integer, Context context, Collector<Double> collector) throws Exception {
// 当第一条数据到来时,状态变量的值为null
// 使用.value()方法读取状态变量的值,使用.update()方法更新状态变量的值
if (valueState.value() == null){
// 将状态变量的值当做累加器的值
valueState.update(Tuple2.of(integer,1));
} else{
Tuple2<Integer, Integer> tmp = valueState.value();
valueState.update(Tuple2.of(tmp.f0 + integer,tmp.f1 + 1));
}
if(timerTs.value() == null){
long tenSecLater = context.timerService().currentProcessingTime() + 10 * 1000L;
context.timerService().registerProcessingTimeTimer(tenSecLater);
timerTs.update(tenSecLater);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out) throws Exception {
super.onTimer(timestamp, ctx, out);
if(valueState != null){
out.collect((double) valueState.value().f0 / valueState.value().f1);
// 每隔十秒钟发送一次
timerTs.clear();
}
}
})
.print();
env.execute();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
reduce的累加器的类型和数据输入输出的类型必须相同,而状态变量灵活的多。
reduce的累加器处理完一条数据后就立刻发送给下游,状态变量可以通过定时器实现,等待固定时间后,再将结果发送到下游。
b 需求二
监控整数值的变化,如果整数值在一秒钟之内 (processing time) 连续上升,则报警。
public class Example6 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.addSource(new SourceFunction<Integer>() {
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while(running){
sourceContext.collect(random.nextInt(10));
Thread.sleep(300);
}
}
@Override
public void cancel() {
running = false;
}
})
.keyBy(r -> true)
.process(new IntIncreaseAlert())
.print();
env.execute();
}
private static class IntIncreaseAlert extends KeyedProcessFunction<Boolean,Integer,String> {
// 定义两个状态变量
private ValueState<Integer> lastInt;
private ValueState<Long> timerTs;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 实例化两个状态变量
lastInt = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("last-integer", Types.INT));
timerTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Types.LONG));
}
@Override
public void processElement(Integer integer, Context context, Collector<String> collector) throws Exception {
Integer prevInt = null;
if(lastInt.value() != null){
prevInt = lastInt.value();
}
lastInt.update(integer);
// 只要存在定时器,ts就不会null,无论在1秒钟内来几条数据
Long ts = null;
if(timerTs.value() != null){
ts = timerTs.value();
}
// 出现温度值的下降,需要将定时器删除
if(prevInt == null || integer < prevInt){
if(ts != null){
context.timerService().deleteProcessingTimeTimer(ts);
timerTs.clear();
}
} else if(integer > prevInt && ts == null){
long oneSecLater = context.timerService().currentProcessingTime() + 1000L;
context.timerService().registerProcessingTimeTimer(oneSecLater);
timerTs.update(oneSecLater);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect("警告警告!警告警告!");
timerTs.clear();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
KeyedProcessFunction
结合状态变量加定时器之后,变得非常灵活。
(2)列表状态变量
值状态变量自始至终都保存了一个元组,当接收的数据更新完元组就会被丢弃,列表状态变量将所有的数据都存储了起来,非常占内存。
使用列表状态变量求平均值,可以当做数组使用。
.keyBy(r -> 1)
.process(new KeyedProcessFunction<Integer, Integer, Double>() {
private ListState<Integer> listState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("list-state", Types.INT));
}
@Override
public void processElement(Integer integer, Context context, Collector<Double> collector) throws Exception {
listState.add(integer);
Integer sum = 0;
Integer count = 0;
for(Integer i : listState.get()){
sum += i;
count += 1;
}
collector.collect((double) sum / count);
}
})
.print();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
(3)字典状态变量
用法与hashMap用法相同,使用自定义数据源,处理用户的平均pv(page view)。
public class Example8 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.addSource(new ClickSource())
.keyBy(r -> 1)
.process(new KeyedProcessFunction<Integer, Event, String>() {
private MapState<String,Long> mapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
mapState = getRuntimeContext().getMapState(
new MapStateDescriptor<String, Long>("map", Types.STRING, Types.LONG)
);
}
@Override
public void processElement(Event event, Context context, Collector<String> collector) throws Exception {
if(mapState.contains(event.user)){
mapState.put(event.user,mapState.get(event.user) + 1L);
} else{
mapState.put(event.user,1L);
}
// 求pv平均值
long userNum = 0L;
long pvSum = 0L;
for (String key : mapState.keys()) {
userNum += 1;
pvSum += mapState.get(key);
}
collector.collect("当前pv的平均值是:" + (double) pvSum / userNum);
}
})
.print();
env.execute();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41