首页 > 数据库 >Presto适配高斯数据库

Presto适配高斯数据库

时间:2024-03-06 18:11:51浏览次数:29  
标签:return 高斯 presto 适配 Presto gauss facebook import com

Presto是什么

Presto 是一个分布式 SQL 查询引擎,旨在查询大型数据集 分布在一个或多个异构数据源上。笔者所参与的项目主要使用Presto做数据探查和数据分析。

Presto架构

Presto查询引擎是一个Master-Slave的架构,由一个Coordinator节点,一个Discovery Server节点,多个Worker节点组成,Discovery Server通常内嵌于Coordinator节点中。

Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行。

Worker节点负责实际执行查询任务。Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。

Presto是功能如何实现

Presto工程中使用Connector负责Presto与数据源进行交互,不同的数据库对应于不同的Connector。Connector是使用 SPI 作为服务提供/发现机制的。Java中SPI机制主要思想是将装配的控制权移到程序之外。

在Presto中的应用就是基于 com.facebook.presto.spi.Plugin 这个接口实现一个对应的xxxPlugin。并且在plugin.dir配置的路径中添加实现com.facebook.presto.spi.Plugin接口代码对应的产物,即可在Presto中实现与数据源的交互。

高斯数据库GuassPlugin插件开发

软件版本信息

· presto源码版本0.253

· jdk1.8-151(和presto源码版本对应)

具体实现步骤

Presto原始项目中创建presto-gauss项目

presto-gauss pom中需要添加com.facebook.presto作为父项目,同时需要添加Presto官方实现的JdbcPlugin依赖包,以及高斯数据库JDBC包。

将打包项目打包方式设置成presto-plugin。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>presto-root</artifactId>
        <groupId>com.facebook.presto</groupId>
        <version>0.253</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>presto-gauss</artifactId>
    <description>Presto - gauss Connector</description>
    <packaging>presto-plugin</packaging>

    <dependencies>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-base-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>com.facebook.airlift</groupId>
            <artifactId>configuration</artifactId>
        </dependency>


        <dependency>
            <groupId>com.facebook.airlift</groupId>
            <artifactId>log-manager</artifactId>
            <scope>runtime</scope>
        </dependency>


        <dependency>
            <groupId>com.google.inject</groupId>
            <artifactId>guice</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.validation</groupId>
            <artifactId>validation-api</artifactId>
        </dependency>

        <!-- Presto SPI -->
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-spi</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-common</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.drift</groupId>
            <artifactId>drift-api</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>slice</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>units</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <scope>provided</scope>
        </dependency>

       
        <!-- 华为高斯依赖 -->
        <dependency>
            <groupId>com.huawei</groupId>
            <artifactId>gsjdbc200</artifactId>
            <version>8.1.3</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <!-- TODO: Remove this once fixed -->
                    <ignoredDependencies>
                    </ignoredDependencies>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>ci</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-surefire-plugin</artifactId>
                        <configuration>
                            <excludes combine.self="override"/>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>

        <profile>
            <id>default</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-surefire-plugin</artifactId>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>

在presto-root项目的pom中增加module

<modules>
    …… 
    <module>presto-gauss</module> 
    ……
</modules> 

在presto-server项目中增加presto-gauss依赖

<dependency>
	<groupId>com.facebook.presto</groupId>
	<artifactId>presto-gauss</artifactId>
	<version>${project.version}</version>
	<type>zip</type>
	<scope>provided</scope>
</dependency>

基于高斯数据库驱动实现ConnectionFactory实例的创建

因为高斯数据库是JDBC 数据源官方的JdbcPlugin中已经实现com.facebook.presto.spi.Plugin中的接口,因此我们仅需高斯Jdbc驱动实例构建ConnectionFactory即可。

高斯数据库驱动对于JDBC的支持程度、数据源本身的特性,所以需要对原始JDBC进行一些适配。

