go-redis 源碼分析:連接池

筆者最近在項(xiàng)目中基于 go-redis 實(shí)現(xiàn) Redis 緩存優(yōu)化性能。go-redis 是一個(gè) Go 語(yǔ)言實(shí)現(xiàn)的 Redis 客戶端,既然是網(wǎng)絡(luò)服務(wù)的客戶端,為了高效利用有限資源,避免重復(fù)創(chuàng)建和銷毀網(wǎng)絡(luò)連接,就必需對(duì)其進(jìn)行管理。而資源管理又是編程領(lǐng)域中的一個(gè)重點(diǎn)難點(diǎn),抱著對(duì)是否能利用 Go 語(yǔ)言語(yǔ)法簡(jiǎn)潔的特點(diǎn)來優(yōu)雅實(shí)現(xiàn)連接池的好奇,筆者決定閱讀并分析 go-redis 連接池部分的源碼,一探究竟。以下是對(duì)源碼的分析,分為接口與結(jié)構(gòu)體連接池管理、建立與關(guān)閉連接、獲取與放回連接、監(jiān)控統(tǒng)計(jì)等5大部分,源碼鏈接。


接口與結(jié)構(gòu)體

連接結(jié)構(gòu)體:

type Conn struct {
    netConn net.Conn  // 基于 tcp 的網(wǎng)絡(luò)連接

    rd *proto.Reader // 根據(jù) Redis 通信協(xié)議實(shí)現(xiàn)的 Reader
    wr *proto.Writer // 根據(jù) Redis 通信協(xié)議實(shí)現(xiàn)的 Writer

    Inited    bool // 是否完成初始化
    pooled    bool // 是否放進(jìn)連接池
    createdAt time.Time // 創(chuàng)建時(shí)間
    usedAt    int64 // 使用時(shí)間,atomic
}

連接池接口:

type Pooler interface {
    NewConn(context.Context) (*Conn, error) // 創(chuàng)建連接
    CloseConn(*Conn) error // 關(guān)閉連接

    Get(context.Context) (*Conn, error) // 獲取連接
    Put(*Conn) // 放回連接
    Remove(*Conn, error) // 移除連接

    Len() int // 連接池長(zhǎng)度
    IdleLen() int // 空閑連接數(shù)量
    Stats() *Stats // 連接池統(tǒng)計(jì)

    Close() error // 關(guān)閉連接池
}

連接池結(jié)構(gòu)體:

type ConnPool struct {
    opt *Options // 連接池配置

    dialErrorsNum uint32 // 連接錯(cuò)誤次數(shù),atomic

    lastDialErrorMu sync.RWMutex // 上一次連接錯(cuò)誤鎖,讀寫鎖
    lastDialError   error // 上一次連接錯(cuò)誤

    queue chan struct{} // 工作連接隊(duì)列

    connsMu      sync.Mutex // 連接隊(duì)列鎖
    conns        []*Conn // 連接隊(duì)列
    idleConns    []*Conn // 空閑連接隊(duì)列
    poolSize     int // 連接池大小
    idleConnsLen int // 空閑連接隊(duì)列長(zhǎng)度

    stats Stats // 連接池統(tǒng)計(jì)

    _closed  uint32 // 連接池關(guān)閉標(biāo)志,atomic
    closedCh chan struct{} // 通知連接池關(guān)閉通道
}

連接池管理

初始化

var _ Pooler = (*ConnPool)(nil)

