首页 > 数据库 >Flink Sql 的查询

Flink Sql 的查询

时间:2024-04-02 20:31:44浏览次数:31  
标签:Join SQL Flink 查询 ws Sql 窗口 id SELECT

一、DataGen & Print

        (1)创建数据生成器源表

CREATE TABLE source (

    id INT,

    ts BIGINT,

    vc INT

) WITH (

    'connector' = 'datagen',

    'rows-per-second'='1',

    'fields.id.kind'='random',

    'fields.id.min'='1',

    'fields.id.max'='10',

    'fields.ts.kind'='sequence',

    'fields.ts.start'='1',

    'fields.ts.end'='1000000',

    'fields.vc.kind'='random',

    'fields.vc.min'='1',

    'fields.vc.max'='100'

);


CREATE TABLE sink (

    id INT,

    ts BIGINT,

    vc INT

) WITH (

'connector' = 'print'

);

        (2)查询源表

select * from source

        (3)插入sink表并查询

INSERT INTO sink select  * from source;

select * from sink;

二、With子句

        WITH提供了一种编写辅助语句的方法,以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE),可以认为它们定义了仅为一个查询而存在的临时视图。

        (1)语法

WITH <with_item_definition> [ , ... ]

SELECT ... FROM ...;



<with_item_defintion>:

    with_item_name (column_name[, ...n]) AS ( <select_query> )

        (2)案例

WITH source_with_total AS (

    SELECT id, vc+10 AS total

    FROM source

)



SELECT id, SUM(total)

FROM source_with_total

GROUP BY id;

三、 SELECT & WHERE 子句

        (1)语法

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

        (2)案例

SELECT * FROM source

SELECT id, vc + 10 FROM source



-- 自定义 Source 的数据

SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)



SELECT vc + 10 FROM source WHERE id >10

四、 SELECT DISTINCT 子句

        用作根据 key 进行数据去重

SELECT DISTINCT vc FROM source

        对于流查询,计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置,以防止状态过大。但是,这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

五、 分组聚合

        SQL中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:

select COUNT(*) from source;

        而更多的情况下,我们可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。

SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;

        这种聚合方式,就叫作“分组聚合”(group aggregation)。想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。

        分组聚合既是SQL原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。

        (1)group聚合案例

CREATE TABLE source1 (

dim STRING,

user_id BIGINT,

price BIGINT,

row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),

WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND

) WITH (

'connector' = 'datagen',

'rows-per-second' = '10',

'fields.dim.length' = '1',

'fields.user_id.min' = '1',

'fields.user_id.max' = '100000',

'fields.price.min' = '1',

'fields.price.max' = '100000'

);





CREATE TABLE sink1 (

dim STRING,

pv BIGINT,

sum_price BIGINT,

max_price BIGINT,

min_price BIGINT,

uv BIGINT,

window_start bigint

) WITH (

'connector' = 'print'

);





insert into sink1

select dim,

count(*) as pv,

sum(price) as sum_price,

max(price) as max_price,

min(price) as min_price,

-- 计算 uv 数

count(distinct user_id) as uv,

cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start

from source1

group by

dim,

-- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min, 

cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

        (2)多维分析

        Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets:

SELECT

  supplier_id

, rating

, product_id

, COUNT(*)

FROM (

VALUES

  ('supplier1', 'product1', 4),

  ('supplier1', 'product2', 3),

  ('supplier2', 'product3', 3),

  ('supplier2', 'product4', 4)

)

-- 供应商id、产品id、评级

AS Products(supplier_id, product_id, rating)  

GROUP BY GROUPING SETS(

  (supplier_id, product_id, rating),

  (supplier_id, product_id),

  (supplier_id, rating),

  (supplier_id),

  (product_id, rating),

  (product_id),

  (rating),

  ()

);

六、分组窗口聚合

        从1.13版本开始,分组窗口聚合已经标记为过时,鼓励使用更强大、更有效的窗口TVF聚合,在这里简单做个介绍。

        直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”。SQL查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。

        SQL中只支持基于时间的窗口,不支持基于元素个数的窗口。

分组窗口函数描述

TUMBLE(time_attr, interval)

定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

HOP(time_attr, interval, interval)

定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

SESSION(time_attr, interval)

定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。

                                                ……

        (1)准备数据

CREATE TABLE ws (

  id INT,

  vc INT,

  pt AS PROCTIME(), --处理时间

  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间

  WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark

) WITH (

  'connector' = 'datagen',

  'rows-per-second' = '10',

  'fields.id.min' = '1',

  'fields.id.max' = '3',

  'fields.vc.min' = '1',

  'fields.vc.max' = '100'

);

        (2)滚动窗口示例(时间属性字段,窗口长度)

