首页 > 其他分享 >Flink 三种时间窗口、窗口处理函数使用及案例

Flink 三种时间窗口、窗口处理函数使用及案例

时间:2024-12-30 12:31:15浏览次数:1  
标签:Flink 窗口 flink streaming org apache import 处理函数

Flink 在数据处理过程中越来越常见,它在流处理领域提供了丰富的窗口机制来处理无界数据流,我们聊下三种时间窗口,包括时间窗口的概念、窗口处理函数的使用以及实际案例。

一、Flink 中的时间概念

在 Flink 中,有三种时间概念:

  1. 事件时间(Event Time):是事件实际发生的时间,通常由事件中的时间戳表示。这是最符合实际情况的时间概念,但也需要处理数据乱序和延迟的情况。
  2. 处理时间(Processing Time):是指数据在 Flink 算子中被处理的时间。处理时间是最简单的时间概念,不需要考虑数据的乱序和延迟,但可能会导致结果不准确。
  3. 摄入时间(Ingestion Time):是数据进入 Flink 系统的时间。摄入时间介于事件时间和处理时间之间,它可以在一定程度上处理数据乱序,但也不能完全保证结果的准确性。

二、三种时间窗口

Flink 提供了三种主要的时间窗口:滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)和会话窗口(Session Windows)。

1. 滚动窗口(Tumbling Windows)

滚动窗口是一种固定大小、不重叠的窗口。每个元素只属于一个窗口。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TumblingWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(3))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个简单的整数数据流,并应用了滚动窗口,窗口大小为 3 秒。最后,我们对每个窗口中的元素求和并打印结果。

2. 滑动窗口(Sliding Windows)

滑动窗口是一种固定大小、可以重叠的窗口。每个元素可以属于多个窗口。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class SlidingWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滑动窗口,窗口大小为 5 秒,滑动步长为 2 秒
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(5), Time.seconds(2))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个整数数据流,并应用了滑动窗口,窗口大小为 5 秒,滑动步长为 2 秒。这意味着每 2 秒就会有一个新的窗口生成,并且每个窗口包含最近 5 秒内的数据。最后,我们对每个窗口中的元素求和并打印结果。

3. 会话窗口(Session Windows)

会话窗口是一种根据活动间隙划分的窗口。当一段时间内没有数据到达时,会话窗口会关闭。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.SessionWindow;
public class SessionWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用会话窗口,间隙时间为 3 秒
        DataStream<Integer> resultStream = inputStream
               .windowAll(SessionWindows.withGap(Time.seconds(3)))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个整数数据流,并应用了会话窗口,间隙时间为 3 秒。这意味着当连续数据之间的时间间隔超过 3 秒时,一个新的会话窗口会开始。最后,我们对每个窗口中的元素求和并打印结果。

三、窗口处理函数

Flink 提供了多种窗口处理函数,用于对窗口中的数据进行计算。以下是一些常见的窗口处理函数:

1. 增量聚合函数(Incremental Aggregation Functions)

增量聚合函数可以在窗口中逐个处理元素,并在处理过程中维护一个中间结果。常见的增量聚合函数有 sum、min、max 和 count 等。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class IncrementalAggregationExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒,并使用增量聚合函数求和
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(3))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们使用了 sum 作为增量聚合函数,对每个滚动窗口中的元素求和。

2. 全窗口函数(Full Window Functions)

全窗口函数在窗口关闭时对窗口中的所有元素进行计算。常见的全窗口函数有 reduce、aggregate 和 apply 等。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class FullWindowFunctionExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒,并使用全窗口函数求平均值
        DataStream<Double> resultStream = inputStream
               .timeWindowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
               .apply(new AverageWindowFunction());
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
    // 自定义全窗口函数,用于求平均值
    public static class AverageWindowFunction implements org.apache.flink.streaming.api.functions.windowing.AllWindowFunction<Integer, Double, TimeWindow> {
        @Override
        public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double> out) throws Exception {
            int sum = 0;
            int count = 0;
            for (Integer value : values) {
                sum += value;
                count++;
            }
            out.collect((double) sum / count);
        }
    }
}

在上述代码中,我们自定义了一个全窗口函数 AverageWindowFunction,用于在滚动窗口关闭时计算窗口中元素的平均值。

四、案例分析

假设我们有一个电商网站的用户行为数据流,其中每个事件包含用户 ID、商品 ID 和事件时间戳。我们想要分析用户在一段时间内的购买行为,例如计算每个用户在每小时内的购买总额。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class EcommerceAnalyticsExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个模拟的用户行为数据流
        DataStream<UserAction> inputStream = env.fromElements(
                new UserAction("user1", "product1", 1000L),
                new UserAction("user1", "product2", 1005L),
                new UserAction("user2", "product3", 1010L),
                new UserAction("user1", "product4", 1020L),
                new UserAction("user2", "product5", 1030L),
                new UserAction("user1", "product6", 1040L),
                new UserAction("user2", "product7", 1050L),
                new UserAction("user1", "product8", 1060L),
                new UserAction("user2", "product9", 1070L),
                new UserAction("user1", "product10", 1080L)
        );
        // 应用滚动窗口,窗口大小为 1 小时,并按用户 ID 分组,计算每个用户在每小时内的购买总额
        DataStream<Tuple2<String, Double>> resultStream = inputStream
               .keyBy(UserAction::getUserId)
               .timeWindow(Time.hours(1))
               .sum("price")
               .map(userActionPriceSum -> new Tuple2<>(userActionPriceSum.f0, userActionPriceSum.f1));
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
    // 自定义用户行为类
    public static class UserAction {
        private String userId;
        private String productId;
        private double price;
        private long eventTimestamp;
        public UserAction(String userId, String productId, long eventTimestamp) {
            this.userId = userId;
            this.productId = productId;
            this.price = 10.0; // 假设每个商品的价格为 10 元
            this.eventTimestamp = eventTimestamp;
        }
        public String getUserId() {
            return userId;
        }
        public String getProductId() {
            return productId;
        }
        public double getPrice() {
            return price;
        }
        public long getEventTimestamp() {
            return eventTimestamp;
        }
    }
}