创建GaussClient.java实现ConnectionFactory,同时完成表重命名、数据类型映射、查询表和模式的等方法进行适配。

	package com.facebook.presto.plugin.gauss;
	
	import com.facebook.presto.common.type.Decimals;
	import com.facebook.presto.common.type.Type;
	import com.facebook.presto.common.type.VarcharType;
	import com.facebook.presto.plugin.jdbc.*;
	import com.facebook.presto.spi.ConnectorSession;
	import com.facebook.presto.spi.PrestoException;
	import com.facebook.presto.spi.SchemaTableName;
	import com.huawei.gauss200.jdbc.Driver;
	
	import javax.inject.Inject;
	import java.sql.*;
	import java.util.Optional;
	import java.util.Properties;
	
	import static com.facebook.presto.common.type.DecimalType.createDecimalType;
	import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
	import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
	import static com.facebook.presto.common.type.VarcharType.createVarcharType;
	import static com.facebook.presto.plugin.jdbc.DriverConnectionFactory.basicConnectionProperties;
	import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
	import static com.facebook.presto.plugin.jdbc.StandardReadMappings.*;
	import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
	import static java.lang.String.format;
	import static java.util.Locale.ENGLISH;
	import static java.util.Objects.requireNonNull;
	
	public class GaussClient extends BaseJdbcClient
	{
	    private static final int FETCH_SIZE = 1000;
	
	    private final boolean synonymsEnabled;
	    private final int numberDefaultScale;
	
	    @Inject
	    public GaussClient(JdbcConnectorId connectorId, BaseJdbcConfig config, GaussConfig gaussConfig) throws SQLException {
	        super(connectorId, config, "", connectionFactory(config, gaussConfig));
	        requireNonNull(gaussConfig, "gauss config is null");
	        this.synonymsEnabled = gaussConfig.isSynonymsEnabled();
	        this.numberDefaultScale = gaussConfig.getNumberDefaultScale();
	    }
	
	
	    private static ConnectionFactory connectionFactory(BaseJdbcConfig config, GaussConfig gaussConfig) {
	        Properties connectionProperties = basicConnectionProperties(config);
	        connectionProperties.setProperty("useUnicode", "true");
	        connectionProperties.setProperty("characterEncoding", "utf8");
	        connectionProperties.setProperty("ssl", "false");
	        return new DriverConnectionFactory(new Driver(), config.getConnectionUrl(), 
	                Optional.ofNullable(config.getUserCredentialName()),
	                Optional.ofNullable(config.getPasswordCredentialName()), connectionProperties);
	    }
	
	
	    private String[] getTableTypes()
	    {
	        if (synonymsEnabled) {
	            return new String[] {"TABLE", "VIEW", "SYNONYM"};
	        }
	        return new String[] {"TABLE", "VIEW"};
	    }
	
	    @Override
	    protected ResultSet getTables(Connection connection, Optional<String> schemaName, Optional<String> tableName)
	            throws SQLException
	    {
	        DatabaseMetaData metadata = connection.getMetaData();
	        String escape = metadata.getSearchStringEscape();
	        return metadata.getTables(
	                connection.getCatalog(),
	                escapeNamePattern(schemaName, Optional.of(escape)).orElse(null),
	                escapeNamePattern(tableName, Optional.of(escape)).orElse(null),
	                getTableTypes());
	    }
	    @Override
	    public PreparedStatement getPreparedStatement(Connection connection, String sql)
	            throws SQLException
	    {
	        PreparedStatement statement = connection.prepareStatement(sql);
	        statement.setFetchSize(FETCH_SIZE);
	        return statement;
	    }
	
	    @Override
	    protected String generateTemporaryTableName()
	    {
	        return "presto_tmp_" + System.nanoTime();
	    }
	
	    @Override
	    protected void renameTable(JdbcIdentity identity, String catalogName, SchemaTableName oldTable, SchemaTableName newTable)
	    {
	        if (!oldTable.getSchemaName().equalsIgnoreCase(newTable.getSchemaName())) {
	            throw new PrestoException(NOT_SUPPORTED, "Table rename across schemas is not supported in gauss");
	        }
	
	        String newTableName = newTable.getTableName().toUpperCase(ENGLISH);
	        String oldTableName = oldTable.getTableName().toUpperCase(ENGLISH);
	        String sql = format(
	                "ALTER TABLE %s RENAME TO %s",
	                quoted(catalogName, oldTable.getSchemaName(), oldTableName),
	                quoted(newTableName));
	
	        try (Connection connection = connectionFactory.openConnection(identity)) {
	            execute(connection, sql);
	        }
	        catch (SQLException e) {
	            throw new PrestoException(JDBC_ERROR, e);
	        }
	    }
	
	    @Override
	    public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
	    {
	        int columnSize = typeHandle.getColumnSize();
	
	        switch (typeHandle.getJdbcType()) {
	            case Types.CLOB:
	            case Types.NCLOB:
	                return Optional.of(varcharReadMapping(createUnboundedVarcharType()));
	            case Types.SMALLINT:
	                return Optional.of(smallintReadMapping());
	            case Types.FLOAT:
	            case Types.DOUBLE:
	                return Optional.of(doubleReadMapping());
	            case Types.REAL:
	                return Optional.of(realReadMapping());
	            case Types.NUMERIC:
	                int precision = columnSize == 0 ? Decimals.MAX_PRECISION : columnSize;
	                int scale = typeHandle.getDecimalDigits();
	
	                if (scale == 0) {
	                    return Optional.of(bigintReadMapping());
	                }
	                if (scale < 0 || scale > precision) {
	                    return Optional.of(decimalReadMapping(createDecimalType(precision, numberDefaultScale)));
	                }
	                return Optional.of(decimalReadMapping(createDecimalType(precision, scale)));
	            case Types.VARCHAR:
	            case Types.NVARCHAR:
	            case Types.LONGVARCHAR:
	            case Types.LONGNVARCHAR:
	                if (columnSize > VarcharType.MAX_LENGTH) {
	                    return Optional.of(varcharReadMapping(createUnboundedVarcharType()));
	                }
	                return Optional.of(varcharReadMapping(createVarcharType(columnSize)));
	            case Types.BLOB:
	                return Optional.of(varbinaryReadMapping());
	        }
	        return super.toPrestoType(session, typeHandle);
	    }
	
	    @Override
	    protected String toSqlType(Type type) {
	        if (VARBINARY.equals(type)) {
	            return "blob";
	        }
	        return super.toSqlType(type);
	    }
	}

