首页 > 其他分享 >大数据学习之Flink(六)

大数据学习之Flink(六)

时间:2022-12-02 21:55:08浏览次数:33  
标签:join -- Flink 9092 kafka 学习 t1 数据 id

Flink SQL

1、SQL-Client

Flink提供的SQL客户端

准备工作
  • 启动yarn-session

    yarn-session.sh -d
  • 启动Flink SQL客户端

    sql-client.sh

2、数据源

Kafka
准备工作
  • 添加依赖到$FLINK_HOME/lib

    flink-sql-connector-kafka-1.15.0.jar

  • 重启yarn-session以及sql客户端

    # 杀死yarn-session
    yarn application -kill application_1668648201089_0018
    # 或者使用stop指令进行关闭
    echo "stop" | yarn-session.sh -id application_1668648201089_0018
    ​
    # 启动yarn-session
    yarn-session.sh -d
    # 启动sql客户端
    sql-client.sh
读Kafka
  • 构建source table

    -- Note: SQL comments start with `--` (not `//` or `#`).
    -- Kafka source table: each CSV record of topic 'words' is read into `word`.
    CREATE TABLE kafka_word (
      `word` STRING
    ) WITH (
      'connector' = 'kafka', -- use the Kafka connector
      'topic' = 'words', -- topic to read from
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
      'properties.group.id' = 'testGroup01', -- consumer group id
      'scan.startup.mode' = 'latest-offset', -- startup mode: begin at the latest offset
      'format' = 'csv',
      'csv.field-delimiter' = ',',
      'csv.ignore-parse-errors' = 'true' -- skip records that do not match the table schema
    );
  • 参数

    • scan.startup.mode 指定消费者启动时从哪个位置开始消费的策略

      • earliest-offset 从最早的位置开始消费

      • latest-offset 从最新的位置开始消费

      • group-offsets 使用当前组上一次提交的消费位置开始消费

      • timestamp 指定从某一个时间戳往后开始消费

      • specific-offsets 指定从某个特定的位置进行消费

