首页 > 数据库 >Flink(六)Table API和Flink SQL

Flink(六)Table API和Flink SQL

时间:2024-09-29 14:49:43浏览次数:8  
标签:flink tableEnv Flink API SQL Table id

Table API和Flink SQL整体介绍

概念

  • Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询,比如select、filter和join
  • Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作
  • 无论输入是批输入还是流式输入,在这两套API中,指定的查询都具有相同的语义,得到相同的结果

需要引入的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
  • flink-table-planner:planner计划器,是table API最主要的部分,提供了运行时环境和生成程序执行计划的planner
  • flink-table-api-java-bridge:bridge桥接器,主要负责table API和DataStream/DataSet API的连接支持,按照语言分java和scala
  • 这里的两个依赖,是IDEA环境下运行需要添加的;如果是生产环境,lib目录下默认已经有了planner,就只需要有bridge就可以了

API调用

基本程序结构

  • 与流式处理的程序结构类似,创建环境,定义source、transform、sink
StreamTableEnvironment tableEnv = ... // 创建表的执行环境
// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");
// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);
// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");
// 将结果表写入输出表中
result.insertInto("outputTable");

创建表环境

  • 创建表环境最简单的方式,就是基于流处理执行环境,调用create方法直接创建
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

表环境(TableEnvironment)是flink中集成Table API&SQL的核心概念,它负责:

  • 注册catalog
  • 在内部catalog中注册表
  • 执行SQL查询
  • 注册用户自定义函数
  • 将DataStream或DataSet转换为表
  • 保存对ExecutionEnvironment或StreamExecutionEnvironment的引用

举例

  • 配置老版本的流式查询(Flink-Streaming-Query)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
 .useOldPlanner() // 使用老版本 planner
 .inStreamingMode() // 流处理模式
 .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
  • 基于老版本的批处理环境(Flink-Batch-Query)
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment;
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
  • 基于blink版本的流处理环境(Blink-Streaming-Query)
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
  • 基于blink版本的批处理环境(Blink-Batch-Query)
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

在Catalog中注册表

表的概念

  • 表(Table)是由一个“标识符”来指定的,由3部分组成:Catalog名、数据库(database)名和对象名(表名),如果没有指定目录或数据库,就使用当前的默认值
  • 表可以是常规的(Table表),或者虚拟的(View视图)
  • 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从DataStream转换而来
  • 视图可以从现有的表中创建,通常是table API或者SQL查询的一个结果

连接到文件系统 csv格式

  • 连接外部系统在Catalog中注册表,直接调用tableEnv.connect()就可以,里面参数要传入一个ConnectorDescriptor,也就是connector描述器
  • 对于文件系统的connector而言,flink内部已经提供了,就叫做FileSystem()
tableEnv
 .connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接
 .withFormat(new Csv()) // 定义从外部系统读取数据之后的格式化方法
 .withSchema( new Schema()
 .field("id", DataTypes.STRING())
 .field("timestamp", DataTypes.BIGINT())
 .field("temperature", DataTypes.DOUBLE())
 ) // 定义表结构
 .createTemporaryTable("inputTable"); // 创建临时表
  • Csv()描述器在Flink里没有直接提供,需要引入依赖flink-csv
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>

连接到Kafka

  • kafka的连接器flink-kafka-connector中,1.10版本的已经提供了Table API的支持,可以在connect方法中直接传入一个叫做Kafka的类,这就是kafka连接器的描述器ConnectorDescriptor
tableEnv.connect(
 new Kafka()
 .version("0.11") // 定义 kafka 的版本
 .topic("sensor") // 定义主题
 .property("zookeeper.connect", "localhost:2181")
 .property("bootstrap.servers", "localhost:9092")
)
 .withFormat(new Csv())
 .withSchema(new Schema()
 .field("id", DataTypes.STRING())
 .field("timestamp", DataTypes.BIGINT())
 .field("temperature", DataTypes.DOUBLE())
)
 .createTemporaryTable("kafkaInputTable");
  • 也可以连接到ElasticSearch、MySql、HBase、Hive等外部系统,实现方式基本上是类似的

表的查询

  • 两种查询方式:Table API和SQL

Table API的调用

  • Table API是集成在Scala和Java语言内的查询API
  • Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的
  • Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API,这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果,有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段filter(…)表示筛选条件
