首页 > 其他分享 >Flink之Catalog

Flink之Catalog

时间:2023-11-25 15:31:43浏览次数:42  
标签:Flink catalog Catalog SQL new mydb

Catalog

概述

Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的 UDF。 元数据也可以是持久化的,例如Hive Metastore中的元数据。

Catalog提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。

Catalog分类

在Flink中,Catalog可以分为4类:GenericInMemoryCatalogJdbcCatalogHiveCatalog用户自定义Catalog

1.GenericInMemoryCatalog

GenericInMemoryCatalog是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

2.JdbcCatalog

JdbcCatalog使得用户可以将Flink通过JDBC协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前 JDBC Catalog仅有的两种实现。

3.HiveCatalog

HiveCatalog有两个用途:作为原Flink元数据的持久化存储,以及作为读写现有Hive元数据的接口。

Hive Metastore以小写形式存储所有元数据对象名称。而GenericInMemoryCatalog区分大小写。

4.用户自定义Catalog

Catalog是可扩展的,用户可以通过实现Catalog接口来开发自定义Catalog。 想要在SQL CLI中使用自定义 Catalog,用户除了需要实现自定义的Catalog 之外,还需要为这个Catalog实现对应的CatalogFactory接口。

CatalogFactory定义了一组属性,用于SQL CLI启动时配置Catalog。 这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到CatalogFactory并初始化相应的Catalog 实例。

GenericInMemoryCatalog

基于内存实现的Catalog,所有元数据只在session的生命周期(一个Flink任务运行生命周期内)内可用。默认自动创建名为default_catalog的内存Catalog,这个Catalog默认只有一个名为default_database的数据库。

JdbcCatalog

JdbcCatalog使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。

这里以JdbcCatalog-MySQL使用为例。

注意:JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。

下载JAR包及使用

下载:flink-connector-jdbc

下载:mysql-connector-j

上传JAR包到flink/lib

cp ./flink-connector-jdbc-3.1.0-1.17.jar /usr/local/program/flink/lib

cp ./mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib

重启操作

重启flink集群和sql-client

bin/start-cluster.sh

bin/sql-client.sh

创建Catalog

JdbcCatalog支持以下选项:

name:必需,Catalog名称

default-database:连接到的默认数据库

username: Postgres/MySQL帐户的用户名

password:帐号密码

base-url:数据库的jdbc url(不含数据库名)
	Postgres Catalog:是"jdbc:postgresql://<ip>:<端口>"
	MySQL Catalog:是"jdbc: mysql://<ip>:<端口>"
CREATE CATALOG jdbc_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'demo',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://node01:3306'
);

查看与使用Catalog

查看Catalog

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|    jdbc_catalog |
+-----------------+
2 rows in set

使用指定Catalog

Flink SQL> use catalog jdbc_catalog;
[INFO] Execute statement succeed.

查看当前的CATALOG

Flink SQL> SHOW CURRENT CATALOG;
+----------------------+
| current catalog name |
+----------------------+
|         jdbc_catalog |
+----------------------+
1 row in set

操作数据库表

Flink SQL> show current database;
+-----------------------+
| current database name |
+-----------------------+
|                  demo |
+-----------------------+
1 row in set


Flink SQL> show tables;
+------------+
| table name |
+------------+
|    tb_user |
+------------+
1 row in set

Flink SQL> select * from tb_user;
[INFO] Result retrieval cancelled.

Flink SQL> insert into tb_user values(0,'java',20);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9d78ec378ad635d291bd730ba86245d8

自动初始化catalog

进入SQL客户端自动初始化catalo,创建vim sql-client-init.sql初始化脚本

SET sql-client.execution.result-mode = 'tableau';

CREATE CATALOG jdbc_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'demo',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://node01:3306'
);

use catalog jdbc_catalog;

进入客户端时指定初始化文件

bin/sql-client.sh  -i ./sql-client-init.sql

再查看catalog

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|    jdbc_catalog |
+-----------------+
2 rows in set

