首页 > 其他分享 >Flink Table Api(五)

Flink Table Api(五)

时间:2022-10-14 16:58:48浏览次数:42  
标签:name Flink streamtableenv catalog Api new Table data left

基础代码

环境信息
    //table api     flink三层API(processfunction api/datastream api/sql table api)
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.table.api._

    val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment//set参数 流处理
    val streamsettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamtableenv = StreamTableEnvironment.create(sourceTestEnv,streamsettings)

    val batchsettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()//set参数 批处理
    val tableenv = TableEnvironment.create(batchsettings)
基本样例
    import org.apache.flink.api.scala.createTypeInformation
    val data = sourceTestEnv.fromElements[(Int,Long)]((1,1663828406000l),
       (2,1663828416000l),(3,1663828426000l),(4,1663828436000l))
    val datatable = streamtableenv.fromDataStream(data,$"name",$"data")
      .select($"name",$"data").filter($"name" > 0)
      .addColumns(concat("my name is",$"name" ))
      .addOrReplaceColumns(concat("my name is",$"name" ))
      .renameColumns($"name" as "b2")
      .dropColumns($"b2")
      .groupBy($"name").select($"name", $"data".sum().as("sum_data"))
常用方法
    streamtableenv.fromDataStream(data,$"name",$"data",$"rowtime")
      .window(Tumble over 5.minutes on $"rowtime" as "w")
      .groupBy($"a", $"w")
      .select($"a", $"w".start, $"w".end, $"w".rowtime, $"data".sum as "data_sum")

    streamtableenv.fromDataStream(data,$"name",$"data",$"rowtime")
      .window(Over partitionBy $"name" orderBy $"rowtime" preceding UNBOUNDED_RANGE following CURRENT_RANGE as "w")
      .select($"name", $"data".avg over $"w", $"data".max().over($"w"), $"data".min().over($"w"))

    streamtableenv.fromDataStream(data,$"name",$"data").groupBy($"name")
      .select($"name", $"data".sum.distinct as "distinct_data")

    streamtableenv.fromDataStream(data,$"name",$"data",$"ts")
      .window(Tumble over 5.minutes on $"ts" as "w").groupBy($"name", $"w")
      .select($"name", $"data".sum.distinct as "distinct_data")

    streamtableenv.fromDataStream(data,$"name",$"data",$"ts")
      .window(Over partitionBy $"name" orderBy $"ts" preceding UNBOUNDED_RANGE as $"w")
      .select($"name", $"data".avg.distinct over $"w", $"data".max over $"w", $"data".min over $"w")

    streamtableenv.fromDataStream(data,$"name",$"data",$"ts").distinct()
多条语句执行
    streamtableenv.createStatementSet()//执行多条insert
      .addInsertSql("insert into data select * from data")
      .addInsertSql("insert into data select * from data")
      .execute()
join
    val left = streamtableenv.fromDataStream(data,$"left_name",$"left_data",$"left_ts")
    val right = streamtableenv.fromDataStream(data,$"right_name",$"right_data",$"right_ts")
    val innerjoinResult = left.join(right).where($"left_name" === $"right_name").select($"left_name", $"right_data")
    val leftOuterResult = left.leftOuterJoin(right, $"left_name" === $"right_name").select($"left_name", $"right_data")
    val rightOuterResult = left.rightOuterJoin(right, $"left_name" === $"right_name").select($"left_name", $"right_data")
    val fullOuterResult = left.fullOuterJoin(right, $"left_name" === $"right_name").select($"left_name", $"right_data")
    //区间 join
    val intervalResult = left.join(right)
      .where($"left_name" === $"right_name" && $"left_ts" >= $"right_ts" - 5.minutes && $"left_ts" < $"right_ts" + 10.minutes)
      .select($"left_name", $"right_data")

    left.union(right)//删除重复记录
    left.unionAll(right)//不删除重复记录
    left.intersect(right)//返回两个表都存在的记录  出现多次的记录只返回一次
    left.intersect(right)//返回两个表都存在的记录  出现多次的记录只返回多次
    left.minus(right)//返回左表不存在于右表的记录   删除重复记录
    left.minusAll(right)//返回左表不存在于右表的记录   重复记录(左 - 右)
    left.select($"left_name",$"left_data",$"left_ts")
        .where($"left_name".in(right)) //sql in 不重复了
    left.orderBy($"left_name".asc) //排序
    left.orderBy($"left_name".asc).fetch(5) //排序返回前5行
    left.orderBy($"left_name".asc).offset(3) //跳过前三行
    left.orderBy($"left_name".asc).offset(10).fetch(5) 
    //排序 跳过前10行 返回前5行
    left.executeInsert("OutOrders")//插入数据
