首页 > 其他分享 >Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)

Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)

时间:2023-09-10 10:04:34浏览次数:41  
标签:WaterSensor KeyedStream DataStream Flink 1.17 分区 keyBy key


聚合算子(Aggregation)

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。

按键分区(keyBy)

对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。

keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)_数据

在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。
我们可以以id作为key做一个分区操作,代码实现如下:

package com.atguigu.zxl_test;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransKeyBy {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
            new WaterSensor("sensor_1", 1L, 1),
            new WaterSensor("sensor_1", 2L, 2),
            new WaterSensor("sensor_2", 2L, 2),
            new WaterSensor("sensor_3", 3L, 3)
        );

        // 方式一:使用Lambda表达式
        KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);

        // 添加操作符,例如打印结果  解决报错:No operators defined in streaming topology. Cannot execute.
        keyedStream.print();

        // 方式二:使用匿名类实现KeySelector
        /*KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor e) throws Exception {
                return e.id;
            }
        });

        // 添加操作符,例如打印结果 解决报错:No operators defined in streaming topology. Cannot execute.
        keyedStream1.print();*/

        env.execute();
    }
}

执行结果:

Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)_apache_02

需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。
KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。


标签:WaterSensor,KeyedStream,DataStream,Flink,1.17,分区,keyBy,key
From: https://blog.51cto.com/zhangxueliang/7423594

相关文章

  • Flink 1.17教程:输出算子之输出到MySQL(JDBC)
    输出到MySQL(JDBC)写入数据的MySQL的测试步骤如下。(1)添加依赖添加MySQL驱动:mysqlmysql-connector-java8.0.27官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apachesnapshot仓库下载,pom文件中指定仓库路径:apache-snapshotsapachesnapshotshttps://repository.a......
  • Flink 1.17教程:输出算子(Sink)之连接到外部系统
    输出算子(Sink)Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。连接到外部系统Flink的DataStreamAPI专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写......
  • Flink 1.17教程:基本合流操作
    基本合流操作在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。联合(Union)最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流......
  • Flink 1.17教程:输出算子之输出到文件
    输出到文件Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方......
  • flink kerberos认证源码剖析
    文章目录01引言02flink的安全机制03源码流程分析3.1程序入口3.2安全模块安装3.3模块安装源码04文末01引言官方的文档:https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-kerberos/我们都知道,如果某个大数据组件(如:hadoop、flink等)......
  • FLink
    java.util.concurrent.TimeoutException:Invocationof[RemoteRpcInvocation(TaskExecutorGateway.requestSlot(SlotID,JobID,AllocationID,ResourceProfile,String,ResourceManagerId,Time))]atrecipient[akka.tcp://flink@teste-34:40647/user/rpc/taskmanager_......
  • Flink kafka source
    kafkasource接收kafka的数据<!--Kafka相关依赖--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>......
  • 43、Flink之Hive 读写及详细验证示例
    Flink系列文章[1、Flink部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接][13、Flink的tableapi与sql的基本概念、通用api介绍及入门示例][14、Flink的tableapi与sql之数据类型:内置数据类型以及它们的属性][15、Flink的t......
  • Flink SQL基本语法
    在flinksql中,对表名、字段名、函数名等是严格区分大小写的,为了兼容hive等其他仓库,建议建表时,表名和字段名都采用下划线连接单词的方式,以避免大小写问题。比如hive,是不区分大小写的,所有大写字母最终都会被系统转化为小写字母,此时使用flinksql去读写hive,出现大写字母时......
  • Flink高级特性(2)
    watermark水位线处理乱序数据流从数据产生到DataSource,再到具体的算子,中间是有一个过程和时间,有可能会导致数据乱序问题,通过watermark+EventTime来处理。作用:由于网络延迟等原因,一条数据会迟到计算,比如使用eventtime来划分窗口,我们知道窗口中的数据是计算一段时间的数据,如果一......