Flink 系列文章
[1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接]
[13、Flink 的table api与sql的基本概念、通用api介绍及入门示例] [14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性] [15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置] [16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)] [16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)] [16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)] [16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)]
[16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)]
[20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上]
[22、Flink 的table api与sql之创建表的DDL] [24、Flink 的table api与sql之Catalogs]
[26、Flink 的SQL之概览与入门示例] [27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)] [27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2-1)] [27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2-2)] [27、Flink 的SQL之SELECT (窗口函数 和 窗口聚合)介绍及详细示例(3)] [27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(4)] [27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(5)] [27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(6)]
[30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)]
[41、Flink之Hive 方言介绍及详细示例] [42、Flink 的table api与sql之Hive Catalog] [43、Flink之Hive 读写及详细验证示例] [44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例--网上有些说法好像是错误的]
(文章目录)
本文简单的介绍了Flink 的join操作,并以具体的例子展示join的使用。 本文依赖flink和kafka集群、mysql能正常使用。 本文示例实在flink 1.17版本上运行的。
二、Joins
Flink SQL 支持对动态表进行复杂而灵活的连接操作。有几种不同类型的联接可以解决查询可能需要的各种语义。
默认情况下,联接顺序不优化。表按照它们在 FROM 子句中指定的顺序进行连接。可以通过首先列出更新频率最低的表和最后列出更新频率最高的表来调整联接查询的性能。确保按不产生交叉联接(笛卡尔积)的顺序指定表,这些表不受支持,并且会导致查询失败。
1、常规join
常规联接是最通用的联接类型,其中任何新记录或对联接任一侧的更改都是可见的,并会影响整个联接结果。例如,如果左侧有一条新记录,则当用户 ID 等于时,它将与右侧的所有以前和将来的记录联接。
Flink SQL> select * from alan_user_t1;
+----+-------------+--------------------------------+----------------------+
| op | id | name | age |
+----+-------------+--------------------------------+----------------------+
| +I | 1 | 'alan' | 18 |
| +I | 2 | 'alanchan' | 19 |
| +I | 3 | 'alanchanchn' | 20 |
| +I | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 |
| +I | 4 | 'alan_chan' | 19 |
Flink SQL> select * from alan_user_t2;
+----+-------------+--------------------------------+----------------------+
| op | id | name | age |
+----+-------------+--------------------------------+----------------------+
| +I | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 |
| +I | 4 | 'alan_chan' | 19 |
Flink SQL> SELECT alan_user_t1.* FROM alan_user_t1
> INNER JOIN alan_user_t2
> ON alan_user_t1.id = alan_user_t2.id
> ;
+----+-------------+--------------------------------+----------------------+
| op | id | name | age |
+----+-------------+--------------------------------+----------------------+
| +I | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 |
| +I | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 |
| +I | 4 | 'alan_chan' | 19 |
对于流式处理查询,常规联接的语法是最灵活的,允许任何类型的更新(插入、更新、删除)输入表。但是,此操作具有重要的操作意义:它需要将连接输入的两端永远保持在 Flink 状态。因此,计算查询结果所需的状态可能会无限增长,具体取决于所有输入表和中间连接结果的不同输入行数。您可以为查询配置提供适当的状态生存时间 (TTL),以防止状态大小过大。请注意,这可能会影响查询结果的正确性。
关于ttl设置示例请参考文章 43、Flink之Hive 读写及详细验证示例中的【2)、Temporal Join 最新的表】的示例。
对于流式处理查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。请提供空闲状态保留时间,以防止状态大小过大。有关详细信息,请参阅空闲状态保留时间。
1)、INNER Equi-JOIN
返回受连接条件限制的简单笛卡尔积。目前,仅支持等连接,即至少具有一个具有相等谓词的连取条件的连接。不支持任意交叉或θ连接。
Flink SQL> SELECT alan_user_t1.* FROM alan_user_t1
> INNER JOIN alan_user_t2
> ON alan_user_t1.id = alan_user_t2.id
> ;
+----+-------------+--------------------------------+----------------------+
| op | id | name | age |
+----+-------------+--------------------------------+----------------------+
| +I | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 |
| +I | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 |
| +I | 4 | 'alan_chan' | 19 |
2)、OUTER Equi-JOIN
返回限定笛卡尔积中的所有行(即,通过其连接条件的所有组合行),以及外部表中连接条件与另一个表的任何行不匹配的每行的一个副本。Flink 支持 LEFT、RIGHT 和 FULL 外部连接。目前,仅支持等连接,即至少具有一个具有相等谓词的连取条件的连接。不支持任意交叉或θ连接。
- left join示例
Flink SQL> SELECT * FROM alan_user_t1
> LEFT JOIN alan_user_t2
> ON alan_user_t1.id = alan_user_t2.id;
+----+-------------+--------------------------------+----------------------+-------------+--------------------------------+----------------------+
| op | id | name | age | id0 | name0 | age0 |
+----+-------------+--------------------------------+----------------------+-------------+--------------------------------+----------------------+
| +I | 1 | 'alan' | 18 | (NULL) | (NULL) | (NULL) |
| -D | 1 | 'alan' | 18 | (NULL) | (NULL) | (NULL) |
| +I | 1 | 'alan' | 18 | 1 | 'alan' | 18 |
| +I | 2 | 'alanchan' | 19 | (NULL) | (NULL) | (NULL) |
| +I | 3 | 'alanchanchn' | 20 | 3 | 'alanchanchn' | 20 |
| +I | 1 | 'alan' | 18 | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 | 3 | 'alanchanchn' | 20 |
| +I | 4 | 'alan_chan' | 19 | 4 | 'alan_chan' | 19 |
- right join示例
Flink SQL> SELECT * FROM alan_user_t1
> RIGHT JOIN alan_user_t2
> ON alan_user_t1.id = alan_user_t2.id;
+----+-------------+--------------------------------+----------------------+-------------+--------------------------------+----------------------+
| op | id | name | age | id0 | name0 | age0 |
+----+-------------+--------------------------------+----------------------+-------------+--------------------------------+----------------------+
| +I | 1 | 'alan' | 18 | 1 | 'alan' | 18 |
| +I | (NULL) | (NULL) | (NULL) | 3 | 'alanchanchn' | 20 |
| -D | (NULL) | (NULL) | (NULL) | 3 | 'alanchanchn' | 20 |
| +I | 3 | 'alanchanchn' | 20 | 3 | 'alanchanchn' | 20 |
| +I | (NULL) | (NULL) | (NULL) | 4 | 'alan_chan' | 19 |
| +I | 1 | 'alan' | 18 | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 | 3 | 'alanchanchn' | 20 |
| -D | (NULL) | (NULL) | (NULL) | 4 | 'alan_chan' | 19 |
| +I | 4 | 'alan_chan' | 19 | 4 | 'alan_chan' | 19 |
- full outer join示例
Flink SQL> SELECT * FROM alan_user_t1
> FULL OUTER JOIN alan_user_t2
> ON alan_user_t1.id = alan_user_t2.id;
+----+-------------+--------------------------------+----------------------+-------------+--------------------------------+----------------------+
| op | id | name | age | id0 | name0 | age0 |
+----+-------------+--------------------------------+----------------------+-------------+--------------------------------+----------------------+
| +I | 1 | 'alan' | 18 | (NULL) | (NULL) | (NULL) |
| -D | 1 | 'alan' | 18 | (NULL) | (NULL) | (NULL) |
| +I | 1 | 'alan' | 18 | 1 | 'alan' | 18 |
| +I | 2 | 'alanchan' | 19 | (NULL) | (NULL) | (NULL) |
| +I | (NULL) | (NULL) | (NULL) | 3 | 'alanchanchn' | 20 |
| -D | (NULL) | (NULL) | (NULL) | 3 | 'alanchanchn' | 20 |
| +I | 3 | 'alanchanchn' | 20 | 3 | 'alanchanchn' | 20 |
| +I | (NULL) | (NULL) | (NULL) | 4 | 'alan_chan' | 19 |
| +I | 1 | 'alan' | 18 | 1 | 'alan' | 18 |
| +I | 3 | 'alanchanchn' | 20 | 3 | 'alanchanchn' | 20 |
| -D | (NULL) | (NULL) | (NULL) | 4 | 'alan_chan' | 19 |
| +I | 4 | 'alan_chan' | 19 | 4 | 'alan_chan' | 19 |
2、Interval Joins
返回受连接条件和时间约束限制的简单笛卡尔积。间隔连接至少需要一个等连接谓词和一个连接条件,该条件在两侧限制时间。两个适当的范围谓词可以定义这样的条件(<、<=、>=、>)、BETWEEN 谓词或单个相等谓词,用于比较两个输入表的相同类型(即处理时间或事件时间)的时间属性。
例如,如果订单在收到订单五分钟后发货,则此查询将联接所有订单及其相应的货件。
---1、建表
CREATE TABLE alan_order_t (
order_id STRING,
price DECIMAL(32,2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'alan_order_t_topic',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'format' = 'csv'
);
CREATE TABLE alan_shipments_t (
ship_id STRING,
order_id STRING,
price DECIMAL(32,2),
ship_time TIMESTAMP(3),
WATERMARK FOR ship_time AS ship_time - INTERVAL '15' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'alan_shipments_t_topic',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'format' = 'csv'
);
---2、查询数据(插入数据步骤忽略)
Flink SQL> select * from alan_order_t;
+----+--------------------------------+------------------------------------+-------------------------+
| op | order_id | price | order_time |
+----+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 210.00 | 2023-09-18 09:40:00.000 |
| +I | 2 | 2010.00 | 2023-09-18 09:40:10.000 |
| +I | 3 | 20010.00 | 2023-09-18 09:40:20.000 |
^CQuery terminated, received a total of 3 rows
Flink SQL> select * from alan_shipments_t;
+----+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| op | ship_id | order_id | price | ship_time |
+----+--------------------------------+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 1 | 21.00 | 2023-09-18 09:45:00.000 |
| +I | 2 | 3 | 201.00 | 2023-09-18 09:45:10.000 |
| +I | 3 | 2 | 2001.00 | 2023-09-18 09:45:20.000 |
---3、查询订单列表
Flink SQL> SELECT o.*
> FROM alan_order_t o, alan_shipments_t s
> WHERE o.order_id = s.order_id
> AND o.order_time BETWEEN s.ship_time - INTERVAL '5' MINUTE AND s.ship_time;
+----+--------------------------------+------------------------------------+-------------------------+
| op | order_id | price | order_time |
+----+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 210.00 | 2023-09-18 09:40:00.000 |
| +I | 3 | 20010.00 | 2023-09-18 09:40:20.000 |
以下谓词是有效间隔连接条件的示例:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
对于流式处理查询,与常规联接相比,间隔联接仅支持具有时间属性的append-only tables。由于时间属性是准单调递增的,Flink 可以从其状态中删除旧值,而不会影响结果的正确性。
3、时态表 Joins
时态表是随时间演变的表,在 Flink 中也称为动态表。时态表中的行与一个或多个时态周期相关联,并且所有 Flink 表都是时态(动态的)。临时表包含一个或多个版本化表快照,它可以是跟踪更改的更改历史记录表(例如,数据库更改日志,包含所有快照),也可以是实现更改的更改维度表(例如,包含最新快照的数据库表)。
1)、Event Time Temporal Join
Event Time 时态联接允许针对versioned table联接。这意味着可以通过更改元数据来丰富表,并在某个时间点检索其值。
临时联接采用任意表(左侧输入/探测站点),并将每一行与versioned table中相应行的相关版本相关联(右侧输入/生成端)。Flink 使用 FOR SYSTEM_TIME AS OF 的 SQL 语法来执行 SQL:2011 标准中的此操作。临时联接的语法如下;
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
使用event-time attribute(即行时间属性),可以检索过去某个时间点的键值。这允许在公共时间点联接两个表。版本化表将存储自上次水印以来的所有版本(按时间标识)。
例如,假设我们有一个订单表,每个订单都有不同货币的价格。要将此表正确规范化为单一货币(如美元),每个订单都需要从下订单的时间点开始以正确的货币兑换率联接。
----1、建表
Flink SQL> CREATE TABLE orders (
> order_id STRING,
> price DECIMAL(32,2),
> currency STRING,
> order_time TIMESTAMP(3) METADATA FROM 'timestamp',
> WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'orders_topic',
> 'scan.startup.mode' = 'earliest-offset',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE currency_rates (
> update_time TIMESTAMP(3) METADATA FROM 'timestamp',
> currency STRING,
> conversion_rate DECIMAL(32, 2),
> WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
> PRIMARY KEY(currency) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'currency_rates_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'value.format' = 'debezium-json'
> );
[INFO] Execute statement succeed.
----2、插入测试数据
----2.1、插入currency_rates测试数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic currency_rates_topic
>{"before": null,"after": {"currency": "RMB","conversion_rate": 7.18},"source": {"version": "1.13.5"},"op": "c","ts_ms": 1695006498827,"transaction": null}
>{"before": {"currency": "RMB","conversion_rate": 7.18},"after": {"currency": "RMB","conversion_rate": 7.19},"source": {"version": "1.13.5"},"op": "u","ts_ms": 1695006531621,"transaction": null}
>{"before": null,"after": {"currency": "EUR","conversion_rate": 1.14},"source": {"version": "1.13.5"},"op": "c","ts_ms": 1695006548509,"transaction": null}
>{"before": {"currency": "EUR","conversion_rate": 1.14},"after": {"currency": "CUR","conversion_rate": 1.16},"source": {"version": "1.13.5"},"op": "u","ts_ms": 1695006568629,"transaction": null}
>
Flink SQL> select * from currency_rates;
+----+-------------------------+--------------------------------+------------------------------------+
| op | update_time | currency | conversion_rate |
+----+-------------------------+--------------------------------+------------------------------------+
| +I | (NULL) | RMB | 7.18 |
| -U | (NULL) | RMB | 7.18 |
| +U | (NULL) | RMB | 7.19 |
| +I | (NULL) | EUR | 1.14 |
| -U | (NULL) | EUR | 1.14 |
| +U | (NULL) | CUR | 1.16 |
----2.2、插入orders测试数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic orders_topic
>1,21,RMB
>2,201,RMB
>3,210,EUR
>4,121,EUR
----3、验证
Flink SQL> SELECT
> order_id,
> price,
> orders.currency,
> conversion_rate,
> order_time
> FROM orders
> LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
> ON orders.currency = currency_rates.currency;
+----+--------------------------------+------------------------------------+--------------------------------+------------------------------------+-------------------------+
| op | order_id | price | currency | conversion_rate | order_time |
+----+--------------------------------+------------------------------------+--------------------------------+------------------------------------+-------------------------+
| +I | 1 | 21.00 | RMB | (NULL) | 2023-09-18 05:42:45.001 |
| +I | 2 | 201.00 | RMB | 7.18 | 2023-09-18 05:43:09.608 |
| +I | 3 | 210.00 | EUR | (NULL) | 2023-09-18 05:43:15.937 |
| +I | 4 | 121.00 | EUR | (NULL) | 2023-09-18 05:43:21.696 |
------------------以下是官网示例
-- Create a table of orders. This is a standard append-only dynamic table.
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (
'。。。'
);
-- Define a versioned table of currency rates.
-- This could be from a change-data-capture(CDC)
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table.
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
SELECT
order_id,
price,
orders.currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
order_id price currency conversion_rate order_time
======== ===== ======== =============== =========
o_001 11.11 EUR 1.14 12:00:00
o_002 12.51 EUR 1.10 12:06:00
1、事件时态联接由左右两侧的水印触发。INTERVAL 时间减法用于等待延迟事件,以确保联接满足预期。请确保连接的两面都正确设置了水印,该处是使用和验证的重要条件,否则不能正确的显示结果。
2、事件-时间连接要求时态连接条件的等价条件中包含的主键,例如,表 currency_rates 的主键 currency_rates.currency 在条件 orders.currency = currency_rates.currency 中受到约束。
与常规联接相比,尽管生成端发生了更改,但以前的临时表结果不会受到影响。与 interval joins相比,时态表联接不定义将联接记录的时间窗口。探测端的记录始终在 time 属性指定的时间与生成端的版本联接。因此,构建端的行可能是任意旧的。随着时间的流逝,不再需要的记录版本(对于给定的主键)将从状态中删除。
2)、Processing Time Temporal Join
processing time时态表联接使用处理时间属性将行与外部版本化表中键的最新版本相关联。
根据定义,使用processing-time属性,联接将始终返回给定键的最新值。可以将查找表视为一个简单的HashMap<K,V>,它存储了构建端的所有记录。这种连接的强大之处在于,当无法将表具体化为 Flink 中的动态表时,它允许 Flink 直接针对外部系统工作。
以下处理时时态表联接示例显示了应与表 LatestRate 联接的仅追加表订单。最新速率是使用最新速率实现的维度表(例如 HBase 表)。在时间 10:15、10:30、10:52,最新费率的内容如下所示:
10:15> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:30> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:52> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 116 <==== changed from 114 to 116
Yen 1
LastestRates 在 10:15 和 10:30 相等. Euro rate 发生了变化,即从 114 to 116 在10:52。
Orders是一个append-only表,表示给定金额和给定货币的付款。例如,在 10:15 有一个金额为 2 欧元的订单。
SELECT * FROM Orders;
amount currency
====== =========
2 Euro <== arrived at time 10:15
1 US Dollar <== arrived at time 10:30
2 Euro <== arrived at time 10:52
鉴于这些表格,我们想计算转换为通用货币的所有订单。
amount currency rate amount*rate
====== ========= ======= ============
2 Euro 114 228 <== arrived at time 10:15
1 US Dollar 102 102 <== arrived at time 10:30
2 Euro 116 232 <== arrived at time 10:52
目前,尚不支持临时联接与任何视图/表的最新版本的临时联接中使用的 FOR SYSTEM_TIME AS OF 语法,可以使用临时表函数语法,如下所示:
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency
不支持临时联接与任何表/视图的最新版本的临时联接中使用的 FOR SYSTEM_TIME AS OF 语法的原因只是语义上的考虑,因为左流的联接处理不会等待时态表的完整快照,这可能会误导生产环境中的用户。时态表函数的处理时间时态连接也存在同样的语义问题,但它已经存在了很长时间,因此我们从兼容性的角度支持它。
对于处理时间,结果不是确定性的。处理时态联接最常用于使用外部表(即维度表)丰富流。
与常规联接相比,尽管生成端发生了更改,但以前的临时表结果不会受到影响。与间隔连接相比,临时表连接不定义记录连接的时间窗口,即旧行不存储在状态中。
3)、Temporal Table Function Join
使用临时表函数联接表的语法与使用表函数联接中的语法相同。
目前仅支持与临时表的内连接和左外连接。
假设 Rates 是一个时态表函数,则连接可以用 SQL 表示,如下所示:
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency
上述时态表 DDL 和时态表函数之间的主要区别在于:
- 时态表 DDL 可以在 SQL 中定义,但时态表函数不能;
- 时态表 DDL 和时态表函数都支持时态联接版本化表,但只有时态表函数可以临时联接任何表/视图的最新版本。
4、Lookup Join
lookup join 通常用于使用从外部系统查询的数据来扩充表。join要求一个表具有处理时间属性,另一个表由查找源连接器提供支持。
lookup join使用上述Processing Time临时联接语法,其中包含由查找源连接器支持的右侧表。
下面的示例演示用于指定查找联接的语法。
-- 1、创建 JDBC 表在时态表关联中作为维表
CREATE TABLE Alan_JDBC_User_Table (
id BIGINT,
name STRING,
age INT,
balance DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.10.44:3306/test',
'table-name' = 'user'
);
-----2、查询表中的数据(实际数据是之前测试的结果) -----
Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op | id | name | age | balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | ead5352794 | 513 | 4.0 |
| +I | 2 | 728297a8d9 | 410 | 35.0 |
| +I | 3 | 643c2226cd | 142 | 80.0 |
| +I | 4 | 6115f11f01 | 633 | 69.0 |
| +I | 5 | 044ba5fa2f | 74 | 71.0 |
| +I | 6 | 98a112dc87 | 729 | 54.0 |
| +I | 7 | 705326a369 | 846 | 99.0 |
| +I | 8 | 532692924f | 872 | 79.0 |
| +I | 9 | b816802948 | 475 | 67.0 |
| +I | 10 | 06906bebb2 | 109 | 57.0 |
......
-----3、创建事实表,以kafka表作为代表 -----
CREATE TABLE Alan_KafkaTable_3 (
user_id BIGINT, -- 用户id
item_id BIGINT, -- 商品id
action STRING, -- 用户行为
ts BIGINT, -- 用户行为发生的时间戳
proctime as PROCTIME(), -- 通过计算列产生一个处理时间列
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件时间
WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND -- 在eventTime上定义watermark
) WITH (
'connector' = 'kafka',
'topic' = 'testtopic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
-----4、发送kafka消息,同时观察事实表中的数据 -----
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic testtopic
>1,1001,"login",1692593500222
>2,1002,"p_read",1692593502242
>
Flink SQL> select * from Alan_KafkaTable_3;
+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op | user_id | item_id | action | ts | proctime | event_time |
+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I | 1 | 1001 | login | 1692593500222 | 2023-08-22 05:33:38.830 | 2023-08-22 05:39:54.439 |
| +I | 2 | 1002 | p_read | 1692593502242 | 2023-08-22 05:33:38.833 | 2023-08-22 05:40:41.284 |
Query terminated, received a total of 2 rows
-----5、以jdbc的维表进行关联查询事实表数据-----
SELECT
kafkamessage.user_id,
kafkamessage.item_id,
kafkamessage.action,
jdbc_dim_table.name,
jdbc_dim_table.age,
jdbc_dim_table.balance
FROM Alan_KafkaTable_3 AS kafkamessage
LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;
Flink SQL> SELECT
> kafkamessage.user_id,
> kafkamessage.item_id,
> kafkamessage.action,
> jdbc_dim_table.name,
> jdbc_dim_table.age,
> jdbc_dim_table.balance
> FROM Alan_KafkaTable_3 AS kafkamessage
> LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;
+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| op | user_id | item_id | action | name | age | balance |
+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | 1001 | login | ead5352794 | 513 | 4.0 |
| +I | 2 | 1002 | p_read | 728297a8d9 | 410 | 35.0 |
5、Array Expansion
为给定数组中的每个元素返回一个新行。尚不支持使用 ORDINALITY 取消嵌套。
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
6、Table Function
将表与表函数的结果联接在一起。左(外)表的每一行都与表函数的相应调用生成的所有行联接。用户定义的表函数必须在使用前注册。
1)、INNER JOIN
如果左(外部)表的表函数调用返回空结果,则删除该表的行。
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)
2)、LEFT OUTER JOIN
如果表函数调用返回空结果,则保留相应的outer row,并使用 null 值填充结果。目前,针对横向表的左外部连接需要在 ON 子句中使用 TRUE 文本。
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE
以上,简单的介绍了Flink 的join操作,并以具体的例子展示join的使用。
标签:27,示例,Flink,alan,order,currency,SQL,NULL,id From: https://blog.51cto.com/alanchan2win/7511974