func NewConnPool(opt *Options) *ConnPool {
    p := &ConnPool{
        opt: opt,

        queue:     make(chan struct{}, opt.PoolSize),
        conns:     make([]*Conn, 0, opt.PoolSize),
        idleConns: make([]*Conn, 0, opt.PoolSize),
        closedCh:  make(chan struct{}),
    }

    p.checkMinIdleConns()

    if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
        go p.reaper(opt.IdleCheckFrequency)
    }

    return p
}
  1. 創(chuàng)建連接池,傳入連接池配置選項(xiàng)參數(shù) opt,工廠函數(shù)根據(jù) opt 創(chuàng)建連接池實(shí)例。連接池主要依靠以下四個(gè)數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)管理和通信:
  • queue: 存儲(chǔ)工作連接的緩沖通道
  • conns:存儲(chǔ)所有連接的切片
  • idleConns:存儲(chǔ)空閑連接的切片
  • closed:用于通知所有協(xié)程連接池已經(jīng)關(guān)閉的通道
  1. 檢查連接池的空閑連接數(shù)量是否滿足最小空閑連接數(shù)量要求,若不滿足,則創(chuàng)建足夠的空閑連接。
  2. 若連接池配置選項(xiàng)規(guī)定了空閑連接超時(shí)檢查空閑連接頻率,則開啟一個(gè)清理空閑連接的協(xié)程。

關(guān)閉

func (p *ConnPool) Close() error {
    if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
        return ErrClosed
    }
    close(p.closedCh)

    var firstErr error
    p.connsMu.Lock()
    for _, cn := range p.conns {
        if err := p.closeConn(cn); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    p.conns = nil
    p.poolSize = 0
    p.idleConns = nil
    p.idleConnsLen = 0
    p.connsMu.Unlock()

    return firstErr
}
  1. 原子性檢查連接池是否已經(jīng)關(guān)閉,若沒關(guān)閉,則將關(guān)閉標(biāo)志置為1
  2. 關(guān)閉 closedCh 通道,連接池中的所有協(xié)程都可以通過判斷該通道是否關(guān)閉來確定連接池是否已經(jīng)關(guān)閉。
  3. 連接隊(duì)列鎖上鎖,關(guān)閉隊(duì)列中的所有連接,并置空所有維護(hù)連接池狀態(tài)的數(shù)據(jù)結(jié)構(gòu),解鎖。

過濾

func (p *ConnPool) Filter(fn func(*Conn) bool) error {
    var firstErr error
    p.connsMu.Lock()
    for _, cn := range p.conns {
        if fn(cn) {
            if err := p.closeConn(cn); err != nil && firstErr == nil {
                firstErr = err
            }
        }
    }
    p.connsMu.Unlock()
    return firstErr
}

實(shí)質(zhì)上是遍歷連接池中的所有連接,并調(diào)用傳入的 fn 過濾函數(shù)作用在每個(gè)連接上,過濾出符合業(yè)務(wù)要求的連接。

清理

func (p *ConnPool) reaper(frequency time.Duration) {
    ticker := time.NewTicker(frequency)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // It is possible that ticker and closedCh arrive together,
            // and select pseudo-randomly pick ticker case, we double
            // check here to prevent being executed after closed.
            if p.closed() {
                return
            }
            _, err := p.ReapStaleConns()
            if err != nil {
                internal.Logger.Printf("ReapStaleConns failed: %s", err)
                continue
            }
        case <-p.closedCh:
            return
        }
    }
}

func (p *ConnPool) ReapStaleConns() (int, error) {
    var n int
    for {
        p.getTurn()

        p.connsMu.Lock()
        cn := p.reapStaleConn()
        p.connsMu.Unlock()
        p.freeTurn()

        if cn != nil {
            _ = p.closeConn(cn)
            n++
        } else {
            break
        }
    }
    atomic.AddUint32(&p.stats.StaleConns, uint32(n))
    return n, nil
}

func (p *ConnPool) reapStaleConn() *Conn {
    if len(p.idleConns) == 0 {
        return nil
    }

    cn := p.idleConns[0]
    if !p.isStaleConn(cn) {
        return nil
    }

    p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
    p.idleConnsLen--
    p.removeConn(cn)

    return cn
}
  1. 開啟一個(gè)用于檢查并清理過期連接的 goroutine 每隔 frequency 時(shí)間遍歷檢查連接池中是否存在過期連接,并清理。
  2. 創(chuàng)建一個(gè)時(shí)間間隔為 frequency 的計(jì)時(shí)器,在連接池關(guān)閉時(shí)關(guān)閉該計(jì)時(shí)器
  3. 循環(huán)判斷計(jì)時(shí)器是否到時(shí)和連接池是否關(guān)閉
  4. 移除空閑連接隊(duì)列中的過期連接