自定义函数
    //  用主键和时间属性注册表函数
    val rates = streamtableenv.from("currency_rates")
        .createTemporalTableFunction($"update_time", $"currency")
    streamtableenv.createTemporarySystemFunction("rates", rates)
    left.joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")));
扩展
    //fromDataStream 扩展
    streamtableenv.fromDataStream(sqldata,Schema.newBuilder()//新增列  并且设置watermark
          .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
          .watermark("rowtime", "SOURCE_WATERMARK()").build())
    // createTemporaryView 扩展
    streamtableenv.createTemporaryView("MyView",
       streamtableenv.fromDataStream(sqldata).as("id", "name"))

类型转换

类型转换
//TableEnvironment可以注册目录catalog,并可以基于catalog注册表
//表(Table)是由一个标识符来指定,catalog名、数据库名、对象名  可以事常规表和视图(虚拟表)
    streamtableenv.createTemporaryView("data",datatable)
    val sqldata = streamtableenv.sqlQuery("select * from data ")//执行select
    streamtableenv.executeSql("insert into data select * from data ")//执行insert
    val resultStream = streamtableenv.toDataStream(sqldata)  //将Table类型转换为  DataStream类型
    val resultChangeStream = streamtableenv.toChangelogStream(sqldata)//doesn't support update
fromChangelogStream
    import org.apache.flink.types.{Row, RowKind}
    import org.apache.flink.api.scala.typeutils.Types
    import org.apache.flink.table.connector.ChangelogMode
    import org.apache.flink.table.data.StringData
    val changelogdata = sourceTestEnv.fromElements(Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
      Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)))(Types.ROW(Types.STRING, Types.INT))
    val changelogtable1 = streamtableenv.fromChangelogStream(changelogdata)
    val changelogtable2 = streamtableenv.fromChangelogStream(changelogdata,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert())
    //处理事件  字段必须放最后
    streamtableenv.fromDataStream(data,$"name",$"datats",$"pt".proctime() as "proctime")
    //事件时间   字段必须放最后
    streamtableenv.fromDataStream(data,$"name",$"datats",$"pt".rowtime() as "rowtime")
toChangelogStream
    streamtableenv.toChangelogStream(changelogtable1)
    streamtableenv.toChangelogStream(changelogtable1,Schema.newBuilder()
        .column("name",DataTypes.STRING().bridgedTo(classOf[StringData]))
        .column("score",DataTypes.INT()).build())

Window

group window
    //group window 分组窗口 时间或行计数间隔,将行聚合到有限的组 执行一次聚合函数
    datatable.window(Tumble over 10.minutes on $"rowtime" as "rn")  //滚动窗口时间
    datatable.window(Tumble over 10.minutes on $"proctime" as "rn") //滚动窗口时间
    datatable.window(Tumble over 10.rows on $"rowtime" as "rn")     //滚动窗口行数
    datatable.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"rn")   //滑动窗口时间
    datatable.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"rn")   //滑动窗口时间
    datatable.window(Slide over 10.rows every 5.rows on $"proctime" as $"rn")   //滑动窗口行数
    datatable.window(Session withGap 10.minutes on $"rowtime" as $"rn")//会话窗口时间
    datatable.window(Session withGap 10.minutes on $"proctime" as $"rn")//会话窗口行数
over window
    //over window 会针对每个输入行,计算相邻行范围内的聚合
    datatable.window(Over partitionBy "name" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"rn")//无界的事件时间
    datatable.window(Over partitionBy "name" orderBy $"proctime" preceding UNBOUNDED_RANGE as $"rn")//无界的处理时间
    datatable.window(Over partitionBy "name" orderBy $"rowtime" preceding UNBOUNDED_ROW as $"rn")//无界的事件时间 行数
    datatable.window(Over partitionBy "name" orderBy $"proctime" preceding UNBOUNDED_ROW as $"rn")//无界的处理时间 行数
    datatable.window(Over partitionBy "name" orderBy $"rowtime" preceding 1.minutes as $"rn")//有界的事件时间
    datatable.window(Over partitionBy "name" orderBy $"proctime" preceding 1.minutes as $"rn")//有界的事件时间
    datatable.window(Over partitionBy "name" orderBy $"rowtime" preceding 10.rows as $"rn")//有界的事件时间 行数
    datatable.window(Over partitionBy "name" orderBy $"proctime" preceding 10.rows as $"rn")//有界的事件时间 行数

