首页 > 其他分享 >【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

时间:2024-01-13 14:31:51浏览次数:24  
标签:rowtime null 自定义 示例 alan Table 联接 true id




文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:表的join操作(内联接、外联接以及联接自定义函数等)



本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。


本文除了maven依赖外,没有其他依赖。

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:表的join操作(内联接、外联接以及联接自定义函数等)

本部分介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。
关于自定义函数的联接将在flink 自定义函数中介绍,因为使用函数和联接本身关系不是非常密切。
19、Flink 的Table API 和 SQL 中的自定义函数(2)

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.call;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestTableAPIJoinOperationDemo {
	
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private double balance;
		private Long rowtime;
	}
	
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class Order {
		private long id;
		private long user_id;
		private double amount;
		private Long rowtime;
	}

	final static List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L), 
			new User(2L, "alan", 19, 1698742359396L), 
			new User(3L, "alan", 25, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L), 
			new User(5L, "alanchan", 29, 1698742362424L)
			);
	
	final static List<Order> orderList = Arrays.asList(
			new Order(1L, 1, 18, 1698742358391L), 
			new Order(2L, 2, 19, 1698742359396L), 
			new Order(3L, 1, 25, 1698742360407L),
			new Order(4L, 3, 28, 1698742361409L), 
			new Order(5L, 1, 29, 1698742362424L),
			new Order(6L, 4, 49, 1698742362424L)
			);
	
	static void testInnerJoin() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList);
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
		
		DataStream<Order> orders = env.fromCollection(orderList);
		Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
		
		Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
		Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));
		
		Table result = left.join(right)
		    .where($("user_id").isEqual($("userId")))
		    .select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
		
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
		
		env.execute();
	}
	
	/**
	 * 和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。
	 * @throws Exception
	 */
	static void testOuterJoin() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList);
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
		
		DataStream<Order> orders = env.fromCollection(orderList);
		Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
		
		Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
		Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));

		Table leftOuterResult = left.leftOuterJoin(right, $("user_id").isEqual($("userId")))
														.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
		DataStream<Tuple2<Boolean, Row>> leftOuterResultDS = tenv.toRetractStream(leftOuterResult, Row.class);
//		leftOuterResultDS.print();
//		12> (true,+I[null, null, null, null, alan, 18])
//		3> (true,+I[null, null, null, null, alanchan, 28])
//		12> (false,-D[null, null, null, null, alan, 18])
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[null, null, null, null, alan, 19])
//		3> (false,-D[null, null, null, null, alanchan, 28])
//		12> (false,-D[null, null, null, null, alan, 19])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		3> (true,+I[null, null, null, null, alanchan, 29])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
		
		Table rightOuterResult = left.rightOuterJoin(right, $("user_id").isEqual($("userId")))
														.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
		DataStream<Tuple2<Boolean, Row>> rightOuterResultDS = tenv.toRetractStream(rightOuterResult, Row.class);
//		rightOuterResultDS.print();
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
		
		Table fullOuterResult = left.fullOuterJoin(right, $("user_id").isEqual($("userId")))
														.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
		DataStream<Tuple2<Boolean, Row>> fullOuterResultDS = tenv.toRetractStream(fullOuterResult, Row.class);
		fullOuterResultDS.print();
//		3> (true,+I[6, 4, 49.0, 1698742362424, null, null])
//		12> (true,+I[1, 1, 18.0, 1698742358391, null, null])
//		15> (true,+I[4, 3, 28.0, 1698742361409, null, null])
//		12> (false,-D[1, 1, 18.0, 1698742358391, null, null])
//		3> (false,-D[6, 4, 49.0, 1698742362424, null, null])
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		3> (true,+I[null, null, null, null, alanchan, 29])
//		12> (true,+I[2, 2, 19.0, 1698742359396, null, null])
//		12> (false,-D[2, 2, 19.0, 1698742359396, null, null])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		15> (false,-D[4, 3, 28.0, 1698742361409, null, null])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
		
		env.execute();
	}
	
	/**
	 * Interval join 是可以通过流模式处理的常规 join 的子集。
	 * Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。
	 * 这种条件可以由两个合适的范围谓词(<、<=、>=、>)或一个比较两个输入表相同时间属性(即处理时间或事件时间)的等值谓词来定义。
	 * @throws Exception
	 */
	static void testIntervalJoin() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList);
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
		
		DataStream<Order> orders = env.fromCollection(orderList);
		Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
		
		Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
		Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));
		
		Table result = left.join(right)
				  .where(
					    and(
					    	$("user_id").isEqual($("userId")),
					    	$("user_id").isLess(3)
//					        $("u_rowtime").isGreaterOrEqual($("o_rowtime").minus(lit(5).minutes())),
//					        $("u_rowtime").isLess($("o_rowtime").plus(lit(10).minutes()))
					    )
				    )
				  .select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"))
				  ;
		result.printSchema();
		
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18.0])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19.0])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18.0])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18.0])
		
		env.execute();
	}
	
	/**
	 * join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。 
	 * 如果表函数调用返回空结果,则删除左侧(外部)表的一行。
	 * 该示例为示例性的,具体的验证将在自定义函数中进行说明
	 * 
	 * @throws Exception
	 */
	static void testInnerJoinWithUDTF() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		// 注册 User-Defined Table Function
		TableFunction<Tuple3<String,String,String>> split = new SplitFunction();
		tenv.registerFunction("split", split);

		// join
		DataStream<Order> orders = env.fromCollection(orderList);
		Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
		
		Table result = ordersTable
		    .joinLateral(call("split", $("c")).as("s", "t", "v"))
		    .select($("a"), $("b"), $("s"), $("t"), $("v"));
		
		
		env.execute();
	}
	
	/**
	 * join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。
	 * 如果表函数调用返回空结果,则保留相应的 outer(外部连接)行并用空值填充右侧结果。
	 * 目前,表函数左外连接的谓词只能为空或字面(常量)真。
	 * 该示例为示例性的,具体的验证将在自定义函数中进行说明
	 * 
	 * @throws Exception
	 */
	static void testLeftOuterJoinWithUDTF() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		// 注册 User-Defined Table Function
		TableFunction<Tuple3<String,String,String>> split = new SplitFunction();
		tenv.registerFunction("split", split);

		// join
		DataStream<Order> orders = env.fromCollection(orderList);
		Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
		
		Table result = ordersTable
		    .leftOuterJoinLateral(call("split", $("c")).as("s", "t", "v"))
		    .select($("a"), $("b"), $("s"), $("t"), $("v"));
		
		
		env.execute();
	}
	
	/**
	 * Temporal table 是跟踪随时间变化的表。
	 * Temporal table 函数提供对特定时间点 temporal table 状态的访问。
	 * 表与 temporal table 函数进行 join 的语法和使用表函数进行 inner join 的语法相同。
	 * 目前仅支持与 temporal table 的 inner join。
	 * 
	 * @throws Exception
	 */
	static void testJoinWithTemporalTable() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		Table ratesHistory = tenv.from("RatesHistory");

		// 注册带有时间属性和主键的 temporal table function
		TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
		    $("r_proctime"),
		    $("r_currency")
		    );
		tenv.registerFunction("rates", rates);

		// 基于时间属性和键与“Orders”表关联
		Table orders = tenv.from("Orders");
		Table result = orders
		    .joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")));
		
		env.execute();
	}
	
	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
