首页 > 数据库 >golang两种数据库连接池实现

golang两种数据库连接池实现

时间:2024-11-22 19:58:40浏览次数:1  
标签:return cn nil 数据库 db golang 连接 连接池

Golang的连接池实现在标准库database/sql/sql.go下。当我们运行:

db, err := sql.Open("mysql", "xxxx")

的时候,就会打开一个连接池。我们可以看看返回的db的结构体:

type DB struct {
	waitDuration int64 // Total time waited for new connections.
	mu           sync.Mutex // protects following fields
	freeConn     []*driverConn
	connRequests map[uint64]chan connRequest
	nextRequest  uint64 // Next key to use in connRequests.
	numOpen      int    // number of opened and pending open connections
	// Used to signal the need for new connections
	// a goroutine running connectionOpener() reads on this chan and
	// maybeOpenNewConnections sends on the chan (one send per needed connection)
	// It is closed during db.Close(). The close tells the connectionOpener
	// goroutine to exit.
	openerCh          chan struct{}
	closed            bool
	maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
	maxOpen           int                    // <= 0 means unlimited
	maxLifetime       time.Duration          // maximum amount of time a connection may be reused
	cleanerCh         chan struct{}
	waitCount         int64 // Total number of connections waited for.
	maxIdleClosed     int64 // Total number of connections closed due to idle.
	maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}

上面省去了一些暂时不需要关注的field。我们可以看的,DB这个连接池内部存储连接的结构freeConn,并不是我们之前使用的chan,而是[]driverConn,一个连接切片。同时我们还可以看到,里面有maxIdle等相关变量来控制空闲连接数量。值得注意的是,DB的初始化函数Open函数并没有新建数据库连接。而新建连接在哪个函数呢?我们可以在Query方法一路往回找,我们可以看到这个函数:func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)。而我们从连接池获取连接的方法,就从这里开始:

获取连接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判断db是否已经关闭。
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// 注意检测context是否已经被超时等原因被取消。
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
	numFree := len(db.freeConn)
	if strategy == cachedOrNewConn && numFree > 0 {
		conn := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		conn.inUse = true
		db.mu.Unlock()
		if conn.expired(lifetime) {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		// Lock around reading lastErr to ensure the session resetter finished.
		conn.Lock()
		err := conn.lastErr
		conn.Unlock()
		if err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		return conn, nil
	}

	// 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// 下面的动作相当于往connRequests这个map插入自己的号码牌。
		// 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked()
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := time.Now()

		// Timeout the connection request with the context.
		select {
		case <-ctx.Done():
			// context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			select {
			default:
			case ret, ok := <-req:
                // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
            // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			if !ok {
				return nil, errDBClosed
			}
			if ret.err == nil && ret.conn.expired(lifetime) {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			if ret.conn == nil {
				return nil, ret.err
			}
			ret.conn.Lock()
			err := ret.conn.lastErr
			ret.conn.Unlock()
			if err == driver.ErrBadConn {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			return ret.conn, ret.err
		}
	}
	// 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:        db,
		createdAt: nowFunc(),
		ci:        ci,
		inUse:     true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}

简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?

释放连接

我们可以直接找到func (db *DB) putConnDBLocked(dc *driverConn, err error) bool这个方法。就像注释说的,这个方法主要的目的是:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

我们主要来看看里面重点那几行:


	// 如果已经超过最大打开数量了,就不需要在回归pool了
	if db.maxOpen > 0 && db.numOpen > db.maxOpen {
		return false
	}
	// 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
	if c := len(db.connRequests); c > 0 {
		var req chan connRequest
		var reqKey uint64
		for reqKey, req = range db.connRequests {
			break
		}
		delete(db.connRequests, reqKey) // 删除这个在排队的请求。
		if err == nil {
			dc.inUse = true
		}
        // 把连接给这个正在排队的连接。
		req <- connRequest{
			conn: dc,
			err:  err,
		}
		return true
	} else if err == nil && !db.closed {
        // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
		if db.maxIdleConnsLocked() > len(db.freeConn) {
			db.freeConn = append(db.freeConn, dc)
			db.startCleanerLocked()
			return true
		}
		db.maxIdleClosed++
	}
...

我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。

现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。

值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。

二、 Golang实现的Redis客户端

这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。

