首页 > 其他分享 >golang 连接clickhouse

golang 连接clickhouse

时间:2023-03-01 15:02:38浏览次数:70  
标签:return err nil psb golang sql 连接 clickhouse string

1.驱动:

"github.com/ClickHouse/clickhouse-go/v2"

2.连接代码:

package model

import (
    "database/sql"
    "fmt"
    _ "github.com/ClickHouse/clickhouse-go/v2"
    _ "github.com/go-sql-driver/mysql"
    "github.com/golang-jwt/jwt/v4"
    "github.com/zeromicro/go-zero/core/stores/sqlx"
    "time"
)

var _ YibaiDbSourceModel = (*customYibaiDbSourceModel)(nil)

var dbConnMap = make(map[string]*sql.DB)

var ipPortMap = map[string]string{
    "172.16.128.143": "9001",
    "121.37.248.212": "9003",
}

const maxConnections = 10

type (
    // YibaiDbSourceModel is an interface to be customized, add more methods here,
    // and implement the added methods in customYibaiDbSourceModel.
    YibaiDbSourceModel interface {
        yibaiDbSourceModel
        GetConn(psb YibaiDbSource, secretKey string) (*sql.DB, error)
        ExecuteSql(psb YibaiDbSource, secretKey string,
            sql string, args ...interface{}) ([]map[string]interface{}, []*sql.ColumnType, error)
    }

    customYibaiDbSourceModel struct {
        *defaultYibaiDbSourceModel
    }
)

func ParseToken(token string, secret string) (jwt.MapClaims, error) {
    claim, err := jwt.Parse(token, func(token *jwt.Token) (interface{}, error) {
        return []byte(secret), nil
    })
    if err != nil {
        return nil, err
    }
    if payload, ok := claim.Claims.(jwt.MapClaims); ok {
        return payload, nil
    }
    return nil, fmt.Errorf("fail parse")
}

func (d customYibaiDbSourceModel) GetConn(psb YibaiDbSource, secretKey string) (*sql.DB, error) {
    key := fmt.Sprintf("%s_%s_%s_%s_%s_%s", psb.DbType.String, psb.DbIp.String, psb.DbPort.String,
        psb.DbUser.String, psb.DbPwd.String, psb.Db.String)

    db, ok := dbConnMap[key]
    if ok {
        if err := db.Ping(); err == nil {
            return db, nil
        }
    }

    payload, _ := ParseToken(psb.DbPwd.String, secretKey)
    psw, _ := payload["psw"].(string)

    dbType := psb.DbType.String
    var datasource string
    if dbType == "clickhouse" {
        portStr, ok := ipPortMap[psb.DbIp.String]
        if !ok {
            portStr = "9001"
        }
        datasource = fmt.Sprintf("clickhouse://%s:%s@%s:%s/%s?dial_timeout=2000ms&max_execution_time=60s",
            psb.DbUser.String, psw, psb.DbIp.String, portStr, psb.Db.String)
    } else if dbType == "mysql" {
        datasource = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Asia%%2FShanghai",
            psb.DbUser.String, psw, psb.DbIp.String, psb.DbPort.String, psb.Db.String)
    } else {
        return nil, fmt.Errorf("不支持的数据库类型%s", dbType)
    }
    dbConn, err := sql.Open(dbType, datasource)
    if err != nil {
        return nil, err
    }
    // 设置连接池的最大连接数
    dbConn.SetMaxOpenConns(maxConnections)
    // 设置连接池中最多可以有多少空闲连接
    dbConn.SetMaxIdleConns(maxConnections)
    // 设置连接的最大空闲时间
    dbConn.SetConnMaxLifetime(time.Minute * 5)
    // 测试连接是否可用
    if err = dbConn.Ping(); err != nil {
        return nil, err
    }

    dbConnMap[key] = dbConn
    return dbConn, nil
}

func (c *customYibaiDbSourceModel) ExecuteSql(psb YibaiDbSource, secretKey string,
    sql string, args ...interface{}) ([]map[string]interface{}, []*sql.ColumnType, error) {
    conn, err := c.GetConn(psb, secretKey)
    if err != nil {
        return nil, nil, err
    }
    //sql = "select count(*) from (select distinct `dep_name`,`group_name`,`账号` from (SELECT * FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'))"
    //sql = "with t as (select distinct `dep_name`,`group_name`,`账号` from (SELECT * FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部')) SELECT count(1) AS count FROM t"
    //sql = "SELECT count(*) FROM (select distinct `dep_name` as dm,`group_name`,`账号` from (SELECT * FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'))"
    sql = "SELECT count(*) as 'count' FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'"
    rows, err := conn.Query(sql, args...)
    if err != nil {
        return nil, nil, err
    }
    defer rows.Close()
    columns, _ := rows.Columns()
    colTypes, _ := rows.ColumnTypes()
    cacheRow := make([]interface{}, len(columns))
    for idx, _ := range cacheRow {
        var i interface{}
        cacheRow[idx] = &i
    }

    var res []map[string]interface{}
    for rows.Next() {
        if err := rows.Scan(cacheRow...); err != nil {
            return nil, nil, err
        }
        row := make(map[string]interface{})
        for idx, data := range cacheRow {
            row[columns[idx]] = *data.(*interface{})
        }
        res = append(res, row)
    }
    return res, colTypes, nil
}

