golang源碼分析-sql package

背景

1. 結構體

1.1 depSet和finalCloser

depSet : 記錄db與conn之間的依賴關係,在維護連接池以及關閉連接時使用
finalCloser: 參與依賴引用計數的對象所實現的接口;當對象的引用計數歸零時,其finalClose方法會被調用(調用時不持有(*DB).mu)

 // depSet is the set of outstanding dependencies held on a finalCloser.
 // It is the value type of DB.dep: removeDepLocked deletes entries from it
 // and hands back the owner's finalClose once the set becomes empty.
 type depSet map[interface{}]bool // set of true bools
// The finalCloser interface is used by (*DB).addDep and related
// dependency reference counting. Values of this type are the keys of
// DB.dep; each key maps to the depSet of things still depending on it.
type finalCloser interface {
    // finalClose is called when the reference count of an object
    // goes to zero. (*DB).mu is not held while calling it.
    finalClose() error
}

1.2 DB結構體

// DB is a database handle representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
//
// The sql package creates and frees connections automatically; it
// also maintains a free pool of idle connections. If the database has
// a concept of per-connection state, such state can only be reliably
// observed within a transaction. Once DB.Begin is called, the
// returned Tx is bound to a single connection. Once Commit or
// Rollback is called on the transaction, that transaction's
// connection is returned to DB's idle connection pool. The pool size
// can be controlled with SetMaxIdleConns.
type DB struct {
    // driver and dsn are the values Open was called with; conn passes dsn
    // to driver.Open whenever it has to dial a new connection.
    driver driver.Driver
    dsn    string
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64
    mu           sync.Mutex // protects following fields
    // freeConn holds the connections that are currently idle in the pool;
    // conn takes the first element when it is allowed to reuse one.
    // NOTE(review): the article states that idle connections in excess of
    // maxIdle are closed; the code enforcing that is not in this excerpt.
    freeConn     []*driverConn
    // connRequests holds one buffered channel per caller that conn has
    // parked because the maxOpen limit was reached.
    connRequests []chan connRequest
    numOpen      int // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh    chan struct{}
    closed      bool
    // dep records the dependency relationships: for each finalCloser, the
    // set of things that still depend on it (see removeDepLocked).
    dep         map[finalCloser]depSet
    lastPut     map[*driverConn]string // stacktrace of last conn's put; debug only
    // How do maxIdle and maxOpen relate?
    // maxIdle caps the number of idle connections kept in the pool;
    // maxOpen caps the total number of connections opened on this DB.
    // NOTE(review): the original note claimed maxOpen only takes effect when
    // it is > 0 and < maxIdle. This excerpt does not establish that: conn
    // enforces maxOpen whenever it is > 0. Verify against the setters.
    maxIdle     int                    // zero means defaultMaxIdleConns; negative means 0
    maxOpen     int                    // <= 0 means unlimited
    // maxLifetime is the maximum lifetime of a single connection; conn
    // passes it to driverConn.expired before handing a connection out.
    maxLifetime time.Duration          // maximum amount of time a connection may be reused
    // NOTE(review): cleanerCh is not used anywhere in this excerpt;
    // presumably it signals a connection-cleaner goroutine. TODO confirm.
    cleanerCh   chan struct{}
}

// connReuseStrategy determines how (*DB).conn returns database connections.
// It is the sole argument of (*DB).conn.
type connReuseStrategy uint8

const (
    // alwaysNewConn forces a new connection to the database.
    // With this strategy conn never takes a connection from DB.freeConn.
    alwaysNewConn connReuseStrategy = iota
    // cachedOrNewConn returns a cached connection, if available, else waits
    // for one to become available (if MaxOpenConns has been reached) or
    // creates a new database connection.
    cachedOrNewConn
)

