筆者最近在項(xiàng)目中基于 go-redis 實(shí)現(xiàn) Redis 緩存優(yōu)化性能。go-redis 是一個(gè) Go 語(yǔ)言實(shí)現(xiàn)的 Redis 客戶端,既然是網(wǎng)絡(luò)服務(wù)的客戶端,為了高效利用有限資源,避免重復(fù)創(chuàng)建和銷毀網(wǎng)絡(luò)連接,就必需對(duì)其進(jìn)行管理。而資源管理又是編程領(lǐng)域中的一個(gè)重點(diǎn)難點(diǎn),抱著對(duì)是否能利用 Go 語(yǔ)言語(yǔ)法簡(jiǎn)潔的特點(diǎn)來優(yōu)雅實(shí)現(xiàn)連接池的好奇,筆者決定閱讀并分析 go-redis 連接池部分的源碼,一探究竟。以下是對(duì)源碼的分析,分為接口與結(jié)構(gòu)體、連接池管理、建立與關(guān)閉連接、獲取與放回連接、監(jiān)控統(tǒng)計(jì)等5大部分,源碼鏈接。
接口與結(jié)構(gòu)體
連接結(jié)構(gòu)體:
type Conn struct {
netConn net.Conn // 基于 tcp 的網(wǎng)絡(luò)連接
rd *proto.Reader // 根據(jù) Redis 通信協(xié)議實(shí)現(xiàn)的 Reader
wr *proto.Writer // 根據(jù) Redis 通信協(xié)議實(shí)現(xiàn)的 Writer
Inited bool // 是否完成初始化
pooled bool // 是否放進(jìn)連接池
createdAt time.Time // 創(chuàng)建時(shí)間
usedAt int64 // 使用時(shí)間,atomic
}
連接池接口:
type Pooler interface {
NewConn(context.Context) (*Conn, error) // 創(chuàng)建連接
CloseConn(*Conn) error // 關(guān)閉連接
Get(context.Context) (*Conn, error) // 獲取連接
Put(*Conn) // 放回連接
Remove(*Conn, error) // 移除連接
Len() int // 連接池長(zhǎng)度
IdleLen() int // 空閑連接數(shù)量
Stats() *Stats // 連接池統(tǒng)計(jì)
Close() error // 關(guān)閉連接池
}
連接池結(jié)構(gòu)體:
type ConnPool struct {
opt *Options // 連接池配置
dialErrorsNum uint32 // 連接錯(cuò)誤次數(shù),atomic
lastDialErrorMu sync.RWMutex // 上一次連接錯(cuò)誤鎖,讀寫鎖
lastDialError error // 上一次連接錯(cuò)誤
queue chan struct{} // 工作連接隊(duì)列
connsMu sync.Mutex // 連接隊(duì)列鎖
conns []*Conn // 連接隊(duì)列
idleConns []*Conn // 空閑連接隊(duì)列
poolSize int // 連接池大小
idleConnsLen int // 空閑連接隊(duì)列長(zhǎng)度
stats Stats // 連接池統(tǒng)計(jì)
_closed uint32 // 連接池關(guān)閉標(biāo)志,atomic
closedCh chan struct{} // 通知連接池關(guān)閉通道
}
連接池管理
初始化
var _ Pooler = (*ConnPool)(nil)
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
closedCh: make(chan struct{}),
}
p.checkMinIdleConns()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
return p
}
- 創(chuàng)建連接池,傳入連接池配置選項(xiàng)參數(shù) opt,工廠函數(shù)根據(jù) opt 創(chuàng)建連接池實(shí)例。連接池主要依靠以下四個(gè)數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)管理和通信:
- queue: 存儲(chǔ)工作連接的緩沖通道
- conns:存儲(chǔ)所有連接的切片
- idleConns:存儲(chǔ)空閑連接的切片
- closed:用于通知所有協(xié)程連接池已經(jīng)關(guān)閉的通道
- 檢查連接池的空閑連接數(shù)量是否滿足最小空閑連接數(shù)量要求,若不滿足,則創(chuàng)建足夠的空閑連接。
- 若連接池配置選項(xiàng)規(guī)定了空閑連接超時(shí)和檢查空閑連接頻率,則開啟一個(gè)清理空閑連接的協(xié)程。
關(guān)閉
func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
return ErrClosed
}
close(p.closedCh)
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
p.conns = nil
p.poolSize = 0
p.idleConns = nil
p.idleConnsLen = 0
p.connsMu.Unlock()
return firstErr
}
- 原子性檢查連接池是否已經(jīng)關(guān)閉,若沒關(guān)閉,則將關(guān)閉標(biāo)志置為1
- 關(guān)閉 closedCh 通道,連接池中的所有協(xié)程都可以通過判斷該通道是否關(guān)閉來確定連接池是否已經(jīng)關(guān)閉。
- 連接隊(duì)列鎖上鎖,關(guān)閉隊(duì)列中的所有連接,并置空所有維護(hù)連接池狀態(tài)的數(shù)據(jù)結(jié)構(gòu),解鎖。
過濾
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if fn(cn) {
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
}
p.connsMu.Unlock()
return firstErr
}
實(shí)質(zhì)上是遍歷連接池中的所有連接,并調(diào)用傳入的 fn 過濾函數(shù)作用在每個(gè)連接上,過濾出符合業(yè)務(wù)要求的連接。
清理
func (p *ConnPool) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// It is possible that ticker and closedCh arrive together,
// and select pseudo-randomly pick ticker case, we double
// check here to prevent being executed after closed.
if p.closed() {
return
}
_, err := p.ReapStaleConns()
if err != nil {
internal.Logger.Printf("ReapStaleConns failed: %s", err)
continue
}
case <-p.closedCh:
return
}
}
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
p.getTurn()
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()
p.freeTurn()
if cn != nil {
_ = p.closeConn(cn)
n++
} else {
break
}
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
return n, nil
}
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
p.removeConn(cn)
return cn
}
- 開啟一個(gè)用于檢查并清理過期連接的 goroutine 每隔 frequency 時(shí)間遍歷檢查連接池中是否存在過期連接,并清理。
- 創(chuàng)建一個(gè)時(shí)間間隔為 frequency 的計(jì)時(shí)器,在連接池關(guān)閉時(shí)關(guān)閉該計(jì)時(shí)器
- 循環(huán)判斷計(jì)時(shí)器是否到時(shí)和連接池是否關(guān)閉
- 移除空閑連接隊(duì)列中的過期連接
建立與關(guān)閉連接
建立連接
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
cn, err := p.dialConn(ctx, pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
if pooled {
// If pool is full remove the cn on next Put.
if p.poolSize >= p.opt.PoolSize {
cn.pooled = false
} else {
p.poolSize++
}
}
p.connsMu.Unlock()
return cn, nil
}
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
return nil, p.getLastDialError()
}
netConn, err := p.opt.Dialer(ctx)
if err != nil {
p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
go p.tryDial()
}
return nil, err
}
cn := NewConn(netConn)
cn.pooled = pooled
return cn, nil
}
func (p *ConnPool) tryDial() {
for {
if p.closed() {
return
}
conn, err := p.opt.Dialer(context.Background())
if err != nil {
p.setLastDialError(err)
time.Sleep(time.Second)
continue
}
atomic.StoreUint32(&p.dialErrorsNum, 0)
_ = conn.Close()
return
}
}
創(chuàng)建連接流程圖:


