首页 > 数据库 >Flink核心API之Table API和SQL

Flink核心API之Table API和SQL

时间:2023-06-03 11:33:10浏览次数:59  
标签:flink val org Flink Table API SQL apache table

Table API & SQL

注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现Flink中所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

Table API和SQL的由来:
Flink针对标准的流处理和批处理提供了两种关系型API,Table API和SQL。Table API允许用户以一种很直观的方式进行select 、filter和join操作。Flink SQL基于 Apache Calcite实现标准SQL。针对批处理和流处理可以提供相同的处理语义和结果。

Flink Table API、SQL和Flink的DataStream API、DataSet API是紧密联系在一起的。
Table API和SQL是一种关系型 API,用户可以像操作 Mysql 数据库表一样的操作数据,而不需要写代码,更不需要手工的对代码进行调优。另外,SQL 作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供 SQL 支持,将很容易被用户接受。
如果你想要使用Table API 和SQL的话,需要添加下面的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

如果你想在 本地 IDE中运行程序,还需要添加下面的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

如果你用到了老的执行引擎,还需要添加下面这个依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.11.1</version>
</dependency>

Table API和SQL通过join API集成在一起,这个join API的核心概念是Table,Table可以作为查询的输入和输出。
针对Table API和SQL我们主要讲解以下内容

  1. Table API和SQL的使用
  2. DataStream、DataSet和Table之间的互相转换

Table API 和SQL的使用

想要使用Table API 和SQL,首先要创建一个TableEnvironment对象。
下面我们来创建一个TableEnvironment对象

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
  * 创建TableEnvironment对象
  */
object CreateTableEnvironmentScala {
  def main(args: Array[String]): Unit = {
    /**
      * 注意:如果Table API和SQL不需要和DataStream或者DataSet互相转换
      * 则针对stream和batch都可以使用TableEnvironment
      */
    //指定底层使用Blink引擎,以及数据处理模式-stream
    //从1.11版本开始,Blink引擎成为Table API和SQL的默认执行引擎,在生产环境下面,推荐使用Blink引擎
    val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    //创建TableEnvironment对象
    val sTableEnv = TableEnvironment.create(sSettings)
    //指定底层使用Blink引擎,以及数据处理模式-batch
    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    //创建TableEnvironment对象
    val bTableEnv = TableEnvironment.create(bSettings)
    /**
      * 注意:如果Table API和SQL需要和DataStream或者DataSet互相转换
      * 针对stream需要使用StreamTableEnvironment
      * 针对batch需要使用BatchTableEnvironment
      */
    //创建StreamTableEnvironment
    val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)
    //创建BatchTableEnvironment
    //注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换
    val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    val bbTableEnv = BatchTableEnvironment.create(bbEnv)
  }

}

编译报错,不支持java8中接口的静态方法

Error:(21, 44) Static methods in interface require -target:jvm-1.8
    val sTableEnv = TableEnvironment.create(sSettings)

解决方法如下,注意:添加的编译参数经过几次程序于运行之后会重置为空,具体原因未知,重新添加即可

image

网上说的其他方法尝试之后都不行,如

image

运行时又遇到了另一个问题

Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:112)
	at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:48)
	at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:262)
	at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93)
	at com.imooc.bigdata.flink.tablesql.CreateTableEnvironmentScala$.main(CreateTableEnvironmentScala.scala:21)
	at com.imooc.bigdata.flink.tablesql.CreateTableEnvironmentScala.main(CreateTableEnvironmentScala.scala)

原因是maven依赖有冲突,很奇怪。解决方法:删除以下依赖。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

下面我们来演示一下Table API和 SQL的使用
目前创建Table的很多方法都过时了,都不推荐使用了,例如:registerTableSource、connect等方法
目 前 官 方 推 荐 使 用 executeSql 的 方 式 , executeSql 里 面 支 持DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE等语法

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
  * TableAPI 和 SQL的使用
  */