UDF

UDF
    import org.apache.flink.table.functions.ScalarFunction
    import org.apache.flink.api.scala.createTypeInformation
    class HashCode extends ScalarFunction{
      def eval(s:String): Unit ={
        s.hashCode-10
      }
    }
    val hashcode = new HashCode()
    datatable.select($"id",hashcode($"name")).toAppendStream.print()//table api 方式
    streamtableenv.createTemporaryView("data",datatable)
    streamtableenv.createTemporarySystemFunction("hashcode",hashcode)
    import org.apache.flink.types.Row
    streamtableenv.sqlQuery("select hashcode(id) from data ").toAppendStream[Row].print()//sql 方式
表函数(flatmap)
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.table.functions.TableFunction
    class Float extends TableFunction[(String,Int)]{
      def eval(value:String): Unit ={
        value.split(",").map(x=>collect((x,1)))
      }
    }
    val float = new Float()
    datatable.joinLateral(float($"name").as("name","name_length"))
    streamtableenv.createTemporaryView("data",datatable)
    streamtableenv.createTemporarySystemFunction("float",float)
    import org.apache.flink.types.Row
    streamtableenv.sqlQuery("select name,length from data lateral table(split float(name_list)) as float_table(name,length)").toAppendStream[Row].print()//sql 方式
自定义聚合函数
    import org.apache.flink.table.functions.AggregateFunction
    import org.apache.flink.api.scala.createTypeInformation
    class  AvgTempAcc{ var sum:Double=0;var count:Int=0 }
    class AvgTemp extends AggregateFunction[Double,AvgTempAcc]{//输入参数 中间保存结果
      override def getValue(acc: AvgTempAcc): Double = {acc.sum/acc.count}
      override def createAccumulator(): AvgTempAcc = {new AvgTempAcc}
      def accumulate(acc: AvgTempAcc,temp:Double): Unit ={//还需要实现一个聚合函数
        acc.count+=1
        acc.sum+=temp
      }
    }
    val avgTemp = new AvgTemp()
    datatable.groupBy($"name").aggregate(avgTemp("score") as "avgscore").select($("avgscore")).toRetractStream[Row].print()
    streamtableenv.createTemporaryView("data",datatable)
    streamtableenv.createTemporarySystemFunction("avgTemp",avgTemp)
    import org.apache.flink.types.Row
    streamtableenv.sqlQuery("select id,avgTemp(score) from data group by id").toRetractStream[Row].print()//sql 方式
UTAF
    class Top2Data{var high:Double = 0 ;var secondhigh:Double = 0 ;}
    import org.apache.flink.table.functions.TableAggregateFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector
    import org.apache.flink.api.scala.createTypeInformation
    class Top2 extends TableAggregateFunction[(Double,Int),Top2Data]{
      override def createAccumulator(): Top2Data = new Top2Data
      def accumulate(acc: Top2Data,temp:Double): Unit ={//还需要实现一个聚合函数
        if(acc.high<temp){acc.secondhigh = acc.high;acc.high = temp;}
        else if(acc.secondhigh<temp){acc.secondhigh = temp;}
      }
      def emitVaue(acc: Top2Data,out:Collector[(Double,Int)]): Unit ={
        out.collect((acc.high,1))
        out.collect((acc.secondhigh,2))
      }
    }
    val top2 = new Top2()
    datatable.groupBy($"name").flatAggregate(top2("score") as ("top2_score","rank"))
      .select($("name"),$("top2_score"),$("rank")).toRetractStream[Row].print()
    //没有SQL方式
task故障重启策略
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import java.util.concurrent.TimeUnit
    sourceTestEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3,Time.of(10, TimeUnit.SECONDS)// 尝试重启的次数   延时
    ))
    sourceTestEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
      3, // 每个时间间隔的最大故障次数
      Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
      Time.of(10, TimeUnit.SECONDS) // 延时
    ))
    datatable.explain()//执行计划
    datatable.execute()
