首页 > 其他分享 >1. Calcite元数据创建

1. Calcite元数据创建

时间:2025-01-20 20:45:45浏览次数:1  
标签:jdbc 创建 org role 数据源 import Calcite 数据 calcite

1. 简介

Calcite 是一款来自 Apache 的开源动态数据管理框架,核心功能是提供 SQL 查询解析、优化及执行等基础能力,以灵活支持多种数据源,广泛应用于各类数据处理系统。以下从其功能特性、应用场景、优势三方面简单概述:

  • 功能特性
    • SQL 解析:支持多种 SQL 方言,如标准 SQL 以及不同数据库特定的扩展语法,能将输入的 SQL 语句解析为抽象语法树(AST),便于后续处理。
    • 语义分析:对解析后的 SQL 进行语义检查,比如验证表名、列名是否存在,数据类型是否匹配等,确保 SQL 的语义正确。
    • 查询优化:运用基于规则(RBO)和基于代价(CBO)的优化策略。RBO 通过预设规则,如谓词下推等,重写查询;CBO 则基于统计信息,估算不同执行计划的代价,选择最优方案。
    • 执行计划生成:根据优化后的结果,生成可执行的物理执行计划,定义操作的具体执行顺序和方式。
    • 数据源适配:可连接多种数据源,如关系型数据库(MySQL、Oracle 等)、文件系统(CSV、JSON 文件)、NoSQL 数据库等,而且还支持自定义数据源适配器, 并为不同数据源生成相应的数据访问策略。
    • 跨数据源查询: 能够连接不同类型的数据源,通过适配器将不同数据源的操作进行统一抽象。在进行跨数据源连表查询时,它会将查询分解为各个数据源可以处理的子查询,然后将各个数据源的结果进行合并和进一步处理

2. 元数据准备

准备两个数据库 mysqlpostgres

库信息如下: mysql中有张表: user, postgres有张表role

表信息如下:

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `username` varchar(255) DEFAULT NULL COMMENT '用户名称',
  `age` int(11) DEFAULT NULL COMMENT '性别',
  `sex` varchar(255) DEFAULT NULL COMMENT '性别',
  `role_key` int(11) DEFAULT NULL COMMENT '角色',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=32 DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';
CREATE TABLE "public"."role" (
  "name" varchar(255) COLLATE "pg_catalog"."default",
  "role_key" int4
);

3. maven依赖

maven依赖如下:

<dependency>
  <groupId>org.apache.calcite</groupId>
  <artifactId>calcite-core</artifactId>
  <version>1.37.0</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.29</version>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.2.23</version>
</dependency>

4. 元数据定义

calcite支持两种多种定义元数据方式 常用的是通过json方式,另一种是通过SchemaFactory的方式。

4.1 Json Model

组织结构:

|- model # 数据模型
|	|- schema # 数据模式
|	|	|- tables # 表/视图
|	|	|- functions # 函数
|	|	|- type # 模式类型  custom: 自定义, map: 映射, jdbc: jdbc, inline: 嵌入式 (默认)
|	|	|- factory # 指定SchemaFactory的工厂类
|	|	|- operand # 指定额外参数

示例内容:

创建两个数据源 mysql 和 postgres, 使用两种不同的声明方式

{
  "version": "1.0",
  "defaultSchema": "my_mysql",
  "schemas": [
    {
      "type": "jdbc",
      "name": "my_mysql",
      "jdbcUser": "root",
      "jdbcPassword": "123456",
      "jdbcUrl": "jdbc:mysql://localhost:3306/test",
      "jdbcCatalog": "test",
      "jdbcSchema": null
    },
    {
      "name": "my_postgres",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        "jdbcDriver": "org.postgresql.Driver",
        "jdbcUrl": "jdbc:postgresql://localhost:5432/test",
        "jdbcUser": "root",
        "jdbcPassword": "123456"
      }
    }
  ]
}

calcite model 实现类org.apache.calcite.jdbc.Driver --> org.apache.calcite.model.ModelHandler