select  

id,

TUMBLE_START(et, INTERVAL '5' SECOND)  wstart,

TUMBLE_END(et, INTERVAL '5' SECOND)  wend,

sum(vc) sumVc

from ws

group by id, TUMBLE(et, INTERVAL '5' SECOND);

        (3)滑动窗口(时间属性字段,滑动步长,窗口长度)

select  

id,

HOP_START(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND)   wstart,

HOP_END(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND)  wend,

   sum(vc) sumVc

from ws

group by id, HOP(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND);

        (4)会话窗口(时间属性字段,会话间隔)

select  

id,

SESSION_START(et, INTERVAL '5' SECOND)  wstart,

SESSION_END(et, INTERVAL '5' SECOND)  wend,

sum(vc) sumVc

from ws

group by id, SESSION(et, INTERVAL '5' SECOND);

七、 窗口表值函数(TVF)聚合

        对比GroupWindow,TVF窗口更有效和强大。包括:

  • 提供更多的性能优化手段
  • 支持GroupingSets语法
  • 可以在window聚合中使用TopN
  • 提供累积窗口

        对于窗口表值函数,窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段window_start和window_end

FROM TABLE(

窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL时间…)

)

GROUP BY [window_start,][window_end,] --可选

        (1)滚动窗口

SELECT

window_start,

window_end,

id , SUM(vc)

sumVC

FROM TABLE(

  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))

GROUP BY window_start, window_end, id;

        (2)滑动窗口

        要求: 窗口长度=滑动步长的整数倍(底层会优化成多个小滚动窗口)

SELECT window_start, window_end, id , SUM(vc) sumVC

FROM TABLE(

  HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS))

GROUP BY window_start, window_end, id;

        (3)累积窗口

SELECT window_start, window_end, id , SUM(vc) sumVC
FROM TABLE(
  HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, id;

        如果是按照时间属性字段降序,表示取最新一条,会造成不断的更新保存最新的一条。如果是升序,表示取最早的一条,不用去更新,性能更好。

        (1)语法

SELECT [column_list]

FROM (

SELECT [column_list],

ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]

ORDER BY time_attr [asc|desc]) AS rownum

FROM table_name)

WHERE rownum = 1

        (2)案例

        对每个传感器的水位值去重

select
    id,
    et,
    vc,
    rownum
from
(
    select
        id,
        et,
        vc,
        row_number() over(
            partition by id,vc
            order by et 
        ) as rownum
    from ws
)
where rownum=1;

八、 联结(Join)查询

        在标准SQL中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表的联结(Join)。在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。

        在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。

        (1) 常规联结查询

        常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。

        与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。

        Regular Join 包含以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):

  • Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
  • Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null] ,然后输出 +[L, R]
  • Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
  • Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[null, R] ;对左流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] )。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R] ,输出+[L, R] ,右流数据到达为例:回撤 -[L, null] ,输出 +[L, R]

        Regular Join 的注意事项:

  • 实时 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 区别在于, 等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游; 非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联
  • 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。

再准备一张表用于join