object TableAPIAndSQLOpScala {
  def main(args: Array[String]): Unit = {
    //获取TableEnvironment
    val sSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode().build()
    val sTableEnv = TableEnvironment.create(sSettings)
    //创建输入表
    /**
      * connector.type:指定connector的类型
      * connector.path:指定文件或者目录地址
      * format.type:文件数据格式化类型,现在只支持csv格式
      * 注意:SQL语句如果出现了换行,行的末尾可以添加空格或者\n都可以,最后一行不用添
      */
    val sql =
      """
        |create table myTable(
        |      id int,
        |      name string
        |      ) with (
        |      'connector.type' = 'filesystem',
        |      'connector.path' = 'C:/D-myfiles/testjar/flink/source.txt',
        |      'format.type' = 'csv'
        |      )
      """.stripMargin
    sTableEnv.executeSql(sql)
    //使用Table API实现数据查询和过滤等操作
    /*import org.apache.flink.table.api._
    val result = sTableEnv.from("myTable")
    .select($"id",$"name")
    .filter($"id" > 1)*/
    //使用SQL实现数据查询和过滤等操作
    val result = sTableEnv.sqlQuery("select id,name from myTable where id > 1")
    //输出结果到控制台
    result.execute.print()
    //创建输出表
    val sql2 =
      """
        |create table newTable(
        |      id int,
        |      name string
        |      ) with (
        |      'connector.type' = 'filesystem',
        |      'connector.path' = 'C:/D-myfiles/testjar/flink/res',
        |      'format.type' = 'csv'
        |      )
      """.stripMargin
    sTableEnv.executeSql(sql2)
    //输出结果到表newTable中
    result.executeInsert("newTable")
  }


}

source.txt内容为

1,Jack
2,Tom
3,Mary

sql查询结果会输出到控制台及新表中

DataStream、DataSet和Table之间的互相转换

Table API和SQL可以很容易的和DataStream和DataSet程序集成到一块。通过TableEnvironment ,可以 把 DataStream 或 者 DataSet 注 册 为 Table , 这 样 就 可 以 使 用 Table API 和 SQL 查 询 了 。 通 过TableEnvironment 也可以把Table对象转换为DataStream或者DataSet,这样就可以使用DataStream或者DataSet中的相关API了。

使用DataStream创建表

主要包含下面这两种情况

  • 使用DataStream创建view视图
  • 使用DataStream创建table对象
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * 将DataStream转换成表
  */
object DataStreamToTableScala {
  def main(args: Array[String]): Unit = {
    //获取StreamTableEnvironment
    val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)
    //获取DataStream
    import org.apache.flink.api.scala._
    val stream = ssEnv.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mac")))
    //第一种:将DataStream转换为view视图
    import org.apache.flink.table.api._
    ssTableEnv.createTemporaryView("myTable", stream, 'id, 'name)
    ssTableEnv.sqlQuery("select * from myTable where id > 1").execute().print()
    //第二种:将DataStream转换为table对象
    val table = ssTableEnv.fromDataStream(stream, $"id", $"name")
    table.select($"id", $"name")
      .filter($"id" > 1)
      .execute()
      .print()
    //注意:'id,'name 和 $"id", $"name" 这两种写法是一样的效果
  }

}

使用DataSet创建表

注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment

/**
  * 将DataSet转换成表
  */
object DataSetToTableScala {
  def main(args: Array[String]): Unit = {
    //获取BatchTableEnvironment
    val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    val bbTableEnv = BatchTableEnvironment.create(bbEnv)
    //获取DataSet
    import org.apache.flink.api.scala._
    val set = bbEnv.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mack")))
    //第一种:将DataSet转换为view视图
    import org.apache.flink.table.api._
    bbTableEnv.createTemporaryView("myTable", set, 'id, 'name)
    bbTableEnv.sqlQuery("select * from myTable where id > 1").execute().print()
    //第二种:将DataSet转换为table对象
    val table = bbTableEnv.fromDataSet(set, $"id", $"name")
    table.select($"id", $"name")
      .filter($"id" > 1)
      .execute()
      .print()
    //注意:'id,'name 和 $"id", $"name" 这两种写法是一样的效果
  }

}