移除與關(guān)閉連接
func (p *ConnPool) Remove(cn *Conn, reason error) {
p.removeConnWithLock(cn)
p.freeTurn()
_ = p.closeConn(cn)
}
func (p *ConnPool) CloseConn(cn *Conn) error {
p.removeConnWithLock(cn)
return p.closeConn(cn)
}
func (p *ConnPool) removeConnWithLock(cn *Conn) {
p.connsMu.Lock()
p.removeConn(cn)
p.connsMu.Unlock()
}
func (p *ConnPool) removeConn(cn *Conn) {
for i, c := range p.conns {
if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...)
if cn.pooled {
p.poolSize--
p.checkMinIdleConns()
}
return
}
}
}
func (p *ConnPool) closeConn(cn *Conn) error {
if p.opt.OnClose != nil {
_ = p.opt.OnClose(cn)
}
return cn.Close()
}
連接池?zé)o論移除還是關(guān)閉連接,底層調(diào)用的都是 removeConnWithLock 函數(shù)。removeConnWithLock 函數(shù)的工作流程如下:
- 連接隊(duì)列上鎖
- 遍歷連接隊(duì)列找到要關(guān)閉的連接,并將其移除出連接隊(duì)列
- 更新連接池統(tǒng)計(jì)數(shù)據(jù)
- 檢查連接池最小空閑連接數(shù)量
- 連接隊(duì)列解鎖
- 關(guān)閉連接,先執(zhí)行關(guān)閉連接時(shí)的回調(diào)函數(shù)(創(chuàng)建連接池時(shí)的配置選項(xiàng)傳入),再關(guān)閉連接
獲取與放回連接
獲取
// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
err := p.waitTurn(ctx)
if err != nil {
return nil, err
}
for {
p.connsMu.Lock()
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
break
}
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.newConn(ctx, true)
if err != nil {
p.freeTurn()
return nil, err
}
return newcn, nil
}
func (p *ConnPool) getTurn() {
p.queue <- struct{}{}
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
select {
case p.queue <- struct{}{}:
return nil
default:
}
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return ctx.Err()
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return nil
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return ErrPoolTimeout
}
}
func (p *ConnPool) freeTurn() {
<-p.queue
}
func (p *ConnPool) popIdle() *Conn {
if len(p.idleConns) == 0 {
return nil
}
idx := len(p.idleConns) - 1
cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx]
p.idleConnsLen--
p.checkMinIdleConns()
return cn
}
獲取連接流程圖:

放回
func (p *ConnPool) Put(cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf("Conn has unread data")
p.Remove(cn, BadConnError{})
return
}
if !cn.pooled {
p.Remove(cn, nil)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
p.freeTurn()
}
- 檢查連接中是否還有數(shù)據(jù)沒被讀取,若有,移除連接并返回 BadConnError
- 判斷連接是否已經(jīng)放入連接池中,若無(wú),直接移除連接
- 連接隊(duì)列上鎖,將該連接加入空閑連接隊(duì)列中,連接隊(duì)列解鎖,工作連接通道移除一個(gè)元素
監(jiān)控統(tǒng)計(jì)
監(jiān)控統(tǒng)計(jì)對(duì)調(diào)整連接池配置選項(xiàng),優(yōu)化連接池性能有很大的幫助。
Dial 錯(cuò)誤統(tǒng)計(jì)
func (p *ConnPool) setLastDialError(err error) {
p.lastDialErrorMu.Lock()
p.lastDialError = err
p.lastDialErrorMu.Unlock()
}
func (p *ConnPool) getLastDialError() error {
p.lastDialErrorMu.RLock()
err := p.lastDialError
p.lastDialErrorMu.RUnlock()
return err
}
由于一般情況下,連接錯(cuò)誤記錄是讀多寫少的,所以采用讀寫鎖來保證該記錄的并發(fā)安全(讀寫鎖在該場(chǎng)景下性能更佳)。
狀態(tài)統(tǒng)計(jì)
// Len returns total number of connections.
func (p *ConnPool) Len() int {
p.connsMu.Lock()
n := len(p.conns)
p.connsMu.Unlock()
return n
}
// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
p.connsMu.Lock()
n := p.idleConnsLen
p.connsMu.Unlock()
return n
}
func (p *ConnPool) Stats() *Stats {
idleLen := p.IdleLen()
return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
}
- func (p *ConnPool) Len() int {...}:返回連接池連接數(shù)量總數(shù)
- func (p *ConnPool) IdleLen() int {...}:返回連接池空閑連接數(shù)量
- Stats:
Hits 連接池命中空閑連接次數(shù)
Misses 連接池沒有空閑連接可用次數(shù)
Timeouts 請(qǐng)求連接等待超時(shí)次數(shù)
TotalConns 連接池總連接數(shù)量
IdleConns 連接池空閑連接數(shù)量
StaleConns 移除過期連接數(shù)量