写Kafka
  • 将Append-Only流写入Kafka

    -- Kafka source table for student records (CSV: id,name,age,gender,clazz).
    CREATE TABLE kafka_students (
      id BIGINT,
      name STRING,
      age BIGINT,
      gender STRING,
      clazz STRING
    ) WITH (
      'connector' = 'kafka', -- use the Kafka connector
      'topic' = 'students', -- topic to read from
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
      'properties.group.id' = 'testGroup01', -- consumer group id
      'scan.startup.mode' = 'earliest-offset', -- startup mode: begin at the earliest offset
      'format' = 'csv',
      'csv.field-delimiter' = ',',
      'csv.ignore-parse-errors' = 'true' -- skip records that do not match the table schema
    );
    # 创建sink 表 保存文科班的学生信息
    # kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic students_liberal  --create
    ​
    CREATE TABLE students_liberal_sink (
      id BIGINT,
      name String,
      age BIGINT,
      gender STRING,
      clazz STRING
    ) WITH (
      'connector' = 'kafka', -- 使用Kafka连接器
      'topic' = 'students_liberal', -- 指定topic
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定Kafka集群地址
      'properties.group.id' = 'testGroup01', -- 指定消费者组id
      'scan.startup.mode' = 'earliest-offset', -- 指定启动策略
      'format' = 'json'
    );
    ​
    -- Select the students whose class name starts with '文科' (liberal arts)
    -- and append them to the sink table. Columns are listed explicitly so the
    -- statement keeps working if the source schema gains columns.
    insert into students_liberal_sink
    select  id
            ,name
            ,age
            ,gender
            ,clazz
    from kafka_students
    where clazz like '文科%';
  • 将Upsert流写入Kafka

    -- Counting students per class produces an updating stream, which cannot be
    -- written to a plain (append-only) Kafka sink table; it must be written in upsert mode.
    -- Create the topic first (shell command):
    --   kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic clazz_cnt_upsert --create
    -- Kafka sink table in upsert mode; the upsert-kafka connector requires a primary key.
    DROP TABLE IF EXISTS clazz_cnt_upsert_sink;
    CREATE TABLE clazz_cnt_upsert_sink (
      clazz STRING,
      cnt BIGINT,
      PRIMARY KEY (clazz) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'clazz_cnt_upsert',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    ​
    -- Write the number of distinct students per class into the upsert sink.
    insert into clazz_cnt_upsert_sink
    select clazz
           ,count(distinct id) as cnt
    from kafka_students
    group by clazz;
    ​
    # 从命令行消费
    # 只能看到数据的value,结果也是逐一递增
    kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_cnt_upsert
    ​
    -- Query the upsert table from sql-client:
    -- the final result per class is visible here.
    select * from clazz_cnt_upsert_sink;
JDBC
准备工作
  • 添加依赖

    flink-connector-jdbc-1.15.0.jar

    mysql-connector-java-5.1.49.jar

  • 重启yarn-session以及sql-client

    # Find the application id of the running yarn-session
    yarn application -list
    # Stop the yarn-session (replace application_id with the id found above)
    yarn application -kill application_id
    # Start a new yarn-session
    yarn-session.sh -d
    # Start the SQL client again
    sql-client.sh
读MySQL
-- MySQL source table, read through the JDBC connector.
DROP TABLE IF EXISTS mysql_student;
CREATE TABLE mysql_student (
  id INT,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING,
  last_mod TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/student?useSSL=false',
   'table-name' = 'student',
   'username' = 'root',
   'password' = '123456'
);
​
# 使用SQL-Client的JDBC方式查询MySQL时得到的结果是静态的
# 相当于这里是有界流
select * from mysql_student;
写MySQL
# 构建MySQL的sink表
DROP TABLE IF EXISTS clazz_cnt_sink;
CREATE TABLE clazz_cnt_sink (
  clazz STRING,
  cnt  BIGINT,
  PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/student?useSSL=false',
   'table-name' = 'clazz_cnt',
   'username' = 'root',
   'password' = '123456'
);
​
# 从kafka中查询学生数据并计算班级人数 最终将结果保存到MySQL的sink表中
# 如果需要将动态更新的结果写入MySQL则所插入的表需要指定主键
insert into clazz_cnt_sink
select clazz
       ,count(distinct id) as cnt
from kafka_students
group by clazz;
文件系统
读HDFS文件
-- 有界流
-- 构建source 表
CREATE TABLE hdfs_students (
  id INT,
  name String,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://master:9000/data/spark/stu/input',
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- 当数据不符合表结构时 直接忽略
);
DataGen

只能构建Source表,不能构建Sink表

一般用于随机生成数据

-- DataGen source table that produces generated test rows.
-- (Invisible zero-width characters on the separator lines were removed so the
-- statement can be pasted into sql-client as-is.)
CREATE TABLE datagen (
 f_sequence INT,
 f_random INT,
 f_random_str STRING,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5', -- rows generated per second

 'fields.f_sequence.kind'='sequence', -- generate this column as a sequence
 'fields.f_sequence.start'='1', -- first value of the sequence
 'fields.f_sequence.end'='1000', -- last value of the sequence

 'fields.f_random.min'='1', -- minimum random value
 'fields.f_random.max'='1000', -- maximum random value

 'fields.f_random_str.length'='10' -- length of the generated string
);
Print

用于将最终的结果进行输出,只能用于构建Sink表

-- 将DataGen生成的数据通过Print进行打印
CREATE TABLE print_table 
WITH ('connector' = 'print')
LIKE datagen (EXCLUDING ALL);
​
-- 从DataGen中查询数据插入到print_table中
INSERT INTO print_table
select f_sequence,f_random,f_random_str from datagen;
BlackHole

黑洞,只能用于构建Sink表,数据可以写入,但是不会有任何输出

一般用于测试性能

CREATE TABLE blackhole_table 
WITH ('connector' = 'blackhole')
LIKE datagen (EXCLUDING ALL);
​
INSERT INTO blackhole_table
select f_sequence,f_random,f_random_str from datagen;
HBase

特点:面向列族(Column Family)的列式存储数据库、数据都是以KV格式存储、通过rowkey+CF:Qualifier+version(时间戳)可以定位唯一一个cell

适合海量数据的实时查询,不适合做计算

计算:MR、HIVE、Spark、Flink

准备工作
  • 添加依赖

    flink-sql-connector-hbase-1.0-jar-with-dependencies.jar

    自己通过Maven项目添加Maven依赖打包生成

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-1.4</artifactId>
    <version>1.15.0</version>
    </dependency>
    ​
    <!-- 带依赖的插件-->
    <plugin>
     <artifactId>maven-assembly-plugin</artifactId>
     <configuration>
         <descriptorRefs>
             <descriptorRef>jar-with-dependencies</descriptorRef>
         </descriptorRefs>
     </configuration>
     <executions>
         <execution>
             <id>make-assembly</id>
             <phase>package</phase>
             <goals>
                 <goal>single</goal>
             </goals>
         </execution>
     </executions>
    </plugin>

    官方提供的flink-connector-hbase-1.4-1.15.0.jar有bug

  • 重启yarn-session以及sql-client

读HBase
-- Source table mapped onto the HBase table 'stu'.
-- `id` is the single atomic column (declared as the primary key); the column
-- family cf1 is declared as a ROW whose fields are its qualifiers.
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS hbase_stu;
CREATE TABLE hbase_stu (
 id STRING,
 cf1 ROW<name STRING, age STRING, gender STRING, clazz STRING>,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'stu',
 'zookeeper.quorum' = 'master:2181'
);
​
# 查询  有界流
select id,cf1.name,cf1.age,cf1.gender,cf1.clazz from hbase_stu;
写HBase
# 在HBase中创建clazz_cnt表
create 'clazz_cnt','cf1'
​
-- HBase sink table mapped onto the HBase table 'clazz_cnt' (column family cf1).
-- IF EXISTS keeps the script re-runnable; it also drops any earlier table
-- registered under the same name in this session.
DROP TABLE IF EXISTS clazz_cnt_sink;
CREATE TABLE clazz_cnt_sink (
 clazz STRING,
 cf1 ROW<clz STRING, cnt BIGINT>,
 PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'clazz_cnt',
 'zookeeper.quorum' = 'master:2181'
);
​
# HBase 支持动态更新
insert into clazz_cnt_sink
select  t1.clazz
        ,ROW(clz, cnt)
from (
    select clazz
            ,clazz as clz
           ,count(distinct id) as cnt
    from kafka_students
    where id is not null
    group by clazz
) t1;

3、Catalogs

GenericInMemoryCatalog

GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

一旦SQL Client重启,元数据就会丢失

JdbcCatalog
  • 准备工作

    添加依赖

    flink-connector-jdbc-1.15.0.jar

    mysql-connector-java-5.1.49.jar

  • 从MySQL中选择一个数据库

  • 创建基于MySQL的Catalog

    CREATE CATALOG my_catalog WITH(
        'type' = 'jdbc',
        'default-database' = 'student',
        'username' = 'root',
        'password' = '123456',
        'base-url' = 'jdbc:mysql://master:3306?useSSL=false'
    );

    只适合查询MySQL库中的表,不能直接使用sql-client创建表(相当于无法保存Flink创建的元数据)

HiveCatalog
  • 准备工作

    添加依赖flink-connector-hive_2.12-1.15.0.jar到FLINK的lib目录下

  • 重启yarn-session以及sql-client

  • 启动Hive MetaStore

    hive --service metastore
  • 创建catalog

    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
    );
  • 切换catalog

    use catalog myhive;
  • 删除catalog

    drop catalog myhive;
  • 查看所有的catalog

    show catalogs;
  • 测试

    -- From Flink's sql-client, create a table that uses Kafka as its source.
    -- IF EXISTS avoids an error when the table is not yet present in this catalog.
    DROP TABLE IF EXISTS kafka_word;
    CREATE TABLE kafka_word (
      `word` STRING
    ) WITH (
      'connector' = 'kafka', -- use the Kafka connector
      'topic' = 'words', -- topic to read from
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
      'properties.group.id' = 'testGroup01', -- consumer group id
      'scan.startup.mode' = 'earliest-offset', -- startup mode: begin at the earliest offset
      'format' = 'csv',
      'csv.field-delimiter' = ',',
      'csv.ignore-parse-errors' = 'true' -- skip records that do not match the table schema
    );
    ​
    # 可以在hive的客户端看到上面创建的表
    # 但是不能在hive中查询
    # flink只是将它的catalog(元数据)借助hive保存起来
  • 修改默认的catalog

    # 1.13版本之前 可以通过修改conf/sql-client-defaults.yaml文件,在其中加入
    catalogs: 
      - name: myhive
        type: hive
        hive-conf-dir: /usr/local/soft/hive-1.2.1/conf
        default-database: default
    ​
    # 1.13版本之后只能通过启动时指定初始化sql脚本实现
    # 编辑一个init.sql文件
    vim init.sql
    ​
    CREATE CATALOG hive_catalog WITH (
      'type' = 'hive',
      'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
    );
    ​
    use catalog hive_catalog;
    ​
    # 启动sql-client加上该文件
    sql-client.sh -i init.sql

