首页 > 数据库 >flink sql

flink sql

时间:2024-05-13 23:22:16浏览次数:26  
标签:WaterSensor vc flink env sql new sensor id

【案例1】Flink01_Table_BaseUse

public class Flink01_Table_BaseUse {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 1),
                new WaterSensor("sensor_2", 2000L, 2),
                new WaterSensor("sensor_1", 3000L, 3),
                new WaterSensor("sensor_1", 4000L, 4),
                new WaterSensor("sensor_2", 5000L, 5),
                new WaterSensor("sensor_1", 6000L, 6)
        );

        //1.创建表的执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2.通过表的执行环境,将流转成表(动态表)
        Table table = tEnv.fromDataStream(stream);
        //table.printSchema();   //查看动态表的 元数据

        //3.在动态表上执行连续查询
        // select id,ts,vc from t where id='sensor_1'
        Table result = table.where("id='sensor_1'").select("id,ts,vc");

        //4.把动态表转成流
        DataStream<WaterSensor> resultStream = tEnv.toAppendStream(result, WaterSensor.class);
        resultStream.print();
        
        env.execute();
        
    }
}

运行结果:

WaterSensor(id=sensor_1, ts=1000, vc=1)
WaterSensor(id=sensor_1, ts=3000, vc=3)
WaterSensor(id=sensor_1, ts=4000, vc=4)
WaterSensor(id=sensor_1, ts=6000, vc=6)

 

【案例1.2】Flink01_Table_BaseUse2

public class Flink01_Table_BaseUse {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 1),
                new WaterSensor("sensor_2", 2000L, 2),
                new WaterSensor("sensor_1", 3000L, 3),
                new WaterSensor("sensor_1", 4000L, 4),
                new WaterSensor("sensor_2", 5000L, 5),
                new WaterSensor("sensor_1", 6000L, 6)
        );

        //1.创建表的执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2.通过表的执行环境,将流转成表(动态表)
        Table table = tEnv.fromDataStream(stream);
        //table.printSchema();   //查看动态表的 元数据

        //3.在动态表上执行连续查询
        //  select id,ts,vc from t where id='sensor_1'

        Table result = table.where("id='sensor_1'").select("id,vc");

        //4.把动态表转成流
        DataStream<Row> resultStream = tEnv.toAppendStream(result, Row.class);
        resultStream.print();

        env.execute();
    }
}

运行结果:

+I[sensor_1, 1]
+I[sensor_1, 3]
+I[sensor_1, 4]
+I[sensor_1, 6]

 

标签:WaterSensor,vc,flink,env,sql,new,sensor,id
From: https://www.cnblogs.com/apple677/p/16146011.html

相关文章

  • linux里安装sql2022详细步骤
    https://learn.microsoft.com/zh-tw/sql/linux/quickstart-install-connect-ubuntu?view=sql-server-linux-ver16&preserve-view=true&tabs=ubuntu2004https://learn.microsoft.com/zh-tw/sql/linux/quickstart-install-connect-ubuntu?view=sql-server-linux-ver16&a......
  • 运维必备Linux学习day2(mysql,jdk,redis,docker安装)
    一.MySQL安装①Linux环境:1.虚拟机Centos7.6版本安装,2.准备类似版本 mysql-5.7.26-1.el7.x86_64.rpm-bundle.tar包1.新建文件夹/opt/mysql,并cd进去,首先:mkdir/opt/mysql2.运行 wgethttp://dev.mysql.com/get/mysql-5.7.26-1.el7.x86_64.rpm-bundle.tar,下载mysql安装包......
  • MySQL ROW_NUMBER 函数
    MySQLROW_NUMBER()语法MySQL ROW_NUMBER()从8.0版开始引入了功能。这ROW_NUMBER()是一个窗口函数或分析函数,它为从1开始应用的每一行分配一个序号。请注意,如果你使用MySQL版本低于8.0,你可以效仿的一些功能ROW_NUMBER()函数使用各种技术。以下显示了ROW_NUMBER()函数的语法:......
  • MySQL数据高阶处理技巧:掌握先排序后分组的智慧
    在MySQL数据库的数据探索旅程中,排序和分组是不可或缺的工具。然而,当你面对大量数据、重复值等情况时,常规的处理方法可能显得不够灵活。本文将为你揭示一个精妙的技巧:如何在MySQL中先排序,后分组,从而获取每个类型的最新数据,助你轻松驾驭复杂的数据处理任务。 问题背景:先排序,后分......
  • Docker 部署 Mysql8.1
    #不挂载,直接创建容器[root@VM-24-9-centos~]#dockerrun-d-p3306:3306--namemysql-eMYSQL_ROOT_PASSWORD='123456'mysql获取镜像#拉取镜像[root@VM-24-9-centos~]#dockerpullmysql:8.1创建挂载目录和配置文件#创建挂载目录[root@VM-0-17-centos~]#mkd......
  • mysql视图
    1.介绍  视图(View)是一种虚拟存在的表。视图中的数据并不在数据库中实际存在,行和列数据来自定义视图的查询中使用的表,并且是在使用视图时动态生成的。  通俗的讲,视图只保存了查询的SQL逻辑,不保存查询结果。所以我们在创建视图的时候,主要的工作就落在创建这条SQL查询语句上。......
  • sqlserver 亿级数据删除方案
    sqlserver删除百万级别及以上数据的时候需要考虑是否需要保留日志文件,如果需要保留日志文件,用于恢复。那么就要使用DELETE语句进行删除,DELETE删除语句尽量使用主键或者索引的字段,同时进行批量删除语句如下:1DECLARE@BatchSizeINT2SET@BatchSize=10000--设置每批删......
  • postgresql(14-15)升级(源库需要停机)
    环境:OS:Centos7旧版本:pg14新版本:pg15已经安装的插件mysql_fdw_14.x86_64postgis33_14.x86_64 1.查看当前的版本[root@dsc1~]#psql-hlocalhost-Upostgres-p5432psql(14.11)Type"help"forhelp.postgres=#selectversion();......
  • inno Setup 打包Java exe可执行文件和MySQL数据库,无需额外配置实现一键傻瓜式安装
    前言出现有需要打包Java应用和Mysql数据库成一个安装包给出去的需求,这里我把整个打包的流程整理一下。环境JDK17;MySQL5.7;流程Jpackage打包EXEJpackage是JDK14后加入的一个用于独立打包的工具,能够将应用打包成exe,有了Jpackage就不需要用exe4j这种打包工具,省去打包的繁......
  • openGauss 慢SQL诊断
    慢SQL诊断背景信息在SQL语句执行性能不符合预期时,可以查看SQL语句执行信息,便于事后分析SQL语句执行时的行为,从而诊断SQL语句执行出现的相关问题。前提条件数据库实例运行正常。查询SQL语句信息,需要正确设置GUC参数track_stmt_stat_level。只能用系统管理员和监控管理员权限......