CREATE TABLE ws1 (

  id INT,

  vc INT,

  pt AS PROCTIME(), --处理时间

  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间

  WATERMARK FOR et AS et - INTERVAL '0.001' SECOND   --watermark

) WITH (

  'connector' = 'datagen',

  'rows-per-second' = '1',

  'fields.id.min' = '3',

  'fields.id.max' = '5',

  'fields.vc.min' = '1',

  'fields.vc.max' = '100'

);

        等值内联结(INNER Equi-JOIN

内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。

SELECT *

FROM ws

INNER JOIN ws1

ON ws.id = ws1.id

        等值外联结(OUTER Equi-JOIN

        与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。

具体用法如下:

SELECT *

FROM ws

LEFT JOIN ws1

ON ws.id = ws1.id



SELECT *

FROM ws

RIGHT JOIN ws1

ON ws.id = ws1.id



SELECT *

FROM ws

FULL OUTER JOIN ws1

ON ws.id = ws.id

        这部分知识与标准SQL中是完全一样的。

        (2) 间隔联结查询

        我们曾经学习过DataStream API中的双流Join,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。

        间隔联结(Interval Join)返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:

        1、两表的联结

        间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。

        2、联结条件

        联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。

                1.时间间隔限制

                我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用ltime和rtime表示左右表中的时间字段:

(1)ltime = rtime

(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

 

SELECT *
FROM ws,ws1
WHERE ws.id = ws1. id
AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND 

         (3) 维表联结查询

        Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找。

        上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段。

表A

JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名

ON xx.字段=别名.字段

        比如维表在mysql,维表join的写法如下:

CREATE TABLE Customers (

  id INT,

  name STRING,

  country STRING,

  zip STRING

) WITH (

  'connector' = 'jdbc',

  'url' = 'jdbc:mysql://hadoop102:3306/customerdb',

  'table-name' = 'customers'

);



-- order表每来一条数据,都会去mysql的customers表查找维度数据



SELECT o.order_id, o.total, c.country, c.zip

FROM Orders AS o

  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c

    ON o.customer_id = c.id;

九、 Order by 和 limit

        (1)order by

        支持 Batch\Streaming,但在实时任务中一般用的非常少。

        实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。

SELECT *

FROM ws

ORDER BY et, id desc

2)limit

SELECT *

FROM ws

LIMIT 3

十、 SQL Hints

        在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。

select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;

十一、集合操作

        (1)UNION 和 UNION ALL

UNION:将集合合并并且去重

UNION ALL:将集合合并,不做去重。

(SELECT id FROM ws) UNION (SELECT id FROM ws1);

(SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);

(2)Intersect Intersect All

Intersect:交集并且去重

Intersect ALL:交集不做去重

(SELECT id FROM ws) INTERSECT (SELECT id FROM ws1);

(SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);

(3)Except Except All

Except:差集并且去重

Except ALL:差集不做去重

(SELECT id FROM ws) EXCEPT (SELECT id FROM ws1);

(SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);

        上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流

(4)In 子查询

        In 子查询的结果集只能有一列

SELECT id, vc

FROM ws

WHERE id IN (

SELECT id FROM ws1

)

        上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,要注意设置 State 的 TTL。

十二、系统函数

        系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,几乎支持所有的标准SQL中的操作,这为我们使用SQL编写流处理程序提供了极大的方便。

        Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。

(1)标量函数(Scalar Functions)

        标量函数指的就是只对输入数据做转换操作、返回一个值的函数。

        标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。

        1、比较函数(Comparison Functions)

        比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。例如:

(1)value1 = value2  判断两个值相等;

(2)value1 <> value2  判断两个值不相等

(3)value IS NOT NULL 判断value不为空

        2、逻辑函数(Logical Functions)

        逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。例如:

(1)boolean1 OR boolean2  布尔值boolean1与布尔值boolean2取逻辑或

(2)boolean IS FALSE  判断布尔值boolean是否为false

(3)NOT boolean  布尔值boolean取逻辑非

        3、算术函数(Arithmetic Functions)

进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:

(1)numeric1 + numeric2  两数相加

(2)POWER(numeric1, numeric2)  幂运算,取数numeric1的numeric2次方

(3)RAND()  返回(0.0, 1.0)区间内的一个double类型的伪随机数

        4、字符串函数(String Functions)

进行字符串处理的函数。例如:

(1)string1 || string2  两个字符串的连接

(2)UPPER(string)  将字符串string转为全部大写

(3)CHAR_LENGTH(string)  计算字符串string的长度

        5、时间函数(Temporal Functions)

进行与时间相关操作的函数。例如:

(1)DATE string  按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date

(2)TIMESTAMP string  按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp

(3)CURRENT_TIME  返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价)

(4)INTERVAL string range  返回一个时间间隔。

(2)聚合函数(Aggregate Functions)

        聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。

标准SQL中常见的聚合函数Flink SQL都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能。例如:

(1)COUNT(*)  返回所有行的数量,统计个数。

(2)SUM([ ALL | DISTINCT ] expression)  对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。

(3)RANK()   返回当前值在一组值中的排名。

(4)ROW_NUMBER()    对一组值排序后,返回当前值的行号。

其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。

十三、 Module操作

        Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写自己的 Module。

        目前 Flink 包含了以下三种 Module:

  • CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
  • HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
  • 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module

        使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。

        (1)语法

-- 加载

LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]

-- 卸载

UNLOAD MODULE module_name



-- 查看

SHOW MODULES;

SHOW FULL MODULES;

        在 Flink 中,Module 可以被 加载、启用 、禁用 、卸载 Module,当加载Module 之后,默认就是开启的。同时支持多个 Module 的,并且根据加载 Module 的顺序去按顺序查找和解析 UDF,先查到的先解析使用。

        此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF,可以使用下面语法更改顺序:

USE MODULE hive,core;

        USE是启用module,没有被use的为禁用(禁用不是卸载),除此之外还可以实现调整顺序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。

        (2)案例

        加载官方已经提供的的 Hive Module,将 Hive 已有的内置函数作为 Flink 的内置函数。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。

        (1)上传jar包到flink的lib中

        上传hive connector

cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/

        注意:拷贝hadoop的包,解决依赖冲突问题

cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/

        (2)重启flink集群和sql-client

        (3)加载hive module

-- hive-connector内置了hive module,提供了hive自带的系统函数

load module hive with ('hive-version'='3.1.3');

show modules;

show functions;

-- 可以调用hive的split函数

select split('a,b', ',');

标签:Join,SQL,Flink,查询,ws,Sql,窗口,id,SELECT
From: https://blog.csdn.net/2301_78959404/article/details/137286477

相关文章

  • docker下mysql连接数修改后不生效问题的解决
    使用容器的方式使用mysql,在创建mysql容器的时候,映射了配置文件的路径(我映射的路径是:/home/env/mysql/config/),当修改了路径中配置文件中的最大连接数后,重启容器后,发现参数是不生效的情况,你遇到过吗?先说结果:是权限的问题,777权限会被mysql认为是安全的问题,忽略这种文件。具体如下:......
  • 配置Linux上的MySQL
    MySQL数据库环境搭建和编程MySQL环境安装设置ubuntu环境安装mysql-server和mysql开发包,包括mysql头文件和动态库文件,命令如下:sudoapt-getinstallmysql-server=》安装最新版MySQL服务器sudoapt-getinstalllibmysqlclient-dev=》安装开发包ubuntu默认安装最新的mysq......
  • mysql基于时间的盲注,使用python爆破库名
    mysql基于时间的盲注使用python代码进行爆破库名importrequestsimporttimes=requests.session()#创建session对象后,才可以调用对应的方法发送请求。url='http://192.168.2.101/sqli-labs-master/Less-9/?id='flag=''i=0whileTrue:i=i+1low......
  • 【附源码】JAVA计算机毕业设计智慧点餐系统(springboot+mysql+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息技术的快速发展和互联网的普及,人们的生活方式发生了深刻的变化。特别是在餐饮行业,传统的点餐方式已经无法满足现代消费者对于便捷性、个性化......
  • java计算机毕业设计(附源码)医院新型冠状病毒疫苗接种管理系统(ssm+mysql+maven+LW文档)
    本系统(程序+源码)带文档lw万字以上  文末可领取本课题的JAVA源码参考系统程序文件列表系统的选题背景和意义在当今全球疫情的背景下,新型冠状病毒疫苗的接种成为了防控疫情的重要手段。然而,由于疫苗接种人群广泛,且接种过程复杂,需要记录的信息量大,因此,传统的人工管理方式已......
  • java计算机毕业设计(附源码)医院薪酬管理系统(ssm+mysql+maven+LW文档)
    本系统(程序+源码)带文档lw万字以上  文末可领取本课题的JAVA源码参考系统程序文件列表系统的选题背景和意义选题背景:在现代医疗体系中,医院作为提供专业医疗服务的核心机构,其运营效率和服务质量直接影响着公众健康和社会福祉。医院薪酬管理系统是确保医疗人员得到合理报酬......
  • java计算机毕业设计(附源码)医院药品管理系统(ssm+mysql+maven+LW文档)
    本系统(程序+源码)带文档lw万字以上  文末可领取本课题的JAVA源码参考系统程序文件列表系统的选题背景和意义在当今社会,随着医疗技术的不断发展和人们健康意识的提高,医院药品管理系统的重要性日益凸显。药品管理作为医疗服务的核心环节之一,对于确保患者用药安全、提高医疗......
  • java计算机毕业设计(附源码)医院医疗救助系统(ssm+mysql+maven+LW文档)
    本系统(程序+源码)带文档lw万字以上  文末可领取本课题的JAVA源码参考系统程序文件列表系统的选题背景和意义选题背景:随着社会的发展和人口老龄化的加剧,医疗救助系统在现代社会中扮演着越来越重要的角色。医院作为医疗救助的主要场所,其系统的完善与否直接关系到广大人民群......
  • SQL Server Profilter - 简单使用
    介绍SQLServerProfiler是一个界面,用于创建和管理跟踪并分析和重播跟踪结果。这些事件保存在一个跟踪文件中,稍后诊断问题时,可以对该文件进行分析或用它来重播一系列特定的步骤。使用SQLServerProfilerMicrosoftSQLServerProfiler是SQL跟踪的图形用户界面,用于监视......
  • mysql配置文件解析
     mysql服务启动默认使用的配置文件路径mysqld--verbose--help|grepcnfmy.cnf配置文件字段解析[client]port=3306socket=/tmp/mysql.sock[mysqld]port=3306socket=/tmp/mysql.sockbasedir=/usr/local/mysqldatadir=/data/mysqlpid-file=/data/mys......