首页 > 数据库 >FlinkSQL之Windowing TVF

FlinkSQL之Windowing TVF

时间:2022-11-01 19:45:46浏览次数:51  
标签:WaterSensor 窗口 FlinkSQL window import table TVF sensor Windowing

Windowing TVF

在Flink1.13版本之后出现的替代之前的Group window的产物,官网描述其 is more powerful and effective

 //TVF 中的tumble滚动窗口
 //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在
 //特别注意!!!!
 //如果在sql中使用了tumble窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段

sql实现TVF的tumble窗口实现

 package net.cyan.FlinkSql.TVF;
 ​
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 ​
 import java.time.Duration;
 ​
 import static org.apache.flink.table.api.Expressions.$;
 ​
 public class Demo1_Window_TableAPI_Tumble {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //创建表的运行环境
         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
         env.setParallelism(1);
         DataStream<WaterSensor> waterSensorStream =
                 env.fromElements(
                         new WaterSensor("sensor_1", 1000L, 10),
                         new WaterSensor("sensor_1", 2000L, 20),
                         new WaterSensor("sensor_2", 3000L, 30),
                         new WaterSensor("sensor_1", 4000L, 40),
                         new WaterSensor("sensor_1", 5000L, 50),
                         new WaterSensor("sensor_2", 6000L, 60))
                        .assignTimestampsAndWatermarks(
                                 WatermarkStrategy
                                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                        .withTimestampAssigner((ws, ts) -> ws.getTs())
 ​
                        );
         //创建table
         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
         //创建表
         tabEnv.createTemporaryView("sensor",table);
         //执行sql
         //TVF 中的tumble滚动窗口
         //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在
         //特别注意!!!!
         //如果在sql中使用了tumble窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
         tabEnv.sqlQuery("select" +
                 " window_start,window_end,id," +
                 "sum(vc) sum_vc" +
                 " from table (tumble(table sensor,descriptor(et),interval '5' second ))" +
                 " group by window_start,window_end,id ")
                .execute()
                .print();
 ​
    }
 }

sql实现TVF的滑动窗口

 //TVF 中的hop滚动窗口
 //hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作为一张表存在
 //first interval :滑动步长, second interval :窗口长度
 //特别注意!!!!
 // 1.TVF 中滑动窗口的滑动步长与窗口长度必须是整数倍的关系,不然会报错
 // 例如:滑动步长为2,窗口长度就不能为5,可以为6
 // 2.如果在sql中使用了hop窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
 package net.cyan.FlinkSql.TVF;
 ​
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 ​
 import java.time.Duration;
 ​
 import static org.apache.flink.table.api.Expressions.$;
 ​
 public class Demo2_Window_TVF_Hop {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //创建表的运行环境
         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
         env.setParallelism(1);
         DataStream<WaterSensor> waterSensorStream =
                 env.fromElements(
                         new WaterSensor("sensor_1", 1000L, 10),
                         new WaterSensor("sensor_1", 2000L, 20),
                         new WaterSensor("sensor_2", 3000L, 30),
                         new WaterSensor("sensor_1", 4000L, 40),
                         new WaterSensor("sensor_1", 5000L, 50),
                         new WaterSensor("sensor_2", 6000L, 60))
                        .assignTimestampsAndWatermarks(
                                 WatermarkStrategy
                                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                        .withTimestampAssigner((ws, ts) -> ws.getTs())
 ​
                        );
         //创建table
         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
         //创建表
         tabEnv.createTemporaryView("sensor",table);
         //执行sql
         //TVF 中的hop滚动窗口
         //hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作为一张表存在
         //first interval :滑动步长, second interval :窗口长度
         //特别注意!!!!
         // 1.TVF 中滑动窗口的滑动步长与窗口长度必须是整数倍的关系,不然会报错
         // 例如:滑动步长为2,窗口长度就不能为5,可以为6
         // 2.如果在sql中使用了hop窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
         tabEnv.sqlQuery("select" +
                 " window_start,window_end,id," +
                 "sum(vc) sum_vc" +
                 " from table (hop(table sensor,descriptor(et),interval '2' second,interval '6' second ))" +
                 " group by window_start,window_end,id ")
                .execute()
                .print();
 ​
 ​
 ​
    }
 }

sql实现TVF的累计窗口

累计窗口的应用:

需求:每天每隔一个小时统计一次当天的pv(浏览量)

流的方式如何解决:

1、用滚动窗口, 窗口长度设为1h

