Go內(nèi)置database/sql連接池 - 源碼學(xué)習(xí)

引言

Go內(nèi)置了數(shù)據(jù)庫(kù)相關(guān)的庫(kù) - database/sql,實(shí)現(xiàn)數(shù)據(jù)庫(kù)操作相關(guān)的接口,其中還包含一個(gè)很重要的功能 - 連接池,用來(lái)實(shí)現(xiàn)連接的復(fù)用,限制連接的數(shù)量,從而最大程度的復(fù)用連接,提高性能,避免連接數(shù)量失控,導(dǎo)致資源消耗不可控。

本文借Go內(nèi)置的database/sql庫(kù),來(lái)一起學(xué)習(xí)如何一步步設(shè)計(jì)包含連接池的數(shù)據(jù)庫(kù)組件,包括模型抽象、連接復(fù)用,以及如何管理連接數(shù)。

設(shè)計(jì)

模型抽象

首先,我們要對(duì)解決領(lǐng)域進(jìn)行抽象。

我們目標(biāo)是設(shè)計(jì)一個(gè)數(shù)據(jù)庫(kù)連接組件,所以第一個(gè)對(duì)象模型很明確 - 數(shù)據(jù)庫(kù)對(duì)象, 我們將數(shù)據(jù)庫(kù)對(duì)象抽象為一個(gè)DB的結(jié)構(gòu)體,一個(gè)對(duì)象對(duì)應(yīng)的是一個(gè)數(shù)據(jù)庫(kù)實(shí)例,所以DB必須是單例。

其次,數(shù)據(jù)庫(kù)需要連接,所以可以對(duì)連接進(jìn)行抽象,命名為Conn,這里我們不關(guān)心Conn的屬性,而是關(guān)心行為,所以Conn類型定義成一個(gè)interface,包含所需兩個(gè)方法:預(yù)處理方法Prepare和關(guān)閉連接方法Close
(注:Prepare方法不再繼續(xù)展開,實(shí)際上就是接收一個(gè)sql,返回一個(gè)實(shí)現(xiàn)Stmt接口的預(yù)處對(duì)象,接著設(shè)置一下參數(shù),最后執(zhí)行數(shù)據(jù)庫(kù)操作)。

由于不同的數(shù)據(jù)庫(kù)連接的實(shí)現(xiàn)方式會(huì)有不同,這時(shí)就要考慮隔離變化,對(duì)連接的方式進(jìn)行抽象,定義一個(gè)連接接口 - Connector,用來(lái)創(chuàng)建連接(依賴倒置原則),當(dāng)初始化DB的時(shí)候再將具體實(shí)現(xiàn)注入到DB對(duì)象中(也就是依賴注入)。

最終我們可以得到以下幾個(gè)接口和結(jié)構(gòu)體:

// 數(shù)據(jù)庫(kù)對(duì)象
type DB struct {
    connector Connector
}

type Conn interface {
    Prepare(query string) (Stmt, error)
    Close() error
}

// 數(shù)據(jù)庫(kù)連接接口
type Connector interface {
    Connect(context.Context) (Conn, error)
}

最后,我們給DB對(duì)象增加一個(gè)獲取連接的方法Conn,在不考慮連接池的情況下,調(diào)用connector.Connect(ctx)直接獲取連接:

// 獲取一個(gè)連接
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
    return db.connector.Connect(ctx)
}

連接復(fù)用

當(dāng)不考慮到連接池時(shí),上面的實(shí)現(xiàn)基本滿足需求,但是為了提高性能,連接池還是必須的,接下來(lái)我們開始設(shè)計(jì)連接池的功能。

第一步,需要考慮存儲(chǔ)空閑連接,這里采用的是切片來(lái)存儲(chǔ):freeConn []*Conn。

接著,需要考慮屬性要定義在哪?!空閑的連接需要能被DB實(shí)例中的不同方法訪問(wèn)到,所以我們把freeConn定義為DB的一個(gè)屬性,同時(shí)考慮到對(duì)freeConn的訪問(wèn)會(huì)存在并發(fā)安全的問(wèn)題,需要增加一個(gè)鎖mu sync.Mutex來(lái)保護(hù):

// 數(shù)據(jù)庫(kù)對(duì)象
type DB struct {
    connector Connector
    mu        sync.Mutex // protects following fields
    freeConn  []*Conn
}

數(shù)據(jù)庫(kù)連接獲取方法Conn就需要修改為:

func (db *DB) Conn(ctx context.Context) (*Conn, error) {
    db.mu.Lock() // 加鎖保護(hù)freeConn
    numFree := len(db.freeConn)
    if numFree > 0 {
        conn := db.freeConn[0]
        // 移除第一個(gè)連接
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        db.mu.Unlock()
        return conn, nil
    }
    db.mu.Unlock()
    return db.connector.Connect(ctx) // 沒(méi)有空閑連接,重新創(chuàng)建
}

