首页 > 其他分享 >flink学习(7)——window

flink学习(7)——window

时间:2024-11-26 21:59:51浏览次数:8  
标签:String flink Tuple2 学习 window Tuple3 env new public

 概述

窗口的长度(大小): 决定了要计算最近多长时间的数据

窗口的间隔: 决定了每隔多久计算一次

举例:每隔10min,计算最近24h的热搜词,24小时是长度,每隔10分钟是间隔。

窗口的分类

1、根据window前是否调用keyBy分为键控窗口和非键控窗口

2、根据window中参数的配置分为基于时间的,基于条数的,会话窗口

SlidingProcessingTimeWindows —— 滑动窗口,按照处理时间

TumblingProcessingTimeWindows —— 滚动窗口,按照处理时间

ProcessingTimeSessionWindows —— 会话窗口

 Keyed Window --键控窗口

// Keyed Window
stream
        .keyBy(...)              <-  按照一个Key进行分组
        .window(...)            <-  将数据流中的元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process/apply()      <-  窗口处理函数Window Function

Non-Keyed Window

// Non-Keyed Window
stream
        .windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process()      <-  窗口处理函数Window Function

方括号([…]) 中的命令是可选的。

首先:我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。

经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。

windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。

决定是否分组之后,窗口的后续操作基本相同。

经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1

Flink窗口的骨架结构中有两个必须的两个操作:

  • 使用窗口分配器(WindowAssigner)将数据流中的元素分配到对应的窗口。

  • 当满足窗口触发条件后,对窗口内的数据使用窗口处理函数(Window Function)进行处理,常用的Window Function有reduce、aggregate、process。

其他的trigger、evictor则是窗口的触发和销毁过程中的附加选项,主要面向需要更多自定义的高级编程者,如果不设置则会使用默认的配置。

 

 

基于时间的窗口 

滚动窗口- TumblingWindow概念

package com.bigdata.day04;

public class _01_windows {
    /**
     * 1、实时统计每个红绿灯通过的汽车数量
     * 2、实时统计每个红绿灯每个1分钟,统计最近1分钟通过的汽车数量 ——滚动
     * 3、实时统计每个红绿灯每个1分钟,统计最近2分钟通过的汽车数量 ——滑动
     */

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8889);
        //3. transformation-数据处理转换
        socketStream.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String line) throws Exception {
                String[] words = line.split(" ");
                return new Tuple2<>(Integer.parseInt(words[0]),Integer.parseInt(words[1]));
            }
        }).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
                return value.f0;
            }
        })
        // 基于这个部分实现 滚动窗口 每一分钟 统计前一分钟的数据
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .sum(1).print();

        env.execute();
    }
}

滑动窗口– SlidingWindow概念 

package com.bigdata.day04;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;


/**
 * @基本功能:
 * @program:flinkProject
 * @author: jinnian
 * @create:2024-11-25 10:13:46
 **/
public class _01_windows {
    /**
     * 1、实时统计每个红绿灯通过的汽车数量
     * 2、实时统计每个红绿灯每个1分钟,统计最近1分钟通过的汽车数量 ——滚动
     * 3、实时统计每个红绿灯每个1分钟,统计最近2分钟通过的汽车数量 ——滑动
     */

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8889);
        //3. transformation-数据处理转换
        socketStream.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String line) throws Exception {
                String[] words = line.split(" ");
                return new Tuple2<>(Integer.parseInt(words[0]),Integer.parseInt(words[1]));
            }
        }).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
                return value.f0;
            }
        })
        // 基于这一部分实现,每30秒统计前一分钟的数据,大的在前,小的在后
         .window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30)))
        .sum(1).print();



        //5. execute-执行
        env.execute();
    }
}
如何显示窗口时间——apply

——apply将reduce替代

kafka生产数据
package com.bigdata.day04;

public class _02_kafka生产数据 {
    public static void main(String[] args) throws InterruptedException {
        // Properties 它是map的一种
        Properties properties = new Properties();
        // 设置连接kafka集群的ip和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};
        Random random = new Random();
        for (int i = 0; i < 5000; i++) {
            int index = random.nextInt(arr.length);
            ProducerRecord<String, String> record = new ProducerRecord<>("edu", arr[index]);
            producer.send(record);
            Thread.sleep(30);
        }
    }
}
flink消费数据
package com.bigdata.day04;

public class _02_flink消费数据 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "gw2");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);
        source.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String value) throws Exception {
                return Tuple2.of(value,1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }

        }).window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30)))
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
         /*
         *
         *
         */
                .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                StringBuilder sb = new StringBuilder();
                long start = window.getStart();
                long end = window.getEnd();
                String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                int sum = 0;
                for (Tuple2<String, Integer> tuple2 : input) {
                    sum +=tuple2.f1;
                }
                sb.append("开始时间:"+startStr+",").append("结束时间:"+endStr+",").append("key: "+key+ ",").append("数量:"+sum);

                out.collect(sb.toString());

            }
        }).print();

        env.execute();
    }
}

基于条数的窗口——countWindow

package com.bigdata.day04;