calcite model doc:https://calcite.apache.org/docs/model.html

加载资源:

将json文件放到resources下, 然后创建connection的时候指定该文件即可

Properties info = new Properties();
// 不区分sql大小写
info.setProperty("caseSensitive", "false");
// 设置引用标识符为反引号
info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
// 指定model信息
info.setProperty("model", resourcePath("model/model.json"));
// 创建Calcite连接
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// 创建SQL语句执行查询
Statement statement = calciteConnection.createStatement();

4.2 SchemaFactory

schema UML图如下:

先创建对应数据源的datasource对象

private static DataSource getMysqlDataSource() {
  MysqlDataSource dataSource = new MysqlDataSource();
  dataSource.setUrl("jdbc:mysql://localhost:3306/test");
  dataSource.setUser("root");
  dataSource.setPassword("123456");

  return dataSource;
}

private static DataSource getPostgresDataSource() {
  final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
  pgSimpleDataSource.setUrl("jdbc:postgresql://localhost:5432/test");
  pgSimpleDataSource.setUser("root");
  pgSimpleDataSource.setPassword("123456");

  return pgSimpleDataSource;
}

然后将datasource对象包装成JdbcSchema对象最后注册到rootSchema

Properties info = new Properties();
// 不区分sql大小写
info.setProperty("caseSensitive", "false");
// 设置引用标识符为双引号
info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
// 创建Calcite连接
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// 设置默认的schema, 如果不设置需要加上对应数据源的名称
calciteConnection.setSchema("my_mysql");
final DataSource mysqlDataSource = getMysqlDataSource();
final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);
final DataSource postgresDataSource = getPostgresDataSource();
final JdbcSchema schemaWithPostgres = JdbcSchema.create(rootSchema, "my_postgres", postgresDataSource, "test", "public");
rootSchema.add("my_mysql", schemaWithMysql);
rootSchema.add("my_postgres", schemaWithPostgres);
// 创建SQL语句执行查询
Statement statement = calciteConnection.createStatement();

rootSchema也可以使用创建

CalciteSchema calciteSchema = CalciteSchema.createRootSchema(true, true);
SchemaPlus rootSchema = calciteSchema.plus();

5. 测试查询

5.1 测试单个数据源的查询功能

@Test
@SneakyThrows
public void test_connection() {
  // 上述配置中都设置了默认的schema为my_mysql, 所以查询的时候可以不添加数据源key前缀
  final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`");
  final ResultSet resultSet = statement.executeQuery("SELECT * FROM my_mysql.`user`");
  printResultSet(resultSet);
}

输出结果如下:

Number of columns: 5
{sex=1, role_key=1, id=1, age=23, username=张三}
{sex=2, role_key=2, id=2, age=18, username=李四}
{sex=2, role_key=1, id=3, age=26, username=张铁牛}
{sex=2, role_key=3, id=4, age=30, username=王麻子}

5.2 测试不同数据源连表查询

calcite支持将不同数据源的sql下推, 然后在内存中做对应的关联过滤等操作

@Test
@SneakyThrows
public void test_cross_db_query() {
    final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key");
    printResultSet(resultSet);
}

输出结果如下:

Number of columns: 6
{sex=1, role_key=1, name=管理员, id=1, age=23, username=张三}
{sex=2, role_key=1, name=管理员, id=3, age=26, username=张铁牛}
{sex=2, role_key=2, name=老师, id=2, age=18, username=李四}
{sex=2, role_key=3, name=学生, id=4, age=30, username=王麻子}

6. 完整测试代码

6.1 Json Model

package com.ldx.calcite;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.Sources;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testng.collections.Maps;

import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;

@Slf4j
public class CalciteModelTest {
    private static Statement statement;

    @BeforeAll
    @SneakyThrows
    public static void beforeAll() {
        Properties info = new Properties();
        // 不区分sql大小写
        info.setProperty("caseSensitive", "false");
        // 设置引用标识符为双引号
        info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
        // 指定model信息
        info.setProperty("model", resourcePath("model/model.json"));
        // 创建Calcite连接
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        // 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // 创建SQL语句执行查询
        statement = calciteConnection.createStatement();
    }

    @Test
    @SneakyThrows
    public void test_connection() {
        final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`");
        printResultSet(resultSet);
    }

    @Test
    @SneakyThrows
    public void test_cross_db_query() {
        final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key");
        printResultSet(resultSet);
    }

    public static void printResultSet(ResultSet resultSet) throws SQLException {
        // 获取 ResultSet 元数据
        ResultSetMetaData metaData = resultSet.getMetaData();
        // 获取列数
        int columnCount = metaData.getColumnCount();
        log.info("Number of columns: {}",columnCount);

        // 遍历 ResultSet 并打印结果
        while (resultSet.next()) {
            final Map<String, String> item = Maps.newHashMap();
            // 遍历每一列并打印
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                item.put(columnName, columnValue);
            }

            log.info(item.toString());
        }
    }

    private static String resourcePath(String path) {
        final URL url = CalciteCsvTest.class.getResource("/" + path);
        return Sources
                .of(url).file().getAbsolutePath();
    }
}