連接要復(fù)用,就不能關(guān)閉,用完需要放回連接池中,所以DB需要一個(gè)將連接放回連接池的方法 - putConn:

func (db *DB) putConn(dc Conn) {
    db.mu.Lock()
    db.freeConn = append(db.freeConn, dc)
    db.mu.Unlock()
}

但是,putConn方法要在怎么被調(diào)用?!不能因?yàn)樵黾恿诉B接池功,讓客戶端的改變使用方式,所以我們應(yīng)該考慮將putConn調(diào)用放到一個(gè)合理的地方 - Conn的Close()方法中。

在沒(méi)有連接池功能的時(shí)候,一個(gè)Conn用完了就一定會(huì)調(diào)用Close()釋放資源,有連接池功能后,就不再是直接關(guān)閉連接,而是釋放到連接池中,提供給后續(xù)請(qǐng)求使用。

對(duì)此,我們對(duì)Conn進(jìn)行改造,增加一層代理,命名為PConn,將原來(lái)的Conn作為PConn的一個(gè)屬性,同時(shí)實(shí)現(xiàn)了Conn接口的兩個(gè)方法:Prepare和Close,這樣我們就可以對(duì)Close方法進(jìn)行攔截,修改它的行為:

type PConn interface {
  db        *DB
  ci        Conn
}
func (pc *PConn) Close() error {
    dc.db.putConn(pc)
}
func (pc *PConn) Prepare(query string) (Stmt, error) {
    return pc.ci.Prepare(query)
}

接著我們調(diào)整一下Conn的創(chuàng)建方法(也可以重新實(shí)現(xiàn)Connector接口,返回的是PConn實(shí)例,然后將新的實(shí)現(xiàn)注入到DB實(shí)例中)

func (db *DB) Conn(ctx context.Context) (Conn, error) {
  ...
  c , err := db.connector.Connect(ctx) // 沒(méi)有空閑連接,重新創(chuàng)建
  if err!=nil {
    return nil, err
  }
  return &PConn{
    ci: c,
    db: db,
  }
}