public class _04_agg函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source-加载数据
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);
        // 此时我要获取每个班级的平均成绩
        // 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)
        // IN——Tuple3<String, String, Long>
        // ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩
        // OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩
        dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {
        
            // 初始化一个 累加器
            @Override
            public Tuple3<String, Integer, Long> createAccumulator() {
                return Tuple3.of(null,0,0L);
            }


            // 累加器和输入的值进行累加
            // Tuple3<String, String, Long> value 第一个是传入的值
            // Tuple3<String, Integer, Long> accumulator 第二个是累加器的值
            @Override
            public Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {

                return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);
            }

            // 获取结果——在不同节点的结果进行汇总后实现
            @Override
            public Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {

                return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);
            }


            // 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总
            // 即累加器之间的累加
            @Override
            public Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {
                return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

会话窗口

package com.bigdata.day04;

public class _03_会话窗口 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("localhost", 8889);

        source.map(new MapFunction<String, Tuple2<String,Integer>>() {

            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] s = value.split(" ");

                return Tuple2.of(s[0],Integer.valueOf(s[1]));
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
            // 1、主要就是 ProcessingTimeSessionWindows 参数的使用
            // 2、使用 EventTimeSessionWindows的时候,若没有水印就不会有结果
        }).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0,value1.f1+value2.f1);
            }
        }).print();

        env.execute();

    }
}

标签:String,flink,Tuple2,学习,window,Tuple3,env,new,public
From: https://blog.csdn.net/weixin_52642840/article/details/144069096

相关文章

  • 防止思维僵化需要从多个方面入手:保持批判性思维、接受不确定性、培养反思和创造性思维
    防止思维僵化,保持思维的灵活性和开放性,是个人成长和应对复杂问题的关键。思维僵化通常表现为固守某种思维模式或观点,难以适应变化或接受新思想。以下是一些防止思维僵化的有效方法:1. 培养批判性思维定义:批判性思维是指在面对问题或信息时,能够通过逻辑推理、分析、评估其可靠性......
  • CTF学习(19)MISC(面具下的flag)
    1.解压后发现为.jpg格式的文件--->使用010editor打开后搜索flag发现存在两个疑似flag文件的标识第一处:第二处:2.在kali使用binwalk发现藏有两个文件--->爆破zip文件(无果,可能是伪加密?)分离后的文件:3.检查文件头加密部分(偶数,无加密)--->检查文件尾加密部分'09'(奇......
  • PasteEx:一款.NET开源的Windows快捷粘贴神器
    前言PasteEx是一款.NET开源的用于增强Windows粘贴功能的小工具,它解决了将剪贴板内容保存为文件的繁琐步骤。无需打开记事本等应用,它可直接将文字、图片等内容粘贴到桌面上,极大提升了效率。功能特点自定义文本扩展规则:用户可以设置特定的文本扩展规则,以满足不同文件格式的需......
  • 学习分享-队列-1(数据结构C语言)
    本章写的是基于链表的队列,通过链表来实现队列的操作一个基于链表的队列(Queue)数据结构,先进先出结构体定义typedefstructNode{intdata;structNode*next;structNode*pre;}Node;定义一个节点(Node)结构体,包含数据(data)、指向下一个节点的指针(next)和指向......
  • C语言学习笔记(持续更新)
    C语言计算机的组成(预备知识)计算机组成计算机:能进行计算和逻辑的设备硬件:组成计算机的各种物理部件(鼠标,键盘)【硬件=电子设备+单片机编程+集成电路+嵌入式系统】软件:计算机中运行的程序和数据【软件=系统软件+应用软件+编程语言+算法和数据结构】计算机的六大部件中......
  • 学习分享-队列-2(数据结构C语言)
    本章通过C++代码使用STL(标准模板库)中的queue类实现了栈的基本操作,包括入队、出队、查看队头元素、判断队列是否为空以及清空队列。导入头文件#include<iostream>#include<queue>//引入队列的头文件usingnamespacestd;创建队列queue<int>q;入队操作q.push(10)......
  • Python基础学习-11函数参数
    1、"值传递”和“引用传递”1)不可变的参数通过“值传递”。比如整数、字符串等2)可变的参数通过“引用参数”。比如列表、字典。3)避免可变参数的修改4)内存模型简介2、函数参数类型1)deffunc()#无参数函数2)deffunc(value1,value2)#有参数函数;位置参数3)deffunc(......
  • 强化学习交易应用相关
     FinRL尝试这是第一个开源的金融强化学习框架,FinRL已经发展成为一个包含丰富资源的生态系统,为金融强化学习的研究和应用提供了强大的支持。项目地址:https://github.com/AI4Finance-Foundation/FinRL个人使用体验:可能因为维护不及时,示例代码无法顺利运行,需要各种修补才能勉强......
  • java语言学习(2)
    写在前面的话注:部分笔记没有保存,丢失了,这部分大多数是关于java的几种控制语句,和c语言的结构和写法大体一致,所以不必太担心。但是给自己提了个醒,一定要有保存的习惯,下面写一些零碎的知识点continue的细节分析和说明这里也有label这个功能return使用方法:表示跳出所在的方......
  • 前端技术中对JavaScript对象的学习
    对象目录对象创建对象使用循环遍历对象属性对象中的方法创建对象创建新对象有两种不同的方法:使用Object定义并创建对象的实例。使用函数来定义对象,然后创建新的对象实例。newObject在JavaScript中,几乎所有的对象都是Object类型的实例,它们都会从Object.prototype......