首页 > 其他分享 >Flink:处理函数

Flink:处理函数

时间:2022-12-30 21:00:43浏览次数:40  
标签:url Flink Long 处理函数 public Override new event

基本处理函数

函数功能

处理函数主要是定义数据流的转换工作。

处理函数提供了一个“定时服务”,可以通过它访问流中的事件、时间戳、水位线,甚至可以注册“定时事件”。继承了 AbstractRichFunction 抽象类, 所以拥有富函数类的所有特性,同样可以访问状态和其他运行时信息。处理函数还可以直接将数据输出到侧输出流中。

ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示输入的数据类型,O 表示处理完成之后输出的数据类型。

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
		....
  
    public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;

    public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }

  	....
}
  • processElement():用于“处理元素”,定义了处理的核心逻辑。
    • 输入数据 var1:当前流中的输入元素。
    • 上下文 var2:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”,以及可以将数据发送到“侧输出流”的方法 output()
    • 收集器 var3:用于返回输出数据。
  • onTimer():用于定义定时触发的操作。这个方法只有在注册好的定时器之后,定时器触发的时候才会调用,而定时器是通过“定时服务”来注册的。
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long l) {
                        return event.timestamp;
                    }
                })
        );

stream.process(new ProcessFunction<Event, String>() {
    @Override
    public void processElement(Event event, ProcessFunction<Event, String>.Context context, Collector<String> collector) throws Exception {
        if (event.user.equals("Marry")) {
            collector.collect(event.user + "clicks" + event.url);
        } else if (event.user.equals("Bob")) {
            collector.collect(event.user);
            collector.collect(event.user);
        }
        collector.collect(event.toString());
        System.out.println("timestamp:" + context.timestamp());
        System.out.println("Watermark:" + context.timerService().currentWatermark());

        System.out.println(getRuntimeContext().getIndexOfThisSubtask());
    }
}).print();

分类

  • ProcessFunction:最基本的处理函数,基于 DataStream 直接调用 process() 时作为参数传入。
  • KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用 process() 时作为参数传入。
  • ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process() 时作为参数传入。
  • ProcessAllWindowFunction:开窗之后的処理函数,基于 AllWindowedStream 调用 process() 时作为参数传入。
  • CoProcessFunction:合并两条流之后的处理函数,基于 ConnectedStreams 调用 process() 时作为参数传入。
  • ProcessJoinFunction:间隔连接两条流之后的处理函数,基于 IntervalJoined 调用 process() 时作为参数传入。
  • BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。

按键分区处理函数

只有在 KeyedStream 中オ支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy 分区之后,再去定义处理操作。

定时器和定时服务

定时器是处理函数中进行时间相关操作的主要机制。onTimer() 可以实现定时处理的逻辑。通过上下文提供的定时服务来注册定时器。

定时服务与当前运行的环境有关,ProcessFunction 的上下文中提供了 timerService() 方法,可以直接返回一个 TimerService 对象。

@PublicEvolving
public interface TimerService {
    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

  	// 获取当前处理时间
    long currentProcessingTime();

  	// 获取当前水位线
    long currentWatermark();

  	// 注册处理时间定时器
    void registerProcessingTimeTimer(long var1);

  	// 注册事件时间定时器
    void registerEventTimeTimer(long var1);

  	// 删除处理时间定时器
    void deleteProcessingTimeTimer(long var1);

  	// 删除事件时间定时器
    void deleteEventTimeTimer(long var1);
}

timerService() 会以键和时间戳为标准,对定时器进行去重。每一个键和时间戳最多只有一个定时器,如果注册了多次,onTimer() 也只调用一次。

KeyedProcessFunction 的使用

stream.keyBy(event -> event.user)
        .process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event event, KeyedProcessFunction<String, Event, String>.Context context, Collector<String> collector) throws Exception {
                long currPT = context.timerService().currentProcessingTime();
                collector.collect(context.getCurrentKey() + "数据到达时间:" + new Timestamp(currPT));
                // 注册10秒后的定时器
                context.timerService().registerProcessingTimeTimer(currPT + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + "定时器触发时间:" + new Timestamp(timestamp));
            }
        }).print();

