文章目录
- Flink 系列文章
- 一、maven依赖
- 二、表的union、unionall、intersect、intersectall、minus、minusall和in的操作
本文介绍了表的union、unionall、intersect、intersectall、minus、minusall和in的操作,以示例形式展示每个操作的结果。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)、17、Flink 之Table API: Table API 支持的操作(2)
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、表的union、unionall、intersect、intersectall、minus、minusall和in的操作
本示例的运行结果均以注释的形式附在各个用例的代码中。其中部分用例只能在批模式下运行,这类用例已在其注释中特别说明;未作说明的用例,则在流模式和批模式下均可运行。
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Executable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.tablesql.TestTableAPIJoinOperationDemo.Order;
import org.tablesql.TestTableAPIJoinOperationDemo.User;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Examples of the Flink Table API set operations {@code union}, {@code unionAll},
 * {@code intersect}, {@code intersectAll}, {@code minus}, {@code minusAll} and {@code in}.
 *
 * <p>Every {@code testXxx} method is a self-contained example. The output recorded from a run
 * of the example is kept as comments at the end of the method. Examples that only work in
 * batch mode say so in their Javadoc.
 *
 * @author alanchan
 */
public class TestTableAPIJoinOperationDemo2 {

    /**
     * User record used as input of the streaming examples.
     * Constructors, accessors, {@code equals}/{@code hashCode} and {@code toString}
     * are generated by Lombok.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private long id;
        private String name;
        private double balance;
        // Presumably an epoch timestamp in milliseconds (sample value: 1698742358391) - confirm.
        private Long rowtime;
    }

    /**
     * Order record used as input of the streaming examples.
     * Constructors, accessors, {@code equals}/{@code hashCode} and {@code toString}
     * are generated by Lombok.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private long id;
        // Matched against User.id by the IN example (testIn).
        private long user_id;
        private double amount;
        // Presumably an epoch timestamp in milliseconds (sample value: 1698742358391) - confirm.
        private Long rowtime;
    }

    /** Sample users fed into the streaming examples. */
    static final List<User> userList = Arrays.asList(
            new User(1L, "alan", 18, 1698742358391L),
            new User(2L, "alan", 19, 1698742359396L),
            new User(3L, "alan", 25, 1698742360407L),
            new User(4L, "alanchan", 28, 1698742361409L),
            new User(5L, "alanchan", 29, 1698742362424L)
    );

    /** Sample orders fed into the streaming examples. */
    static final List<Order> orderList = Arrays.asList(
            new Order(1L, 1, 18, 1698742358391L),
            new Order(2L, 2, 19, 1698742359396L),
            new Order(3L, 1, 25, 1698742360407L),
            new Order(4L, 3, 28, 1698742361409L),
            new Order(5L, 1, 29, 1698742362424L),
            new Order(6L, 4, 49, 1698742362424L)
    );

    /** DDL of the output table; it uses the {@code print} connector. */
    static final String sinkSql = "CREATE TABLE sink_table (\n" +
            " id BIGINT,\n" +
            " user_id BIGINT,\n" +
            " amount DOUBLE,\n" +
            " rowtime BIGINT\n" +
            ") WITH (\n" +
            " 'connector' = 'print'\n" +
            ")";

    /** Creates a table environment configured for batch mode. */
    private static TableEnvironment createBatchTableEnvironment() {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        return TableEnvironment.create(settings);
    }

