首页 > 数据库 >Flink:TableAPI 和 SQL

Flink:TableAPI 和 SQL

时间:2023-01-04 00:22:37浏览次数:45  
标签:窗口 函数 tableEnv Flink TableAPI ts user SQL

快速上手

引入依赖

要在代码中使用 TableAPI,必须引入相关的依赖。

这里的依赖是一个 Java 的“桥接器”,主要就是负责 TableAPI 和下层 DataStream API的连接支持,按照不同的语言分为 Java 版和 Scala 版。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

如果我们希望在本地的集成开发坏境里运行 TableAPI 和 SQL,还需要引入以下依赖。

这里主要添加的依赖是一个 “计划器”,它是 TableAPI 的核心组件,负责提供运行时环境,并生成程序的执行计划。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

简单示例

// 1.读取数据,得到DataStream
SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.timestamp;
            }
        })
);

// 2.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 3.将DataStream转换为Table
Table eventTable = tableEnv.fromDataStream(eventStream);

// 4.直接写SQL进行转换
Table resultTable = tableEnv.sqlQuery("select user, url from " + eventTable);

// 5.基于Table直接转换
Table resultTable1 = eventTable.select($("user"), $("url")).where($("user").isEqual("Alice"));

// 6.再转回去打印
tableEnv.toAppendStream(resultTable, EventNoTime.class).print("result");
tableEnv.toAppendStream(resultTable1, EventNoTime.class).print("result1");

基本 API

创建表环境

对于 Flink 这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用 TableAPI 和 SQL 需要一个特别的运行时环境(表环境),它主要负责:

  • 注册 Catalog 和表。
  • 执行 SQL 查询。
  • 注册用户自定义函数(UDF)。
  • DataStream 和表之间的转换。

这里的 Catalog 就是“目录”,主要用来管理所有数据库和表的元数据。通过 Catalog 可以方便地对数据库和表进行
查询的管理。

每个表和 SQL 的执行,都必须绑定在一个表环境中,可以通过调用静态的 crcate() 方法来创建一个表环境实例。方法需要传入一个环境的配置参数 EnvironmentSettings,它可以指定当前表环境的执行模式和计划器。

// 1.定义环境配置来创建表执行环境
// 1.1基于blink版本planner进行流处理
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .useBlinkPlanner()
        .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// 1.2基于老版本planner进行流处理
EnvironmentSettings settings1 = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .useOldPlanner()
        .build();

TableEnvironment tableEnv1 = TableEnvironment.create(settings1);

// 1.3基于老版本planner进行批处理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);

// 1.4基于blink版本planner进行批处理
EnvironmentSettings settings2 = EnvironmentSettings.newInstance()
        .inBatchMode()
        .useBlinkPlanner()
        .build();
TableEnvironment tableEnv2 = TableEnvironment.create(settings2);

创建表

为了方便地查询表,表环境中会维护一个目录和表的对应关系。所以表都是通过 Catalog 来进行注册创建的。表在环境中有一个唯一的 ID,由三部分组成:目录名,数据库名,以及表名。

default_catalog.default_database.MyTable

如果想要自定义目录名和数据库名,可以在环境中设置。

tableEnv.useCatalog("custom_catalog");
tableEnv.useDatabase("custom_database");

具体创建表的方式,有通过连接器和虛拟表两种。

连接器

通过连接器连接到一个外部系统,然后定义出对应的表结构。当我们在表环境中读取这张表,连接器就会从外部系统读取数据并进行转换,而当我们向这张表写入数据时,连接器就会将数据输出到外部系统中。

在代码中,可以调用表环境的 executeSql() 方法,可以传入一个 DDL 作为参数执行 SQL 操作。

tableEnv.executeSql("CREATE [TEMPORARY] TABLE Mytable ... WITH('connector' = ...)");
// 2.1创建表
String createDDL = "CREATE TABLE clickTable (" +
        " `user` STRING," +
        " url STRING," +
        " ts BIGINT" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'datas/click.txt'," +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createDDL);

// 2.2创建一张用于输出的表
String createOutDDL = "CREATE TABLE outTable (" +
        " `user` STRING," +
        " url STRING" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'output'," +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createOutDDL);

虚拟表

在环境中注册之后,就可以在 SQL 中直接使用这张表进行查询转换了。

