首页 > 其他分享 >Flink中的时间和窗口

Flink中的时间和窗口

时间:2024-03-31 12:00:15浏览次数:13  
标签:调用 聚合 Flink window 时间 窗口 数据 public

前言:

        在批处理统计中,我们可以等一批数据都到齐后统一处理。但是在实时处理统计中,我们是来一条数据处理一条数据,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

        所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这个范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是不分开的。接下来我们就深入了解一下Flink中得到时间语义和窗口的应用。

一、窗口(Window)

        1》窗口的概念

                Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

                正确理解:在Flink中,窗口其实并不是一个“框”,应该把窗口理解成一个“桶”。在Flink中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。        

                注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。 

        2》窗口的分类

                我们在上一节举的例子,其实是最为简单的一种时间窗口。在Flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度,对Flink中内置的窗口做一个分类说明。

        (1)按照驱动类型分

                窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。


                ①、时间窗口(Time Window)


                    时间窗口以时间点来定义窗口的开始(Stait)和结束(End),所以截取出的就是某一时间段的数据.到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁.所以可以说基本思路就是“定点发车”。


                ②、计数窗口(计数窗口)


                    计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口.每个窗口截取数据的个数,就是窗口的大小.基本思路是“人齐发车”。

        

        (2)按照窗口分配数据的规则分类

                根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

                ①、滚动窗口(Tumbling Windows)


                   滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态.这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口.

                     滚动窗口可以基于时间定义,也可以基于数据个数定义; 需要的参数只有一个,就是窗口的大小 (windowsize)。
                     比如我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计: 或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

                 混动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

                 ②、滑动窗口(Sliding Windows)


                   滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。
                  定义滑动窗口的参数有两个: 除去窗口大小 (widow size) 之外,还有一个“滑动步长”(window slide)它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。

                  当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值 (size/slide) 来决定。

                  滚动窗口也可以看作是一种特殊的滑动窗口--窗口大小等于滑动步长 (size = slide)。


                  滑动窗口适合计算结果更新频率非常高的场景。

                 ③、会话窗口(Session Windows)

                     会话窗口,是基于“会话” (session) 来来对数据进行分组的。会话窗口只能基于时间来定义。
                     会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小 (size),那说明还在保持会话,它们就属于同一个窗口; 如果ap大于size.那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

                     会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间定是不会重叠的,而且会留有至少为size的间隔 (sessiongap )。


                在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。

                ④、全局窗口(Global Windows)

                     “全局窗口”,这种窗口全局有效,会把相同kev的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器” (Trigger)。

                     全局窗口没有结束的时间点,所以一般在希望做更加灵活的窗口处理时自定义使用。Flink中的计数窗 (Count Window)底层就是用全局窗口实现的。

         (3)窗口API概览

                ①、按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。

                 1、按键分区窗口(Keyed Windows)

                        经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。

                        在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...)
       .window(...)

                 

                2、非按键分区(Non-Keyed Windows)

                        如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

                        在代码中,直接基于DataStream调用.windowAll()定义窗口。

stream.windowAll(...)

                注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

                ②、代码中窗口API的调用

                        窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>)

       .window(<window assigner>)

       .aggregate(<window function>)

                其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。

二、窗口分配器

        定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。

        窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。

        窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

        1》时间窗口

                时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。

        (1)滚动处理时间窗口

                窗口分配器由类TumblingProcessingTimeWindows提供,需要调用它的静态方法.of()。

stream.keyBy(...)

       .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

       .aggregate(...)

                这里.of()方法需要传入一个Time类型的参数size,表示滚动窗口的大小,我们这里创建了一个长度为5秒的滚动窗口。

                另外,.of()还有一个重载方法,可以传入两个Time类型的参数:size和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

        (2)滑动处理时间窗口

                窗口分配器由类SlidingProcessingTimeWindows提供,同样需要调用它的静态方法.of()。

stream.keyBy(...)

       .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))

       .aggregate(...)

                这里.of()方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。

                滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

        (3)处理时间会话窗口

                窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。

stream.keyBy(...)

       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

       .aggregate(...)

                这里.withGap()方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。

                另外,还可以调用withDynamicGap()方法定义session gap的动态提取逻辑。

        (4)滚动事件时间窗口

                窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致。

stream.keyBy(...)

       .window(TumblingEventTimeWindows.of(Time.seconds(5)))

       .aggregate(...)

        (5)滑动事件时间窗口

                窗口分配器由类SlidingEventTimeWindows提供,用法与滑动处理事件窗口完全一致。

stream.keyBy(...)

       .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))

       .aggregate(...)

        (6)事件时间会话窗口

                窗口分配器由类EventTimeSessionWindows提供,用法与处理事件会话窗口完全一致。

stream.keyBy(...)

       .window(EventTimeSessionWindows.withGap(Time.seconds(10)))

       .aggregate(...)

        2》 计数窗口

                计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。

        (1)滚动计数窗口

                滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。

stream.keyBy(...)

       .countWindow(10)

                我们定义了一个长度为10的滚动计数窗口,当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口。

        (2)滑动计数窗口

                与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...)

       .countWindow(10,3)

                我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据,每隔3个数据就统计输出一次结果。

        (3)全局窗口

                全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由GlobalWindows类提供。