而它的连接池结构如下

type ConnPool struct {
	...
	queue chan struct{}

	connsMu      sync.Mutex
	conns        []*Conn
	idleConns    []*Conn
	poolSize     int
	idleConnsLen int

	stats Stats

	_closed  uint32 // atomic
	closedCh chan struct{}
}

我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看queueconnsidleConns这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]Conn结构:connsidleConns,那么问题来了:

到底连接存在哪里?

新建连接池连接

我们先从新建连接池连接开始看:

func NewConnPool(opt *Options) *ConnPool {
	....
	p.checkMinIdleConns()

	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
		go p.reaper(opt.IdleCheckFrequency)
	}
	....
}

初始化连接池的函数有个和前面两个不同的地方。

  1. checkMinIdleConns方法,在连接池初始化的时候就会往连接池填满空闲的连接。
  2. go p.reaper(opt.IdleCheckFrequency)则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。

获取连接

func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}
	
    //这边和前面sql获取连接函数的流程先不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。
	err := p.waitTurn(ctx)
	if err != nil {
		return nil, err
	}
	//前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。
	for {
		p.connsMu.Lock()
        //从空闲连接里面先获取一个空闲连接。
		cn := p.popIdle()
		p.connsMu.Unlock()

		if cn == nil {
            // 没有空闲连接时候直接跳出循环。
			break
		}
		// 判断是否已经过时,是的话close掉了然后继续取出。
		if p.isStaleConn(cn) {
			_ = p.CloseConn(cn)
			continue
		}

		atomic.AddUint32(&p.stats.Hits, 1)
		return cn, nil
	}

	atomic.AddUint32(&p.stats.Misses, 1)
	
    // 如果没有空闲连接的话,这边就直接新建连接了。
	newcn, err := p.newConn(ctx, true)
	if err != nil {
        // 归还令牌。
		p.freeTurn()
		return nil, err
	}

	return newcn, nil
}

我们可以试着回答开头那个问题:连接到底存在哪里?答案是从cn := p.popIdle()这句话可以看出,获取连接这个动作,是从idleConns里面获取的,而里面的函数也证明了这一点。但是,真的是这样的嘛?我们后面再看看。

同时我的理解是:

  1. sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。
  2. redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。

而:

sfunc (p *ConnPool) freeTurn() {
	<-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
	case p.queue <- struct{}{}:
		return nil
...
}

就是在靠queue这个chan来维持令牌数量。

那么conns的作用是什么呢?我们可以来看看新建连接这个函数:

新建连接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
	cn, err := p.dialConn(ctx, pooled)
	if err != nil {
		return nil, err
	}

	p.connsMu.Lock()
	p.conns = append(p.conns, cn)
	if pooled {
		// 如果连接池满了,会在后面移除。
		if p.poolSize >= p.opt.PoolSize {
			cn.pooled = false
		} else {
			p.poolSize++
		}
	}
	p.connsMu.Unlock()
	return cn, nil
}

基本逻辑出来了。就是如果新建连接的话,我并不会直接放在idleConns里面,而是先放conns里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。

归还连接

func (p *ConnPool) Put(cn *Conn) {
	if cn.rd.Buffered() > 0 {
		internal.Logger.Printf("Conn has unread data")
		p.Remove(cn, BadConnError{})
		return
	}
	//这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。
	if !cn.pooled {
        // 这个方法就是前面的标志位,判断里面可以知道,前面标志不要池化的,这里会将它删除。
		p.Remove(cn, nil)
		return
	}

	p.connsMu.Lock()
	p.idleConns = append(p.idleConns, cn)
	p.idleConnsLen++
	p.connsMu.Unlock()
    //我们可以看到很明显的这个归还号码牌的动作。
	p.freeTurn()
}

答案就是,所有的连接其实是存放在conns这个切片里面。如果这个连接是空闲等待的状态的话,那就在idleConns里面加一个自己的指针!

其实归还的过程,就是检查一下我打算还的这个连接,是不是超售的产物,如果是就没必要池化了,直接删除就可以了。不是的话,就是把连接自身(一个指针)在idleConns也append一下。

