Basic code
Environment setup
// Table API: one of Flink's three API layers (ProcessFunction API / DataStream API / SQL & Table API)
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment // streaming execution environment
val streamsettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamtableenv = StreamTableEnvironment.create(sourceTestEnv,streamsettings)
val batchsettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() // batch mode settings
val tableenv = TableEnvironment.create(batchsettings)
Basic example
import org.apache.flink.api.scala.createTypeInformation
val data = sourceTestEnv.fromElements[(Int,Long)]((1,1663828406000L),
(2,1663828416000L),(3,1663828426000L),(4,1663828436000L))
val datatable = streamtableenv.fromDataStream(data,$"name",$"data")
.select($"name",$"data").filter($"name" > 0)
// each call below demonstrates a single column operation:
datatable.addColumns(concat("my name is ",$"name")) // append a column
datatable.addOrReplaceColumns(concat("my name is ",$"name") as "desc") // append, replacing "desc" if it already exists
datatable.renameColumns($"name" as "b2").dropColumns($"b2") // rename a column, then drop it
datatable.groupBy($"name").select($"name", $"data".sum().as("sum_data")) // grouped aggregation
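To actually run the example and inspect the result, the table can be converted back to a DataStream and printed — a minimal sketch reusing sourceTestEnv and streamtableenv from above (the job name is illustrative):
streamtableenv.toDataStream(datatable).print() // datatable is append-only, so toDataStream suffices
sourceTestEnv.execute("basic-example") // DataStream sinks such as print() require an explicit execute()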
Common operations
streamtableenv.fromDataStream(data,$"name",$"data",$"rowtime")
.window(Tumble over 5.minutes on $"rowtime" as "w")
.groupBy($"a", $"w")
.select($"a", $"w".start, $"w".end, $"w".rowtime, $"data".sum as "data_sum")
streamtableenv.fromDataStream(data,$"name",$"data",$"rowtime")
.window(Over partitionBy $"name" orderBy $"rowtime" preceding UNBOUNDED_RANGE following CURRENT_RANGE as "w")
.select($"name", $"data".avg over $"w", $"data".max().over($"w"), $"data".min().over($"w"))
streamtableenv.fromDataStream(data,$"name",$"data").groupBy($"name")
.select($"name", $"data".sum.distinct as "distinct_data")
streamtableenv.fromDataStream(data,$"name",$"data",$"ts")
.window(Tumble over 5.minutes on $"ts" as "w").groupBy($"name", $"w")
.select($"name", $"data".sum.distinct as "distinct_data")
streamtableenv.fromDataStream(data,$"name",$"data",$"ts")
.window(Over partitionBy $"name" orderBy $"ts" preceding UNBOUNDED_RANGE as $"w")
.select($"name", $"data".avg.distinct over $"w", $"data".max over $"w", $"data".min over $"w")
streamtableenv.fromDataStream(data,$"name",$"data",$"ts").distinct()
Executing multiple statements
streamtableenv.createStatementSet() // bundle several INSERT statements into one job
.addInsertSql("insert into data select * from data")
.addInsertSql("insert into data select * from data")
.execute()
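Inserting a table into itself is only a placeholder; a more realistic sketch uses separate source and sink tables (the datagen and print connectors ship with Flink, while the table names here are illustrative):
streamtableenv.executeSql("create table src (id int, v bigint) with ('connector' = 'datagen')")
streamtableenv.executeSql("create table sink1 (id int, v bigint) with ('connector' = 'print')")
streamtableenv.executeSql("create table sink2 (id int, v bigint) with ('connector' = 'print')")
streamtableenv.createStatementSet()
  .addInsertSql("insert into sink1 select * from src")
  .addInsertSql("insert into sink2 select * from src")
  .execute() // both INSERTs are optimized together and submitted as a single job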
Join
val left = streamtableenv.fromDataStream(data,$"left_name",$"left_data",$"left_ts".rowtime)
val right = streamtableenv.fromDataStream(data,$"right_name",$"right_data",$"right_ts".rowtime)
val innerjoinResult = left.join(right).where($"left_name" === $"right_name").select($"left_name", $"right_data")
val leftOuterResult = left.leftOuterJoin(right, $"left_name" === $"right_name").select($"left_name", $"right_data")
val rightOuterResult = left.rightOuterJoin(right, $"left_name" === $"right_name").select($"left_name", $"right_data")
val fullOuterResult = left.fullOuterJoin(right, $"left_name" === $"right_name").select($"left_name", $"right_data")
// interval join
val intervalResult = left.join(right)
.where($"left_name" === $"right_name" && $"left_ts" >= $"right_ts" - 5.minutes && $"left_ts" < $"right_ts" + 10.minutes)
.select($"left_name", $"right_data")
left.union(right) // removes duplicate records
left.unionAll(right) // keeps duplicate records
left.intersect(right) // records present in both tables; duplicates are returned only once
left.intersectAll(right) // records present in both tables; duplicates are returned as often as they occur
left.minus(right) // records in left that are absent from right; duplicates removed
left.minusAll(right) // records in left that are absent from right; duplicates kept (left count minus right count)
left.select($"left_name",$"left_data",$"left_ts")
.where($"left_name".in(right)) // SQL IN semantics; the sub-query table must consist of a single column
left.orderBy($"left_name".asc) // sort
left.orderBy($"left_name".asc).fetch(5) // sort and return the first 5 rows
left.orderBy($"left_name".asc).offset(3) // sort and skip the first 3 rows
left.orderBy($"left_name".asc).offset(10).fetch(5) // sort, skip the first 10 rows, return the next 5
left.executeInsert("OutOrders") // insert into a registered sink table
Temporal table functions
// register a temporal table function using a primary key and a time attribute
val rates = streamtableenv.from("currency_rates")
.createTemporalTableFunction($"update_time", $"currency")
streamtableenv.createTemporarySystemFunction("rates", rates)
left.joinLateral(call("rates", $"o_proctime"), $"o_currency" === $"r_currency")
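The registered temporal table function can also be used in SQL through LATERAL TABLE — a sketch assuming an orders table with o_proctime and o_currency fields, and rates exposing r_currency and r_rate:
streamtableenv.sqlQuery(
  """select o_currency, r_rate
    |from orders,
    |lateral table (rates(o_proctime))
    |where r_currency = o_currency""".stripMargin)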
Extensions
// fromDataStream with an explicit schema
streamtableenv.fromDataStream(sqldata,Schema.newBuilder() // add a metadata column and declare the watermark
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()").build())
// createTemporaryView with renamed columns
streamtableenv.createTemporaryView("MyView",
streamtableenv.fromDataStream(sqldata).as("id", "name"))
Type conversion
// a TableEnvironment can register catalogs, and tables are registered inside a catalog
// a table is identified by catalog name, database name, and object name; it can be a regular table or a view (virtual table)
streamtableenv.createTemporaryView("data",datatable)
val sqldata = streamtableenv.sqlQuery("select * from data") // run a SELECT
streamtableenv.executeSql("insert into data select * from data") // run an INSERT
val resultStream = streamtableenv.toDataStream(sqldata) // convert the Table to an append-only DataStream (does not support updates)
val resultChangeStream = streamtableenv.toChangelogStream(sqldata) // convert the Table to a changelog DataStream (supports updates)
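Because a table identifier has those three parts, the view registered above can also be addressed by its fully qualified name; default_catalog and default_database are Flink's built-in defaults:
streamtableenv.sqlQuery("select * from default_catalog.default_database.data")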
fromChangelogStream
import org.apache.flink.types.{Row, RowKind}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.data.StringData
val changelogdata = sourceTestEnv.fromElements(Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)))(Types.ROW(Types.STRING, Types.INT))
val changelogtable1 = streamtableenv.fromChangelogStream(changelogdata)
val changelogtable2 = streamtableenv.fromChangelogStream(changelogdata,
Schema.newBuilder().primaryKey("f0").build(),
ChangelogMode.upsert())
// processing-time attribute: the field must be appended at the end of the schema
streamtableenv.fromDataStream(data,$"name",$"datats",$"pt".proctime as "proctime")
// event-time attribute: the field must be appended at the end of the schema
streamtableenv.fromDataStream(data,$"name",$"datats",$"pt".rowtime as "rowtime")
toChangelogStream
streamtableenv.toChangelogStream(changelogtable1)
streamtableenv.toChangelogStream(changelogtable1,Schema.newBuilder() // the declared columns must match the table's schema
.column("f0",DataTypes.STRING().bridgedTo(classOf[StringData]))
.column("f1",DataTypes.INT()).build())
Window
group window
// group windows aggregate rows into finite groups based on a time or row-count interval, evaluating the aggregate function once per group
datatable.window(Tumble over 10.minutes on $"rowtime" as "rn") // tumbling window, event time
datatable.window(Tumble over 10.minutes on $"proctime" as "rn") // tumbling window, processing time
datatable.window(Tumble over 10.rows on $"proctime" as "rn") // tumbling window, row count (requires a processing-time attribute)
datatable.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"rn") // sliding window, event time
datatable.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"rn") // sliding window, processing time
datatable.window(Slide over 10.rows every 5.rows on $"proctime" as $"rn") // sliding window, row count
datatable.window(Session withGap 10.minutes on $"rowtime" as $"rn") // session window, event time
datatable.window(Session withGap 10.minutes on $"proctime" as $"rn") // session window, processing time
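A window definition alone computes nothing; it must be followed by a groupBy on the window alias (plus any keys) and a select, as in this sketch (assuming datatable carries a rowtime attribute):
datatable.window(Tumble over 10.minutes on $"rowtime" as "rn")
  .groupBy($"name", $"rn") // group by key and window alias
  .select($"name", $"rn".start, $"rn".end, $"data".sum as "data_sum")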
over window
// an over window computes, for each input row, an aggregate over a range of neighboring rows
datatable.window(Over partitionBy $"name" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"rn") // unbounded, event time
datatable.window(Over partitionBy $"name" orderBy $"proctime" preceding UNBOUNDED_RANGE as $"rn") // unbounded, processing time
datatable.window(Over partitionBy $"name" orderBy $"rowtime" preceding UNBOUNDED_ROW as $"rn") // unbounded, event time, row count
datatable.window(Over partitionBy $"name" orderBy $"proctime" preceding UNBOUNDED_ROW as $"rn") // unbounded, processing time, row count
datatable.window(Over partitionBy $"name" orderBy $"rowtime" preceding 1.minutes as $"rn") // bounded, event time
datatable.window(Over partitionBy $"name" orderBy $"proctime" preceding 1.minutes as $"rn") // bounded, processing time
datatable.window(Over partitionBy $"name" orderBy $"rowtime" preceding 10.rows as $"rn") // bounded, event time, row count
datatable.window(Over partitionBy $"name" orderBy $"proctime" preceding 10.rows as $"rn") // bounded, processing time, row count
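An over window is likewise consumed in a select, where every aggregate must reference the same window alias, e.g.:
datatable.window(Over partitionBy $"name" orderBy $"rowtime" preceding 1.minutes as $"rn")
  .select($"name", $"data".sum over $"rn", $"data".avg over $"rn")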
UDF
Scalar functions
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.api.scala.createTypeInformation
class HashCode extends ScalarFunction{
def eval(s:String): Int ={ // eval must return the computed value, not Unit
s.hashCode-10
}
}
val hashcode = new HashCode()
import org.apache.flink.types.Row
datatable.select($"name",hashcode($"name")).toAppendStream[Row].print() // Table API style
streamtableenv.createTemporaryView("data",datatable)
streamtableenv.createTemporarySystemFunction("hashcode",hashcode)
streamtableenv.sqlQuery("select hashcode(name) from data").toAppendStream[Row].print() // SQL style
Table functions (flatMap)
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.table.functions.TableFunction
class Split extends TableFunction[(String,Int)]{
def eval(value:String): Unit ={
value.split(",").foreach(x=>collect((x,x.length))) // emit one row per comma-separated token
}
}
val split = new Split()
datatable.joinLateral(split($"name") as ("word","length")) // Table API style
streamtableenv.createTemporaryView("data",datatable)
streamtableenv.createTemporarySystemFunction("split",split)
import org.apache.flink.types.Row
streamtableenv.sqlQuery("select name, word, length from data, lateral table(split(name)) as T(word, length)").toAppendStream[Row].print() // SQL style
User-defined aggregate functions
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.api.scala.createTypeInformation
class AvgTempAcc{ var sum:Double=0;var count:Int=0 }
class AvgTemp extends AggregateFunction[Double,AvgTempAcc]{// type parameters: result type, accumulator type
override def getValue(acc: AvgTempAcc): Double = {acc.sum/acc.count}
override def createAccumulator(): AvgTempAcc = {new AvgTempAcc}
def accumulate(acc: AvgTempAcc,temp:Double): Unit ={// an accumulate method must also be implemented
acc.count+=1
acc.sum+=temp
}
}
val avgTemp = new AvgTemp()
import org.apache.flink.types.Row
datatable.groupBy($"name").aggregate(avgTemp($"score") as "avgscore").select($"name",$"avgscore").toRetractStream[Row].print() // Table API style
streamtableenv.createTemporaryView("data",datatable)
streamtableenv.createTemporarySystemFunction("avgTemp",avgTemp)
streamtableenv.sqlQuery("select name,avgTemp(score) from data group by name").toRetractStream[Row].print() // SQL style
Table aggregate functions (UDTAF)
class Top2Data{var high:Double = 0 ;var secondhigh:Double = 0 ;}
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.flink.api.scala.createTypeInformation
class Top2 extends TableAggregateFunction[(Double,Int),Top2Data]{
override def createAccumulator(): Top2Data = new Top2Data
def accumulate(acc: Top2Data,temp:Double): Unit ={// an accumulate method must also be implemented
if(acc.high<temp){acc.secondhigh = acc.high;acc.high = temp;}
else if(acc.secondhigh<temp){acc.secondhigh = temp;}
}
def emitValue(acc: Top2Data,out:Collector[(Double,Int)]): Unit ={
out.collect((acc.high,1))
out.collect((acc.secondhigh,2))
}
}
val top2 = new Top2()
datatable.groupBy($"name").flatAggregate(top2($"score") as ("top2_score","rank"))
.select($"name",$"top2_score",$"rank").toRetractStream[Row].print()
// table aggregate functions have no SQL equivalent
Task failure restart strategies
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import java.util.concurrent.TimeUnit
sourceTestEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,Time.of(10, TimeUnit.SECONDS) // number of restart attempts, delay between attempts
))
sourceTestEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per measurement interval
Time.of(5, TimeUnit.MINUTES), // interval over which the failure rate is measured
Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
))
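Restarts can also be switched off entirely, in which case any task failure fails the job immediately:
sourceTestEnv.setRestartStrategy(RestartStrategies.noRestart())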
datatable.explain() // print the execution plan
datatable.execute() // execute and collect the table result
Modules
import org.apache.flink.table.module.hive.HiveModule
tableenv.listModules()
tableenv.listFullModules()
tableenv.loadModule("hive", new HiveModule())
tableenv.useModules("hive")
tableenv.unloadModule("hive")
Catalog
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.catalog._
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
tableenv.registerCatalog("myhive", catalog)
tableenv.executeSql("CREATE DATABASE mydb WITH (...)")
tableenv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
tableenv.listTables() // returns the tables in the current catalog and database
// database operations
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false)
catalog.dropDatabase("mydb", false)
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false)
catalog.getDatabase("mydb")
catalog.databaseExists("mydb")
catalog.listDatabases()
// table operations
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false)
catalog.dropTable(new ObjectPath("mydb", "mytable"), false)
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false)
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false)
catalog.getTable(new ObjectPath("mydb", "mytable"))
catalog.tableExists(new ObjectPath("mydb", "mytable"))
catalog.listTables("mydb")
catalog.listViews("mydb")
// partition operations
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false)
catalog.alterPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...), false)
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...))
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...))
catalog.listPartitions(new ObjectPath("mydb", "mytable"))
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...))
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...))
// function operations
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false)
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false)
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false)
catalog.getFunction(new ObjectPath("mydb", "myfunc"))
catalog.functionExists(new ObjectPath("mydb", "myfunc"))
catalog.listFunctions("mydb")
TableEnvironment operations
tableenv.registerCatalog("myCatalog", new CustomCatalog("myCatalog")) // CustomCatalog: a user-defined Catalog implementation
tableenv.useCatalog("myCatalog")
tableenv.useDatabase("myDb")
tableenv.listCatalogs()
tableenv.listDatabases()
tableenv.listTables()