Contents
- Flink series articles
- I. Maven dependencies
- II. Group window
- 1. Tumble (Tumbling Windows)
- 2. Slide (Sliding Windows)
- 3. Session (Session Windows)
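This article introduces the three kinds of group windows on tables (tumbling, sliding, and session) and shows the result of each operation with examples.
Apart from the Maven dependencies, this article has no other dependencies.
I. Maven dependencies
For the Maven dependencies of this article, refer to the article 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 ([Flink extras] 9. Flink Table API operation examples (1): creating tables via the Table API and SQL). They are not repeated here to save space.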
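II. Group window
Group window aggregations group rows into finite groups based on time or row-count intervals and evaluate the aggregate functions once per group. For batch tables, windows are a convenient way to group records by time interval.
A window is defined with the window(GroupWindow w) clause and requires an alias, which is specified with the as clause. To group a table by a window, the window alias must be referenced in the groupBy(…) clause, just like a regular grouping attribute.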
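The following minimal plain-Java sketch makes the grouping rule concrete. It illustrates the semantics only and does not use Flink. The `User` class, `sumBalancePerNameAndWindow`, and the window function are illustrative names. Each row is mapped to a window, combined with a regular key such as `name`, and the aggregate is evaluated once per (key, window) group. Given the five rows from the tumbling example below and an assumed 5-minute tumbling window, it produces the same sums, 62 and 57.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongUnaryOperator;

// Plain-Java illustration of group window aggregation semantics (not Flink's implementation):
// rows are grouped by (grouping key, window) and SUM(balance) is computed once per group.
public class GroupWindowAggregationSketch {

    // Hypothetical stand-in for the article's User(id, name, balance, rowtime) POJO.
    static final class User {
        final long id;
        final String name;
        final int balance;
        final long rowtime;

        User(long id, String name, int balance, long rowtime) {
            this.id = id;
            this.name = name;
            this.balance = balance;
            this.rowtime = rowtime;
        }
    }

    // windowOf maps a row timestamp to an identifier of its window (here: the window start).
    static Map<String, Integer> sumBalancePerNameAndWindow(List<User> users, LongUnaryOperator windowOf) {
        Map<String, Integer> sums = new TreeMap<>();
        for (User u : users) {
            // The group key combines the regular attribute and the window, like groupBy($("name"), $("w")).
            String groupKey = u.name + "@" + windowOf.applyAsLong(u.rowtime);
            sums.merge(groupKey, u.balance, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User(1L, "alan", 18, 1698742358391L),
                new User(2L, "alan", 19, 1698742359396L),
                new User(3L, "alan", 25, 1698742360407L),
                new User(4L, "alanchan", 28, 1698742361409L),
                new User(5L, "alanchan", 29, 1698742362424L));
        long fiveMinutes = 5 * 60 * 1000L;
        // Assumed 5-minute tumbling window, identified by its start timestamp.
        LongUnaryOperator tumbling = ts -> ts - Math.floorMod(ts, fiveMinutes);
        // prints {alan@1698742200000=62, alanchan@1698742200000=57}
        System.out.println(sumBalancePerNameAndWindow(users, tumbling));
    }
}
```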
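1. Tumble (Tumbling Windows)
A tumbling window assigns rows to non-overlapping, contiguous windows of fixed length. For example, a 5-minute tumbling window groups rows into 5-minute intervals. Tumbling windows can be defined on event time, processing time, or a row count.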
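A tumbling window can be located with simple arithmetic. The start is the timestamp rounded down to a multiple of the window size, the end is the start plus the size, and the window's rowtime is the end minus 1 ms. The sketch below assumes a window offset of 0 and prints in UTC. Its class and method names are hypothetical. For the first and last rows of the example data, it yields the same 08:50, 08:55 and 08:54:59.999 values that appear in the `$("w").start()`, `$("w").end()` and `$("w").rowtime()` output of the Flink code that follows.

```java
import java.time.Instant;

// Sketch: the tumbling window [start, end) that contains a timestamp (offset assumed to be 0),
// plus the window's rowtime, i.e. the last millisecond that still belongs to the window.
public class TumblingWindowBounds {

    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod rounds down correctly for negative timestamps as well
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    // The end is exclusive, so the window's rowtime is end - 1 ms.
    static long windowRowtime(long timestampMillis, long sizeMillis) {
        return windowEnd(timestampMillis, sizeMillis) - 1;
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // 5-minute tumbling window
        long[] rowtimes = {1698742358391L, 1698742362424L}; // first and last rows of userList
        for (long ts : rowtimes) {
            System.out.println(Instant.ofEpochMilli(ts) + " -> ["
                    + Instant.ofEpochMilli(windowStart(ts, size)) + ", "
                    + Instant.ofEpochMilli(windowEnd(ts, size)) + "), rowtime "
                    + Instant.ofEpochMilli(windowRowtime(ts, size)));
        }
    }
}
```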
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
// rowInterval, Slide and Session are used by testSlidingOver() and testSessionOver() below
import static org.apache.flink.table.api.Expressions.rowInterval;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestTableAPIOperationWithWindowDemo {
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 25, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L)
);
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private int balance;
private Long rowtime;
}
static void testTumbleOver() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
// Distinct aggregation (each distinct value counted once) after grouping by an attribute and a time window
Table groupByWindowResult = usersTable
.window(Tumble
.over(lit(5).minutes())
.on($("rowtime"))
.as("w")
)
.groupBy($("name"), $("w"))
.select($("name"), $("balance").sum().distinct().as("sum_balance"));
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(groupByWindowResult, Row.class);
result2DS.print("result2DS:");
// result2DS::2> (true,+I[alan, 62])
// result2DS::16> (true,+I[alanchan, 57])
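// Group and aggregate the table using a group window combined with one or more grouping keys.
Table result = usersTable
.window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // define the window
.groupBy($("name"), $("w")) // group by key and window
// access the window properties and aggregate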
.select(
$("name"),
$("w").start(),
$("w").end(),
$("w").rowtime(),
$("balance").sum().as("sum(balance)")
);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print("resultDS:");
// resultDS::2> (true,+I[alan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 62])
// resultDS::16> (true,+I[alanchan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 57])
env.execute();
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
testTumbleOver();
}
}
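The first query uses `$("balance").sum().distinct()`, a distinct aggregation that adds each distinct `balance` value only once per group. Every balance in `userList` is different, so it returns the same 62 and 57 as the plain `sum()` in the second query. The small plain-Java sketch below shows where the two would differ. It is not Flink code, and the second list is made-up data.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of SUM vs. SUM(DISTINCT ...) over the values of one group (not Flink code).
public class DistinctSumSketch {

    static int sum(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    // Duplicate values are removed before summing, as a distinct aggregate does.
    static int sumDistinct(List<Integer> values) {
        return values.stream().distinct().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> alanBalances = Arrays.asList(18, 19, 25);  // the article's data: all values differ
        List<Integer> withDuplicate = Arrays.asList(18, 18, 25); // hypothetical data with a repeated value
        System.out.println(sum(alanBalances) + " " + sumDistinct(alanBalances));   // 62 62
        System.out.println(sum(withDuplicate) + " " + sumDistinct(withDuplicate)); // 61 43
    }
}
```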
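2. Slide (Sliding Windows)
A sliding window has a fixed size and slides by a specified slide interval. If the slide interval is smaller than the window size, sliding windows overlap, so a row can be assigned to multiple windows. For example, a sliding window with a size of 15 minutes and a slide interval of 5 minutes assigns each row to 3 different 15-minute windows, and a window is evaluated every 5 minutes. Sliding windows can be defined on event time, processing time, or a row count.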
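The number of windows per row follows from how windows are assigned. The latest window that can contain a timestamp starts at the timestamp rounded down to a multiple of the slide. Earlier windows are found by stepping back one slide at a time while the window still covers the timestamp. This sketch follows the same idea as the sliding assigners in Flink's DataStream API, assuming an offset of 0. `SlidingWindowAssigner` and `windowStarts` are illustrative names. With a 15-minute size and a 5-minute slide, it returns 3 windows. With the 10-minute size and 5-minute slide used in the following code, it returns the two windows [08:50, 09:00) and [08:45, 08:55) seen in the output.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment (offset assumed to be 0): returns the start of every
// window [start, start + size) that contains the given timestamp, latest window first.
public class SlidingWindowAssigner {

    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        // The latest window containing the timestamp starts at the timestamp rounded down to the slide.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, slideMillis);
        // Step back one slide at a time while [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 1698742358391L; // first row of the article's userList (2023-10-31T08:52:38.391Z)

        System.out.println(windowStarts(ts, 15 * minute, 5 * minute).size()); // 3

        for (long start : windowStarts(ts, 10 * minute, 5 * minute)) {
            // prints [2023-10-31T08:50:00Z, 2023-10-31T09:00:00Z) then [2023-10-31T08:45:00Z, 2023-10-31T08:55:00Z)
            System.out.println("[" + Instant.ofEpochMilli(start) + ", "
                    + Instant.ofEpochMilli(start + 10 * minute) + ")");
        }
    }
}
```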
static void testSlidingOver() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
Table result1 = usersTable
.window(
Slide.over(lit(10).minutes())
.every(lit(5).minutes())
.on($("rowtime"))
.as("w")
)
.groupBy($("name"),$("w"))
.select($("name"),$("balance").sum().as("sum(balance)"),$("w").start(),$("w").end(),$("w").rowtime())
;
DataStream<Tuple2<Boolean, Row>> result1DS = tenv.toRetractStream(result1, Row.class);
// result1DS.print("result1DS:");
// result1DS::16> (true,+I[alanchan, 57, 2023-10-31T08:45, 2023-10-31T08:55, 2023-10-31T08:54:59.999])
// result1DS::2> (true,+I[alan, 62, 2023-10-31T08:45, 2023-10-31T08:55, 2023-10-31T08:54:59.999])
// result1DS::16> (true,+I[alanchan, 57, 2023-10-31T08:50, 2023-10-31T09:00, 2023-10-31T08:59:59.999])
// result1DS::2> (true,+I[alan, 62, 2023-10-31T08:50, 2023-10-31T09:00, 2023-10-31T08:59:59.999])
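// Sliding "processing-time" window. Note: $("rowtime").rowtime().as("proctime") only renames the event-time attribute,
// so this window is still evaluated on event time (its start/end match result1). As the output shows, $("w").proctime()
// returns the wall-clock time at which the result was produced. A true processing-time attribute would be declared as $("proctime").proctime().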
Table usersTable2 = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime().as("proctime"));
Table result2 = usersTable2
.window(Slide.over(lit(10).minutes())
.every(lit(5).minutes())
.on($("proctime"))
.as("w")
)
.groupBy($("name"),$("w"))
.select($("name"),$("balance").sum().as("sum(balance)"),$("w").start(),$("w").end(),$("w").proctime())
;
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
// result2DS.print("result2DS:");
// result2DS::2> (true,+I[alan, 62, 2023-10-31T08:45, 2023-10-31T08:55, 2023-11-03T02:17:19.345Z])
// result2DS::16> (true,+I[alanchan, 57, 2023-10-31T08:45, 2023-10-31T08:55, 2023-11-03T02:17:19.345Z])
// result2DS::16> (true,+I[alanchan, 57, 2023-10-31T08:50, 2023-10-31T09:00, 2023-11-03T02:17:19.348Z])
// result2DS::2> (true,+I[alan, 62, 2023-10-31T08:50, 2023-10-31T09:00, 2023-11-03T02:17:19.348Z])
// Sliding row-count window on a processing-time attribute "proctime".
// Row-count windows need a real processing-time attribute, so "proctime" is appended with .proctime() here.
Table usersTable3 = tenv.fromDataStream(users, $("id"), $("name"), $("balance"), $("rowtime"), $("proctime").proctime());
Table result3 = usersTable3
.window(Slide.over(rowInterval(10L)).every(rowInterval(5L)).on($("proctime")).as("w"))
.groupBy($("name"),$("w"))
.select($("name"),$("balance").sum().as("sum(balance)"))
;
DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(result3, Row.class);
result3DS.print("result3DS:");
// Declaring the attribute as $("rowtime").rowtime().as("proctime"), as in the examples above, makes it an event-time attribute,
// and the planner then rejects this window with: "Event-time grouping windows on row intervals are currently not supported."
env.execute();
}
3. Session (Session Windows)
static void testSessionOver() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
// Session Event-time Window
Table result1 = usersTable
.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"))
.groupBy($("name"),$("w"))
.select($("name"),$("balance").sum().as("sum(balance)"))
;
DataStream<Tuple2<Boolean, Row>> result1DS = tenv.toRetractStream(result1, Row.class);
result1DS.print("result1DS:");
// result1DS::16> (true,+I[alanchan, 57])
// result1DS::2> (true,+I[alan, 62])
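// Session "processing-time" window. Note: $("rowtime").rowtime().as("proctime") only renames the event-time attribute,
// so this session window is still evaluated on event time. A true processing-time attribute would be declared as $("proctime").proctime().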
Table usersTable2 = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime().as("proctime"));
Table result2 = usersTable2
.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"))
.groupBy($("name"),$("w"))
.select($("name"),$("balance").sum().as("sum(balance)"))
;
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
result2DS.print("result2DS:");
// result2DS::2> (true,+I[alan, 62])
// result2DS::16> (true,+I[alanchan, 57])
env.execute();
}
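A session window has no fixed size. It is bounded by a gap of inactivity: a key's session ends once no row for that key arrives within the gap, which is 10 minutes in `Session.withGap(lit(10).minutes())` above. Each name's rows in `userList` are only about a second apart, so each name forms a single session, which is why the totals are again 62 and 57. The plain-Java sketch below splits one key's ascending timestamps into sessions, each ending at its last timestamp plus the gap. It is not Flink's implementation. `SessionWindowSketch` and `sessions` are illustrative names, and its treatment of a distance exactly equal to the gap is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of gap-based sessionization for the rows of a single key (not Flink code).
public class SessionWindowSketch {

    // A session [start, end) where end = last timestamp in the session + gap.
    static final class SessionWindow {
        final long start;
        final long end;

        SessionWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Timestamps must be ascending; a distance larger than the gap starts a new session.
    static List<SessionWindow> sessions(long[] sortedTimestamps, long gapMillis) {
        List<SessionWindow> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result;
        }
        long start = sortedTimestamps[0];
        long last = sortedTimestamps[0];
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - last > gapMillis) {
                result.add(new SessionWindow(start, last + gapMillis)); // close the current session
                start = ts;
            }
            last = ts;
        }
        result.add(new SessionWindow(start, last + gapMillis));
        return result;
    }

    public static void main(String[] args) {
        long gap = 10 * 60_000L; // 10-minute gap, as in the example above
        long[] alan = {1698742358391L, 1698742359396L, 1698742360407L};
        System.out.println(sessions(alan, gap).size()); // 1

        long[] hypothetical = {0L, 60_000L, 60_000L + gap + 1}; // made-up data with a longer pause
        System.out.println(sessions(hypothetical, gap).size()); // 2
    }
}
```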
To sum up, this article introduced the three kinds of group windows on tables (tumbling, sliding, and session) and showed the result of each operation with examples.