1.3 driverConn結構體
driverConn為單個連接的結構體,同時它持有一個指向其所依賴的db的指針(因為每個連接都是建立在db之上),這樣就可以借助DB完成單個連接的操作(比如Close()操作)

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
    // db is the pool that created this connection (set in (*DB).conn).
    db        *DB
    // createdAt is taken from nowFunc() when (*DB).conn builds the value.
    createdAt time.Time

    sync.Mutex  // guards following
    // ci is the underlying driver connection returned by driver.Open.
    ci          driver.Conn
    // closed is set by Close; a second Close reports a duplicate-close error.
    closed      bool
    finalClosed bool // ci.Close has been called
    // NOTE(review): openStmt is not touched in this excerpt; presumably the
    // set of driver statements still open on this connection. TODO confirm.
    openStmt    map[driver.Stmt]bool

    // guarded by db.mu
    // inUse is set to true by (*DB).conn when the connection is handed out.
    inUse      bool
    onPut      []func() // code (with db.mu held) run when conn is next returned
    dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

2. 連接池原理分析

2.1 單個連接關閉操作

關閉操作主要分為兩個步驟:
a. 加互斥鎖(連接自身的鎖),將該連接的closed值置為true
b. 加互斥鎖(db.mu),解除其所依賴db中的依賴關係(調用removeDepLocked函數),解鎖後再執行該函數返回的回調

  // Close marks the connection closed and drops the connection's
  // self-dependency registered in its DB. The first call returns whatever
  // the function handed back by removeDepLocked returns; every later call
  // returns a "duplicate driverConn close" error.
  func (dc *driverConn) Close() error {
    // Step 1: flip the closed flag under the connection's own mutex.
    // The mutex is released explicitly rather than deferred, because the
    // finalClose work triggered below may need to take it again.
    dc.Lock()
    wasClosed := dc.closed
    if !wasClosed {
        dc.closed = true
    }
    dc.Unlock()
    if wasClosed {
        return errors.New("sql: duplicate driverConn close")
    }

    // Step 2: the remaining updates are guarded by the pool mutex (db.mu).
    pool := dc.db
    pool.mu.Lock()
    dc.dbmuClosed = true
    finish := pool.removeDepLocked(dc, dc)
    pool.mu.Unlock()

    // Run the returned function only after db.mu has been released.
    return finish()
}

2.2 db操作去除依賴關係函數

注意,從2.1可知該函數的入參x finalCloser為單個連接的指針。在db中會建立一個map, map[finalCloser]depSet(db中的dep字段),如果該連接關閉掉,則會在關閉的時候在map中delete該依賴關係;當依賴全部移除後,還要確保該連接內Stmt全部關閉(調用finalClose函數)。

 // removeDepLocked removes dep from the set of dependencies recorded for x
 // in db.dep and returns the function the caller should run next:
 // x.finalClose when that was the last dependency, otherwise a no-op that
 // returns nil. The "Locked" suffix and the call site in driverConn.Close
 // show that the caller is expected to hold db.mu.
 //
 // It panics when x has no recorded dependencies or when dep is not one of
 // them, since either means a removeDep without a matching addDep.
 func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
    // Tracing aid, normally disabled:
    //println(fmt.Sprintf("removeDep(%T %p, %T %p)", x, x, dep, dep))

    deps, known := db.dep[x]
    if !known {
        panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
    }

    before := len(deps)
    delete(deps, dep)
    remaining := len(deps)

    if remaining == before {
        // The delete was a no-op, so dep was never registered on x.
        panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
    }
    if remaining == 0 {
        // That was the last dependency: forget x and let the caller finalize it.
        delete(db.dep, x)
        return x.finalClose
    }
    // Other dependencies are still outstanding; nothing to finalize yet.
    return func() error { return nil }
}

2.3 db Open函數

a. 在sql包里有兩個全局變量:一個讀寫鎖,一個是driver map表;
b. 在Open中查詢driver map的過程中,為防止對應關係被併發修改加了一把讀鎖;
c. connectionRequestQueueSize變量的理解:請求連接隊列大小要遠大于db.maxOpen,這樣做可以在僅打開<=maxOpen個連接的情況下不阻塞住請求數量,其實現原理是通過golang的channel去處理的;