案例 Top-N

需求:统计最近10秒内最热门的两个 url 链接,并且每5秒钟更新一次。

使用 ProcessAllWindowFunction

不区分 url 链接,而是将所有访问数据都收集起来,统一进行统计计算。所以可以不做 keyBy,直接基于 DataStream 开窗,然后使用全窗口函数 ProcessAllWindowFunction 来进行处理。

SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.timestamp;
            }
        }));

// 直接开窗,收集所有数据
stream.map(event -> event.url)
        .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(new UrlHashMapCountAgg(), new UrlAllWindowResult())
        .print();
// 实现自定义的增量聚合函数
public static class UrlHashMapCountAgg implements AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>> {
    @Override
    public HashMap<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public HashMap<String, Long> add(String s, HashMap<String, Long> stringLongHashMap) {
        if (stringLongHashMap.containsKey(s)) {
            Long count = stringLongHashMap.get(s);
            stringLongHashMap.put(s, count + 1);
        } else {
            stringLongHashMap.put(s, 1L);
        }
        return stringLongHashMap;
    }

    @Override
    public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> stringLongHashMap) {
        ArrayList<Tuple2<String, Long>> result = new ArrayList<>();
        for (String key: stringLongHashMap.keySet()) {
            result.add(Tuple2.of(key, stringLongHashMap.get(key)));
        }
        result.sort(new Comparator<Tuple2<String, Long>>() {
            @Override
            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                return o2.f1.intValue() - o1.f1.intValue();
            }
        });
        return result;
    }

    @Override
    public HashMap<String, Long> merge(HashMap<String, Long> stringLongHashMap, HashMap<String, Long> acc1) {
        return null;
    }
}
// 实现自定义全窗口函数,包装信息输出结果
public static class UrlAllWindowResult extends ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow> {
    @Override
    public void process(ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<ArrayList<Tuple2<String, Long>>> iterable, Collector<String> collector) throws Exception {
        ArrayList<Tuple2<String, Long>> list = iterable.iterator().next();

        StringBuilder result = new StringBuilder();
        result.append("--------");
        result.append("窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n");

        // 取list前两个,包装信息输出
        for (int i = 0; i < 2; i++) {
            Tuple2<String, Long> currTuple = list.get(i);
            String info = "No." + (i + 1) + " "
                    + "Url:" + currTuple.f0 + " "
                    + "访问量:" + currTuple.f1 + "\n";
            result.append(info);
        }
        result.append("--------");
        collector.collect(result.toString());
    }
}

使用 KeyedProcessFunction

SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.timestamp;
            }
        }));
// 1.按照url分组,统计窗口内每个url的访问量
SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream.keyBy(event -> event.url)
        .window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

urlCountStream.print("url Count");

// 2.对于同一个窗口统计出访问量,进行收集和排序
urlCountStream.keyBy(urlViewCount -> urlViewCount.windowEnd)
        .process(new TopNProcessResult(2))
        .print();
