快速上手
引入依赖
要在代码中使用 TableAPI,必须引入相关的依赖。
这里的依赖是一个 Java 的“桥接器”,主要就是负责 TableAPI 和下层 DataStream API的连接支持,按照不同的语言分为 Java 版和 Scala 版。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.12.1</version>
</dependency>
如果我们希望在本地的集成开发坏境里运行 TableAPI 和 SQL,还需要引入以下依赖。
这里主要添加的依赖是一个 “计划器”,它是 TableAPI 的核心组件,负责提供运行时环境,并生成程序的执行计划。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.12.1</version>
</dependency>
简单示例
// 1.读取数据,得到DataStream
SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
// 2.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 3.将DataStream转换为Table
Table eventTable = tableEnv.fromDataStream(eventStream);
// 4.直接写SQL进行转换
Table resultTable = tableEnv.sqlQuery("select user, url from " + eventTable);
// 5.基于Table直接转换
Table resultTable1 = eventTable.select($("user"), $("url")).where($("user").isEqual("Alice"));
// 6.再转回去打印
tableEnv.toAppendStream(resultTable, EventNoTime.class).print("result");
tableEnv.toAppendStream(resultTable1, EventNoTime.class).print("result1");
基本 API
创建表环境
对于 Flink 这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用 TableAPI 和 SQL 需要一个特别的运行时环境(表环境),它主要负责:
- 注册 Catalog 和表。
- 执行 SQL 查询。
- 注册用户自定义函数(UDF)。
- DataStream 和表之间的转换。
这里的 Catalog 就是“目录”,主要用来管理所有数据库和表的元数据。通过 Catalog 可以方便地对数据库和表进行
查询的管理。
每个表和 SQL 的执行,都必须绑定在一个表环境中,可以通过调用静态的 crcate()
方法来创建一个表环境实例。方法需要传入一个环境的配置参数 EnvironmentSettings
,它可以指定当前表环境的执行模式和计划器。
// 1.定义环境配置来创建表执行环境
// 1.1基于blink版本planner进行流处理
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 1.2基于老版本planner进行流处理
EnvironmentSettings settings1 = EnvironmentSettings.newInstance()
.inStreamingMode()
.useOldPlanner()
.build();
TableEnvironment tableEnv1 = TableEnvironment.create(settings1);
// 1.3基于老版本planner进行批处理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
// 1.4基于blink版本planner进行批处理
EnvironmentSettings settings2 = EnvironmentSettings.newInstance()
.inBatchMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv2 = TableEnvironment.create(settings2);
创建表
为了方便地查询表,表环境中会维护一个目录和表的对应关系。所以表都是通过 Catalog 来进行注册创建的。表在环境中有一个唯一的 ID,由三部分组成:目录名,数据库名,以及表名。
default_catalog.default_database.MyTable
如果想要自定义目录名和数据库名,可以在环境中设置。
tableEnv.useCatalog("custom_catalog");
tableEnv.useDatabase("custom_database");
具体创建表的方式,有通过连接器和虛拟表两种。
连接器
通过连接器连接到一个外部系统,然后定义出对应的表结构。当我们在表环境中读取这张表,连接器就会从外部系统读取数据并进行转换,而当我们向这张表写入数据时,连接器就会将数据输出到外部系统中。
在代码中,可以调用表环境的 executeSql()
方法,可以传入一个 DDL 作为参数执行 SQL 操作。
tableEnv.executeSql("CREATE [TEMPORARY] TABLE Mytable ... WITH('connector' = ...)");
// 2.1创建表
String createDDL = "CREATE TABLE clickTable (" +
" `user` STRING," +
" url STRING," +
" ts BIGINT" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'datas/click.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
// 2.2创建一张用于输出的表
String createOutDDL = "CREATE TABLE outTable (" +
" `user` STRING," +
" url STRING" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'output'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createOutDDL);
虚拟表
在环境中注册之后,就可以在 SQL 中直接使用这张表进行查询转换了。
Table newTable = tableEnv.sqlQuery("SELECT ... FROM Mytable ...");
由于 newTable
是一个 Table 对象,并没有在表环境中注册,所以我们还需要将这个中间结果表注册到环境中,才能在 SQL 中使用。
tableEnv.createTemporaryView("NewTable", newTable);
这里的注册其实是创建了一个虚拟表,与 SQL 中的视图类似。
查询表
执行 SQL 进行查询
Flink 基于 Apache Calcite 来提供对 SQL 的支持,Calcite 是一个为不同的计算平台提供标准 SQL 查询的底层工具。
// 创建表环境
TableEnvironment tableEnv = ...;
// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH('connector' = ...)");
// 查询用户Alice的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
"SELECT user, url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
我们也可以直接将查询的结果写入到己经注册的表中,这需要调用表环境的 executeSql()
方法来执行 DDL,传入的是一个 INSERT
语句。
// 注册表
tableEnv.executeSql("CREATE TABLE aliceVisitTable ... WITH('connector' = ...)");
// 将查询结果输出到aliceVisitTable中
tableEnv.executeSql(
"INSERT INTO aliceVisitTable" +
"SELECT user, url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
调用 TableAPI 进行查询
由于 TableAPI 是基于 Table 的 Java 实例进行调用的,因此我们首先要得到表的 Java 对象。基于环境中已注册的表,可以通过表环境的 from()
方法非常容易地得到一个 Table 对象。
// eventTable是一个Table对象,EventTable是环境中注册的表名。
Table eventTable = tableEnv.from("EventTable");
Table maryClickTable = eventTable
.where($("user").isEqual("Alice"))
.select($("url"), $("user"));
流处理中的表
当我们将一个 Table 转换成 DataStream 时,有“仅插入流”和“更新日志流”两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。
关系型表/SQL | 流处理 | |
---|---|---|
处理的数据对象 | 字段元组的有界集合 | 字段元组的无限序列 |
查询对数据的访问 | 可以访问到完整的数据输入 | 无法访问到完整的数据输入 |
查询终止条件 | 生成固定大小的结果集后终止 | 永不终止 |
动态表和持续查询
动态表
当流中有新数据到来,初始的表中会插入一行,而基于这个表定义的 SQL 查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”。
持续查询
由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”。
可以认为一次查询面对的数据集,就是当前输入动态表中的所有数据。这相当于是对输入动态表做了一个“快照”,当作有限数据集进行批处理。
持续查询的步骤:
- 流被转化为动态表。
- 对动态表进行持续查询,生成新的动态表。
- 生成的动态表转换为流。
将流转换成动态表
为了能够使用 SQL 来做流处理,我们必须先把流转换成动态表。
如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变,只能在后面追加。所以我们其实是通过一个只有插入操作的更新日志流,来构建一个表。
将动态表转换为流
动态表也可以通过插入、更新和删除操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。
- 仅追加流:仅通过插入更改来修改的动态表,可以直接转换为仅追加流。这个流中发出的数据,其实就是动态表中新增的每一行。
- 撤回流:包含两类消息的流,添加消息和撤回消息。具体的编码规则是:插入操作编码为 add 消息,删除操作编码为 retract 消息。而更新操作则编码为被更改行的 retract 消息和更新后新行的 add 消息。
- 更新插入流:包含两类消息,更新插入消息和删除消息。插入操作和更新操作,统一被编码为 upsert 消息,而删除操作则被编码为 delete 消息。
时间属性和窗口
基于时间的操作需要定义相关的时间语义和时间数据来源的信息。在 TableAPI 和 SQL 中,会给表单独提供一个逻辑上的时间字段,在表处理程序中专门用来指示时间。
时间属性的数据类型为 TIMESTAMP
,可以直接访问并且进行计算。按照时间语义的不同,分成事件时间和处理时间。
事件时间
在事件时间语义下,允许表处理程序根据每个数据包含的时间戳来生成结果。通过设置水位线来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。
事件时间可以在创建表 DDL 中定义,也可以在数据流和表的转换中定义。
创建表 DDL 中定义
在创建表的 DDL 中,可以增加一个字段,通过 WATERMARK
语句来定义事件时间属性。WATERMARK
语向主要用来定义水位线的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的
延迟时间。
CREATE TABLE EventTable(
user STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
...
);
这里把 ts
字段定义为事件时间属性,而且基于 ts
设置了 5 秒的水位线延迟。
如果原始的时间戳就是一个长整型的毫秒数,这时就需要定义一个字段来表示事件的时间属性,类型定义为 TIMESTAMP_LTZ
会更方便。
CREATE TABLE EventTable(
user STRING,
url STRING,
ts BIGINT,
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
这里我们另外定义了一个字段 ts_ltz
,是把长整型的 ts
转换为 TIMESTAMP_LTZ
得到的。
// 在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
" `user` STRING," +
" url STRING," +
" ts BIGINT," +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'datas/click.txt'," +
" 'format' = 'csv'" +
")";
在数据流转换为表时定义
调用 fromDataStream()
方法创建表时,可以追加参数来定义表中的宇段结构。这时可以给某个字段加上 rowtime
后缀,就表示将当前字段为事件时间属性。这个字段会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP
。
这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该在 DataStream 上就定义好了。由于 DataStream 中没有时区概念,因此 Flink 会将事件时间属性解析成不带时区的 TIMESTAMP
类型,所有的时间值都被当作 UTC 标准时间。
// 流中数据类型为二元组Tuple2,包含两个字段,需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = inputStream.assignTimeStampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tableEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());
// 在流转换为table的时候定义时间属性
SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());
clickTable.printSchema();
处理时间
处理时间就是系统时间,使用时不需要提取时间戳和生成水位线。因此在定义处理时间属性时,必须要额外声明一个
宇段,专门用来保存当前的处理时间。
创建表 DDL 中定义
在创建表的 DDL 中,可以增加一个额外的字段,通过调用系统内置的 PROCTIME()
函数来指定当前的处理时间属性,返回的类型是 TIMESTAMP_LTZ
。
CREATE TABLE EventTable(
user STRING,
url STRING,
ts AS PROCTIME()
) WITH (
...
);
在数据流转换为表时定义
调用 fromDataStream()
方法创建表时,可以用 proctime()
后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个己有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。
DataStream<Tuple2<String, String>> stream = inputStream.assignTimeStampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tableEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());
窗口
分组窗口
常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现。具体在 SQL 中就是调用 TUMBLE()
、HOP()
、SESSION()
,传入时间属性字段、窗口大小等参数就可以了。
TUBLE(ts, INTERVAL '1' HOUR)
在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。
Table result = tableEnv.sqlQuery(
"SELECT " +
"user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) AS endT, " +
"COUNT(url) AS CNT " +
"FROM EventTable " +
"GROUP BY " + // 使用窗口和用户名分组
"user, " +
"TUMBLE(ts, INTERVAL '1' HOUR)" // 定义1小时滚动窗口
);
这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数 COUNT()
对分组数据的个数进行了聚合统计,并将结果字段重命名为 cnt
,用 TUMBLE_END
函数获取滚动窗口的结束时间,重命名为 endT
提取出来。
窗口表值函数
窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数可以看作是返回一个表的函数。
- 滚动窗口(Tumbling Windows):长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。只有一个核心参数就是窗口大小,在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入,另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
- 滑动窗口(Hop Windows,跳跃窗口):可以通过设置滑动步长来控制统计输出的频率。除了也要传入表名、时间属性外,还需要传入窗口大小和滑动步长两个参数。
# 基于时间属性ts,在表EventTable上创建了大小为1小时的滑动窗口,每5分钟滑动一次。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS)
- 累积窗口(Cumulate Windows):在一定的统计周期内进行累积计算,累积窗口中有两个核心的参数:最大窗口长度和累积步长。
# 基手时间属性ts,在表EventTable上定义了一个统计周期为1天、累积步长为1小时的累积窗口。
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
- 会话窗口(Session Windows)
在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外3个列:窗口起始点、窗口结束点、窗口时间(窗口中能够包含数据的最大时间戳)。
聚合查询
在 SQL 中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值:比如求和、最大最小值、平均值等等,这种操作叫作聚合查询。
分组聚合
可以通过 GROUP BY
子句来指定分组的键,从而对数据按照某个字段做一个分组统计。
SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user
在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表。每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流或更新插入流的编码方式。
在持续查询的过程中,由于用于分组的键可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,可以在表环境中设置状态的生存时间。
// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 设置状态保存时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));
// 也可以直接设置配置项table.exec.state.ttl
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "60 min");
窗口聚合
窗口聚合也需要调用 SUM()
、MAX()
、MIN()
、COUNT()
一类的聚合函数,通过 GROUP BY
子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组 key 的一部分定义出来。
// 以ts作为时间属性字段、基于EventTable定义了1小时的滚动窗口,希望统计出每小时每个用户点击url的次数。用来分组的字段是用户名user,以及表示窗口的window_start和window_end
Table result = tableEnv.sqlQuery(
"SELECT " +
"user, " +
"window_end AS endT, " +
"COUNT(url) AS cnt " +
"FROM TABLE( " +
"TUMBLE( TABLE EventTable, " +
"DESCRIPTOR(ts), " +
"INTERVAL '1' HOUR)) " +
"GROUP BY user, window_start, window_end "
);
开窗聚合
以每一行数据为基准,计算它之前1小时内所有数据的平均值。就像在每一行上打开了一扇窗户、收集数据进行统计一样,这就是开窗函数。
开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果。而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。
SELECT <聚合函数> OVER (
[PARTITION BY <字段1>[, <字段2>, ...]]
ORDER BY <时间属性字段>
<开窗范围>),
...
FROM ...
- 开窗范围:要扩展多少行来聚合,这个范围由
BETWEEN <下界> AND <上界>
来定义。目前支持的上界只能是CURRENT ROW
,也就是定义一个从之前某一行到当前行的范围。
BETWEEN ... PRECEDING AND CURRENT ROW
- 范围间隔:以
RANGE
为前缀,就是基于ORDER BY
指定的时间字段去选取一个范围。一般就是当前行时间戳之前的一段时间。
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
- 行间隔:以
ROWS
为前缀,就是直接确定要选多少行。
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
应用实例 Top N
TOPN()
聚合函数相当于把一个表聚合成了另一个表,所以叫作表聚合函数。
普通 Top N
在 Flink SQL 中,是通过 OVER
聚合和一个条件筛选来实现 TopN 的。将一个特殊的聚合函数 ROW_NUMBER()
应用到 OVER
窗口上,统计出每一行排序后的行号,作为一个字段提取出来,然后再用 WHERE
子句筛选行号小于等于 N 的那些行返回。
SELECT ...
FROM (
SELECT ...,
ROW_NUMBER() OVER(
[PARTITION BY <字段1> [, <字段2> ... ]]
ORDER BY <排序字段1> [asc|desc] [, <排序字段2> [asc|desc] ... ]
) AS row_num
FROM ... )
WHERE row_num <= N [AND <其他条件>]
String createDDL = "CREATE TABLE clickTable (" +
" `user` STRING," +
" url STRING," +
" ts BIGINT," +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'datas/click.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
// 普通TopN,选取当前所有用户中浏览量最大的2个
Table topNTable = tableEnv.sqlQuery("SELECT user, cnt, row_num " +
"FROM (" +
"SELECT *, ROW_NUMBER() OVER (" +
"ORDER BY cnt DESC" +
") AS row_num" +
"FROM (SELECT user, COUNT(url) AS cnt FROM clickTable GROUP BY user)" +
") WHERE row_num <= 2");
tableEnv.toChangelogStream(topNTable).print("top2:");
窗口 Top N
实际应用中往往有这样的需求:统计一段时间内的热门商品。这就需要先开窗口,在窗口中统计每个商品的点击量,然后将统计数据收集起来,按窗口进行分组,并按点击量大小降序排序,选取前 N 个作为结果返回。
先做一个窗口聚合,将窗口信息 window_start、window_end 连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。接下来就可以像一般的 TopN 那样定义 OVER
窗口了,按窗口分组,按点击量排序,用 ROW_NUMBER()
统计行号并筛选前 N 行就可以得到结果。
// 窗口TopN,统计一段时间内的活跃用户
String subQuery = "SELECT user, COUNT(url) AS cnt, window_start, window_end, " +
"FROM TABLE(" +
"TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
")" +
"GROUP BY user, window_start, window_end";
Table windowTopN = tableEnv.sqlQuery("SELECT user, cnt, row_num" +
"FROM (" +
"SELECT *, ROW_NUMBER() OVER(" +
"PARTITION BY window_start, window_end" +
"ORDER BY cnt DESC" +
") AS row_num" +
"FROM (" + subQuery + ")" +
") WHERE row_num <= 2");
tableEnv.toDataStream(windowTopN).print("top2:");
联结查询
常规联结查询
通过关键字 JOIN
来联结两个表,后面用关键字 ON
来指明联结条件。
在两个动态表的联结中,任何一侧表的插入或更改操作都会让联结的结果表发生改变。所以常规联结查询一般是更新查询。目前仅支持等值条件作为联结条件,也就是关键字 ON
后面必须是判断两表中字段相等的逻辑表达式。
等值内联结
内联结用 INNER JOIN
来定义,会返回两表中符合联接条件的所有行的组合,也就是笛卡尔积。
# 联结条件是订单数据的product_id和商品数据的id相等。
SELECT *
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id
等值外联结
外联结会返回符合联结条件的所有行的笛卡尔积。还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL 支持左外、右外和全外联结。
SELECT *
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
FULL JOIN Product
ON Order.product_id = Product.id
间隔联结查询
间隔联结返回的,同样是符合约束条件的两条流中数据的笛卡尔积。只不过这里的约束条件除了常规的联结条件外,还多了一个时间间隔的限制。
- 两表的联结:间隔联结不需要用
JOIN
关键字,直接在FROM
后将要联结的两表列出来就可以,用逗 号分隔。 - 联结条件:联结条件用
WHERE
子句来定义,用一个等值表达式描述。 - 时间间隔限制:在
WHERE
子句中,联结条件后用AND
追加一个时间间隔的限制条件。这里分别用 ltime 和 rtime 表示左右表中的时间字段。ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
# 发货表Shipment,要求在收到订单后四个小时内发货
SELECT *
FROM Order o, Shipment s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time -INTERVAL '4' HOUR AND s.ship_time
在流处理中,间隔联结查询只支持具有时间属性的仅追加表。
函数
在 SQL 中,可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这就是函数。Flink 的 TableAPI 和 SQL 同样提供了函数的功能。
TableAPI 中的函数是通过数据对象的方法调用来实现的,而 SQL 则是直接引用函数名称,传入数据作为参数。
系统函数
标量函数
标量是只有大小,没有方向的量。标量函数只对输入数据做转换操作,返回一个值的函数。另外,对于一些没有输入参数,直接得到唯一的结果的函数,也属于标量函数。
- 比较函数:比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。
- 逻辑函数:逻辑表达式,也就是用与、或、非将布尔类型的值连接起来,可以用判断语句进行真值判断,返回的还是一个布尔类型的值。
- 算数函数:包括用算术符号连接的运算和复杂的数学运算。
- 字符串函数:进行字符串处理的函数。
- 时间函数:进行与时间相关操作的函数。
聚合函数
聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。
COUNT(*)
:返回所有行的数量,统计个数。SUM([ALL|DISTINCT]expression)
:对某个字段进行求和操作,默认情況 下省略了关键字ALL
,表示对所有行求和,如果指定DISTINCT
,则会对数据进行去重,每个值只叠加一次。RANK()
:返回当前值在一组值中的排名。ROW_NUMBER()
:对一组值排序后,返回当前值得行号。
自定义函数
Flink 的 Table API 和 SQL 提供了多种自定义函数的接口,以抽象类的形式定义。当前 UDF 主要有以下几类:
- 标量函数:将输入的标量值转换成一个新的标量值。
- 表函数:将标量值转换成一个或多个新的行数据,也就是扩展成一个表。
- 聚合函数:将多行数据里的标量值转换成一个新的标量值。
- 表聚合函数:将多行数据里的标量值转换成一个或多个新的行数据。
整体调用流程
首先自定义对应 UDF 抽象类的实现,并在表环境中注册这个函数,然后就可以在 TableAPI 和 SQL 中调用了。
- 注册函数:调用表环境的
createTemporarySystemFunction()
方法,传入注册的函数名以及 UDF 类的 Class 对象。
// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
- 使用 Table API 调用函数:使用
call()
方法来调用自定义函数。
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
// TableAPI 也可以不注册函数,直接用内联的方式调用UDF
tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField")));
- 在 SQL 中调用函数。
tableEnv.sqlQuery("SELECT MyFunction(myFied) FROM MyTable");
标量函数
// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
" `user` STRING," +
" url STRING," +
" ts BIGINT," +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'datas/click.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
// 2.注册自定义标量函数
tableEnv.createTemporaryFunction("MyHash", MyHashFunction.class);
// 3.调用UDF进行查询转换
Table resultTable = tableEnv.sqlQuery("select user, MyHash(user) from clickTable");
// 4.转换成流打印输出
tableEnv.toDataStream(resultTable).print();
// 自定义实现ScalarFunction
public static class MyHashFunction extends ScalarFunction {
public int eval(String str) {
return str.hashCode();
}
}
表函数
// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
" `user` STRING," +
" url STRING," +
" ts BIGINT," +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'datas/click.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
// 2.注册自定义表函数
tableEnv.createTemporaryFunction("MySplit", MySplit.class);
// 3.调用UDF进行查询转换
Table resultTable = tableEnv.sqlQuery("select user, url, word, length " +
"from clickTable, LATERAL TABLE(MySplit(url)) AS T(word, length)");
// 4.转换成流打印输出
tableEnv.toDataStream(resultTable).print();
// 实现自定义的表函数
public static class MySplit extends TableFunction<Tuple2<String, Integer>> {
public void eval(String str) {
String[] fields = str.split("\\?");
for (String field: fields) {
collect(Tuple2.of(field, field.length()));
}
}
}
聚合函数
// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
" `user` STRING," +
" url STRING," +
" ts BIGINT," +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'datas/click.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
// 2.注册自定义聚合函数
tableEnv.createTemporaryFunction("WeightedAverage", WeightedAverage.class);
// 3.调用UDF进行查询转换
Table resultTable = tableEnv.sqlQuery("select user, WeightedAverage(ts, 1) as w_avg" +
"from clickTable group by user");
// 4.转换成流打印输出
tableEnv.toChangelogStream(resultTable).print();
// 单独定义一个累加器类型
public static class WeightedAccumulator {
public long sum = 0;
public int count = 0;
}
// 实现自定义的聚合函数,计算加权平均值
public static class WeightedAverage extends AggregateFunction<Long, WeightedAccumulator> {
@Override
public Long getValue(WeightedAccumulator weightedAccumulator) {
if (weightedAccumulator.count == 0) {
return null;
} else {
return weightedAccumulator.sum / weightedAccumulator.count;
}
}
@Override
public WeightedAccumulator createAccumulator() {
return new WeightedAccumulator();
}
// 累加计算的方法
public void accumulate(WeightedAccumulator accumulator, Long iValue, Integer iWeight) {
accumulator.sum += iValue * iWeight;
accumulator.count += iWeight;
}
}
表聚合函数
// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
" `user` STRING," +
" url STRING," +
" ts BIGINT," +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'datas/click.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
// 2.注册自定义表函数
tableEnv.createTemporaryFunction("Top2", Top2.class);
// 3.调用UDF进行查询转换
String windowAggQuery = "SELECT user, COUNT(url) AS cnt, window_start, window_end, " +
"FROM TABLE(" +
"TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
")" +
"GROUP BY user, window_start, window_end";
Table aggTable = tableEnv.sqlQuery(windowAggQuery);
Table resultTable = aggTable.groupBy($("window_end"))
.flatAggregate(call("Top2", $("cnt")).as("value", "rank"))
.select($("window_end"), $("value"), $("rank"));
// 4.转换成流打印输出
tableEnv.toDataStream(resultTable).print();
// 单独定义一个累加器类型,包含了当前最大和第二大的数据
public static class Top2Accumulator {
public Long max;
public Long secondMax;
}
// 实现自定义表聚合函数
public static class Top2 extends TableAggregateFunction<Tuple2<Long, Integer>, Top2Accumulator> {
@Override
public Top2Accumulator createAccumulator() {
Top2Accumulator top2Accumulator = new Top2Accumulator();
top2Accumulator.max = Long.MIN_VALUE;
top2Accumulator.secondMax = Long.MIN_VALUE;
return top2Accumulator;
}
// 定义一个更新累加器的方法
public void accumulate(Top2Accumulator accumulator, Long value) {
if (value > accumulator.max) {
accumulator.secondMax = accumulator.secondMax;
accumulator.max = value;
} else if (value > accumulator.secondMax) {
accumulator.secondMax = value;
}
}
// 输出结果,当前的top2
public void emitValue(Top2Accumulator accumulator, Collector<Tuple2<Long, Integer>> out {
if (accumulator.max != Long.MIN_VALUE) {
out.collect(Tuple2.of(accumulator.max, 1));
}
if (accumulator.secondMax != Long.MIN_VALUE) {
out.collect(Tuple2.of(accumulator.secondMax, 2));
}
}
}
标签:窗口,函数,tableEnv,Flink,TableAPI,ts,user,SQL
From: https://www.cnblogs.com/fireonfire/p/17023761.html