建立與關(guān)閉連接

建立連接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
    cn, err := p.dialConn(ctx, pooled)
    if err != nil {
        return nil, err
    }

    p.connsMu.Lock()
    p.conns = append(p.conns, cn)
    if pooled {
        // If pool is full remove the cn on next Put.
        if p.poolSize >= p.opt.PoolSize {
            cn.pooled = false
        } else {
            p.poolSize++
        }
    }
    p.connsMu.Unlock()
    return cn, nil
}

func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }

    if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
        return nil, p.getLastDialError()
    }

    netConn, err := p.opt.Dialer(ctx)
    if err != nil {
        p.setLastDialError(err)
        if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
            go p.tryDial()
        }
        return nil, err
    }

    cn := NewConn(netConn)
    cn.pooled = pooled
    return cn, nil
}

func (p *ConnPool) tryDial() {
    for {
        if p.closed() {
            return
        }

        conn, err := p.opt.Dialer(context.Background())
        if err != nil {
            p.setLastDialError(err)
            time.Sleep(time.Second)
            continue
        }

        atomic.StoreUint32(&p.dialErrorsNum, 0)
        _ = conn.Close()
        return
    }
}

創(chuàng)建連接流程圖:


newConn流程圖.png

DialConn流程圖.png

移除與關(guān)閉連接

func (p *ConnPool) Remove(cn *Conn, reason error) {
    p.removeConnWithLock(cn)
    p.freeTurn()
    _ = p.closeConn(cn)
}

func (p *ConnPool) CloseConn(cn *Conn) error {
    p.removeConnWithLock(cn)
    return p.closeConn(cn)
}

func (p *ConnPool) removeConnWithLock(cn *Conn) {
    p.connsMu.Lock()
    p.removeConn(cn)
    p.connsMu.Unlock()
}

func (p *ConnPool) removeConn(cn *Conn) {
    for i, c := range p.conns {
        if c == cn {
            p.conns = append(p.conns[:i], p.conns[i+1:]...)
            if cn.pooled {
                p.poolSize--
                p.checkMinIdleConns()
            }
            return
        }
    }
}

func (p *ConnPool) closeConn(cn *Conn) error {
    if p.opt.OnClose != nil {
        _ = p.opt.OnClose(cn)
    }
    return cn.Close()
}

連接池?zé)o論移除還是關(guān)閉連接,底層調(diào)用的都是 removeConnWithLock 函數(shù)。removeConnWithLock 函數(shù)的工作流程如下:

  1. 連接隊(duì)列上鎖
  2. 遍歷連接隊(duì)列找到要關(guān)閉的連接,并將其移除出連接隊(duì)列
  3. 更新連接池統(tǒng)計(jì)數(shù)據(jù)
  4. 檢查連接池最小空閑連接數(shù)量
  5. 連接隊(duì)列解鎖
  6. 關(guān)閉連接,先執(zhí)行關(guān)閉連接時(shí)的回調(diào)函數(shù)(創(chuàng)建連接池時(shí)的配置選項(xiàng)傳入),再關(guān)閉連接

獲取與放回連接

獲取

// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }

    err := p.waitTurn(ctx)
    if err != nil {
        return nil, err
    }

    for {
        p.connsMu.Lock()
        cn := p.popIdle()
        p.connsMu.Unlock()

        if cn == nil {
            break
        }

        if p.isStaleConn(cn) {
            _ = p.CloseConn(cn)
            continue
        }

        atomic.AddUint32(&p.stats.Hits, 1)
        return cn, nil
    }

    atomic.AddUint32(&p.stats.Misses, 1)

    newcn, err := p.newConn(ctx, true)
    if err != nil {
        p.freeTurn()
        return nil, err
    }

    return newcn, nil
}

func (p *ConnPool) getTurn() {
    p.queue <- struct{}{}
}

