Process Functions
Flink provides 8 different process functions:
(1) ProcessFunction
The most basic process function; it is passed in as the argument when .process() is called directly on a DataStream.
(2) KeyedProcessFunction
The process function for a stream that has been partitioned by key; it is passed in when .process() is called on a KeyedStream. To use timers, you must be working on a KeyedStream.
(3) ProcessWindowFunction
The process function used after windowing, and the representative full-window function; it is passed in when .process() is called on a WindowedStream.
(4) ProcessAllWindowFunction
Also a process function used after windowing; it is passed in when .process() is called on an AllWindowedStream.
(5) CoProcessFunction
The process function used after connecting two streams; it is passed in when .process() is called on ConnectedStreams. Connecting and merging streams is covered in detail in a later chapter.
(6) ProcessJoinFunction
The process function used after an interval join of two streams; it is passed in when .process() is called on an IntervalJoined stream.
(7) BroadcastProcessFunction
The process function for a broadcast connected stream; it is passed in when .process() is called on a BroadcastConnectedStream. Here the "broadcast connected stream" (BroadcastConnectedStream) is the result of connecting a regular, non-keyed DataStream with a broadcast stream (BroadcastStream). Broadcast stream operations are covered in detail in a later chapter.
(8) KeyedBroadcastProcessFunction
The keyed version of the broadcast connected stream process function, likewise passed in when .process() is called on a BroadcastConnectedStream. Unlike BroadcastProcessFunction, here the broadcast connected stream is the result of connecting a KeyedStream with a broadcast stream (BroadcastStream).
Process functions are the lowest-level API; they deal directly with the most basic elements of a data stream: events, state, and time. This gives you full control over the stream, so you can do just about anything with it.
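As a quick orientation (a minimal, hypothetical sketch with made-up class and variable names, not one of the demos from this post), this is how two of the variants are attached: a ProcessFunction on a plain DataStream, and a KeyedProcessFunction on a KeyedStream:
package com.peng.process;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessVariantsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> words = env.fromElements("flink", "process", "function");
        // (1) ProcessFunction: passed to .process() on a plain DataStream
        words.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                out.collect("plain: " + value);
            }
        }).print();
        // (2) KeyedProcessFunction: passed to .process() on a KeyedStream (required if you want timers)
        words.keyBy(word -> word.substring(0, 1))
                .process(new KeyedProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        out.collect("key " + ctx.getCurrentKey() + ": " + value);
                    }
                }).print();
        env.execute();
    }
}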
Basic process function (ProcessFunction)
ProcessFunction is an abstract class that extends AbstractRichFunction. It has two particularly important methods:
- processElement(I value, Context ctx, Collector<O> out): called for every element in the stream. The Context argument gives access to the element's timestamp, side outputs (via output()) and the TimerService; in KeyedProcessFunction the context additionally exposes the current key. timerService() returns a TimerService, which can read the current processing time, read the current watermark (event time), register timers (processing-time or event-time), and delete timers.
- onTimer(long timestamp, OnTimerContext ctx, Collector<O> out): a callback that fires when a registered timer reaches its scheduled time. Timers can only be used on a KeyedStream, whereas ProcessFunction itself is applied to a plain DataStream.
Notes on processElement(input, context, output) and the ctx context:
- ctx.timestamp(): the timestamp of the current element
- ctx.timerService(): read the current processing time, read the current watermark (event time), register processing-time/event-time timers, delete timers
- ctx.output(): emit to a side output
- onTimer() can only be used after keyBy (it is a KeyedStream-based method)
- onTimer(): triggered when a registered timer reaches its scheduled time
package com.peng.process;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @author 海绵先生
* @Description TODO
* @date 2022/11/6-16:28
*/
public class ProcessFunctionDemo01 {
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Event {
private String user;
private String url;
private Long timestamp;
}
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// source
DataStreamSource<Event> dataStreamSource = env.fromElements(new Event("Sponge", "./user", 100L),
new Event("Tom", "./user", 150L),
new Event("Jack", "./lookspouce", 300L),
new Event("Sponge", "./like", 400L));
SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
SingleOutputStreamOperator<String> process = eventSingleOutputStreamOperator.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
//processElement(input, context, output)
/* Methods available on the ctx context:
 * timestamp()
 * timerService() -> current processing time, current watermark (event time), register timers (processing/event time), delete timers
 * output() -> emit to a side output
 */
if (value.user.equals("Jack")) {
out.collect(value.user + " clicks " + value.url);
} else if (value.user.equals("Tom")) {
out.collect(value.user);
out.collect(value.user);
}
// Print the current element's event timestamp
System.out.println("timestamp:" + ctx.timestamp());
// Print the current watermark
System.out.println("watermark:" + ctx.timerService().currentWatermark());
System.out.println("IndexOfThisSubtask:" + getRuntimeContext().getIndexOfThisSubtask());
}
});
process.print();
env.execute();
}
}
Although the Context's timerService() offers timer registration, a plain ProcessFunction cannot actually use it; for that we need to implement a KeyedProcessFunction.
KeyedProcessFunction
KeyedProcessFunction is the variant most commonly used in practice.
Compared with ProcessFunction, KeyedProcessFunction adds the onTimer() capability.
Processing-time timers: registerProcessingTimeTimer
package com.peng.process;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
/**
* @author 海绵先生
* @Description TODO Processing-time timer
* @date 2022/11/6-19:30
*/
public class KeyedProcessFunctionDemo01 {
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// source
DataStreamSource<Event> eventDataStreamSource = env.addSource(new ClickSource());
eventDataStreamSource.keyBy(Event::getUser)
.process(new KeyedProcessFunction<String, Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
long currTs = ctx.timerService().currentProcessingTime();
// ctx.getCurrentKey() returns the current key; new Timestamp() formats the time as yyyy-MM-dd HH:mm:ss
out.collect(ctx.getCurrentKey() + " element arrived at: " + new Timestamp(currTs));// e.g. zhangsan element arrived at: 2022-11-06 20:04:20.269
// Register a timer 10 seconds from now; registerProcessingTimeTimer is based on processing time
ctx.timerService().registerProcessingTimeTimer(currTs + 10*1000L);
}
// onTimer(timer timestamp, context, output)
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// ctx.getCurrentKey() returns the current key
out.collect(ctx.getCurrentKey() + " timer fired at: " + new Timestamp(timestamp));// e.g. zhangsan timer fired at: 2022-11-06 20:04:30.269
}
}).print();
env.execute();
}
}
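Besides registering timers, the TimerService can also delete a timer that has not fired yet; deleteProcessingTimeTimer()/deleteEventTimeTimer() must be given the exact timestamp the timer was registered with. A minimal, hypothetical sketch of the idea (not part of the original demo):
package com.peng.process;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class TimerDeleteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements("a", "b", "c")
                .keyBy(s -> s)
                .process(new KeyedProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        long timerTs = ctx.timerService().currentProcessingTime() + 10 * 1000L;
                        ctx.timerService().registerProcessingTimeTimer(timerTs);
                        // Deletion needs the exact registration timestamp; here the timer is
                        // cancelled immediately, so onTimer() is never called for it.
                        ctx.timerService().deleteProcessingTimeTimer(timerTs);
                        out.collect(value + ": timer registered and cancelled at " + timerTs);
                    }
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                        out.collect("never reached in this sketch");
                    }
                })
                .print();
        env.execute();
    }
}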
Event-time timers: registerEventTimeTimer
package com.peng.process;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
/**
* @author 海绵先生
* @Description TODO Event-time timer
* @date 2022/11/6-20:21
*/
public class KeyedProcessFunctionDemo02 {
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// source
SingleOutputStreamOperator<Event> eventDataStreamSource = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
}));
eventDataStreamSource.keyBy(Event::getUser)
.process(new KeyedProcessFunction<String, Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
// ClickSource uses System.currentTimeMillis() as the event timestamp, so processing time and event time coincide here
long currTs = ctx.timerService().currentProcessingTime();
// ctx.getCurrentKey() returns the current key
out.collect(ctx.getCurrentKey() + " element arrived at: " + new Timestamp(currTs) + " watermark:" + ctx.timerService().currentWatermark());// e.g. lisi element arrived at: 2022-11-06 20:29:57.643 watermark:1667737795406
// Register a timer 10 seconds later; registerEventTimeTimer is based on event time (it fires when the watermark passes the timer's timestamp)
ctx.timerService().registerEventTimeTimer(currTs + 10*1000L);
}
// onTimer(timer timestamp, context, output)
// An event-time timer fires only when the watermark passes the timer's timestamp
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// ctx.getCurrentKey() returns the current key
out.collect(ctx.getCurrentKey() + " timer fired at: " + new Timestamp(timestamp));// e.g. lisi timer fired at: 2022-11-06 20:30:07.643
}
}).print();
env.execute();
}
}
Watermarks are only generated periodically, so the printed watermark may lag behind the element timestamps; this is normal.
When a bounded job finishes, Flink advances the watermark to its maximum value (Long.MAX_VALUE), so all remaining event-time timers fire.
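The interval at which those periodic watermarks are emitted is configurable; a short sketch, assuming the usual environment setup (the Flink default is 200 ms):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Periodic watermarks are emitted on this interval (default 200 ms).
// A smaller interval makes the printed watermark track the element timestamps more closely,
// at the cost of slightly more overhead.
env.getConfig().setAutoWatermarkInterval(100L);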
ProcessWindowFunction
The window process function ProcessWindowFunction is used like the other window functions: it is applied by calling a method directly on a WindowedStream, except that the method called here is .process().
stream.keyBy( t -> t.f0 )
.window( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessWindowFunction())
/***************************************************/
ProcessWindowFunction is an abstract class declared as
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> { ... }
and MyProcessWindowFunction above is a subclass of it. Its four type parameters are listed below, and the process() method a subclass must override is sketched right after the list:
- IN: the type of the input elements collected in the window.
- OUT: the type of the output produced by the window computation.
- KEY: the type of the key in the data.
- W: the window type, a subtype of Window. For the usual time windows this is TimeWindow.
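For reference, this is what the abstract process() method's signature looks like; its Context parameter provides the window metadata used in the demo below, e.g. context.window().getStart() and getEnd():
public abstract void process(
        KEY key,               // the key of the window being evaluated
        Context context,       // window metadata (context.window()), time, per-window state, side outputs
        Iterable<IN> elements, // all elements collected in this window
        Collector<OUT> out)    // collector for the results
        throws Exception;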
package com.peng.window;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;
/**
* @author 海绵先生
* @Description TODO Custom ProcessWindowFunction implementation
* @date 2022/11/11-15:56
*/
public class ProcessWindowFunctionDemo01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// source
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
stream.print("input");
// Compute UV (unique visitors) with a ProcessWindowFunction
stream.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new MyProcessWindowFunction())
.print();
env.execute();
}
//ProcessWindowFunction is an abstract class that extends AbstractRichFunction
public static class MyProcessWindowFunction extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{// type parameters: input type, output type, key type (Boolean here), window type (TimeWindow)
// process() is called when the window fires (at the end of the window)
@Override
public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
// Use a HashSet of users -> purpose: deduplication
HashSet<String> userSet = new HashSet<>();
// Iterate over the elements in the window and add each user to the set
for (Event event : elements) {
userSet.add(event.getUser());
}
int uv = userSet.size();
// Combine with the window metadata
long start = context.window().getStart();
long end = context.window().getEnd();
out.collect("窗口" + new Timestamp(start) + "~" + new Timestamp(end)
+ "UV值为:" + uv);
}
}
}
Side outputs
Process functions have another distinctive capability: emitting custom records to a "side output".
Most transformation operators produce a single output stream with a single data type. If you split a source stream with the filter operator, the stream has to be traversed once per filter, which wastes performance through unnecessary copies. A side output, by contrast, can be thought of as a "branch" forked off the main stream: one stream can produce several output streams, and those streams may even carry different data types. This makes stream splitting easy to implement.
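For contrast, this is the filter-based splitting that the paragraph above warns about (a hypothetical fragment; eventStream stands for the Event stream used in the demos, and each .filter() call traverses that same stream again):
DataStream<Event> zhangsanStream = eventStream.filter(e -> e.getUser().equals("zhangsan"));
DataStream<Event> wangwuStream = eventStream.filter(e -> e.getUser().equals("wangwu"));
DataStream<Event> otherStream = eventStream.filter(e ->
        !e.getUser().equals("zhangsan") && !e.getUser().equals("wangwu"));
Note that all three result streams necessarily have the same type (Event), whereas the side-output version below emits Tuple3 records on the side streams while keeping Event on the main stream.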
To use side outputs, simply call the context's .output() method inside the process function's .processElement() or .onTimer() method.
SingleOutputStreamOperator<Event> processedStream = eventSingleOutputStreamOperator.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
//out.collect() keeps everything in the main stream; ctx.output() routes data to different OutputTags
if (value.getUser().equals("zhangsan")) {
// Note that the side-output data type does not have to be the output type declared on the ProcessFunction
ctx.output(zhangsanTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
} else if (value.getUser().equals("wangwu")) {
ctx.output(wangwuTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
} else {
out.collect(value);
}
}
});
The output() method takes two arguments: the first is an OutputTag ("output tag") that identifies the side output and is usually declared once outside the function; the second is the record to emit. We can declare the OutputTags up front:
OutputTag<Tuple3<String, String, Long>> zhangsanTag = new OutputTag<Tuple3<String, String, Long>>("zhangsan"){};
OutputTag<Tuple3<String, String, Long>> wangwuTag = new OutputTag<Tuple3<String, String, Long>>("wangwu"){};
Without the trailing {} (i.e. without creating an anonymous subclass), Flink may not be able to infer the OutputTag's element type.
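As an alternative to the anonymous-subclass trick, the element type can be handed to the OutputTag constructor explicitly as TypeInformation, e.g. via org.apache.flink.api.common.typeinfo.Types (a sketch using the same tag name as above):
OutputTag<Tuple3<String, String, Long>> zhangsanTag =
        new OutputTag<Tuple3<String, String, Long>>("zhangsan",
                Types.TUPLE(Types.STRING, Types.STRING, Types.LONG));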
To read a side output, call .getSideOutput() on the processed DataStream and pass in the corresponding OutputTag; this works exactly the same way as retrieving side outputs through the window API.
processedStream.getSideOutput(zhangsanTag).print("zhangsan");
processedStream.getSideOutput(wangwuTag).print("wangwu");
Full test code:
SideOutPutStreamTest01.java
package com.peng.process;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
/**
* @author 海绵先生
* @Description TODO Side-output test
* @date 2022/11/11-20:43
*/
public class SideOutPutStreamTest01 {
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// source
DataStreamSource<Event> dataStreamSource = env.addSource(new ClickSource());
SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
}));
// Define the output tags
OutputTag<Tuple3<String, String, Long>> zhangsanTag = new OutputTag<Tuple3<String, String, Long>>("zhangsan"){};
OutputTag<Tuple3<String, String, Long>> wangwuTag = new OutputTag<Tuple3<String, String, Long>>("wangwu"){};
SingleOutputStreamOperator<Event> processedStream = eventSingleOutputStreamOperator.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
if (value.getUser().equals("zhangsan")) {
ctx.output(zhangsanTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
} else if (value.getUser().equals("wangwu")) {
ctx.output(wangwuTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
} else {
out.collect(value);
}
}
});
processedStream.getSideOutput(zhangsanTag).print("zhangsan");
processedStream.getSideOutput(wangwuTag).print("wangwu");
processedStream.print("else");
env.execute();
}
}
ClickSource.java
package com.peng.process;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Timestamp;
import java.util.Random;
/**
* @author 海绵先生
* @Description TODO Custom source
* @date 2022/11/6-19:40
*/
public class ClickSource extends RichParallelSourceFunction<Event> {
private String[] names = {"zhangsan","lisi","wangwu","zhaoliu"};
private String[] urls = {"./user","./lookspouce","./like"};
private boolean flag = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
Random random = new Random();
while (flag){
String user = names[random.nextInt(4)];
String url = urls[random.nextInt(3)];
long timeMillis = System.currentTimeMillis();
ctx.collect(new Event(user,url,timeMillis));
Thread.sleep(2000);// emit one record every two seconds
}
}
@Override
public void cancel() {
flag = false;
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Event{
private String user;
private String url;
private Long timestamp;
@Override
public String toString() {
return "Event{" +
"user='" + user + '\'' +
", url='" + url + '\'' +
", timestamp=" + new Timestamp(timestamp) +
'}';
}
}
Sample output:
else> Event{user='zhaoliu', url='./lookspouce', timestamp=2022-11-11 21:21:52.9}
else> Event{user='zhaoliu', url='./lookspouce', timestamp=2022-11-11 21:21:55.065}
zhangsan> (./lookspouce,./lookspouce,1668172917075)
wangwu> (./lookspouce,./lookspouce,1668172919080)
wangwu> (./lookspouce,./lookspouce,1668172921087)
wangwu> (./user,./user,1668172923089)
wangwu> (./user,./user,1668172925097)
zhangsan> (./like,./like,1668172927100)
wangwu> (./lookspouce,./lookspouce,1668172929111)
wangwu> (./like,./like,1668172931122)
else> Event{user='lisi', url='./user', timestamp=2022-11-11 21:22:13.135}
else> Event{user='lisi', url='./user', timestamp=2022-11-11 21:22:15.145}