Table API和Flink SQL整体介绍
概念
- Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询,比如select、filter和join
- Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作
- 无论输入是批输入还是流式输入,在这两套API中,指定的查询都具有相同的语义,得到相同的结果
需要引入的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
- flink-table-planner:planner计划器,是table API最主要的部分,提供了运行时环境和生成程序执行计划的planner
- flink-table-api-java-bridge:bridge桥接器,主要负责table API和DataStream/DataSet API的连接支持,按照语言分java和scala
- 这里的两个依赖,是IDEA环境下运行需要添加的;如果是生产环境,lib目录下默认已经有了planner,就只需要有bridge就可以了
API调用
基本程序结构
- 与流式处理的程序结构类似,创建环境,定义source、transform、sink
StreamTableEnvironment tableEnv = ... // 创建表的执行环境
// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");
// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);
// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");
// 将结果表写入输出表中
result.insertInto("outputTable");
创建表环境
- 创建表环境最简单的方式,就是基于流处理执行环境,调用create方法直接创建
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
表环境(TableEnvironment)是flink中集成Table API&SQL的核心概念,它负责:
- 注册catalog
- 在内部catalog中注册表
- 执行SQL查询
- 注册用户自定义函数
- 将DataStream或DataSet转换为表
- 保存对ExecutionEnvironment或StreamExecutionEnvironment的引用
举例
- 配置老版本的流式查询(Flink-Streaming-Query)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useOldPlanner() // 使用老版本 planner
.inStreamingMode() // 流处理模式
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
- 基于老版本的批处理环境(Flink-Batch-Query)
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment;
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
- 基于blink版本的流处理环境(Blink-Streaming-Query)
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
- 基于blink版本的批处理环境(Blink-Batch-Query)
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
在Catalog中注册表
表的概念
- 表(Table)是由一个“标识符”来指定的,由3部分组成:Catalog名、数据库(database)名和对象名(表名),如果没有指定目录或数据库,就使用当前的默认值
- 表可以是常规的(Table表),或者虚拟的(View视图)
- 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从DataStream转换而来
- 视图可以从现有的表中创建,通常是table API或者SQL查询的一个结果
连接到文件系统 csv格式
- 连接外部系统在Catalog中注册表,直接调用tableEnv.connect()就可以,里面参数要传入一个ConnectorDescriptor,也就是connector描述器
- 对于文件系统的connector而言,flink内部已经提供了,就叫做FileSystem()
tableEnv
.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接
.withFormat(new Csv()) // 定义从外部系统读取数据之后的格式化方法
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
) // 定义表结构
.createTemporaryTable("inputTable"); // 创建临时表
- Csv()描述器在Flink里没有直接提供,需要引入依赖flink-csv
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
连接到Kafka
- kafka的连接器flink-kafka-connector中,1.10版本的已经提供了Table API的支持,可以在connect方法中直接传入一个叫做Kafka的类,这就是kafka连接器的描述器ConnectorDescriptor
tableEnv.connect(
new Kafka()
.version("0.11") // 定义 kafka 的版本
.topic("sensor") // 定义主题
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");
- 也可以连接到ElasticSearch、MySql、HBase、Hive等外部系统,实现方式基本上是类似的
表的查询
- 两种查询方式:Table API和SQL
Table API的调用
- Table API是集成在Scala和Java语言内的查询API
- Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的
- Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API,这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果,有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段filter(…)表示筛选条件
Table sensorTable = tableEnv.from("inputTable");
Table resultTable = senorTable
.select("id, temperature")
.filter("id ='sensor_1'");
- 字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式
SQL查询
- Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准
- 在Flink中,用常规字符串来定义SQL查询语句,SQL查询的结果,是一个新的Table
Table resultSqlTable = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'");
举例,统计每个sensor温度数据出现的个数,count统计
- Table API实现
Table aggResultTable = sensorTable
.groupBy("id")
.select("id, id.count as count");
- SQL实现
Table aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");
将DataStream转换成表
- 可以基于一个DataStream,先流式地读取数据源,然后map成POJO,再把它转成Table
- Table的列字段(column fields),就是POJO里的字段,这样就不用再麻烦地定义schema了
代码实现
- 直接用tableEnv.fromDataStream()
- 认转换后的Table schema和DataStream中的字段定义一一对应,也可以单独指定出来
DataStream<String> inputStream = env.readTextFile("sensor.txt");
DataStream<SensorReading> dataStream = inputStream
.map( line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
} );
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");
数据类型与Table schema的对应
Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");
- 组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问,其他类型则被视为原子类型
创建临时图表(Temporary View)
- 是直接从DataStream转换,可以直接对应字段转换;也可以在转换的时候,指定相应的字段
tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");
- 以基于Table创建视图
tableEnv.createTemporaryView("sensorView", sensorTable);
输出表
- 通过将数据写入TableSink来实现,TableSink是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列
- Table.insertInto()方法将一个Table写入注册过的TableSink中
输出到文件
// 注册输出表
tableEnv.connect(
new FileSystem().path("…\\resources\\out.txt")
) // 定义到文件系统的连接
.withFormat(new Csv()) // 定义格式化方法,Csv 格式
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
) // 定义表结构
.createTemporaryTable("outputTable"); // 创建临时表
resultSqlTable.insertInto("outputTable");
更新模式(Update Mode)
- 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换,与外部系统交换的消息类型,由更新模式(update mode)指定
追加模式(Append):表(动态表)和外部连接器只交换插入(Insert)消息
撤回模式(Retract):表和外部连接器交换的是:添加(Add)和撤回(Retract)消息
- 插入(Insert)会被编码为添加消息
- 删除(Delete)则编码为撤回消息
- 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)
的添加消息 - 此模式下,不能定义key,这一点跟upsert模式完全不同
更新插入模式(Upsert):动态表和外部连接器交换Upsert和Delete消息
- 这个模式需要一个唯一的key,通过这个key可以传递更新消息
- 为了正确应用消息,外部连接器需要知道这个唯一key的属性
- 插入(Insert)和更新(Update)都被编码为Upsert消息
- 删除(Delete)编码为Delete消息
输出到MySQL
- Flink专门为Table API的jdbc连接提供了flink-jdbc连接器,我们需要先引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.1</version>
</dependency>
// 输出到 Mysql
String sinkDDL= "create table jdbcOutputTable (" +
" id varchar(20) not null, " +
" cnt bigint not null " +
") with (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
" 'connector.table' = 'sensor_count', " +
" 'connector.driver' = 'com.mysql.jdbc.Driver', " +
" 'connector.username' = 'root', " +
" 'connector.password' = '123456' )";
tableEnv.sqlUpdate(sinkDDL); // 执行 DDL 创建表
aggResultSqlTable.insertInto("jdbcOutputTable");
将表转换成DataStream
- 将表转换为DataStream或DataSet时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型,最方便的转换类型就是Row
- 表作为流式查询的结果,是动态更新的,所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式
- Table API中表到DataStream有两种模式
追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景
撤回模式(Retract Mode):用于任何场景,有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作,得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据, Delete)
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable,
Row.class);
DataStream<Tuple2<Boolean, Row>> aggResultStream =
tableEnv.toRetractStream(aggResultTable, Row.class);
resultStream.print("result");
aggResultStream.print("aggResult");
- 没有经过groupby之类聚合操作,可以直接用toAppendStream来转换;而如果经过了聚合,有更新操作,一般就必须用toRetractDstream