// Package-level driver registry.
var (
    // driversMu guards drivers; Open takes the read lock around its lookup.
    driversMu sync.RWMutex
    // drivers maps a driver name, as passed to Open, to its driver.Driver.
    drivers   = make(map[string]driver.Driver)
)

 // This is the size of the connectionOpener request chan (DB.openerCh).
// Open uses it as the buffer capacity when it creates that channel.
// This value should be larger than the maximum typical value
// used for db.maxOpen. If maxOpen is significantly larger than
// connectionRequestQueueSize then it is possible for ALL calls into the *DB
// to block until the connectionOpener can satisfy the backlog of requests.
var connectionRequestQueueSize = 1000000

// Open returns a *DB for the driver registered under driverName, using
// the driver-specific dataSourceName (usually at least a database name
// and connection information).
//
// Most users will open a database via a driver-specific connection
// helper function that returns a *DB. No database drivers are included
// in the Go standard library. See https://golang.org/s/sqldrivers for
// a list of third-party drivers.
//
// Open itself never dials the database: it looks the driver up, builds
// the handle and starts the connectionOpener goroutine. To verify that
// the data source name is valid, call Ping.
//
// An unregistered driverName yields a nil *DB and an "unknown driver"
// error.
//
// The returned DB is safe for concurrent use by multiple goroutines
// and maintains its own pool of idle connections. Thus, the Open
// function should be called just once. It is rarely necessary to
// close a DB.
func Open(driverName, dataSourceName string) (*DB, error) {
    // Hold the registry read lock only for the duration of the lookup.
    driversMu.RLock()
    registered, found := drivers[driverName]
    driversMu.RUnlock()
    if !found {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }

    handle := new(DB)
    handle.driver = registered
    handle.dsn = dataSourceName
    // Buffered so that requests for new connections can queue up.
    handle.openerCh = make(chan struct{}, connectionRequestQueueSize)
    handle.lastPut = make(map[*driverConn]string)

    // Background goroutine that opens connections on demand.
    go handle.connectionOpener()
    return handle, nil
}

2.4 建立連接conn函數

在介紹conn函數之前要了解sql包中一個重要的枚舉常量connReuseStrategy,其定義如下:

// connReuseStrategy determines how (*DB).conn returns database connections.
// It is the sole argument of (*DB).conn.
type connReuseStrategy uint8

const (
    // alwaysNewConn forces a new connection to the database.
    // With this strategy conn never takes a connection from DB.freeConn.
    alwaysNewConn connReuseStrategy = iota
    // cachedOrNewConn returns a cached connection, if available, else waits
    // for one to become available (if MaxOpenConns has been reached) or
    // creates a new database connection.
    cachedOrNewConn
)

其中:
a. alwaysNewConn常量定義了打開連接時的策略為每次建立一個新的連接;
b. cachedOrNewConn常量會返回一個cached連接或者等待一個可用連接,甚至也可能建立一個新的連接;

// connRequest represents one request for a new connection
// When there are no idle connections available, DB.conn will create
// a new connRequest and put it on the db.connRequests list.
// Concretely, conn appends a buffered chan connRequest to that list and
// blocks receiving a connRequest value from it.
type connRequest struct {
    // conn is the connection handed to the waiter; conn() only inspects it
    // when err is nil.
    conn *driverConn
    // err is returned to the waiter together with conn.
    err  error
}

下面為db中建立連接的過程,分為三個步驟:
a. 如果是基于cachedOrNewConn緩存策略,且有可用的連接,則從freeConn中取第一個連接返回;
b. 如果已打開的連接數量已經>=設定的閾值(maxOpen且maxOpen>0),則從req channel中等待一個可用的連接返回;
c. 否則重新建立一個新的連接返回;