4、Hive方言

为了能够方便在Flink的客户端中执行Hive的SQL,不需要两个客户端来回切换

准备工作
  • 添加依赖

    # 先删除自带的依赖
    mv flink-table-planner-loader-1.15.0.jar flink-table-planner-loader-1.15.0.jar.bak
    # 将$FLINK_HOME/opt目录下的flink-table-planner_2.12-1.15.0.jar放入lib目录
    cp /usr/local/soft/flink-1.15.0/opt/flink-table-planner_2.12-1.15.0.jar /usr/local/soft/flink-1.15.0/lib/
    ​
    # 手动添加Hive的依赖
    cd /usr/local/soft/hive-1.2.1/lib
    cp hive-exec-1.2.1.jar /usr/local/soft/flink-1.15.0/lib/
    cp hive-metastore-1.2.1.jar /usr/local/soft/flink-1.15.0/lib/
    cp antlr-runtime-3.4.jar /usr/local/soft/flink-1.15.0/lib/
    cp libfb303-0.9.2.jar /usr/local/soft/flink-1.15.0/lib/
  • 重启yarn-session及sql-client

切换方言
set table.sql-dialect=hive;
​
set table.sql-dialect=default;

5、Hive模块

如果需要使用Hive中的函数在Flink中处理数据就需要先加载Hive模块

LOAD MODULE hive WITH ('hive-version' = '1.2.1');

6、Flink SQL语法

explain

查看SQL的执行过程

set/reset

设置一些参数

-- Properties that change the fundamental execution behavior of a table program.
-- Execution mode: 'batch' or 'streaming'
SET 'execution.runtime-mode' = 'streaming';
-- How query results are displayed in the SQL client: 'table', 'changelog' or 'tableau'
SET 'sql-client.execution.result-mode' = 'table';
-- Maximum number of result rows the SQL client keeps cached
SET 'sql-client.execution.max-table-result.rows' = '10000';
-- Default parallelism
SET 'parallelism.default' = '1';
-- Interval at which watermarks are emitted periodically (milliseconds)
SET 'pipeline.auto-watermark-interval' = '200';
-- Maximum parallelism
SET 'pipeline.max-parallelism' = '10';
-- Idle state retention time: state that has not been updated for this long may be cleaned up.
-- A bare number is interpreted as milliseconds, so '1000' means 1 second (not 1000 s).
SET 'table.exec.state.ttl' = '1000';
-- Restart strategy
SET 'restart-strategy' = 'fixed-delay';
​
-- Configuration options for adjusting and tuning table programs.
​
SET 'table.optimizer.join-reorder-enabled' = 'true';
SET 'table.exec.spill-compression.enabled' = 'true';
SET 'table.exec.spill-compression.block-size' = '128kb';
load/unload

