
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB Statements, UPDATE, DELETE (1)


Flink series articles

1. Flink deployment, core concepts, source/transformation/sink usage examples, and the four cornerstones: comprehensive series article links
13. Flink Table API & SQL: basic concepts, common API, and getting-started examples
14. Flink Table API & SQL data types: built-in data types and their properties
15. Flink Table API & SQL streaming concepts: dynamic tables, time-attribute configuration (how updating results are handled), temporal tables, joins on streams, determinism on streams, and query configuration
16. Flink Table API & SQL, connecting to external systems: connectors and formats, with a FileSystem example (1)
16. Flink Table API & SQL, connecting to external systems: connectors and formats, with an Elasticsearch example (2)
16. Flink Table API & SQL, connecting to external systems: connectors and formats, with an Apache Kafka example (3)
16. Flink Table API & SQL, connecting to external systems: connectors and formats, with a JDBC example (4)
16. Flink Table API & SQL, connecting to external systems: connectors and formats, with an Apache Hive example (6)
20. Flink SQL Client: try Flink SQL without writing any code and submit SQL jobs directly to a cluster
22. Flink Table API & SQL: DDL for creating tables
24. Flink Table API & SQL: Catalogs
26. Flink SQL: overview and getting-started examples
27. Flink SQL SELECT (select, where, distinct, order by, limit, set operations, and deduplication): introduction and detailed examples (1)
27. Flink SQL SELECT (SQL Hints and Joins): introduction and detailed examples (2)
27. Flink SQL SELECT (window functions): introduction and detailed examples (3)
27. Flink SQL SELECT (window aggregation): introduction and detailed examples (4)
27. Flink SQL SELECT (Group Aggregation, Over Aggregation, and Window Join): introduction and detailed examples (5)
27. Flink SQL SELECT (Top-N, Window Top-N, and Window Deduplication): introduction and detailed examples (6)
27. Flink SQL SELECT (Pattern Recognition): introduction and detailed examples (7)
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB Statements, UPDATE, DELETE (1)
30. Flink SQL Client (configuration-file usage via Kafka and filesystem examples: tables, views, etc.)
41. Flink Hive dialect: introduction and detailed examples
42. Flink Table API & SQL: Hive Catalog
43. Flink Hive read/write with detailed verification examples
44. Flink modules: introduction and usage examples, plus detailed examples of using Hive built-in functions and user-defined functions from Flink SQL (some claims found online appear to be wrong)




This article briefly introduces the syntax and usage of DESCRIBE, EXPLAIN, and USE, illustrated with examples. It assumes that working Flink and Kafka clusters are available. The article is in three parts: DESCRIBE, EXPLAIN, and USE.

I. DESCRIBE Statement

The DESCRIBE statement describes the schema of a table or a view.

1. Syntax

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

2. Java example

A DESCRIBE statement can be executed with the executeSql() method of TableEnvironment. If the DESCRIBE operation succeeds, executeSql() returns the schema of the given table; otherwise, an exception is thrown.

The following example shows how to run a DESCRIBE statement in a TableEnvironment.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String sql = "CREATE TABLE orders (\r\n" + 
				"    order_id    STRING,\r\n" + 
				"    price       DECIMAL(32,2),\r\n" + 
				"    currency    STRING,\r\n" + 
				"    order_time  TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" + 
				"    PRIMARY KEY(order_id) NOT ENFORCED\r\n" + 
				") WITH (\r\n" + 
				"  'connector' = 'kafka',\r\n" + 
				"  'topic' = 'orders_topic',\r\n" + 
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" + 
				"  'properties.group.id' = 'testGroup',\r\n" + 
				"  'scan.startup.mode' = 'earliest-offset',\r\n" + 
				"  'value.format' = 'debezium-json'\r\n" + 
				");";
		tenv.executeSql(sql);

		// print the schema
		tenv.executeSql("DESCRIBE orders").print();

		// print the schema (DESC is a synonym for DESCRIBE)
		tenv.executeSql("DESC orders").print();
	}

}
  • Run result
----tenv.executeSql("DESCRIBE orders").print()
+------------+----------------+-------+---------------+---------------------------+-----------+
|       name |           type |  null |           key |                    extras | watermark |
+------------+----------------+-------+---------------+---------------------------+-----------+
|   order_id |         STRING | FALSE | PRI(order_id) |                           |           |
|      price | DECIMAL(32, 2) |  TRUE |               |                           |           |
|   currency |         STRING |  TRUE |               |                           |           |
| order_time |   TIMESTAMP(3) |  TRUE |               | METADATA FROM 'timestamp' |           |
+------------+----------------+-------+---------------+---------------------------+-----------+
4 rows in set

----tenv.executeSql("DESC orders").print()
+------------+----------------+-------+---------------+---------------------------+-----------+
|       name |           type |  null |           key |                    extras | watermark |
+------------+----------------+-------+---------------+---------------------------+-----------+
|   order_id |         STRING | FALSE | PRI(order_id) |                           |           |
|      price | DECIMAL(32, 2) |  TRUE |               |                           |           |
|   currency |         STRING |  TRUE |               |                           |           |
| order_time |   TIMESTAMP(3) |  TRUE |               | METADATA FROM 'timestamp' |           |
+------------+----------------+-------+---------------+---------------------------+-----------+
4 rows in set

3. Flink SQL CLI example

Flink SQL> CREATE TABLE alan_ticker2 (
>    symbol STRING,
>    price DOUBLE,
>    tax  DOUBLE,
>    rowtime  TIMESTAMP(3),
>    WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_ticker2_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> DESCRIBE alan_ticker2;
+---------+------------------------+------+-----+--------+---------------------------------+
|    name |                   type | null | key | extras |                       watermark |
+---------+------------------------+------+-----+--------+---------------------------------+
|  symbol |                 STRING | TRUE |     |        |                                 |
|   price |                 DOUBLE | TRUE |     |        |                                 |
|     tax |                 DOUBLE | TRUE |     |        |                                 |
| rowtime | TIMESTAMP(3) *ROWTIME* | TRUE |     |        | `rowtime` - INTERVAL '1' SECOND |
+---------+------------------------+------+-----+--------+---------------------------------+
4 rows in set

Flink SQL> DESC alan_ticker2;
+---------+------------------------+------+-----+--------+---------------------------------+
|    name |                   type | null | key | extras |                       watermark |
+---------+------------------------+------+-----+--------+---------------------------------+
|  symbol |                 STRING | TRUE |     |        |                                 |
|   price |                 DOUBLE | TRUE |     |        |                                 |
|     tax |                 DOUBLE | TRUE |     |        |                                 |
| rowtime | TIMESTAMP(3) *ROWTIME* | TRUE |     |        | `rowtime` - INTERVAL '1' SECOND |
+---------+------------------------+------+-----+--------+---------------------------------+
4 rows in set
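
DESCRIBE works on views as well as tables. A minimal sketch, not part of the original run (the view name is made up; output omitted):

Flink SQL> CREATE VIEW alan_ticker_view AS SELECT symbol, price FROM alan_ticker2;
Flink SQL> DESCRIBE alan_ticker_view;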

II. EXPLAIN Statement

The EXPLAIN statement explains the logical and optimized query plans of a query or an INSERT statement.

1. Syntax

EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>

statement_set:
EXECUTE STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;
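
Instantiating the grammar above for a statement set, as a sketch only (the two sink tables my_sink1 and my_sink2 are hypothetical; alan_ticker2 is the source table created later in this section):

EXPLAIN EXECUTE STATEMENT SET
BEGIN
INSERT INTO my_sink1 SELECT * FROM alan_ticker2;
INSERT INTO my_sink2 SELECT * FROM alan_ticker2;
END;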

2. Java example

An EXPLAIN statement can be executed with the executeSql() method of TableEnvironment. If the EXPLAIN operation succeeds, executeSql() returns the explain result; otherwise, an exception is thrown.

The following example shows how to run an EXPLAIN statement in a TableEnvironment.

1) Maven dependencies

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency> 
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-gateway -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-gateway</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
 		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency> 
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
		<dependency>
		    <groupId>org.apache.flink</groupId>
		    <artifactId>flink-table-api-java-uber</artifactId>
		    <version>${flink.version}</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- Flink connectors -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

	</dependencies>

2) Java code

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String sql = "CREATE TABLE alan_ticker2 (\r\n" + 
				"   symbol STRING,\r\n" + 
				"   price DOUBLE,\r\n" + 
				"   tax  DOUBLE,\r\n" + 
				"   rowtime  TIMESTAMP(3),\r\n" + 
				"   WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\r\n" + 
				") WITH (\r\n" + 
				"  'connector' = 'kafka',\r\n" + 
				"  'topic' = 'alan_ticker2_topic',\r\n" + 
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" + 
				"  'scan.startup.mode' = 'earliest-offset',\r\n" + 
				"  'format' = 'csv'\r\n" + 
				");";
		tenv.executeSql(sql);

		// use TableEnvironment.explainSql() to explain the SELECT statement
		String querySQL = "SELECT *\r\n" + 
				"FROM alan_ticker2\r\n" + 
				"    MATCH_RECOGNIZE(\r\n" + 
				"        PARTITION BY symbol\r\n" + 
				"        ORDER BY rowtime\r\n" + 
				"        MEASURES\r\n" + 
				"            C.price AS lastPrice,\r\n" + 
				"            C.rowtime AS rowtime\r\n" + 
				"        ONE ROW PER MATCH\r\n" + 
				"        AFTER MATCH SKIP PAST LAST ROW\r\n" + 
				"        PATTERN (A B*? C)\r\n" + 
				"        DEFINE\r\n" + 
				"            A AS A.price > 12,\r\n" + 
				"            B AS B.price < 25,\r\n" + 
				"            C AS C.price > 18\r\n" + 
				"    );";
		
		String explanation = tenv.explainSql(querySQL);
		System.out.println("-------------------------explanation--------------------------------");
		System.out.println(explanation);
		System.out.println("-------------------------explanation--------------------------------");
		
		// use TableEnvironment.executeSql() to explain the SELECT statement
		TableResult tableResult =	tenv.executeSql("EXPLAIN PLAN FOR " + querySQL);
		System.out.println("-------------------------EXPLAIN PLAN FOR--------------------------------");
		tableResult.print();
		System.out.println("-------------------------EXPLAIN PLAN FOR--------------------------------");
		
		TableResult tableResult2 =	tenv.executeSql("EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN " + querySQL);
		System.out.println("-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------");
		tableResult2.print();
		System.out.println("-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------");
		
	}

}

3) Run result

-------------------------explanation--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

-------------------------explanation--------------------------------
-------------------------EXPLAIN PLAN FOR--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

-------------------------EXPLAIN PLAN FOR--------------------------------
-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------
== Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 11,
    "type" : "Source: alan_ticker2[7]",
    "pact" : "Data Source",
    "contents" : "[7]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
    "parallelism" : 16
  }, {
    "id" : 13,
    "type" : "StreamRecordTimestampInserter[9]",
    "pact" : "Operator",
    "contents" : "[9]:StreamRecordTimestampInserter(rowtime field: 3)",
    "parallelism" : 16,
    "predecessors" : [ {
      "id" : 11,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 14,
    "type" : "Match[9]",
    "pact" : "Operator",
    "contents" : "[9]:Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])",
    "parallelism" : 16,
    "predecessors" : [ {
      "id" : 13,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
-------------------------EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN--------------------------------

3. Flink SQL CLI example

1) Create the table

Flink SQL> CREATE TABLE alan_ticker2 (
>    symbol STRING,
>    price DOUBLE,
>    tax  DOUBLE,
>    rowtime  TIMESTAMP(3),
>    WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_ticker2_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

2) EXPLAIN PLAN FOR: plan query and result

Flink SQL> EXPLAIN PLAN FOR 
> SELECT *
> FROM alan_ticker2
>     MATCH_RECOGNIZE(
>         PARTITION BY symbol
>         ORDER BY rowtime
>         MEASURES
>             C.price AS lastPrice,
>             C.rowtime AS rowtime
>         ONE ROW PER MATCH
>         AFTER MATCH SKIP PAST LAST ROW
>         PATTERN (A B*? C)
>         DEFINE
>             A AS A.price > 12,
>             B AS B.price < 25,
>             C AS C.price > 18
>     );
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    result |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set

3) EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN: plan query and result

Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN 
> SELECT *
> FROM alan_ticker2
>     MATCH_RECOGNIZE(
>         PARTITION BY symbol
>         ORDER BY rowtime
>         MEASURES
>             C.price AS lastPrice,
>             C.rowtime AS rowtime
>         ONE ROW PER MATCH
>         AFTER MATCH SKIP PAST LAST ROW
>         PATTERN (A B*? C)
>         DEFINE
>             A AS A.price > 12,
>             B AS B.price < 25,
>             C AS C.price > 18
>     );
+------------------------------------------------------------------------------------------+
|                                                                                   result |
+------------------------------------------------------------------------------------------+
| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], lastPrice=[$1], rowtime=[$2])
+- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, lastPrice, rowtime]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 12), <(PREV(B.$1, 0), 25), >(PREV(C.$1, 0), 18)]], inputFields=[[symbol, price, tax, rowtime]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker2]])

== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.

== Optimized Execution Plan ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])
+- Exchange(distribution=[hash[symbol]])
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 859,
    "type" : "Source: alan_ticker2[641]",
    "pact" : "Data Source",
    "contents" : "[641]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
    "parallelism" : 1
  }, {
    "id" : 861,
    "type" : "StreamRecordTimestampInserter[643]",
    "pact" : "Operator",
    "contents" : "[643]:StreamRecordTimestampInserter(rowtime field: 3)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 859,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 862,
    "type" : "Match[643]",
    "pact" : "Operator",
    "contents" : "[643]:Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 861,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
} |
+------------------------------------------------------------------------------------------+
1 row in set

4. ExplainDetails

Print the plan of a statement with the specified ExplainDetail type(s).
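
The same details can also be requested programmatically through TableEnvironment.explainSql(String, ExplainDetail...). A minimal sketch, reusing the alan_ticker2 table from the examples above (the single-broker bootstrap address is illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ExplainDetailTest {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// same table definition as in the EXPLAIN examples above
		tenv.executeSql("CREATE TABLE alan_ticker2 ("
				+ " symbol STRING, price DOUBLE, tax DOUBLE, rowtime TIMESTAMP(3),"
				+ " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND"
				+ ") WITH ("
				+ " 'connector' = 'kafka',"
				+ " 'topic' = 'alan_ticker2_topic',"
				+ " 'properties.bootstrap.servers' = '192.168.10.41:9092',"
				+ " 'scan.startup.mode' = 'earliest-offset',"
				+ " 'format' = 'csv')");

		// each ExplainDetail value enriches the printed plan;
		// ExplainDetail.PLAN_ADVICE is also available since Flink 1.17
		String explanation = tenv.explainSql(
				"SELECT symbol, price FROM alan_ticker2",
				ExplainDetail.ESTIMATED_COST,
				ExplainDetail.CHANGELOG_MODE,
				ExplainDetail.JSON_EXECUTION_PLAN);
		System.out.println(explanation);
	}

}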

1) ESTIMATED_COST

Specifying ESTIMATED_COST makes the optimizer attach its estimated cost to each physical node in the output, for example:

== Optimized Physical Plan With Advice ==
Match(partitionBy=[symbol], orderBy=[rowtime ASC], measures=[FINAL(C.price) AS lastPrice, FINAL(CAST(C.rowtime AS TIMESTAMP(3))) AS rowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[((_UTF-16LE'A', PATTERN_QUANTIFIER(_UTF-16LE'B', 0, -1, true)), _UTF-16LE'C')], define=[{A=>(PREV(A.$1, 0), 12), B=<(PREV(B.$1, 0), 25), C=>(PREV(C.$1, 0), 18)}], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.7E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
+- Exchange(distribution=[hash[symbol]], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
   +- TableSourceScan(table=[[default_catalog, default_database, alan_ticker2, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [WARNING] Unsupported to resolve non-deterministic issue in match-recognize.
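
To request only the cost estimates, the detail can be given on its own. A minimal sketch against the alan_ticker2 table above (output omitted):

Flink SQL> EXPLAIN ESTIMATED_COST
> SELECT symbol, price FROM alan_ticker2;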

2) CHANGELOG_MODE

Specifying CHANGELOG_MODE makes the optimizer attach the changelog mode to each physical node in the output, for example:

== Optimized Physical Plan With Advice ==
...
+- Exchange(... changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.69E10 cpu, 4.0E9 io, 4.0E9 network, 0.0 memory}
...
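
CHANGELOG_MODE can likewise be requested alone. A minimal sketch; a grouped aggregation is used here because its result is an updating stream, so the changelog modes in the plan differ from the insert-only [I] of a plain scan (output omitted):

Flink SQL> EXPLAIN CHANGELOG_MODE
> SELECT symbol, COUNT(*) AS cnt
> FROM alan_ticker2
> GROUP BY symbol;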

3) PLAN_ADVICE

Specifying PLAN_ADVICE makes the optimizer analyze the optimized physical plan and provide potential data-correctness warnings or performance-tuning advice. The output header then changes from "Optimized Physical Plan" to "Optimized Physical Plan With Advice". PLAN_ADVICE has been supported since Flink 1.17. Advice on the physical plan is categorized by type and by scope.

Advice type    Description
WARNING        flags a potential data-correctness risk
ADVICE         suggests a possible SQL tuning improvement

Advice scope   Description
QUERY_LEVEL    advice that applies to the whole SQL statement
NODE_LEVEL     advice that applies to a single physical node

PLAN_ADVICE provides advice for the following issues:

  • data skew in group aggregation (see Example 1 below)
  • non-deterministic updates (see the sketch at the end of this section)

If the optimizer detects that a group aggregation could enable two-phase optimization but it is not enabled, it attaches the advice id to the GroupAggregate node as an index and appends the advice content at the end of the plan.

Example 1
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '200';
SET 'table.optimizer.agg-phase-strategy' = 'ONE_PHASE';

CREATE TABLE alan_test_PLAN_ADVICE (
  a BIGINT,
  b INT NOT NULL,
  c VARCHAR,
  d BIGINT
) WITH (

  'connector' = 'kafka',
  'topic' = 'alan_test_PLAN_ADVICE_topic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

EXPLAIN PLAN_ADVICE
SELECT
  AVG(a) AS avg_a,
  COUNT(*) AS cnt,
  COUNT(b) AS cnt_b,
  MIN(b) AS min_b,
  MAX(c) FILTER (WHERE a > 1) AS max_c
FROM alan_test_PLAN_ADVICE;

Flink SQL> CREATE TABLE alan_test_PLAN_ADVICE (
>   a BIGINT,
>   b INT NOT NULL,
>   c VARCHAR,
>   d BIGINT
> ) WITH (
> 
>   'connector' = 'kafka',
>   'topic' = 'alan_test_PLAN_ADVICE_topic',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> EXPLAIN PLAN_ADVICE
> SELECT
>   AVG(a) AS avg_a,
>   COUNT(*) AS cnt,
>   COUNT(b) AS cnt_b,
>   MIN(b) AS min_b,
>   MAX(c) FILTER (WHERE a > 1) AS max_c
> FROM alan_test_PLAN_ADVICE;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          result |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| == Abstract Syntax Tree ==
LogicalProject(avg_a=[$0], cnt=[$1], cnt_b=[$1], min_b=[$2], max_c=[$3])
+- LogicalAggregate(group=[{}], avg_a=[AVG($0)], cnt=[COUNT()], min_b=[MIN($1)], max_c=[MAX($2) FILTER $3])
   +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[IS TRUE(>($0, 1))])
      +- LogicalTableScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]])

== Optimized Physical Plan With Advice ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(advice=[1], select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
   +- Exchange(distribution=[single])
      +- Calc(select=[a, b, c, IS TRUE(>(a, 1)) AS $f3])
         +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]], fields=[a, b, c, d])

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').

== Optimized Execution Plan ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
   +- Exchange(distribution=[single])
      +- Calc(select=[a, b, c, (a > 1) IS TRUE AS $f3])
         +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, alan_test_PLAN_ADVICE]], fields=[a, b, c, d])
 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set

The advice given by the optimizer is as follows:

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
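
Acting on this advice is a one-line session setting. A minimal sketch (not part of the original transcript; the option key is taken verbatim from the advice, and mini-batch is already enabled in this session, as the MiniBatchAssigner node in the plan shows):

-- Sketch: enable two-phase (local-global) aggregation for this session.
-- Local-global aggregation additionally requires mini-batch, which the
-- plan above shows is already on.
SET 'table.optimizer.agg-phase-strategy' = 'AUTO';

Re-running the same EXPLAIN PLAN_ADVICE statement should then show a local/global aggregate pair instead of the single GroupAggregate, and the advice should no longer appear.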

2、Example 2
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c STRING,
  d INT,
  `day` AS DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),
  PRIMARY KEY (a, c) NOT ENFORCED
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I,UA,UB,D'  
);

CREATE TABLE MySink (
 a INT,
 b BIGINT,
 c STRING,
 PRIMARY KEY (a) NOT ENFORCED
) WITH (
 'connector' = 'values',
 'sink-insert-only' = 'false'
);

EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, `day`
FROM MyTable
WHERE b > 100;

== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, day], upsertMaterialize=[true])
+- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day], where=[>(b, 100)])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])

advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.

related rel plan:
Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], where=[>(b, 100)], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D])

When a potential NDU (non-deterministic update) problem is detected, the optimizer appends the corresponding advice at the end of the plan.

advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
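
One direct way to act on the warning (a sketch, not from the original example) is to follow its suggestion and drop the non-deterministic column, projecting only deterministic source columns; MySink's third column can take the source column c directly:

-- NDU-safe rewrite (sketch): every projected column is deterministic,
-- so 'UB'/'UA'/'D' update messages can be processed correctly.
INSERT INTO MySink
SELECT a, b, c
FROM MyTable
WHERE b > 100;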

3、Example 3
Flink SQL> EXPLAIN PLAN_ADVICE select * from alan_ticker;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              result |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], price=[$1], tax=[$2], rowtime=[$3])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker]])

== Optimized Physical Plan With Advice ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

No available advice...

== Optimized Execution Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])
 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set

If no problem is detected, the optimizer appends "No available advice" at the end of the plan as a hint.

No available advice...

4)、JSON_EXECUTION_PLAN

Generates the program execution plan in JSON format.

Flink SQL> EXPLAIN JSON_EXECUTION_PLAN select * from alan_ticker;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     result |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| == Abstract Syntax Tree ==
LogicalProject(symbol=[$0], price=[$1], tax=[$2], rowtime=[$3])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, alan_ticker]])

== Optimized Physical Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Optimized Execution Plan ==
TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 877,
    "type" : "Source: alan_ticker[654]",
    "pact" : "Data Source",
    "contents" : "[654]:TableSourceScan(table=[[default_catalog, default_database, alan_ticker, watermark=[-(rowtime, 1000:INTERVAL SECOND)]]], fields=[symbol, price, tax, rowtime])",
    "parallelism" : 1
  } ]
} |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
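
The detail keywords can also be combined in a single statement. A sketch (ESTIMATED_COST and CHANGELOG_MODE are the other ExplainDetail values supported by EXPLAIN; this combined statement is not part of the original article):

-- Request cost estimates, changelog modes and the JSON execution plan at once.
EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN select * from alan_ticker;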

三、USE Statement

The USE statement is used to set the current catalog or database.

1、Syntax

1)、catalog

Sets the current catalog. All subsequent commands that do not explicitly specify a catalog will use this catalog. If the specified catalog does not exist, an exception is thrown. The default current catalog is default_catalog.

USE CATALOG catalog_name;

2)、database

Sets the current database. All subsequent commands that do not explicitly specify a database will use this database. If the specified database does not exist, an exception is thrown. The default current database is default_database.

USE [catalog_name.]database_name;

3)、MODULES

Sets the enabled modules in the declared order. All subsequent commands resolve metadata (functions, user-defined types, rules, etc.) within the enabled modules, following that resolution order. Modules are enabled by default when loaded; any loaded module that is not listed in a USE MODULES statement becomes disabled. By default only the core module is loaded and enabled.

USE MODULES module_name1[, module_name2, ...];
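
The declared order is the resolution order, and any loaded module left out of the list becomes disabled. A small sketch (assuming both core and hive have already been loaded):

-- Resolve functions and other metadata in hive first, then core:
USE MODULES hive, core;
-- Listing only hive leaves core loaded but disabled
-- (SHOW FULL MODULES then reports core with used = false):
USE MODULES hive;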

2、Java Example

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 */
public class TestCreateHiveTable {
	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
	public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n"
			+ "  id INT,\n"
			+ "  name STRING,\n"
			+ "  age INT\n"
			+ ") TBLPROPERTIES (\n"
			+ "  'sink.partition-commit.delay'='5 s',\n"
			+ "  'sink.partition-commit.trigger'='partition-time',\n"
			+ "  'sink.partition-commit.policy.kind'='metastore,success-file'\n"
			+ ")";

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
		String name = "alan_hive";
		// name of the default database
		String defaultDatabase = "default";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog(name, hiveCatalog);
		// USE CATALOG: all following unqualified names resolve against this catalog
		tenv.useCatalog(name);

		// USE database: the database must already exist in the catalog
		String newDatabaseName = "alan_hivecatalog_hivedb";
		tenv.useDatabase(newDatabaseName);

		// create the table using the Hive dialect
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);

		// insert data
		String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// query data
		String selectSQL = "select * from alan_hivecatalog_hivedb_testTable";
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();

		// USE MODULES: switch back to the default dialect, and load the hive
		// module first -- a module must be loaded before it can be used
		tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
		tenv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
		tenv.executeSql("USE MODULES hive");
		tenv.executeSql("SHOW FULL MODULES").print();

		env.execute();
	}
}

3、Flink SQL CLI Example

Flink SQL> CREATE CATALOG cat1 WITH (...);
[INFO] Catalog has been created.

Flink SQL> SHOW CATALOGS;
default_catalog
cat1

Flink SQL> USE CATALOG cat1;

Flink SQL> SHOW DATABASES;

Flink SQL> CREATE DATABASE db1 WITH (...);
[INFO] Database has been created.

Flink SQL> SHOW DATABASES;
db1

Flink SQL> USE db1;

Flink SQL> USE MODULES hive;
[INFO] Use modules succeeded!
Flink SQL> SHOW FULL MODULES;
+-------------+-------+
| module name |  used |
+-------------+-------+
|        hive |  true |
|        core | false |
+-------------+-------+
2 rows in set

The above briefly introduced the syntax and usage of DESCRIBE, EXPLAIN, and USE, illustrated with examples.
