首页 > 其他分享 >Flink 增量窗口聚合函数 ReduceFunction(归约函数)和AggregateFunction(聚合函数)

Flink 增量窗口聚合函数 ReduceFunction(归约函数)和AggregateFunction(聚合函数)

时间:2024-02-20 16:13:18浏览次数:25  
标签:聚合 函数 Tuple2 AggregateFunction accumulator 归约 ReduceFunction 窗口

Flink 增量窗口聚合函数

定义了窗口分配器,只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是WindowedStream。这个类型并不是DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到DataStream,如下图所示

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

1、增量聚合函数(incremental aggregation functions)

窗口将数据收集起来,最基本的处理操作当然就是进行聚合。窗口对无限流的切分,可以看作得到了一个有界数据集。如果我们等到所有数据都收集齐,在窗口到了结束时间要输出结果的一瞬间再去进行聚合,显然就不够高效了——这相当于真的在用批处理的思路来做实时流处理。为了提高实时性,可以再次将流处理的思路发扬光大:就像DataStream的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

归约函数(ReduceFunction)

最基本的聚合方式就是归约(reduce)。在基本转换的聚合算子中介绍过reduce的用法,窗口的归约聚合也非常类似,就是将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。窗口函数中也提供了ReduceFunction:只要基于WindowedStream调用.reduce()方法,然后传入ReduceFunction作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚合了。这里的ReduceFunction其实与简单聚合时用到的ReduceFunction是同一个函数类接口,所以使用方式也是完全一样的。ReduceFunction中需要重写一个reduce方法,它的两个参数代表输入的两个元素,而归约最终输出结果的数据类型,与输入的数据类型必须保持一致。也就是说,中间聚合的状态和输出的结果,都和输入的数据类型是一样的。下面一是一个取用户活跃量的例子

复制代码
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> eventtream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序流 2s 延迟
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        eventtream.map(data -> Tuple2.of(data.user, 1L))
                .returns(new TypeHint<Tuple2<String, Long>>() {
                }).keyBy(data -> data.f0)
                //滚动窗口:窗口大小 5s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        //规约聚合逻辑:key 不变,访问量两两相加 
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();
        env.execute();
    }
复制代码

运行结果

复制代码
(风清扬,1)
(令狐冲,2)
(依琳,3)
(莫大,1)
(令狐冲,3)
(依琳,1)
(任盈盈,1)
(莫大,1)
(依琳,1)
(风清扬,1)
(令狐冲,1)
复制代码

代码中我们对每个用户的行为数据进行了开窗统计。与wordcount逻辑类似,首先将数据转换成(user, count)的二元组形式(类型为Tuple2<String, Long>),每条数据对应的初始count值都是1;然后按照用户id分组,在处理时间下开滚动窗口,统计每5秒内的用户行为数量。对于窗口的计算,我们用ReduceFunction对count值做了增量聚合:窗口中会将当前的总count值保存成一个归约状态,每来一条数据,就会调用内部的reduce方法,将新数据中的count值叠加到状态上,并得到新的状态保存起来。等到了5秒窗口的结束时间,就把归约好的状态直接输出。这里需要注意,经过窗口聚合转换输出的数据,数据类型依然是二元组Tuple2<String, Long>。

