因为 TCP 的三只握手等等原因,建立一个连接是一件成本比较高的行为。所以在一个需要多次与特定实体交互的程序中,就需要维持一个连接池,里面有可以复用的连接可供重复使用。
而维持一个连接池,最基本的要求就是要做到:thread safe(线程安全),尤其是在 Golang 这种特性是 goroutine 的语言中。
实现简单的连接池 1 2 3 4 5 6 type Pool struct { m sync.Mutex res chan io.Closer factory func () (io.Closer,error ) closed bool }
这个简单的连接池,我们利用chan来存储池里的连接。而新建结构体的方法也比较简单:
1 2 3 4 5 6 7 8 9 func New (fn func () (io.Closer, error ), size uint ) (*Pool, error ) { if size <= 0 { return nil , errors.New("size的值太小了。" ) } return &Pool{ factory: fn, res: make (chan io.Closer, size), }, nil }
只需要提供对应的工厂函数和连接池的大小就可以了。
获取连接 那么我们要怎么从中获取资源呢?因为我们内部存储连接的结构是chan,所以只需要简单的select就可以保证线程安全:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (p *Pool) Acquire() (io.Closer,error ) { select { case r,ok := <-p.res: log.Println("Acquire:共享资源" ) if !ok { return nil ,ErrPoolClosed } return r,nil default : log.Println("Acquire:新生成资源" ) return p.factory() } }
我们先从连接池的res这个chan里面获取,如果没有的话我们就利用我们早已经准备好的工厂函数进行构造连接。同时我们在从res获取连接的时候利用ok先确定了这个连接池是否已经关闭。如果已经关闭的话我们就返回早已经准备好的连接已关闭错误。
关闭连接池 那么既然提到关闭连接池,我们是怎么样关闭连接池的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (p *Pool) Close() { p.m.Lock() defer p.m.Unlock() if p.closed { return } p.closed = true close (p.res) for r:=range p.res { r.Close() } }
这边我们需要先进行p.m.Lock()上锁操作,这么做是因为我们需要对结构体里面的closed进行读写。需要先把这个标志位设定后,关闭res这个chan,使得Acquire方法无法再获取新的连接。我们再对res这个chan里面的连接进行Close操作。
释放连接 释放连接首先得有个前提,就是连接池还没有关闭。如果连接池已经关闭再往res里面送连接的话就好触发panic。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (p *Pool) Release(r io.Closer){ p.m.Lock() defer p.m.Unlock() if p.closed { r.Close() return } select { case p.res <- r: log.Println("资源释放到池子里了" ) default : log.Println("资源池满了,释放这个资源吧" ) r.Close() } }
以上就是一个简单且线程安全的连接池实现方式了。我们可以看到的是,现在连接池虽然已经实现了,但是还有几个小缺点:
1、我们对连接最大的数量没有限制,如果线程池空的话都我们默认就直接新建一个连接返回了。一旦并发量高的话将会不断新建连接,很容易(尤其是MySQL)造成too many connections
的报错发生。 2、既然我们需要保证最大可获取连接数量,那么我们就不希望数量定的太死。希望空闲的时候可以维护一定的空闲连接数量idleNum,但是又希望我们能限制最大可获取连接数量maxNum。 3、第一种情况是并发过多的情况,那么如果并发量过少呢?现在我们在新建一个连接并且归还后,我们很长一段时间不再使用这个连接。那么这个连接很有可能在几个小时甚至更长时间之前就已经建立的了。长时间闲置的连接我们并没有办法保证它的可用性。便有可能我们下次获取的连接是已经失效的连接。
那么我们可以从已经成熟使用的MySQL连接池库和Redis连接池库中看看,它们是怎么解决这些问题的。
Golang标准库的Sql连接池 Golang的连接池实现在标准库database/sql/sql.go下。当我们运行:
1 db, err := sql.Open("mysql" , "xxxx" )
的时候,就会打开一个连接池。我们可以看看返回的db的结构体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type DB struct { waitDuration int64 mu sync.Mutex freeConn []*driverConn connRequests map [uint64 ]chan connRequest nextRequest uint64 numOpen int openerCh chan struct {} closed bool maxIdle int maxOpen int maxLifetime time.Duration cleanerCh chan struct {} waitCount int64 maxIdleClosed int64 maxLifetimeClosed int64 }
上面省去了一些暂时不需要关注的field。我们可以看的,DB这个连接池内部存储连接的结构freeConn,并不是我们之前使用的chan,而是 []driverConn
,一个连接切片。同时我们还可以看到,里面有maxIdle等相关变量来控制空闲连接数量。值得注意的是,DB的初始化函数Open函数并没有新建数据库连接。而新建连接在哪个函数呢?我们可以在Query方法一路往回找,我们可以看到这个函数:func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)
。而我们从连接池获取连接的方法,就从这里开始:
获取连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error ) { db.mu.Lock() if db.closed { db.mu.Unlock() return nil , errDBClosed } select { default : case <-ctx.Done(): db.mu.Unlock() return nil , ctx.Err() } lifetime := db.maxLifetime numFree := len (db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { conn := db.freeConn[0 ] copy (db.freeConn, db.freeConn[1 :]) db.freeConn = db.freeConn[:numFree-1 ] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil , driver.ErrBadConn } conn.Lock() err := conn.lastErr conn.Unlock() if err == driver.ErrBadConn { conn.Close() return nil , driver.ErrBadConn } return conn, nil } if db.maxOpen > 0 && db.numOpen >= db.maxOpen { req := make (chan connRequest, 1 ) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() select { case <-ctx.Done(): db.mu.Lock() delete (db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64 (time.Since(waitStart))) select { default : case ret, ok := <-req: if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false ) } } return nil , ctx.Err() case ret, ok := <-req: atomic.AddInt64(&db.waitDuration, int64 (time.Since(waitStart))) if !ok { return nil , errDBClosed } if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil , driver.ErrBadConn } if ret.conn == nil { return nil , ret.err } ret.conn.Lock() err := ret.conn.lastErr ret.conn.Unlock() if err == driver.ErrBadConn { ret.conn.Close() return nil , driver.ErrBadConn } return ret.conn, ret.err } } db.numOpen++ db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- db.maybeOpenNewConnections() db.mu.Unlock() return nil , err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true , } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?
释放连接 我们可以直接找到func (db *DB) putConnDBLocked(dc *driverConn, err error) bool
这个方法。就像注释说的,这个方法主要的目的是:
Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.
我们主要来看看里面重点那几行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 ... if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } if c := len (db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete (db.connRequests, reqKey) if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed { if db.maxIdleConnsLocked() > len (db.freeConn) { db.freeConn = append (db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } ...
我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。
现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。 值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。
redis Golang实现的Redis客户端这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在 https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。
而它的连接池结构如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type ConnPool struct { ... queue chan struct {} connsMu sync.Mutex conns []*Conn idleConns []*Conn poolSize int idleConnsLen int stats Stats _closed uint32 closedCh chan struct {} }
我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看queue
,conns
,idleConns
这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]Conn
结构:conns
、idleConns
,那么问题来了:
到底连接存在哪里?
新建连接池连接 我们先从新建连接池连接开始看:
1 2 3 4 5 6 7 8 9 func NewConnPool (opt *Options) *ConnPool { .... p.checkMinIdleConns() if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } .... }
初始化连接池的函数有个和前面两个不同的地方。
1、checkMinIdleConns
方法,在连接池初始化的时候就会往连接池填满空闲的连接。 2、go p.reaper(opt.IdleCheckFrequency)
则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。
获取连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func (p *ConnPool) Get(ctx context.Context) (*Conn, error ) { if p.closed() { return nil , ErrClosed } err := p.waitTurn(ctx) if err != nil { return nil , err } for { p.connsMu.Lock() cn := p.popIdle() p.connsMu.Unlock() if cn == nil { break } if p.isStaleConn(cn) { _ = p.CloseConn(cn) continue } atomic.AddUint32(&p.stats.Hits, 1 ) return cn, nil } atomic.AddUint32(&p.stats.Misses, 1 ) newcn, err := p.newConn(ctx, true ) if err != nil { p.freeTurn() return nil , err } return newcn, nil }
我们可以试着回答开头那个问题:连接到底存在哪里?答案是从cn := p.popIdle()
这句话可以看出,获取连接这个动作,是从idleConns里面获取的,而里面的函数也证明了这一点。但是,真的是这样的嘛?我们后面再看看。
同时我的理解是:
1、sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。 2、redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。
而:
1 2 3 4 5 6 7 8 9 func (p *ConnPool) freeTurn() { <-p.queue } func (p *ConnPool) waitTurn(ctx context.Context) error {... case p.queue <- struct {}{}: return nil ... }
就是在靠queue这个chan来维持令牌数量。
那么conns的作用是什么呢?我们可以来看看新建连接这个函数:
新建连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (p *ConnPool) newConn(ctx context.Context, pooled bool ) (*Conn, error ) { cn, err := p.dialConn(ctx, pooled) if err != nil { return nil , err } p.connsMu.Lock() p.conns = append (p.conns, cn) if pooled { if p.poolSize >= p.opt.PoolSize { cn.pooled = false } else { p.poolSize++ } } p.connsMu.Unlock() return cn, nil }
基本逻辑出来了。就是如果新建连接的话,我并不会直接放在idleConns里面,而是先放conns里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。
归还连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (p *ConnPool) Put(cn *Conn) { if cn.rd.Buffered() > 0 { internal.Logger.Printf("Conn has unread data" ) p.Remove(cn, BadConnError{}) return } if !cn.pooled { p.Remove(cn, nil ) return } p.connsMu.Lock() p.idleConns = append (p.idleConns, cn) p.idleConnsLen++ p.connsMu.Unlock() p.freeTurn() }
答案就是,所有的连接其实是存放在conns这个切片里面。如果这个连接是空闲等待的状态的话,那就在idleConns里面加一个自己的指针!
其实归还的过程,就是检查一下我打算还的这个连接,是不是超售的产物,如果是就没必要池化了,直接删除就可以了。不是的话,就是把连接自身(一个指针)在idleConns也append一下。
等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:
1、先waitTurn,拿到令牌。而令牌数量是根据pool里面的queue决定的。 2、拿到令牌了,去库房idleConns里面拿空闲的连接。没有的话就自己newConn一个,并且把他记录到conns里面。 3、用完了,就调用put归还:也就是从conns添加这个连接的指针到idleConns。归还的时候就检查在newConn时候是不是已经做了超卖标记了。是的话就不转移到idleConns。
我当时疑惑了好久,既然始终都需要获得令牌才能得到连接,令牌数量是定的。为什么还会超卖呢?翻了一下源码,我的答案是:
虽然Get方法获取连接是newConn这个私用方法,受到令牌管制导致不会出现超卖。但是这个方法接受传参:pooled bool。所以我猜是担心其他人调用这个方法时候,不管三七二十一就传了true,导致poolSize越来越大。
总的来说,redis这个连接池的连接数控制,还是在queue这个我称为令牌的chan进行操作。
总结 上面可以看到,连接池的最基本的保证,就是获取连接时候的线程安全。但是在实现诸多额外特性时候却又从不同角度来实现。还是非常有意思的。但是不管存储结构是用chan还是还是slice,都可以很好的实现这一点。如果像sql或者redis那样用slice来存储连接,就得维护一个结构来表示排队等候的效果。
本文引自这里