首页 > 其他分享 >【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

时间:2024-01-13 14:31:36浏览次数:32  
标签:OrderBy 10 示例 alan 52 2023 import 31T08



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、表的OrderBy, Offset 和 Fetch操作
  • 三、表的insert操作



本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。


本文除了maven依赖外,没有其他依赖。

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、表的OrderBy, Offset 和 Fetch操作

在批处理模式下,也即有界情况下,order by 可以单独使用,排序也可以是任意字段,与一般数据库的排序结果一样。
在流模式下,也即无界的情况下,order by需要和fetch一起使用,排序字段需要有时间属性,与一般数据库的排序有点差异。

需要说明的是order by 和offset&fetch都可以在批处理模式和流模式下工作。

  • Order By,和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
  • Offset & Fetch,和 SQL 的 OFFSET 和 FETCH 子句类似。Offset 操作根据偏移位置来限定(可能是已排序的)结果集。Fetch 操作将(可能已排序的)结果集限制为前 n 行。通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。

具体结果见下面示例

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.tablesql.TestTableAPIJoinOperationDemo2.Order;
import org.tablesql.TestTableAPIJoinOperationDemo2.User;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestTableAPIJoinOperationDemo3 {
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private double balance;
		private Long rowtime;
	}
	
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class Order {
		private long id;
		private long user_id;
		private double amount;
		private Long rowtime;
	}

	final static List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L), 
			new User(2L, "alan", 19, 1698742359396L), 
			new User(3L, "alan", 25, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L), 
			new User(5L, "alanchan", 29, 1698742362424L)
			);
	
	final static List<Order> orderList = Arrays.asList(
			new Order(1L, 1, 18, 1698742358391L), 
			new Order(2L, 2, 19, 1698742359396L), 
			new Order(3L, 1, 25, 1698742360407L),
			new Order(4L, 3, 28, 1698742361409L), 
			new Order(5L, 1, 29, 1698742362424L),
			new Order(6L, 4, 49, 1698742362424L)
			);
	
	 // 创建输出表
	final static String sinkSql = "CREATE TABLE sink_table (\n" +
            "  id BIGINT,\n" +
            "  user_id BIGINT,\n" +
            "  amount DOUBLE,\n" +
            "  rowtime BIGINT\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")";
	
	/**
	 * Order By
	 * 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
	 * 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
	 * Sort on a non-time-attribute field is not supported.
	 * 
	 * Offset & Fetch
	 * 和 SQL 的 OFFSET 和 FETCH 子句类似。
	 * Offset 操作根据偏移位置来限定(可能是已排序的)结果集。
	 * Fetch 操作将(可能已排序的)结果集限制为前 n 行。
	 * 通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。
	 * 
	 * @throws Exception
	 */
	static void testOrderByWithUnbounded() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		env.setParallelism(1);
		
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						);
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
		usersTable.printSchema();
		
		// 从已排序的结果集中返回前3条记录
		Table result = usersTable.orderBy($("rowtime").desc()).fetch(3);

		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
//		resultDS.print();
//		(true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
//		(true,+I[2, alan, 19.0, 2023-10-31T08:52:39.396])
//		(true,+I[3, alan, 25.0, 2023-10-31T08:52:40.407])
//		(false,-D[1, alan, 18.0, 2023-10-31T08:52:38.391])
//		(true,+I[4, alanchan, 28.0, 2023-10-31T08:52:41.409])
//		(false,-D[2, alan, 19.0, 2023-10-31T08:52:39.396])
//		(true,+I[5, alanchan, 29.0, 2023-10-31T08:52:42.424])

		// 从已排序的结果集中返回跳过2条记录之后的所有记录
		Table result2 = usersTable.orderBy($("rowtime").desc()).offset(2).fetch(4);

		DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
		result2DS.print();
//		(true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
//		(false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
//		(true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
//		(true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
//		(false,-U[2, alan, 19.0, 2023-10-31T08:52:39.396])
//		(true,+U[3, alan, 25.0, 2023-10-31T08:52:40.407])
//		(false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
//		(true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
//		(true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
		
		env.execute();
	}
	
	/**
	 * 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
	 * 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
	 * 这个和一般的查询数据库的结果比较类似
	 * 
	 * @throws Exception
	 */
	static void testOrderByWithBounded() throws Exception {
        EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
        TableEnvironment tenv = TableEnvironment.create(env);
       
		Table ordersTable = tenv.fromValues(
				DataTypes.ROW(
						DataTypes.FIELD("id", DataTypes.BIGINT()),
						DataTypes.FIELD("user_id", DataTypes.BIGINT()),
						DataTypes.FIELD("amount", DataTypes.BIGINT()),
						DataTypes.FIELD("rowtime", DataTypes.BIGINT())
						),
				Arrays.asList(
						row(1L, 1, 18, 1698742358391L), 
						row(2L, 2, 19, 1698742359396L), 
						row(3L, 1, 25, 1698742360407L),
						row(4L, 3, 28, 1698742361409L), 
						row(5L, 1, 29, 1698742362424L),
						row(6L, 4, 49, 1698742362424L)
						));
		
		Table left = ordersTable.select($("id"), $("user_id"),$("amount"),$("rowtime"));
		
		Table orderByResult = left.orderBy($("amount").desc());
		
		tenv.createTemporaryView("order_union_t", orderByResult);

		Table result = tenv.sqlQuery("select * from order_union_t");
		
		//输出表
		tenv.executeSql(sinkSql);
//				+I[6, 4, 49.0, 1698742362424]
//				+I[5, 1, 29.0, 1698742362424]
//				+I[4, 3, 28.0, 1698742361409]
//				+I[3, 1, 25.0, 1698742360407]
//				+I[2, 2, 19.0, 1698742359396]
//				+I[1, 1, 18.0, 1698742358391]

		result.executeInsert("sink_table");
	}
	
	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
