背景
1. 結(jié)構(gòu)體
1.1 depSet和finalCloser
depSet : 記錄db與conn之間的依賴關(guān)系,維持連接池以及關(guān)閉時使用
finalCloser: Todo
// depSet is a finalCloser's outstanding dependencies
type depSet map[interface{}]bool // set of true bools
// The finalCloser interface is used by (*DB).addDep and related
// dependency reference counting.
type finalCloser interface {
// finalClose is called when the reference count of an object
// goes to zero. (*DB).mu is not held while calling it.
finalClose() error
1.2 DB結(jié)構(gòu)體
// DB is a database handle representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
//
// The sql package creates and frees connections automatically; it
// also maintains a free pool of idle connections. If the database has
// a concept of per-connection state, such state can only be reliably
// observed within a transaction. Once DB.Begin is called, the
// returned Tx is bound to a single connection. Once Commit or
// Rollback is called on the transaction, that transaction's
// connection is returned to DB's idle connection pool. The pool size
// can be controlled with SetMaxIdleConns.
type DB struct {
driver driver.Driver
dsn string
// numClosed is an atomic counter which represents a total number of
// closed connections. Stmt.openStmt checks it before cleaning closed
// connections in Stmt.css.
numClosed uint64
mu sync.Mutex // protects following fields
//當前連接池中空余的連接
//當freeConn>maxIdle時,會將多余的那部分連接close掉
freeConn []*driverConn
connRequests []chan connRequest
numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
closed bool
// 記錄依賴關(guān)系
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
//maxIdle和maxOpen的關(guān)系?
// maxIdle為連接池中最大的連接數(shù)目
// maxOpen為打開db最大的連接數(shù) : maxOpen僅當>0且< maxIdle的情況下才會生效
maxIdle int // zero means defaultMaxIdleConns; negative means 0
maxOpen int // <= 0 means unlimited
//單個連接最大生命周期
maxLifetime time.Duration // maximum amount of time a connection may be reused
cleanerCh chan struct{}
}
// connReuseStrategy determines how (*DB).conn returns database connections.
type connReuseStrategy uint8
const (
// alwaysNewConn forces a new connection to the database.
alwaysNewConn connReuseStrategy = iota
// cachedOrNewConn returns a cached connection, if available, else waits
// for one to become available (if MaxOpenConns has been reached) or
// creates a new database connection.
cachedOrNewConn
)
1.3 driverConn結(jié)構(gòu)體
driverConn為單個連接的結(jié)構(gòu)體,同時它有一個其依賴的db的指針(因為每個連接都是建立在db之上),在DB中可以處理單個連接的操作(比如Close()操作)
// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
db *DB
createdAt time.Time
sync.Mutex // guards following
ci driver.Conn
closed bool
finalClosed bool // ci.Close has been called
openStmt map[driver.Stmt]bool
// guarded by db.mu
inUse bool
onPut []func() // code (with db.mu held) run when conn is next returned
dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}
2. 連接池原理分析
2.1 單個請求關(guān)閉操作
關(guān)閉操作主要分為兩個步驟:
a. 加互斥鎖關(guān)閉該連接中的closed值
b. 加互斥所關(guān)閉其所依賴db的依賴關(guān)系(調(diào)用removeDepLocked函數(shù))
func (dc *driverConn) Close() error {
dc.Lock()
if dc.closed {
dc.Unlock()
return errors.New("sql: duplicate driverConn close")
}
dc.closed = true
dc.Unlock() // not defer; removeDep finalClose calls may need to lock
// And now updates that require holding dc.mu.Lock.
dc.db.mu.Lock()
dc.dbmuClosed = true
fn := dc.db.removeDepLocked(dc, dc)
dc.db.mu.Unlock()
return fn()
}
2.2 db操作去除依賴關(guān)系函數(shù)
注意,從2.1可知該函數(shù)的入?yún) finalCloser為單個連接的指針,在db中會建立一個map, map[finalCloser]depSet(db中的deep字段),如果該連接關(guān)閉掉,則會在關(guān)閉的時候在map中delete該依賴關(guān)系,同時還要確保該連接內(nèi)Stmt全部關(guān)閉(調(diào)用finalClose函數(shù))。
func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
//println(fmt.Sprintf("removeDep(%T %p, %T %p)", x, x, dep, dep))
xdep, ok := db.dep[x]
if !ok {
panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
}
l0 := len(xdep)
delete(xdep, dep)
switch len(xdep) {
case l0:
// Nothing removed. Shouldn't happen.
panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
case 0:
// No more dependencies.
delete(db.dep, x)
return x.finalClose
default:
// Dependencies remain.
return func() error { return nil }
}
}
2.3 db Open函數(shù)
a. 在sql包里有兩個全局變量:一個讀寫鎖,一個是driver map表;
b. 在driver map初始化的過程中,為防止對應(yīng)關(guān)系被修改加了一把讀鎖;
c. connectionRequestQueueSize變量的理解:請求連接隊列大小要遠大于db.maxOpen,這樣做可以在僅打開<=maxOpen個連接的情況下不阻塞住請求數(shù)量,其實現(xiàn)原理是通過golfing的channel去處理的;
var (
driversMu sync.RWMutex
drivers = make(map[string]driver.Driver)
)
// This is the size of the connectionOpener request chan (DB.openerCh).
// This value should be larger than the maximum typical value
// used for db.maxOpen. If maxOpen is significantly larger than
// connectionRequestQueueSize then it is possible for ALL calls into the *DB
// to block until the connectionOpener can satisfy the backlog of requests.
var connectionRequestQueueSize = 1000000
// Open opens a database specified by its database driver name and a
// driver-specific data source name, usually consisting of at least a
// database name and connection information.
//
// Most users will open a database via a driver-specific connection
// helper function that returns a *DB. No database drivers are included
// in the Go standard library. See https://golang.org/s/sqldrivers for
// a list of third-party drivers.
//
// Open may just validate its arguments without creating a connection
// to the database. To verify that the data source name is valid, call
// Ping.
//
// The returned DB is safe for concurrent use by multiple goroutines
// and maintains its own pool of idle connections. Thus, the Open
// function should be called just once. It is rarely necessary to
// close a DB.
func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
db := &DB{
driver: driveri,
dsn: dataSourceName,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
}
go db.connectionOpener()
return db, nil
}
2.4 建立連接conn函數(shù)
在介紹conn函數(shù)之前要了解sql保重一個重要的枚舉常量connReuseStrategy,其定義入下:
// connReuseStrategy determines how (*DB).conn returns database connections.
type connReuseStrategy uint8
const (
// alwaysNewConn forces a new connection to the database.
alwaysNewConn connReuseStrategy = iota
// cachedOrNewConn returns a cached connection, if available, else waits
// for one to become available (if MaxOpenConns has been reached) or
// creates a new database connection.
cachedOrNewConn
)
其中:
a. alwaysNewConn常量定義了打開連接時的策略為每次建立一個新的連接;
b. cachedOrNewConn常量會從返回一個cached連接或者等待一個可用連接,甚至也可能建立一個新的連接;
// connRequest represents one request for a new connection
// When there are no idle connections available, DB.conn will create
// a new connRequest and put it on the db.connRequests list.
type connRequest struct {
conn *driverConn
err error
}
下面為db中建立連接的過程,分為三個步驟:
a. 如果是基于cachedOrNewConn緩存策略,且有可用的連接,則從freeConn中取第一個連接返回;
b. 如果連接數(shù)量已經(jīng)>設(shè)定的閾值,則從req channel中等待一個可用的連接返回;
c. 重新建立一個新的連接返回;
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
lifetime := db.maxLifetime
// 策略一: Prefer a free connection, if possible.
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
//策略二: Out of free connections or we were asked not to use one. If we're not
// allowed to open any more connections, make a request and wait.
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
db.connRequests = append(db.connRequests, req)
db.mu.Unlock()
ret, ok := <-req
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
//策略三
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.driver.Open(db.dsn)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
db.addDepLocked(dc, dc)
dc.inUse = true
db.mu.Unlock()
return dc, nil
}
3. sql查詢原理分析
在介紹db類查詢之前,首先介紹下一個主要的結(jié)構(gòu)體和一個函數(shù),
a. driverStmt為db連接處理sql時db與driver 聲明之間的一個橋梁;
b. rowsiFromStatement函數(shù)為queryConn函數(shù)調(diào)用一個基類函數(shù),其作用為調(diào)用driver的Query方法實現(xiàn)對driverStmt的一個查詢;
// driverStmt associates a driver.Stmt with the
// *driverConn from which it came, so the driverConn's lock can be
// held during calls.
type driverStmt struct {
//該字段Locker為interface,定義了Lock和Unlock方法,而在driverStmt結(jié)構(gòu)體中,則為指向driver連接的指針
sync.Locker // the *driverConn
//si 為driver中的Stmt 類型interface(主要實現(xiàn)了Exec()和Query()方法)
si driver.Stmt
}
func rowsiFromStatement(ds driverStmt, args ...interface{}) (driver.Rows, error) {
ds.Lock()
want := ds.si.NumInput()
ds.Unlock()
// -1 means the driver doesn't know how to count the number of
// placeholders, so we won't sanity check input here and instead let the
// driver deal with errors.
if want != -1 && len(args) != want {
return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", want, len(args))
}
// driverArgs為實現(xiàn)格式化參數(shù)的一個函數(shù)
dargs, err := driverArgs(&ds, args)
if err != nil {
return nil, err
}
ds.Lock()
//該函數(shù)主要調(diào)用driver中的Query實現(xiàn)查詢其結(jié)果
rowsi, err := ds.si.Query(dargs)
ds.Unlock()
if err != nil {
return nil, err
}
return rowsi, nil
}
下面介紹下db類查詢中一個主要的函數(shù)queryConn,其中
a. dc為driverConn類型;
b. releaseConn為一個函數(shù)入?yún)?,其含義為釋放該連接只freeConn池的作用;
c. query 和 args為輸入sql語言分解后的語句和參數(shù);
// queryConn executes a query on the given connection.
// The connection gets released by the releaseConn function.
func (db *DB) queryConn(dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
if queryer, ok := dc.ci.(driver.Queryer); ok {
dargs, err := driverArgs(nil, args)
if err != nil {
releaseConn(err)
return nil, err
}
dc.Lock()
rowsi, err := queryer.Query(query, dargs)
dc.Unlock()
if err != driver.ErrSkip {
if err != nil {
releaseConn(err)
return nil, err
}
// Note: ownership of dc passes to the *Rows, to be freed
// with releaseConn.
rows := &Rows{
dc: dc,
releaseConn: releaseConn,
rowsi: rowsi,
}
return rows, nil
}
}
dc.Lock()
si, err := dc.ci.Prepare(query)
dc.Unlock()
if err != nil {
releaseConn(err)
return nil, err
}
ds := driverStmt{dc, si}
rowsi, err := rowsiFromStatement(ds, args...)
if err != nil {
dc.Lock()
si.Close()
dc.Unlock()
releaseConn(err)
return nil, err
}
// Note: ownership of ci passes to the *Rows, to be freed
// with releaseConn.
rows := &Rows{
dc: dc,
releaseConn: releaseConn,
rowsi: rowsi,
closeStmt: si,
}
return rows, nil
}
3.1 db查詢語言分析
3.2 Stmt結(jié)構(gòu)體介紹
// connStmt is a prepared statement on a particular connection.
type connStmt struct {
dc *driverConn
si driver.Stmt
}
// Stmt is a prepared statement.
// A Stmt is safe for concurrent use by multiple goroutines.
type Stmt struct {
// Immutable:
db *DB // where we came from
query string // that created the Stmt
stickyErr error // if non-nil, this error is returned for all operations
closemu sync.RWMutex // held exclusively during close, for read otherwise.
// If in a transaction, else both nil:
tx *Tx
txsi *driverStmt
mu sync.Mutex // protects the rest of the fields
closed bool
// css is a list of underlying driver statement interfaces
// that are valid on particular connections. This is only
// used if tx == nil and one is found that has idle
// connections. If tx != nil, txsi is always used.
css []connStmt
// lastNumClosed is copied from db.numClosed when Stmt is created
// without tx and closed connections in css are removed.
lastNumClosed uint64
}