6.2 SchemaFactory

package com.ldx.calcite;

import com.mysql.cj.jdbc.MysqlDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.Sources;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.postgresql.ds.PGSimpleDataSource;
import org.postgresql.osgi.PGDataSourceFactory;
import org.testng.collections.Maps;

import javax.sql.DataSource;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;

@Slf4j
public class CalciteCreateMataDataTest {

    private static Statement statement;

    @BeforeAll
    @SneakyThrows
    public static void beforeAll() {
        Properties info = new Properties();
        // 不区分sql大小写
        info.setProperty("caseSensitive", "false");
        // 设置引用标识符为双引号
        info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
        // 创建Calcite连接
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        // 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // 设置默认的schema, 如果不设置需要加上对应数据源的名称
        calciteConnection.setSchema("my_mysql");
        final DataSource mysqlDataSource = getMysqlDataSource();
        final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);
        final DataSource postgresDataSource = getPostgresDataSource();
        final JdbcSchema schemaWithPostgres = JdbcSchema.create(rootSchema, "my_postgres", postgresDataSource, "test", "public");
        rootSchema.add("my_mysql", schemaWithMysql);
        rootSchema.add("my_postgres", schemaWithPostgres);
        // 创建SQL语句执行查询
        statement = calciteConnection.createStatement();
    }

    private static DataSource getMysqlDataSource() {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setUrl("jdbc:mysql://localhost:3306/test");
        dataSource.setUser("root");
        dataSource.setPassword("123456");

        return dataSource;
    }

    private static DataSource getPostgresDataSource() {
        final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
        pgSimpleDataSource.setUrl("jdbc:postgresql://localhost:5432/test");
        pgSimpleDataSource.setUser("root");
        pgSimpleDataSource.setPassword("123456");

        return pgSimpleDataSource;
    }

    @Test
    @SneakyThrows
    public void test_connection() {
        final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`");
        printResultSet(resultSet);
    }

    @Test
    @SneakyThrows
    public void test_cross_db_query() {
        final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key");
        printResultSet(resultSet);
    }

    public static void printResultSet(ResultSet resultSet) throws SQLException {
        // 获取 ResultSet 元数据
        ResultSetMetaData metaData = resultSet.getMetaData();
        // 获取列数
        int columnCount = metaData.getColumnCount();
        log.info("Number of columns: {}",columnCount);

        // 遍历 ResultSet 并打印结果
        while (resultSet.next()) {
            final Map<String, String> item = Maps.newHashMap();
            // 遍历每一列并打印
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                item.put(columnName, columnValue);
            }

            log.info(item.toString());
        }
    }

    private static String resourcePath(String path) {
        final URL url = CalciteCsvTest.class.getResource("/" + path);
        return Sources
                .of(url).file().getAbsolutePath();
    }
}

标签:jdbc,创建,org,role,数据源,import,Calcite,数据,calcite
From: https://www.cnblogs.com/ludangxin/p/18682268

相关文章

  • 数据结构-堆及堆排序
    1.堆的定义堆(Heap)是一种数据结构,通常是一个完全二叉树。在堆中,每个节点都有一个与其相关的值,并且满足堆的性质。堆分为两种类型:大堆和小堆。大堆:在大堆中,对于每个非叶子节点,其值都大于或等于它的子节点的值。也就是说,根节点的值是整个堆中的最大值。小堆:与大堆相反,在小堆中,对......
  • 【第一天】零基础入门刷题Python-算法篇-数据结构与算法的介绍(持续更新)
    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录前言一、Python数据结构与算法的详细介绍1.基本概念2.Python中的数据结构1.列表(List)2.元组(Tuple)3.字典(Dictionary)4.集合(Set)5.字符串(String)3.Python中的常用算法1.排序算法2.搜索算法3.递......
  • JSP某医学院实习管理系统7s3pv--程序+源码+数据库+调试部署+开发环境
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表技术要求:开发语言:JSP前端使用:HTML5,CSS,JSP动态网页技术后端使用SpringBoot,Spring技术主数据库使用MySQL开题报告内容一、研究背景与意义随着医学教育的快......
  • JSP明星周边在线购物商城zzfnj(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表技术要求:开发语言:JSP前端使用:HTML5,CSS,JSP动态网页技术后端使用SpringBoot,Spring技术主数据库使用MySQL开题报告内容一、课题背景及意义随着互联网技术......
  • JSP民族服饰文化数字化展示系统cyi8g程序+源码+数据库+调试部署+开发环境
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、课题名称民族服饰文化数字化展示系统二、研究目的与意义本课题旨在探索民族服饰文化的数字化展示方法,通过构建数字化展示系统,将传统民族服饰......
  • JSP民宿短租系统an01j(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表技术要求:开发语言:JSP前端使用:HTML5,CSS,JSP动态网页技术后端使用SpringBoot,Spring技术主数据库使用MySQL开题报告内容一、研究背景与意义随着互联网技术的......
  • 查询SQL数据库
    问题希望从数据库获取一些数据。解决方案使用PDO::query()向数据库发送SQL查询,然后利用一个foreach循环获取每行结果。向数据库发送查询//设置数据库连接所需的用户名$user='admin';//设置数据库连接所需的密码$password='123456';//创建一个PDO实例来连接到MySQL......
  • 阳振坤:AI 大模型的基础是数据,AI越发达,数据库价值越大
    2024年1月12日,第四届OceanBase数据库大赛决赛在北京圆满落幕。在大赛的颁奖典礼上,OceanBase首席科学家阳振坤老师为同学们献上了一场主题为“爱上数据库”的公开课,他不仅分享了个人的成长历程,还阐述了对数据库行业现状与未来的见解和思考。阳老师回忆了自己年轻时,与如今的同......
  • Spring Boot + Apache POI 实现 Excel 导出:BOM物料清单生成器(支持中文文件名、样式美
    目录引言ApachePOI操作Excel的实用技巧1.合并单元格操作2.设置单元格样式1.创建样式对象2.设置边框3.设置底色4.设置对齐方式5.设置字体样式6.设置自动换行7.应用样式到单元格3.定位和操作指定单元格4.实现标签-值的形式5.列宽设置1.设置单个列宽2.......
  • (2024最新毕设合集)基于SpringBoot的游乐园管理系统-69394|可做计算机毕业设计JAVA、PHP
    目录1绪论1.1选题背景与意义1.2国内外研究现状1.3论文结构与章节安排2系统分析2.1可行性分析2.1.1经济可行性2.1.2技术可行性2.1.3操作可行性2.2系统流程分析2.2.1系统开发流程2.2.2用户登录流程2.2.3系统操作流程2.2.4添加信息流程2.2.5......