stream.keyBy(...)

       .window(GlobalWindows.create());

                需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

三、窗口函数(Window Functions)

        定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了; 至于收集起来到底要做什么其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数” (window functions) 。

        窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。下面我们来进行分别讲解。 

        1》增量聚合函数(ReduceFunction / AggregateFunction

                窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。

                典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

        (1)归约函数(ReduceFunction)

                代码示例:

public class WindowReduceDemo {



    public static void main(String[] args) throws Exception {



        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        env

                .socketTextStream("hadoop102", 7777)

                .map(new WaterSensorMapFunction())

                .keyBy(r -> r.getId())

                // 设置滚动事件时间窗口

                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))

                .reduce(new ReduceFunction<WaterSensor>() {



                    @Override

                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {

                        System.out.println("调用reduce方法,之前的结果:"+value1 + ",现在来的数据:"+value2);

                        return new WaterSensor(value1.getId(), System.currentTimeMillis(),value1.getVc()+value2.getVc());

                    }

                })

                .print();



        env.execute();

    }

}

        (2)聚合函数(AggregateFunction)

                ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。

                Flink Window API中的aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。

                AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

                接口中有四个方法:

                        1. createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。

                        2. add():将输入的元素添加到累加器中。

                        3. getResult():从累加器中提取聚合的输出结果。

                        4. merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

                所以可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

                代码实现如下:

public class WindowAggregateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env

                .socketTextStream("hadoop102", 7777)

                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> aggregate = sensorWS

                .aggregate(

                        new AggregateFunction<WaterSensor, Integer, String>() {

                            @Override

                            public Integer createAccumulator() {

                                System.out.println("创建累加器");

                                return 0;

                            }

                            @Override

                            public Integer add(WaterSensor value, Integer accumulator) {

                                System.out.println("调用add方法,value="+value);

                                return accumulator + value.getVc();

                            }

                            @Override

                            public String getResult(Integer accumulator) {

                                System.out.println("调用getResult方法");

                                return accumulator.toString();

                            }

                            @Override

                            public Integer merge(Integer a, Integer b) {

                                System.out.println("调用merge方法");

                                return null;

                            }

                        }

                );

     
        aggregate.print();


        env.execute();

    }

}

                另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。

        2》 全窗口函数(full window functions)

                有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。

                所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

                在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。

        (1)窗口函数(WindowFunction)

                WindowFunction字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。

stream

    .keyBy(<key selector>)

    .window(<window assigner>)

    .apply(new MyWindowFunction());

                这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

                不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。

        (2)处理窗口函数(ProcessWindowFunction)

                ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。

                事实上,ProcessWindowFunction是Flink底层API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。

                代码实现如下:

public class WindowProcessDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env

                .socketTextStream("hadoop102", 7777)

                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());


        // 1. 窗口分配器

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS

                .process(

                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override

                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {

                                long count = elements.spliterator().estimateSize();

                                long windowStartTs = context.window().getStart();

                                long windowEndTs = context.window().getEnd();

                                String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

                            }

                        }

                );

        process.print();

        env.execute();

    }

}

        3》 增量聚合和全窗口函数的结合使用

                在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。

                我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

// ReduceFunction与WindowFunction结合

public <R> SingleOutputStreamOperator<R> reduce(

        ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function)



// ReduceFunction与ProcessWindowFunction结合

public <R> SingleOutputStreamOperator<R> reduce(

        ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)



// AggregateFunction与WindowFunction结合

public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(

        AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)



// AggregateFunction与ProcessWindowFunction结合

public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(

        AggregateFunction<T,ACC,V> aggFunction,

        ProcessWindowFunction<V,R,K,W> windowFunction)

                这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。

                具体实现代码如下:

public class WindowAggregateAndProcessDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env

                .socketTextStream("hadoop102", 7777)

                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 2. 窗口函数:

        /**

         * 增量聚合 Aggregate + 全窗口 process

         * 1、增量聚合函数处理数据: 来一条计算一条

         * 2、窗口触发时, 增量聚合的结果(只有一条) 传递给 全窗口函数

         * 3、经过全窗口函数的处理包装后,输出

         *

         * 结合两者的优点:

         * 1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少

         * 2、全窗口函数: 可以通过 上下文 实现灵活的功能

         */

//        sensorWS.reduce()   //也可以传两个

        SingleOutputStreamOperator<String> result = sensorWS.aggregate(

                new MyAgg(),

                new MyProcess()

        );

        result.print();

        env.execute();

    }

    public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{

        @Override

        public Integer createAccumulator() {

            System.out.println("创建累加器");

            return 0;

        }

        @Override

        public Integer add(WaterSensor value, Integer accumulator) {

            System.out.println("调用add方法,value="+value);

            return accumulator + value.getVc();

        }

        @Override

        public String getResult(Integer accumulator) {

            System.out.println("调用getResult方法");

            return accumulator.toString();

        }

        @Override

        public Integer merge(Integer a, Integer b) {

            System.out.println("调用merge方法");

            return null;

        }

    }


 // 全窗口函数的输入类型 = 增量聚合函数的输出类型

    public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{

        @Override

        public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {

            long startTs = context.window().getStart();

            long endTs = context.window().getEnd();

            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");

            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

            long count = elements.spliterator().estimateSize();

            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());


        }

    }

}

                这里我们为了方便处理,单独定义了一个POJO类UrlViewCount来表示聚合输出结果的数据类型,包含了url、浏览量以及窗口的起始结束时间。用一个AggregateFunction来实现增量聚合,每来一个数据就计数加一;得到的结果交给ProcessWindowFunction,结合窗口信息包装成我们想要的UrlViewCount,最终输出统计结果。