func (p *ConnPool) waitTurn(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    select {
    case p.queue <- struct{}{}:
        return nil
    default:
    }

    timer := timers.Get().(*time.Timer)
    timer.Reset(p.opt.PoolTimeout)

    select {
    case <-ctx.Done():
        if !timer.Stop() {
            <-timer.C
        }
        timers.Put(timer)
        return ctx.Err()
    case p.queue <- struct{}{}:
        if !timer.Stop() {
            <-timer.C
        }
        timers.Put(timer)
        return nil
    case <-timer.C:
        timers.Put(timer)
        atomic.AddUint32(&p.stats.Timeouts, 1)
        return ErrPoolTimeout
    }
}

func (p *ConnPool) freeTurn() {
    <-p.queue
}

func (p *ConnPool) popIdle() *Conn {
    if len(p.idleConns) == 0 {
        return nil
    }

    idx := len(p.idleConns) - 1
    cn := p.idleConns[idx]
    p.idleConns = p.idleConns[:idx]
    p.idleConnsLen--
    p.checkMinIdleConns()
    return cn
}

獲取連接流程圖:


Get流程圖.png

放回

func (p *ConnPool) Put(cn *Conn) {
    if cn.rd.Buffered() > 0 {
        internal.Logger.Printf("Conn has unread data")
        p.Remove(cn, BadConnError{})
        return
    }

    if !cn.pooled {
        p.Remove(cn, nil)
        return
    }

    p.connsMu.Lock()
    p.idleConns = append(p.idleConns, cn)
    p.idleConnsLen++
    p.connsMu.Unlock()
    p.freeTurn()
}
  1. 檢查連接中是否還有數(shù)據(jù)沒被讀取,若有,移除連接并返回 BadConnError
  2. 判斷連接是否已經(jīng)放入連接池中,若無(wú),直接移除連接
  3. 連接隊(duì)列上鎖,將該連接加入空閑連接隊(duì)列中,連接隊(duì)列解鎖,工作連接通道移除一個(gè)元素

監(jiān)控統(tǒng)計(jì)

監(jiān)控統(tǒng)計(jì)對(duì)調(diào)整連接池配置選項(xiàng),優(yōu)化連接池性能有很大的幫助。

Dial 錯(cuò)誤統(tǒng)計(jì)

func (p *ConnPool) setLastDialError(err error) {
    p.lastDialErrorMu.Lock()
    p.lastDialError = err
    p.lastDialErrorMu.Unlock()
}

func (p *ConnPool) getLastDialError() error {
    p.lastDialErrorMu.RLock()
    err := p.lastDialError
    p.lastDialErrorMu.RUnlock()
    return err
}

由于一般情況下,連接錯(cuò)誤記錄是讀多寫少的,所以采用讀寫鎖來保證該記錄的并發(fā)安全(讀寫鎖在該場(chǎng)景下性能更佳)。

狀態(tài)統(tǒng)計(jì)

// Len returns total number of connections.
func (p *ConnPool) Len() int {
    p.connsMu.Lock()
    n := len(p.conns)
    p.connsMu.Unlock()
    return n
}

// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
    p.connsMu.Lock()
    n := p.idleConnsLen
    p.connsMu.Unlock()
    return n
}

func (p *ConnPool) Stats() *Stats {
    idleLen := p.IdleLen()
    return &Stats{
        Hits:     atomic.LoadUint32(&p.stats.Hits),
        Misses:   atomic.LoadUint32(&p.stats.Misses),
        Timeouts: atomic.LoadUint32(&p.stats.Timeouts),

        TotalConns: uint32(p.Len()),
        IdleConns:  uint32(idleLen),
        StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
    }
}
  • func (p *ConnPool) Len() int {...}:返回連接池連接數(shù)量總數(shù)
  • func (p *ConnPool) IdleLen() int {...}:返回連接池空閑連接數(shù)量
  • Stats:
    Hits 連接池命中空閑連接次數(shù)
    Misses 連接池沒有空閑連接可用次數(shù)
    Timeouts 請(qǐng)求連接等待超時(shí)次數(shù)
    TotalConns 連接池總連接數(shù)量
    IdleConns 連接池空閑連接數(shù)量
    StaleConns 移除過期連接數(shù)量
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容