// NewYibaiDbSourceModel returns a model for the database table.
func NewYibaiDbSourceModel(conn sqlx.SqlConn) YibaiDbSourceModel {
    return &customYibaiDbSourceModel{
        defaultYibaiDbSourceModel: newYibaiDbSourceModel(conn),
    }
}

注:包含mysql和clickhouse两种数据库的连接方法,采用go统一接口 database/sql 连接

3.问题

1.clickhouse有两种连接方式:tcp(常用端口9000) 和http(常用端口8123),上面的例子中使用的是tcp连接,如果需要使用http连接,参考如下代码:

func ConnectDSNHTTP() error {
    env, err := GetStdTestEnvironment()
    if err != nil {
        return err
    }
    conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
    if err != nil {
        return err
    }
    return conn.Ping()
}

更详细的连接方式参考clickhouse官方文档:https://clickhouse.com/docs/en/integrations/go/clickhouse-go/database-sql-api

2.以下几个sql执行会报错

上面代码中,连接clickhouse时 ExecuteSql 执行以下sql会报错,而在dbeaver上均能正常执行:

SELECT count(*) as `count` FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'

SELECT count(*) as "count" FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'

SELECT count(*) as count FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'

报错信息:

read tcp 172.16.11.157:55509->121.37.248.212:9003: wsarecv: An existing connection was forcibly closed by the remote host.

改正下面这样可以正常执行:

SELECT count(*) FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'

SELECT count(*) as c FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'

猜测原因:count为clickhouse标识符, github.com/ClickHouse/clickhouse-go/v2 无法正确处理。暂时无法验证。。。

 

标签:return,err,nil,psb,golang,sql,连接,clickhouse,string
From: https://www.cnblogs.com/liuxuelin/p/17168164.html

相关文章

  • 非交互式连接PostgreSQL
    方法一:.pgpass查看postgres家目录greppostgres/etc/passwdpostgres:x:114:119:PostgreSQLadministrator,,,:/var/lib/postgresql:/bin/bash创建.pgpasstouch/var/......
  • HTTPS连接过程
    简介HTTPS是在HTTP的基础上和ssl/tls证书结合起来的一种协议,保证了传输过程中的安全性,减少了被恶意劫持的可能.很好的解决了解决了http的三个缺点(被监听、被篡改、被伪......
  • (转)数据结构和算法(Golang实现)(8.2)基础知识-分治法和递归
    原文:https://juejin.cn/post/6844904132378263565分治法和递归在计算机科学中,分治法是一种很重要的算法。字面上的解释是分而治之,就是把一个复杂的问题分成两个或更多......
  • Golang make和new的区别及实现原理详解
    在Go语言中,有两个比较雷同的内置函数,分别是new和make方法,二者都可以用来分配内存,那他们有什么区别呢?下面我们就从底层来分析一下二者的不同。感兴趣的小伙伴们可以参考......
  • Java连接数据库技术-JDBC
    课程简介和目标 Java数据库连接,(JavaDatabaseConnectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据......
  • Java连接数据库技术-JDBC
    课程简介和目标 Java数据库连接,(JavaDatabaseConnectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据......
  • (转)GoLang之标准库strings包
    原文:https://blog.csdn.net/weixin_52690231/article/details/123593614?ops_request_misc=&request_id=&biz_id=102&utm_term=golang%20strings%20%E5%8C%85%20%E8%AF%A6%......
  • 连接查询
    一、(知道)数据查询操作1、连接查询-常用方式内连接:连接两个表时,取的是两个表中都存在的数据左连接:连接两个表时,取的是左表中特有的数据,对于右表中不存在的......
  • ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
    目录建表折叠数据算法资料分享参考文章该引擎继承于MergeTree,并在数据块合并算法中添加了折叠行的逻辑。CollapsingMergeTree会异步的删除(折叠)这些除了特定列Sign有1和-1......
  • Golang微服务(三)
    Golang微服务(三)目录Golang微服务(三)分布式锁互斥锁悲观锁乐观锁基于Redis的分布式锁分布式锁gRPCservice层demorouterhandler如下:/*>>>modeltypeInventorystruc......