将 Table 转换为 DataStream 或者 DataSet 时,你需要指定生成的 DataStream 或者 DataSet 的数据类型,即,Table 的每行数据要转换成的数据类型。通常最方便的选择是转换成 Row 。以下列表概述了不同选项的功能:

  • Row: 通过角标映射字段,支持任意数量的字段,支持 null 值,无类型安全(type-safe)检查。
  • POJO: Java中的实体类,这个实体类中的字段名称需要和Table中的字段名称保持一致,支持任意数量的字段,支持null值,有类型安全检查。
  • Case Class: 通过角标映射字段,不支持null值,有类型安全检查。
  • Tuple: 通过角标映射字段,Scala中限制22个字段,Java中限制25个字段,不支持null值,有类型安全检查。
  • Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。

将表转换成 DataStream

流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。
有几种模式可以将Table转换为DataStream。

  • Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时(仅附加),之前添加的数据不会被更新。
  • Retract Mode:可以始终使用此模式,它使用一个Boolean标识来编码INSERT和DELETE更改。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
  * 将table转换成 DataStream
  */
object TableToDataStreamScala {
  def main(args: Array[String]): Unit = {
    //获取StreamTableEnvironment
    val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)
    //创建输入表
    val sql =
      """
        |create table myTable(
        |      id int,
        |      name string
        |      ) with (
        |      'connector.type' = 'filesystem',
        |      'connector.path' = 'C:/D-myfiles/testjar/flink/source.txt',
        |      'format.type' = 'csv'
        |      )
      """.stripMargin
    ssTableEnv.executeSql(sql)
    //获取table
    val table = ssTableEnv.from("myTable")
    //将table转换为DataStream
    //如果只有新增(追加)操作,可以使用toAppendStream
    import org.apache.flink.api.scala._
    val appStream = ssTableEnv.toAppendStream[Row](table)
    appStream.map(row => (row.getField(0).toString.toInt, row.getField(1).toString))
      .print()
    //如果有增加操作,还有删除操作,则使用toRetractStream
    val retStream = ssTableEnv.toRetractStream[Row](table)
    retStream.map(tup => {
      val flag = tup._1
      val row = tup._2
      val id = row.getField(0).toString.toInt
      val name = row.getField(1).toString
      (flag, id, name)
    }).print()
    //注意:将table对象转换为DataStream之后,就需要调用StreamExecutionEnvironment
    ssEnv.execute("TableToDataStreamScala")
  }

}

将表转换成 DataSet

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

/**
  * 将table转换成 DataSet
  */
object TableToDataSetScala {
  def main(args: Array[String]): Unit = {
    //获取BatchTableEnvironment
    val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    val bbTableEnv = BatchTableEnvironment.create(bbEnv)
    //创建输入表
    val sql =
      """
        |create table myTable(
        |      id int,
        |      name string
        |      ) with (
        |      'connector.type' = 'filesystem',
        |      'connector.path' = 'C:/D-myfiles/testjar/flink/source.txt',
        |      'format.type' = 'csv'
        |      )
      """.stripMargin
    bbTableEnv.executeSql(sql)
    //获取table
    val table = bbTableEnv.from("myTable")
    //将table转换为DataSet
    import org.apache.flink.api.scala._
    val set = bbTableEnv.toDataSet[Row](table)
    set.map(row => (row.getField(0).toString.toInt, row.getField(1).toString))
      .print()
  }

}

参考

Static methods in interface require -target:jvm-1.8
'jvm-1.8' is not a valid choice for '-target'
Caused by: java.lang.IncompatibleClassChangeError: Implementing class
flink sql ddl本地执行报错java.lang.IncompatibleClassChangeError: Implementing class

标签:flink,val,org,Flink,Table,API,SQL,apache,table
From: https://www.cnblogs.com/strongmore/p/17378220.html