這樣當(dāng)PConn使用完后調(diào)用Close方法,就不再是關(guān)閉連接,而是將連接釋放到DB對(duì)象的freeConn中,由于客戶端定義的類型是Conn接口,并且PConn也實(shí)現(xiàn)了Conn接口,所以無(wú)需調(diào)整(符合開閉原則

到此,連接復(fù)用初步完成了,接著就得考慮另外一個(gè)核心功能 :連接數(shù)量管理。

連接數(shù)量管理

目前的Conn發(fā)現(xiàn)沒(méi)有連接的時(shí)候是調(diào)用connector.Connect方法直接創(chuàng)建新連接,沒(méi)法控制連接的數(shù)量,這樣很明顯是不合理,無(wú)限創(chuàng)建連接可能會(huì)導(dǎo)致資源耗盡,資源消耗曲線過(guò)陡峭,所以我們需要:

  1. 限制連接數(shù)量。將連接的數(shù)量約束在指定的范圍內(nèi)
  2. 連接請(qǐng)求隊(duì)列。當(dāng)連接數(shù)量達(dá)到最大值時(shí),連接請(qǐng)求需要需要放到等待隊(duì)列中,等待有連接釋放。

首先,需要有地方保存當(dāng)前連接數(shù)量和最大連接數(shù),并且要能被對(duì)象中的不同方法訪問(wèn)到,所以我們可以給DB增加兩個(gè)屬性:

type DB struct {
  numOpen      int    // number of opened and pending open connections
  maxOpen      int    // <= 0 means unlimited
}

接著,需要設(shè)計(jì)當(dāng)有請(qǐng)求連接時(shí),發(fā)現(xiàn)沒(méi)有空閑連接,并且連接數(shù)量等于maxOpen時(shí)要怎么辦?

Database/sql里是采用一個(gè)map來(lái)存儲(chǔ)(注:這個(gè)有點(diǎn)奇怪,為什么不用隊(duì)列?),在DB結(jié)構(gòu)體增加一個(gè)屬性:connRequests,類型為:map[uint64]chan connRequest,其中key/value:

  • key :請(qǐng)求唯一標(biāo)識(shí)。調(diào)用nextRequestKeyLocked方法生成,實(shí)際上就是一個(gè)自增的序列,只是為了保持唯一
  • value:等待連接的請(qǐng)求。類型為chan,每個(gè)請(qǐng)求封裝為一個(gè)chan,可以保證并發(fā)安全,同時(shí)也可以利用其阻塞特性(當(dāng)chan沒(méi)有值時(shí)阻塞等待),chan接收的數(shù)據(jù)類型為connRequest格式,當(dāng)其他協(xié)程有釋放連接時(shí),會(huì)將連接放到一個(gè)connRequest對(duì)象中發(fā)送給該chan,connRequest只包含兩個(gè)屬性:conn和err,用來(lái)接收返回連接或是異常

代碼如下:

type DB struct {
    ...
  numOpen      int    // number of opened and pending open connections
  maxOpen      int                    // <= 0 means unlimited
  nextRequest  uint64 // Next key to use in connRequests.
  connRequests map[uint64]chan connRequest
}
func (db *DB) nextRequestKeyLocked() uint64 {
    next := db.nextRequest
    db.nextRequest++
    return next
}
// 連接請(qǐng)求
type connRequest struct {
    conn *PConn
    err  error
}

調(diào)整獲取連接的方法Conn(ctx context.Context) 邏輯:

  1. 判斷freeConn是否有空閑連接,有就返回
  2. 判斷連接數(shù)量numOpen是否大于maxOpen,如果還小于maxOpen,說(shuō)明還可以創(chuàng)建新連接,創(chuàng)建連接后numOpen++,返回連接
  3. 當(dāng)numOpen已經(jīng)是大于等于maxOpen,就不能再創(chuàng)建新連接,這是就把請(qǐng)求放到集合connRequests中,等待連接釋放。
func (db *DB) Conn(ctx context.Context) (Conn, error) {
    db.mu.Lock() // 加鎖保護(hù)freeConn
    numFree := len(db.freeConn)
    if numFree > 0 { // 有空閑連接
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        db.mu.Unlock()
        return conn, nil
    }
    // 連接數(shù)已經(jīng)超出
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        req := make(chan connRequest, 1) // 創(chuàng)建一個(gè)chan,接收連接
        reqKey := db.nextRequestKeyLocked() // 生成唯一序號(hào)
        db.connRequests[reqKey] = req // 放到全局屬性,讓其他方法能訪問(wèn)到
        db.mu.Unlock()
        select {
        case <-ctx.Done(): //超時(shí)等
            // Remove the connection request and ensure no value has been sent
            // on it after removing.
            db.mu.Lock()
            delete(db.connRequests, reqKey) // 移除
            db.mu.Unlock()
        }
        case ret, ok := <-req: // 收到連接釋放
            if !ok {
                return nil, errDBClosed
            }
            return ret.conn, ret.err
        }
    }
  // 連接數(shù)沒(méi)超出,可以創(chuàng)建新連接
  db.numOpen++ // optimistically
  db.mu.Unlock()
    c, err := db.connector.Connect(ctx) // 重新創(chuàng)建
    if err != nil {
        return nil, err
    }
    return &PConn{
        ci: c,
        db: db,
    }, nil
}

接著,我們還需要調(diào)整連接釋放方法 - putConn:

  1. 增加一個(gè)bool返回值,告訴調(diào)用方連接是否釋放成功(如果失敗,客戶端可以決定關(guān)閉連接)
  2. 如果連接數(shù)numOpen大于 maxOpen時(shí),當(dāng)前連接直接丟棄,返回false
  3. 當(dāng)len(db.connRequests)大于0時(shí),需要考慮將連接優(yōu)先給db.connRequests中的請(qǐng)求
  4. 最后才將連接放入空閑列表中。
func (db *DB) putConn(dc Conn) bool{
  db.mu.Lock()
  defer db.mu.Unlock()
  if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 當(dāng)有等待連接的請(qǐng)求時(shí)
    if c := len(db.connRequests); c > 0 {
      var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // Remove from pending requests.
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    }
    // 放入空閑連接池中
    db.freeConn = append(db.freeConn, dc)
  return true
}

PConn的close方法也要稍微調(diào)整一下,如果釋放連接失敗,需要把連接關(guān)閉

func (pc *PConn) Close() error {
    ok := dc.db.putConn(pc)
    if !ok {
      dc.ci.Close()
    }
}

總結(jié)

到目前為止,一個(gè)較完整的包含連接池的數(shù)據(jù)庫(kù)組件完成了,上述代碼是參照database/sql設(shè)計(jì),去除了一些非核心的代碼,通過(guò)對(duì)它的研究,可以學(xué)到:

  1. 如何對(duì)問(wèn)題域進(jìn)行抽象。數(shù)據(jù)庫(kù)抽象成DB、連接為Conn,連接器Connector等
  2. 如何命名一些變量。 connRequest(請(qǐng)求連接)、 freeConn(請(qǐng)求連接)等
  3. 如何使用context。實(shí)現(xiàn)超時(shí)等
  4. 并發(fā)編程。chan、mutex、全局屬性訪問(wèn),資源約束等

我的博客https://itart.cn/blogs/2021/explore/database-sql-pool.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容