1.驱动:
"github.com/ClickHouse/clickhouse-go/v2"
2.连接代码:
package model import ( "database/sql" "fmt" _ "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/go-sql-driver/mysql" "github.com/golang-jwt/jwt/v4" "github.com/zeromicro/go-zero/core/stores/sqlx" "time" ) var _ YibaiDbSourceModel = (*customYibaiDbSourceModel)(nil) var dbConnMap = make(map[string]*sql.DB) var ipPortMap = map[string]string{ "172.16.128.143": "9001", "121.37.248.212": "9003", } const maxConnections = 10 type ( // YibaiDbSourceModel is an interface to be customized, add more methods here, // and implement the added methods in customYibaiDbSourceModel. YibaiDbSourceModel interface { yibaiDbSourceModel GetConn(psb YibaiDbSource, secretKey string) (*sql.DB, error) ExecuteSql(psb YibaiDbSource, secretKey string, sql string, args ...interface{}) ([]map[string]interface{}, []*sql.ColumnType, error) } customYibaiDbSourceModel struct { *defaultYibaiDbSourceModel } ) func ParseToken(token string, secret string) (jwt.MapClaims, error) { claim, err := jwt.Parse(token, func(token *jwt.Token) (interface{}, error) { return []byte(secret), nil }) if err != nil { return nil, err } if payload, ok := claim.Claims.(jwt.MapClaims); ok { return payload, nil } return nil, fmt.Errorf("fail parse") } func (d customYibaiDbSourceModel) GetConn(psb YibaiDbSource, secretKey string) (*sql.DB, error) { key := fmt.Sprintf("%s_%s_%s_%s_%s_%s", psb.DbType.String, psb.DbIp.String, psb.DbPort.String, psb.DbUser.String, psb.DbPwd.String, psb.Db.String) db, ok := dbConnMap[key] if ok { if err := db.Ping(); err == nil { return db, nil } } payload, _ := ParseToken(psb.DbPwd.String, secretKey) psw, _ := payload["psw"].(string) dbType := psb.DbType.String var datasource string if dbType == "clickhouse" { portStr, ok := ipPortMap[psb.DbIp.String] if !ok { portStr = "9001" } datasource = fmt.Sprintf("clickhouse://%s:%s@%s:%s/%s?dial_timeout=2000ms&max_execution_time=60s", psb.DbUser.String, psw, psb.DbIp.String, portStr, psb.Db.String) } else if dbType == "mysql" { datasource = 
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Asia%%2FShanghai", psb.DbUser.String, psw, psb.DbIp.String, psb.DbPort.String, psb.Db.String) } else { return nil, fmt.Errorf("不支持的数据库类型%s", dbType) } dbConn, err := sql.Open(dbType, datasource) if err != nil { return nil, err } // 设置连接池的最大连接数 dbConn.SetMaxOpenConns(maxConnections) // 设置连接池中最多可以有多少空闲连接 dbConn.SetMaxIdleConns(maxConnections) // 设置连接的最大空闲时间 dbConn.SetConnMaxLifetime(time.Minute * 5) // 测试连接是否可用 if err = dbConn.Ping(); err != nil { return nil, err } dbConnMap[key] = dbConn return dbConn, nil } func (c *customYibaiDbSourceModel) ExecuteSql(psb YibaiDbSource, secretKey string, sql string, args ...interface{}) ([]map[string]interface{}, []*sql.ColumnType, error) { conn, err := c.GetConn(psb, secretKey) if err != nil { return nil, nil, err } //sql = "select count(*) from (select distinct `dep_name`,`group_name`,`账号` from (SELECT * FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'))" //sql = "with t as (select distinct `dep_name`,`group_name`,`账号` from (SELECT * FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部')) SELECT count(1) AS count FROM t" //sql = "SELECT count(*) FROM (select distinct `dep_name` as dm,`group_name`,`账号` from (SELECT * FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'))" sql = "SELECT count(*) as 'count' FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'" rows, err := conn.Query(sql, args...) 
if err != nil { return nil, nil, err } defer rows.Close() columns, _ := rows.Columns() colTypes, _ := rows.ColumnTypes() cacheRow := make([]interface{}, len(columns)) for idx, _ := range cacheRow { var i interface{} cacheRow[idx] = &i } var res []map[string]interface{} for rows.Next() { if err := rows.Scan(cacheRow...); err != nil { return nil, nil, err } row := make(map[string]interface{}) for idx, data := range cacheRow { row[columns[idx]] = *data.(*interface{}) } res = append(res, row) } return res, colTypes, nil } // NewYibaiDbSourceModel returns a model for the database table. func NewYibaiDbSourceModel(conn sqlx.SqlConn) YibaiDbSourceModel { return &customYibaiDbSourceModel{ defaultYibaiDbSourceModel: newYibaiDbSourceModel(conn), } }
注:包含mysql和clickhouse两种数据库的连接方法,采用go统一接口 database/sql 连接
3.问题
1.clickhouse有两种连接方式:tcp(常用端口9000) 和http(常用端口8123),上面的例子中使用的是tcp连接,如果需要使用http连接,参考如下代码:
func ConnectDSNHTTP() error { env, err := GetStdTestEnvironment() if err != nil { return err } conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password)) if err != nil { return err } return conn.Ping() }
更详细的连接方式参考clickhouse官方文档:https://clickhouse.com/docs/en/integrations/go/clickhouse-go/database-sql-api
2.以下几个sql执行会报错
上面代码中,连接clickhouse时 ExecuteSql 执行以下sql会报错,而在dbeaver上均能正常执行:
SELECT count(*) as `count` FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部' SELECT count(*) as "count" FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部' SELECT count(*) as count FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'
报错信息:
read tcp 172.16.11.157:55509->121.37.248.212:9003: wsarecv: An existing connection was forcibly closed by the remote host.
改成下面这样可以正常执行:
SELECT count(*) FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部' SELECT count(*) as c FROM `plant_week`.`fba_break_detail_analysis` WHERE `dep_name`='成都产品线九部'
猜测原因:count 是 ClickHouse 的内置函数名,用它作为列别名时,github.com/ClickHouse/clickhouse-go/v2 可能无法正确处理。暂时无法验证……
标签:return,err,nil,psb,golang,sql,连接,clickhouse,string From: https://www.cnblogs.com/liuxuelin/p/17168164.html