Table newTable = tableEnv.sqlQuery("SELECT ... FROM Mytable ...");

由于 newTable 是一个 Table 对象,并没有在表环境中注册,所以我们还需要将这个中间结果表注册到环境中,才能在 SQL 中使用。

tableEnv.createTemporaryView("NewTable", newTable);

这里的注册其实是创建了一个虚拟表,与 SQL 中的视图类似。

查询表

执行 SQL 进行查询

Flink 基于 Apache Calcite 来提供对 SQL 的支持,Calcite 是一个为不同的计算平台提供标准 SQL 查询的底层工具。

// 创建表环境
TableEnvironment tableEnv = ...;

// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH('connector' = ...)");

// 查询用户Alice的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
	"SELECT user, url " +
  "FROM EventTable " +
  "WHERE user = 'Alice' "
);

我们也可以直接将查询的结果写入到己经注册的表中,这需要调用表环境的 executeSql() 方法来执行 DDL,传入的是一个 INSERT 语句。

// 注册表
tableEnv.executeSql("CREATE TABLE aliceVisitTable ... WITH('connector' = ...)");

// 将查询结果输出到aliceVisitTable中
tableEnv.executeSql(
  "INSERT INTO aliceVisitTable" +
	"SELECT user, url " +
  "FROM EventTable " +
  "WHERE user = 'Alice' "
);

调用 TableAPI 进行查询

由于 TableAPI 是基于 Table 的 Java 实例进行调用的,因此我们首先要得到表的 Java 对象。基于环境中已注册的表,可以通过表环境的 from() 方法非常容易地得到一个 Table 对象。

// eventTable是一个Table对象,EventTable是环境中注册的表名。
Table eventTable = tableEnv.from("EventTable");

Table maryClickTable = eventTable
  			 .where($("user").isEqual("Alice"))
  			 .select($("url"), $("user"));

流处理中的表

当我们将一个 Table 转换成 DataStream 时,有“仅插入流”和“更新日志流”两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。

关系型表/SQL 流处理
处理的数据对象 字段元组的有界集合 字段元组的无限序列
查询对数据的访问 可以访问到完整的数据输入 无法访问到完整的数据输入
查询终止条件 生成固定大小的结果集后终止 永不终止

动态表和持续查询

动态表

当流中有新数据到来,初始的表中会插入一行,而基于这个表定义的 SQL 查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”。

持续查询

由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”。

可以认为一次查询面对的数据集,就是当前输入动态表中的所有数据。这相当于是对输入动态表做了一个“快照”,当作有限数据集进行批处理。

持续查询的步骤:

  1. 流被转化为动态表。
  2. 对动态表进行持续查询,生成新的动态表。
  3. 生成的动态表转换为流。

持续查询

将流转换成动态表

为了能够使用 SQL 来做流处理,我们必须先把流转换成动态表。

如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变,只能在后面追加。所以我们其实是通过一个只有插入操作的更新日志流,来构建一个表。

将动态表转换为流

动态表也可以通过插入、更新和删除操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。

  • 仅追加流:仅通过插入更改来修改的动态表,可以直接转换为仅追加流。这个流中发出的数据,其实就是动态表中新增的每一行。
  • 撤回流:包含两类消息的流,添加消息和撤回消息。具体的编码规则是:插入操作编码为 add 消息,删除操作编码为 retract 消息。而更新操作则编码为被更改行的 retract 消息和更新后新行的 add 消息。
  • 更新插入流:包含两类消息,更新插入消息和删除消息。插入操作和更新操作,统一被编码为 upsert 消息,而删除操作则被编码为 delete 消息。

时间属性和窗口

基于时间的操作需要定义相关的时间语义和时间数据来源的信息。在 TableAPI 和 SQL 中,会给表单独提供一个逻辑上的时间字段,在表处理程序中专门用来指示时间。

时间属性的数据类型为 TIMESTAMP,可以直接访问并且进行计算。按照时间语义的不同,分成事件时间和处理时间。

事件时间

在事件时间语义下,允许表处理程序根据每个数据包含的时间戳来生成结果。通过设置水位线来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。

事件时间可以在创建表 DDL 中定义,也可以在数据流和表的转换中定义。

创建表 DDL 中定义

