golang sql 包連接池分析

golang 在使用 mysql 的時候會用到 database/sql 這個庫,每次都在黑盒使用它,有必要來梳理一下整個請求流程和細節(jié),以免以后碰到問題會有解決的思路。

閱讀之前的幾個問題

  • sql 的連接池的連接怎么維護的?
  • Query / Exec 如何獲取查詢的連接?
  • 連接池的連接如何釋放的?

幾個重要的結構

DB struct

先來看看 DB 結構,該結構是 sql 包的核心結構。DB 是表示零個或多個底層連接池的數(shù)據(jù)庫句柄,是并發(fā)安全的。

sql 包可以自動創(chuàng)建和釋放連接,它還維護一個空閑連接池。如果數(shù)據(jù)庫具有每個連接狀態(tài)的概念,則只能在事務中可靠地觀察到這種狀態(tài)。調(diào)用 DB.Begin 后,返回的 Tx 將綁定到單個連接。在事務上調(diào)用 Commit 或 Rollback 后,該事務的連接將返回到 DB 的空閑連接池。SetMaxIdleConns 用來控制連接池大小。

type DB struct {
    driver driver.Driver // 數(shù)據(jù)庫驅(qū)動
    dsn    string // 數(shù)據(jù)庫連接參數(shù),比如 username,hostname,password 等等
    numClosed uint64 // numClosed 是一個原子計數(shù)器,表示已關閉連接的總數(shù)。Stmt.openStmt 在清除 Stmt.css 中的已關閉連接之前對其進行檢查。

    mu           sync.Mutex // 保護下面的字段
    freeConn     []*driverConn // 空閑連接
    connRequests map[uint64]chan connRequest // 阻塞請求隊列。當達到最大連接數(shù)時,后續(xù)請求將插入該隊列來等待可用連接
    nextRequest  uint64 // connRequests 的下一個 key
    numOpen      int    // 已連接或者正等待連接的數(shù)量

    // 一個創(chuàng)建新連接的信號,
    // 運行connectionOpener()的goroutine讀取此chan,maybeOpenNewConnections發(fā)送此chan(每個需要的連接發(fā)送一次)
    // 它在db.Close()時關閉,并通知connectionOpener goroutine退出。
    openerCh    chan struct{} 
    closed      bool
    dep         map[finalCloser]depSet
    lastPut     map[*driverConn]string // 用于 debug
    maxIdle     int                    // 最大空閑連接數(shù), 0等價于 defaultMaxIdleConns 常量(代碼中值為2),負數(shù)等價于0
    maxOpen     int                    // 數(shù)據(jù)庫的最大連接數(shù),0 等價于不限制最大連接數(shù)
    maxLifetime time.Duration          // 連接的最大生命周期
    cleanerCh   chan struct{} // 用于釋放連接池中過期的連接的信號
}
driverConn struct

driverConn 使用互斥鎖封裝一個 driver.Conn 結構,在所有對 Conn 的調(diào)用期間保持(包括對通過該 Conn 返回的接口的任何調(diào)用,例如對 Tx,Stmt,Result,Rows 的調(diào)用)

type driverConn struct {
    db        *DB
    createdAt time.Time

    sync.Mutex  // 保護下面的字段
    ci          driver.Conn
    closed      bool
    finalClosed bool // ci.Close 已經(jīng)被調(diào)用則為 true
    openStmt    map[*driverStmt]bool

    // 下面的字段被 db.mu 保護
    inUse      bool
    onPut      []func() // 下次返回 conn 時運行的代碼
    dbmuClosed bool     // 與 closed 字段相同,但由 db.mu 保護,用于 removeClosedStmtLocked
}

// driver.Conn 是具體的接口 用來支持不同的數(shù)據(jù)庫
// Conn 是與數(shù)據(jù)庫的連接,不是 gotoutines 安全的。
// Conn 被認為是有狀態(tài)的。
type Conn interface {
    // Prepare 返回綁定到該連接的就緒語句 Stmt。
    Prepare(query string) (Stmt, error)

    // Close 使當前就緒的語句和事務無效并可能停止,將此連接標記為不再使用。
    //
    // 因為sql包維護一個空閑的連接池,并且只有在空閑連接過剩時才調(diào)用Close,所以驅(qū)動不需要做自己的連接緩存。
    Close() error

    // Begin 啟動并返回一個新的事務 Tx
    Begin() (Tx, error)
}

// 可以看到driverConn的這個方法,看名字就知道是釋放連接的 調(diào)用了DB 的 putConn 方法,這里先留個印象
func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err)
}

驅(qū)動注冊綁定

我們在使用指定數(shù)據(jù)庫時需要使用import _ "github.com/go-sql-driver/mysql"來執(zhí)行 init() 函數(shù)。這個 init() 函數(shù)主要用來將指定的數(shù)據(jù)庫驅(qū)動注冊到 sql 的 一個 map 類型的 drivers 變量中。

mysql/driver.go