四、其他API

        对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink还提供了其他一些可选的API,让我们可以更加灵活地控制窗口行为。

        1》触发器(Trigger)

                触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

                基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)

       .window(...)

       .trigger(new MyTrigger())

        2》移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...)

       .window(...)

       .evictor(new MyEvictor())

五、时间语义

        1》 Flink中的时间语义

        2》哪种时间语义更重要 

       (1)从《星球大战》说起

                为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。

                如上图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。

        (2)数据处理系统中的时间语义

                在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。

                在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

标签:调用,聚合,Flink,window,时间,窗口,数据,public
From: https://blog.csdn.net/2301_78959404/article/details/137193918

相关文章

  • SiMBA:基于Mamba的跨图像和多元时间序列的预测模型
    这是3月26日新发的的论文,微软的研究人员简化的基于mamba的体系结构,并且将其同时应用在图像和时间序列中并且取得了良好的成绩。语言模型的发展正在从大型语言模型(LLMs)向小型语言模型(SLMs)转变。llm和slm的核心都是transformers,它是llm和slm的构建模块。虽然transformers通过其......
  • 一个可以让你有更多时间摸鱼的WPF控件(二)
    前言  上文介绍了如何通过一个Form自定义控件来简化数据的录入,并自动实现数据校验,自动布局排列等功能。本文继续介绍如何优化表格控件的使用,缩减代码量,实现工作效率的提升。一、功能实现   上文中分析了DataGrid跟ListView两种表格控件的优劣,在这里我们选择ListView来实......
  • 滑动窗口算法(Sliding Window Algorithm)
    滑动窗口的核心就是,右指针给窗口扩容,直至抵达扩容限制条件或抵达边界;左指针则是给窗口缩容,以释放限制条件的约束,保证窗口继续向边界移动。需求讲解给定一个字符串str,请找出其中不含有重复字符的最长子串的长度。publicstaticintlengthOfLongestSubstring(Stringstr){......
  • Flink 反压问题处理
    在分布式流处理系统中,反压(Backpressure)是一个常见的问题,它发生在下游处理速度跟不上上游数据发送速度时。ApacheFlink是一个高性能的流处理框架,它提供了多种机制来处理反压问题。下面是一步步分析问题原因,给出案例,并提出解决方案的过程。###1.问题原因分析**上游发送速度......
  • 时间序列预测算法python全集合--深度学习
    共整理了60+个深度学习的时间序列预测算法,Python代码,包括多输入单输出,单输入单输出。深度学习算法主要为:LSTM,bilstm,grubigru,arima,ssa-arima,ceemdan,bp,elm,kelm,knn,mlp,slp,svm,XGBOOST,lightgbm,catboost,rf,lssvm,RNN,SARIMA,transformer等智能优化算法:SSA,WOA,AVOA,CS,DBO,FA,FWA,GW......
  • 【快速解决】使用python图形库,禁止用户拉伸收缩界面,使用tkinter中的window.resizable(
    目录简单介绍1.window.resizable()方法2.参数取值说明3.控制效果4.使用场景示例代码解释展示使用前后的样子 使用前使用后结语简单介绍当你在使用Python的tkinter库创建GUI(图形用户界面)应用程序时,可以使用window.resizable(False,False)技术来控制窗口是......
  • qt语言国际化(翻译),并实现多窗口同时翻译
    https://blog.csdn.net/qq_15181569/article/details/135934033一、.pro文件中添加支持的语言在.pro文件中添加下面几句,支持中文和英文TRANSLATIONS=lanague_cn.ts\lanague_en.ts12二、通过qt语言家更新翻译生成.ts文件完成以后在工程目录可以看......
  • C语言rand、srand库函数生成随机数(附时间戳)
    前言:当我们想要用C语言写程序来获取一个随机数时,该如何获取呢?这里我们上百度搜索一下这里就有提到使用rand、srand、time库函数搭配来获取随机数,也许根据其所说我们已经可以获得随机数解决问题,但想问题不能只浮于表面,下面我们来深入认识一下rand、srand、time库函数。一、ra......
  • flink水位线案例
    前言:    结合上个水位线知识点做出的题目案例给予以下代码作为参考。例题:1.创建Flink流处理环境。//创建流环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.execute();2.从“access.txt”文件中获取数据源。......
  • flink水位线
    一、什么是水位线    在Flink中,用来衡量事件时间进展的标记,就被称为“水位线(Watermark)”。    水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来......