模块
 tableenv.listModules();tableenv.listFullModules();tableenv.loadModule("hive", new HiveModule());
   tableenv.useModules("hive");tableenv.unloadModule("hive");
Catalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
    tableenv.registerCatalog("myhive", catalog);
    tableenv.executeSql("CREATE DATABASE mydb WITH (...)");
    tableenv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
    tableenv.listTables(); // should return the tables in current catalog and database

    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
    catalog.dropDatabase("mydb", false);
    catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
    catalog.getDatabase("mydb");
    catalog.databaseExists("mydb");
    catalog.listDatabases("mycatalog");
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
    catalog.getTable("mytable");
    catalog.tableExists("mytable");
    catalog.listTables("mydb");
    catalog.listViews("mydb");

    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
    catalog.alterPartition(new ObjectPath("mydb", "mytable"),new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),false);
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));

    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    catalog.getFunction("myfunc");
    catalog.functionExists("myfunc");
    catalog.listFunctions("mydb");
tableenv 操作
    tableenv.registerCatalog(new CustomCatalog("myCatalog"));
    tableenv.useCatalog("myCatalog")
    tableenv.useDatabase("myDb")
    tableEnv.listCatalogs
    tableEnv.listDatabases();
    tableEnv.listTables

标签:name,Flink,streamtableenv,catalog,Api,new,Table,data,left
From: https://www.cnblogs.com/wuxiaolong4/p/16792106.html

相关文章

  • Elasticsearch——JavaApi实现索引管理
    版本不同版本的elasticsearch-rest-high-level-client和elasticsearch之间存在兼容风险,请确保和elasticsearch版本一致,否则会出现无法预计的错误。es配置maven依赖<dep......
  • RESTful API接口规范
    前后端接口规范-RESTful版本规范的三个目标:简洁、统一、开放。关于如何设计良好风格的RESTfulAPI,Github有一份满分答案,熟读三遍,其义自现。本规范将在其基础之上使......
  • selenium.common.exceptions.WebDriverException: Message: 'chromedriver' executabl
    在使用selenium模块操作浏览器时,出现下面的错误提示:selenium.common.exceptions.WebDriverException:Message:‘chromedriver’executableneedstobeinPATH.Please......
  • Yapi接口管理平台的配置和安装
     一、Yapi安装环境系统:Windows10安装环境:node环境+mongodb数据库(yapi系统需要部署在node环境中,且使用mongodb进行数据库存储。)安装包版本:Node:v10.5.0MongoDB:v5.0.1......
  • JDK1.8新增日期时间api LocalDate LocalTime LocalDateTime
    JDK1.0中使用java.util.Dte类---》第一次日期时间APIJDK1.1引入Calendar类---》第二批日期时间API缺陷:可变性:像日期和时间这样的类应该是不可变的便宜性:Date中的年份是......
  • APICloud AVM框架 封装虚拟数字键盘组件
    AVM(Application-View-Model)前端组件化开发模式基于标准WebComponents组件化思想,提供包含虚拟DOM和Runtime的编程框架avm.js以及多端统一编译工具,完全兼容WebComponents标......
  • UVA10820 Send a Table
    题目翻译你决定在考试的一道题中打表。该题要求你计算的是一个二元函数\(f(x,y)\),定义域为\(x,y\in[1,+\infty]\\x,y\in\mathbb{Z}^{+}\)。该函数有一个性质,就是......
  • APICloud AVM 框架 封装树形分类选择组件 组件递归调用
    由于项目中,需要用到追加表单项目的功能,而表单项目在PC端是树形列表的形式展现,而且要实现多选功能,依上述需求开发了树形分类选择组件。组件开发中用到的知识要点是:组件递归......
  • DataTable使用方法
    DataTable使用方法: usingSystem.Data;namespaceAdoNetCourseDataTable{classProgram{staticvoidMain(string[]args){......
  • 2022-10-14 API `saveImageToPhotosAlbum` is not yet implemented [uniapp]
    前言:uniapp+vue项目业务之生成海报并保存海报到手机,运行终端:h5。调用Api(uni.saveImageToPhotosAlbum)报错如下:[system]API`saveImageToPhotosAlbum`isnotyetimplemen......