// 該方法注冊到驅(qū)動,也就是db.Open的調(diào)用,返回的mc是實現(xiàn)的driver.Conn接口的結構,dsn 為連接該數(shù)據(jù)庫的配置
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
    // New mysqlConn
    mc := &mysqlConn{
        maxAllowedPacket: maxPacketSize,
        maxWriteSize:     maxPacketSize - 1,
        closech:          make(chan struct{}),
    }
    ...
    return mc, nil
}

// 注冊驅(qū)動 MySQLDriver 結構實現(xiàn)了 driver.Driver 接口
func init() {
    sql.Register("mysql", &MySQLDriver{})
}

連接

創(chuàng)建連接的過程
創(chuàng)建連接

可以看到,調(diào)用sql.Open的時候會啟動一個 goroutine 一直阻塞讀取db.openerCh。當這個openerCh收到信號時,會啟動創(chuàng)建連接的流程,調(diào)用驅(qū)動提供的創(chuàng)建連接的方法創(chuàng)建連接。如果創(chuàng)建成功,優(yōu)先把改連接給db.connRequests中阻塞的請求使用,如果沒有阻塞的請求就把這個新連接放入 db.freeConn中待請求使用。

關鍵方法
// 調(diào)用驅(qū)動的 Open 方法創(chuàng)建新連接
func (db *DB) openNewConnection() {
    // 創(chuàng)建新連接
    ci, err := db.driver.Open(db.dsn)
    db.mu.Lock()
    defer db.mu.Unlock()
    if db.closed {
        if err == nil {
            ci.Close()
        }
        db.numOpen--
        return
    }
    if err != nil {
        db.numOpen--
        db.putConnDBLocked(nil, err)
        db.maybeOpenNewConnections()
        return
    }
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    // 直接給阻塞的請求使用 或者 放入連接池
    if db.putConnDBLocked(dc, err) {
        db.addDepLocked(dc, dc)
    } else {
        db.numOpen--
        ci.Close()
    }
}

// 給 阻塞在 connRequest 隊列的請求分配連接
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    // 如果超過最大連接,直接返回false,connRequest 隊列的請求繼續(xù)阻塞
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 把連接分配給 connRequests 阻塞的請求
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        // 取第一個
        for reqKey, req = range db.connRequests {
            break
        }
        // 阻塞的請求得到連接 刪除 connRequests 的記錄
        delete(db.connRequests, reqKey)
        // 標記該連接正在使用
        if err == nil {
            dc.inUse = true
        }
        // 通過 chan 把該連接發(fā)送給請求
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    // 如果空閑連接數(shù)小于最大連接限制 把該連接放到 freeConn 中
    } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
        db.freeConn = append(db.freeConn, dc)
        // 根據(jù) db.maxLifetime 起一個 goroutine 清除freeConn中過期的連接 
        db.startCleanerLocked()
        return true
    }
    return false
}

查詢?nèi)绾潍@取連接

先來看提供的兩個基本的查詢的方法 Query / Exec

// 使用方法
db.Query("SELECT * FROM table")
db.Exec("INSERT INTO table VALUES (1)")
  • Query:執(zhí)行需要返回 rows 的操作,例如 SELECT)不釋放連接,但在調(diào)用后仍然保持連接,即放回 freeConn。
  • Exec:執(zhí)行沒有返回 rows 的操作,例如 INSERT, UPDATE,DELETE)在調(diào)用后自動釋放連接。


    Query 查詢
關鍵方法
// conn 返回新打開的連接,或者從連接池freeConn中取
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    ...
    // 檢查上下文是否被取消
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime

    // cachedOrNewConn 模式獲取連接
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // 如果連接數(shù)已經(jīng)超過限制,將該請求放入connRequest中阻塞,直到有空閑連接
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // Make the connRequest channel. It's buffered so that the
        // connectionOpener doesn't block while waiting for the req to be read.
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.mu.Unlock()

        // 上下文判斷請求超時
        select {
        case <-ctx.Done():
            // 刪除 connRequests 中阻塞的請求
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            select {
            default:
            case ret, ok := <-req:
                if ok {
                    // 如果收到了連接,由于超時了,回收該連接
                    db.putConn(ret.conn, ret.err)
                }
            }
            return nil, ctx.Err()
        // 獲取到了連接,返回處理
        case ret, ok := <-req:
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }

    // 連接池中沒有連接,且打開的連接數(shù)沒有超限,創(chuàng)建新連接
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.driver.Open(db.dsn)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    db.addDepLocked(dc, dc)
    dc.inUse = true
    db.mu.Unlock()
    return dc, nil
}

連接的回收或釋放

被動回收或釋放

我們沿著上面的 Query 請求分析下來,在 queryConn 的方法中會看到一個 releaseConn 的方法,它調(diào)用了putConn方法去處理這個 dc 連接。

func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err)
}

再來看看 putConn 方法的定義