在创建表的 DDL 中,可以增加一个字段,通过 WATERMARK 语句来定义事件时间属性。WATERMARK 语向主要用来定义水位线的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的
延迟时间。

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);

这里把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟。

如果原始的时间戳就是一个长整型的毫秒数,这时就需要定义一个字段来表示事件的时间属性,类型定义为 TIMESTAMP_LTZ 会更方便。

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts BIGINT,
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

这里我们另外定义了一个字段 ts_ltz,是把长整型的 ts 转换为 TIMESTAMP_LTZ 得到的。

// 在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
        " `user` STRING," +
        " url STRING," +
        " ts BIGINT," +
        " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
        " WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'datas/click.txt'," +
        " 'format' = 'csv'" +
        ")";

在数据流转换为表时定义

调用 fromDataStream() 方法创建表时,可以追加参数来定义表中的宇段结构。这时可以给某个字段加上 rowtime 后缀,就表示将当前字段为事件时间属性。这个字段会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP

这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该在 DataStream 上就定义好了。由于 DataStream 中没有时区概念,因此 Flink 会将事件时间属性解析成不带时区的 TIMESTAMP 类型,所有的时间值都被当作 UTC 标准时间。

// 流中数据类型为二元组Tuple2,包含两个字段,需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = inputStream.assignTimeStampsAndWatermarks(...);

// 声明一个额外的逻辑字段作为事件时间属性
Table table = tableEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());
// 在流转换为table的时候定义时间属性
SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long l) {
                        return event.timestamp;
                    }
                })
        );

Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());
clickTable.printSchema();

处理时间

处理时间就是系统时间,使用时不需要提取时间戳和生成水位线。因此在定义处理时间属性时,必须要额外声明一个
宇段,专门用来保存当前的处理时间。

创建表 DDL 中定义

在创建表的 DDL 中,可以增加一个额外的字段,通过调用系统内置的 PROCTIME() 函数来指定当前的处理时间属性,返回的类型是 TIMESTAMP_LTZ

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts AS PROCTIME()
) WITH (
  ...
);

在数据流转换为表时定义

调用 fromDataStream() 方法创建表时,可以用 proctime() 后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个己有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。

DataStream<Tuple2<String, String>> stream = inputStream.assignTimeStampsAndWatermarks(...);

// 声明一个额外的逻辑字段作为事件时间属性
Table table = tableEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());

窗口

分组窗口

常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现。具体在 SQL 中就是调用 TUMBLE()HOP()SESSION(),传入时间属性字段、窗口大小等参数就可以了。

TUBLE(ts, INTERVAL '1' HOUR)

在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。

Table result = tableEnv.sqlQuery(
        "SELECT " +
                "user, " +
                "TUMBLE_END(ts, INTERVAL '1' HOUR) AS endT, " +
                "COUNT(url) AS CNT " +
                "FROM EventTable " +
                "GROUP BY " + // 使用窗口和用户名分组
                "user, " +
                "TUMBLE(ts, INTERVAL '1' HOUR)"  // 定义1小时滚动窗口
);

这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数 COUNT() 对分组数据的个数进行了聚合统计,并将结果字段重命名为 cnt,用 TUMBLE_END 函数获取滚动窗口的结束时间,重命名为 endT 提取出来。

窗口表值函数

窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数可以看作是返回一个表的函数。

  • 滚动窗口(Tumbling Windows):长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。只有一个核心参数就是窗口大小,在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入,另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
  • 滑动窗口(Hop Windows,跳跃窗口):可以通过设置滑动步长来控制统计输出的频率。除了也要传入表名、时间属性外,还需要传入窗口大小和滑动步长两个参数。
# 基于时间属性ts,在表EventTable上创建了大小为1小时的滑动窗口,每5分钟滑动一次。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS)
  • 累积窗口(Cumulate Windows):在一定的统计周期内进行累积计算,累积窗口中有两个核心的参数:最大窗口长度和累积步长。
# 基手时间属性ts,在表EventTable上定义了一个统计周期为1天、累积步长为1小时的累积窗口。
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
  • 会话窗口(Session Windows)

在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外3个列:窗口起始点、窗口结束点、窗口时间(窗口中能够包含数据的最大时间戳)。

聚合查询

在 SQL 中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值:比如求和、最大最小值、平均值等等,这种操作叫作聚合查询。

分组聚合