//		testOrderByWithUnbounded();
		testOrderByWithBounded();
	}

}

三、表的insert操作

和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。 insertInto() 方法会将 INSERT INTO 转换为一个 TablePipeline。 该数据流可以用 TablePipeline.explain() 来解释,用 TablePipeline.execute() 来执行。

输出表必须已注册在 TableEnvironment中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。

该示例仅仅展示一个方法,运行环境和其他的示例一致,并且本示例仅仅展示的是insert Into,也可以使用execute Insert方法,在其他示例中有展示其使用。

static void testInsert() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<Order> orderA = env.fromCollection(orderList);
		DataStream<Order> orderB = env.fromCollection(
				Arrays.asList(
						new Order(10L, 1, 18, 1698742358391L), 
						new Order(16L, 4, 49, 1698742362424L)
						)
				);
		
		Table tableA = tenv.fromDataStream(orderA, $("id"), $("user_id"), $("amount"),$("rowtime"));
		Table tableB = tenv.fromDataStream(orderB, $("id"), $("user_id"), $("amount"),$("rowtime"));
		tenv.executeSql(sinkSql);
		
		tableA.insertInto("sink_table").execute();
		tableB.insertInto("sink_table").execute();
//				+I[1, 1, 18.0, 1698742358391]
//				+I[2, 2, 19.0, 1698742359396]
//				+I[3, 1, 25.0, 1698742360407]
//				+I[4, 3, 28.0, 1698742361409]
//				+I[5, 1, 29.0, 1698742362424]
//				+I[6, 4, 49.0, 1698742362424]
//				+I[10, 1, 18.0, 1698742358391]
//				+I[16, 4, 49.0, 1698742362424]
		
	}

以上,本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。


标签:OrderBy,10,示例,alan,52,2023,import,31T08
From: https://blog.51cto.com/alanchan2win/9232428

相关文章

  • 【CMake】5. 单项目多模块添加第三方依赖示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-mod-deps这里引入C++gRPC依赖,进行示例1.多模块工程+第三方依赖CMake多模块工程,这是一个示例工程simple-mod-deps,项目名称de......
  • 【CMake】3.单项目单模块添加第三方依赖包示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块-添加第三方依赖示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-deps1.单模块工程+第三方依赖CMake单模块工程,这是一个示例工程simple-deps,项目名称cmake,第三方依赖demo......
  • 【CMake】2. 单项目单模块示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple1.单模块工程CMake单模块工程,这是一个示例工程simple,项目名称cmake,第三方依赖demo,主模块main2.目录结构$.SIMPLE......
  • 多态和虚函数 [补档-2022-10-23]
    简述:  多态分为两类,一种是静态多态(如函数重载,运算符重载,复用函数名等)和动态多态(派生类和虚函数实现运行时的多态)  静态多态和动态多态的区别是:静态多态的函数地址早绑定,在编译阶段就确定了函数地址。动态多态的函数地址是晚绑定,即运行阶段确定函数地址。​多态的满......
  • 深拷贝和浅拷贝的问题 [补档-2022-10-22]
    简介:​ (在没有用户自己定义拷贝构造函数的情况下)编译器通过调用默认拷贝构造函数将一个对象的内容完整地复制到另一个对象上。如果我们要拷贝的对象它的成员有指针,并且指针指向着某一块空间,那么就要小心了。​通常我们创建一块动态空间,在不需要的时候会......
  • D25XB100-ASEMI家用电器整流桥D25XB100
    编辑:llD25XB100-ASEMI家用电器整流桥D25XB100型号:D25XB100品牌:ASEMI封装:GBJ-5(带康铜丝)平均正向整流电流(Id):25A最大反向击穿电压(VRM):1000V产品引线数量:5产品内部芯片个数:4产品内部芯片尺寸:72MIL峰值正向漏电流:<10ua恢复时间:>2000ns正向浪涌电流:450A正向压降:1.05V恢复时......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersec
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • D25XB100-ASEMI家用电器整流桥D25XB100
    编辑:llD25XB100-ASEMI家用电器整流桥D25XB100型号:D25XB100品牌:ASEMI封装:GBJ-5(带康铜丝)平均正向整流电流(Id):25A最大反向击穿电压(VRM):1000V产品引线数量:5产品内部芯片个数:4产品内部芯片尺寸:72MIL峰值正向漏电流:<10ua恢复时间:>2000ns正向浪涌电流:450A正向压降:1.05V恢复时间:工作结温:-40℃~......
  • ORA-01102: cannot mount database in EXCLUSIVE mode的错误解决
    数据库运行环境oracle19c,安装后,启动数据库时报错,如下,经排查解决方法记录如下SQL>startupmountORACLEinstancestarted.TotalSystemGlobalArea2415917880bytesFixedSize   8899384bytesVariableSize  520093696bytesDatabaseBuffers 1879048192bytes......
  • Windows 10 中,可以使用 PowerShell 添加打印和文件服务的角色功能组件,包括 Internet
    在Windows10中,可以使用PowerShell添加打印和文件服务的角色功能组件,包括Internet打印客户端、LPD打印服务和LPR端口监视器。以下是添加这些功能组件的PowerShell命令:首先,以管理员身份打开PowerShell终端。在开始菜单中搜索"PowerShell",然后右键点击"WindowsPo......