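This article briefly introduces the syntax of DESCRIBE, EXPLAIN, and USE, and illustrates each with examples. It assumes that working Flink and Kafka clusters are available. The article has three parts: DESCRIBE, EXPLAIN, and USE.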
I. DESCRIBE statement
The DESCRIBE statement describes the schema of a table or view.
1. Syntax
{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
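To make the optional parts of the grammar concrete, here is a minimal sketch that assembles a DESCRIBE statement from name parts. The class DescribeStatements and its methods are hypothetical helpers, not part of Flink, and the backtick quoting (with embedded backticks doubled) is an assumption about how identifiers are escaped.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink API): builds DESCRIBE statements from name parts. */
final class DescribeStatements {

    /** Quotes an identifier with backticks; doubling embedded backticks is an assumption. */
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /**
     * Builds "DESCRIBE [catalog.][db.]table". catalog and database may be null,
     * mirroring the optional parts of the grammar.
     */
    static String describe(String catalog, String database, String table) {
        if (catalog != null && database == null) {
            throw new IllegalArgumentException("a catalog name requires a database name");
        }
        List<String> parts = new ArrayList<>();
        if (catalog != null) {
            parts.add(quote(catalog));
        }
        if (database != null) {
            parts.add(quote(database));
        }
        parts.add(quote(table));
        return "DESCRIBE " + String.join(".", parts);
    }

    public static void main(String[] args) {
        System.out.println(describe(null, null, "orders"));
        System.out.println(describe("default_catalog", "default_database", "orders"));
    }
}
```

The resulting string could then be passed to executeSql(); DESC is simply a shorter spelling of the same keyword.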
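2. Java example
A DESCRIBE statement can be executed with the executeSql() method of TableEnvironment. If the DESCRIBE operation succeeds, executeSql() returns the schema of the given table; otherwise it throws an exception.
The following example shows how to execute a DESCRIBE statement in a TableEnvironment.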
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        // Register a Kafka-backed table so that there is a schema to describe
        String sql = "CREATE TABLE orders (\r\n" +
                " order_id STRING,\r\n" +
                " price DECIMAL(32,2),\r\n" +
                " currency STRING,\r\n" +
                " order_time TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
                " PRIMARY KEY(order_id) NOT ENFORCED\r\n" +
                ") WITH (\r\n" +
                " 'connector' = 'kafka',\r\n" +
                " 'topic' = 'orders_topic',\r\n" +
                " 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
                " 'properties.group.id' = 'testGroup',\r\n" +
                " 'scan.startup.mode' = 'earliest-offset',\r\n" +
                " 'value.format' = 'debezium-json'\r\n" +
                ");";
        tenv.executeSql(sql);
        // Print the schema with DESCRIBE
        tenv.executeSql("DESCRIBE orders").print();
        // Print the schema with the DESC shorthand
        tenv.executeSql("DESC orders").print();
    }
}
- Output
----tenv.executeSql("DESCRIBE orders").print()
+------------+----------------+-------+---------------+---------------------------+-----------+
| name | type | null | key | extras | watermark |
+------------+----------------+-------+---------------+---------------------------+-----------+
| order_id | STRING | FALSE | PRI(order_id) | | |
| price | DECIMAL(32, 2) | TRUE | | | |
| currency | STRING | TRUE | | | |
| order_time | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' | |
+------------+----------------+-------+---------------+---------------------------+-----------+
4 rows in set
----tenv.executeSql("DESC orders").print()
+------------+----------------+-------+---------------+---------------------------+-----------+
| name | type | null | key | extras | watermark |
+------------+----------------+-------+---------------+---------------------------+-----------+
| order_id | STRING | FALSE | PRI(order_id) | | |
| price | DECIMAL(32, 2) | TRUE | | | |
| currency | STRING | TRUE | | | |
| order_time | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' | |
+------------+----------------+-------+---------------+---------------------------+-----------+
4 rows in set
3. Flink SQL CLI example
Flink SQL> CREATE TABLE alan_ticker2 (
> symbol STRING,
> price DOUBLE,
> tax DOUBLE,
> rowtime TIMESTAMP(3),
> WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_ticker2_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> DESCRIBE alan_ticker2;
+---------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+---------+------------------------+------+-----+--------+---------------------------------+
| symbol | STRING | TRUE | | | |
| price | DOUBLE | TRUE | | | |
| tax | DOUBLE | TRUE | | | |
| rowtime | TIMESTAMP(3) *ROWTIME* | TRUE | | | `rowtime` - INTERVAL '1' SECOND |
+---------+------------------------+------+-----+--------+---------------------------------+
4 rows in set
Flink SQL> DESC alan_ticker2;
+---------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+---------+------------------------+------+-----+--------+---------------------------------+
| symbol | STRING | TRUE | | | |
| price | DOUBLE | TRUE | | | |
| tax | DOUBLE | TRUE | | | |
| rowtime | TIMESTAMP(3) *ROWTIME* | TRUE | | | `rowtime` - INTERVAL '1' SECOND |
+---------+------------------------+------+-----+--------+---------------------------------+
4 rows in set
II. EXPLAIN statement
The EXPLAIN statement explains the logical plan and the optimized query plan of a query or INSERT statement.
1. Syntax
EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>
statement_set:
EXECUTE STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;
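As a rough illustration of how the grammar above composes, the following sketch builds EXPLAIN statements as strings. ExplainStatements and its Detail enum are hypothetical local helpers (the enum only mirrors the detail names used in this article, it is not Flink's own type), and the detail list is written without parentheses, matching the form used in the examples later in this article.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper (not a Flink API): assembles EXPLAIN statements as text. */
final class ExplainStatements {

    /** Local mirror of the detail names used in this article. */
    enum Detail { ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN }

    /** With no details, falls back to "EXPLAIN PLAN FOR"; otherwise lists the details. */
    static String explain(String statement, Detail... details) {
        if (details.length == 0) {
            return "EXPLAIN PLAN FOR " + statement;
        }
        StringJoiner joined = new StringJoiner(", ");
        for (Detail detail : details) {
            joined.add(detail.name());
        }
        return "EXPLAIN " + joined + " " + statement;
    }

    /** Wraps INSERT statements (given without trailing semicolons) into a statement set. */
    static String statementSet(List<String> insertStatements) {
        StringBuilder sb = new StringBuilder("EXECUTE STATEMENT SET\nBEGIN\n");
        for (String insert : insertStatements) {
            sb.append("  ").append(insert).append(";\n");
        }
        return sb.append("END;").toString();
    }

    public static void main(String[] args) {
        System.out.println(explain("SELECT * FROM alan_ticker2"));
        System.out.println(explain("SELECT * FROM alan_ticker2",
                Detail.ESTIMATED_COST, Detail.CHANGELOG_MODE));
    }
}
```

The strings produced this way are only text; whether a given combination is accepted still depends on the Flink version that executes it.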
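2. Java example
An EXPLAIN statement can be executed with the executeSql() method of TableEnvironment. If the EXPLAIN operation succeeds, executeSql() returns the explanation; otherwise it throws an exception.
The following example shows how to execute an EXPLAIN statement in a TableEnvironment.
1) Maven dependencies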
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-gateway -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-gateway</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink connectors -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
2) Java code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        String sql = "CREATE TABLE alan_ticker2 (\r\n" +
                " symbol STRING,\r\n" +
                " price DOUBLE,\r\n" +
                " tax DOUBLE,\r\n" +
                " rowtime TIMESTAMP(3),\r\n" +
                " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\r\n" +
                ") WITH (\r\n" +
                " 'connector' = 'kafka',\r\n" +
                " 'topic' = 'alan_ticker2_topic',\r\n" +
                " 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
                " 'scan.startup.mode' = 'earliest-offset',\r\n" +
                " 'format' = 'csv'\r\n" +
                ");";
        tenv.executeSql(sql);

        // Explain the SELECT statement by calling TableEnvironment.explainSql()
        String querySQL = "SELECT *\r\n" +
                "FROM alan_ticker2\r\n" +
                " MATCH_RECOGNIZE(\r\n" +
                " PARTITION BY symbol\r\n" +
                " ORDER BY rowtime\r\n" +
                " MEASURES\r\n" +
                " C.price AS lastPrice,\r\n" +
                " C.rowtime AS rowtime\r\n" +
                " ONE ROW PER MATCH\r\n" +
                " AFTER MATCH SKIP PAST LAST ROW\r\n" +
                " PATTERN (A B*? C)\r\n" +
                " DEFINE\r\n" +
                " A AS A.price > 12,\r\n" +
                " B AS B.price < 25,\r\n" +
                " C AS C.price > 18\r\n" +
                " );";
        String explanation = tenv.explainSql(querySQL);
        System.out.println("-------------------------explanation--------------------------------");
        System.out.println(explanation);
        System.out.println("-------------------------explanation--------------------------------");

        // Explain the SELECT statement by calling TableEnvironment.executeSql()
        TableResult tableResult = tenv.executeSql("EXPLAIN PLAN FOR " + querySQL);
        System.out.println("-------------------------EXPLAIN PLAN FOR--------------------------------");
        tableResult.print();
        System.out.println("-------------------------EXPLAIN PLAN FOR--------------------------------");

        // Explain again with every ExplainDetail enabled
        TableResult tableResult2 = tenv.executeSql("EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN " + querySQL);
        System.out.println("-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------");
        tableResult2.print();
        System.out.println("-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------");
    }
}
3) Output
-------------------------explanation--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])
== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
-------------------------explanation--------------------------------
-------------------------EXPLAIN PLAN FOR--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])
== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
-------------------------EXPLAIN PLAN FOR--------------------------------
-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])
== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.
== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 11,
"type" : "Source: alan_ticker2[7]",
"pact" : "Data Source",
"contents" : "[7]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
"parallelism" : 16
}, {
"id" : 13,
"type" : "StreamRecordTimestampInserter[9]",
"pact" : "Operator",
"contents" : "[9]:StreamRecordTimestampInserter(rowtime field: 3)",
"parallelism" : 16,
"predecessors" : [ {
"id" : 11,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 14,
"type" : "Match[9]",
"pact" : "Operator",
"contents" : "[9]:Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])",
"parallelism" : 16,
"predecessors" : [ {
"id" : 13,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------
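The output above is organized into sections introduced by "== Title ==" lines. When the text returned by explainSql() needs to be inspected programmatically, it can be split on those headers. The sketch below uses plain Java only; ExplainSections is a hypothetical helper, and it assumes that every section header sits on a line of its own, as in the output shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink API): splits EXPLAIN text into its titled sections. */
final class ExplainSections {

    // A section header looks like "== Optimized Physical Plan ==".
    private static final Pattern HEADER = Pattern.compile("^== (.+) ==$");

    /** Returns section title -> section body, in the order the sections appear. */
    static Map<String, String> split(String explainOutput) {
        Map<String, String> sections = new LinkedHashMap<>();
        String current = null;
        StringBuilder body = new StringBuilder();
        for (String line : explainOutput.split("\\r?\\n")) {
            Matcher m = HEADER.matcher(line.trim());
            if (m.matches()) {
                if (current != null) {
                    sections.put(current, body.toString().trim());
                }
                current = m.group(1);
                body.setLength(0);
            } else if (current != null) {
                body.append(line).append('\n');
            }
        }
        if (current != null) {
            sections.put(current, body.toString().trim());
        }
        return sections;
    }

    public static void main(String[] args) {
        String sample = "== Abstract Syntax Tree ==\nLogicalProject(...)\n\n"
                + "== Optimized Execution Plan ==\nMatch(...)\n+- Exchange(...)\n";
        split(sample).forEach((title, text) -> System.out.println(title + ":\n" + text));
    }
}
```

Lines that appear before the first header (for example the dashed separators printed by the example program) are ignored.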
3. Flink SQL CLI example
1) Create the table
Flink SQL> CREATE TABLE alan_ticker2 (
> symbol STRING,
> price DOUBLE,
> tax DOUBLE,
> rowtime TIMESTAMP(3),
> WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_ticker2_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
2) EXPLAIN PLAN FOR query and result
Flink SQL> EXPLAIN PLAN FOR
> SELECT *
> FROM alan_ticker2
> MATCH_RECOGNIZE(
> PARTITION BY symbol
> ORDER BY rowtime
> MEASURES
> C.price AS lastPrice,
> C.rowtime AS rowtime
> ONE ROW PER MATCH
> AFTER MATCH SKIP PAST LAST ROW
> PATTERN (A B*? C)
> DEFINE
> A AS A.price > 12,
> B AS B.price < 25,
> C AS C.price > 18
> );

| result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])
== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
|

1 row in set
3) EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN query and result
Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN
> SELECT *
> FROM alan_ticker2
> MATCH_RECOGNIZE(
> PARTITION BY symbol
> ORDER BY rowtime
> MEASURES
> C.price AS lastPrice,
> C.rowtime AS rowtime
> ONE ROW PER MATCH
> AFTER MATCH SKIP PAST LAST ROW
> PATTERN (A B*? C)
> DEFINE
> A AS A.price > 12,
> B AS B.price < 25,
> C AS C.price > 18
> );

| result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])
== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.
== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 859,
"type" : "Source: alan_ticker2[641]",
"pact" : "Data Source",
"contents" : "[641]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
"parallelism" : 1
}, {
"id" : 861,
"type" : "StreamRecordTimestampInserter[643]",
"pact" : "Operator",
"contents" : "[643]:StreamRecordTimestampInserter(rowtime field: 3)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 859,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 862,
"type" : "Match[643]",
"pact" : "Operator",
"contents" : "[643]:Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 861,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
} |
+--------------------------------------------------------------------------------+
1 row in set
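4. ExplainDetails
Prints the plan of the statement with the specified ExplainDetail types.
1) ESTIMATED_COST
Specifying ESTIMATED_COST makes the optimizer attach its estimated cost information to each physical node in the output.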
== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.
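To compare the estimated cost of nodes, the annotation can be pulled out of a plan line with a regular expression. The following is a minimal sketch under the assumption that the annotation always has the exact shape shown above ("rowcount = ..., cumulative cost = {... rows, ... cpu, ... io, ... network, ... memory}") with plain numeric values; PlanCost is a hypothetical class, not a Flink type.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical value class for the cost annotation printed by ESTIMATED_COST. */
final class PlanCost {
    final double rowCount;
    final double rows;
    final double cpu;
    final double io;
    final double network;
    final double memory;

    // Numbers such as 1.0E8, 1.69E10 or 0.0; other spellings are not handled.
    private static final String NUM = "([-+0-9.Ee]+)";
    private static final Pattern COST = Pattern.compile(
            "rowcount = " + NUM + ", cumulative cost = \\{" + NUM + " rows, " + NUM + " cpu, "
                    + NUM + " io, " + NUM + " network, " + NUM + " memory\\}");

    private PlanCost(double[] v) {
        rowCount = v[0];
        rows = v[1];
        cpu = v[2];
        io = v[3];
        network = v[4];
        memory = v[5];
    }

    /** Returns the parsed cost, or empty when the line carries no cost annotation. */
    static Optional<PlanCost> parse(String planLine) {
        Matcher m = COST.matcher(planLine);
        if (!m.find()) {
            return Optional.empty();
        }
        double[] values = new double[6];
        for (int i = 0; i < values.length; i++) {
            values[i] = Double.parseDouble(m.group(i + 1));
        }
        return Optional.of(new PlanCost(values));
    }

    public static void main(String[] args) {
        String line = "+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, "
                + "cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}";
        parse(line).ifPresent(cost -> System.out.println("cpu = " + cost.cpu));
    }
}
```

Because the cost is cumulative, the value on a node already includes the cost of its inputs, which is why the numbers grow toward the root of the plan.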
2) CHANGELOG_MODE
Specifying CHANGELOG_MODE makes the optimizer attach the changelog mode to each physical node in the output.
== Optimized Physical Plan With Advice ==
...
+- Exchange(... changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
...
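The changelogMode annotation uses short codes; in the output above, [I] marks an insert-only node. The sketch below extracts the codes from a plan line and expands them. ChangelogModes is a hypothetical helper, and the mapping of I, UB, UA, and D to INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE is stated here as an assumption about Flink's abbreviations; unknown codes are passed through unchanged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink API): reads the changelogMode annotation of a plan node. */
final class ChangelogModes {

    private static final Pattern MODE = Pattern.compile("changelogMode=\\[([^\\]]*)\\]");

    /** Expands a short code; the mapping is an assumption, unknown codes are returned as-is. */
    static String expand(String abbreviation) {
        switch (abbreviation) {
            case "I":
                return "INSERT";
            case "UB":
                return "UPDATE_BEFORE";
            case "UA":
                return "UPDATE_AFTER";
            case "D":
                return "DELETE";
            default:
                return abbreviation;
        }
    }

    /** Returns the expanded change kinds of a plan line, or an empty list if none is present. */
    static List<String> parse(String planLine) {
        Matcher m = MODE.matcher(planLine);
        if (!m.find()) {
            return Collections.emptyList();
        }
        List<String> kinds = new ArrayList<>();
        for (String token : m.group(1).split(",")) {
            String code = token.trim();
            if (!code.isEmpty()) {
                kinds.add(expand(code));
            }
        }
        return kinds;
    }

    public static void main(String[] args) {
        System.out.println(parse("+- Exchange(distribution=[hash[symbol]], changelogMode=[I])"));
    }
}
```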
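3) PLAN_ADVICE
Specifying PLAN_ADVICE makes the optimizer analyze the optimized physical plan and report potential data risks or performance tuning suggestions. As a hint, the section title changes from "Optimized Physical Plan" to "Optimized Physical Plan With Advice". PLAN_ADVICE is supported since Flink 1.17. Advice on the physical plan is classified by kind and by scope.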
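Advice kind | Description |
---|---|
WARNING | Flags a potential data correctness risk |
ADVICE | Gives a possible SQL tuning suggestion |

Advice scope | Description |
---|---|
QUERY_LEVEL | Advice for the SQL statement as a whole |
NODE_LEVEL | Advice for a single physical node |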
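In the plans shown earlier, each piece of advice is printed as a line such as "advice[1]: [WARNING] ...". A small parser can separate the id, the kind, and the message, for example to fail a build when a WARNING appears. This is only a sketch: PlanAdviceLine is a hypothetical class, and the line format is inferred from the output in this article rather than from a documented contract.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical value class for one "advice[n]: [KIND] message" line of a plan. */
final class PlanAdviceLine {

    // Format inferred from the EXPLAIN PLAN_ADVICE output shown in this article.
    private static final Pattern LINE =
            Pattern.compile("^advice\\[(\\d+)\\]:\\s*\\[(WARNING|ADVICE)\\]\\s*(.*)$");

    final int id;
    final String kind;
    final String message;

    private PlanAdviceLine(int id, String kind, String message) {
        this.id = id;
        this.kind = kind;
        this.message = message;
    }

    /** Returns the parsed advice, or empty when the line is not an advice line. */
    static Optional<PlanAdviceLine> parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(
                new PlanAdviceLine(Integer.parseInt(m.group(1)), m.group(2), m.group(3)));
    }

    public static void main(String[] args) {
        String line = "advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.";
        parse(line).ifPresent(a -> System.out.println(a.id + " " + a.kind + " " + a.message));
    }
}
```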
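PLAN_ADVICE provides advice for the following issues:
- Data skew caused by group aggregation
- Non-deterministic updates
If the optimizer detects that a group aggregation could use the two-phase optimization but it is not enabled, it attaches the advice id to the GroupAggregate node as an index and appends the advice text at the end.
1. Example 1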
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '200';
SET 'table.optimizer.agg-phase-strategy' = 'ONE_PHASE';
CREATE TABLE alan_test_PLAN_ADVICE (
a BIGINT,
b INT NOT NULL,
c VARCHAR,
d BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'alan_test_PLAN_ADVICE_topic',
'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
EXPLAIN PLAN_ADVICE
SELECT
AVG(a) AS avg_a,
COUNT(*) AS cnt,
COUNT(b) AS cnt_b,
MIN(b) AS min_b,
MAX(c) FILTER (WHERE a > 1) AS max_c
FROM alan_test_PLAN_ADVICE;
Flink SQL> CREATE TABLE alan_test_PLAN_ADVICE (
> a BIGINT,
> b INT NOT NULL,
> c VARCHAR,
> d BIGINT
> ) WITH (
>
> 'connector' = 'kafka',
> 'topic' = 'alan_test_PLAN_ADVICE_topic',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> EXPLAIN PLAN_ADVICE
> SELECT
> AVG(a) AS avg_a,
> COUNT(*) AS cnt,
> COUNT(b) AS cnt_b,
> MIN(b) AS min_b,
> MAX(c) FILTER (WHERE a > 1) AS max_c
> FROM alan_test_PLAN_ADVICE;

| result |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| == Abstract Syntax Tree ==
LogicalProject(avg_a=[$0], cnt=[$1], cnt_b=[$1], min_b=[$2], max_c=[$3])
+- LogicalAggregate(group=[{}], avg_a=[AVG($0)], cnt=[COUNT()], min_b=[MIN($1)], max_c=[MAX($2) FILTER $3])
+- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[IS TRUE(>($0, 1))])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]])
== Optimized Physical Plan With Advice ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(advice=[1], select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
+- Exchange(distribution=[single])
+- Calc(select=[a, b, c, IS TRUE(>(a, 1)) AS $f3])
+- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]], fields=[a, b, c, d])
advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
== Optimized Execution Plan ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
+- Exchange(distribution=[single])
+- Calc(select=[a, b, c, (a > 1) IS TRUE AS $f3])
+- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]], fields=[a, b, c, d])
|

1 row in set
The advice reported for this query is:
advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
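To act on this advice, the aggregation strategy can be switched back to AUTO and the plan checked again. This is a sketch that assumes the same session and table as Example 1; the resulting plan is not reproduced here because it was not part of the original run.

```sql
-- Follow advice[1]: let the optimizer choose between one-phase and two-phase aggregation.
SET 'table.optimizer.agg-phase-strategy' = 'AUTO';

-- Re-run the same EXPLAIN to see whether the advice is still reported.
EXPLAIN PLAN_ADVICE
SELECT
  AVG(a) AS avg_a,
  COUNT(*) AS cnt,
  COUNT(b) AS cnt_b,
  MIN(b) AS min_b,
  MAX(c) FILTER (WHERE a > 1) AS max_c
FROM alan_test_PLAN_ADVICE;
```

With mini-batch still enabled, the optimizer is then free to pick a local-global plan, in which case the advice on the GroupAggregate node should no longer appear.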
2、Example 2
CREATE TABLE MyTable (
a INT,
b BIGINT,
c STRING,
d INT,
`day` AS DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),
PRIMARY KEY (a, c) NOT ENFORCED
) WITH (
'connector' = 'values',
'changelog-mode' = 'I,UA,UB,D'
);
CREATE TABLE MySink (
a INT,
b BIGINT,
c STRING,
PRIMARY KEY (a) NOT ENFORCED
) WITH (
'connector' = 'values',
'sink-insert-only' = 'false'
);
EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, `day`
FROM MyTable
WHERE b > 100;
== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, day], upsertMaterialize=[true])
+- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day], where=[>(b, 100)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])
advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
related rel plan:
Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], where=[>(b, 100)], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D])
When a risk of a non-deterministic update (NDU) problem is detected, the optimizer appends the advice text at the end of the plan.
advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
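One way to follow this warning is to stop writing the non-deterministic column to the sink. The sketch below reuses MyTable and MySink from this example and selects the stored column c instead of the computed column `day`; whether that is acceptable depends on what the sink is meant to hold, so treat it as an illustration only. Note also that the `values` connector belongs to Flink's test utilities, so these tables may not be creatable on a stock distribution.

```sql
-- Sketch: drop the CURRENT_TIMESTAMP-derived column from the INSERT,
-- so every selected column is deterministic.
EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, c
FROM MyTable
WHERE b > 100;
```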
3、Example 3
Flink SQL> EXPLAIN PLAN_ADVICE select * from alan_ticker;

| result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], price=[$1], tax=[$2], rowtime=[$3])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker]])
== Optimized Physical Plan With Advice ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
No available advice...
== Optimized Execution Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
|

1 row in set
If no problem is detected, the optimizer appends “No available advice” at the end of the plan as a hint.
No available advice...
4)、JSON_EXECUTION_PLAN
Generates the execution plan of the program in JSON format.
Flink SQL> EXPLAIN JSON_EXECUTION_PLAN select * from alan_ticker;

| result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], price=[$1], tax=[$2], rowtime=[$3])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker]])
== Optimized Physical Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
== Optimized Execution Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 877,
"type" : "Source: alan_ticker[654]",
"pact" : "Data Source",
"contents" : "[654]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
"parallelism" : 1
} ]
} |

1 row in set
三、USE Statements
USE statements set the current catalog or database, or change the resolution order and enabled status of modules.
1、Syntax
1)、catalog
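Sets the current catalog. All subsequent commands that do not explicitly specify a catalog use this one. If the specified catalog does not exist, an exception is thrown. The default current catalog is default_catalog.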
USE CATALOG catalog_name;
2)、database
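Sets the current database. All subsequent commands that do not explicitly specify a database use this one. If the specified database does not exist, an exception is thrown. The default current database is default_database.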
USE [catalog_name.]database_name;
3)、MODULES
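Sets the enabled modules in the declared order. All subsequent commands resolve metadata (functions, user-defined types, rules, and so on) within the enabled modules and follow that resolution order. A module is used by default when it is loaded. Loaded modules that are not listed in a USE MODULES statement become disabled. The module loaded and enabled by default is core.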
USE MODULES module_name1[, module_name2, ...];
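The interaction between loading and using modules can be sketched as follows. The hive module and its version string are assumptions for illustration (the version matches the Hive 3.1.2 install path that appears in this article's Java example), and the Hive connector jars must be on the classpath.

```sql
-- A newly loaded module is enabled immediately and resolved after the modules already in use.
LOAD MODULE hive WITH ('hive-version' = '3.1.2');
SHOW FULL MODULES;

-- Put hive first in the resolution order; core stays enabled as the second module.
USE MODULES hive, core;
SHOW MODULES;

-- Listing only hive disables core until it is named again in a later USE MODULES.
USE MODULES hive;
SHOW FULL MODULES;
```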
2、Java example
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * Switches to a Hive catalog and database, creates and queries a Hive table,
 * then enables the hive module.
 *
 * @author alanchan
 */
public class TestCreateHiveTable {
    public static final String tableName = "alan_hivecatalog_hivedb_testTable";
    public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" +
            "  id INT,\n" +
            "  name STRING,\n" +
            "  age INT" + ") " +
            "TBLPROPERTIES (\n" +
            "  'sink.partition-commit.delay'='5 s',\n" +
            "  'sink.partition-commit.trigger'='partition-time',\n" +
            "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
        String name = "alan_hive";
        // Name of the default database in the Hive catalog
        String defaultDatabase = "default";

        HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tenv.registerCatalog(name, hiveCatalog);
        // Equivalent to: USE CATALOG alan_hive
        tenv.useCatalog(name);

        // Equivalent to: USE alan_hivecatalog_hivedb (the database must already exist)
        String newDatabaseName = "alan_hivecatalog_hivedb";
        tenv.useDatabase(newDatabaseName);

        // Create the table with the Hive dialect
        tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tenv.executeSql(hive_create_table_sql);

        // Insert data; await() blocks until the insert job finishes so the query below can see the row
        String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
        tenv.executeSql(insertSQL).await();

        // Query data
        String selectSQL = "select * from alan_hivecatalog_hivedb_testTable";
        Table table = tenv.sqlQuery(selectSQL);
        table.printSchema();
        DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
        result.print();

        // Use the hive module. USE MODULES only accepts modules that are already loaded,
        // so load it first; the version is assumed from the Hive install path above.
        tenv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
        tenv.executeSql("USE MODULES hive");
        tenv.executeSql("SHOW FULL MODULES").print();

        env.execute();
    }
}
3、Flink SQL CLI example
Flink SQL> CREATE CATALOG cat1 WITH (...);
[INFO] Catalog has been created.
Flink SQL> SHOW CATALOGS;
default_catalog
cat1
Flink SQL> USE CATALOG cat1;
Flink SQL> SHOW DATABASES;
Flink SQL> CREATE DATABASE db1 WITH (...);
[INFO] Database has been created.
Flink SQL> SHOW DATABASES;
db1
Flink SQL> USE db1;
Flink SQL> USE MODULES hive;
[INFO] Use modules succeeded!
Flink SQL> SHOW FULL MODULES;
+-------------+-------+
| module name | used |
+-------------+-------+
| hive | true |
| core | false |
+-------------+-------+
2 rows in set
This article briefly introduced the syntax of DESCRIBE, EXPLAIN and USE, each illustrated with examples.
From: https://blog.51cto.com/alanchan2win/7662249