可以通过 GROUP BY 子句来指定分组的键,从而对数据按照某个字段做一个分组统计。

SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user

在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表。每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流或更新插入流的编码方式。

在持续查询的过程中,由于用于分组的键可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,可以在表环境中设置状态的生存时间。

// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 设置状态保存时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));

// 也可以直接设置配置项table.exec.state.ttl
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "60 min");

窗口聚合

窗口聚合也需要调用 SUM()MAX()MIN()COUNT() 一类的聚合函数,通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组 key 的一部分定义出来。

// 以ts作为时间属性字段、基于EventTable定义了1小时的滚动窗口,希望统计出每小时每个用户点击url的次数。用来分组的字段是用户名user,以及表示窗口的window_start和window_end
Table result = tableEnv.sqlQuery(
        "SELECT " +
                "user, " +
                "window_end AS endT, " +
                "COUNT(url) AS cnt " +
                "FROM TABLE( " +
                "TUMBLE( TABLE EventTable, " +
                "DESCRIPTOR(ts), " +
                "INTERVAL '1' HOUR)) " +
                "GROUP BY user, window_start, window_end "
);

开窗聚合

以每一行数据为基准,计算它之前1小时内所有数据的平均值。就像在每一行上打开了一扇窗户、收集数据进行统计一样,这就是开窗函数。

开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果。而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。

SELECT <聚合函数> OVER (
  [PARTITION BY <字段1>[, <字段2>, ...]]
  ORDER BY <时间属性字段>
  <开窗范围>),
  ...
FROM ...
  • 开窗范围:要扩展多少行来聚合,这个范围由 BETWEEN <下界> AND <上界> 来定义。目前支持的上界只能是 CURRENT ROW,也就是定义一个从之前某一行到当前行的范围。
BETWEEN ... PRECEDING AND CURRENT ROW
  • 范围间隔:以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围。一般就是当前行时间戳之前的一段时间。
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  • 行间隔:以 ROWS 为前缀,就是直接确定要选多少行。
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

应用实例 Top N

TOPN() 聚合函数相当于把一个表聚合成了另一个表,所以叫作表聚合函数。

普通 Top N

在 Flink SQL 中,是通过 OVER 聚合和一个条件筛选来实现 TopN 的。将一个特殊的聚合函数 ROW_NUMBER() 应用到 OVER 窗口上,统计出每一行排序后的行号,作为一个字段提取出来,然后再用 WHERE 子句筛选行号小于等于 N 的那些行返回。

SELECT ...
FROM (
  SELECT ...,
  ROW_NUMBER() OVER(
    [PARTITION BY <字段1> [, <字段2> ... ]]
    ORDER BY <排序字段1> [asc|desc] [, <排序字段2> [asc|desc] ... ]
  ) AS row_num
  FROM ... )
WHERE row_num <= N [AND <其他条件>]
String createDDL = "CREATE TABLE clickTable (" +
        " `user` STRING," +
        " url STRING," +
        " ts BIGINT," +
        " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
        " WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'datas/click.txt'," +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createDDL);

// 普通TopN,选取当前所有用户中浏览量最大的2个
Table topNTable = tableEnv.sqlQuery("SELECT user, cnt, row_num " +
        "FROM (" +
        "SELECT *, ROW_NUMBER() OVER (" +
        "ORDER BY cnt DESC" +
        ") AS row_num" +
        "FROM (SELECT user, COUNT(url) AS cnt FROM clickTable GROUP BY user)" +
        ") WHERE row_num <= 2");

tableEnv.toChangelogStream(topNTable).print("top2:");

窗口 Top N

实际应用中往往有这样的需求:统计一段时间内的热门商品。这就需要先开窗口,在窗口中统计每个商品的点击量,然后将统计数据收集起来,按窗口进行分组,并按点击量大小降序排序,选取前 N 个作为结果返回。

先做一个窗口聚合,将窗口信息 window_start、window_end 连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。接下来就可以像一般的 TopN 那样定义 OVER 窗口了,按窗口分组,按点击量排序,用 ROW_NUMBER() 统计行号并筛选前 N 行就可以得到结果。

// 窗口TopN,统计一段时间内的活跃用户
String subQuery = "SELECT user, COUNT(url) AS cnt, window_start, window_end, " +
        "FROM TABLE(" +
        "TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
        ")" +
        "GROUP BY user, window_start, window_end";

