Basic Process Functions
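What Process Functions Do
A process function defines how a data stream is transformed.
Process functions give access to the events in the stream, their timestamps and the watermark. Through a "timer service" they can even register timers. Process functions extend the abstract class AbstractRichFunction, so they have all the features of rich functions and can likewise access state and other runtime information. A process function can also send data directly to a side output stream.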
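ProcessFunction Explained
The abstract class ProcessFunction extends AbstractRichFunction and has two generic type parameters: I, the type of the input data, and O, the type of the output data produced after processing.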
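public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    // ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {
    }
    // ...
}
processElement(): "processes an element" and defines the core processing logic. Its parameters are:
- value: the current input element in the stream.
- ctx: of type Context, an abstract inner class defined in ProcessFunction that represents the current runtime context. It gives access to the current element's timestamp and provides the "timer service" for querying time and registering timers. It also offers an output() method for sending data to a side output stream.
- out: a Collector used to emit output data.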
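onTimer(): defines the operation to run when a timer fires. It is called only after a timer has been registered, at the moment that timer fires. Timers themselves are registered through the "timer service".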
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long recordTimestamp) {
                        return event.timestamp;
                    }
                })
        );

stream.process(new ProcessFunction<Event, String>() {
    @Override
    public void processElement(Event event, ProcessFunction<Event, String>.Context context, Collector<String> collector) throws Exception {
        // One input can produce zero, one or several outputs
        if (event.user.equals("Marry")) {
            collector.collect(event.user + " clicks " + event.url);
        } else if (event.user.equals("Bob")) {
            collector.collect(event.user);
            collector.collect(event.user);
        }
        // Every event is also emitted in full
        collector.collect(event.toString());
        // Timestamp of the current element, and the current watermark from the timer service
        System.out.println("timestamp: " + context.timestamp());
        System.out.println("watermark: " + context.timerService().currentWatermark());
        // Rich-function feature: index of the parallel subtask running this instance
        System.out.println(getRuntimeContext().getIndexOfThisSubtask());
    }
}).print();
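The example shows that a single processElement() call may emit zero, one or several records through the Collector, much like flatMap. The plain-Java sketch below replays that branching without Flink. The Event class is a hypothetical stand-in for the tutorial's Event POJO, and a returned List plays the role of the Collector.

```java
import java.util.ArrayList;
import java.util.List;

public class FanOutSketch {
    // Hypothetical stand-in for the tutorial's Event POJO
    static final class Event {
        final String user;
        final String url;
        final long timestamp;

        Event(String user, String url, long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Event{user=" + user + ", url=" + url + ", timestamp=" + timestamp + "}";
        }
    }

    // Mirrors the branching in processElement(); the returned list stands in for the Collector
    public static List<String> process(Event event) {
        List<String> out = new ArrayList<>();
        if (event.user.equals("Marry")) {
            out.add(event.user + " clicks " + event.url);
        } else if (event.user.equals("Bob")) {
            out.add(event.user);
            out.add(event.user);
        }
        // Every event is emitted in full as well
        out.add(event.toString());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(process(new Event("Bob", "./cart", 1000L)).size());   // 3
        System.out.println(process(new Event("Alice", "./home", 2000L)).size()); // 1
    }
}
```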
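Categories
Flink provides the following process functions. Each is passed as the argument to process() on a different kind of stream:
- ProcessFunction: the most basic process function, passed in when process() is called directly on a DataStream.
- KeyedProcessFunction: the process function for a stream partitioned by key, passed in when process() is called on a KeyedStream.
- ProcessWindowFunction: the process function applied after windowing, and the representative full-window function, passed in when process() is called on a WindowedStream.
- ProcessAllWindowFunction: also a process function applied after windowing, passed in when process() is called on an AllWindowedStream.
- CoProcessFunction: the process function for two connected streams, passed in when process() is called on ConnectedStreams.
- ProcessJoinFunction: the process function for two interval-joined streams, passed in when process() is called on IntervalJoined.
- BroadcastProcessFunction: the process function for a broadcast connected stream, passed in when process() is called on a BroadcastConnectedStream.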
Keyed Process Functions
Only a KeyedStream supports setting timers through the TimerService. In practice, we therefore usually call keyBy first and then define the processing logic.
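Timers and the Timer Service
Timers are the main mechanism for time-related operations in process functions. The timed logic is implemented in onTimer(), and timers are registered through the timer service provided by the context.
The timer service depends on the current runtime environment. The context of a ProcessFunction provides a timerService() method that returns a TimerService object directly. The TimerService interface is defined as follows: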
@PublicEvolving
public interface TimerService {
    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

    // Returns the current processing time
    long currentProcessingTime();

    // Returns the current watermark
    long currentWatermark();

    // Registers a processing-time timer
    void registerProcessingTimeTimer(long time);

    // Registers an event-time timer
    void registerEventTimeTimer(long time);

    // Deletes a processing-time timer
    void deleteProcessingTimeTimer(long time);

    // Deletes an event-time timer
    void deleteEventTimeTimer(long time);
}
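How event-time timers fire can be pictured with a small Flink-free simulation. Timers sit in a queue ordered by timestamp. Advancing the watermark fires every timer whose timestamp is at or below the new watermark, earliest first. This is a hypothetical model for intuition only: the class and method names are made up, and it is not Flink's internal timer implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TimerQueueSketch {
    // A registered timer: the key that registered it and its firing timestamp
    static final class PendingTimer {
        final String key;
        final long timestamp;

        PendingTimer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Min-heap ordered by firing timestamp
    private final PriorityQueue<PendingTimer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    public void registerEventTimeTimer(String key, long timestamp) {
        queue.add(new PendingTimer(key, timestamp));
    }

    // Removes and reports every timer with timestamp <= watermark, earliest first;
    // each reported "key@timestamp" stands in for one onTimer() call
    public List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            PendingTimer t = queue.poll();
            fired.add(t.key + "@" + t.timestamp);
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerQueueSketch timers = new TimerQueueSketch();
        timers.registerEventTimeTimer("Bob", 2000L);
        timers.registerEventTimeTimer("Alice", 1000L);
        System.out.println(timers.advanceWatermark(1500L)); // [Alice@1000]
        System.out.println(timers.advanceWatermark(3000L)); // [Bob@2000]
    }
}
```

Processing-time timers follow the same idea, with the system clock taking the place of the watermark.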
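The TimerService deduplicates timers by key and timestamp. Each key and timestamp has at most one timer, so if the same timer is registered several times, onTimer() is still called only once.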
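The deduplication rule can be modelled as a set of (key, timestamp) pairs. The sketch below is a hypothetical, Flink-free illustration of that rule, not Flink's timer storage. Registering the same pair twice leaves one timer, while the same timestamp under a different key is a separate timer.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TimerDedupSketch {
    // Registered timers identified by (key, timestamp); the Set keeps at most one of each
    private final Set<Map.Entry<String, Long>> timers = new HashSet<>();

    // Returns true if a new timer was created, false if the same (key, timestamp) already existed
    public boolean register(String key, long timestamp) {
        return timers.add(Map.entry(key, timestamp));
    }

    public int size() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerDedupSketch timers = new TimerDedupSketch();
        timers.register("Mary", 10_000L);
        timers.register("Mary", 10_000L); // same key and timestamp: ignored, so onTimer() would run once
        timers.register("Bob", 10_000L);  // same timestamp, different key: a separate timer
        System.out.println(timers.size()); // 2
    }
}
```

This behaviour is what lets the Top-N example below register the same window-end timer for every record of a window without firing it repeatedly.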
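Using KeyedProcessFunction
The example below keys the stream by user and registers a processing-time timer 10 seconds after each event arrives: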
stream.keyBy(event -> event.user)
        .process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event event, KeyedProcessFunction<String, Event, String>.Context context, Collector<String> collector) throws Exception {
                long currPT = context.timerService().currentProcessingTime();
                collector.collect(context.getCurrentKey() + " data arrived at: " + new Timestamp(currPT));
                // Register a processing-time timer 10 seconds from now
                context.timerService().registerProcessingTimeTimer(currPT + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                // ctx.getCurrentKey() is the key that registered this timer
                out.collect(ctx.getCurrentKey() + " timer fired at: " + new Timestamp(timestamp));
            }
        }).print();
Case Study: Top-N
Requirement: find the two most popular URLs over the last 10 seconds, updating the result every 5 seconds.
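"The last 10 seconds, updated every 5 seconds" describes a sliding window of size 10 s with a 5 s slide, so every event belongs to 10 / 5 = 2 windows. The sketch below computes which windows a timestamp falls into. It re-derives the assignment rule for sliding windows with zero offset, is written in plain Java, and is not copied from Flink's SlidingEventTimeWindows source.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Returns every [start, end) sliding window (offset 0) that contains the timestamp,
    // latest window first
    public static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing the timestamp: round down to a multiple of slide
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : assignWindows(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // [10000, 20000)
        // [5000, 15000)
    }
}
```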
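Using ProcessAllWindowFunction
This approach does not distinguish between URLs. All access data is collected and counted in one place, so instead of calling keyBy we open a window directly on the DataStream and process it with the full-window function ProcessAllWindowFunction. In the code below, an incremental AggregateFunction (UrlHashMapCountAgg) counts the URLs as data arrives. The ProcessAllWindowFunction (UrlAllWindowResult) then only wraps the sorted result with window information.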
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long recordTimestamp) {
                        return event.timestamp;
                    }
                }));

// Open a window directly and collect all data (no keyBy)
stream.map(event -> event.url)
        .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(new UrlHashMapCountAgg(), new UrlAllWindowResult())
        .print();
// Custom incremental aggregate function: counts clicks per URL in a HashMap
public static class UrlHashMapCountAgg implements AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>> {
    @Override
    public HashMap<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public HashMap<String, Long> add(String url, HashMap<String, Long> accumulator) {
        // Increment this URL's count, starting at 1 if it has not been seen yet
        accumulator.merge(url, 1L, Long::sum);
        return accumulator;
    }

    @Override
    public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
        ArrayList<Tuple2<String, Long>> result = new ArrayList<>();
        for (String url : accumulator.keySet()) {
            result.add(Tuple2.of(url, accumulator.get(url)));
        }
        // Sort by count, descending; Long.compare avoids the truncation of intValue() subtraction
        result.sort((o1, o2) -> Long.compare(o2.f1, o1.f1));
        return result;
    }

    @Override
    public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
        // Flink only calls merge() for merging windows (e.g. session windows);
        // sum the counts instead of returning null
        b.forEach((url, count) -> a.merge(url, count, Long::sum));
        return a;
    }
}
// Custom full-window function: wraps the sorted counts with window information
public static class UrlAllWindowResult extends ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow> {
    @Override
    public void process(ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<ArrayList<Tuple2<String, Long>>> iterable, Collector<String> collector) throws Exception {
        // With incremental aggregation, the iterable holds exactly one element: the aggregate's result
        ArrayList<Tuple2<String, Long>> list = iterable.iterator().next();
        StringBuilder result = new StringBuilder();
        result.append("--------\n");
        result.append("Window end: " + new Timestamp(context.window().getEnd()) + "\n");
        // Take the first two entries (or fewer, if the window saw fewer URLs)
        for (int i = 0; i < Math.min(2, list.size()); i++) {
            Tuple2<String, Long> currTuple = list.get(i);
            String info = "No." + (i + 1) + " "
                    + "Url: " + currTuple.f0 + " "
                    + "Views: " + currTuple.f1 + "\n";
            result.append(info);
        }
        result.append("--------\n");
        collector.collect(result.toString());
    }
}
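Without Flink, what UrlHashMapCountAgg and UrlAllWindowResult compute for one window comes down to three steps: count clicks per URL, sort by count in descending order, and keep the first N. The plain-Java sketch below does this for a hypothetical list of URLs from one window (the class name and sample URLs are made up). Because windowAll gathers everything into one place, Flink runs this window operator with a parallelism of 1. That is the cost of skipping keyBy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AllWindowTopNSketch {
    // Counts each URL in the window and returns the n most visited, highest count first
    public static List<Map.Entry<String, Long>> topN(List<String> urlsInWindow, int n) {
        Map<String, Long> counts = new HashMap<>();
        for (String url : urlsInWindow) {
            counts.merge(url, 1L, Long::sum);
        }
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        // Guard against windows with fewer than n distinct URLs
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        List<String> window = List.of("./home", "./cart", "./home", "./prod?id=1", "./home", "./cart");
        System.out.println(topN(window, 2)); // [./home=3, ./cart=2]
    }
}
```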
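Using KeyedProcessFunction
This approach works in two steps. First, the stream is keyed by url and each URL's views are counted per sliding window in parallel. Second, the resulting counts are keyed by windowEnd, so that all counts of the same window reach one KeyedProcessFunction. That function buffers them in list state and sorts them when an event-time timer fires.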
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long recordTimestamp) {
                        return event.timestamp;
                    }
                }));

// 1. Key by url and count each url's views within each window.
//    The requirement needs a sliding window (10 s size, 5 s slide);
//    TumblingEventTimeWindows.of(size, offset) would create 10 s tumbling windows with a 5 s offset instead.
SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream.keyBy(event -> event.url)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
urlCountStream.print("url count");

// 2. Gather the counts of the same window, then sort them
urlCountStream.keyBy(urlViewCount -> urlViewCount.windowEnd)
        .process(new TopNProcessResult(2))
        .print();
// Custom KeyedProcessFunction: the key is windowEnd; collects all counts of one window and emits the top N
public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String> {
    // N: how many entries to output
    private final Integer n;
    // List state holding the UrlViewCount records of the current window (key)
    private ListState<UrlViewCount> urlViewCountListState;

    public TopNProcessResult(Integer n) {
        this.n = n;
    }

    // Obtain the state handle from the runtime context
    @Override
    public void open(Configuration parameters) throws Exception {
        urlViewCountListState = getRuntimeContext().getListState(
                new ListStateDescriptor<UrlViewCount>("UrlCountList", Types.POJO(UrlViewCount.class))
        );
    }

    @Override
    public void processElement(UrlViewCount urlViewCount, KeyedProcessFunction<Long, UrlViewCount, String>.Context context, Collector<String> collector) throws Exception {
        // Save the record in state
        urlViewCountListState.add(urlViewCount);
        // Register an event-time timer at windowEnd + 1 ms (the key is windowEnd);
        // repeated registrations for the same key and timestamp are deduplicated
        context.timerService().registerEventTimeTimer(context.getCurrentKey() + 1);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Long, UrlViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
        for (UrlViewCount urlViewCount : urlViewCountListState.get()) {
            urlViewCountArrayList.add(urlViewCount);
        }
        // This window's records are no longer needed; free the state for this key
        urlViewCountListState.clear();
        // Sort by count, descending
        urlViewCountArrayList.sort((o1, o2) -> Long.compare(o2.count, o1.count));

        // Wrap the result for printing
        StringBuilder result = new StringBuilder();
        result.append("--------\n");
        result.append("Window end: " + new Timestamp(ctx.getCurrentKey()) + "\n");
        // Take the first n entries (or fewer, if the window saw fewer URLs)
        for (int i = 0; i < Math.min(n, urlViewCountArrayList.size()); i++) {
            UrlViewCount currUrlViewCount = urlViewCountArrayList.get(i);
            String info = "No." + (i + 1) + " "
                    + "Url: " + currUrlViewCount.url + " "
                    + "Views: " + currUrlViewCount.count + "\n";
            result.append(info);
        }
        result.append("--------\n");
        out.collect(result.toString());
    }
}
// Incremental aggregation: add 1 for every incoming record
public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Event value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        // Only called for merging windows; combine the two partial counts
        return a + b;
    }
}
// Wrap the window information around the count and emit a UrlViewCount
public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
    @Override
    public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
        Long start = context.window().getStart();
        Long end = context.window().getEnd();
        // The iterable holds a single element: the count produced by UrlViewCountAgg
        Long count = elements.iterator().next();
        out.collect(new UrlViewCount(url, count, start, end));
    }
}
// POJO: public class, public fields and a public no-arg constructor, so Types.POJO can handle it
public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "UrlViewCount[" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + new Timestamp(windowStart) +
                ", windowEnd=" + new Timestamp(windowEnd) +
                ']';
    }
}
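The second stage can be simulated without Flink to show why the timer is needed. Records for the same windowEnd arrive one at a time, so TopNProcessResult buffers them and only sorts once the watermark reaches windowEnd + 1. In the hypothetical sketch below, a HashMap stands in for the per-key ListState and a TreeSet for the (deduplicated) event-time timers. All class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class KeyedTopNSketch {
    // Hypothetical stand-in for UrlViewCount (windowStart omitted)
    static final class UrlCount {
        final String url;
        final long count;
        final long windowEnd;

        UrlCount(String url, long count, long windowEnd) {
            this.url = url;
            this.count = count;
            this.windowEnd = windowEnd;
        }
    }

    private final int n;
    // Per-key buffer (key = windowEnd), standing in for ListState
    private final Map<Long, List<UrlCount>> buffers = new HashMap<>();
    // Pending event-time timers; a set, so repeated registrations collapse into one
    private final TreeSet<Long> timers = new TreeSet<>();

    public KeyedTopNSketch(int n) {
        this.n = n;
    }

    // Like processElement(): buffer the record and register a timer at windowEnd + 1
    public void processElement(UrlCount c) {
        buffers.computeIfAbsent(c.windowEnd, k -> new ArrayList<>()).add(c);
        timers.add(c.windowEnd + 1);
    }

    // Like onTimer(): for each timer <= watermark, sort that window's buffer and emit "windowEnd:url,url"
    public List<String> advanceWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long windowEnd = timers.pollFirst() - 1;
            List<UrlCount> list = buffers.remove(windowEnd); // read the buffer and clear it
            list.sort((a, b) -> Long.compare(b.count, a.count));
            StringBuilder sb = new StringBuilder(windowEnd + ":");
            for (int i = 0; i < Math.min(n, list.size()); i++) {
                sb.append(i == 0 ? "" : ",").append(list.get(i).url);
            }
            out.add(sb.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedTopNSketch topN = new KeyedTopNSketch(2);
        topN.processElement(new UrlCount("./home", 3, 10_000L));
        topN.processElement(new UrlCount("./cart", 5, 10_000L));
        topN.processElement(new UrlCount("./fav", 1, 10_000L));
        topN.processElement(new UrlCount("./prod?id=1", 2, 15_000L));
        System.out.println(topN.advanceWatermark(10_000L)); // []
        System.out.println(topN.advanceWatermark(10_001L)); // [10000:./cart,./home]
        System.out.println(topN.advanceWatermark(20_000L)); // [15000:./prod?id=1]
    }
}
```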
From: https://www.cnblogs.com/fireonfire/p/17015807.html