
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB Statements, UPDATE, DELETE (1)

Date: 2023-10-20 10:06:23


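Flink article series

1. Flink deployment, concepts, source/transformation/sink usage examples, and the four cornerstones with examples: index of links to the whole series
13. Flink Table API and SQL: basic concepts, the common API, and getting-started examples
14. Flink Table API and SQL: data types (built-in data types and their properties)
15. Flink Table API and SQL: streaming concepts, covering dynamic tables, time attribute configuration (how update results are handled), temporal tables, joins on streams, determinism on streams, and query configuration
16. Flink Table API and SQL: connecting to external systems (connectors and formats for reading and writing), with a FileSystem example (1)
16. Flink Table API and SQL: connecting to external systems (connectors and formats for reading and writing), with an Elasticsearch example (2)
16. Flink Table API and SQL: connecting to external systems (connectors and formats for reading and writing), with an Apache Kafka example (3)
16. Flink Table API and SQL: connecting to external systems (connectors and formats for reading and writing), with a JDBC example (4)
16. Flink Table API and SQL: connecting to external systems (connectors and formats for reading and writing), with an Apache Hive example (6)
20. Flink SQL: SQL Client (try Flink SQL without writing code and submit SQL jobs directly to the cluster)
22. Flink Table API and SQL: DDL for creating tables
24. Flink Table API and SQL: Catalogs
26. Flink SQL: overview and getting-started examples
27. Flink SQL: SELECT (select, where, distinct, order by, limit, set operations, and deduplication), introduction and detailed examples (1)
27. Flink SQL: SELECT (SQL Hints and Joins), introduction and detailed examples (2)
27. Flink SQL: SELECT (window functions), introduction and detailed examples (3)
27. Flink SQL: SELECT (window aggregation), introduction and detailed examples (4)
27. Flink SQL: SELECT (Group Aggregation, Over Aggregation, and Window Join), introduction and detailed examples (5)
27. Flink SQL: SELECT (Top-N, Window Top-N, and Window Deduplication), introduction and detailed examples (6)
27. Flink SQL: SELECT (Pattern Recognition), introduction and detailed examples (7)
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB Statements, UPDATE, DELETE (1)
30. Flink SQL: SQL Client (using configuration files for tables, views, and so on, illustrated with Kafka and filesystem examples)
41. Flink: the Hive dialect, introduction and detailed examples
42. Flink Table API and SQL: Hive Catalog
43. Flink: reading and writing Hive, with detailed verification examples
44. Flink: modules, introduction and usage examples, plus detailed examples of using Hive built-in functions and user-defined functions from Flink SQL (some claims found online appear to be wrong)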



Table of contents

  • Flink article series
  • I. DESCRIBE statement
  • 1. Syntax
  • 2. Java example
  • 3. Flink SQL CLI example
  • II. EXPLAIN statement
  • 1. Syntax
  • 2. Java example
  • 1) Maven dependencies
  • 2) Java code
  • 3) Output
  • 3. Flink SQL CLI example
  • 1) Create the table
  • 2) EXPLAIN PLAN FOR: query and result
  • 3) EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN: query and result
  • 4. ExplainDetails
  • 1) ESTIMATED_COST
  • 2) CHANGELOG_MODE
  • 3) PLAN_ADVICE
  • 1. Example 1
  • 2. Example 2
  • 3. Example 3
  • 4) JSON_EXECUTION_PLAN
  • III. USE statement
  • 1. Syntax
  • 1) catalog
  • 2) database
  • 3) MODULES
  • 1. Java example
  • 2. Flink SQL CLI example



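This article briefly introduces the syntax of the DESCRIBE (DESC), EXPLAIN, and USE statements and illustrates each one with examples.
The examples assume that working Flink and Kafka clusters are available.
The article has three parts: DESCRIBE, EXPLAIN, and USE, each with an introduction and usage examples.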

I. DESCRIBE statement

The DESCRIBE statement describes the schema of a table or a view.

1. Syntax

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
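
The optional catalog and database prefixes in the syntax above can be assembled programmatically. The following is a minimal sketch in plain Java with no Flink dependency; the class `DescribeStatements` and its methods are hypothetical helpers, not part of Flink. It assumes that identifiers are quoted with backticks and that a catalog name is only given together with a database name.

```java
import java.util.ArrayList;
import java.util.List;

public class DescribeStatements {

    // Quote one identifier with backticks, doubling any embedded backtick.
    public static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Build "DESCRIBE [catalog.][db.]table"; catalog and db may be null.
    public static String describe(String catalog, String db, String table) {
        if (table == null || table.isEmpty()) {
            throw new IllegalArgumentException("table name is required");
        }
        if (catalog != null && db == null) {
            // Assumption of this sketch: a catalog is never given without a database.
            throw new IllegalArgumentException("a catalog name requires a database name");
        }
        List<String> parts = new ArrayList<>();
        if (catalog != null) {
            parts.add(quote(catalog));
        }
        if (db != null) {
            parts.add(quote(db));
        }
        parts.add(quote(table));
        return "DESCRIBE " + String.join(".", parts);
    }

    public static void main(String[] args) {
        System.out.println(describe(null, null, "orders"));
        System.out.println(describe("default_catalog", "default_database", "orders"));
    }
}
```

The resulting string can be passed to executeSql() as in the Java example that follows.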

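2. Java example

A DESCRIBE statement can be executed with the executeSql() method of TableEnvironment. If the DESCRIBE operation succeeds, executeSql() returns the schema of the given table; otherwise it throws an exception.

The following example shows how to run a DESCRIBE statement in a TableEnvironment.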

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String sql = "CREATE TABLE orders (\r\n" + 
				"    order_id    STRING,\r\n" + 
				"    price       DECIMAL(32,2),\r\n" + 
				"    currency    STRING,\r\n" + 
				"    order_time  TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" + 
				"    PRIMARY KEY(order_id) NOT ENFORCED\r\n" + 
				") WITH (\r\n" + 
				"  'connector' = 'kafka',\r\n" + 
				"  'topic' = 'orders_topic',\r\n" + 
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" + 
				"  'properties.group.id' = 'testGroup',\r\n" + 
				"  'scan.startup.mode' = 'earliest-offset',\r\n" + 
				"  'value.format' = 'debezium-json'\r\n" + 
				");";
		tenv.executeSql(sql);

		// Print the schema using DESCRIBE
		tenv.executeSql("DESCRIBE orders").print();

		// Print the schema using the DESC shorthand
		tenv.executeSql("DESC orders").print();
	}

}
  • Output
----tenv.executeSql("DESCRIBE orders").print()
+------------+----------------+-------+---------------+---------------------------+-----------+
|       name |           type |  null |           key |                    extras | watermark |
+------------+----------------+-------+---------------+---------------------------+-----------+
|   order_id |         STRING | FALSE | PRI(order_id) |                           |           |
|      price | DECIMAL(32, 2) |  TRUE |               |                           |           |
|   currency |         STRING |  TRUE |               |                           |           |
| order_time |   TIMESTAMP(3) |  TRUE |               | METADATA FROM 'timestamp' |           |
+------------+----------------+-------+---------------+---------------------------+-----------+
4 rows in set

----tenv.executeSql("DESC orders").print()
+------------+----------------+-------+---------------+---------------------------+-----------+
|       name |           type |  null |           key |                    extras | watermark |
+------------+----------------+-------+---------------+---------------------------+-----------+
|   order_id |         STRING | FALSE | PRI(order_id) |                           |           |
|      price | DECIMAL(32, 2) |  TRUE |               |                           |           |
|   currency |         STRING |  TRUE |               |                           |           |
| order_time |   TIMESTAMP(3) |  TRUE |               | METADATA FROM 'timestamp' |           |
+------------+----------------+-------+---------------+---------------------------+-----------+
4 rows in set
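
When the schema is only available as printed text, for example copied from a log, the table above can be turned back into structured rows. The following is a minimal sketch in plain Java with no Flink dependency; `DescribeOutputParser` is a hypothetical helper, and it assumes that no cell value contains a `|` character.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DescribeOutputParser {

    // Split one "| a | b |" line into trimmed cell values.
    static List<String> cells(String line) {
        int first = line.indexOf('|');
        int last = line.lastIndexOf('|');
        List<String> out = new ArrayList<>();
        if (first == last) {
            return out; // not a table row
        }
        for (String cell : line.substring(first + 1, last).split("\\|", -1)) {
            out.add(cell.trim());
        }
        return out;
    }

    // Parse the printed table into one map per row, keyed by the header names.
    public static List<Map<String, String>> parse(String printed) {
        List<String> header = null;
        List<Map<String, String>> rows = new ArrayList<>();
        for (String line : printed.split("\\R")) {
            String trimmed = line.trim();
            if (!trimmed.startsWith("|")) {
                continue; // skip "+----+" borders and the "N rows in set" footer
            }
            List<String> values = cells(trimmed);
            if (header == null) {
                header = values; // the first "|" line holds the column names
                continue;
            }
            Map<String, String> row = new LinkedHashMap<>();
            for (int i = 0; i < header.size() && i < values.size(); i++) {
                row.put(header.get(i), values.get(i));
            }
            rows.add(row);
        }
        return rows;
    }
}
```

Each returned map uses the column names from the header row (name, type, null, key, extras, watermark) as keys, so `parse(text).get(0).get("key")` reads the key column of the first field.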

3. Flink SQL CLI example

Flink SQL> CREATE TABLE alan_ticker2 (
>    symbol STRING,
>    price DOUBLE,
>    tax  DOUBLE,
>    rowtime  TIMESTAMP(3),
>    WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_ticker2_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> DESCRIBE alan_ticker2;
+---------+------------------------+------+-----+--------+---------------------------------+
|    name |                   type | null | key | extras |                       watermark |
+---------+------------------------+------+-----+--------+---------------------------------+
|  symbol |                 STRING | TRUE |     |        |                                 |
|   price |                 DOUBLE | TRUE |     |        |                                 |
|     tax |                 DOUBLE | TRUE |     |        |                                 |
| rowtime | TIMESTAMP(3) *ROWTIME* | TRUE |     |        | `rowtime` - INTERVAL '1' SECOND |
+---------+------------------------+------+-----+--------+---------------------------------+
4 rows in set

Flink SQL> DESC alan_ticker2;
+---------+------------------------+------+-----+--------+---------------------------------+
|    name |                   type | null | key | extras |                       watermark |
+---------+------------------------+------+-----+--------+---------------------------------+
|  symbol |                 STRING | TRUE |     |        |                                 |
|   price |                 DOUBLE | TRUE |     |        |                                 |
|     tax |                 DOUBLE | TRUE |     |        |                                 |
| rowtime | TIMESTAMP(3) *ROWTIME* | TRUE |     |        | `rowtime` - INTERVAL '1' SECOND |
+---------+------------------------+------+-----+--------+---------------------------------+
4 rows in set

II. EXPLAIN statement

The EXPLAIN statement shows the logical plan and the optimized plan of a query or an INSERT statement.

1. Syntax

EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>

statement_set:
EXECUTE STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;
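
The grammar above can be assembled as plain strings before they are handed to executeSql(). The following is a minimal sketch in plain Java with no Flink dependency; `ExplainStatements` and its local `Detail` enum are hypothetical stand-ins that only mirror the four detail keywords used in this article. The details are written without parentheses, as in the examples later in this article.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ExplainStatements {

    // Local stand-in for the detail keywords; not the Flink class.
    public enum Detail { ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN }

    // Drop trailing semicolons so the statement can be embedded in a longer one.
    static String stripSemicolons(String sql) {
        String s = sql.trim();
        while (s.endsWith(";")) {
            s = s.substring(0, s.length() - 1).trim();
        }
        return s;
    }

    // No details -> "EXPLAIN PLAN FOR ..."; otherwise "EXPLAIN D1, D2 ...".
    // Pass an EnumSet to get the details in declaration order.
    public static String explain(Set<Detail> details, String statement) {
        String prefix = details.isEmpty()
                ? "EXPLAIN PLAN FOR "
                : "EXPLAIN " + details.stream().map(Detail::name)
                        .collect(Collectors.joining(", ")) + " ";
        return prefix + stripSemicolons(statement);
    }

    // Wrap several INSERT statements in EXECUTE STATEMENT SET BEGIN ... END.
    public static String statementSet(List<String> inserts) {
        StringBuilder sb = new StringBuilder("EXECUTE STATEMENT SET\nBEGIN\n");
        for (String insert : inserts) {
            sb.append(stripSemicolons(insert)).append(";\n");
        }
        return sb.append("END").toString();
    }

    public static void main(String[] args) {
        System.out.println(explain(EnumSet.noneOf(Detail.class), "SELECT * FROM alan_ticker2;"));
        System.out.println(explain(EnumSet.of(Detail.ESTIMATED_COST, Detail.CHANGELOG_MODE),
                "SELECT * FROM alan_ticker2"));
        // sink_a and sink_b are placeholder table names for illustration only.
        System.out.println(explain(EnumSet.noneOf(Detail.class), statementSet(Arrays.asList(
                "INSERT INTO sink_a SELECT * FROM alan_ticker2",
                "INSERT INTO sink_b SELECT * FROM alan_ticker2"))));
    }
}
```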

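2. Java example

An EXPLAIN statement can be executed with the executeSql() method of TableEnvironment. If the EXPLAIN operation succeeds, executeSql() returns the explain result; otherwise it throws an exception.

The following example shows how to run an EXPLAIN statement in a TableEnvironment.

1) Maven dependencies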

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency> 
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-gateway -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-gateway</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
 		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency> 
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
		<dependency>
		    <groupId>org.apache.flink</groupId>
		    <artifactId>flink-table-api-java-uber</artifactId>
		    <version>${flink.version}</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
		</dependency>

<!-- Flink connectors -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

	</dependencies>

2) Java code

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String sql = "CREATE TABLE alan_ticker2 (\r\n" + 
				"   symbol STRING,\r\n" + 
				"   price DOUBLE,\r\n" + 
				"   tax  DOUBLE,\r\n" + 
				"   rowtime  TIMESTAMP(3),\r\n" + 
				"   WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\r\n" + 
				") WITH (\r\n" + 
				"  'connector' = 'kafka',\r\n" + 
				"  'topic' = 'alan_ticker2_topic',\r\n" + 
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" + 
				"  'scan.startup.mode' = 'earliest-offset',\r\n" + 
				"  'format' = 'csv'\r\n" + 
				");";
		tenv.executeSql(sql);

		// Call TableEnvironment.explainSql() to explain the SELECT statement
		String querySQL = "SELECT *\r\n" + 
				"FROM alan_ticker2\r\n" + 
				"    MATCH_RECOGNIZE(\r\n" + 
				"        PARTITION BY symbol\r\n" + 
				"        ORDER BY rowtime\r\n" + 
				"        MEASURES\r\n" + 
				"            C.price AS lastPrice,\r\n" + 
				"            C.rowtime AS rowtime\r\n" + 
				"        ONE ROW PER MATCH\r\n" + 
				"        AFTER MATCH SKIP PAST LAST ROW\r\n" + 
				"        PATTERN (A B*? C)\r\n" + 
				"        DEFINE\r\n" + 
				"            A AS A.price > 12,\r\n" + 
				"            B AS B.price < 25,\r\n" + 
				"            C AS C.price > 18\r\n" + 
				"    );";
		
		String explanation = tenv.explainSql(querySQL);
		System.out.println("-------------------------explanation--------------------------------");
		System.out.println(explanation);
		System.out.println("-------------------------explanation--------------------------------");
		
		// Call TableEnvironment.executeSql() with an EXPLAIN statement to explain the same SELECT
		TableResult tableResult = tenv.executeSql("EXPLAIN PLAN FOR " + querySQL);
		System.out.println("-------------------------EXPLAIN PLAN FOR--------------------------------");
		tableResult.print();
		System.out.println("-------------------------EXPLAIN PLAN FOR--------------------------------");
		
		TableResult tableResult2 = tenv.executeSql("EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN " + querySQL);
		System.out.println("-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------");
		tableResult2.print();
		System.out.println("-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------");
		
	}

}

3) Output

-------------------------explanation--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

-------------------------explanation--------------------------------
-------------------------EXPLAIN PLAN FOR--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

-------------------------EXPLAIN PLAN FOR--------------------------------
-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 11,
    "type" : "Source: alan_ticker2[7]",
    "pact" : "Data Source",
    "contents" : "[7]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
    "parallelism" : 16
  }, {
    "id" : 13,
    "type" : "StreamRecordTimestampInserter[9]",
    "pact" : "Operator",
    "contents" : "[9]:StreamRecordTimestampInserter(rowtime field: 3)",
    "parallelism" : 16,
    "predecessors" : [ {
      "id" : 11,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 14,
    "type" : "Match[9]",
    "pact" : "Operator",
    "contents" : "[9]:Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])",
    "parallelism" : 16,
    "predecessors" : [ {
      "id" : 13,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------
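
The explain text is divided into sections by header lines of the form `== ... ==`. To work with a single section, for example only the optimized physical plan, the string returned by explainSql() can be split on those headers. The following is a minimal sketch in plain Java; `ExplainSections` is a hypothetical helper and assumes each header sits on a line of its own, as in the output above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExplainSections {

    // A section header looks like "== Optimized Physical Plan ==".
    private static final Pattern HEADER = Pattern.compile("== (.+) ==");

    // Returns section title -> section body, in the order the sections appear.
    public static Map<String, String> split(String explanation) {
        Map<String, String> sections = new LinkedHashMap<>();
        String current = null;
        StringBuilder body = new StringBuilder();
        for (String line : explanation.split("\\R")) {
            Matcher m = HEADER.matcher(line.trim());
            if (m.matches()) {
                if (current != null) {
                    sections.put(current, body.toString().trim());
                }
                current = m.group(1);
                body.setLength(0);
            } else if (current != null) {
                body.append(line).append('\n');
            }
        }
        if (current != null) {
            sections.put(current, body.toString().trim());
        }
        return sections;
    }
}
```

With the result above, `split(explanation).keySet()` would list Abstract Syntax Tree, Optimized Physical Plan, and Optimized Execution Plan for a plain EXPLAIN PLAN FOR, and additional titles when explain details are requested.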

3. Flink SQL CLI example

1) Create the table

Flink SQL> CREATE TABLE alan_ticker2 (
>    symbol STRING,
>    price DOUBLE,
>    tax  DOUBLE,
>    rowtime  TIMESTAMP(3),
>    WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_ticker2_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

2) EXPLAIN PLAN FOR: query and result

Flink SQL> EXPLAIN PLAN FOR 
> SELECT *
> FROM alan_ticker2
>     MATCH_RECOGNIZE(
>         PARTITION BY symbol
>         ORDER BY rowtime
>         MEASURES
>             C.price AS lastPrice,
>             C.rowtime AS rowtime
>         ONE ROW PER MATCH
>         AFTER MATCH SKIP PAST LAST ROW
>         PATTERN (A B*? C)
>         DEFINE
>             A AS A.price > 12,
>             B AS B.price < 25,
>             C AS C.price > 18
>     );

|result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
 |

1 row in set

3) EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN: query and result

Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN 
> SELECT *
> FROM alan_ticker2
>     MATCH_RECOGNIZE(
>         PARTITION BY symbol
>         ORDER BY rowtime
>         MEASURES
>             C.price AS lastPrice,
>             C.rowtime AS rowtime
>         ONE ROW PER MATCH
>         AFTER MATCH SKIP PAST LAST ROW
>         PATTERN (A B*? C)
>         DEFINE
>             A AS A.price > 12,
>             B AS B.price < 25,
>             C AS C.price > 18
>     );

| result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 859,
    "type" : "Source: alan_ticker2[641]",
    "pact" : "Data Source",
    "contents" : "[641]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
    "parallelism" : 1
  }, {
    "id" : 861,
    "type" : "StreamRecordTimestampInserter[643]",
    "pact" : "Operator",
    "contents" : "[643]:StreamRecordTimestampInserter(rowtime field: 3)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 859,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 862,
    "type" : "Match[643]",
    "pact" : "Operator",
    "contents" : "[643]:Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 861,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
} |

1 row in set

4、ExplainDetails

Print the plan of a statement using the specified ExplainDetail types.

1)、ESTIMATED_COST

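Specifying ESTIMATED_COST makes the optimizer attach its estimated cost to each physical node in the output. The excerpt below is taken from the combined EXPLAIN output shown earlier, so it also carries the changelogMode and advice information.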

== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.
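
The same option can be tried on a much simpler query. This is a minimal sketch: the table `demo_orders` and its columns are hypothetical (not from the article), and the built-in `datagen` connector is used only so that no external system is needed.

```sql
-- Hypothetical demo table; datagen generates rows without any external system
CREATE TABLE demo_orders (
  order_id BIGINT,
  amount   DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Each node of the optimized physical plan is printed with
-- "rowcount = ..., cumulative cost = {...}" appended
EXPLAIN ESTIMATED_COST
SELECT order_id, amount
FROM demo_orders
WHERE amount > 10;
```

The numbers are optimizer estimates, not measurements; without table statistics they are mostly useful for comparing nodes within one plan.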

2)、CHANGELOG_MODE

Specifying CHANGELOG_MODE makes the optimizer attach the changelog mode to each physical node in the output.

== Optimized Physical Plan With Advice ==
...
+- Exchange(... changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
...
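
A query that produces updates makes this option easier to read. The sketch below assumes a hypothetical insert-only table `demo_clicks` backed by the built-in `datagen` connector; neither name appears in the article.

```sql
-- Hypothetical insert-only source table
CREATE TABLE demo_clicks (
  user_id BIGINT,
  url     STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- The running count per user is revised as new rows arrive
EXPLAIN CHANGELOG_MODE
SELECT user_id, COUNT(*) AS cnt
FROM demo_clicks
GROUP BY user_id;
```

The source scan should be reported as changelogMode=[I], while the GroupAggregate node is expected to list update kinds in addition to I, because it has to correct counts it has already emitted.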

3)、PLAN_ADVICE

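Specifying PLAN_ADVICE makes the optimizer analyze the optimized physical plan and report potential data risk warnings or performance tuning advice. The output title then changes from "Optimized Physical Plan" to "Optimized Physical Plan With Advice" as a hint.
PLAN_ADVICE is supported since Flink 1.17.
Advice on the physical plan is classified by type and by scope.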

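+--------------+----------------------------------------------+
| Advice type  | Description                                  |
+--------------+----------------------------------------------+
| WARNING      | Points out a potential data correctness risk |
| ADVICE       | Suggests a possible SQL tuning option        |
+--------------+----------------------------------------------+

+--------------+------------------------------------+
| Advice scope | Description                        |
+--------------+------------------------------------+
| QUERY_LEVEL  | Advice for the whole SQL statement |
| NODE_LEVEL   | Advice for a single physical node  |
+--------------+------------------------------------+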

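PLAN_ADVICE provides advice on the following issues:

  • Data skew caused by group aggregation
  • Non-deterministic updates (NDU)

When the optimizer detects that a group aggregation could use the two-phase (local-global) optimization but it is not enabled, it attaches the advice id to the GroupAggregate node as an index and appends the advice content at the end of the plan.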

1、Example 1
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '200';
SET 'table.optimizer.agg-phase-strategy' = 'ONE_PHASE';

CREATE TABLE alan_test_PLAN_ADVICE (
  a BIGINT,
  b INT NOT NULL,
  c VARCHAR,
  d BIGINT
) WITH (

  'connector' = 'kafka',
  'topic' = 'alan_test_PLAN_ADVICE_topic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

EXPLAIN PLAN_ADVICE
SELECT
  AVG(a) AS avg_a,
  COUNT(*) AS cnt,
  COUNT(b) AS cnt_b,
  MIN(b) AS min_b,
  MAX(c) FILTER (WHERE a > 1) AS max_c
FROM alan_test_PLAN_ADVICE;

Flink SQL> CREATE TABLE alan_test_PLAN_ADVICE (
>   a BIGINT,
>   b INT NOT NULL,
>   c VARCHAR,
>   d BIGINT
> ) WITH (
> 
>   'connector' = 'kafka',
>   'topic' = 'alan_test_PLAN_ADVICE_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> EXPLAIN PLAN_ADVICE
> SELECT
>   AVG(a) AS avg_a,
>   COUNT(*) AS cnt,
>   COUNT(b) AS cnt_b,
>   MIN(b) AS min_b,
>   MAX(c) FILTER (WHERE a > 1) AS max_c
> FROM alan_test_PLAN_ADVICE;

|result |

| == Abstract Syntax Tree ==
LogicalProject(avg_a=[$0], cnt=[$1], cnt_b=[$1], min_b=[$2], max_c=[$3])
+- LogicalAggregate(group=[{}], avg_a=[AVG($0)], cnt=[COUNT()], min_b=[MIN($1)], max_c=[MAX($2) FILTER $3])
   +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[IS TRUE(>($0, 1))])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]])

== Optimized Physical Plan With Advice ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(advice=[1], select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
   +- Exchange(distribution=[single])
      +- Calc(select=[a, b, c, IS TRUE(>(a, 1)) AS $f3])
         +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]], fields=[a, b, c, d])

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').

== Optimized Execution Plan ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
   +- Exchange(distribution=[single])
      +- Calc(select=[a, b, c, (a > 1) IS TRUE AS $f3])
         +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]], fields=[a, b, c, d])
 |

1 row in set

The advice reported by the system is:

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
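
Acting on this advice could look like the sketch below. It reuses the table `alan_test_PLAN_ADVICE` and the mini-batch settings from Example 1; the exact plan printed afterwards depends on the Flink version and is not reproduced here.

```sql
-- Follow advice[1]: let the optimizer choose the aggregation strategy again
SET 'table.optimizer.agg-phase-strategy' = 'AUTO';

-- Re-run the same statement and compare the plan with the one above
EXPLAIN PLAN_ADVICE
SELECT
  AVG(a) AS avg_a,
  COUNT(*) AS cnt,
  COUNT(b) AS cnt_b,
  MIN(b) AS min_b,
  MAX(c) FILTER (WHERE a > 1) AS max_c
FROM alan_test_PLAN_ADVICE;
```

AUTO is the default value of this option, so the advice only appears here because Example 1 explicitly forced ONE_PHASE.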

2、Example 2
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c STRING,
  d INT,
  `day` AS DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),
  PRIMARY KEY (a, c) NOT ENFORCED
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I,UA,UB,D'  
);

CREATE TABLE MySink (
 a INT,
 b BIGINT,
 c STRING,
 PRIMARY KEY (a) NOT ENFORCED
) WITH (
 'connector' = 'values',
 'sink-insert-only' = 'false'
);

EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, `day`
FROM MyTable
WHERE b > 100;

== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, day], upsertMaterialize=[true])
+- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day], where=[>(b, 100)])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])

advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.

related rel plan:
Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], where=[>(b, 100)], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D])

When the optimizer detects a risk of NDU (non-deterministic update) problems, it appends the advice content at the end of the plan.

advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
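
One way to follow the first suggestion in this warning (removing the non-deterministic column) is sketched below. It reuses MyTable and MySink from Example 2 and writes the stored column c instead of the computed column `day`; whether any other advice is still reported is an open question here and may depend on the Flink version.

```sql
-- Variant of Example 2 without the non-deterministic computed column `day`:
-- the stored column c is written to the sink instead
EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, c
FROM MyTable
WHERE b > 100;
```

Since no selected column is derived from CURRENT_TIMESTAMP any more, the warning about `day` should no longer apply to this statement.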

3、Example 3
Flink SQL> EXPLAIN PLAN_ADVICE select * from alan_ticker;

|result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], price=[$1], tax=[$2], rowtime=[$3])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker]])

== Optimized Physical Plan With Advice ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

No available advice...

== Optimized Execution Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
 |

1 row in set

If no issue is detected, the optimizer appends "No available advice" at the end of the plan as a hint.

No available advice...

4)、JSON_EXECUTION_PLAN

Generates the execution plan of the program in JSON format.

Flink SQL> EXPLAIN JSON_EXECUTION_PLAN select * from alan_ticker;

|result |

| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], price=[$1], tax=[$2], rowtime=[$3])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker]])

== Optimized Physical Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 877,
    "type" : "Source: alan_ticker[654]",
    "pact" : "Data Source",
    "contents" : "[654]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
    "parallelism" : 1
  } ]
} |

1 row in set

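三、USE Statements

USE statements set the current catalog or database.

1、Syntax

1)、catalog

Sets the current catalog. All subsequent commands that do not explicitly specify a catalog use this one. If the specified catalog does not exist, an exception is thrown. The default current catalog is default_catalog.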

USE CATALOG catalog_name;
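
A minimal sketch of switching catalogs in the SQL client follows. The catalog name `demo_cat` is hypothetical; `generic_in_memory` is Flink's built-in in-memory catalog type, chosen here so that no external metastore is required.

```sql
-- Hypothetical catalog backed by the built-in in-memory implementation
CREATE CATALOG demo_cat WITH ('type' = 'generic_in_memory');

-- Unqualified table names are now resolved in demo_cat
USE CATALOG demo_cat;
SHOW CURRENT CATALOG;

-- Switch back to the default catalog
USE CATALOG default_catalog;
```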

2)、database

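Sets the current database. All subsequent commands that do not explicitly specify a database use this one. If the specified database does not exist, an exception is thrown. The default current database is default_database.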

USE [catalog_name.]database_name;
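
Both forms of the statement can be sketched as follows. The database name `demo_db` is hypothetical and is created in whichever catalog is current.

```sql
-- Hypothetical database in the current catalog
CREATE DATABASE IF NOT EXISTS demo_db;

-- Short form: database in the current catalog
USE demo_db;
SHOW CURRENT DATABASE;

-- Qualified form: names the catalog explicitly
USE default_catalog.default_database;
```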

3)、MODULES

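Sets the enabled modules in the declared order. All subsequent commands resolve metadata (functions, user-defined types, rules, etc.) within the enabled modules and follow that resolution order. A module is used by default when it is loaded. Loaded modules that are not listed in a USE MODULES statement become disabled. The module that is loaded and enabled by default is core.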

USE MODULES module_name1[, module_name2, ...];
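
The effect of the declared order can be sketched like this. It assumes the Hive connector jar is on the classpath so that the hive module can be loaded; without it, LOAD MODULE hive fails.

```sql
-- Assumption: the Hive connector is available, otherwise this statement fails
LOAD MODULE hive;

-- hive is listed first, so its functions are resolved before those of core
USE MODULES hive, core;
SHOW FULL MODULES;

-- Only core stays enabled; hive remains loaded but is no longer used
USE MODULES core;
SHOW FULL MODULES;
```

SHOW FULL MODULES lists every loaded module together with its used flag, which makes it easy to check what a USE MODULES statement changed.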

2、Java example

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 *
 */
public class TestCreateHiveTable {
	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName + " (\n" +
			"  id INT,\n" +
			"  name STRING,\n" +
			"  age INT" + ") " +
			"TBLPROPERTIES (\n" +
			"  'sink.partition-commit.delay'='5 s',\n" +
			"  'sink.partition-commit.trigger'='partition-time',\n" +
			"  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
		String name = "alan_hive";
		// Name of the default database
		String defaultDatabase = "default";

		// Register the Hive catalog and make it the current catalog (USE CATALOG)
		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog(name, hiveCatalog);
		tenv.useCatalog(name);

		// Make the database the current one (USE database); it must already exist,
		// otherwise an exception is thrown
		String newDatabaseName = "alan_hivecatalog_hivedb";
		tenv.useDatabase(newDatabaseName);

		// Create the table with the Hive dialect
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);

		// Insert data; executeSql submits the job asynchronously, so wait for it to finish
		String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
		tenv.executeSql(insertSQL).await();

		// Query the data
		String selectSQL = "select * from " + tableName;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();

		// USE MODULES only accepts modules that are already loaded; a fresh
		// TableEnvironment loads only core, so load the hive module first
		tenv.executeSql("LOAD MODULE hive");
		tenv.executeSql("USE MODULES hive");
		tenv.executeSql("SHOW FULL MODULES").print();

		env.execute();
	}

}

3、Flink SQL CLI example

Flink SQL> CREATE CATALOG cat1 WITH (...);
[INFO] Catalog has been created.

Flink SQL> SHOW CATALOGS;
default_catalog
cat1

Flink SQL> USE CATALOG cat1;

Flink SQL> SHOW DATABASES;

Flink SQL> CREATE DATABASE db1 WITH (...);
[INFO] Database has been created.

Flink SQL> SHOW DATABASES;
db1

Flink SQL> USE db1;

Flink SQL> USE MODULES hive;
[INFO] Use modules succeeded!
Flink SQL> SHOW FULL MODULES;
+-------------+-------+
| module name |  used |
+-------------+-------+
|        hive |  true |
|        core | false |
+-------------+-------+
2 rows in set

The above briefly covered the syntax and usage of DESCRIBE, EXPLAIN and USE, with examples for each.


From: https://blog.51cto.com/alanchan2win/7946866