Table windowTopN = tableEnv.sqlQuery("SELECT user, cnt, row_num" +
        "FROM (" +
        "SELECT *, ROW_NUMBER() OVER(" +
        "PARTITION BY window_start, window_end" +
        "ORDER BY cnt DESC" +
        ") AS row_num" +
        "FROM (" + subQuery + ")" +
        ") WHERE row_num <= 2");

tableEnv.toDataStream(windowTopN).print("top2:");

联结查询

常规联结查询

通过关键字 JOIN 来联结两个表,后面用关键字 ON 来指明联结条件。

在两个动态表的联结中,任何一侧表的插入或更改操作都会让联结的结果表发生改变。所以常规联结查询一般是更新查询。目前仅支持等值条件作为联结条件,也就是关键字 ON 后面必须是判断两表中字段相等的逻辑表达式。

等值内联结

内联结用 INNER JOIN 来定义,会返回两表中符合联接条件的所有行的组合,也就是笛卡尔积。

# 联结条件是订单数据的product_id和商品数据的id相等。
SELECT * 
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id

等值外联结

外联结会返回符合联结条件的所有行的笛卡尔积。还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL 支持左外、右外和全外联结。

SELECT * 
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.id

SELECT * 
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.id

SELECT * 
FROM Order
FULL JOIN Product
ON Order.product_id = Product.id

间隔联结查询

间隔联结返回的,同样是符合约束条件的两条流中数据的笛卡尔积。只不过这里的约束条件除了常规的联结条件外,还多了一个时间间隔的限制。

  • 两表的联结:间隔联结不需要用 JOIN 关键字,直接在 FROM 后将要联结的两表列出来就可以,用逗 号分隔。
  • 联结条件:联结条件用 WHERE 子句来定义,用一个等值表达式描述。
  • 时间间隔限制:在 WHERE 子句中,联结条件后用 AND 追加一个时间间隔的限制条件。这里分别用 ltime 和 rtime 表示左右表中的时间字段。
    • ltime = rtime
    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
# 发货表Shipment,要求在收到订单后四个小时内发货
SELECT *
FROM Order o, Shipment s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time -INTERVAL '4' HOUR AND s.ship_time

在流处理中,间隔联结查询只支持具有时间属性的仅追加表。

函数

在 SQL 中,可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这就是函数。Flink 的 TableAPI 和 SQL 同样提供了函数的功能。

TableAPI 中的函数是通过数据对象的方法调用来实现的,而 SQL 则是直接引用函数名称,传入数据作为参数。

系统函数

标量函数

标量是只有大小,没有方向的量。标量函数只对输入数据做转换操作,返回一个值的函数。另外,对于一些没有输入参数,直接得到唯一的结果的函数,也属于标量函数。

  • 比较函数:比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。
  • 逻辑函数:逻辑表达式,也就是用与、或、非将布尔类型的值连接起来,可以用判断语句进行真值判断,返回的还是一个布尔类型的值。
  • 算数函数:包括用算术符号连接的运算和复杂的数学运算。
  • 字符串函数:进行字符串处理的函数。
  • 时间函数:进行与时间相关操作的函数。

聚合函数

聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。

  • COUNT(*):返回所有行的数量,统计个数。
  • SUM([ALL|DISTINCT]expression):对某个字段进行求和操作,默认情況 下省略了关键字 ALL,表示对所有行求和,如果指定 DISTINCT,则会对数据进行去重,每个值只叠加一次。
  • RANK():返回当前值在一组值中的排名。
  • ROW_NUMBER():对一组值排序后,返回当前值得行号。

自定义函数

Flink 的 Table API 和 SQL 提供了多种自定义函数的接口,以抽象类的形式定义。当前 UDF 主要有以下几类:

  • 标量函数:将输入的标量值转换成一个新的标量值。
  • 表函数:将标量值转换成一个或多个新的行数据,也就是扩展成一个表。
  • 聚合函数:将多行数据里的标量值转换成一个新的标量值。
  • 表聚合函数:将多行数据里的标量值转换成一个或多个新的行数据。

整体调用流程

首先自定义对应 UDF 抽象类的实现,并在表环境中注册这个函数,然后就可以在 TableAPI 和 SQL 中调用了。

  • 注册函数:调用表环境的 createTemporarySystemFunction() 方法,传入注册的函数名以及 UDF 类的 Class 对象。
// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
  • 使用 Table API 调用函数:使用 call() 方法来调用自定义函数。
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));

// TableAPI 也可以不注册函数,直接用内联的方式调用UDF
tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField")));
  • 在 SQL 中调用函数。
tableEnv.sqlQuery("SELECT MyFunction(myFied) FROM MyTable");

标量函数

// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
        " `user` STRING," +
        " url STRING," +
        " ts BIGINT," +
        " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
        " WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'datas/click.txt'," +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createDDL);

// 2.注册自定义标量函数
tableEnv.createTemporaryFunction("MyHash", MyHashFunction.class);

// 3.调用UDF进行查询转换
Table resultTable = tableEnv.sqlQuery("select user, MyHash(user) from clickTable");

// 4.转换成流打印输出
tableEnv.toDataStream(resultTable).print();
// 自定义实现ScalarFunction
public static class MyHashFunction extends ScalarFunction {
    public int eval(String str) {
        return str.hashCode();
    }
}

表函数

// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
        " `user` STRING," +
        " url STRING," +
        " ts BIGINT," +
        " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
        " WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'datas/click.txt'," +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createDDL);

// 2.注册自定义表函数
tableEnv.createTemporaryFunction("MySplit", MySplit.class);

// 3.调用UDF进行查询转换
Table resultTable = tableEnv.sqlQuery("select user, url, word, length " +
        "from clickTable, LATERAL TABLE(MySplit(url)) AS T(word, length)");

// 4.转换成流打印输出
tableEnv.toDataStream(resultTable).print();
// 实现自定义的表函数
public static class MySplit extends TableFunction<Tuple2<String, Integer>> {
    public void eval(String str) {
        String[] fields = str.split("\\?");
        for (String field: fields) {
            collect(Tuple2.of(field, field.length()));
        }
    }
}

聚合函数

// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
        " `user` STRING," +
        " url STRING," +
        " ts BIGINT," +
        " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
        " WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'datas/click.txt'," +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createDDL);

// 2.注册自定义聚合函数
tableEnv.createTemporaryFunction("WeightedAverage", WeightedAverage.class);

// 3.调用UDF进行查询转换
Table resultTable = tableEnv.sqlQuery("select user, WeightedAverage(ts, 1) as w_avg" +
        "from clickTable group by user");

// 4.转换成流打印输出
tableEnv.toChangelogStream(resultTable).print();
// 单独定义一个累加器类型
public static class WeightedAccumulator {
    public long sum = 0;
    public int count = 0;
}

// 实现自定义的聚合函数,计算加权平均值
public static class WeightedAverage extends AggregateFunction<Long, WeightedAccumulator> {
    @Override
    public Long getValue(WeightedAccumulator weightedAccumulator) {
        if (weightedAccumulator.count == 0) {
            return null;
        } else {
            return weightedAccumulator.sum / weightedAccumulator.count;
        }
    }

    @Override
    public WeightedAccumulator createAccumulator() {
        return new WeightedAccumulator();
    }

    // 累加计算的方法
    public void accumulate(WeightedAccumulator accumulator, Long iValue, Integer iWeight) {
        accumulator.sum += iValue * iWeight;
        accumulator.count += iWeight;
    }
}

表聚合函数

// 1.在创建表的DDL中直接定义时间属性
String createDDL = "CREATE TABLE clickTable (" +
        " `user` STRING," +
        " url STRING," +
        " ts BIGINT," +
        " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000))," +
        " WATERMARK FOR et AS et - INTERVAL '1' SECOND" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'datas/click.txt'," +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createDDL);

// 2.注册自定义表函数
tableEnv.createTemporaryFunction("Top2", Top2.class);

// 3.调用UDF进行查询转换
String windowAggQuery = "SELECT user, COUNT(url) AS cnt, window_start, window_end, " +
        "FROM TABLE(" +
        "TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
        ")" +
        "GROUP BY user, window_start, window_end";
Table aggTable = tableEnv.sqlQuery(windowAggQuery);

Table resultTable = aggTable.groupBy($("window_end"))
        .flatAggregate(call("Top2", $("cnt")).as("value", "rank"))
        .select($("window_end"), $("value"), $("rank"));