用于加载模块

LOAD MODULE hive WITH ('hive-version' = '1.2.1');
with

可以将需要多次使用的SQL通过with进行定义,让SQL得到复用

with student_lib as (
    select  id
            ,name
            ,age
            ,gender
            ,clazz
    from kafka_students
    where clazz like '文科%'
)
select  clazz
        ,count(id) as cnt
from student_lib
group by clazz;
hints

可以在查询时使用SQL注释的方式更改动态表的某些参数

select  id
        ,name
        ,age
        ,gender
        ,clazz
from kafka_students /*+ OPTIONS('scan.startup.mode'='latest-offset') */  t1
where clazz like '文科%';
SELECT 与 WHERE 子句

用于过滤

select  id
        ,name
        ,age
        ,gender
        ,clazz
from kafka_students /*+ OPTIONS('scan.startup.mode'='latest-offset') */  t1
where clazz like '文科%' and gender = '男';
SELECT DISTINCT

用于在流上进行去重,需要一直维护状态,性能比较低

Windowing TVFs

TUMBLE 滚动窗口

HOP 滑动窗口

CUMULATE 累计窗口

这里提到的窗口函数并不是像Hive中的row_number或者是rank

TUMBLE 滚动窗口
-- Kafka source table for bid records (CSV: bidtime,price,item).
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS kafka_bid;
CREATE TABLE kafka_bid (
  bidtime  TIMESTAMP(3),
  price  DECIMAL(10, 2),
  item  STRING,
  -- declare bidtime as the event-time attribute; watermarks lag 5 minutes behind it
  WATERMARK FOR bidtime AS bidtime - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka', -- use the Kafka connector
  'topic' = 'bid', -- topic to read from
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- Kafka broker addresses
  'properties.group.id' = 'testGroup01', -- consumer group id
  'scan.startup.mode' = 'earliest-offset', -- startup mode: begin at the earliest offset
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- skip records that do not match the table schema
);
​
# 创建bid Topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic bid --create
​
# 创建控制台的生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
​
# 准备数据
2020-04-15 08:05:00,4.00,C
2020-04-15 08:07:00,2.00,A
2020-04-15 08:09:00,5.00,D
2020-04-15 08:08:00,5.00,D
2020-04-15 08:10:01,5.00,D
2020-04-15 08:11:00,3.00,B
2020-04-15 08:13:00,1.00,E
2020-04-15 08:17:00,6.00,F
2020-04-15 08:18:00,6.00,F
2020-04-15 08:19:00,6.00,F
2020-04-15 08:20:00,6.00,F
​
# 进行查询
# TUMBLE会自动给每条数据加上window_start、window_end、window_time
SELECT  *
FROM TABLE(
    TUMBLE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);
​
# 统计每个窗口的交易金额之和
SELECT  window_start
        ,window_end
        ,SUM(price)
