首页 > 数据库 >【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

时间:2024-01-06 11:01:55浏览次数:38  
标签:org flink 视图 API import apache Table

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


(文章目录)


本文给出了通过Table API 和SQL 的两种方式创建视图,也就是虚表。同时为了更接近实用,通过Table API 创建了一张Hive的表,然后在该表上创建视图进行示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文依赖hive、hadoop、kafka环境好用,代码中示例的hive配置文件路径根据你自己的环境而设置。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1) 17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章: 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图 【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询 【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作 【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作 【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等) 【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本) 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作 【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作 【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作 【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作 【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本) 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版 【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:通过Table API 和 SQL 创建视图

1、示例:通过SQL创建视图

本示例是通过sql创建一个简单的表,然后再通过sql创建一个视图,最后查询视图并输出结果。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import com.google.common.collect.Lists;
/**
 * @author alanchan
 *
 */
public class TestTableAPIDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		// SQL 创建输入表
		String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
				"  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
				"  `partition` BIGINT METADATA VIRTUAL,\r\n" +
				"  `offset` BIGINT METADATA VIRTUAL,\r\n" +
				"  `user_id` BIGINT,\r\n" +
				"  `item_id` BIGINT,\r\n" +
				"  `behavior` STRING\r\n" +
				") WITH (\r\n" +
				"  'connector' = 'kafka',\r\n" +
				"  'topic' = 'user_behavior',\r\n" +
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
				"  'properties.group.id' = 'testGroup',\r\n" +
				"  'scan.startup.mode' = 'earliest-offset',\r\n" +
				"  'format' = 'csv'\r\n" +
				");";
		tenv.executeSql(sourceSql);

		//
		String sql = "select user_id , behavior from Alan_KafkaTable group by user_id ,behavior ";
		Table resultQuery = tenv.sqlQuery(sql);
		tenv.createTemporaryView("Alan_KafkaView", resultQuery);

		String queryViewSQL = " select * from Alan_KafkaView ";
		Table queryViewResult = tenv.sqlQuery(queryViewSQL);

		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(queryViewResult, Row.class);

		// 6、sink
		resultDS.print();

		// 7、执行
		env.execute();
		// kafka中输入测试数据
		// 1,1001,login
		// 1,2001,p_read

		// 程序运行控制台输入如下
		// 3> (true,+I[1, login])
		// 14> (true,+I[1, p_read])
	}
}
	

2、示例:通过Table API创建视图

本示例是通过Table API创建一个hive的表,将数据写入hive,然后再创建视图,最后查询视图输出。 本示例依赖hive、hadoop、kafka环境好用,代码中示例的hive配置文件路径根据你自己的环境而设置。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import com.google.common.collect.Lists;
/**
 * @author alanchan
 *
 */
public class TestTableAPIDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		// SQL 创建输入表
		String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
				"  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
				"  `partition` BIGINT METADATA VIRTUAL,\r\n" +
				"  `offset` BIGINT METADATA VIRTUAL,\r\n" +
				"  `user_id` BIGINT,\r\n" +
				"  `item_id` BIGINT,\r\n" +
				"  `behavior` STRING\r\n" +
				") WITH (\r\n" +
				"  'connector' = 'kafka',\r\n" +
				"  'topic' = 'user_behavior',\r\n" +
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
				"  'properties.group.id' = 'testGroup',\r\n" +
				"  'scan.startup.mode' = 'earliest-offset',\r\n" +
				"  'format' = 'csv'\r\n" +
				");";
		tenv.executeSql(sourceSql);

		// 创建视图
		String catalogName = "alan_hive";
		String defaultDatabase = "default";
		String databaseName = "viewtest_db";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
		tenv.registerCatalog(catalogName, hiveCatalog);
		tenv.useCatalog(catalogName);
		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
		}, true);
		tenv.useDatabase(databaseName);

		String viewName = "Alan_KafkaView";
		String originalQuery = "select user_id , behavior from Alan_KafkaTable group by user_id ,behavior  ";
		String expandedQuery = "SELECT  user_id , behavior FROM " + databaseName + "." + "Alan_KafkaTable  group by user_id ,behavior   ";
		String comment = "this is a comment";
		ObjectPath path = new ObjectPath(databaseName, viewName);

		createView(originalQuery, expandedQuery, comment, hiveCatalog, path);

		// 查询视图
		String queryViewSQL = " select * from Alan_KafkaView ";
		Table queryViewResult = tenv.sqlQuery(queryViewSQL);

		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(queryViewResult, Row.class);

		// 6、sink
		resultDS.print();

		// 7、执行
		env.execute();
		// kafka中输入测试数据
		// 1,1001,login
		// 1,2001,p_read

		// 程序运行控制台输入如下
		// 3> (true,+I[1, login])
		// 14> (true,+I[1, p_read])

	}
	
	static void createView(String originalQuery, String expandedQuery, String comment, HiveCatalog hiveCatalog, ObjectPath path) throws Exception {
			ResolvedSchema resolvedSchema = new ResolvedSchema(
					Arrays.asList(
							Column.physical("user_id", DataTypes.INT()),
							Column.physical("behavior", DataTypes.STRING())),
					Collections.emptyList(),
					null);
	
			CatalogView origin = CatalogView.of(
					Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
					comment,
					originalQuery,
					expandedQuery,
					Collections.emptyMap());
			CatalogView view = new ResolvedCatalogView(origin, resolvedSchema);
			hiveCatalog.createTable(path, view, false);
	
		}

}

