首页 > 其他分享 >【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

时间:2024-01-13 14:32:19浏览次数:34  
标签:示例 Distinct 18 flink alan env import true tenv



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:表的聚合操作
  • 1、示例代码公共部分
  • 2、group by
  • 3、GroupBy Window Aggregation
  • 4、Over Window Aggregation
  • 5、Distinct Aggregation
  • 6、Distinct



本文给出了关于表数据的聚合操作示例,比如group by、distinct、以及group by、over、distinct的窗口聚合示例。

本文除了maven依赖外,没有其他依赖。

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:表的聚合操作

本示例内容较多,下文是本部分示例的公共代码部分。

1、示例代码公共部分

本部分仅仅就是用的公共对象,比如User的定义,和需要引入的包。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestTableAPIOperationDemo2 {
	final static List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L), 
			new User(2L, "alan", 19, 1698742359396L), 
			new User(3L, "alan", 25, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L), 
			new User(5L, "alanchan", 29, 1698742362424L)
			);
	
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {

	}
	
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private int balance;
		private Long rowtime;
	}
	
}

2、group by

本示例仅仅展示了group by操作,比较简单。

static void test2() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
	
	// 建表
	tenv.executeSql(sourceSql);

	Table table = tenv.from("Alan_KafkaTable");
	//和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。
	Table result = table.groupBy($("user_id")).select($("user_id"), $("user_id").count().as("count(user_id)"));
	
	DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
	resultDS.print();
//		12> (true,+I[1, 1])
	
	env.execute();
}

3、GroupBy Window Aggregation

使用分组窗口结合单个或者多个分组键对表进行分组和聚合。

static void test3() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						)
				;
		
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
		
		//使用分组窗口结合单个或者多个分组键对表进行分组和聚合。
		Table result = usersTable
			    .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // 定义窗口
			    .groupBy($("name"), $("w")) // 按窗口和键分组
			    // 访问窗口属性并聚合
			    .select(
			        $("name"),
			        $("w").start(),
			        $("w").end(),
			        $("w").rowtime(),
			        $("balance").sum().as("sum(balance)")
			    );
		
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
//		2> (true,+I[alan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 62])
//		16> (true,+I[alanchan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 57])
		env.execute();
	}

4、Over Window Aggregation

和 SQL 的 OVER 子句类似。

static void test4() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						);
		
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
		//所有的聚合必须定义在同一个窗口上,比如同一个分区、排序和范围内。目前只支持 PRECEDING 到当前行范围(无界或有界)的窗口。
		//尚不支持 FOLLOWING 范围的窗口。ORDER BY 操作必须指定一个单一的时间属性。
		Table result = usersTable
			    // 定义窗口
			    .window(
			        Over
			          .partitionBy($("name"))
			          .orderBy($("rowtime"))
			          .preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE))
			          .following(unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE))
			          .as("w"))
			    // 滑动聚合
			    .select(
			        $("id"),
			        $("balance").avg().over($("w")),
			        $("balance").max().over($("w")),
			        $("balance").min().over($("w"))
			    );
		
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
//		2> (true,+I[1, 18, 18, 18])
//		16> (true,+I[4, 28, 28, 28])
//		2> (true,+I[2, 18, 19, 18])
//		16> (true,+I[5, 28, 29, 28])
//		2> (true,+I[3, 20, 25, 18])
		
		env.execute();
	}

5、Distinct Aggregation

/**
	 * 和 SQL DISTINCT 聚合子句类似,例如 COUNT(DISTINCT a)。 
	 * Distinct 聚合声明的聚合函数(内置或用户定义的)仅应用于互不相同的输入值。 
	 * Distinct 可以应用于 GroupBy Aggregation、GroupBy Window Aggregation 和 Over Window Aggregation。
	 * @throws Exception
	 */
	static void test5() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						);
		
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
		
		// 按属性分组后的的互异(互不相同、去重)聚合
		Table groupByDistinctResult = usersTable
		    .groupBy($("name"))
		    .select($("name"), $("balance").sum().distinct().as("sum_balance"));
		
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(groupByDistinctResult, Row.class);
//		resultDS.print();
//		2> (true,+I[alan, 18])
//		16> (true,+I[alanchan, 28])
//		16> (false,-U[alanchan, 28])
//		2> (false,-U[alan, 18])
//		16> (true,+U[alanchan, 57])
//		2> (true,+U[alan, 37])
//		2> (false,-U[alan, 37])
//		2> (true,+U[alan, 62])
		
		//按属性、时间窗口分组后的互异(互不相同、去重)聚合
		Table groupByWindowDistinctResult = usersTable
			    .window(Tumble
			            .over(lit(5).minutes())
			            .on($("rowtime"))
			            .as("w")
			    )
			    .groupBy($("name"), $("w"))
			    .select($("name"), $("balance").sum().distinct().as("sum_balance"));
		DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(groupByDistinctResult, Row.class);
//		result2DS.print();
//		16> (true,+I[alanchan, 28])
//		2> (true,+I[alan, 18])
//		16> (false,-U[alanchan, 28])
//		2> (false,-U[alan, 18])
//		16> (true,+U[alanchan, 57])
//		2> (true,+U[alan, 37])
//		2> (false,-U[alan, 37])
//		2> (true,+U[alan, 62])
		
		//over window 上的互异(互不相同、去重)聚合
		Table result = usersTable
			    .window(Over
			        .partitionBy($("name"))
			        .orderBy($("rowtime"))
			        .preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE))
			        .as("w"))
			    .select(
			        $("name"), $("balance").avg().distinct().over($("w")),
			        $("balance").max().over($("w")),
			        $("balance").min().over($("w"))
			    );
		DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(result, Row.class);
		result3DS.print();