Table sensorTable = tableEnv.from("inputTable");
Table resultTable = senorTable
.select("id, temperature")
.filter("id ='sensor_1'");
  • 字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式

SQL查询

  • Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准
  • 在Flink中,用常规字符串来定义SQL查询语句,SQL查询的结果,是一个新的Table
Table resultSqlTable = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'");

举例,统计每个sensor温度数据出现的个数,count统计

  • Table API实现
Table aggResultTable = sensorTable
.groupBy("id")
.select("id, id.count as count");
  • SQL实现
Table aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");

将DataStream转换成表

  • 可以基于一个DataStream,先流式地读取数据源,然后map成POJO,再把它转成Table
  • Table的列字段(column fields),就是POJO里的字段,这样就不用再麻烦地定义schema了

代码实现

  • 直接用tableEnv.fromDataStream()
  • 认转换后的Table schema和DataStream中的字段定义一一对应,也可以单独指定出来
DataStream<String> inputStream = env.readTextFile("sensor.txt");
DataStream<SensorReading> dataStream = inputStream
 .map( line -> {
 String[] fields = line.split(",");
 return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
 } );
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");

数据类型与Table schema的对应

Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");
  • 组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问,其他类型则被视为原子类型

创建临时图表(Temporary View)

  • 是直接从DataStream转换,可以直接对应字段转换;也可以在转换的时候,指定相应的字段
tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");
  • 以基于Table创建视图
tableEnv.createTemporaryView("sensorView", sensorTable);

输出表

  • 通过将数据写入TableSink来实现,TableSink是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列
  • Table.insertInto()方法将一个Table写入注册过的TableSink中

输出到文件

// 注册输出表
tableEnv.connect(
 new FileSystem().path("…\\resources\\out.txt")
) // 定义到文件系统的连接
 .withFormat(new Csv()) // 定义格式化方法,Csv 格式
 .withSchema(new Schema()
 .field("id", DataTypes.STRING())
 .field("temp", DataTypes.DOUBLE())
) // 定义表结构
 .createTemporaryTable("outputTable"); // 创建临时表
resultSqlTable.insertInto("outputTable");

更新模式(Update Mode)

  • 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换,与外部系统交换的消息类型,由更新模式(update mode)指定

追加模式(Append):表(动态表)和外部连接器只交换插入(Insert)消息
撤回模式(Retract):表和外部连接器交换的是:添加(Add)和撤回(Retract)消息

  • 插入(Insert)会被编码为添加消息
  • 删除(Delete)则编码为撤回消息
  • 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)
    的添加消息
  • 此模式下,不能定义key,这一点跟upsert模式完全不同

更新插入模式(Upsert):动态表和外部连接器交换Upsert和Delete消息

  • 这个模式需要一个唯一的key,通过这个key可以传递更新消息
  • 为了正确应用消息,外部连接器需要知道这个唯一key的属性
  • 插入(Insert)和更新(Update)都被编码为Upsert消息
  • 删除(Delete)编码为Delete消息

输出到MySQL

  • Flink专门为Table API的jdbc连接提供了flink-jdbc连接器,我们需要先引入依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
// 输出到 Mysql
String sinkDDL= "create table jdbcOutputTable (" +
 " id varchar(20) not null, " +
 " cnt bigint not null " +
 ") with (" +
 " 'connector.type' = 'jdbc', " +
 " 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
 " 'connector.table' = 'sensor_count', " +
 " 'connector.driver' = 'com.mysql.jdbc.Driver', " +
 " 'connector.username' = 'root', " +
 " 'connector.password' = '123456' )";
tableEnv.sqlUpdate(sinkDDL); // 执行 DDL 创建表
aggResultSqlTable.insertInto("jdbcOutputTable");

将表转换成DataStream

  • 将表转换为DataStream或DataSet时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型,最方便的转换类型就是Row
  • 表作为流式查询的结果,是动态更新的,所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式
  • Table API中表到DataStream有两种模式

追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景
撤回模式(Retract Mode):用于任何场景,有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作,得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据, Delete)

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, 
Row.class);
DataStream<Tuple2<Boolean, Row>> aggResultStream = 
tableEnv.toRetractStream(aggResultTable, Row.class);
resultStream.print("result");
aggResultStream.print("aggResult");
  • 没有经过groupby之类聚合操作,可以直接用toAppendStream来转换;而如果经过了聚合,有更新操作,一般就必须用toRetractDstream