等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:

  1. waitTurn,拿到令牌。而令牌数量是根据pool里面的queue决定的。
  2. 拿到令牌了,去库房idleConns里面拿空闲的连接。没有的话就自己newConn一个,并且把他记录到conns里面。
  3. 用完了,就调用put归还:也就是从conns添加这个连接的指针到idleConns。归还的时候就检查在newConn时候是不是已经做了超卖标记了。是的话就不转移到idleConns

我当时疑惑了好久,既然始终都需要获得令牌才能得到连接,令牌数量是定的。为什么还会超卖呢?翻了一下源码,我的答案是:

虽然Get方法获取连接是newConn这个私用方法,受到令牌管制导致不会出现超卖。但是这个方法接受传参:pooled bool。所以我猜是担心其他人调用这个方法时候,不管三七二十一就传了true,导致poolSize越来越大。

总的来说,redis这个连接池的连接数控制,还是在queue这个我称为令牌的chan进行操作。

总结

上面可以看到,连接池的最基本的保证,就是获取连接时候的线程安全。但是在实现诸多额外特性时候却又从不同角度来实现。还是非常有意思的。但是不管存储结构是用chan还是还是slice,都可以很好的实现这一点。如果像sql或者redis那样用slice来存储连接,就得维护一个结构来表示排队等候的效果。

标签:return,cn,nil,数据库,db,golang,连接,连接池
From: https://www.cnblogs.com/luokn/p/18563644

相关文章

  • 如何对Mysql数据库进行优化
    目录引言优化的时机优化策略1.索引优化2.查询优化3.架构优化4.事务和锁机制优化5.配置优化6.硬件优化7.性能监控与调优工具案例分析案例1:简化查询逻辑案例2:使用索引结语引言在现代应用中,数据库的性能至关重要。MySQL作为流行的开源关系型数据库管理......
  • 基于SprinBoot+vue的物业管理系统(源码+数据库+文档)
    物业管理系统基于SprinBoot+vue的物业管理系统一、前言二、系统设计三、系统功能设计 系统登录实现后台模块实现管理员模块实现物业管理模块实现业主模块实现维修员模块实现四、数据库设计 五、核心代码 六、论文参考七、最新计算机毕设选题推荐八、源码获......
  • 105. 从前序与中序遍历序列构造二叉树 Golang实现
    题目描述:给定两个整数数组preorder和inorder,其中preorder是二叉树的先序遍历,inorder是同一棵树的中序遍历,请构造二叉树并返回其根节点。思路分析:其实每次遍历就是划分左右子树数组,然后同样的递归先得到左右子树的根节点,再依次划分即可。注意下标索引的写法即可,下标的......
  • hhdb数据库介绍(7)
    部署环境计算节点集群部署对服务器、操作系统、依赖软件等有一定要求,不符合要求的环境部署出来的集群可能无法使用或不满足使用要求。建议部署前详细了解计算节点集群部署对环境的要求说明。此文档将详细描述普通模式下,如何部署一套计算节点集群。部署架构示意图集群HA(主备)模......
  • 【MySQL主从复制】揭秘高效数据库架构,轻松应对大数据挑战!
    MySQL主从复制概述MySQL主从复制是一种数据复制技术,用于将一个MySQL数据库服务器(主库)的数据实时复制到一个或多个MySQL数据库服务器(从库)。这种机制常用于实现读写分离、负载均衡和数据备份等目的。主从复制的基本步骤配置主库:在主库上启用二进制日志,并创建一个具有复......
  • golang如何提取接口类型的实际类型
    目录类型断言的基本用法处理多种可能的类型获取接口的实际类型总结在Go语言中,如果你想从接口类型的值中提取实际类型,可以使用类型断言(typeassertion)。类型断言允许你检查接口变量的实际类型并将其转换为该类型。类型断言的基本用法假设你有一个接口类型的变量i,你可以使用......
  • hhdb数据库介绍(6)
    集群环境推荐说明集群环境要求包含服务器硬件配置、操作系统、软件部署、软件配置四个方面。在部署安装前或安装完成后请检查以下各项是否符合关系集群数据库使用要求,若不满足以下要求可能会给集群的运行带来不可预知的异常以及无法发挥集群的最佳性能。一级标题二级标题......
  • 基于Springboot+Vue的在线考试系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能该系统......
  • 基于Nodejs+Vue的游戏点单陪玩系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能该系统......
  • 基于Springboot+Vue的汽车销售系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能这个系......