// conn hands out a *driverConn, either taken from the idle pool or freshly
// opened, according to strategy. It tries three paths in order:
//
//  1. reuse the first idle connection (cachedOrNewConn only);
//  2. if maxOpen is set and already reached, queue a request and wait;
//  3. otherwise open a new connection through the driver.
//
// It returns errDBClosed when the DB is closed (or the wait channel is
// closed), driver.ErrBadConn when the connection obtained in path 1 or 2
// has outlived maxLifetime, and the driver's error when opening fails.
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // Snapshot the lifetime limit while db.mu is held; it is consulted
    // below after the lock has been released.
    maxAge := db.maxLifetime

    // Path 1: prefer an idle connection when the strategy allows it.
    if idle := len(db.freeConn); strategy == cachedOrNewConn && idle > 0 {
        dc := db.freeConn[0]
        // Shift the remaining entries down by one and trim the tail.
        kept := copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:kept]
        dc.inUse = true
        db.mu.Unlock()
        if !dc.expired(maxAge) {
            return dc, nil
        }
        dc.Close()
        return nil, driver.ErrBadConn
    }

    // Path 2: no idle connection (or reuse not requested) and the open
    // limit has been reached, so park a request and wait for an answer.
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // Capacity 1 so that whoever answers the request does not block
        // waiting for this goroutine to receive.
        wait := make(chan connRequest, 1)
        db.connRequests = append(db.connRequests, wait)
        db.mu.Unlock()

        answer, delivered := <-wait
        switch {
        case !delivered:
            // The channel was closed without a value.
            return nil, errDBClosed
        case answer.err == nil && answer.conn.expired(maxAge):
            answer.conn.Close()
            return nil, driver.ErrBadConn
        }
        return answer.conn, answer.err
    }

    // Path 3: open a brand-new connection. numOpen is bumped before
    // dialing so concurrent callers see the pending connection.
    db.numOpen++
    db.mu.Unlock()

    ci, err := db.driver.Open(db.dsn)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // undo the optimistic increment
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }

    db.mu.Lock()
    fresh := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    // Register the connection as a dependency of itself; driverConn.Close
    // removes it again via removeDepLocked(dc, dc).
    db.addDepLocked(fresh, fresh)
    fresh.inUse = true
    db.mu.Unlock()
    return fresh, nil
}

3. sql查詢原理分析

在介紹db類查詢之前,首先介紹下一個主要的結構體和一個函數,
a. driverStmt為db連接處理sql時db與driver的Stmt(語句)之間的一個橋梁;
b. rowsiFromStatement函數為queryConn函數調用的一個基礎函數,其作用為調用driver的Query方法實現對driverStmt的一次查詢;

// driverStmt associates a driver.Stmt with the
// *driverConn from which it came, so the driverConn's lock can be
// held during calls.
type driverStmt struct {
    // Locker is an interface providing Lock and Unlock; inside driverStmt
    // it holds the pointer to the driver connection (queryConn builds the
    // value as driverStmt{dc, si}).
    sync.Locker // the *driverConn
    // si is the driver's Stmt interface value (it mainly provides the
    // Exec() and Query() methods).
    si          driver.Stmt
}

// rowsiFromStatement runs the prepared statement ds with args and returns
// the driver-level rows. Every call into ds.si is made with ds's lock held.
//
// It returns an error, and nil rows, when the statement reports a
// placeholder count that differs from len(args), when driverArgs rejects
// the arguments, or when the driver's Query fails.
func rowsiFromStatement(ds driverStmt, args ...interface{}) (driver.Rows, error) {
    ds.Lock()
    expected := ds.si.NumInput()
    ds.Unlock()

    // A count of -1 means the driver cannot tell how many placeholders the
    // statement has; skip the sanity check and let the driver report any
    // mismatch itself.
    if supplied := len(args); expected != -1 && supplied != expected {
        return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", expected, supplied)
    }

    // driverArgs converts the caller's arguments into the form handed to
    // the driver.
    converted, err := driverArgs(&ds, args)
    if err != nil {
        return nil, err
    }

    // Delegate the actual query to the driver's Stmt.Query.
    ds.Lock()
    result, err := ds.si.Query(converted)
    ds.Unlock()
    if err != nil {
        return nil, err
    }
    return result, nil
}