标签:flink,tableEnv,Flink,API,SQL,Table,id
From: https://www.cnblogs.com/shihongpin/p/18439424

相关文章

  • ElementUI中实现el-table表格列宽自适应,列根据内容自动撑满,内容不换行
    一、概述在表格宽度固定时,实现内容不换行,表格自动显示滚动条当前显示效果: 期望实现效果: 二、实现思路遍历表格数组,每次都构建一个隐藏的span元素,获取该元素的宽度,对比保存最大值代码如下:/***表格列宽自适应*@paramprop属性*@paramrecords数据*@paramm......
  • 回执单识别-银行回单识别API-文字识别OCR API
    银行回单是一种由银行提供的交易凭证,记录了账户资金的交易明细。它通常包括存款、取款、转账、汇款、支付等各种类型的资金往来信息。银行回单可以是纸质的,也可以是电子版的,内容详尽记录了交易的相关信息,具有法律效力,常用于财务核对、税务申报和审计等场合。对于金融、财......
  • 结婚证识别-离婚证识别接口-结婚证识别API应用场景
    在信息化与智能化高速发展的今天,证件的自动识别技术逐渐成为了各行各业数字化转型的关键工具,而结婚证识别接口、离婚证识别接口正在悄然改变着传统的民政工作方式。结婚证识别与离婚证识别接口是基于光学字符识别(OCR)技术的智能解决方案。通过这一接口,用户可以轻松实现对......
  • GaussDB数据库SQL系列-LOCK TABLE
    一、前言GaussDB是一款高性能、高可用的分布式数据库,广泛应用于各类行业和场景。在GaussDB中,锁是实现并发控制的关键机制之一,用于协调多个事务之间的数据访问,确保数据的一致性和完整性。本文将围绕GaussDB数据库的LOCKTABLE做一简单介绍。二、GaussDB数据库的锁GaussDB提供了......
  • .Net 6 WebApi 项目部署到 Linux 系统上的 Docker 容器
    .Net6WebApi项目部署到Linux系统上的Docker容器 1.创建一个基础的WebApi项目  注意:因为发布时候,Dockerfile文件必须和解决方案.cspro文件放在同级,所以建议勾上这个,当时遇到这个问题,导致打包镜像时找不到.cspro文件,搞了好久  点击创建,项目基础框架是这样......
  • Python量化分析2024年最新整理的免费获取股票数据接口集合以及API数据接口说明文档
    ​近一两年来,股票量化分析逐渐受到广泛关注。而作为这一领域的初学者,首先需要面对的挑战就是如何获取全面且准确的股票数据。因为无论是实时交易数据、历史交易记录、财务数据还是基本面信息,这些数据都是我们进行量化分析时不可或缺的宝贵资源。我们的核心任务是从这些数据......
  • 第28篇 如何.net中实现高效可靠数据同步api
    通过以下方式可以高效,并保证数据同步的可靠性1.API设计使用RESTful设计,确保API端点明确,并使用适当的HTTP方法(如POST用于创建,PUT用于更新)。设计清晰的请求和响应模型,以确保客户端能够理解预期格式。2.数据验证在服务器端进行严格的数据验证,确保接收到的数据符合预期格式和......
  • Arrays常用API
    Arrays常用API本文主要总结了JAVA的Arrays工具类的常见使用方法,该工具类在机试刷题、面试过程中经常被问到,阅读完建议自己实践实践。1.常见API:1Arrays.toString()//输出数组的内容(基本数据类型)2Arrays.sort(arr);//排序,默认是升序3Arrays.binarySearch(int[]......
  • 在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
    前言:什么是操作筛选器操作筛选器是ASP.NETCoreWebAPI中的一种过滤器,用于在执行控制器操作(Action)之前或之后执行一些代码,完成特定的功能,比如执行日志记录、身份验证、授权、异常处理等通用的处理逻辑。每次ASP.NETCoreWebAPI中控制器的操作方法被执行的时候,操作筛选器......
  • java之使用CompletableFuture入门1
    Java17- 简介JDK中异步执行任务。源码://AFuturethatmaybeexplicitlycompleted(settingitsvalueandstatus),//andmaybeusedasaCompletionStage,supportingdependentfunctions//andactionsthattriggeruponitscompletion.publicclassCo......