HiveCatalog

HiveCatalog有两个用途:

单纯作为 Flink元数据的持久化存储

作为读写现有Hive元数据的接口

注意:Hive MetaStore以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。

下载JAR包及使用

下载:flink-sql-connector-hive

下载:mysql-connector-j

上传jar包到flink的lib

cp ./flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar /usr/local/program/flink/lib/

cp ./mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib

重启操作

重启flink集群和sql-client

bin/start-cluster.sh

bin/sql-client.sh

hive metastore服务

启动外置的hive metastore服务

Hive metastore必须作为独立服务运行,因此,在Hive的hive-site.xml中添加配置

  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://node01:9083</value>
  </property>
# 前台运行
hive --service metastore

# 后台运行
hive --service metastore &

创建Catalog

创建Catalog参数说明

配置项 必需 默认值 类型 说明
type Yes (none) String Catalog类型,创建HiveCatalog时必须设置为'hive'
name Yes (none) String Catalog的唯一名称
hive-conf-dir No (none) String 包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml
default-database No default String Hive Catalog使用的默认数据库
hive-version No (none) String HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败
hadoop-conf-dir No (none) String Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/local/program/hive/conf'
);

查看与使用Catalog

查看Catalog

Flink SQL> SHOW CATALOGS;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|          myhive |
+-----------------+
2 rows in set


--查看当前的CATALOG
SHOW CURRENT CATALOG;

使用指定Catalog

Flink SQL> use catalog myhive;
[INFO] Execute statement succeed.

Flink与Hive中操作

Flink中查看

Flink SQL> SHOW DATABASES;
+---------------+
| database name |
+---------------+
|       default |
+---------------+
1 row in set

操作Hive

# 创建数据库demo
hive (default)> create database demo;

# 切换数据库
hive (default)> use demo;

# 创建表tb_user
hive (demo)> create table tb_user(id int,name string, age int);

# 插入数据
hive (demo)> insert into tb_user values(1,"test",22);

Flink中再次查看

Flink SQL> SHOW DATABASES;
+---------------+
| database name |
+---------------+
|       default |
|          demo |
+---------------+
2 rows in set

Flink SQL> use demo;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+------------+
| table name |
+------------+
|    tb_user |
+------------+


Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.

Flink SQL> select * from tb_user;2023-07-09 21:58:25,620 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 1

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                           test |          22 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row

在Flink中插入

Flink SQL> insert into tb_user values(2,'flink',22);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9fe32af97cfb9e507ce84263cae65d23

Flink SQL> select * from tb_user;2023-07-09 22:05:47,521 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 2

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                           test |          22 |
| +I |           2 |                          flink |          22 |
+----+-------------+--------------------------------+-------------+
Received a total of 2 rows

Hive中查询

hive (demo)> select * from tb_user;

自动初始化catalog

进入SQL客户端自动初始化catalog,创建vim sql-client-init.sql初始化脚本

SET sql-client.execution.result-mode = 'tableau';

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/local/program/hive/conf'
);

use catalog myhive ;

进入客户端时指定初始化文件

bin/sql-client.sh  -i ./sql-client-init.sql

可以发现数据信息任然存在

Flink SQL> use catalog myhive;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+---------------+
| database name |
+---------------+
|       default |
|          demo |
+---------------+
2 rows in set

用户自定义Catalog

实现Catalog

用户可以通过实现Catalog接口来开发自定义 Catalog

public class CustomCatalog implements Catalog {

    public CustomCatalog(String catalogName, String defaultDatabase) {
        
    }


    @Override
    public void open() {
        // 实现 Catalog 打开的逻辑
    }

    @Override
    public void close() {
        // 实现 Catalog 关闭的逻辑
    }

    @Override
    public List<String> listDatabases() {
        // 实现获取数据库列表的逻辑
        return null;
    }

    @Override
    public CatalogDatabase getDatabase(String databaseName) {
        // 实现获取指定数据库的逻辑
        return null;
    }