// 实现自定义KeyedProcessFunction
public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String> {
    // 定义 N
    private Integer n;

    // 定义列表状态
    private ListState<UrlViewCount> urlViewCountListState;

    public TopNProcessResult(Integer n) {
        this.n = n;
    }

    // 在环境中获取状态
    @Override
    public void open(Configuration parameters) throws Exception {
        urlViewCountListState = getRuntimeContext().getListState(
                new ListStateDescriptor<UrlViewCount>("UrlCountList", Types.POJO(UrlViewCount.class))
        );
    }

    @Override
    public void processElement(UrlViewCount urlViewCount, KeyedProcessFunction<Long, UrlViewCount, String>.Context context, Collector<String> collector) throws Exception {
        // 将数据保存到状态中
        urlViewCountListState.add(urlViewCount);
        // 注册 windowEnd + 1ms 的定时器
        context.timerService().registerEventTimeTimer(context.getCurrentKey() + 1);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Long, UrlViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
        for (UrlViewCount urlViewCount: urlViewCountListState.get()) {
            urlViewCountArrayList.add(urlViewCount);
        }
        urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
            @Override
            public int compare(UrlViewCount o1, UrlViewCount o2) {
                return o2.count.intValue() - o1.count.intValue();
            }
        });

        // 包装信息打印
        StringBuilder result = new StringBuilder();
        result.append("--------");
        result.append("窗口结束时间:" + new Timestamp(ctx.getCurrentKey()) + "\n");

        // 取list前两个,包装信息输出
        for (int i = 0; i < 2; i++) {
            UrlViewCount currUrlViewCount = urlViewCountArrayList.get(i);
            String info = "No." + (i + 1) + " "
                    + "Url:" + currUrlViewCount.url + " "
                    + "访问量:" + currUrlViewCount.count + "\n";
            result.append(info);
        }
        result.append("--------");
        out.collect(result.toString());
    }
}
//增量聚合,来一条数据就加 1
public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Event value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return null;
    }
}
//包装窗口信息,输出UrlViewCount
public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
    @Override
    public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
        Long start = context.window().getStart();
        Long end = context.window().getEnd();
        Long uv = elements.iterator().next();
        out.collect(new UrlViewCount(url, uv, start, end));
    }
}
public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "UrlViewCount[" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + new Timestamp(windowStart) +
                ", windowEnd=" + new Timestamp(windowEnd) +
                ']';
    }
}

标签:url,Flink,Long,处理函数,public,Override,new,event
From: https://www.cnblogs.com/fireonfire/p/17015807.html

相关文章

  • flink 实现 postgre-CDC
    一、前置工作1.修改postgresql配置文件  /data/pgsql/13/data/postgresql.conf相关配置:#更改wal日志方式为logicalwal_level=logical#minimal,replica,or......
  • Flink 中的 Time 有哪几种
    处理时间(ProcessingTime)1、Flink程序执行对应操作的系统时间。所有基于时间的操作(例如:时间窗口)都将使用运行相应operator的系统时间。例如:每个小时的处理时间窗口包括在......
  • Flink 的经典场景和业务故事有哪些?看看他们就知道了
    在大数据的日常场景中,从数据生产者,到数据收集、数据处理、数据应用(BI+AI),整个大数据+AI全栈的每个环节,Flink均可应用于其中。作为新一代开源大数据计算引擎,Flink不仅满......
  • Flink 实战:如何解决应用中的技术难题?
    倒计时5天!4月25-26日,全球首个Apache顶级项目在线会议FlinkForward精华版即将重磅开启。FlinkForward全球在线会议精华版均为中文直播,核心内容分为Keynote与社区......
  • 源码剖析Flink设计思想,搞定Flink中的一切难题 | 文末送书
    大家好,我是梦想家Alex,好不容易到了周末,为了感谢大家一直以来的支持,我决定安排一次送书活动,感兴趣的朋友们记得划到文末去查看。流计算从出现到普及,经历了非常多的变化—......
  • 六、格式化输入输出和错误处理函数
    相关函数:printf头文件 :#include<stdio.h>函数原型:intprintf(constchar*format,…);函数说明:printf会根据参数format来转换并格式化数据,然后将结果写到标准输......
  • 字符串处理函数 相关函数: strstr
    相关函数:strstr头文件 :#include<string.h>函数原型:char*strstr(constchar*haystack, constchar*needle);函数说明:在字符串haystack中查找字符串needle......
  • Flink:DataStreamAPI
    执行环境获取的执行环境是StreamExecutionEnvironment类的对象。在代码中创建执行环境的方法,就是调用这个类的静态方法。getExecutionEnvironment根据上下文直接得到......
  • Flink使用TableAPi方式读取和写入Hive
    以下是一个简单的参考实例,用来验证通过FlinkSQL来跑批方式清洗Hive数据可行的。(1)验证了Hive中org.openx.data.jsonserde.JsonSerDe格式的表是可以直接读取数据出来的(2)通......
  • Flink的概念、特点及运行原理
    Flink是一个针对流数据和批数据的分布式处理引擎,主要用Java代码实现。目前,Flink主要还是依靠开源社区的贡献来发展的。对于Flink,其处理的数据主要是流数据,批数据只是流......