引言
Go內(nèi)置了數(shù)據(jù)庫(kù)相關(guān)的庫(kù) - database/sql,實(shí)現(xiàn)數(shù)據(jù)庫(kù)操作相關(guān)的接口,其中還包含一個(gè)很重要的功能 - 連接池,用來(lái)實(shí)現(xiàn)連接的復(fù)用,限制連接的數(shù)量,從而最大程度的復(fù)用連接,提高性能,避免連接數(shù)量失控,導(dǎo)致資源消耗不可控。
本文借Go內(nèi)置的database/sql庫(kù),來(lái)一起學(xué)習(xí)如何一步步設(shè)計(jì)包含連接池的數(shù)據(jù)庫(kù)組件,包括模型抽象、連接復(fù)用,以及如何管理連接數(shù)。
設(shè)計(jì)
模型抽象
首先,我們要對(duì)解決領(lǐng)域進(jìn)行抽象。
我們目標(biāo)是設(shè)計(jì)一個(gè)數(shù)據(jù)庫(kù)連接組件,所以第一個(gè)對(duì)象模型很明確 - 數(shù)據(jù)庫(kù)對(duì)象, 我們將數(shù)據(jù)庫(kù)對(duì)象抽象為一個(gè)DB的結(jié)構(gòu)體,一個(gè)對(duì)象對(duì)應(yīng)的是一個(gè)數(shù)據(jù)庫(kù)實(shí)例,所以DB必須是單例。
其次,數(shù)據(jù)庫(kù)需要連接,所以可以對(duì)連接進(jìn)行抽象,命名為Conn,這里我們不關(guān)心Conn的屬性,而是關(guān)心行為,所以Conn類型定義成一個(gè)interface,包含所需兩個(gè)方法:預(yù)處理方法Prepare和關(guān)閉連接方法Close
(注:Prepare方法不再繼續(xù)展開,實(shí)際上就是接收一個(gè)sql,返回一個(gè)實(shí)現(xiàn)Stmt接口的預(yù)處對(duì)象,接著設(shè)置一下參數(shù),最后執(zhí)行數(shù)據(jù)庫(kù)操作)。
由于不同的數(shù)據(jù)庫(kù)連接的實(shí)現(xiàn)方式會(huì)有不同,這時(shí)就要考慮隔離變化,對(duì)連接的方式進(jìn)行抽象,定義一個(gè)連接接口 - Connector,用來(lái)創(chuàng)建連接(依賴倒置原則),當(dāng)初始化DB的時(shí)候再將具體實(shí)現(xiàn)注入到DB對(duì)象中(也就是依賴注入)。
最終我們可以得到以下幾個(gè)接口和結(jié)構(gòu)體:
// 數(shù)據(jù)庫(kù)對(duì)象
type DB struct {
connector Connector
}
type Conn interface {
Prepare(query string) (Stmt, error)
Close() error
}
// 數(shù)據(jù)庫(kù)連接接口
type Connector interface {
Connect(context.Context) (Conn, error)
}
最后,我們給DB對(duì)象增加一個(gè)獲取連接的方法Conn,在不考慮連接池的情況下,調(diào)用connector.Connect(ctx)直接獲取連接:
// 獲取一個(gè)連接
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
return db.connector.Connect(ctx)
}
連接復(fù)用
當(dāng)不考慮到連接池時(shí),上面的實(shí)現(xiàn)基本滿足需求,但是為了提高性能,連接池還是必須的,接下來(lái)我們開始設(shè)計(jì)連接池的功能。
第一步,需要考慮存儲(chǔ)空閑連接,這里采用的是切片來(lái)存儲(chǔ):freeConn []*Conn。
接著,需要考慮屬性要定義在哪?!空閑的連接需要能被DB實(shí)例中的不同方法訪問(wèn)到,所以我們把freeConn定義為DB的一個(gè)屬性,同時(shí)考慮到對(duì)freeConn的訪問(wèn)會(huì)存在并發(fā)安全的問(wèn)題,需要增加一個(gè)鎖mu sync.Mutex來(lái)保護(hù):
// 數(shù)據(jù)庫(kù)對(duì)象
type DB struct {
connector Connector
mu sync.Mutex // protects following fields
freeConn []*Conn
}
數(shù)據(jù)庫(kù)連接獲取方法Conn就需要修改為:
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
db.mu.Lock() // 加鎖保護(hù)freeConn
numFree := len(db.freeConn)
if numFree > 0 {
conn := db.freeConn[0]
// 移除第一個(gè)連接
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
db.mu.Unlock()
return conn, nil
}
db.mu.Unlock()
return db.connector.Connect(ctx) // 沒(méi)有空閑連接,重新創(chuàng)建
}
連接要復(fù)用,就不能關(guān)閉,用完需要放回連接池中,所以DB需要一個(gè)將連接放回連接池的方法 - putConn:
func (db *DB) putConn(dc Conn) {
db.mu.Lock()
db.freeConn = append(db.freeConn, dc)
db.mu.Unlock()
}
但是,putConn方法要在怎么被調(diào)用?!不能因?yàn)樵黾恿诉B接池功,讓客戶端的改變使用方式,所以我們應(yīng)該考慮將putConn調(diào)用放到一個(gè)合理的地方 - Conn的Close()方法中。
在沒(méi)有連接池功能的時(shí)候,一個(gè)Conn用完了就一定會(huì)調(diào)用Close()釋放資源,有連接池功能后,就不再是直接關(guān)閉連接,而是釋放到連接池中,提供給后續(xù)請(qǐng)求使用。
對(duì)此,我們對(duì)Conn進(jìn)行改造,增加一層代理,命名為PConn,將原來(lái)的Conn作為PConn的一個(gè)屬性,同時(shí)實(shí)現(xiàn)了Conn接口的兩個(gè)方法:Prepare和Close,這樣我們就可以對(duì)Close方法進(jìn)行攔截,修改它的行為:
type PConn interface {
db *DB
ci Conn
}
func (pc *PConn) Close() error {
dc.db.putConn(pc)
}
func (pc *PConn) Prepare(query string) (Stmt, error) {
return pc.ci.Prepare(query)
}
接著我們調(diào)整一下Conn的創(chuàng)建方法(也可以重新實(shí)現(xiàn)Connector接口,返回的是PConn實(shí)例,然后將新的實(shí)現(xiàn)注入到DB實(shí)例中)
func (db *DB) Conn(ctx context.Context) (Conn, error) {
...
c , err := db.connector.Connect(ctx) // 沒(méi)有空閑連接,重新創(chuàng)建
if err!=nil {
return nil, err
}
return &PConn{
ci: c,
db: db,
}
}
這樣當(dāng)PConn使用完后調(diào)用Close方法,就不再是關(guān)閉連接,而是將連接釋放到DB對(duì)象的freeConn中,由于客戶端定義的類型是Conn接口,并且PConn也實(shí)現(xiàn)了Conn接口,所以無(wú)需調(diào)整(符合開閉原則)
到此,連接復(fù)用初步完成了,接著就得考慮另外一個(gè)核心功能 :連接數(shù)量管理。
連接數(shù)量管理
目前的Conn發(fā)現(xiàn)沒(méi)有連接的時(shí)候是調(diào)用connector.Connect方法直接創(chuàng)建新連接,沒(méi)法控制連接的數(shù)量,這樣很明顯是不合理,無(wú)限創(chuàng)建連接可能會(huì)導(dǎo)致資源耗盡,資源消耗曲線過(guò)陡峭,所以我們需要:
- 限制連接數(shù)量。將連接的數(shù)量約束在指定的范圍內(nèi)
- 連接請(qǐng)求隊(duì)列。當(dāng)連接數(shù)量達(dá)到最大值時(shí),連接請(qǐng)求需要需要放到等待隊(duì)列中,等待有連接釋放。
首先,需要有地方保存當(dāng)前連接數(shù)量和最大連接數(shù),并且要能被對(duì)象中的不同方法訪問(wèn)到,所以我們可以給DB增加兩個(gè)屬性:
type DB struct {
numOpen int // number of opened and pending open connections
maxOpen int // <= 0 means unlimited
}
接著,需要設(shè)計(jì)當(dāng)有請(qǐng)求連接時(shí),發(fā)現(xiàn)沒(méi)有空閑連接,并且連接數(shù)量等于maxOpen時(shí)要怎么辦?
Database/sql里是采用一個(gè)map來(lái)存儲(chǔ)(注:這個(gè)有點(diǎn)奇怪,為什么不用隊(duì)列?),在DB結(jié)構(gòu)體增加一個(gè)屬性:connRequests,類型為:map[uint64]chan connRequest,其中key/value:
- key :請(qǐng)求唯一標(biāo)識(shí)。調(diào)用nextRequestKeyLocked方法生成,實(shí)際上就是一個(gè)自增的序列,只是為了保持唯一
- value:等待連接的請(qǐng)求。類型為chan,每個(gè)請(qǐng)求封裝為一個(gè)chan,可以保證并發(fā)安全,同時(shí)也可以利用其阻塞特性(當(dāng)chan沒(méi)有值時(shí)阻塞等待),chan接收的數(shù)據(jù)類型為connRequest格式,當(dāng)其他協(xié)程有釋放連接時(shí),會(huì)將連接放到一個(gè)connRequest對(duì)象中發(fā)送給該chan,connRequest只包含兩個(gè)屬性:conn和err,用來(lái)接收返回連接或是異常。
代碼如下:
type DB struct {
...
numOpen int // number of opened and pending open connections
maxOpen int // <= 0 means unlimited
nextRequest uint64 // Next key to use in connRequests.
connRequests map[uint64]chan connRequest
}
func (db *DB) nextRequestKeyLocked() uint64 {
next := db.nextRequest
db.nextRequest++
return next
}
// 連接請(qǐng)求
type connRequest struct {
conn *PConn
err error
}
調(diào)整獲取連接的方法Conn(ctx context.Context) 邏輯:
- 判斷freeConn是否有空閑連接,有就返回
- 判斷連接數(shù)量numOpen是否大于maxOpen,如果還小于maxOpen,說(shuō)明還可以創(chuàng)建新連接,創(chuàng)建連接后numOpen++,返回連接
- 當(dāng)numOpen已經(jīng)是大于等于maxOpen,就不能再創(chuàng)建新連接,這是就把請(qǐng)求放到集合connRequests中,等待連接釋放。
func (db *DB) Conn(ctx context.Context) (Conn, error) {
db.mu.Lock() // 加鎖保護(hù)freeConn
numFree := len(db.freeConn)
if numFree > 0 { // 有空閑連接
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
db.mu.Unlock()
return conn, nil
}
// 連接數(shù)已經(jīng)超出
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1) // 創(chuàng)建一個(gè)chan,接收連接
reqKey := db.nextRequestKeyLocked() // 生成唯一序號(hào)
db.connRequests[reqKey] = req // 放到全局屬性,讓其他方法能訪問(wèn)到
db.mu.Unlock()
select {
case <-ctx.Done(): //超時(shí)等
// Remove the connection request and ensure no value has been sent
// on it after removing.
db.mu.Lock()
delete(db.connRequests, reqKey) // 移除
db.mu.Unlock()
}
case ret, ok := <-req: // 收到連接釋放
if !ok {
return nil, errDBClosed
}
return ret.conn, ret.err
}
}
// 連接數(shù)沒(méi)超出,可以創(chuàng)建新連接
db.numOpen++ // optimistically
db.mu.Unlock()
c, err := db.connector.Connect(ctx) // 重新創(chuàng)建
if err != nil {
return nil, err
}
return &PConn{
ci: c,
db: db,
}, nil
}
接著,我們還需要調(diào)整連接釋放方法 - putConn:
- 增加一個(gè)bool返回值,告訴調(diào)用方連接是否釋放成功(如果失敗,客戶端可以決定關(guān)閉連接)
- 如果連接數(shù)numOpen大于 maxOpen時(shí),當(dāng)前連接直接丟棄,返回false
- 當(dāng)len(db.connRequests)大于0時(shí),需要考慮將連接優(yōu)先給db.connRequests中的請(qǐng)求
- 最后才將連接放入空閑列表中。
func (db *DB) putConn(dc Conn) bool{
db.mu.Lock()
defer db.mu.Unlock()
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 當(dāng)有等待連接的請(qǐng)求時(shí)
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
req <- connRequest{
conn: dc,
err: err,
}
return true
}
// 放入空閑連接池中
db.freeConn = append(db.freeConn, dc)
return true
}
PConn的close方法也要稍微調(diào)整一下,如果釋放連接失敗,需要把連接關(guān)閉
func (pc *PConn) Close() error {
ok := dc.db.putConn(pc)
if !ok {
dc.ci.Close()
}
}
總結(jié)
到目前為止,一個(gè)較完整的包含連接池的數(shù)據(jù)庫(kù)組件完成了,上述代碼是參照database/sql設(shè)計(jì),去除了一些非核心的代碼,通過(guò)對(duì)它的研究,可以學(xué)到:
- 如何對(duì)問(wèn)題域進(jìn)行抽象。數(shù)據(jù)庫(kù)抽象成DB、連接為Conn,連接器Connector等
- 如何命名一些變量。 connRequest(請(qǐng)求連接)、 freeConn(請(qǐng)求連接)等
- 如何使用context。實(shí)現(xiàn)超時(shí)等
- 并發(fā)編程。chan、mutex、全局屬性訪問(wèn),資源約束等
我的博客:https://itart.cn/blogs/2021/explore/database-sql-pool.html