FROM TABLE(
    TUMBLE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
HOP 滑动窗口
# 同一条数据可能属于多个窗口,因为滑动窗口之间有数据的重叠
SELECT  *
FROM TABLE(
    -- 滑动窗口,需要指定两个时间,分别是:滑动时间、窗口大小
    HOP(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);
​
​
SELECT  window_start
        ,window_end
        ,SUM(price) as sum_price
        ,avg(price) as avg_price
        ,count(item) as cnt
FROM TABLE(
    -- 滑动窗口,需要指定两个时间,分别是:滑动时间、窗口大小
    HOP(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
CUMULATE 累计窗口

需要指定两个参数:step增长的步长、size最大的窗口大小

例如,您可以有一个 1 小时步长和 1 天最大大小的累积窗口,您将得到窗口:[00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) 每天。

窗口会不断按照步长step增大直到达到最大的窗口范围结束,之后重复该过程

# 窗口不断变大直到达到最大又从最小开始
SELECT  *
FROM TABLE(
    CUMULATE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
);
​
SELECT  window_start
        ,window_end
        ,SUM(price) as sum_price
        ,avg(price) as avg_price
        ,count(item) as cnt
FROM TABLE(
    -- 累计窗口,需要指定两个时间,分别是:增长步长、窗口最大范围
    CUMULATE(TABLE kafka_bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;

7、Over聚合

类似Hive中的窗口函数,可以通过over指定一个窗口,并使用函数作用在该窗口上

只能使用时间字段进行排序

统计是以每条数据中的时间为准

# 基于kafka_bid表,统计最近10分钟内每种商品的累计交易和平均交易金额
# 准备数据
2020-04-15 08:17:00,6.00,F
2020-04-15 08:18:00,6.00,F
2020-04-15 08:19:00,6.00,F
2020-04-15 08:20:00,6.00,F
2020-04-15 08:25:00,6.00,F
2020-04-15 08:30:00,6.00,F
2020-04-15 08:31:00,6.00,F
2020-04-15 08:33:00,6.00,F
2020-04-15 08:35:00,6.00,F
2020-04-15 08:36:00,6.00,F
​
SELECT item, price, bidtime,
  SUM(price) OVER w AS sum_price,
  AVG(price) OVER w AS avg_price
FROM kafka_bid
WINDOW w AS (
  PARTITION BY item
  ORDER BY bidtime
  RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW
);

8、TopN

组内排名,例如:班级总分前三名

# 数据
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"01001","orientation":"西南","road_id":34052056,"time":1614711904,"speed":35.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117985031601010,"camera_id":"01214","orientation":"西南","road_id":34052057,"time":1614711914,"speed":45.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117984031601010,"camera_id":"01024","orientation":"西北","road_id":34052058,"time":1614711924,"speed":45.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117970031606010,"camera_id":"01022","orientation":"西北","road_id":34052059,"time":1614712022,"speed":75.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340520","card":117956031625010,"camera_id":"01132","orientation":"西北","road_id":34052060,"time":1614712120,"speed":46.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340521","card":117925031638010,"camera_id":"00202","orientation":"西北","road_id":34052061,"time":1614712218,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340521","card":117902031651010,"camera_id":"01102","orientation":"西北","road_id":34052062,"time":1614712316,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117885031666010,"camera_id":"01221","orientation":"西北","road_id":34308114,"time":1614712414,"speed":48.5}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117855031704010,"camera_id":"00231","orientation":"西北","road_id":34308115,"time":1614712619,"speed":59.5}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117817031742010,"camera_id":"01130","orientation":"西北","road_id":34308116,"time":1614712824,"speed":52.5}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340181","card":117784031777010,"camera_id":"00123","orientation":"西北","road_id":34308117,"time":1614713030,"speed":71.5}
​
-- Kafka source table: vehicle records captured at road checkpoints (JSON).
CREATE TABLE kafka_cars (
    car STRING, -- license plate number
    city_code STRING, -- city code
    county_code STRING, -- county/district code
    card BIGINT, -- checkpoint id
    camera_id STRING, -- camera id
    orientation STRING, -- direction
    road_id BIGINT, -- road id
    `time` BIGINT, -- event time
    speed DOUBLE -- speed
) WITH (
    'connector' = 'kafka', -- use the Kafka connector
    'topic' = 'cars', -- topic to read from
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker addresses
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'earliest-offset', -- where to start reading
    'format' = 'json' -- record format (by contrast, csv is a plain-text format whose fields are mapped by position)
);
​
# 创建cars的Topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic cars --create
​
# 创建控制台的生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic cars
​
# 实时统计每个城市车流量最多的前3个区县
# 第一步:统计每个区县车流量
select  city_code
        ,county_code
        ,count(car) as cnt
from kafka_cars
group by city_code,county_code;
# 第二步:给每个城市的区县车流量进行排名
select  t1.city_code
        ,t1.county_code
        ,t1.cnt
        ,row_number() over w as rn
from (
    select  city_code
            ,county_code
            ,count(car) as cnt
    from kafka_cars
    group by city_code,county_code
) t1 WINDOW w AS (
    PARTITION BY city_code
    ORDER BY t1.cnt desc
);
# 第三步:取出每个城市车流量排名前三的区县
select  tt1.city_code
        ,tt1.county_code
        ,tt1.cnt
        ,tt1.rn
from (
    select  t1.city_code
            ,t1.county_code
            ,t1.cnt
            ,row_number() over w as rn
    from (
        select  city_code
                ,county_code
                ,count(car) as cnt
        from kafka_cars
        group by city_code,county_code
    ) t1 WINDOW w AS (
        PARTITION BY city_code
        ORDER BY t1.cnt desc
    )
) tt1 where tt1.rn<=3;

注意:

1、在流上进行row_number时如果没有时间字段,则需要使用where对最终窗口函数统计的结果进行过滤

2、执行时会碰到flink的一个bug

Caused by: java.lang.NoSuchMethodError: org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String;

缺失了commons-lang3-3.3.2.jar依赖

解决方法:从本地Maven仓库找到该依赖并上传到$FLINK_HOME/lib目录下

重启yarn-session及sql-client

9、窗口TopN

基于每一个窗口数据,在窗口内进行TopN

# 创建Kafka source表
DROP table if exists kafka_bid_sup;
CREATE TABLE kafka_bid_sup (
  bidtime  TIMESTAMP(3),
  price  DECIMAL(10, 2),
  item  STRING,
  supplier_id STRING,
  -- 声明 bidtime 是事件时间属性,并且用 延迟 5 分钟的策略来生成 watermark
  WATERMARK FOR bidtime AS bidtime - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka', -- 使用Kafka连接器
  'topic' = 'bid_sup', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定Kafka集群地址
  'properties.group.id' = 'testGroup01', -- 指定消费者组id
  'scan.startup.mode' = 'earliest-offset', -- 指定启动策略
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- 当数据不符合表结构时 直接忽略
);
​
# Create the bid_sup topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic bid_sup --create
​
# 创建控制台的生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_sup
​
# 准备数据
2020-04-15 08:05:00,4.00,A,supplier1
2020-04-15 08:06:00,4.00,C,supplier2
2020-04-15 08:07:00,2.00,G,supplier1
2020-04-15 08:08:00,2.00,B,supplier3
2020-04-15 08:09:00,5.00,D,supplier4
2020-04-15 08:11:00,2.00,B,supplier3
2020-04-15 08:13:00,1.00,E,supplier1
2020-04-15 08:15:00,3.00,H,supplier2
2020-04-15 08:17:00,6.00,F,supplier5
2020-04-15 08:21:00,6.00,F,supplier5
2020-04-15 08:18:00,6.00,F,supplier5
2020-04-15 08:19:00,6.00,F,supplier5
2020-04-15 08:25:00,6.00,F,supplier5
​
# 每隔10分钟统计一次各供应商商品销售额之和的前三名
​
-- Step 1: for every 10-minute tumbling window on bidtime,
-- compute the total sales amount per supplier.
SELECT  window_start
        ,window_end
        ,supplier_id
        ,sum(price) as sum_price_per_window
from TABLE(
    TUMBLE(TABLE kafka_bid_sup, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
group by window_start,window_end,supplier_id;
​
# 第二步:基于第一步的结果进行组内TopN
SELECT  tt1.window_start
        ,tt1.window_end
        ,tt1.supplier_id
        ,tt1.sum_price_per_window
        ,tt1.rn
from (
  SELECT  t1.window_start
          ,t1.window_end
          ,t1.supplier_id
          ,t1.sum_price_per_window
          ,row_number() over (partition by t1.window_start,t1.window_end order by t1.sum_price_per_window desc) as rn
  from (
      SELECT  window_start
              ,window_end
              ,supplier_id
              ,sum(price) as sum_price_per_window
      from TABLE(
          TUMBLE(TABLE kafka_bid_sup, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
      )
      group by window_start,window_end,supplier_id
    ) t1
) tt1 where tt1.rn <= 3;

10、使用TopN进行去重

性能会比直接使用distinct要好一点

# 创建Kafka source表
DROP table if exists kafka_bid_sup_distinct;
CREATE TABLE kafka_bid_sup_distinct (
  bidtime  TIMESTAMP(3),
  price  DECIMAL(10, 2),
  item  STRING,
  supplier_id STRING,
  -- 声明 bidtime 是事件时间属性,并且用 延迟 5 分钟的策略来生成 watermark
  WATERMARK FOR bidtime AS bidtime - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka', -- 使用Kafka连接器
  'topic' = 'bid_sup_distinct', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定Kafka集群地址
  'properties.group.id' = 'testGroup01', -- 指定消费者组id
  'scan.startup.mode' = 'latest-offset', -- 指定启动策略
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- 当数据不符合表结构时 直接忽略
);
​
# Create the bid_sup_distinct topic
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic bid_sup_distinct --create
​
# 创建控制台的生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_sup_distinct
​
# 准备数据
2020-04-15 08:05:00,4.00,A,supplier1
2020-04-15 08:06:00,4.00,C,supplier2
​
# 使用TopN进行去重
SELECT  t1.bidtime
        ,t1.price
        ,t1.item
        ,t1.supplier_id
        ,t1.rn
from (
  SELECT  bidtime
          ,price
          ,item
          ,supplier_id
          ,row_number() over (partition by bidtime,price,item,supplier_id order by bidtime) as rn
  from kafka_bid_sup_distinct
) t1 where t1.rn = 1;

11、Join关联

Regular Joins 常规join

Interval Joins 间隔join

Temporal Joins 时态join

Lookup Join 查找join

Regular Joins

inner join 内连接

outer join 外连接:left outer join、right outer join、full outer join

如果两个流表直接join,Flink需要一直维护两个表中所有数据的状态,随着时间累积,需要维护的状态会越来越多,导致CheckPoint的时间越来越大,数据延时越来越高,最终任务可能会失败

# 分别在kafka中创建student、score的topic,以及对应的Source表
DROP table if exists kafka_student_join;
CREATE TABLE kafka_student_join (
  id  STRING,
  name  STRING,
  age  BIGINT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka', -- 使用Kafka连接器
  'topic' = 'student_join', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定Kafka集群地址
  'properties.group.id' = 'testGroup01', -- 指定消费者组id
  'scan.startup.mode' = 'latest-offset', -- 指定启动策略
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- 当数据不符合表结构时 直接忽略
);
​
DROP table if exists kafka_score_join;
CREATE TABLE kafka_score_join (
  id  STRING,
  sub_id  STRING,
  score  BIGINT
) WITH (
  'connector' = 'kafka', -- 使用Kafka连接器
  'topic' = 'score_join', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定Kafka集群地址
  'properties.group.id' = 'testGroup01', -- 指定消费者组id
  'scan.startup.mode' = 'latest-offset', -- 指定启动策略
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- 当数据不符合表结构时 直接忽略
);
​
# Create the student_join and score_join topics
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic student_join --create
kafka-topics.sh --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic score_join --create
​
# 创建控制台的生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
​
# inner join
SELECT  t1.id
        ,t1.name
        ,t1.clazz
        ,t2.sub_id
        ,t2.score
from kafka_student_join t1
join kafka_score_join t2
on t1.id = t2.id;
​
# left join
SELECT  t1.id
        ,t1.name
        ,t1.clazz
        ,t2.sub_id
        ,t2.score
from kafka_student_join t1
left join kafka_score_join t2
on t1.id = t2.id;
​
# right join
SELECT  t1.id
        ,t1.name
        ,t1.clazz
        ,t2.sub_id
        ,t2.score
from kafka_student_join t1
right join kafka_score_join t2
on t1.id = t2.id;
​
# full join
SELECT  t1.id
        ,t1.name
        ,t1.clazz
        ,t2.sub_id
        ,t2.score
from kafka_student_join t1
full join kafka_score_join t2
on t1.id = t2.id;
Interval Join

两个流表在关联时,可以指定一定的时间间隔进行关联,超过该间隔的状态可以不需要维护

优点:不需要维护大量的状态,程序运行稳定

缺点:有可能因为数据延时导致关联不上

一般情况下两个流表关联会使用该方式,这种关联方式所需要维护的状态不会随着时间无限增大

# 创建student、score的source表,并给每条数据加上一个处理时间用于关联
DROP table if exists kafka_student_join_interval;
CREATE TABLE kafka_student_join_interval (
  id  STRING,
  name  STRING,
  age  BIGINT,
  gender STRING,
  clazz STRING,
  proctime AS PROCTIME() -- 增加一列处理时间
) WITH (
  'connector' = 'kafka', -- 使用Kafka连接器
  'topic' = 'student_join', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定Kafka集群地址
  'properties.group.id' = 'testGroup01', -- 指定消费者组id
  'scan.startup.mode' = 'latest-offset', -- 指定启动策略
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- 当数据不符合表结构时 直接忽略
);
​
DROP table if exists kafka_score_join_interval;
CREATE TABLE kafka_score_join_interval (
  id  STRING,
  sub_id  STRING,
  score  BIGINT,
  proctime AS PROCTIME() -- 增加一列处理时间
) WITH (
  'connector' = 'kafka', -- 使用Kafka连接器
  'topic' = 'score_join', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定Kafka集群地址
  'properties.group.id' = 'testGroup01', -- 指定消费者组id
  'scan.startup.mode' = 'latest-offset', -- 指定启动策略
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  'csv.ignore-parse-errors' = 'true' -- 当数据不符合表结构时 直接忽略
);
​
# Topic同Regular Join中使用的一致,不需要额外创建
# 创建控制台的生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
​
-- Interval join: a student row and a score row with the same id are joined only
-- when their processing times are at most 10 seconds apart.
-- The time bound is written as one BETWEEN (a pure conjunction of comparisons)
-- so the planner can derive the interval bounds and expire old state.
-- It is logically equivalent to the previous
--   "t1 within 10s before t2 OR t2 within 10s before t1"
-- form, which, being a disjunction, is not recognized as an interval-join condition.
SELECT  t1.id
        ,t1.proctime
        ,t2.proctime
        ,t1.name
        ,t1.clazz
        ,t2.sub_id
        ,t2.score
from kafka_student_join_interval t1
left join kafka_score_join_interval t2
on t1.id = t2.id
and t1.proctime BETWEEN t2.proctime - INTERVAL '10' SECOND AND t2.proctime + INTERVAL '10' SECOND;
Temporal Joins

使用场景:

假设有两张表:orders 订单表、currency_rates汇率表

用户会无时无刻下订单,汇率表也会随着时间不断波动(更新频率稍慢)

在汇率表中可以反应历史的汇率的变化,在这里可以将其视为拉链表,也可以叫做时态表

需要通过两表关联实现对货币按照汇率进行转换

如果两张表直接做Regular Join 最终需要维护的状态会越来越大

如果两张表做Interval Join,汇率表更新不及时会导致关联不上

为了解决上述问题两张表可以做时态Join

相当于从订单表获取一条数据的时间,通过该时间再去汇率表中找一条离该时间最近(只能小于)的一条记录,再进行转换

通过时态join,不需要维护大量的状态,只需要维护汇率表中最新的记录即可

-- Orders table: Kafka source, CSV records of order_id,price,currency,order_time.
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
    order_id    STRING, -- order id
    price       DECIMAL(32,2), -- order price (in CNY)
    currency    STRING, -- currency key; matches currency_rates.currency
    order_time  TIMESTAMP(3), -- time at which the order occurred
    WATERMARK FOR order_time AS order_time -- event-time attribute and watermark
) WITH (
 'connector' = 'kafka',
 'topic' = 'orders',
 'properties.bootstrap.servers' = 'master:9092',
 'properties.group.id' = 'asdasdasd',
 'format' = 'csv',
 'scan.startup.mode' = 'latest-offset'
);
​
-- 汇率表
drop table if exists currency_rates;
CREATE TABLE currency_rates (
    currency STRING, -- 汇率表主键
    conversion_rate DECIMAL(32, 2), -- 汇率
    update_time TIMESTAMP(3) , --汇率更新时间
    WATERMARK FOR update_time AS update_time,--时间字段和水位线
    PRIMARY KEY(currency) NOT ENFORCED--设置主键
) WITH (
 'connector' = 'upsert-kafka', 
  'topic' = 'currency_rates1',
  'properties.bootstrap.servers' = 'master:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
​
-- Sample data
-- Rows for the rates table must be inserted with an INSERT statement run in sql-client;
-- per the author's note they cannot be produced with the command-line (console) producer.
insert into currency_rates(currency,conversion_rate,update_time)
values
('1',0.16, TIMESTAMP'2022-09-03 10:15:10'),
('1',0.17, TIMESTAMP'2022-09-03 10:20:10'),
('1',0.18, TIMESTAMP'2022-09-03 10:25:10'),
('1',0.19, TIMESTAMP'2022-09-03 10:30:10'),
('1',0.19, TIMESTAMP'2022-09-03 10:41:10');
​
-- 向订单表的topic中生产数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic  orders
001,100,1,2022-09-03 10:14:10
001,100,1,2022-09-03 10:15:10
001,100,1,2022-09-03 10:19:10
001,100,1,2022-09-03 10:21:10
001,100,1,2022-09-03 10:22:10
001,100,1,2022-09-03 10:27:10
001,100,1,2022-09-03 10:28:10
001,100,1,2022-09-03 10:30:10
001,100,1,2022-09-03 10:40:10
001,100,1,2022-09-03 11:00:10
​
-- 时态表join
SELECT order_id,
     price,
     b.currency,
     conversion_rate,
     update_time,
     order_time
FROM orders as a
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF a.order_time as b
ON a.currency = b.currency;
Lookup Join

适用于流表关联维表

优点:可以动态捕获维表中数据的变化

缺点:每进行一条数据的关联就需要读取一次MySQL,关联的效率比较低

-- 分数表,分数表是一个无界流表
drop table if exists score_join_lookup;
CREATE TABLE score_join_lookup (
    id  STRING,
    sub_id  STRING,
    score  BIGINT,
    proctime as PROCTIME()-- 处理时间字段
) WITH (
 'connector' = 'kafka',
 'topic' = 'score_join',
 'properties.bootstrap.servers' = 'master:9092',
 'properties.group.id' = 'asdasdasd',
 'format' = 'csv',
 'scan.startup.mode' = 'latest-offset'
);
​
-- 学生表: mysql中表 维表 偶尔需要更新
drop table if exists mysql_student;
CREATE TABLE mysql_student (
    id STRING, 
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'student',
    'username' = 'root',
    'password' ='123456'
);
​
-- 如果直接关联,mysql中的维表只会在任务启动的时候被加载一次
-- 如果后续维表发生变化,flink是无法感知的
select 
a.id,a.score,b.id,b.name,b.age
from 
score_join_lookup as a
join 
mysql_student as b
on a.id=b.id;
​
-- Lookup join: to join against the dimension table and still pick up later
-- changes to it, look the dimension row up as of each score row's processing time.
SELECT  a.id
        ,a.score
        ,b.id
        ,b.name
        ,b.age
FROM score_join_lookup AS a
INNER JOIN mysql_student FOR SYSTEM_TIME AS OF a.proctime AS b
    ON a.id = b.id;

标签:join,--,Flink,9092,kafka,学习,t1,数据,id
From: https://www.cnblogs.com/lkd0910/p/16945740.html

相关文章

  • 服务器学习
    1双路等于双核么?问题:常听说双路至强XX式服务器,最近又出现了双核至强,都是两个CPU,是不是双路等于双核?答案:不是无论服务器的单路、双路、四路乃至八路,其中的“路”都是......
  • Python数据分析(一)--Numpy学习
    Numpy学习1.数据的维度1.1一维数据一维数据由对等关系的有序或无序数据构成,采用线性方式组织。对应列表(有序)、数组和集合(无序)等类型。例子:列表和数组相同点:......
  • 2022-2023-1 20221421 《计算机基础与程序设计》第十四周学习总结
    作业信息班级链接:https://edu.cnblogs.com/campus/besti/2022-2023-1-CFAP作业要求:https://www.cnblogs.com/rocedu/p/9577842.html#WEEK14作业正文:2022-2023-120221312......
  • 命令窗口下excel数据导入到Mysql 和 mysql数据导出到excel
    1.mysql导出到excel1.1.SELECT*INTOOUTFILE‘/test.xls’FROMtable1;2.excel导入到mysql:2.1.将选中的数据块儿(不包含表头)拷贝到一个TXT文本文件中,假如存到“D:\data......
  • 蓝桥杯算法学习整理
    报名了蓝桥杯,但是对于算法的基础却不是很好,收集了一些学习的链接,以下链接都是转载自别人名下的博客文章,如果博主介意的话,请通知我立即删除。供日后复习时使用:前部分是摘......
  • 大前端html基础学习03-定位锚点透明
    一、position定位属性和属性值position定位属性,检索对象的定位方式;语法:position:static/absolute/relative/fixed/sticky/unset/inherit(未设置是inherit和initial的结合......
  • Ubuntu22.04安装CUDA深度学习环境&&cuda principle
    environment:neofetch&&uname-a|lolcatinstallnvidiaGPUdriver:sudoadd-apt-repositoryppa:graphics-drivers/ppa#加入官方ppa源sudoaptupdate#检查软件包......
  • opencv的学习记录(python)
    作为最容易上手之一的语言,python拥有着大量的第三方库,这些第三方库的存在使得很多人可以专注于业务逻辑、数学逻辑而忽略繁琐的代码操作,python的opencv第三方库就是其中之......
  • mysql 快速迁移百万数据 mysqldump
    从目标库导出数据C:\ProgramFiles\MySQL\MySQLServer8.0\bin>mysqldump-uroot-ptestdb_db>c:/testdb.dump导入目标库C:\ProgramFiles\MySQL\MySQLSe......
  • 2022-2023-1 20221404 《计算机基础与程序设计》第十四周学习总结
    2022-2023-120221404《计算机基础与程序设计》第十四周学习总结作业信息班级链接(2022-2023-1-计算机基础与程序设计)作业要求(2022-2023-1计算机基础与程序设......