//		testInnerJoin();
//		testOuterJoin();
//		testIntervalJoin();
		testInnerJoinWithUDTF();
		
	}

	static class SplitFunction extends TableFunction<Tuple3<String,String,String>>{
		
		public void eval(Tuple3<String,String,String> tp) {
			
//		    for (String s : str.split(",")) {
//		      // use collect(...) to emit a row
		      collect(Row.of(s, s.length()));
//		    }
			
		  }
	}
}

以上,本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。


标签:rowtime,null,自定义,示例,alan,Table,联接,true,id
From: https://blog.51cto.com/alanchan2win/9232426

相关文章

  • 【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch
    文章目录Flink系列文章一、maven依赖二、表的OrderBy,Offset和Fetch操作三、表的insert操作本文介绍了表的OrderBy、Offset和Fetch、insert操作,以示例形式展示每个操作的结果。本文除了maven依赖外,没有其他依赖。一、maven依赖本文maven依赖参考文章:【flink番外篇】9、Flin......
  • Qt/C++编写视频监控系统83-自定义悬浮条信息
    一、前言一般视频控件上会给出个悬浮条,这个悬浮条用于显示分辨率或者一些用户期望看到的信息,一般常用的信息除了分辨率以外,还有帧率、封装格式、视频解码器名称、音频解码器名称、实时码率等,由于实际的场景不一样,用户希望能过自定义勾选开启哪些信息,开启的就显示,不开启的则可以不......
  • 【CMake】5. 单项目多模块添加第三方依赖示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-mod-deps这里引入C++gRPC依赖,进行示例1.多模块工程+第三方依赖CMake多模块工程,这是一个示例工程simple-mod-deps,项目名称de......
  • 【CMake】3.单项目单模块添加第三方依赖包示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块-添加第三方依赖示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-deps1.单模块工程+第三方依赖CMake单模块工程,这是一个示例工程simple-deps,项目名称cmake,第三方依赖demo......
  • 【CMake】2. 单项目单模块示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple1.单模块工程CMake单模块工程,这是一个示例工程simple,项目名称cmake,第三方依赖demo,主模块main2.目录结构$.SIMPLE......
  • 如何让spring扫描到自定义注解的组件
    ClassPathScanningCandidateComponentProviderscanner=newClassPathScanningCandidateComponentProvider(true);scanner.addIncludeFilter(newAnnotationTypeFilter(Tenant.class));Set<BeanDefinition>beanDefinitions=scanner.findCandidat......
  • Flutter 自定义一个右侧可滑动 左侧为按钮的组件
    Flutter自定义一个右侧可滑动左侧为按钮的组件vartopicList=["圈子话题1","圈子话题2","圈子话题3","圈子话题4-天气很好","圈子话题5","圈子话题6"];@overrideWidgetbuild(BuildContextcontext){returnSizedBox(height:40......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersec
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 京东商品详情API实现实时数据获取的Java代码示例
    在电商行业中,商品详情页是用户了解商品信息、进行购买决策的重要页面。为了提高用户体验和促进销售,电商平台通常会提供商品详情的API接口,以便第三方应用能够实时获取商品数据。本文将介绍如何使用京东获得的JD商品详情API实现实时数据获取,并提供相应的Java代码示例。一、JD商品详......
  • 如何自定义选图功能?
    1,你可以更改功能板上的各个图标按钮和响应,你可以参考RCDChatViewController文件中的注释。2,根据1提到的注释,加上如下代码-(void)pluginBoardView:(RCPluginBoardView*)pluginBoardViewclickedItemWithTag:(NSInteger)tag{switch(tag){casePLUGIN_BOARD_ITEM_ALBUM_TAG:{......