首页 > 其他分享 >【Flink】复函数的使用,时间服务和定时器,值、列表、字典状态变量

【Flink】复函数的使用,时间服务和定时器,值、列表、字典状态变量

时间:2024-02-18 14:55:20浏览次数:18  
标签:Exception 定时器 void Flink throws env 状态变量 public

【Flink】复函数的使用,时间服务和定时器,值、列表、字典状态变量

文章目录

Flink DataStream API

1 复函数

在实际环境这种经常会有这样的需求:在函数处理数据之前,需要做一些初始化的工作;或者需要在处理数据时可以获得函数执行上下文的一些信息,如数据库的连接;以及在处理完数据时做一些清理工作。而 DataStream API 就提供了这样的机制。

DataStream API 提供的所有转换操作函数,都拥有它们的富版本,并且在使用常规函数或者匿名函数的地方来使用富函数。下面就是富函数的一些例子,可以看出,只需要在常规函数的前面加上 Rich 前缀就是富函数了。

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

当使用富函数时,可以实现两个额外的方法:

  • open() 方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前 open() 会被调用。open() 函数通常用来做一些只需要做一次即可的初始化工作。
  • close() 方法是生命周期中的最后一个调用的方法,通常用来做一些清理工作。

另外,getRuntimeContext() 方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,当前子任务的索引,当前子任务的名字。同时还它还包含了访问分区状态的方法。

生命周期和其他信息的演示见以下代码:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
            .fromElements(1,2,3,4)
            .map(new RichMapFunction<Integer, Integer>() {
                // 会在map算子之前调用一次
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    System.out.println("声明周期开始...");
                    System.out.println("当前子任务的索引是:" + getRuntimeContext().getTaskNameWithSubtasks());
                }
                @Override
                public Integer map(Integer integer) throws Exception {
                    return integer * integer;
                }
                // 会在map算子之后调用一次
                @Override
                public void close() throws Exception {
                    super.close();
                    System.out.println("生命周期结束...");
                }
            })
            .print();
    env.execute();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

其会针对每个任务管理器的任务槽开启一个声明周期,见下例:

按照数字奇偶性将其发送到不同任务槽中:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env
            .addSource(new RichParallelSourceFunction<Integer>() {
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    System.out.println("声明周期开始,子任务索引是:" + getRuntimeContext().getIndexOfThisSubtask());
                }
                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for(int i = 0; i < 10; i++){
                        if(i % 2 == getRuntimeContext().getIndexOfThisSubtask()){
                            ctx.collect(i);
                        }
                    }
                }
                @Override
                public void cancel() {
                }
            })
            .setParallelism(2)
            .print()
            .setParallelism(2);
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

2 自定义输出到下游设备

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
            .fromElements(1,2,3)
            .addSink(new SinkFunction<Integer>() {
                @Override
                public void invoke(Integer value, Context context) throws Exception {
                    SinkFunction.super.invoke(value,context);
                    System.out.println(value);
                }
            });
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

二 处理函数

转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前事件的事件时间。

基于此,DataStream API 提供了一系列的 Low-Level 转换算子。可以访问时间戳、水位线以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑 (使用之前的 window 函数和转换算子无法实现)。例如,Flink-SQL (上层的库)就是使用 Process Function (底层的API)实现的,底层的API将Flink所有的功能都暴露了出来。

Flink 提供了 8 个 Process Function:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

1 KeyedProcessFunction的使用

KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素(flatMap和reduce的终极加强版)。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法,用来处理keyBy以后的流。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:

  • processElement(String s, Context context, Collector<String> collector):流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流 (side outputs)。
  • onTimer(long timestamp, OnTimerContext ctx, Collector out):本方法是一个回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context参数一样,提供了上下文的一些信息,例如 firing trigger 的时间信息 (事件时间或者处理时间)。

