文章目录
- Flink 系列文章
- 一、maven依赖
- 二、表的OrderBy, Offset 和 Fetch操作
- 三、表的insert操作
本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。
本文除了maven依赖外,没有其他依赖。
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、表的OrderBy, Offset 和 Fetch操作
在批处理模式下,也即有界情况下,order by 可以单独使用,排序也可以是任意字段,与一般数据库的排序结果一样。
在流模式下,也即无界的情况下,order by需要和fetch一起使用,排序字段需要有时间属性,与一般数据库的排序有点差异。
需要说明的是order by 和offset&fetch都可以在批处理模式和流模式下工作。
- Order By,和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
- Offset & Fetch,和 SQL 的 OFFSET 和 FETCH 子句类似。Offset 操作根据偏移位置来限定(可能是已排序的)结果集。Fetch 操作将(可能已排序的)结果集限制为前 n 行。通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。
具体结果见下面示例
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.tablesql.TestTableAPIJoinOperationDemo2.Order;
import org.tablesql.TestTableAPIJoinOperationDemo2.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestTableAPIJoinOperationDemo3 {
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private double balance;
private Long rowtime;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private long id;
private long user_id;
private double amount;
private Long rowtime;
}
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 25, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L)
);
final static List<Order> orderList = Arrays.asList(
new Order(1L, 1, 18, 1698742358391L),
new Order(2L, 2, 19, 1698742359396L),
new Order(3L, 1, 25, 1698742360407L),
new Order(4L, 3, 28, 1698742361409L),
new Order(5L, 1, 29, 1698742362424L),
new Order(6L, 4, 49, 1698742362424L)
);
// 创建输出表
final static String sinkSql = "CREATE TABLE sink_table (\n" +
" id BIGINT,\n" +
" user_id BIGINT,\n" +
" amount DOUBLE,\n" +
" rowtime BIGINT\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
/**
* Order By
* 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
* 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
* Sort on a non-time-attribute field is not supported.
*
* Offset & Fetch
* 和 SQL 的 OFFSET 和 FETCH 子句类似。
* Offset 操作根据偏移位置来限定(可能是已排序的)结果集。
* Fetch 操作将(可能已排序的)结果集限制为前 n 行。
* 通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。
*
* @throws Exception
*/
static void testOrderByWithUnbounded() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
usersTable.printSchema();
// 从已排序的结果集中返回前3条记录
Table result = usersTable.orderBy($("rowtime").desc()).fetch(3);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
// resultDS.print();
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+I[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[3, alan, 25.0, 2023-10-31T08:52:40.407])
// (false,-D[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+I[4, alanchan, 28.0, 2023-10-31T08:52:41.409])
// (false,-D[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[5, alanchan, 29.0, 2023-10-31T08:52:42.424])
// 从已排序的结果集中返回跳过2条记录之后的所有记录
Table result2 = usersTable.orderBy($("rowtime").desc()).offset(2).fetch(4);
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
result2DS.print();
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (false,-U[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+U[3, alan, 25.0, 2023-10-31T08:52:40.407])
// (false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
env.execute();
}
/**
* 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
* 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
* 这个和一般的查询数据库的结果比较类似
*
* @throws Exception
*/
static void testOrderByWithBounded() throws Exception {
EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
TableEnvironment tenv = TableEnvironment.create(env);
Table ordersTable = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(3L, 1, 25, 1698742360407L),
row(4L, 3, 28, 1698742361409L),
row(5L, 1, 29, 1698742362424L),
row(6L, 4, 49, 1698742362424L)
));
Table left = ordersTable.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table orderByResult = left.orderBy($("amount").desc());
tenv.createTemporaryView("order_union_t", orderByResult);
Table result = tenv.sqlQuery("select * from order_union_t");
//输出表
tenv.executeSql(sinkSql);
// +I[6, 4, 49.0, 1698742362424]
// +I[5, 1, 29.0, 1698742362424]
// +I[4, 3, 28.0, 1698742361409]
// +I[3, 1, 25.0, 1698742360407]
// +I[2, 2, 19.0, 1698742359396]
// +I[1, 1, 18.0, 1698742358391]
result.executeInsert("sink_table");
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// testOrderByWithUnbounded();
testOrderByWithBounded();
}
}
三、表的insert操作
和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。 insertInto() 方法会将 INSERT INTO 转换为一个 TablePipeline。 该数据流可以用 TablePipeline.explain() 来解释,用 TablePipeline.execute() 来执行。
输出表必须已注册在 TableEnvironment中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。
该示例仅仅展示一个方法,运行环境和其他的示例一致,并且本示例仅仅展示的是insert Into,也可以使用execute Insert方法,在其他示例中有展示其使用。
static void testInsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<Order> orderA = env.fromCollection(orderList);
DataStream<Order> orderB = env.fromCollection(
Arrays.asList(
new Order(10L, 1, 18, 1698742358391L),
new Order(16L, 4, 49, 1698742362424L)
)
);
Table tableA = tenv.fromDataStream(orderA, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table tableB = tenv.fromDataStream(orderB, $("id"), $("user_id"), $("amount"),$("rowtime"));
tenv.executeSql(sinkSql);
tableA.insertInto("sink_table").execute();
tableB.insertInto("sink_table").execute();
// +I[1, 1, 18.0, 1698742358391]
// +I[2, 2, 19.0, 1698742359396]
// +I[3, 1, 25.0, 1698742360407]
// +I[4, 3, 28.0, 1698742361409]
// +I[5, 1, 29.0, 1698742362424]
// +I[6, 4, 49.0, 1698742362424]
// +I[10, 1, 18.0, 1698742358391]
// +I[16, 4, 49.0, 1698742362424]
}
以上,本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。
标签:OrderBy,10,示例,alan,52,2023,import,31T08 From: https://blog.51cto.com/alanchan2win/9232428