    @Override
    public boolean databaseExists(String databaseName) {
        // 实现检查数据库是否存在的逻辑
        return false;
    }

    @Override
    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) {
        // 实现创建数据库的逻辑
    }

    @Override
    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) {
        // 实现删除数据库的逻辑
    }

    @Override
    public List<String> listTables(String databaseName) {
        // 实现获取数据库中表的列表的逻辑
        return null;
    }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath) {
        // 实现获取指定表的逻辑
        return null;
    }

    @Override
    public boolean tableExists(ObjectPath tablePath) {
        // 实现检查表是否存在的逻辑
        return false;
    }

    @Override
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {
        // 实现创建表的逻辑
    }

    @Override
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) {
        // 实现删除表的逻辑
    }

    @Override
    public List<String> listFunctions(String dbName) {
        // 实现获取数据库中函数的逻辑
        return null;
    }

    // 其他方法的实现
}

使用Catalog

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 注册自定义 Catalog
        tableEnv.registerCatalog("my_catalog", new CustomCatalog("my_catalog", "default"));

        // 使用自定义 Catalog
        tableEnv.useCatalog("my_catalog");

        // 执行 SQL 查询或 Table API 操作
        tableEnv.sqlQuery("SELECT * FROM my_table").execute().print();
    }

Catalog API

数据库操作

    public static void main(String[] args) throws Exception {
        // 创建一个基于内存的Catalog实例
        GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("myCatalog");
        catalog.open();


        // 创建数据库
        Map<String, String> properties = new HashMap<>();
        properties.put("key", "value");
        CatalogDatabase database = new CatalogDatabaseImpl(properties, "create comment");
        catalog.createDatabase("mydb", database, false);

        // 列出Catalog中的所有数据库
        System.out.println("列出Catalog中的所有数据库 = " + catalog.listDatabases());

        // 获取数据库
        CatalogDatabase createDb = catalog.getDatabase("mydb");
        System.out.println("获取数据库,comment =  " + createDb.getComment() + " ,properties = " + createDb.getProperties());

        // 修改数据库
        Map<String, String> properties2 = new HashMap<>();
        properties2.put("key", "value1");
        catalog.alterDatabase("mydb", new CatalogDatabaseImpl(properties2, "alter comment"), false);

        // 获取数据库
        CatalogDatabase alterDb = catalog.getDatabase("mydb");
        System.out.println("获取数据库,comment =  " + alterDb.getComment() + " ,properties = " + alterDb.getProperties());

        // 检查数据库是否存在
        System.out.println("检查数据库是否存在 = " + catalog.databaseExists("mydb"));

        // 删除数据库
        catalog.dropDatabase("mydb", false);

        // 关闭 Catalog
        catalog.close();
    }
列出Catalog中的所有数据库 = [default, mydb]
获取数据库,comment =  create comment ,properties = {key=value}
获取数据库,comment =  alter comment ,properties = {key=value1}
检查数据库是否存在 = true

表操作

// 创建表
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// 删除表
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// 修改表
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// 重命名表
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");

// 获取表
catalog.getTable("mytable");

// 检查表是否存在
catalog.tableExists("mytable");

// 列出数据库中的所有表
catalog.listTables("mydb");

视图操作

// 创建视图
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// 删除视图
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// 修改视图
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);

// 重命名视图
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// 获取视图
catalog.getTable("myview");

// 检查视图是否存在
catalog.tableExists("mytable");

// 列出数据库中的所有视图
catalog.listViews("mydb");

分区操作

// 创建分区
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// 删除分区
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// 修改分区
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// 获取分区
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// 检查分区是否存在
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// 列出表的所有分区
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// 根据给定的分区规范列出表的分区
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// 根据表达式过滤器列出表的分区
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));

函数操作

// 创建函数
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// 删除函数
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// 修改函数
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// 获取函数
catalog.getFunction("myfunc");

// 检查函数是否存在
catalog.functionExists("myfunc");