聚合函数(AggregateFunction)

ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用ReduceFunction就会非常麻烦。例如,如果希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用ReduceFunction,那么应该先把数据转换成二元组(sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要map-reduce-map三步操作,这显然不够高效。于是自然可以想到,如果取消类型一致的限制,让输入数据、中间状态、输出结果三者类型都可以不同,不就可以一步直接搞定了吗?Flink的Window API中的aggregate就提供了这样的操作。直接基于WindowedStream调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。AggregateFunction在源码中的定义如下:

复制代码
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    /**
     * Creates a new accumulator, starting a new aggregate.
     *
     * <p>The new accumulator is typically meaningless unless a value is added via {@link
     * #add(Object, Object)}.
     *
     * <p>The accumulator is the state of a running aggregation. When a program has multiple
     * aggregates in progress (such as per key and window), the state (per key and window) is the
     * size of the accumulator.
     *
     * @return A new accumulator, corresponding to an empty aggregate.
     */
    ACC createAccumulator();
    /**
     * Adds the given input value to the given accumulator, returning the new accumulator value.
     *
     * <p>For efficiency, the input accumulator may be modified and returned.
     *
     * @param value The value to add
     * @param accumulator The accumulator to add the value to
     * @return The accumulator with the updated state
     */
    ACC add(IN value, ACC accumulator);
    /**
     * Gets the result of the aggregation from the accumulator.
     *
     * @param accumulator The accumulator of the aggregation
     * @return The final aggregation result.
     */
    OUT getResult(ACC accumulator);
    /**
     * Merges two accumulators, returning an accumulator with the merged state.
     *
     * <p>This function may reuse any of the given accumulators as the target for the merge and
     * return that. The assumption is that the given accumulators will not be used any more after
     * having been passed to this function.
     *
     * @param a An accumulator to merge
     * @param b Another accumulator to merge
     * @return The accumulator with the merged state
     */
    ACC merge(ACC a, ACC b);
}
复制代码

AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

复制代码
createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据value,
和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。 getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
比如之前我们提到的计算平均值,就可以把sum和count作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。 merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging
Window)的场景就是会话窗口(Session Windows)。
复制代码

 所以可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。下面来看一个具体例子。我们知道,在电商网站中,PV(页面浏览量)和UV(独立访客数)是非常重要的两个流量指标。一般来说,PV统计的是所有的点击量;而对用户id进行去重之后,得到的就是UV。所以有时我们会用PV/UV这个比值,来表示“人均重复访问量”,也就是平均每个用户会访问多少次页面,这在一定程度上代表了用户的粘度。 

复制代码
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }));
        // 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除
        eventStream.keyBy(data -> "key")
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
                .aggregate(new AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double>() {
                    @Override
                    public Tuple2<HashSet<String>, Long> createAccumulator() {
                        //初始状态:创建累加器
                        return Tuple2.of(new HashSet<String>(), 0L);
                    }
                    @Override
                    public Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {
                        //将1条数据塞进集合
                        accumulator.f0.add(value.user);
                        //返回2元祖
                        return Tuple2.of(accumulator.f0, accumulator.f1 + 1l);
                    }
                    @Override
                    public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
                        return (double) (accumulator.f1 / accumulator.f0.size());
                    }
                    @Override
                    public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {
                        return null;
                    }
                }).print();
        env.execute();
    }
复制代码 结果: 复制代码
1.0
1.0
2.0
3.0
2.0
2.0
2.0
复制代码

代码中创建了事件时间滑动窗口,统计10秒钟的“人均PV”,每2秒统计一次。由于聚合的状态还需要做处理计算,因此窗口聚合时使用了更加灵活的AggregateFunction。为了统计UV,这里用一个HashSet保存所有出现过的用户id,实现自动去重;而PV的统计则类似一个计数器,每来一个数据加一就可以了。所以这里的状态,定义为包含一个HashSet和一个count值的二元组(Tuple2<HashSet<String>, Long>),每来一条数据,就将user存入HashSet,同时count加1。这里的count就是PV,而HashSet中元素的个数(size)就是UV;所以最终窗口的输出结果,就是它们的比值。这里没有涉及会话窗口,所以merge()方法可以不做任何操作。另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。通过ReduceFunction和AggregateFunction我们可以发现,增量聚合函数其实就是在用流处理的思路来处理有界数据集,核心是保持一个聚合状态,当数据到来时不停地更新状态。这就是Flink所谓的“有状态的流处理”,通过这种方式可以极大地提高程序运行的效率,所以在实际应用中最为常见。  

原文链接:https://www.cnblogs.com/wdh01/p/16393424.html

