首页 > 编程语言 >Java版Flink(十二)底层函数 API(process function)

Java版Flink(十二)底层函数 API(process function)

时间:2024-02-19 16:24:42浏览次数:41  
标签:function 定时器 Java process flink org apache import public

一、概述

之前的转化算子是无法访问事件的时间戳信息和水位线watermark,但是,在某些情况下,显得很重要。Flink 提供了 DataStream API 的Low- Level转化算子。比如说可以访问事件时间戳、watermark、以及注册定时器,还可以输出一些特定的事件,比如超时事件等。Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window 函数和转换算子无法实现)。例如,Flink SQL 就是使用 Process Function 实现的。
Flink 提供了 8 个 Process Function:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction

二、KeyedProcessFunction

2.1、概述

KeyedProcessFunction 用来操作 KeyedStream。KeyedProcessFunction 会处理流的每一个元素,输出为 0 个、1 个或者多个元素。所有的 Process Function 都继承自RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。
而KeyedProcessFunction<K, I, O>还额外提供了两个方法:
processElement(I value, Context ctx, Collector out):流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。

onTimer(long timestamp, OnTimerContext ctx, Collector out):
是一个回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

2.2、TimerService 和 定时器(Timers)

Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:
long currentProcessingTime() 返回当前处理时间

long currentWatermark() 返回当前 watermark 的时间戳

void registerProcessingTimeTimer(long timestamp) 会注册当前 key 的processing time 的定时器。当 processing time 到达定时时间时,触发 timer。

void registerEventTimeTimer(long timestamp) 会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。

void deleteProcessingTimeTimer(long timestamp) 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。

void deleteEventTimeTimer(long timestamp) 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在keyed streams 上面使用。

2.3、案例

监控温度传感器的温度值,如果温度值在 10 秒钟之内(processing time)连续上升,则报警。

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessFunction_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 为了测试效果 用默认的 时间特征
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.getConfig().setAutoWatermarkInterval(500L);
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy(SensorReading::getId)
                .process(new CustomKeyedProcessFunction(10));
        resultDataStream.print();
        env.execute();
    }
    /**
     * String -> key 类型
     * SensorReading -> 输入类型
     * String -> 输出类型
     */
    public static class CustomKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {
        // 时间间隔
        private Integer internal;
        public CustomKeyedProcessFunction(Integer internal) {
            this.internal = internal;
        }
        // 上一条数据的传感器温度(状态编程在下面具体介绍)
        private ValueState<Double> lastTemperatureState;
        // 定时器的时间戳
        private ValueState<Long> timerTsState;
        @Override
        public void open(Configuration parameters) throws Exception {
            /**
             * "last-temp" -> 当前状态变量的名称
             * Double.class -> 当前状态变量的类型
             * Double.MIN_VALUE -> 当前状态变量的初始值
             */
            lastTemperatureState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class, 0.0d));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("last-timer-ts", Long.class, 0L));
        }
        @Override
        public void processElement(SensorReading input, Context context, Collector<String> collector) throws Exception {
            // 1、获取上一次状态值
            Double lastTemperature = lastTemperatureState.value();
            Long timerState = timerTsState.value();
            // 2、更新温度状态
            lastTemperatureState.update(input.getTemperature());
            // 3、比较上一次温度
            if (input.getTemperature() > lastTemperature && timerState == 0) {
                // first data
                long timeTs = context.timerService().currentProcessingTime() + internal * 1000L;
                // 注册定时器
                context.timerService().registerProcessingTimeTimer(timeTs);
                // 更新定时器状态值
                timerTsState.update(timeTs);
            } else if (input.getTemperature() < lastTemperature && timerState != 0) {
                // 当前温度小于上一次温度 并且定时器不为null
                // 删除定时器
                context.timerService().deleteProcessingTimeTimer(timerState);
                // 清除定时状态变量
                timerTsState.clear();
            }
        }
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // 如果触发定时器函数 说明该传感器在10s内温度连续上升,需要预警
            String key = ctx.getCurrentKey();
            String resultStr = "传感器ID为:" + key + "在10s内温度连续上升...";
            out.collect(resultStr);
            // 清空定时器值
            timerTsState.clear();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92

三、侧输出流(sideOutput)

3.1、概述

大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process
function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。

3.2、案例