基于presto使用的Guice依赖注入框架,创建GuassPlugin.java、GaussClientModule.java、GaussConfig.java 类注册实现的插件

GuassPlugin.java:

package com.facebook.presto.plugin.gauss;
import com.facebook.presto.plugin.jdbc.JdbcPlugin;

/**
 *  Initialize GuassPlugin class for prestoDB
 */
public class GuassPlugin
        extends JdbcPlugin
{
    /**
     *  gauss Plugin Constructor
     */
    public GuassPlugin()
    {
        super("gauss", new GaussClientModule());
    }
}

GaussClientModule.java:

package com.facebook.presto.plugin.gauss;

import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
import com.facebook.presto.plugin.jdbc.JdbcClient;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;

public class GaussClientModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        binder.bind(JdbcClient.class).to(GaussClient.class)
                .in(Scopes.SINGLETON);
        configBinder(binder).bindConfig(BaseJdbcConfig.class);
        configBinder(binder).bindConfig(GaussConfig.class);
    }
}

GaussClientModule.java:

	package com.facebook.presto.plugin.gauss;
	
	import com.facebook.airlift.configuration.Config;
	
	import javax.validation.constraints.Max;
	import javax.validation.constraints.Min;
	import javax.validation.constraints.NotNull;
	
	import java.math.RoundingMode;
	
	public class GaussConfig
	{
	    private boolean synonymsEnabled;
	    private int varcharMaxSize = 4000;
	    private int timestampDefaultPrecision = 6;
	    private int numberDefaultScale = 10;
	    private RoundingMode numberRoundingMode = RoundingMode.HALF_UP;
	
	    @NotNull
	    public boolean isSynonymsEnabled()
	    {
	        return synonymsEnabled;
	    }
	
	    @Config("gauss.synonyms.enabled")
	    public GaussConfig setSynonymsEnabled(boolean enabled)
	    {
	        this.synonymsEnabled = enabled;
	        return this;
	    }
	
	    @Min(0)
	    @Max(38)
	    public int getNumberDefaultScale()
	    {
	        return numberDefaultScale;
	    }
	
	    @Config("gauss.number.default-scale")
	    public GaussConfig setNumberDefaultScale(int numberDefaultScale)
	    {
	        this.numberDefaultScale = numberDefaultScale;
	        return this;
	    }
	
	    @NotNull
	    public RoundingMode getNumberRoundingMode()
	    {
	        return numberRoundingMode;
	    }
	
	    @Config("gauss.number.rounding-mode")
	    public GaussConfig setNumberRoundingMode(RoundingMode numberRoundingMode)
	    {
	        this.numberRoundingMode = numberRoundingMode;
	        return this;
	    }
	
	    @Min(4000)
	    public int getVarcharMaxSize()
	    {
	        return varcharMaxSize;
	    }
	
	    @Config("gauss.varchar.max-size")
	    public GaussConfig setVarcharMaxSize(int varcharMaxSize)
	    {
	        this.varcharMaxSize = varcharMaxSize;
	        return this;
	    }
	
	    @Min(0)
	    @Max(9)
	    public int getTimestampDefaultPrecision()
	    {
	        return timestampDefaultPrecision;
	    }
	
	    @Config("gauss.timestamp.precision")
	    public GaussConfig setTimestampDefaultPrecision(int timestampDefaultPrecision)
	    {
	        this.timestampDefaultPrecision = timestampDefaultPrecision;
	        return this;
	    }
	}

