首页 > 其他分享 >flink学习(8)——窗口函数

flink学习(8)——窗口函数

时间:2024-11-26 22:00:05浏览次数:10  
标签:窗口 函数 flink accumulator Tuple2 Tuple3 env new public

增量聚合函数

——指窗口每进入一条数据就计算一次

例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27

 reduce

aggregate(aggregateFunction)

package com.bigdata.day04;

public class _04_agg函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source-加载数据
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);
        // 此时我要获取每个班级的平均成绩
        // 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)
        // IN——Tuple3<String, String, Long>
        // ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩
        // OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩
        dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {
        
            // 初始化一个 累加器
            @Override
            public Tuple3<String, Integer, Long> createAccumulator() {
                return Tuple3.of(null,0,0L);
            }


            // 累加器和输入的值进行累加
            // Tuple3<String, String, Long> value 第一个是传入的值
            // Tuple3<String, Integer, Long> accumulator 第二个是累加器的值
            @Override
            public Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {

                return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);
            }

            // 获取结果——在不同节点的结果进行汇总后实现
            @Override
            public Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {

                return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);
            }


            // 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总
            // 即累加器之间的累加
            @Override
            public Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {
                return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

sum()

min()

max()

 全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)

全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间

apply 

package com.bigdata.day04;

public class _05_app函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);

        //2. source-加载数据
        dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {

            @Override
            public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {
                Long sum = 0L;
                int length = 0;
                String key = null;
                for (Tuple3<String, String, Long> value : values) {
                    sum += value.f2;
                    length++;
                    key = value.f0;

                }
                out.collect(Tuple2.of(key,(double) sum/length));
            }
        }).print();
      
        env.execute();
    }
}

// 总结

// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象  Tuple3<String,String,Long> 传入的值  Tuple2<String,Double> 结果

// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) 

Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象

//使用窗口对象我们可以拿到窗口的起始时间
 long start = window.getStart();
 long end = window.getEnd();

process

使用方式一:在connect合流之后对两个类型不同的流进行处理

使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签

 使用方式一
new CoProcessFunction<Long, String, String>()  
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型  第三个泛型是合并后流的类型

 @Override
        public void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
            
            // Long 是数据类型 结果使用collector中的collect 收集
            collector.collect(String.valueOf(l));
        }

@Override
        public void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
        
            //  String 是数据类型 结果使用collector中的collect 收集
            collector.collect(s);
        }
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素

OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));

标签:窗口,函数,flink,accumulator,Tuple2,Tuple3,env,new,public
From: https://blog.csdn.net/weixin_52642840/article/details/144069203

相关文章

  • flink学习(7)——window
     概述窗口的长度(大小):决定了要计算最近多长时间的数据窗口的间隔:决定了每隔多久计算一次举例:每隔10min,计算最近24h的热搜词,24小时是长度,每隔10分钟是间隔。窗口的分类1、根据window前是否调用keyBy分为键控窗口和非键控窗口2、根据window中参数的配置分为基于时间......
  • re模块 函数模式详解
    re模块python爬虫过程中,实现页面元素解析的方法很多,正则解析只是其中之一,常见的还有BeautifulSoup和lxml,它们都支持网页HTML元素解析,re模块提供了强大的正则表达式功能re模块常用方法compile(pattern,flags=0):用于编译一个正则表达式字符串,生成一个re.pattern对象......
  • Python基础学习-11函数参数
    1、"值传递”和“引用传递”1)不可变的参数通过“值传递”。比如整数、字符串等2)可变的参数通过“引用参数”。比如列表、字典。3)避免可变参数的修改4)内存模型简介2、函数参数类型1)deffunc()#无参数函数2)deffunc(value1,value2)#有参数函数;位置参数3)deffunc(......
  • 1023: 1023 分段函数
    题目描述数学中经常使用分段函数来计算函数值,请编程实现。输入输入文件有多行,每行包括一个x,要求计算f(x)的值。输出当x<0时,f(x)=(x+1)*(x+1)+2x+1/x,当x>=0时,f(x)=x的平方根,保留两位小数,每行一个结果。样例输入 复制10-0.50样例输出 复制f(10.00)=3.16f(-0.50)=......
  • Mysql的聚合函数的详细使用方法
    Mysql的聚合函数的详细使用方法CREATETABLEsales(sale_idINTAUTO_INCREMENTPRIMARYKEY,sale_dateDATE,salespersonVARCHAR(50),productVARCHAR(50),amountDECIMAL(10,2));INSERTINTOsales(sale_date,salesperson,product,amount)......
  • YOLOv8改进:CSWinTransformer交叉形窗口网络在目标检测中的应用与优化【YOLOv8】
    本专栏专为AI视觉领域的爱好者和从业者打造。涵盖分类、检测、分割、追踪等多项技术,带你从入门到精通!后续更有实战项目,助你轻松应对面试挑战!立即订阅,开启你的YOLOv8之旅!专栏订阅地址:https://blog.csdn.net/mrdeam/category_12804295.html文章目录YOLOv8改进:CSWinTransf......
  • Python那些事儿 - 函数的参数详解
    第十回巅峰对决前言这一回我们将对函数的参数进行详细的讲解。函数的参数分为形参和实参,形参又分为:位置参数、默认参数(缺省参数)、位置不定长参数、关键字不定长参数实参又分为:位置参数、关键字参数接下来让我们一起走进函数的参数吧!......
  • SAP B1 系统-主窗口和定制工具的笔记
    文章目录前言一、SAPB1主窗口1.菜单栏2.工具栏3.主菜单4.状态栏5.上下文菜单6.搜索引擎二、SAPB1定制工具1.用户自定义的字段举例1:在物料主数据添加自定义字段“收货提前期”,字段头衔为“U_DeliveryLeadTime”,字段类型为“数字”2.用户自定义的值举例1:将销售订单的......
  • bootstrap模态窗口美化特效
    这是一款bootstrap模态窗口美化特效。该特效在原生bootstrap模态窗口的基础上,通过添加自定义的CSS样式,制作出效果非常炫酷的模态窗口。演示  下载 使用方法在页面中引入下面的文件。<linkrel="stylesheet"href="http://jrain.oscitas.netdna-cdn.com/tutorial......
  • 对数组操作的相关js函数
    汇总一下js中,数组的相关函数(如有问题,请在评论区q我哦!感谢!)1.添加和删除数组元素//1.push在数组末尾添加一个或多个元素,并返回新的长度(改变原数组)letarray=[1,2,3];array.push(4);console.log(array);//输出[1,2,3,4]//2.pop移除数组末尾的一个元素,并返......