    /**
     * Builds an in-memory orders table with the BIGINT columns
     * {@code id}, {@code user_id}, {@code amount} and {@code rowtime}.
     *
     * @param tenv table environment that owns the table
     * @param rows the table content, one {@code row(...)} expression per record
     * @return the table holding the given rows
     */
    private static Table createOrdersTable(TableEnvironment tenv, Object... rows) {
        List<Object> values = Arrays.asList(rows);
        return tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                ),
                values);
    }

    /** Projects the four order columns of the given orders table. */
    private static Table selectOrderColumns(Table orders) {
        return orders.select($("id"), $("user_id"), $("amount"), $("rowtime"));
    }

    /**
     * Registers {@code table} as a temporary view, reads it back through SQL and writes the
     * rows to the print sink described by {@link #sinkSql}.
     *
     * @param tenv     table environment that owns {@code table}
     * @param table    the table to print
     * @param viewName name under which the table is registered as a temporary view
     * @throws Exception if the insert job fails or waiting for it is interrupted
     */
    private static void printViaSink(TableEnvironment tenv, Table table, String viewName) throws Exception {
        tenv.createTemporaryView(viewName, table);
        Table result = tenv.sqlQuery("select * from " + viewName);

        // Output table.
        tenv.executeSql(sinkSql);
        // The insert job is submitted asynchronously; wait for it so that the rows are
        // printed before this method returns and a job failure reaches the caller.
        result.executeInsert("sink_table").await();
    }

    /**
     * Runs a SQL {@code UNION} over two tables built from the same order stream,
     * in streaming mode, and prints the result as a retract stream.
     *
     * @throws Exception if the job fails
     */
    static void testUnionBySQL() throws Exception {
        // Step 0: environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        DataStream<Order> orderA = env.fromCollection(orderList);
        DataStream<Order> orderB = env.fromCollection(orderList);

        // Turn one DataStream into a Table and the other into a view, then query both.
        Table tableA = tenv.fromDataStream(orderA, $("id"), $("user_id"), $("amount"), $("rowtime"));
        tenv.createTemporaryView("tableB", orderB, $("id"), $("user_id"), $("amount"), $("rowtime"));

        // Query: rows of tableA with amount > 2 merged with rows of tableB with amount > 1:
        //   select * from tableA where amount > 2
        //   union
        //   select * from tableB where amount > 1
        // tableA is spliced into the statement through string concatenation.
        String sql = "select * from " + tableA + " where amount > 2 union select * from tableB where amount > 1";
        Table resultTable = tenv.sqlQuery(sql);

        // union requires toRetractStream.
        DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);

        // Output of the query above:
        // 9> (true,TestTableAPIJoinOperationDemo2.Order(id=1, user_id=1, amount=18.0, rowtime=1698742358391))
        // 8> (true,TestTableAPIJoinOperationDemo2.Order(id=2, user_id=2, amount=19.0, rowtime=1698742359396))
        // 4> (true,TestTableAPIJoinOperationDemo2.Order(id=5, user_id=1, amount=29.0, rowtime=1698742362424))
        // 8> (true,TestTableAPIJoinOperationDemo2.Order(id=4, user_id=3, amount=28.0, rowtime=1698742361409))
        // 14> (true,TestTableAPIJoinOperationDemo2.Order(id=6, user_id=4, amount=49.0, rowtime=1698742362424))
        // 6> (true,TestTableAPIJoinOperationDemo2.Order(id=3, user_id=1, amount=25.0, rowtime=1698742360407))

        // toAppendStream  -> appends the computed rows to the result DataStream.
        // toRetractStream -> emits updates against the rows already in the DataStream:
        //                    true marks an added/updated row, false marks a deleted row.
        // Comparable to append/update/complete in Structured Streaming.

        // Step 3: sink.
        resultDS.print();

        // Step 4: execute.
        env.execute();
    }

    /**
     * Similar to the SQL {@code UNION} clause: unions two tables and removes duplicate records.
     * Both tables must have the same field types.
     * This example unions a table with itself.
     * This operation only works in batch mode.
     *
     * @throws Exception if the job fails
     */
    static void testUnion() throws Exception {
        TableEnvironment tenv = createBatchTableEnvironment();

        Table ordersTable = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(3L, 1, 25, 1698742360407L),
                row(4L, 3, 28, 1698742361409L),
                row(5L, 1, 29, 1698742362424L),
                row(6L, 4, 49, 1698742362424L));

        Table left = selectOrderColumns(ordersTable);
        Table unionResult = left.union(left);

        // The result cannot be converted to a DataStream here: only a streaming table can be
        // converted, and on streaming tables the operation fails with
        // "The UNION operation on two unbounded tables is currently not supported."
        // Hence the result is written to the print sink instead.
        printViaSink(tenv, unionResult, "order_union_t");
        // +I[6, 4, 49.0, 1698742362424]
        // +I[5, 1, 29.0, 1698742362424]
        // +I[1, 1, 18.0, 1698742358391]
        // +I[3, 1, 25.0, 1698742360407]
        // +I[4, 3, 28.0, 1698742361409]
        // +I[2, 2, 19.0, 1698742359396]
    }

    /**
     * Similar to the SQL {@code UNION ALL} clause: unions two tables and keeps duplicates.
     * Both tables must have the same field types.
     * This example unions a table with itself.
     *
     * @throws Exception if the job fails
     */
    static void testUnionAll() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStream<User> users = env.fromCollection(userList);
        Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"), $("rowtime"));

        Table left = usersTable.select(
                $("id").as("userId"), $("name"), $("balance"), $("rowtime").as("u_rowtime"));
        Table result = left.unionAll(left);

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
        resultDS.print();
        // 14> (true,+I[5, alanchan, 29.0, 1698742362424])
        // 8> (true,+I[4, alanchan, 28.0, 1698742361409])
        // 5> (true,+I[1, alan, 18.0, 1698742358391])
        // 10> (true,+I[1, alan, 18.0, 1698742358391])
        // 11> (true,+I[2, alan, 19.0, 1698742359396])
        // 6> (true,+I[2, alan, 19.0, 1698742359396])
        // 7> (true,+I[3, alan, 25.0, 1698742360407])
        // 13> (true,+I[4, alanchan, 28.0, 1698742361409])
        // 12> (true,+I[3, alan, 25.0, 1698742360407])
        // 9> (true,+I[5, alanchan, 29.0, 1698742362424])

        env.execute();
    }

    /**
     * Similar to the SQL {@code INTERSECT} clause: returns the records that exist in both tables.
     * A record that occurs more than once in one or both tables is returned only once,
     * i.e. the result contains no duplicates.
     * Both tables must have the same field types.
     * This operation only works in batch mode.
     *
     * @throws Exception if the job fails
     */
    static void testIntersect() throws Exception {
        TableEnvironment tenv = createBatchTableEnvironment();

        Table ordersTableA = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(6L, 4, 49, 1698742362424L));
        Table ordersTableB = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(3L, 1, 25, 1698742360407L),
                row(4L, 3, 28, 1698742361409L),
                row(7L, 8, 4009, 1698782362424L));

        Table left = selectOrderColumns(ordersTableA);
        Table right = selectOrderColumns(ordersTableB);
        Table intersectResult = left.intersect(right);

        printViaSink(tenv, intersectResult, "order_intersect_t");
        // +I[1, 1, 18.0, 1698742358391]
    }

    /**
     * Similar to the SQL {@code INTERSECT ALL} clause: returns the records that exist in both
     * tables. A record that occurs more than once in both tables is returned as many times as
     * it occurs in both of them, i.e. the result may contain duplicates.
     * Both tables must have the same field types.
     * This operation only works in batch mode.
     *
     * @throws Exception if the job fails
     */
    static void testIntersectAll() throws Exception {
        TableEnvironment tenv = createBatchTableEnvironment();

        Table ordersTableA = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(6L, 4, 49, 1698742362424L));
        Table ordersTableB = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(3L, 1, 25, 1698742360407L),
                row(4L, 3, 28, 1698742361409L),
                row(7L, 8, 4009, 1698782362424L));

        Table left = selectOrderColumns(ordersTableA);
        Table right = selectOrderColumns(ordersTableB);
        Table intersectResult = left.intersectAll(right);

        printViaSink(tenv, intersectResult, "order_intersect_t");
        // +I[2, 2, 19.0, 1698742359396]
        // +I[1, 1, 18.0, 1698742358391]
    }

    /**
     * Similar to the SQL {@code EXCEPT} clause: returns the records of the left table that do
     * not exist in the right table. Duplicate records of the left table are returned only once,
     * i.e. the result contains no duplicates.
     * Both tables must have the same field types.
     * This operation only works in batch mode.
     *
     * @throws Exception if the job fails
     */
    static void testMinus() throws Exception {
        TableEnvironment tenv = createBatchTableEnvironment();

        Table ordersTableA = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(6L, 4, 49, 1698742362424L));
        Table ordersTableB = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(3L, 1, 25, 1698742360407L),
                row(4L, 3, 28, 1698742361409L),
                row(7L, 8, 4009, 1698782362424L));

        Table left = selectOrderColumns(ordersTableA);
        Table right = selectOrderColumns(ordersTableB);
        Table minusResult = left.minus(right);

        printViaSink(tenv, minusResult, "order_intersect_t");
        // +I[6, 4, 49.0, 1698742362424]
    }

    /**
     * Similar to the SQL {@code EXCEPT ALL} clause: returns the records that do not exist in
     * the right table. A record that occurs n times in the left table and m times in the right
     * table occurs (n - m) times in the result, i.e. as many copies are removed as there are
     * occurrences in the right table.
     * Both tables must have the same field types.
     * This operation only works in batch mode.
     *
     * @throws Exception if the job fails
     */
    static void testMinusAll() throws Exception {
        TableEnvironment tenv = createBatchTableEnvironment();

        Table ordersTableA = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(6L, 4, 49, 1698742362424L));
        Table ordersTableB = createOrdersTable(tenv,
                row(1L, 1, 18, 1698742358391L),
                row(2L, 2, 19, 1698742359396L),
                row(3L, 1, 25, 1698742360407L),
                row(4L, 3, 28, 1698742361409L),
                row(7L, 8, 4009, 1698782362424L));

        Table left = selectOrderColumns(ordersTableA);
        Table right = selectOrderColumns(ordersTableB);
        // minusAll (EXCEPT ALL), not minus: duplicates are subtracted by count.
        Table minusAllResult = left.minusAll(right);

        printViaSink(tenv, minusAllResult, "order_intersect_t");
        // The sample data contains no duplicate rows, so the output equals that of minus:
        // +I[6, 4, 49.0, 1698742362424]
    }

    /**
     * Similar to the SQL {@code IN} clause: {@code in} returns true if the value of the
     * expression exists in the given table sub-query.
     * The sub-query table must consist of exactly one column,
     * and that column must have the same data type as the expression.
     *
     * @throws Exception if the job fails
     */
    static void testIn() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStream<User> users = env.fromCollection(userList);
        Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"), $("rowtime"));

        DataStream<Order> orders = env.fromCollection(orderList);
        Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"), $("rowtime"));

        Table left = usersTable.select(
                $("id").as("userId"), $("name"), $("balance"), $("rowtime").as("u_rowtime"));
        // Single-column sub-query: the user ids that have at least one order.
        Table right = ordersTable.select($("user_id"));

        Table result = left
                .select($("userId"), $("name"), $("balance"), $("u_rowtime"))
                .where($("userId").in(right));

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
        resultDS.print();
        // 3> (true,+I[4, alanchan, 28.0, 1698742361409])
        // 12> (true,+I[1, alan, 18.0, 1698742358391])
        // 15> (true,+I[3, alan, 25.0, 1698742360407])
        // 12> (true,+I[2, alan, 19.0, 1698742359396])

        env.execute();
    }

    /**
     * Runs one of the examples; swap the active call to run a different one.
     *
     * @param args not used
     * @throws Exception if the selected example fails
     */
    public static void main(String[] args) throws Exception {
        // testUnion();
        // testUnionAll();
        // testUnionBySQL();
        // testIntersect();
        // testIntersectAll();
        // testMinus();
        // testMinusAll();
        testIn();
    }
}
以上,本文介绍了表的union、unionall、intersect、intersectall、minus、minusall和in的操作,以示例形式展示每个操作的结果。
标签:minusall,示例,flink,tenv,BIGINT,Table,DataTypes,id,row From: https://blog.51cto.com/alanchan2win/9232418