下面介紹下db類查詢中一個主要的函數queryConn,其中
a. dc為driverConn類型;
b. releaseConn為一個函數入參,其作用為將該連接釋放回至freeConn池;
c. query 和 args為輸入sql語言分解后的語句和參數;

// queryConn runs query with args on the connection dc.
//
// If the driver connection implements driver.Queryer, the query is sent
// directly; when that is not implemented, or it answers driver.ErrSkip,
// the query is prepared and then executed as a statement instead.
//
// On success, ownership of dc moves to the returned *Rows, which holds
// releaseConn for releasing it later. On any failure releaseConn is called
// with the error and that error is returned with nil rows.
func (db *DB) queryConn(dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
    // fail releases the connection with err and reports err to the caller.
    fail := func(err error) (*Rows, error) {
        releaseConn(err)
        return nil, err
    }

    // Fast path: let the driver run the query without a prepared statement.
    if direct, supported := dc.ci.(driver.Queryer); supported {
        dargs, err := driverArgs(nil, args)
        if err != nil {
            return fail(err)
        }
        dc.Lock()
        rowsi, err := direct.Query(query, dargs)
        dc.Unlock()
        switch err {
        case driver.ErrSkip:
            // The driver declined the fast path; use a prepared statement.
        case nil:
            // Ownership of dc passes to the *Rows, to be freed with
            // releaseConn.
            return &Rows{
                dc:          dc,
                releaseConn: releaseConn,
                rowsi:       rowsi,
            }, nil
        default:
            return fail(err)
        }
    }

    // Slow path: prepare the statement, then query through it.
    dc.Lock()
    stmt, err := dc.ci.Prepare(query)
    dc.Unlock()
    if err != nil {
        return fail(err)
    }

    rowsi, err := rowsiFromStatement(driverStmt{dc, stmt}, args...)
    if err != nil {
        // The statement is of no further use; close it under the
        // connection lock before releasing the connection.
        dc.Lock()
        stmt.Close()
        dc.Unlock()
        return fail(err)
    }

    // Ownership of the connection passes to the *Rows, to be freed with
    // releaseConn; the prepared statement is recorded in closeStmt.
    return &Rows{
        dc:          dc,
        releaseConn: releaseConn,
        rowsi:       rowsi,
        closeStmt:   stmt,
    }, nil
}

3.1 db查詢語言分析

3.2 Stmt結構體介紹

// connStmt is a prepared statement on a particular connection.
// Stmt keeps a slice of these in its css field.
type connStmt struct {
    dc *driverConn // the connection the statement belongs to
    si driver.Stmt // the driver-level statement on that connection
}

// Stmt is a prepared statement.
// A Stmt is safe for concurrent use by multiple goroutines.
//
// The fields fall into three groups: values that never change after
// creation, the transaction binding (tx/txsi), and the state guarded by mu.
type Stmt struct {
    // Immutable:
    db        *DB    // where we came from
    query     string // that created the Stmt
    stickyErr error  // if non-nil, this error is returned for all operations

    closemu sync.RWMutex // held exclusively during close, for read otherwise.

    // If in a transaction, else both nil:
    tx   *Tx
    txsi *driverStmt

    mu     sync.Mutex // protects the rest of the fields
    closed bool

    // css is a list of underlying driver statement interfaces
    // that are valid on particular connections.  This is only
    // used if tx == nil and one is found that has idle
    // connections.  If tx != nil, txsi is always used.
    css []connStmt

    // lastNumClosed is copied from db.numClosed when Stmt is created
    // without tx and closed connections in css are removed.
    lastNumClosed uint64
}
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容