相关文章

  • docker安装mysql
    1.从DockerHub下载MySQL镜像:dockerpullmysql2.运行MySQL容器,并将主机的3306端口映射到容器的3306端口:dockerrun-p3306:3306--namemysql-eMYSQL_ROOT_PASSWORD=your_password-dmysql其中,--namemysql指定容器的名称为mysql,-p3306:3306将容器的3306端口映射......
  • Flink核心API之DataStream
    Flink中提供了4种不同层次的API,每种API在简洁和易表达之间有自己的权衡,适用于不同的场景。目前上面3个会用得比较多。低级API(StatefulStreamProcessing):提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在一些复杂事件处理逻辑上。核心API(DataStream/DataSetAP......
  • MYSQL:无锁变更工具Pt-online-schema-change
    一、MySQL常用的无锁变更工具OnlineSchemaChange:OnlineSchemaChange(OSC)工具是MySQL官方提供的一种无锁变更工具,它可以在不停止MySQL服务器的情况下对表结构进行修改。OSC利用了InnoDB存储引擎的特性,使用复制和重放日志的方式来实现无锁变更。pt-online-schema-change:pt-......
  • MySQL同一字段取反处理
    在改BUG中遇到了这个问题一张表的字段比如是否可用标志取值取反了本来是0的写成了1 1写成了0可使用下面的语句 UPDATE(表名)SET字段名= CASE字段名WHEN (值) THEN(值)WHEN (值) THEN(值)WHEN (值) THEN(值)ENDWHERE(条件)举个例子:UPDA......
  • 首次进入Mysql修改密码报“The MySQL server is running with the --skip-grant-table
    第一次安装完mysql,修改默认密码的时候,报“TheMySQLserverisrunningwiththe--skip-grant-tablesoptionsoitcannotexecutethisstatement”。先刷新mysql然后再重新修改密码即可。mysql>ALTERUSER'root'@'localhost'IDENTIFIEDBY'123456';ERROR1290(H......
  • 关于MySQL数据库的外键作用及如何创建?
    一、外键的作用:外键的主要作用是保证数据的一致性和完整性,并且减少数据冗余。主要体现在以下两个方面:1、阻止执行从表插入新行,其外键值不是主表的主键值便阻止插入。从表修改外键值,新值不是主表的主键值便阻止修改。主表删除行,其主键值在从表里存在便阻止删除(要想删除,必须先删除......
  • react配置API请求代理
    需求当请求http://10.1.1.1:3131/v1/*接口时,需要代理到8181端口。如果只需要代理匹配到/v1路径的请求,可以在package.json中使用http-proxy-middleware进行自定义代理配置。以下是一个示例:首先,确保已经安装了http-proxy-middleware包。如果没有安装,可以使用以下命令进行安......
  • linux卸载MySQL
    linux卸载MySQL一查找以前是否装有mysqlrpm-qa|grep-imysql显示之前安装了:MySQL-server-5.6.22-1.el6.i686MySQL-client-5.6.22-1.el6.i686二停止mysql服务、删除之前安装的mysql删除命令:rpm-e–nodeps包名rpm-evMySQL-server-5.6.22-1.el6.i686rpm-evMySQL-cli......
  • Flink详解
    什么是FlinkApacheFlink是一个开源的分布式,高性能,高可用,准确的流处理框架。分布式:表示flink程序可以运行在很多台机器上,高性能:表示Flink处理性能比较高高可用:表示flink支持程序的自动重启机制。准确的:表示flink可以保证处理数据的准确性。Flink支持流处理和批处理,虽然我们......
  • 使用du查/mysql/bak目录使用1013g,但使用df查却显示使用2.8t
    问题描述:使用du查/mysql/bak目录使用1013g,但使用df查却显示使用2.8t,如下所示:系统:rhel7.3异常原因:删除文件后,du命令便不会在文件系统目录中统计被删除的文件,若此时存在运行中的进程持有已经被删除的文件句柄,那该类文件就不会真正在磁盘中被删除,分区超级块中的信息也不会更改,df命......