// 4.转换成流打印输出
tableEnv.toDataStream(resultTable).print();
// 单独定义一个累加器类型,包含了当前最大和第二大的数据
public static class Top2Accumulator {
    public Long max;
    public Long secondMax;
}

// 实现自定义表聚合函数
public static class Top2 extends TableAggregateFunction<Tuple2<Long, Integer>, Top2Accumulator> {
    @Override
    public Top2Accumulator createAccumulator() {
        Top2Accumulator top2Accumulator = new Top2Accumulator();
        top2Accumulator.max = Long.MIN_VALUE;
        top2Accumulator.secondMax = Long.MIN_VALUE;
        return top2Accumulator;
    }

    // 定义一个更新累加器的方法
    public void accumulate(Top2Accumulator accumulator, Long value) {
        if (value > accumulator.max) {
            accumulator.secondMax = accumulator.secondMax;
            accumulator.max = value;
        } else if (value > accumulator.secondMax) {
            accumulator.secondMax = value;
        }
    }

    // 输出结果,当前的top2
    public void emitValue(Top2Accumulator accumulator, Collector<Tuple2<Long, Integer>> out {
        if (accumulator.max != Long.MIN_VALUE) {
            out.collect(Tuple2.of(accumulator.max, 1));
        }
        if (accumulator.secondMax != Long.MIN_VALUE) {
            out.collect(Tuple2.of(accumulator.secondMax, 2));
        }
    }
}

标签:窗口,函数,tableEnv,Flink,TableAPI,ts,user,SQL
From: https://www.cnblogs.com/fireonfire/p/17023761.html

相关文章

  • MySQL记录锁、间隙锁、临键锁
    最近要在公司内做一次技术分享,思来想去不知道该分享些什么,最后在朋友的提示下,准备分享一下MySQL的InnoDB引擎下的事务幻读问题与解决方案--LBCC&MVCC。经过好几天的熬夜通......
  • spring整合Mybatis | Postgresql为例
    1.创建配置文件jdbc.propertiesjdbc.url=jdbc:postgresql://localhost:5432/postgis_hy?useSSL=falsejdbc.username=postgresjdbc.password=arcgis2.相关依赖<de......
  • 从零开始学 MySQL -- 数据库和数据表操作
    阅读本文大概需要7 分钟前言上篇文章我们学习了SELECT语句,今天我们学习下核心的内容,学习并实践如何对数据库表和表中的内容做修改,删除,重命名等操作。(想看看周末还有多少......
  • 从零开始学习 MySQL 系列--索引、视图、导入和导出
    前言上篇文章我们学习了数据库和数据表操作语句,今天我们学习下数据库索引,视图,导入和导出的知识。作为基础篇,不会涉及到关于索引和视图的高级应用和核心概念,但是基本操作大家......
  • Mysql 访问视图却报ERROR 1356 (HY000)错误
    目录1.适用范围2.问题概述3.问题原因3.1.通过root用户查询问题仍然存在,且所有视图问题都是相同的。3.2.root自己的创建的视图可以正常访问3.3.查看用户权限3.4.以为是权......
  • Mysql安装
    1、windows安装mysql有两种安装方式,一种是使用exe安装程序去安装,一种是直接解压安装。推荐使用解压安装的方式,因为exe程序安装的方式,安装过程中会和操作系统有过多的关......
  • Mysql读写分离
    MySQL主从复制介绍:MySQL主从复制是一个异步的复制过程,底层是基于Mysql数据库自带的二进制日志功能。就是一台或多台MySOL数据库(slave,即从库)从另一台MySOL数据库(master......
  • Centos7离线安装Mysql8(rpm安装)
    1.下载:        官网下载: MySQL::DownloadMySQLCommunityServer2.解压将下载好的tar文件放到centos中,目录文件夹名称自定义,解压后得到:[root@localhost......
  • Flink mini-batch "引发" 的乱序问题
    问题描述近期业务反馈,开启了mini-batch之后,出现了数据不准的情况,关掉了mini-batch之后,就正常了,因此业务方怀疑,是不是Flink的mini-batch存在bug?问题排查......
  • SqlServer的substring用法
    SUBSTRING(expression,start,length) 参数expression字符串、二进制字符串、文本、图像、列或包含列的表达式。请勿使用包含聚合函数的表达式。 start整数......