// 把 dc 連接放回連接池 freeConn 或者釋放
func (db *DB) putConn(dc *driverConn, err error) {
    db.mu.Lock()
    // 回收一個沒被使用的連接 會panic
    if !dc.inUse {
        if debugGetPut {
            fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
        }
        panic("sql: connection returned that was never out")
    }
    if debugGetPut {
        db.lastPut[dc] = stack()
    }
    // 將該連接置為 未使用
    dc.inUse = false

    // 執(zhí)行完該連接的函數(shù)
    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil
    // 不重用無效的連接
    if err == driver.ErrBadConn {
        // 該函數(shù)會判斷 阻塞在 connRequest 的請求數(shù)量,然后在不超限的情況下,通過 openerCh 喚醒 connectionOpener goroutine 創(chuàng)建新連接處理請求。
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        // 釋放連接
        dc.Close()
        return
    }
    if putConnHook != nil {
        putConnHook(db, dc)
    }
    // 如果是有效的連接,將該連接給 阻塞在 connRequest 的請求使用,或者放回連接池
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()

    // 改連接沒被回收,釋放
    if !added {
        dc.Close()
    }
}
主動回收或釋放

除了上述的連接回收釋放方式,還有沒有其他地方回收釋放呢。當我們設置 db.SetConnMaxLifetime 也就是設置連接的最大存活時間時,都會調(diào)起一個 goroutine 負責處理連接池中過期的連接。同 openerCh 信號一樣,釋放也用到了一個 cleanerCh 用于通知該 goroutine 處理任務。

func (db *DB) SetConnMaxLifetime(d time.Duration) {
    if d < 0 {
        d = 0
    }
    db.mu.Lock()
    // 當縮小 maxLifetime 的時候,直接清理不符的連接
    if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
        select {
        case db.cleanerCh <- struct{}{}:
        default:
        }
    }
    db.maxLifetime = d
    // 該方法會起一個 goroutine  負責釋放過期連接
    db.startCleanerLocked()
    db.mu.Unlock()
}

// 滿足條件開啟一個 goroutine 維護過期的連接
func (db *DB) startCleanerLocked() {
    if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
        db.cleanerCh = make(chan struct{}, 1)
        go db.connectionCleaner(db.maxLifetime)
    }
}

// 核心的邏輯
func (db *DB) connectionCleaner(d time.Duration) {
    const minInterval = time.Second

    if d < minInterval {
        d = minInterval
    }
    t := time.NewTimer(d)

    // 阻塞等待 cleanerCh 信號
    for {
        select {
        case <-t.C:
        case <-db.cleanerCh: // maxLifetime 修改 或者 db 關閉會發(fā)送該信號
        }

        db.mu.Lock()
        d = db.maxLifetime
        if db.closed || db.numOpen == 0 || d <= 0 {
            db.cleanerCh = nil
            db.mu.Unlock()
            return
        }

        expiredSince := nowFunc().Add(-d)
        var closing []*driverConn
        // 從連接池 freeConn 獲取過期連接
        for i := 0; i < len(db.freeConn); i++ {
            c := db.freeConn[i]
            if c.createdAt.Before(expiredSince) {
                closing = append(closing, c)
                last := len(db.freeConn) - 1
                db.freeConn[i] = db.freeConn[last]
                db.freeConn[last] = nil
                db.freeConn = db.freeConn[:last]
                i--
            }
        }
        db.mu.Unlock()
        // 釋放連接
        for _, c := range closing {
            c.Close()
        }

        if d < minInterval {
            d = minInterval
        }
        t.Reset(d)
    }
}

小結

現(xiàn)在回過頭來看看開始的三個問題,基本就有解了。

  • sql 的連接池的連接怎么維護的?
    有效的連接存儲在連接池 freeConn 中。啟用一個connectionOpener goroutine 通過接受 openerCh 信號負責調(diào)用驅(qū)動的 Open 方法創(chuàng)建連接。當用db.SetConnMaxLifetime 設置 MaxLifetime 或者調(diào)用 putConnDBLocked方法滿足條件時候會啟用一個 connectionCleanergoroutine 通過接受cleanerCh信號負責清理連接。
  • Query / Exec 如何獲取查詢的連接?
    1. 先查看 freeConn 是否有可用的連接,如果有就從連接池取。如果沒有進入下一步。
    2. 判斷當前連接數(shù)是否超限。如果超限,將該請求放入 connRequests 阻塞等待可用連接。如果沒超限進入下一步。
    3. 創(chuàng)建新的連接
  • 連接池的連接如何回收/釋放的?
    1. 被動回收/釋放。通過查詢等操作返回錯誤的時候會執(zhí)行 releaseConn 函數(shù)回收連接。當滿足條件時 會起一個connectionCleaner goroutine 清理連接池的無效連接。
    2. 主動回收/釋放。設置 db.SetConnMaxLifetime 的時候會觸發(fā)一次connectionCleaner goroutine 清理連接池。

可以看到sql庫的連接池的實現(xiàn)機制其實還是蠻復雜的,生產(chǎn)者connectionOpener goroutine 阻塞監(jiān)聽 openerCh 創(chuàng)建連接放入連接池。當請求來時,先查詢連接池有沒有空閑連接,如果沒有空閑連接則創(chuàng)建,Query 類的請求用完繼續(xù)放回連接池重用。當需要清理連接池的連接時調(diào)用connectionCleaner goroutine。分析過一遍,以后遇到問題會更快的處理。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容