//		16> (true,+I[alanchan, 28, 28, 28])
//		2> (true,+I[alan, 18, 18, 18])
//		2> (true,+I[alan, 18, 19, 18])
//		16> (true,+I[alanchan, 28, 29, 28])
//		2> (true,+I[alan, 20, 25, 18])
		
		env.execute();
	}

用户定义的聚合函数也可以与 DISTINCT 修饰符一起使用。如果计算不同(互异、去重的)值的聚合结果,则只需向聚合函数添加 distinct 修饰符即可。

Table orders = tEnv.from("Orders");

// 对 user-defined aggregate functions 使用互异(互不相同、去重)聚合
tEnv.registerFunction("myUdagg", new MyUdagg());
orders.groupBy("users")
    .select(
        $("users"),
        call("myUdagg", $("points")).distinct().as("myDistinctResult")
    );

6、Distinct

和 SQL 的 DISTINCT 子句类似。 返回具有不同组合值的记录。

static void test6() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		List<User> userList = Arrays.asList(
				new User(1L, "alan", 18, 1698742358391L), 
				new User(2L, "alan", 19, 1698742359396L), 
				new User(3L, "alan", 25, 1698742360407L),
				new User(4L, "alanchan", 28, 1698742361409L), 
				new User(5L, "alanchan", 29, 1698742362424L),
				new User(5L, "alanchan", 29, 1698742362424L)
				);
		
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						);
		
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
//		Table orders = tableEnv.from("Orders");
		Table result = usersTable.distinct();
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
		// 数据集有6条记录,并且有一条是重复的,故只输出5条
//		9> (true,+I[2, alan, 19, 2023-10-31T08:52:39.396])
//		1> (true,+I[1, alan, 18, 2023-10-31T08:52:38.391])
//		13> (true,+I[3, alan, 25, 2023-10-31T08:52:40.407])
//		7> (true,+I[4, alanchan, 28, 2023-10-31T08:52:41.409])
//		13> (true,+I[5, alanchan, 29, 2023-10-31T08:52:42.424])
		
		env.execute();
	}

以上,本文给出了关于表数据的聚合操作示例,比如group by、distinct、以及group by、over、distinct的窗口聚合示例。


标签:示例,Distinct,18,flink,alan,env,import,true,tenv
From: https://blog.51cto.com/alanchan2win/9232424

相关文章

  • 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以
    文章目录Flink系列文章一、maven依赖二、示例:表的join操作(内联接、外联接以及联接自定义函数等)本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。本文除了maven依赖外,没有其他依赖。一、maven依赖本文maven依赖参考文章:【flink番外篇】9、FlinkTableAPI支......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch
    文章目录Flink系列文章一、maven依赖二、表的OrderBy,Offset和Fetch操作三、表的insert操作本文介绍了表的OrderBy、Offset和Fetch、insert操作,以示例形式展示每个操作的结果。本文除了maven依赖外,没有其他依赖。一、maven依赖本文maven依赖参考文章:【flink番外篇】9、Flin......
  • 【CMake】5. 单项目多模块添加第三方依赖示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-mod-deps这里引入C++gRPC依赖,进行示例1.多模块工程+第三方依赖CMake多模块工程,这是一个示例工程simple-mod-deps,项目名称de......
  • 【CMake】3.单项目单模块添加第三方依赖包示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块-添加第三方依赖示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-deps1.单模块工程+第三方依赖CMake单模块工程,这是一个示例工程simple-deps,项目名称cmake,第三方依赖demo......
  • 【CMake】2. 单项目单模块示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple1.单模块工程CMake单模块工程,这是一个示例工程simple,项目名称cmake,第三方依赖demo,主模块main2.目录结构$.SIMPLE......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersec
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 京东商品详情API实现实时数据获取的Java代码示例
    在电商行业中,商品详情页是用户了解商品信息、进行购买决策的重要页面。为了提高用户体验和促进销售,电商平台通常会提供商品详情的API接口,以便第三方应用能够实时获取商品数据。本文将介绍如何使用京东获得的JD商品详情API实现实时数据获取,并提供相应的Java代码示例。一、JD商品详......
  • 天拓四方5G边缘计算网关在工业领域的应用示例
    在工业领域,5G边缘计算网关的应用正在逐渐普及,为工业4.0和智能制造的实现提供了强大的技术支持。通过将数据分析和处理能力从中心节点转移至网络的边缘,5G边缘计算网关为工业生产带来了前所未有的机遇。本文将通过举例实际应用,探讨5G边缘计算网关在工业领域的具体应用。案例一:智能制......
  • 使用R语言和pholcus库进行网页爬取的简单示例
    如果您想要下载网页上的丰富内容,pholcus库似乎是一个用于网页爬虫的工具,但请注意使用爬虫工具时需要遵守网站的使用规则和法律法规。未经允许的爬取行为可能违反网站的服务条款,并可能导致法律问题。以下是一个使用pholcus库的简单示例。请确保您已经安装了pholcus库,可以通过执行以......
  • Java版Flink(十一)时间语义和watermark
    Java版Flink(十一)时间语义和watermark一、时间语义在Flink中涉及到三个重要时间概念:EventTime、IngestionTime、ProcessingTime。1.1、EventTimeEventTime表示日志事件产生的时间戳,每一条数据都会记录自己生产的时间。1.2、IngestionTimeIngestionTime表示数据进入......