2、每天的第一个窗口清除状态,后面的不清,进行状态的累加

或者

用滚动窗口,长度设置为2day

自定义触发器,每隔1小时对窗内的元素计算一次,不关闭窗口

 

sql的方式如何解决?

直接使用累计窗口cumulate

 //TVF 中的cumulate累计窗口
 //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在
 //tableName:表名
 //timecol:时间属性字段
 //step:累计步长,跟滑动步长类似
 //size:窗口长度
 //特别注意!!!!
 //1.累计窗口的步长与窗口长度同样是需要整数倍关系
 // 2.如果在sql中使用了cumulate窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
 package net.cyan.FlinkSql.TVF;
 ​
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 ​
 import java.time.Duration;
 ​
 import static org.apache.flink.table.api.Expressions.$;
 ​
 public class Demo3_Window_TVF_cumulate {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //创建表的运行环境
         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
         env.setParallelism(1);
         DataStream<WaterSensor> waterSensorStream =
                 env.fromElements(
                         new WaterSensor("sensor_1", 1000L, 10),
                         new WaterSensor("sensor_1", 2000L, 20),
                         new WaterSensor("sensor_2", 3000L, 30),
                         new WaterSensor("sensor_1", 4000L, 40),
                         new WaterSensor("sensor_1", 5000L, 50),
                         new WaterSensor("sensor_2", 6000L, 60))
                        .assignTimestampsAndWatermarks(
                                 WatermarkStrategy
                                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                        .withTimestampAssigner((ws, ts) -> ws.getTs())
 ​
                        );
         //创建table
         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
         //创建表
         tabEnv.createTemporaryView("sensor",table);
         //执行sql
         //TVF 中的cumulate累计窗口
         //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在
         //tableName:表名
         //timecol:时间属性字段
         //step:累计步长,跟滑动步长类似
         //size:窗口长度
         //特别注意!!!!
         //1.累计窗口的步长与窗口长度同样是需要整数倍关系
         // 2.如果在sql中使用了cumulate窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
         tabEnv.sqlQuery("select" +
                 " window_start,window_end,id," +
                 " sum(vc) sum_vc" +
                 " from table (cumulate(table sensor,descriptor(et),interval '2' second,interval '6' second)) " +
                 "group by window_start,window_end,id")
                .execute()
                .print();
    }
 }
 

标签:WaterSensor,窗口,FlinkSQL,window,import,table,TVF,sensor,Windowing
From: https://www.cnblogs.com/CYan521/p/16848901.html

相关文章

  • FlinkSql之TableAPI详解
    一、FlinkSql的概念核心概念Flink的TableAPI和SQL是流批统一的API。这意味着TableAPI&SQL在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。......
  • FlinkSql的窗口使用以及运用案例
    1flinkSQL窗口概述1.1窗口定义:可理解为时间轴,可将无界流切分成有界流1.2窗口分类:TimeWindow:通过时间切割窗口,但是不知道窗口有多少数据滑动窗口滚动窗口......
  • flinksql读写redis
    0、前言最近有个需求,需要使用flinksql读写redis,由于官网上并没有redis的connector,在网上找了很久,开源的几个connector又没法满足要求,所有这里就自己动手实现了一个。......
  • FlinkSQL的DataStream和Table互转的Demo
    1.构建UserLog对象@Data@Builder//创建对象@NoArgsConstructor//无参构造函数@AllArgsConstructor//有参构造函数publicclassUserLog{privateStr......
  • FlinkSQL基础概念
    1.spark和flink的区别Flink中,批处理是流处理的一个特例spark刚好相反,是微小的批次,准实时不能说实时处理。 2.Fink的版本Flink1.12之前的版本,并没有实现流批统一Flin......
  • FlinkSql常用函数
    1、比较函数=<>>>=<<=注意:selectnull=null;返回为nullISNULL、ISNOTNULL--非空判断value1ISDISTINCTFROMvalue2、value......
  • 【百年会员】大数据从入门到入职|Hadoop|Spark|Flink|FlinkSQL|FlinkCDC|Clickhouse|
    ​关心的问题写在最前面:1.两位数学习正版大数据课程是不是骗子?本课程大部分由《实战大数据(Hadoop+Spark+Flink)》作者本人录制,前期为了做口碑,做销量,两位数可以学习全部课......
  • flinksql实时数仓开发
    pom文件<groupId>com.ssi</groupId><artifactId>datalake</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><name>DataLake</nam......