标签:聚合,函数,Tuple2,AggregateFunction,accumulator,归约,ReduceFunction,窗口
From: https://www.cnblogs.com/sunny3158/p/18023339

相关文章

  • python不能跳转进入某个函数或模块的一种解决思路
    例如,下图中的get_bucket_mount_root函数可以顺利import进来,但是按ctrl键不能跳转进入这个函数: 一个解决思路是,在vscode终端中,打开python解释器,import上图中的hatbc库,然后用hatbc.__file__命令查找该库的__init__.py文件的路径,按住ctrl键,点击这个路径,即可跳转进入这个__init__.......
  • 关于vue3的h函数
    h(ElInput,{class:'w200ml8',placeholder:'关键字搜索',clearable:true,modelValue:formData.url_pattern,'onUpdate:modelValue':(val:string)=&......
  • C++函数用法
    1.getline函数的用法函数声明boolgetline(istream&in,string&s)功能说明从输入流读入一行到变量strings,即使是空格也可以读入。直到出现以下情况为止:读入了文件结束标志读到一个新行(有重载函数可以指定行分隔符,默认是"\n".)达到字符串的最大长度如果getline没有读......
  • 推荐系统中回归任务常用损失函数
    1.MSE(均方误差损失)优点:1.收敛快缺点:1.假设了样本服从正态分布,如果训练样本label不服从正态分布,则MSE并非最大似然估计2.对异常点很敏感 2.MAE(平均绝对误差损失)优点:不容易受异常值影响缺点:收敛速度慢,拟合能力弱 ......
  • java1.8LocalDate日期常用函数
    LocalDatenow=LocalDate.now();//获取当前日期2023-12-31Stringformat=LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss"));//获取当前时间2023-12-3115:44:52intmonthValue=now.getMonthValue();//返回当前的月份intdayOfMonth=now.......
  • 备选数列 / 函数
    FibonacciSequenceFormula\[F_{1}=1\]\[F_{2}=1\]\[F_{i}=F_{i-1}+F_{i-2}(i\geq3)\]List\[F_1=1\]\[F_2=1\]\[F_{3}=2\]\[F_{4}=3\]\[F_{5}=5\]\[F_{6}=8\]\[F_{7}=13\]\[F_{8}=21\]\[F_{9}=34\]......
  • onMounted钩子函数场景
    onMounted是一个生命周期钩子,它在组件被挂载到DOM后被调用,这意味着,当组件被插入到页面中并且可以与DOM交互时,onMounted函数就会被执行在vue3中使用onMounted钩子才能获取页面DOM加载的元素信息,否则直接写在外面就会因为vue的异步特性导致数据而获取不到onMou......
  • C++ 函数指针,指针函数,左值右值
    C++函数指针,指针函数,左值右值1.函数指针是一个指针类型的变量,存放的内容都是函数的指针,用来间接调用函数,格式如下:intadd(inta,intb){ returna+b;}int(*fadd)(inta,intb);//函数的指针,变量名需要被括号括起来,并且前面+*注意:函数指针的变量名要在前面+*号,同时......
  • IoU GIoU等损失函数
    IoUGIoU等损失函数目录IoUGIoU等损失函数IoULoss交并比numpy实现torch实现优缺点GIoULossnumpy实现torch实现优缺点DIoULossnumpy实现优缺点CIoULoss图例介绍A:目标框覆盖的矩形面积(粉色区域表示,包含被C掩盖掉的区域)B:预测框覆盖的矩形面积(蓝色区域表示,包含被C掩盖......
  • 推导(Derivation)和归约(Reduction)
    目录推导(Derivation)归约(Reduction)在编译原理中,推导(Derivation)和归约(Reduction,有时也称为规约)是两个核心概念,用于描述如何根据形式文法的规则来生成或识别字符串。它们是基于形式语言理论中的上下文无关文法(Context-FreeGrammars,CFGs)进行的操作。推导(Derivation)推导是从文......