测试数据源注册

入参:

{
  "catalogName": "gauss-test1",
  "connectorName": "gauss",
  "properties": {
    "connection-url":"jdbc: gaussdb://172.30.***.***:***/yth_shuguan",
    "connection-user":"***",
    "connection-password":"***"
  }
}

返回数据源信息:

2024-03-06T17:27:08.532+0800  INFO   Bootstrap        PROPERTY                                  DEFAULT     RUNTIME                                         DESCRIPTION
2024-03-06T17:27:08.532+0800  INFO   Bootstrap        gauss.number.default-scale                10          10
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        gauss.number.rounding-mode                HALF_UP     HALF_UP
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        gauss.synonyms.enabled                    false       false
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        gauss.timestamp.precision                 6           6
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        gauss.varchar.max-size                    4000        4000
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        case-insensitive-name-matching            false       false
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        case-insensitive-name-matching.cache-ttl  1.00m       1.00m
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        connection-password                       [REDACTED]  [REDACTED]
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        connection-url                            ----        jdbc:gaussdb://172.30.***.***:***/yth_shuguan
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        connection-user                           ----        yth
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        password-credential-name                  ----        ----
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        user-credential-name                      ----        ----
2024-03-06T17:27:08.533+0800  INFO   Bootstrap        allow-drop-table                          false       false                                           Allow connector to drop tables
2024-03-06T17:27:08.550+0800  INFO   com.facebook.airlift.bootstrap.LifeCycleManager        Life cycle startup complete. System ready.
2024-03-06T17:27:08.549+0800  INFO   com.facebook.airlift.bootstrap.LifeCycleManager        Life cycle starting...

其他说明

适配非JDBC数据源,如HuaweiHD651-V310(基于Hive开发),需要自己实现如下接口

接口名称 说明
ConnectorFactory Connector工厂
ConnectorMetadata 获取数据源元数据
ConnectorHandleResolver 获取各种Handler
ConnectorSplitManager 处理任务分片
ConnectorRecordSetProviderConnectorPageSourceProvider 读取数据
ConnectorPageSinkProvider 写入数据

参考

Presto官方文档:

https://prestodb.io/docs/0.253/

美团技术团队分享:

https://tech.meituan.com/2014/06/16/presto.html

标签:return,高斯,presto,适配,Presto,gauss,facebook,import,com
From: https://www.cnblogs.com/jishuwu/p/18057172