(1)时间服务和定时器

Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:

  • currentProcessingTime() 返回当前处理时间currentWatermark() 返回当前水位线的时间戳
  • registerProcessingTimeTimer(long time) 会注册当前 key 的 processing time 的 timer。当 processing time 到达定时时间时,触发 timer。
  • registerEventTimeTimer(long time) 会注册当前 key 的 event time timer。当水位线大
    于等于定时器注册的时间时,触发定时器执行回调函数。
  • deleteProcessingTimeTimer(long time) 删除之前注册处理时间定时器。如果没有这个
    时间戳的定时器,则不执行。
  • deleteEventTimeTimer(long time) 删除之前注册的事件时间定时器,如果没有此时间
    戳的定时器,则不执行。

当定时器 timer 触发时,执行回调函数 onTimer()。processElement() 方法和 onTimer()方法是同步(不是异步)方法,这样可以避免并发访问和操作状态。

针对每一个 key 和 timestamp,只能注册一个定期器。也就是说,每一个 key 可以注册多个定时器,但在每一个时间戳只能注册一个定时器。KeyedProcessFunction 默认将所有定时器的时间戳放在一个优先队列中。在 Flink 做检查点操作时,定时器也会被保存到状态后端中。

KeyedProcessFunction简单例子如下:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // windows下监控nc -l -p 9999
    env
            .socketTextStream("localhost",9999)
        	// keyBy到同一条流上
            .keyBy(r -> 1)
            .process(new MyKeyed())
            .print();
    env.execute();
}
// 第一个为Key的泛型,第二个为输入数据的泛型,第三个为输出数据的泛型
public static class MyKeyed extends KeyedProcessFunction<Integer,String,String> {
    @Override
    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
        // 获取当前机器时间
        long ts = context.timerService().currentProcessingTime();
        collector.collect("元素:" + s + " 在" + new Timestamp(ts) + " 到达");
        // 注册一个10秒钟以后的定时器
        long tenSecLater = ts + 10 * 1000;
        collector.collect("注册了一个在" + new Timestamp(tenSecLater) + "秒钟后的定时器");
        // 注册定时器的语法,注意:注册的是处理时间(机器时间)
        context.timerService().registerProcessingTimeTimer(tenSecLater);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        out.collect("定时器触发了,触发时间是:" + new Timestamp(timestamp));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

2状态变量

状态变量的特点:

  • 状态变量的可见范围(作用域)是当前的key。

  • 状态变量是单例,只能被实例化一次。

    为什么是单例:状态变量会被备份到检查点中,假设程序宕机,在重启的时候,会去远程存储的检查点中查找状态变量,找到则恢复,否则创建实例。

  • 除以上两点外,可以当做普通的java变量使用。

(1)值状态变量

a 需求一

使用状态变量实现求平均数的功能,并每10秒钟向下游发送一次:

public class Example5 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env
                .addSource(new SourceFunction<Integer>() {
                    private boolean running = true;
                    private Random random = new Random();
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        while (running){
                            sourceContext.collect(random.nextInt(10));
                            Thread.sleep(100);
                        }
                    }
                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .keyBy(r -> true)
                .process(new KeyedProcessFunction<Boolean, Integer, Double>() {
                    // 声明一个状态变量作为累加器
                    private ValueState<Tuple2<Integer,Integer>> valueState;
                    // 保存定时器的时间戳
                    private ValueState<Long> timerTs;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // 实例化状态变量
                        valueState = getRuntimeContext().getState(
                                // 状态描述符
                                new ValueStateDescriptor<Tuple2<Integer, Integer>>("sun-count", Types.TUPLE(Types.INT,Types.INT))
                        );
                        timerTs = getRuntimeContext().getState(
                          new ValueStateDescriptor<Long>("timer",Types.LONG)
                        );
                    }
                    @Override
                    public void processElement(Integer integer, Context context, Collector<Double> collector) throws Exception {
                        // 当第一条数据到来时,状态变量的值为null
                        // 使用.value()方法读取状态变量的值,使用.update()方法更新状态变量的值
                        if (valueState.value() == null){
                            // 将状态变量的值当做累加器的值
                            valueState.update(Tuple2.of(integer,1));
                        } else{
                            Tuple2<Integer, Integer> tmp = valueState.value();
                            valueState.update(Tuple2.of(tmp.f0 + integer,tmp.f1 + 1));
                        }
                        if(timerTs.value() == null){
                            long tenSecLater = context.timerService().currentProcessingTime() + 10 * 1000L;
                            context.timerService().registerProcessingTimeTimer(tenSecLater);
                            timerTs.update(tenSecLater);
                        }
                    }
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        if(valueState != null){
                            out.collect((double) valueState.value().f0 / valueState.value().f1);
                            // 每隔十秒钟发送一次
                            timerTs.clear();
                        }
                    }
                })
                .print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