根据传感器温度,将低于60度的数据输入到侧输出流

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SideOutput_Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        // 定义获取侧输出流
        final OutputTag<String> outputTag = new OutputTag<String>("side-out-put"){};
        SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.process(new CustomSideOutput(outputTag));
        DataStream<String> sideOutputDataStream = resultDataStream.getSideOutput(outputTag);
        sideOutputDataStream.print("low > ");
        resultDataStream.print();
        env.execute();
    }
    public static class CustomSideOutput extends ProcessFunction<SensorReading, SensorReading> {
        private OutputTag<String> outputTag;
        public CustomSideOutput(OutputTag<String> outputTag) {
            this.outputTag = outputTag;
        }
        @Override
        public void processElement(SensorReading sensorReading, Context context, Collector<SensorReading> collector) throws Exception {
            if (sensorReading.getTemperature() < 60) {
                String msg = sensorReading.getId() + " 的温度低于60度 -> " + sensorReading.getTemperature();
                context.output(outputTag, msg);
            }
            collector.collect(sensorReading);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
原文链接:https://blog.csdn.net/qq_41311979/article/details/115000412

标签:function,定时器,Java,process,flink,org,apache,import,public
From: https://www.cnblogs.com/sunny3158/p/18021369

相关文章

  • [Go] Get used to return (*SomeType, error) as function return type
    packagemainimport( "fmt" "log" "strconv" "strings")typePointstruct{ xint yint}typeLinestruct{ p1*Point p2*Point}funcgetInput()string{return`0,9->5,98,0->0,89,4->......
  • 熟悉又陌生的JavaWeb 第0天
    传送门JavaWeb程序设计不满足于Java基础的窗口命令行程序,那来试试网站吧于是便有了JavaWeb印象中的大学教材应该是这本书,不过无所谓了,大同小异,教的内容其实差不多看看教科书的目录JavaWeb开发环境配置B/S结构服务器安装IDE安装第一个Web项目课后习题HTML基础......
  • Flink入门之Flink程序开发步骤(java语言)
    Flink入门之Flink程序开发步骤(java语言)文章目录(0)开发程序所需依赖(1)获取执行环境(2)加载/创建数据源(3)数据转换处理(4)处理后数据放置/输出(5)执行计算程序(6)完整示例注:本篇章的flink学习均是基于java开发语言我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢?前......
  • 前端知识回顾概览--JavaScript 高级
     掌握JS语言,针对闭包、原型链等有深入理解对typescript静态化工具熟练掌握精通常见设计模式了解函数式编程 1.this指针/闭包/作用域this指针详解闭包的概念及应用场景作用域(全局作用域/函数作用域)默认绑定、显式绑定、隐式绑定存储空间、执行上下文2.面向对象编......
  • Java中==和equals有什么区别
    原文网址:​​Java中==和equals有什么区别_IT利刃出鞘的博客-CSDN博客​​简介本文介绍java中==和equals的区别。分享Java技术星球(自学精灵),有面试真题和架构技术等:​​https://learn.skyofit.com/​​区别区别是:一个是运算符,一个是方法。==比较变量的值是否相同。如果比较......
  • 在script标签写export为什么会抛错|type module import ES5 ES6 预处理 指令序言 JavaS
    今天我们进入到语法部分的学习。在讲解具体的语法结构之前,这一堂课我首先要给你介绍一下JavaScript语法的一些基本规则。脚本和模块首先,JavaScript有两种源文件,一种叫做脚本,一种叫做模块。这个区分是在ES6引入了模块机制开始的,在ES5和之前的版本中,就只有一种源文件类型(就......
  • Java方法重写与重载
    一、方法重载(overload)概念方法重载指同一个类中定义的多个方法之间的关系,满足下列条件的多个方法相互构成重载 多个方法在同一个类中 多个方法具有相同的方法名 多个方法的参数不相同,类型不同或者数量不同 所谓方法重载就是指我们可以定义一些名称相同的方法,通过定......
  • java普通项目转springboot项目
    添加启动类@SpringBootApplicationpublicclassSpringBootMain{publicstaticvoidmain(String[]args){SpringApplication.run(SpringBootMain.class,args);}}添加依赖<parent><groupId>org.springframework.boot</grou......
  • java 日期计算
      importjava.util.Calendar;publicclassMain{publicstaticvoidmain(String[]args){//创建一个Calendar对象并设置为当前时间Calendarcalendar=Calendar.getInstance();//获取当前年份、月份和日期inty......
  • 一位普通Javaer的成长之路
    前言此文章用于记录自己作为Java开发者的成长历程永远置顶于我的博客为什么要做Java其实本来是想学C#做桌面应用程序的,奈何Java的火热和易上手,加上好找工作些,所以入行了Java当然,也不影响我现在偶尔会学学C#,做windows下的桌面应用程序以《斗破苍穹》的斗气段位来代表计算机专......