在上述代码中,我们首先创建了一个模拟的用户行为数据流,其中每个事件包含用户 ID、商品 ID 和事件时间戳。然后,我们应用滚动窗口,窗口大小为 1 小时,并按用户 ID 分组。最后,我们对每个窗口中的元素求和,计算每个用户在每小时内的购买总额,并打印结果。

五、总结

Flink 的窗口机制是处理无界数据流的强大工具。通过三种时间窗口(滚动窗口、滑动窗口和会话窗口)和丰富的窗口处理函数,我们可以灵活地对数据流进行各种分析和计算。在实际应用中,我们需要根据具体的业务需求选择合适的时间窗口和窗口处理函数,以获得准确的结果。同时,我们还需要考虑数据的乱序和延迟等问题,合理地设置时间戳提取器和水印生成器,以确保流处理的准确性和可靠性。

标签:Flink,窗口,flink,streaming,org,apache,import,处理函数
From: https://www.cnblogs.com/skonw/p/18640766

相关文章

  • Flink source API定期读取MySQL数据
    主类MyPeriodQueryDbSourceimportorg.apache.flink.api.connector.source.*;importorg.apache.flink.core.io.SimpleVersionedSerializer;importjava.util.Properties;/***定期读取数据source**@param<T>输出对象泛型*/publicclassMyPeriodQueryDbSource<......
  • UE4.27, 揣摩源码, 序列化 (三) FLinkerLoad, FLinkerSave
    3.  FLinkerLoad,FLinkerSave分别是UObject的反序列化和序列化的内核3.0.UPackage与UObjectUObject因为涉及与其他UObject的复杂引用关系,如果我们客制化地单独正反序列化每一个UObject,我们会在反序列化的时候惊觉这是繁琐而不可能的。为了满足UObject......
  • Flink状态编程
            Flink处理机制的核心就是“有状态的流处理”,在某些情况下,一条数据的计算不仅要基于当前数据自身,还需要依赖数据流中的一些其他数据。这些在一个任务中,用来辅助计算的数据我们就称之为这个任务的状态。一、按键分区状态(KeyedState)分类        按键分......
  • PostgreSQL中FIRST_VALUE、LAST_VALUE、LAG 和 LEAD是窗口函数,允许返回在数据集的特
    在PostgreSQL中,FIRST_VALUE、LAST_VALUE、LAG和LEAD是窗口函数(windowfunctions),它们允许你在数据集的特定窗口(或分区)内访问行的相对位置。以下是对这些函数的详细解释和用法:1.FIRST_VALUEFIRST_VALUE函数返回在指定窗口或分区内的第一行的值。它常用于获取每个组的起......
  • Oracle数据库中FIRST_VALUE、LAST_VALUE、LAG和LEAD是用于在窗口函数中进行数据行之间
    在Oracle数据库中,FIRST_VALUE、LAST_VALUE、LAG和LEAD是用于在窗口函数中进行数据行之间相对位置访问的函数。下面分别介绍这些函数的作用和用法。1.FIRST_VALUEFIRST_VALUE函数返回在指定窗口帧内的第一行的值。常用于在分组或排序的上下文中获取某一组的起始值。......
  • Hudi数据湖_数据写原理_COW和MOR表Upsert原理_Flink和Spark写入区别_Insert和Overwrit
    可以看到数据写操作,有三种方式upsert就是通过index索引来,对数据到底是insert还是update会做上标记,并且,只有索引到了数据才会update,所以是依赖index索引的.insert就是不停的插入数据,跳过了index,插入快,但是有重复数据,可能需要自己处理bulk_insert 写排序默认......
  • Flink 集群有哪些⻆⾊?各⾃有什么作⽤?
    在Flink集群中有以下几个重要角色:JobManager(作业管理器)作用:作业管理:它是Flink集群的控制中心,负责接收用户提交的作业,协调和管理整个作业的执行过程。例如,当用户提交一个实时数据处理的流计算作业时,JobManager会负责调度该作业在集群中的执行。资源分配:JobManage......
  • Flink 中的 Time 有哪⼏种?
    事件时间(EventTime)概念:事件时间是事件在其产生设备(如传感器、服务器等)上发生的时间。这个时间通常是嵌入在事件数据本身中的一个时间戳字段。例如,在一个物联网应用中,每个传感器采集数据时会记录下采集时刻的时间戳,这个时间戳代表的就是事件时间。特点与应用场景:......
  • 006. 滑动窗口 /【模板】单调队列(洛谷P1886)
    006.滑动窗口/【模板】单调队列(洛谷P1886)题目描述有一个长为\(n\)的序列\(a\),以及一个大小为\(k\)的窗口。现在这个从左边开始向右滑动,每次滑动一个单位,求出每次滑动后窗口中的最大值和最小值。例如,对于序列\([1,3,-1,-3,5,3,6,7]\)以及\(k=3\),有如下过程:\[\def\a......
  • Flink CDC MySQL 同步数据到 Kafka实践中可能遇到的问题
    FlinkCDCMySQL同步数据到Kafka实践中可能遇到的问题一、问题场景[ERROR]CouldnotexecuteSQLstatement.Reason:org.apache.flink.table.api.ValidationException:Theprimarykeyisnecessarywhenenable'Key:'scan.incremental.snapshot.enabled',defau......