// 列出数据库中的所有函数
catalog.listFunctions("mydb");

标签:Flink,catalog,Catalog,SQL,new,mydb
From: https://blog.51cto.com/chencoding/8561265

相关文章

  • Flink实战(11)-Exactly-Once语义之两阶段提交
    0大纲[ApacheFlink]2017年12月发布的1.4.0版本开始,为流计算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持:数据源(source)和输出端(sink)包括ApacheKafka0.11及更高版本。它提......
  • Flink源码解析(六)——数据分区解析
    一、数据分区概念对分布式计算引擎来说,数据分区的主要作用是将现环节的数据进行切分,交给下游位于不同物理节点上的Task计算。二、Flink数据分区接口体系1、顶层接口ChannelSelector(1).setup()方法设置下游算子的通道数量。从该接口中可以看到,算子里的每一个分区器都知道下游......
  • 基于 Flink SQL 和 Paimon 构建流式湖仓新方案
    本文整理自阿里云智能开源表存储负责人,FounderofPaimon,FlinkPMC成员李劲松在云栖大会开源大数据专场的分享。本篇内容主要分为四部分:数据分析架构演进介绍ApachePaimonFlink+Paimon流式湖仓流式湖仓Demo演示数据分析架构演进目前,数据分析架构正在从Hive到Lakehouse的演变......
  • flink入门程序
    flink入门程序生成项目mvnarchetype:generate-DarchetypeGroupId=org.apache.flink-DarchetypeArtifactId=flink-quickstart-java-DarchetypeVersion=1.16.4wordcount入门教程socket进程收集数据#terminal运行nc-lk9999javaflink处理importorg.apa......
  • [Flink] Flink(CDC/SQL)Job在启动时,报“ConnectException: Error reading MySQL varia
    1问题描述1.1基本信息所属环境:CN-PT问题时间:2023-11-21所属程序:FlinkJob(XXXPT_dimDeviceLogEventRi)作业类型:FlinkSQLJob数据流:业务MySQL==>FlinkJob(FlinkCdcConnector(mysql)+FlinkSQL)==>BigdataKafka==>BigdataOLAP==>业务系统作业......
  • 聊聊Flink必知必会(五)
    聊聊Flink的必知必会(三)聊聊Flink必知必会(四)从源码中,根据关键的代码,梳理一下Flink中的时间与窗口实现逻辑。WindowedStream对数据流执行keyBy()操作后,再调用window()方法,就会返回WindowedStream,表示分区后又加窗的数据流。如果数据流没有经过分区,直接调用window()方法则......
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSyst
    文章目录Flink系列文章一、Table&SQLConnectors1、概述2、支持的外部连接3、使用示例:kafka4、Transformtableconnector/formatresources5、SchemaMapping6、Metadata7、PrimaryKey8、TimeAttributes9、ProctimeAttributes10、RowtimeAttributes11、完整示例1)、建表2)、......
  • Flink(三):无状态转换map() 和flatMap()
    一、简介算子map()和flatMap()是用于实现无状态转换的基本操作。二、map()map()算子接收一个MapFunction接口参数,对元素进行一对一转换,即每个元素对应恰好一个结果。由于MapFunction是函数式接口,因此可以使用Lambda表达式。代码如下:StreamExecutionEnvironmentenv......
  • 19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
    (文章目录)本文展示了自定义函数在Flinksqlclient的应用以及自定义函数中使用pojo的示例。本文依赖flink、kafka集群能正常使用。本文分为2个部分,即自定义函数在Flinksqlclient中的应用以及自定义函数中使用pojo数据类型。本文的示例如无特殊说明则是在Flink1.17版本中运......
  • Flink CDC 同步 demo
    运行docker-compose.yml搭建数据库源,官方mysql样例数据源无法启动,改用其他mysql镜像version:'2.1'services:postgres:image:debezium/example-postgres:1.1ports:-"5432:5432"environment:-POSTGRES_PASSWORD=1234-POSTGR......