reduce的累加器的类型和数据输入输出的类型必须相同,而状态变量灵活的多。

reduce的累加器处理完一条数据后就立刻发送给下游,状态变量可以通过定时器实现,等待固定时间后,再将结果发送到下游。

b 需求二

监控整数值的变化,如果整数值在一秒钟之内 (processing time) 连续上升,则报警。

public class Example6 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env
                .addSource(new SourceFunction<Integer>() {
                    private boolean running = true;
                    private Random random = new Random();
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        while(running){
                            sourceContext.collect(random.nextInt(10));
                            Thread.sleep(300);
                        }
                    }
                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .keyBy(r -> true)
                .process(new IntIncreaseAlert())
                .print();
        env.execute();
    }
    private static class IntIncreaseAlert extends KeyedProcessFunction<Boolean,Integer,String> {
        // 定义两个状态变量
        private ValueState<Integer> lastInt;
        private ValueState<Long> timerTs;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 实例化两个状态变量
            lastInt = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("last-integer", Types.INT));
            timerTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Types.LONG));
        }
        @Override
        public void processElement(Integer integer, Context context, Collector<String> collector) throws Exception {
            Integer prevInt = null;
            if(lastInt.value() != null){
                prevInt = lastInt.value();
            }
            lastInt.update(integer);
            // 只要存在定时器,ts就不会null,无论在1秒钟内来几条数据
            Long ts = null;
            if(timerTs.value() != null){
                ts = timerTs.value();
            }
            // 出现温度值的下降,需要将定时器删除
            if(prevInt == null || integer < prevInt){
                if(ts != null){
                    context.timerService().deleteProcessingTimeTimer(ts);
                    timerTs.clear();
                }
            } else if(integer > prevInt && ts == null){
                long oneSecLater = context.timerService().currentProcessingTime() + 1000L;
                context.timerService().registerProcessingTimeTimer(oneSecLater);
                timerTs.update(oneSecLater);
            }
        }
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            out.collect("警告警告!警告警告!");
            timerTs.clear();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

KeyedProcessFunction 结合状态变量加定时器之后,变得非常灵活。

(2)列表状态变量

值状态变量自始至终都保存了一个元组,当接收的数据更新完元组就会被丢弃,列表状态变量将所有的数据都存储了起来,非常占内存。

使用列表状态变量求平均值,可以当做数组使用。

.keyBy(r -> 1)
.process(new KeyedProcessFunction<Integer, Integer, Double>() {
    private ListState<Integer> listState;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("list-state", Types.INT));
    }
    @Override
    public void processElement(Integer integer, Context context, Collector<Double> collector) throws Exception {
        listState.add(integer);
        Integer sum = 0;
        Integer count = 0;
        for(Integer i : listState.get()){
            sum += i;
            count += 1;
        }
        collector.collect((double) sum / count);
    }
})
.print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

(3)字典状态变量

用法与hashMap用法相同,使用自定义数据源,处理用户的平均pv(page view)。

public class Example8 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env
                .addSource(new ClickSource())
                .keyBy(r -> 1)
                .process(new KeyedProcessFunction<Integer, Event, String>() {
                    private MapState<String,Long> mapState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        mapState = getRuntimeContext().getMapState(
                                new MapStateDescriptor<String, Long>("map", Types.STRING, Types.LONG)
                        );
                    }
                    @Override
                    public void processElement(Event event, Context context, Collector<String> collector) throws Exception {
                        if(mapState.contains(event.user)){
                            mapState.put(event.user,mapState.get(event.user) + 1L);
                        } else{
                            mapState.put(event.user,1L);
                        }
                        // 求pv平均值
                        long userNum = 0L;
                        long pvSum = 0L;
                        for (String key : mapState.keys()) {
                            userNum += 1;
                            pvSum += mapState.get(key);
                        }
                        collector.collect("当前pv的平均值是:" + (double) pvSum / userNum);
                    }
                })
                .print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