相关文章

  • 【信创低代码】JeecgBoot适配达梦和人大金仓,TiDB配置手册
    项目介绍JeecgBoot是一款专为信创产业设计的企业级低代码开发平台,目前已经实现了对多种国产数据库的默认兼容,提供了丰富的组件和模板,可帮助用户快速搭建信创应用系统。其灵活的定制功能也使用户能够根据具体需求进行个性化定制,满足不同场景的要求。通过JeecgBoot,信创用户可以更高......
  • GBU3510-ASEMI火牛适配器专用整流桥GBU3510
    编辑:llGBU3510-ASEMI火牛适配器专用整流桥GBU3510型号:GBU3510品牌:ASEMI封装:GBU-4正向电流(Id):35A反向耐压(VRRM):1000V正向浪涌电流:300A正向电压(VF):1.10V引脚数量:4芯片个数:4芯片尺寸:MIL功率(Pd):中小功率设备工作温度:-55°C~150°C类型:插件整流桥、整流桥GBU3510整流桥描......
  • 40容器适配器
    容器适配器标准容器-容器适配器适配器底层没有自己的数据结构,它是另外一个容器的封装,它的方法全部由底层依赖的容器实现。容器适配器没有实现自己的迭代器。参考stack,deque依赖deque;priority_deque依赖vector前者的原因:vector的初始内存使用效率太低,需要多次resize;......
  • MBR10200FCT-ASEMI适配开关电源MBR10200FCT
    编辑:llMBR10200FCT-ASEMI适配开关电源MBR10200FCT型号:MBR10200FCT品牌:ASEMI封装:ITO-220AB最大平均正向电流(IF):10A最大循环峰值反向电压(VRRM):200V最大正向电压(VF):0.90V工作温度:-65°C~175°C反向恢复时间:ns重量:1.5615克芯片个数:2芯片尺寸:102mil正向浪涌电流(IFMS):150AMBR1......
  • KBP310-ASEMI小功率电源适配器KBP310
    编辑:llKBP310-ASEMI小功率电源适配器KBP310型号:KBP310品牌:ASEMI封装:KBP-4正向电流(Id):3A反向耐压(VRRM):1000V正向浪涌电流:60A正向电压(VF):1.10V引脚数量:4芯片个数:4芯片尺寸:MIL功率(Pd):大功率设备工作温度:-55°C~150°C类型:插件整流桥KBP310整流桥描述:ASEMI品牌KBP310是......
  • Android模拟蓝牙蓝牙键盘——适配Android和Windows
    学校寒假有个程序设计比赛,我也一直想要去写一个安卓模拟的蓝牙键盘,这样无论到哪里,比如班班通和没有键盘的电脑设备,有手机就可以操作它,也比USB方便一些。忙活了一个寒假,也走了不少歪路,终于整成了,下面分享一些经验。代码思路①第一步是蓝牙HID的初始化在安卓API28后开放了Bluetoo......
  • MBR20100FCT-ASEMI适配开关电源MBR20100FCT
    编辑:llMBR20100FCT-ASEMI适配开关电源MBR20100FCT型号:MBR20100FCT品牌:ASEMI封装:ITO-220AB最大平均正向电流(IF):20A最大循环峰值反向电压(VRRM):100V最大正向电压(VF):0.80V工作温度:-65°C~175°C反向恢复时间:ns重量:1.5615克芯片个数:2芯片尺寸:102mil正向浪涌电流(IFMS):200AMBR2......
  • pfa密闭取样瓶插针取样瓶适配密闭取样器使用石油化工有害易燃样品
    密闭取样器适用于石油化工装置中各种介质,尤其是有毒有害、易燃易爆等危害性的中、低压气体介质的无泄漏的采样。所采集的样品真实性强,准确性高,无残液/残气排放。有效地防止有毒有害介质对操作者的伤害。同时不会污染环境,避免了易燃易爆介质在采样时可能造成的危险事故。符合对环......
  • anaconda环境下:强化学习PPO算法仿真环境库sample-factory的python完美适配版本为pytho
    anaconda环境下:强化学习PPO算法仿真环境库sample-factory的python完美适配版本为python3.11库sample-factory地址:https://github.com/alex-petrenko/sample-factory文档地址:https://samplefactory.dev/经过对多个版本的python进行测试,anaconda环境下只有python3.11......
  • 使用safe-area-inset-*来适配iPhoneX的刘海屏及底部横条区域
    之前一直沿用同事写的媒体查询处理这个问题,所有固定在底部展示的按钮栏都要用媒体查询来定义距离底部的距离,着实不太方便,而且媒体查询比较有局限性,不太可能把市面上所有机型都适配一遍。刚好要处理折叠屏适配问题,重构了一个复杂页面的布局,就找到了使用safe-area-inset-*来适配iPh......