Flink SQL
1. SQL-Client
The SQL client shipped with Flink.
Preparation
-
Start a yarn-session
yarn-session.sh -d
-
Start the Flink SQL client
sql-client.sh
2. Data Sources
Kafka
Preparation
-
Add the dependency to $FLINK_HOME/lib
flink-sql-connector-kafka-1.15.0.jar
-
Restart the yarn-session and the SQL client
# Kill the yarn-session
yarn application -kill application_1668648201089_0018
# Or shut it down with the stop command
echo "stop" | yarn-session.sh -id application_1668648201089_0018
# Start the yarn-session
yarn-session.sh -d
# Start the SQL client
sql-client.sh
Reading from Kafka
-
Create the source table
-- Note: comments in SQL are -- , not //
CREATE TABLE kafka_word (
    `word` String
) WITH (
    'connector' = 'kafka',                 -- use the Kafka connector
    'topic' = 'words',                     -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
    'properties.group.id' = 'testGroup01', -- consumer group id
    'scan.startup.mode' = 'latest-offset', -- startup mode
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'     -- silently skip records that do not match the schema
);
-
Parameters
-
scan.startup.mode: the startup strategy used when consuming (example sketch after this list)
-
earliest-offset: start consuming from the earliest offset
-
latest-offset: start consuming from the latest offset
-
group-offsets: resume from the offsets last committed by this consumer group
-
timestamp: start consuming from a given timestamp
-
specific-offsets: start consuming from explicitly specified offsets
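The timestamp and specific-offsets modes need extra options; a minimal sketch reusing the words topic above (the timestamp and offsets below are made-up placeholders):
CREATE TABLE kafka_word_from_ts (
    `word` String
) WITH (
    'connector' = 'kafka',
    'topic' = 'words',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'testGroup01',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1668648201000', -- epoch millis required by timestamp mode (placeholder value)
    'format' = 'csv'
);
-- For specific-offsets, replace the two scan.* options above with, for example:
-- 'scan.startup.mode' = 'specific-offsets',
-- 'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'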
Writing to Kafka
-
Writing an append-only stream to Kafka
-- Create the student source table
CREATE TABLE kafka_students (
    id BIGINT,
    name String,
    age BIGINT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',                   -- use the Kafka connector
    'topic' = 'students',                    -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
    'properties.group.id' = 'testGroup01',   -- consumer group id
    'scan.startup.mode' = 'earliest-offset', -- startup mode
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'       -- silently skip records that do not match the schema
);

-- Create the sink table that keeps the liberal-arts students
-- kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic students_liberal --create
CREATE TABLE students_liberal_sink (
    id BIGINT,
    name String,
    age BIGINT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',                   -- use the Kafka connector
    'topic' = 'students_liberal',            -- topic to write
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
    'properties.group.id' = 'testGroup01',   -- consumer group id
    'scan.startup.mode' = 'earliest-offset', -- startup mode
    'format' = 'json'
);

-- Query the liberal-arts students and write them to the sink
insert into students_liberal_sink
select * from kafka_students where clazz like '文科%';
-
Writing an upsert stream to Kafka
-- Count students per class. The result stream is updating, so it cannot be written
-- to a plain Kafka sink table; it has to be written in upsert mode.
-- kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic clazz_cnt_upsert --create

-- Build the Kafka sink table in upsert mode
drop table if exists clazz_cnt_upsert_sink;
CREATE TABLE clazz_cnt_upsert_sink (
    clazz STRING,
    cnt BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'clazz_cnt_upsert',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Run the insert
insert into clazz_cnt_upsert_sink
select clazz
      ,count(distinct id) as cnt
from kafka_students
group by clazz;

-- Consume from the command line: only the value of each record is visible,
-- and the counts keep increasing record by record
-- kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_cnt_upsert

-- Query from the sql-client to see the final (upserted) result
select * from clazz_cnt_upsert_sink;
JDBC
Preparation
-
Add dependencies
flink-connector-jdbc-1.15.0.jar
mysql-connector-java-5.1.49.jar
-
Restart the yarn-session and sql-client
# Find the yarn-session application id with the yarn command
yarn application -list
# Stop the yarn-session
yarn application -kill application_id
# Start the yarn-session again
yarn-session.sh -d
Reading from MySQL
# Build the MySQL source table
DROP TABLE IF EXISTS mysql_student;
CREATE TABLE mysql_student (
id INT,
name String,
age INT,
gender STRING,
clazz STRING,
last_mod TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/student?useSSL=false',
'table-name' = 'student',
'username' = 'root',
'password' = '123456'
);
# Querying MySQL through the JDBC connector in the SQL client returns a static result
# i.e. this behaves as a bounded stream
select * from mysql_student;
Writing to MySQL
# Build the MySQL sink table
DROP TABLE IF EXISTS clazz_cnt_sink;
CREATE TABLE clazz_cnt_sink (
clazz STRING,
cnt BIGINT,
PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/student?useSSL=false',
'table-name' = 'clazz_cnt',
'username' = 'root',
'password' = '123456'
);
# Read the student data from Kafka, count students per class, and save the result into the MySQL sink table
# To write a continuously updating result into MySQL, the sink table must declare a primary key
insert into clazz_cnt_sink
select clazz
,count(distinct id) as cnt
from kafka_students
group by clazz;
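For the insert above to succeed, the clazz_cnt table has to exist on the MySQL side. A hypothetical MySQL-side DDL (the column names are assumed to mirror the Flink sink table); because the Flink table declares a primary key, the JDBC sink writes in upsert fashion:
-- executed in MySQL, not in the Flink sql-client
CREATE TABLE clazz_cnt (
    clazz VARCHAR(64) PRIMARY KEY,
    cnt   BIGINT
);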
File System
Reading files from HDFS
-- bounded stream
-- build the source table
CREATE TABLE hdfs_students (
id INT,
name String,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://master:9000/data/spark/stu/input',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
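A quick usage sketch, assuming the path above holds comma-separated student files; the query reads the files as a bounded input and then finishes:
select clazz
      ,count(id) as cnt
from hdfs_students
group by clazz;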
DataGen
Can only be used to build source tables, not sink tables
Typically used to generate random test data
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
'rows-per-second'='5', -- number of rows generated per second
'fields.f_sequence.kind'='sequence', -- make this column a sequence
'fields.f_sequence.start'='1', -- lower bound of the sequence
'fields.f_sequence.end'='1000', -- upper bound of the sequence
'fields.f_random.min'='1', -- minimum random value
'fields.f_random.max'='1000', -- maximum random value
'fields.f_random_str.length'='10' -- length of the generated string
);
Print
Used to print out the final result; can only be used to build sink tables
-- Print the data generated by DataGen with the Print connector
CREATE TABLE print_table
WITH ('connector' = 'print')
LIKE datagen (EXCLUDING ALL);
-- Query the DataGen table and insert into print_table
INSERT INTO print_table
select f_sequence,f_random,f_random_str from datagen;
BlackHole
A black hole: can only be used to build sink tables; data can be written to it but nothing is ever output
Typically used for performance testing
CREATE TABLE blackhole_table
WITH ('connector' = 'blackhole')
LIKE datagen (EXCLUDING ALL);
INSERT INTO blackhole_table
select f_sequence,f_random,f_random_str from datagen;
HBase
Characteristics: a column-oriented database; data is stored as key-value pairs; rowkey + column family:qualifier + version locates a unique cell
Good for real-time lookups over massive data sets, not suited for computation
Computation engines: MR, Hive, Spark, Flink
Preparation
-
Add the dependency
flink-sql-connector-hbase-1.0-jar-with-dependencies.jar
Build it yourself from a Maven project by adding the Maven dependency below and packaging it with dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-1.4</artifactId>
    <version>1.15.0</version>
</dependency>

<!-- assembly plugin that builds a jar with dependencies -->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
The official flink-connector-hbase-1.4-1.15.0.jar has a bug
-
Restart the yarn-session and sql-client
Reading from HBase
DROP table hbase_stu;
CREATE TABLE hbase_stu (
id STRING,
cf1 ROW<name STRING, age STRING, gender STRING, clazz STRING>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'stu',
'zookeeper.quorum' = 'master:2181'
);
# Query (bounded stream)
select id,cf1.name,cf1.age,cf1.gender,cf1.clazz from hbase_stu;
Writing to HBase
# Create the clazz_cnt table in HBase
create 'clazz_cnt','cf1'
# Build the HBase sink table
DROP table clazz_cnt_sink;
CREATE TABLE clazz_cnt_sink (
clazz STRING,
cf1 ROW<clz STRING, cnt BIGINT>,
PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'clazz_cnt',
'zookeeper.quorum' = 'master:2181'
);
# HBase sinks support dynamic updates
insert into clazz_cnt_sink
select t1.clazz
,ROW(clz, cnt)
from (
select clazz
,clazz as clz
,count(distinct id) as cnt
from kafka_students
where id is not null
group by clazz
) t1;
3. Catalogs
GenericInMemoryCatalog
An in-memory catalog: all metadata lives only for the lifetime of the session. Once the SQL client restarts, the metadata is lost.
JdbcCatalog
-
Preparation
Add dependencies
flink-connector-jdbc-1.15.0.jar
mysql-connector-java-5.1.49.jar
-
Pick a database in MySQL
-
Create a MySQL-backed catalog
CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'student',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://master:3306?useSSL=false'
);
Only suitable for querying tables that already exist in the MySQL database; tables cannot be created directly from the sql-client (i.e. metadata created by Flink cannot be persisted). A usage sketch follows below.
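A small usage sketch, assuming the MySQL student database already contains a student table:
use catalog my_catalog;
show tables;
select * from student;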
HiveCatalog
-
Preparation
Add flink-connector-hive_2.12-1.15.0.jar to Flink's lib directory
-
Restart the yarn-session and sql-client
-
Start the Hive metastore
hive --service metastore
-
Create the catalog
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-
Switch catalog
use catalog myhive;
-
Drop a catalog
drop catalog myhive;
-
List all catalogs
show catalogs;
-
Test
-- Create a Kafka-backed source table from the Flink sql-client
DROP TABLE kafka_word;
CREATE TABLE kafka_word (
    `word` String
) WITH (
    'connector' = 'kafka',                   -- use the Kafka connector
    'topic' = 'words',                       -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
    'properties.group.id' = 'testGroup01',   -- consumer group id
    'scan.startup.mode' = 'earliest-offset', -- startup mode
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'       -- silently skip records that do not match the schema
);
-- The table created above is visible from the Hive client,
-- but it cannot be queried in Hive:
-- Flink only uses Hive to store its catalog (metadata).
-
Change the default catalog
# Before Flink 1.13 this could be done by editing conf/sql-client-defaults.yaml and adding:
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /usr/local/soft/hive-1.2.1/conf
    default-database: default

# Since 1.13 it can only be done with an initialization SQL script passed at startup.
# Create an init.sql file:
vim init.sql

CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
use catalog hive_catalog;

# Start the sql-client with that file
sql-client.sh -i init.sql
4. Hive Dialect
Lets Hive SQL be executed directly from the Flink client, so there is no need to switch back and forth between two clients
Preparation
-
Add dependencies
# First move the bundled planner loader out of the way
mv flink-table-planner-loader-1.15.0.jar flink-table-planner-loader-1.15.0.jar.bak
# Copy flink-table-planner_2.12-1.15.0.jar from $FLINK_HOME/opt into lib
cp /usr/local/soft/flink-1.15.0/opt/flink-table-planner_2.12-1.15.0.jar /usr/local/soft/flink-1.15.0/lib/
# Add the Hive dependencies manually
cd /usr/local/soft/hive-1.2.1/lib
cp hive-exec-1.2.1.jar /usr/local/soft/flink-1.15.0/lib/
cp hive-metastore-1.2.1.jar /usr/local/soft/flink-1.15.0/lib/
cp antlr-runtime-3.4.jar /usr/local/soft/flink-1.15.0/lib/
cp libfb303-0.9.2.jar /usr/local/soft/flink-1.15.0/lib/
-
Restart the yarn-session and sql-client
Switch dialect
set table.sql-dialect=hive;
set table.sql-dialect=default;
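A minimal sketch of the Hive dialect in action, assuming the HiveCatalog from the previous section is the current catalog (the hive_words table is made up for illustration):
set table.sql-dialect=hive;
-- Hive-style DDL executed from the Flink client
CREATE TABLE IF NOT EXISTS hive_words (
    word STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
select word, count(1) from hive_words group by word;
set table.sql-dialect=default;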
5. Hive Module
To use Hive's functions when processing data in Flink, the Hive module has to be loaded first
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
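After the module is loaded, Hive built-in functions such as get_json_object can be called directly; a small sketch:
show modules;
-- get_json_object is a Hive built-in function, available once the hive module is loaded
select get_json_object('{"name":"flink"}', '$.name');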
6. Flink SQL Syntax
explain
Shows how a SQL statement will be executed (its plan)
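For example, against the kafka_students table defined earlier:
explain select clazz, count(id) as cnt from kafka_students group by clazz;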
set/reset
Set or reset configuration options
-- Properties that change the fundamental execution behavior of a table program.
-- execution mode: 'batch' or 'streaming'
SET 'execution.runtime-mode' = 'streaming';
-- how results are displayed in the client: 'table', 'changelog', 'tableau'
SET 'sql-client.execution.result-mode' = 'table';
-- maximum number of result rows cached by the client
SET 'sql-client.execution.max-table-result.rows' = '10000';
-- default parallelism
SET 'parallelism.default' = '1';
-- interval for automatic watermark emission (ms)
SET 'pipeline.auto-watermark-interval' = '200';
-- maximum parallelism
SET 'pipeline.max-parallelism' = '10';
-- minimum time idle state is retained before it may be cleaned up (ms if no unit is given)
SET 'table.exec.state.ttl' = '1000';
-- restart strategy
SET 'restart-strategy' = 'fixed-delay';
-- Configuration options for adjusting and tuning table programs.
SET 'table.optimizer.join-reorder-enabled' = 'true';
SET 'table.exec.spill-compression.enabled' = 'true';
SET 'table.exec.spill-compression.block-size' = '128kb';
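The reset side works the same way; a quick sketch:
-- reset a single option to its default
RESET 'execution.runtime-mode';
-- reset the whole session configuration
RESET;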
load/unload
Load or unload modules
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
with
A subquery that is used several times can be defined once in a WITH clause so that the SQL can be reused
with student_lib as (
select id
,name
,age
,gender
,clazz
from kafka_students
where clazz like '文科%'
)
select clazz
,count(id) as cnt
from student_lib
group by clazz;
hints
Options of a dynamic table can be overridden at query time with a hint written as a SQL comment
select id
,name
,age
,gender
,clazz
from kafka_students /*+ OPTIONS('scan.startup.mode'='latest-offset') */ t1
where clazz like '文科%';
SELECT and WHERE clauses
Used for filtering
select id
,name
,age
,gender
,clazz
from kafka_students /*+ OPTIONS('scan.startup.mode'='latest-offset') */ t1
where clazz like '文科%' and gender = '男';
SELECT DISTINCT
Deduplicates on the stream; state has to be kept for every distinct key ever seen, so performance is relatively poor (a small sketch follows)
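A minimal sketch on the kafka_word table defined earlier:
-- emits each word once; Flink keeps state for every distinct word it has seen
select distinct word from kafka_word;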
Windowing TVFs
TUMBLE: tumbling windows
HOP: sliding (hopping) windows
CUMULATE: cumulative windows
These windowing TVFs are not the same thing as Hive's window functions such as row_number or rank
TUMBLE: tumbling windows
# Create the Kafka source table
DROP table kafka_bid;
CREATE TABLE kafka_bid (
bidtime TIMESTAMP(3),
price DECIMAL(10, 2),
item STRING,
-- declare bidtime as the event-time attribute and generate watermarks with a 5-minute delay
WATERMARK FOR bidtime AS bidtime - INTERVAL '5' MINUTE
) WITH (
'connector' = 'kafka', -- use the Kafka connector
'topic' = 'bid', -- topic to read
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
'properties.group.id' = 'testGroup01', -- consumer group id
'scan.startup.mode' = 'earliest-offset', -- startup mode
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
# Create the bid topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic bid --create
# Start a console producer
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
# Sample data
2020-04-15 08:05:00,4.00,C
2020-04-15 08:07:00,2.00,A
2020-04-15 08:09:00,5.00,D
2020-04-15 08:08:00,5.00,D
2020-04-15 08:10:01,5.00,D
2020-04-15 08:11:00,3.00,B
2020-04-15 08:13:00,1.00,E
2020-04-15 08:17:00,6.00,F
2020-04-15 08:18:00,6.00,F
2020-04-15 08:19:00,6.00,F
2020-04-15 08:20:00,6.00,F
# Query
# TUMBLE automatically adds window_start, window_end and window_time to every row
SELECT *
FROM TABLE(
TUMBLE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);
# Sum of transaction amounts per window
SELECT window_start
,window_end
,SUM(price)
FROM TABLE(
TUMBLE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
HOP: sliding windows
# The same row can belong to several windows, because sliding windows overlap
SELECT *
FROM TABLE(
-- sliding window: two durations are required, the slide and the window size
HOP(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);
SELECT window_start
,window_end
,SUM(price) as sum_price
,avg(price) as avg_price
,count(item) as cnt
FROM TABLE(
-- sliding window: two durations are required, the slide and the window size
HOP(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
CUMULATE: cumulative windows
Two parameters are required: the step (how much the window grows each time) and the maximum window size
For example, with a 1-hour step and a 1-day maximum size you get the windows [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) every day.
The window keeps growing by the step until it reaches the maximum size, and then the process starts over
# The window keeps growing until it reaches the maximum, then starts again from the smallest size
SELECT *
FROM TABLE(
CUMULATE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
);
SELECT window_start
,window_end
,SUM(price) as sum_price
,avg(price) as avg_price
,count(item) as cnt
FROM TABLE(
-- cumulative window: two durations are required, the step and the maximum window size
CUMULATE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
7. Over Aggregation
Similar to window functions in Hive: OVER defines a window and an aggregate function is applied over it
Ordering is only allowed on a time attribute
The aggregation is evaluated per row, relative to that row's time
# Based on kafka_bid, compute the running total and average transaction amount of each item over the last 10 minutes
# Sample data
2020-04-15 08:17:00,6.00,F
2020-04-15 08:18:00,6.00,F
2020-04-15 08:19:00,6.00,F
2020-04-15 08:20:00,6.00,F
2020-04-15 08:25:00,6.00,F
2020-04-15 08:30:00,6.00,F
2020-04-15 08:31:00,6.00,F
2020-04-15 08:33:00,6.00,F
2020-04-15 08:35:00,6.00,F
2020-04-15 08:36:00,6.00,F
SELECT item, price, bidtime,
SUM(price) OVER w AS sum_price,
AVG(price) OVER w AS avg_price
FROM kafka_bid
WINDOW w AS (
PARTITION BY item
ORDER BY bidtime
RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW
);
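Besides the RANGE window above, Flink also supports ROWS-based over windows; a sketch that aggregates over the last three rows per item instead of the last 10 minutes:
SELECT item, price, bidtime,
       SUM(price) OVER (
           PARTITION BY item
           ORDER BY bidtime
           ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
       ) AS sum_last_3
FROM kafka_bid;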
8. TopN
Ranking within groups, e.g. the top three total scores in each class
# Data
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"01001","orientation":"西南","road_id":34052056,"time":1614711904,"speed":35.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117985031601010,"camera_id":"01214","orientation":"西南","road_id":34052057,"time":1614711914,"speed":45.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117984031601010,"camera_id":"01024","orientation":"西北","road_id":34052058,"time":1614711924,"speed":45.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117970031606010,"camera_id":"01022","orientation":"西北","road_id":34052059,"time":1614712022,"speed":75.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340520","card":117956031625010,"camera_id":"01132","orientation":"西北","road_id":34052060,"time":1614712120,"speed":46.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340521","card":117925031638010,"camera_id":"00202","orientation":"西北","road_id":34052061,"time":1614712218,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340521","card":117902031651010,"camera_id":"01102","orientation":"西北","road_id":34052062,"time":1614712316,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117885031666010,"camera_id":"01221","orientation":"西北","road_id":34308114,"time":1614712414,"speed":48.5}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117855031704010,"camera_id":"00231","orientation":"西北","road_id":34308115,"time":1614712619,"speed":59.5}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117817031742010,"camera_id":"01130","orientation":"西北","road_id":34308116,"time":1614712824,"speed":52.5}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117784031777010,"camera_id":"00123","orientation":"西北","road_id":34308117,"time":1614713030,"speed":71.5}
# Build the Kafka source table
-- vehicle records captured at traffic checkpoints
CREATE TABLE kafka_cars (
car STRING, -- license plate
city_code STRING, -- city code
county_code STRING, -- county code
card BIGINT, -- checkpoint id
camera_id STRING, -- camera id
orientation STRING, -- direction
road_id BIGINT, -- road id
`time` BIGINT, -- event time
speed DOUBLE -- speed
) WITH (
'connector' = 'kafka', -- connect to Kafka
'topic' = 'cars', -- topic
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker addresses
'properties.group.id' = 'testGroup', -- consumer group
'scan.startup.mode' = 'earliest-offset', -- where to start reading
'format' = 'json' -- data format; csv is plain text and maps columns by field order
);
# Create the cars topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic cars --create
# Start a console producer
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic cars
# In real time, find the top 3 counties with the highest traffic in each city
# Step 1: count the traffic of each county
select city_code
,county_code
,count(car) as cnt
from kafka_cars
group by city_code,county_code;
# Step 2: rank the counties within each city by traffic volume
select t1.city_code
,t1.county_code
,t1.cnt
,row_number() over w as rn
from (
select city_code
,county_code
,count(car) as cnt
from kafka_cars
group by city_code,county_code
) t1 WINDOW w AS (
PARTITION BY city_code
ORDER BY t1.cnt desc
);
# Step 3: keep the top 3 counties by traffic volume in each city
select tt1.city_code
,tt1.county_code
,tt1.cnt
,tt1.rn
from (
select t1.city_code
,t1.county_code
,t1.cnt
,row_number() over w as rn
from (
select city_code
,county_code
,count(car) as cnt
from kafka_cars
group by city_code,county_code
) t1 WINDOW w AS (
PARTITION BY city_code
ORDER BY t1.cnt desc
)
) tt1 where tt1.rn<=3;
Notes:
1. When using row_number on a stream without a time attribute, the result of the window function has to be filtered with a WHERE clause in an outer query
2. Execution may hit a Flink bug:
Caused by: java.lang.NoSuchMethodError: org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String;
The commons-lang3-3.3.2.jar dependency is missing
Fix: find the jar in the local Maven repository and upload it to $FLINK_HOME/lib,
then restart the yarn-session and sql-client
9. Window TopN
TopN computed within each window, over that window's data
# Create the Kafka source table
DROP table if exists kafka_bid_sup;
CREATE TABLE kafka_bid_sup (
bidtime TIMESTAMP(3),
price DECIMAL(10, 2),
item STRING,
supplier_id STRING,
-- declare bidtime as the event-time attribute and generate watermarks with a 5-minute delay
WATERMARK FOR bidtime AS bidtime - INTERVAL '5' MINUTE
) WITH (
'connector' = 'kafka', -- use the Kafka connector
'topic' = 'bid_sup', -- topic to read
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
'properties.group.id' = 'testGroup01', -- consumer group id
'scan.startup.mode' = 'earliest-offset', -- startup mode
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
# Create the bid_sup topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic bid_sup --create
# Start a console producer
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_sup
# Sample data
2020-04-15 08:05:00,4.00,A,supplier1
2020-04-15 08:06:00,4.00,C,supplier2
2020-04-15 08:07:00,2.00,G,supplier1
2020-04-15 08:08:00,2.00,B,supplier3
2020-04-15 08:09:00,5.00,D,supplier4
2020-04-15 08:11:00,2.00,B,supplier3
2020-04-15 08:13:00,1.00,E,supplier1
2020-04-15 08:15:00,3.00,H,supplier2
2020-04-15 08:17:00,6.00,F,supplier5
2020-04-15 08:21:00,6.00,F,supplier5
2020-04-15 08:18:00,6.00,F,supplier5
2020-04-15 08:19:00,6.00,F,supplier5
2020-04-15 08:25:00,6.00,F,supplier5
# Every 10 minutes, find the top 3 suppliers by total sales
# Step 1: every 10 minutes, sum the sales of each supplier
SELECT window_start
,window_end
,supplier_id
,sum(price) as sum_price_per_window
from TABLE(
TUMBLE(TABLE kafka_bid_sup, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
group by window_start,window_end,supplier_id;
# Step 2: TopN within each group based on the result of step 1
SELECT tt1.window_start
,tt1.window_end
,tt1.supplier_id
,tt1.sum_price_per_window
,tt1.rn
from (
SELECT t1.window_start
,t1.window_end
,t1.supplier_id
,t1.sum_price_per_window
,row_number() over (partition by t1.window_start,t1.window_end order by t1.sum_price_per_window desc) as rn
from (
SELECT window_start
,window_end
,supplier_id
,sum(price) as sum_price_per_window
from TABLE(
TUMBLE(TABLE kafka_bid_sup, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
group by window_start,window_end,supplier_id
) t1
) tt1 where tt1.rn <= 3;
10. Deduplication with TopN
Performs a bit better than using DISTINCT directly
# Create the Kafka source table
DROP table if exists kafka_bid_sup_distinct;
CREATE TABLE kafka_bid_sup_distinct (
bidtime TIMESTAMP(3),
price DECIMAL(10, 2),
item STRING,
supplier_id STRING,
-- declare bidtime as the event-time attribute and generate watermarks with a 5-minute delay
WATERMARK FOR bidtime AS bidtime - INTERVAL '5' MINUTE
) WITH (
'connector' = 'kafka', -- use the Kafka connector
'topic' = 'bid_sup_distinct', -- topic to read
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
'properties.group.id' = 'testGroup01', -- consumer group id
'scan.startup.mode' = 'latest-offset', -- startup mode
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
# Create the bid_sup_distinct topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic bid_sup_distinct --create
# Start a console producer
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_sup_distinct
# Sample data
2020-04-15 08:05:00,4.00,A,supplier1
2020-04-15 08:06:00,4.00,C,supplier2
# Deduplicate with TopN
SELECT t1.bidtime
,t1.price
,t1.item
,t1.supplier_id
,t1.rn
from (
SELECT bidtime
,price
,item
,supplier_id
,row_number() over (partition by bidtime,price,item,supplier_id order by bidtime) as rn
from kafka_bid_sup_distinct
) t1 where t1.rn = 1;
11. Joins
Regular Joins
Interval Joins
Temporal Joins
Lookup Join
Regular Joins
inner join
outer join: left outer join, right outer join, full outer join
If two stream tables are joined directly, Flink has to keep the state of every row from both tables. Over time the state grows, checkpoints take longer and longer, latency rises, and the job may eventually fail. (One mitigation is sketched below.)
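One common mitigation (a sketch, not part of the original example): set a TTL on idle state so stale join entries can be cleaned up, at the cost of possibly missing very late matches:
-- state idle for longer than 1 hour may be cleaned up and will no longer join
SET 'table.exec.state.ttl' = '1h';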
# Create the student_join and score_join topics in Kafka and the corresponding source tables
DROP table if exists kafka_student_join;
CREATE TABLE kafka_student_join (
id STRING,
name STRING,
age BIGINT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka', -- use the Kafka connector
'topic' = 'student_join', -- topic to read
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
'properties.group.id' = 'testGroup01', -- consumer group id
'scan.startup.mode' = 'latest-offset', -- startup mode
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
DROP table if exists kafka_score_join;
CREATE TABLE kafka_score_join (
id STRING,
sub_id STRING,
score BIGINT
) WITH (
'connector' = 'kafka', -- use the Kafka connector
'topic' = 'score_join', -- topic to read
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
'properties.group.id' = 'testGroup01', -- consumer group id
'scan.startup.mode' = 'latest-offset', -- startup mode
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
# Create the student_join and score_join topics
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic student_join --create
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic score_join --create
# Start the console producers
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
# inner join
SELECT t1.id
,t1.name
,t1.clazz
,t2.sub_id
,t2.score
from kafka_student_join t1
join kafka_score_join t2
on t1.id = t2.id;
# left join
SELECT t1.id
,t1.name
,t1.clazz
,t2.sub_id
,t2.score
from kafka_student_join t1
left join kafka_score_join t2
on t1.id = t2.id;
# right join
SELECT t1.id
,t1.name
,t1.clazz
,t2.sub_id
,t2.score
from kafka_student_join t1
right join kafka_score_join t2
on t1.id = t2.id;
# full join
SELECT t1.id
,t1.name
,t1.clazz
,t2.sub_id
,t2.score
from kafka_student_join t1
full join kafka_score_join t2
on t1.id = t2.id;
Interval Join
When two stream tables are joined, a time interval can be specified for the join; state outside that interval no longer needs to be kept
Pros: no large amount of state to maintain, so the job runs stably
Cons: late data may fail to join
This is the usual way to join two stream tables, because the state it keeps does not grow without bound over time
# Create the student and score source tables, adding a processing-time column to each row for the join
DROP table if exists kafka_student_join_interval;
CREATE TABLE kafka_student_join_interval (
id STRING,
name STRING,
age BIGINT,
gender STRING,
clazz STRING,
proctime AS PROCTIME() -- add a processing-time column
) WITH (
'connector' = 'kafka', -- use the Kafka connector
'topic' = 'student_join', -- topic to read
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
'properties.group.id' = 'testGroup01', -- consumer group id
'scan.startup.mode' = 'latest-offset', -- startup mode
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
DROP table if exists kafka_score_join_interval;
CREATE TABLE kafka_score_join_interval (
id STRING,
sub_id STRING,
score BIGINT,
proctime AS PROCTIME() -- add a processing-time column
) WITH (
'connector' = 'kafka', -- use the Kafka connector
'topic' = 'score_join', -- topic to read
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
'properties.group.id' = 'testGroup01', -- consumer group id
'scan.startup.mode' = 'latest-offset', -- startup mode
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true' -- silently skip records that do not match the schema
);
# The topics are the same as in the Regular Join section, no extra creation needed
# Start the console producers
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
# Run the join
SELECT t1.id
,t1.proctime
,t2.proctime
,t1.name
,t1.clazz
,t2.sub_id
,t2.score
from kafka_student_join_interval t1
left join kafka_score_join_interval t2
on t1.id = t2.id and (
t1.proctime BETWEEN t2.proctime - INTERVAL '10' SECOND AND t2.proctime
or t2.proctime BETWEEN t1.proctime - INTERVAL '10' SECOND AND t1.proctime
);
Temporal Joins
Use case:
Suppose there are two tables: orders (an order table) and currency_rates (an exchange-rate table)
Users place orders all the time, and exchange rates also fluctuate over time (though they are updated less frequently)
The rate table reflects the historical changes of the rates; it can be viewed as a slowly changing (zipper) table, also called a temporal table
The two tables need to be joined to convert amounts according to the exchange rate
If they are joined with a regular join, the state to maintain keeps growing
If they are joined with an interval join, a late rate update means the match is missed
A temporal join solves both problems
For each order row, its timestamp is used to look up the latest rate record whose time is not later than that timestamp, and the conversion uses that rate
With a temporal join there is no need to maintain a large amount of state; only the latest records of the rate table have to be kept
-- orders table
drop table if exists orders;
CREATE TABLE orders (
order_id STRING, -- order id
price DECIMAL(32,2), -- order amount, in RMB
currency STRING, -- key into the rate table
order_time TIMESTAMP(3), -- time the order happened
WATERMARK FOR order_time AS order_time -- event time and watermark
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-- exchange-rate table
drop table if exists currency_rates;
CREATE TABLE currency_rates (
currency STRING, -- primary key of the rate table
conversion_rate DECIMAL(32, 2), -- exchange rate
update_time TIMESTAMP(3), -- time the rate was updated
WATERMARK FOR update_time AS update_time, -- event-time field and watermark
PRIMARY KEY(currency) NOT ENFORCED -- primary key
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates1',
'properties.bootstrap.servers' = 'master:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- Sample data
-- Insert rows into the rate table: this can only be done with INSERT statements in the sql-client, not from a console producer
insert into currency_rates(currency,conversion_rate,update_time)
values
('1',0.16, TIMESTAMP'2022-09-03 10:15:10'),
('1',0.17, TIMESTAMP'2022-09-03 10:20:10'),
('1',0.18, TIMESTAMP'2022-09-03 10:25:10'),
('1',0.19, TIMESTAMP'2022-09-03 10:30:10'),
('1',0.19, TIMESTAMP'2022-09-03 10:41:10');
-- Produce data into the orders topic
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
001,100,1,2022-09-03 10:14:10
001,100,1,2022-09-03 10:15:10
001,100,1,2022-09-03 10:19:10
001,100,1,2022-09-03 10:21:10
001,100,1,2022-09-03 10:22:10
001,100,1,2022-09-03 10:27:10
001,100,1,2022-09-03 10:28:10
001,100,1,2022-09-03 10:30:10
001,100,1,2022-09-03 10:40:10
001,100,1,2022-09-03 10:60:10
-- Temporal table join
SELECT order_id,
price,
b.currency,
conversion_rate,
update_time,
order_time
FROM orders as a
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF a.order_time as b
ON a.currency = b.currency;
Lookup Join
Used to join a stream table with a dimension table
Pros: changes in the dimension table are picked up dynamically
Cons: every incoming record triggers a read from MySQL, so the join is relatively slow (see the lookup-cache sketch after the example)
-- score table: an unbounded stream table
drop table if exists score_join_lookup;
CREATE TABLE score_join_lookup (
id STRING,
sub_id STRING,
score BIGINT,
proctime as PROCTIME() -- processing-time column
) WITH (
'connector' = 'kafka',
'topic' = 'score_join',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-- student table: a MySQL dimension table that is updated occasionally
drop table if exists mysql_student;
CREATE TABLE mysql_student (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/student',
'table-name' = 'student',
'username' = 'root',
'password' ='123456'
);
-- With a direct join, the MySQL dimension table is loaded only once, when the job starts;
-- Flink cannot see any later changes to it
select
a.id,a.score,b.id,b.name,b.age
from
score_join_lookup as a
join
mysql_student as b
on a.id=b.id;
-- To both join the dimension table and capture its updates,
-- use a lookup join
select
a.id,a.score,b.id,b.name,b.age
from
score_join_lookup as a
join
mysql_student FOR SYSTEM_TIME AS OF a.proctime as b
on a.id=b.id;
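The per-record MySQL reads can be reduced with the JDBC connector's lookup cache; a hedged sketch of the same dimension table with caching enabled (cached rows may be stale for up to the TTL):
drop table if exists mysql_student_cached;
CREATE TABLE mysql_student_cached (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/student',
'table-name' = 'student',
'username' = 'root',
'password' = '123456',
'lookup.cache.max-rows' = '1000', -- at most 1000 rows kept in the cache
'lookup.cache.ttl' = '60s'        -- cached rows expire after 60 seconds
);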