原文链接:https://blog.csdn.net/weixin_43923463/article/details/127960359

标签:Exception,定时器,void,Flink,throws,env,状态变量,public
From: https://www.cnblogs.com/sunny3158/p/18019320

相关文章

  • asp.net 托管服务 后台任务 定时器任务
    托管服务1\1.txtthisisatestfile托管服务1\appsettings.Development.json{"Logging":{"LogLevel":{"Default":"Information","Microsoft.AspNetCore":"Warning"}}}托管服......
  • Flink CDC引起的Mysql元数据锁
    记一次FlinkCDC引起的Mysql元数据锁事故,总结经验教训。后续在编写FlinkCDC任务时,要处理好异常,避免产生长时间的元数据锁。同时出现生产问题时要及时排查,不能抱有侥幸心理。1、事件经过某天上午,收到系统的告警信息,告警提示:同步Mysql的某张表数据到Elasticsearch异常,提示连不......
  • 10分钟了解Flink窗口计算
    在有状态流处理中,时间在计算中起着重要的作用。比如,当进行时间序列分析、基于特定时间段进行聚合,或者进行事件时间去处理数据时,都与时间相关。接下来将重点介绍在使用实时Flink应用程序时应该考虑的跟时间相关的一些元素。文中的示例使用到netcat工具。窗口计算有如下几个核心......
  • Flink DataStream API-数据源、数据转换、数据输出
    本文继续介绍FlinkDataStreamAPI先关内容,重点:数据源、数据转换、数据输出。1、Source数据源1.1、Flink基本数据源文件数据源//2.读取数据源DataStream<String>fileDataStreamSource=env.readTextFile("/Users/yclxiao/Project/bigdata/flink-blog/doc/wor......
  • 10分钟了解Flink Watermark水印
    在上一篇中,介绍了Flink里时间的概念和窗口计算,在实际生产过程中,由于网络等原因,许多数据会延迟到达窗口,这种情况Flink如何处理?Watermark登场,本文从这几点进行介绍:水印的概念、水印如何计算、允许延迟和侧道输出、水印生成策略、案例及代码。1、一个小例子讲解概念前,我先举个例......
  • 5分钟了解Flink状态管理
    什么叫做Flink的有状态计算呢?说白了就是将之前的中间结果暂时存储起来,等待后续的事件数据过来后,可以使用之前的中间结果继续计算。本文主要介绍Flink状态计算和管理、代码示例。1、有状态的计算什么是Flink的有状态的计算。在流式计算过程中将算子的中间结果保存在内存或者文......
  • 一次打通FlinkCDC同步Mysql数据
    业务痛点离开了业务谈技术都是耍流氓。我们来聊聊基于业务的痛点,衍生出来的多种数据同步方案。业务中常见的需要数据同步的场景1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历......
  • 10分钟入门Flink--架构和原理
    相信你读完上一节的《10分钟入门Flink--了解Flink》对Flink已经有初步了解了。这是继第一节之后的Flink入门系列的第二篇,本篇主要内容是是:了解Flink运行模式、Flink调度原理、Flink分区、Flink安装。1、运行模式Flink有多种运行模式,可以运行在一台机器上,称为本地(单机)模式;也可以......
  • 10分钟入门Flink--了解Flink
    Flink入门系列文章主要是为了给想学习Flink的你建立一个大体上的框架,助力快速上手Flink。学习Flink最有效的方式是先入门了解框架和概念,然后边写代码边实践,然后再把官网看一遍。Flink入门分为四篇,第一篇是《了解Flink》,第二篇《架构和原理》,第三篇是《DataStream》,第四篇是《Tabl......
  • 10分钟入门Flink--安装
    本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、FlinkStandalone搭建、FlinkStandalongHA搭建。演示使用的Flink版本是1.15.4,官方文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/......