以上,本文给出了通过Table API 和SQL 的两种方式创建视图,也就是虚表。同时为了更接近实用,通过Table API 创建了一张Hive的表,然后在该表上创建视图进行示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1) 17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章: 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图 【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询 【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作 【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作 【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等) 【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本) 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作 【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作 【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作 【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作 【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本) 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版 【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

标签:org,flink,视图,API,import,apache,Table
From: https://blog.51cto.com/alanchan2win/9125956

相关文章

  • • python 脚本 输入字符串 输出字符串+当前时间 生成api http请求
    案例问题背景python脚本输入字符串输出字符串+当前时间生成apihttp请求脚本1这是单线程的单次处理单个http请求同时多个请求按照顺序处理而不是并行处理多请求!=多线程但是相关使用多线程来并行处理多请求使用flask或django等web服务器框架可以与wsgi服务器配合使用比如guni......
  • Stable Diffusion Win10部署,仅使用CPU,20240106
    先决条件:a.下载githttps://git-scm.com/download/winb.如果使用GPU,需要使用cuda,本文档对GPU环境不做讨论https://developer.nvidia.com/cuda-toolkit-archive注:CUDA版本需要和torch版本匹配https://pytorch.org/get-started/locally/    1.下载conda https://docs.conda.io/e......
  • 2024年小红书最新x-s-common签名算法分析以及点赞api接口测试nodejs(2024-01-05)
      2024年小红书又更新了x-s-common算法,现在的版本是:3.6.8。这个签名算法现在是越来越重要了,许多接口都要用到。比如:评论,点赞等接口,没有这个算法采集不到数据。  一、chrome逆向x-s-common算法  1、x-s-common  打开chrome,按f12,打开开发者模式,随便找一接口,全局......
  • el-table 设置合并行或列时,显示错乱问题
    1.需求效果图:2.接口数据格式:点击查看代码constlist=[{contractNo:"CAI-20220801001",contractItem:"用户质量指数",count:15234,customerItems:[{contractNo:null,contractItem:"反欺诈分......
  • elastic常用api
    elasticsearch运维常用API查看集群状态查询集群状态命令:curl-XGET"http://ip:port/_cluster/health?pretty"#?prettyjson打印结果查询集群JVM状态curl-XGET"http://ip:port/_nodes/stats/jvm?pretty"#查看具体某一个curl-XGET"http://ip:port/_nodes/nodeNa......
  • 安卓期末小项目TrackTable收支表+源码
    一、需求分析这是一款账目记录、分析App,本系统主要功能有:用户登录注册、首页账单分析、上传账单、搜索账单信息、个人信息、重置密码、数据效验。系统功能图系统总用例图二、系统开发平台环境IDE:AndroidStudio 2021.1.x插件:simpleUMLCE工具:Visustinv8DemoJava版本:Java11OS:win11......
  • ChatGPT的中转站 oupuapi,不扶墙也能上楼
    我们在建类似chatgpt聊天站点的时候,只需对服务器进行扶墙代理,便可实现访问。那么我们只需要往深里想一下,不要让整个服务器去访问VPN,而是基于API的代理,或者说API的中转站也就孕育而生了——我不是做产品的,找了一下找到了oupo.top/register?aff=xDgB。TextGenerator中使......
  • 支持API文档生成,API管理工具:Apipost
    随着数字化转型的加速,API(应用程序接口)已经成为企业间沟通和数据交换的关键。而在API开发和管理过程中,API文档、调试、Mock和测试的协作显得尤为重要。Apipost正是这样一款一体化协作平台,旨在解决这些问题,提高API开发效率和质量。Apipost提供API文档管理功能,让后端开发人员可以在开......
  • Springboot 2.7 open api:swagger | knife4j | spring doc
    *[集成SpringDoc接口文档和knife4j|SpringBoot2.7.2实战基础-掘金](https://juejin.cn/post/7201195677128687674)*[Springboot2.7集成Swagger增强版接口框架Knife4j4.3+springdocOpenApi3.0\_knife4jspringboot2.7-CSDN博客](https://blog.csdn.net/Mrqi......
  • 支持API文档生成,API管理工具:Apipost
    随着数字化转型的加速,API(应用程序接口)已经成为企业间沟通和数据交换的关键。而在API开发和管理过程中,API文档、调试、Mock和测试的协作显得尤为重要。Apipost正是这样一款一体化协作平台,旨在解决这些问题,提高API开发效率和质量。 Apipost提供API文档管理功能,让后端开发人员可......