文章目录
- Flink 系列文章
- 一、maven依赖
- 二、示例:表的join操作(内联接、外联接以及联接自定义函数等)
本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。
本文除了maven依赖外,没有其他依赖。
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、示例:表的join操作(内联接、外联接以及联接自定义函数等)
本部分介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。
关于与自定义函数(表函数)的联接,本文仅给出示意性的代码,具体的验证将在下面这篇介绍 Flink 自定义函数的文章中说明,因为函数的使用与联接本身的关系并不密切:
19、Flink 的Table API 和 SQL 中的自定义函数(2)
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.call;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestTableAPIJoinOperationDemo {
/**
 * Sample user record used as the user-side (left) input of the join demos.
 * Accessors and the no-arg / all-args constructors are generated by the Lombok
 * annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
// User identifier; the join demos compare it with Order.user_id.
private long id;
// Name of the user.
private String name;
// Balance of the user.
private double balance;
// Timestamp stored as a boxed long; the sample values look like epoch
// milliseconds (assumption - nothing in this class enforces a unit).
private Long rowtime;
}
/**
 * Sample order record used as the order-side (right) input of the join demos.
 * Accessors and the no-arg / all-args constructors are generated by the Lombok
 * annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
// Order identifier.
private long id;
// Identifier of the ordering user; the join demos compare it with User.id.
private long user_id;
// Order amount.
private double amount;
// Timestamp stored as a boxed long; the sample values look like epoch
// milliseconds (assumption - nothing in this class enforces a unit).
private Long rowtime;
}
/** Fixed sample users that the join demos read as their user-side input. */
static final List<User> userList = Arrays.asList(
    new User(1L, "alan", 18.0, 1698742358391L),
    new User(2L, "alan", 19.0, 1698742359396L),
    new User(3L, "alan", 25.0, 1698742360407L),
    new User(4L, "alanchan", 28.0, 1698742361409L),
    new User(5L, "alanchan", 29.0, 1698742362424L));
/** Fixed sample orders that the join demos read as their order-side input. */
static final List<Order> orderList = Arrays.asList(
    new Order(1L, 1L, 18.0, 1698742358391L),
    new Order(2L, 2L, 19.0, 1698742359396L),
    new Order(3L, 1L, 25.0, 1698742360407L),
    new Order(4L, 3L, 28.0, 1698742361409L),
    new Order(5L, 1L, 29.0, 1698742362424L),
    new Order(6L, 4L, 49.0, 1698742362424L));
/**
 * Inner-joins the sample users with the sample orders on
 * {@code user_id = userId} and prints the result as a retract stream.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void testInnerJoin() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    DataStream<User> userStream = env.fromCollection(userList);
    Table userTable = tenv.fromDataStream(
            userStream, $("id"), $("name"), $("balance"), $("rowtime"));

    DataStream<Order> orderStream = env.fromCollection(orderList);
    Table orderTable = tenv.fromDataStream(
            orderStream, $("id"), $("user_id"), $("amount"), $("rowtime"));

    // Both inputs expose "id" and "rowtime"; alias them so the joined
    // schema has unique column names.
    Table userSide = userTable.select(
            $("id").as("userId"),
            $("name"),
            $("balance"),
            $("rowtime").as("u_rowtime"));
    Table orderSide = orderTable.select(
            $("id").as("orderId"),
            $("user_id"),
            $("amount"),
            $("rowtime").as("o_rowtime"));

    Table joined = userSide
            .join(orderSide)
            .where($("user_id").isEqual($("userId")))
            .select(
                    $("orderId"),
                    $("user_id"),
                    $("amount"),
                    $("o_rowtime"),
                    $("name"),
                    $("balance"));

    tenv.toRetractStream(joined, Row.class).print();
    // Sample output recorded by the original author:
    // 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
    // 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
    // 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
    // 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
    // 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
    // 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])

    env.execute();
}
/**
 * Demonstrates left, right and full outer joins, which are similar to the SQL
 * LEFT/RIGHT/FULL OUTER JOIN clauses. Both tables must have distinct field
 * names, and at least one equality join predicate must be defined.
 *
 * <p>All three joins are built and converted to retract streams, but only the
 * full outer join result is printed; the other two print calls are left
 * commented out.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void testOuterJoin() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    DataStream<User> userStream = env.fromCollection(userList);
    Table userTable = tenv.fromDataStream(
            userStream, $("id"), $("name"), $("balance"), $("rowtime"));

    DataStream<Order> orderStream = env.fromCollection(orderList);
    Table orderTable = tenv.fromDataStream(
            orderStream, $("id"), $("user_id"), $("amount"), $("rowtime"));

    // Alias the colliding "id" / "rowtime" columns so both sides have
    // distinct field names.
    Table userSide = userTable.select(
            $("id").as("userId"),
            $("name"),
            $("balance"),
            $("rowtime").as("u_rowtime"));
    Table orderSide = orderTable.select(
            $("id").as("orderId"),
            $("user_id"),
            $("amount"),
            $("rowtime").as("o_rowtime"));

    // ---- left outer join ----
    Table leftJoined = userSide
            .leftOuterJoin(orderSide, $("user_id").isEqual($("userId")))
            .select(
                    $("orderId"),
                    $("user_id"),
                    $("amount"),
                    $("o_rowtime"),
                    $("name"),
                    $("balance"));
    DataStream<Tuple2<Boolean, Row>> leftJoinedStream = tenv.toRetractStream(leftJoined, Row.class);
    // leftJoinedStream.print();
    // Sample output recorded by the original author:
    // 12> (true,+I[null, null, null, null, alan, 18])
    // 3> (true,+I[null, null, null, null, alanchan, 28])
    // 12> (false,-D[null, null, null, null, alan, 18])
    // 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
    // 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
    // 12> (true,+I[null, null, null, null, alan, 19])
    // 3> (false,-D[null, null, null, null, alanchan, 28])
    // 12> (false,-D[null, null, null, null, alan, 19])
    // 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
    // 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
    // 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
    // 3> (true,+I[null, null, null, null, alanchan, 29])
    // 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])

    // ---- right outer join ----
    Table rightJoined = userSide
            .rightOuterJoin(orderSide, $("user_id").isEqual($("userId")))
            .select(
                    $("orderId"),
                    $("user_id"),
                    $("amount"),
                    $("o_rowtime"),
                    $("name"),
                    $("balance"));
    DataStream<Tuple2<Boolean, Row>> rightJoinedStream = tenv.toRetractStream(rightJoined, Row.class);
    // rightJoinedStream.print();
    // Sample output recorded by the original author:
    // 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
    // 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
    // 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
    // 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
    // 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
    // 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])

    // ---- full outer join (the only result that is printed) ----
    Table fullJoined = userSide
            .fullOuterJoin(orderSide, $("user_id").isEqual($("userId")))
            .select(
                    $("orderId"),
                    $("user_id"),
                    $("amount"),
                    $("o_rowtime"),
                    $("name"),
                    $("balance"));
    DataStream<Tuple2<Boolean, Row>> fullJoinedStream = tenv.toRetractStream(fullJoined, Row.class);
    fullJoinedStream.print();
    // Sample output recorded by the original author:
    // 3> (true,+I[6, 4, 49.0, 1698742362424, null, null])
    // 12> (true,+I[1, 1, 18.0, 1698742358391, null, null])
    // 15> (true,+I[4, 3, 28.0, 1698742361409, null, null])
    // 12> (false,-D[1, 1, 18.0, 1698742358391, null, null])
    // 3> (false,-D[6, 4, 49.0, 1698742362424, null, null])
    // 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
    // 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
    // 3> (true,+I[null, null, null, null, alanchan, 29])
    // 12> (true,+I[2, 2, 19.0, 1698742359396, null, null])
    // 12> (false,-D[2, 2, 19.0, 1698742359396, null, null])
    // 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
    // 15> (false,-D[4, 3, 28.0, 1698742361409, null, null])
    // 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
    // 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
    // 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])

    env.execute();
}
/**
 * Interval joins are a subset of regular joins that can be processed in a
 * streaming fashion. An interval join requires at least one equi-join predicate
 * and a join condition that bounds the time on both sides. Such a condition can
 * be defined by two appropriate range predicates (&lt;, &lt;=, &gt;=, &gt;) or
 * by a single equality predicate that compares time attributes of the same kind
 * (processing time or event time) of both input tables.
 *
 * <p>NOTE(review): in this method the two time-bound predicates are commented
 * out, so the query that actually runs is an equi-join with an additional
 * {@code user_id < 3} filter, not an interval join.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void testIntervalJoin() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    DataStream<User> userStream = env.fromCollection(userList);
    Table userTable = tenv.fromDataStream(
            userStream, $("id"), $("name"), $("balance"), $("rowtime"));

    DataStream<Order> orderStream = env.fromCollection(orderList);
    Table orderTable = tenv.fromDataStream(
            orderStream, $("id"), $("user_id"), $("amount"), $("rowtime"));

    // Alias the colliding "id" / "rowtime" columns before joining.
    Table userSide = userTable.select(
            $("id").as("userId"),
            $("name"),
            $("balance"),
            $("rowtime").as("u_rowtime"));
    Table orderSide = orderTable.select(
            $("id").as("orderId"),
            $("user_id"),
            $("amount"),
            $("rowtime").as("o_rowtime"));

    Table joined = userSide
            .join(orderSide)
            .where(
                    and(
                            $("user_id").isEqual($("userId")),
                            $("user_id").isLess(3)
                            // Disabled time bounds of the intended interval join:
                            // $("u_rowtime").isGreaterOrEqual($("o_rowtime").minus(lit(5).minutes())),
                            // $("u_rowtime").isLess($("o_rowtime").plus(lit(10).minutes()))
                    ))
            .select(
                    $("orderId"),
                    $("user_id"),
                    $("amount"),
                    $("o_rowtime"),
                    $("name"),
                    $("balance"));

    joined.printSchema();

    tenv.toRetractStream(joined, Row.class).print();
    // Sample output recorded by the original author:
    // 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18.0])
    // 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19.0])
    // 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18.0])
    // 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18.0])

    env.execute();
}
/**
 * Joins a table with the results of a table function. Each row of the left
 * (outer) table is joined with all rows produced by the corresponding call of
 * the table function. A row of the left table is dropped if its table function
 * call returns an empty result.
 *
 * <p>The users table is used as the left input because it is the only sample
 * table with a STRING column ({@code name}) that can be handed to the function
 * registered as {@code split}; that function must therefore accept a single
 * STRING argument and produce three output fields, which are exposed here as
 * {@code s}, {@code t} and {@code v}.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void testInnerJoinWithUDTF() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    // Register the user-defined table function under the name used by call(...).
    TableFunction<Tuple3<String, String, String>> split = new SplitFunction();
    tenv.registerFunction("split", split);

    DataStream<User> users = env.fromCollection(userList);
    Table usersTable = tenv.fromDataStream(
            users, $("id"), $("name"), $("balance"), $("rowtime"));

    // Only columns that exist in usersTable (id, name) and the aliased
    // function outputs (s, t, v) are referenced.
    Table result = usersTable
            .joinLateral(call("split", $("name")).as("s", "t", "v"))
            .select($("id"), $("name"), $("s"), $("t"), $("v"));

    // Emit the result so that the job actually contains a pipeline to execute.
    DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
    resultDS.print();

    env.execute();
}
/**
 * Left-outer-joins a table with the results of a table function. Each row of
 * the left (outer) table is joined with all rows produced by the corresponding
 * call of the table function. If the table function call returns an empty
 * result, the outer row is preserved and the function-side columns are padded
 * with nulls. Currently, the predicate of a table function left outer join can
 * only be empty or the literal {@code true}.
 *
 * <p>The users table is used as the left input because it is the only sample
 * table with a STRING column ({@code name}) that can be handed to the function
 * registered as {@code split}; that function must therefore accept a single
 * STRING argument and produce three output fields, which are exposed here as
 * {@code s}, {@code t} and {@code v}.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void testLeftOuterJoinWithUDTF() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    // Register the user-defined table function under the name used by call(...).
    TableFunction<Tuple3<String, String, String>> split = new SplitFunction();
    tenv.registerFunction("split", split);

    DataStream<User> users = env.fromCollection(userList);
    Table usersTable = tenv.fromDataStream(
            users, $("id"), $("name"), $("balance"), $("rowtime"));

    // Only columns that exist in usersTable (id, name) and the aliased
    // function outputs (s, t, v) are referenced.
    Table result = usersTable
            .leftOuterJoinLateral(call("split", $("name")).as("s", "t", "v"))
            .select($("id"), $("name"), $("s"), $("t"), $("v"));

    // Emit the result so that the job actually contains a pipeline to execute.
    DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
    resultDS.print();

    env.execute();
}
/**
 * A temporal table is a table that tracks changes over time. A temporal table
 * function provides access to the state of a temporal table at a specific point
 * in time. The syntax for joining a table with a temporal table function is the
 * same as for an inner join with a table function. Currently, only inner joins
 * with temporal tables are supported.
 *
 * <p>NOTE(review): this method creates a fresh table environment and never
 * registers the {@code RatesHistory} or {@code Orders} tables it reads, and the
 * joined table is never emitted. It looks like an illustrative snippet rather
 * than a runnable demo - confirm before calling it.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void testJoinWithTemporalTable() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    Table history = tenv.from("RatesHistory");

    // Temporal table function with r_proctime as the time attribute and
    // r_currency as the primary key.
    TemporalTableFunction ratesFunction =
            history.createTemporalTableFunction($("r_proctime"), $("r_currency"));
    tenv.registerFunction("rates", ratesFunction);

    // Join "Orders" with the temporal table function on the time attribute
    // and the currency key.
    Table orderTable = tenv.from("Orders");
    Table joined = orderTable.joinLateral(
            call("rates", $("o_proctime")),
            $("o_currency").isEqual($("r_currency")));

    env.execute();
}
/**
 * Entry point of the demo. Exactly one demo is active at a time; the other
 * calls are kept commented out so they can be switched in manually.
 *
 * @param args command-line arguments (not read by this method)
 * @throws Exception if the selected demo fails
 */
public static void main(String[] args) throws Exception {
// Alternative demos - uncomment one of these instead of the active call:
// testInnerJoin();
// testOuterJoin();
// testIntervalJoin();
testInnerJoinWithUDTF();
}
/**
 * Table function that splits a comma-separated string and emits one
 * {@code Tuple3<String, String, String>} per token:
 * <ol>
 *   <li>{@code f0} - the token itself,</li>
 *   <li>{@code f1} - the token length, rendered as a string,</li>
 *   <li>{@code f2} - the zero-based position of the token, rendered as a string.</li>
 * </ol>
 *
 * <p>The class is public because Flink requires user-defined function classes
 * (and their {@code eval} methods) to be publicly accessible.
 */
public static class SplitFunction extends TableFunction<Tuple3<String, String, String>> {

    /**
     * Emits one tuple per comma-separated token of {@code str}.
     *
     * @param str the comma-separated input; {@code null} emits nothing
     */
    public void eval(String str) {
        if (str == null) {
            // Nothing to split: emit no rows instead of failing with an NPE.
            return;
        }
        String[] tokens = str.split(",");
        for (int i = 0; i < tokens.length; i++) {
            String token = tokens[i];
            // collect(...) emits one output row of the table function.
            collect(Tuple3.of(token, String.valueOf(token.length()), String.valueOf(i)));
        }
    }
}
}
以上,本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。