提供一個資源池,類似于數(shù)據(jù)庫連接池的功能;資源池在 go 1.11.1 中有官方實(shí)現(xiàn):sync/pool.go
一、資源池
package pool
import (
"sync"
"io"
"errors"
"log"
)
// 聲明池類結(jié)構(gòu)體
type Pool struct {
// 鎖
lock sync.Mutex
// 池中存儲的資源
resources chan io.Closer
// 資源創(chuàng)建工廠函數(shù)
factory func() (io.Closer, error)
// 池是否已經(jīng)被關(guān)閉
closed bool
}
// 創(chuàng)建池類實(shí)例的工廠函數(shù)
// 工廠函數(shù)名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size too small");
}
return &Pool{
resources: make(chan io.Closer, size),
factory: fn,
}, nil
}
// 從池中獲取一個資源
func (p *Pool) Acquire() (io.Closer, error) {
// select - default 經(jīng)典模式,將阻塞形式的 channel 改為了非阻塞,當(dāng) <-p.resources 不能立即返回時,執(zhí)行 default
// 當(dāng)然,如果沒有 default,那么還是要阻塞在 <-p.resources 上的
select {
// 檢查是否有空閑的資源
case r, ok := <-p.resources:
log.Println("Acquire:", "Shared Resource")
if !ok {
return nil, errors.New("pool already closed")
}
return r, nil
default:
log.Println("Acquire:", "New Resource")
// 調(diào)用資源創(chuàng)建函數(shù)創(chuàng)建資源
return p.factory()
}
}
// 將一個使用后的資源放回池里
func (p *Pool) Release(r io.Closer) {
// 注意:Release 和 Close 使用的是同一把鎖,就是說二者同時只能執(zhí)行一個,防止資源池已經(jīng)關(guān)閉了,release 還向資源池放資源
// 向一個已經(jīng)關(guān)閉的 channel 發(fā)送消息,會發(fā)生 panic: send on closed channel
p.lock.Lock()
defer p.lock.Unlock()
// 如果池已經(jīng)被關(guān)閉,銷毀這個資源
if p.closed {
r.Close()
return
}
select {
// 試圖將這個資源放入隊列
case p.resources <- r:
log.Println("Release:", "In Queue")
default:
log.Println("Release:", "Closing")
r.Close()
}
}
// 關(guān)閉資源池,并關(guān)閉所有現(xiàn)有的資源
func (p *Pool) Close() {
p.lock.Lock()
defer p.lock.Unlock()
if p.closed {
return
}
p.closed = true
// 在清空通道里的資源之前,將通道關(guān)閉
close(p.resources)
// 關(guān)閉資源
for r := range p.resources {
r.Close()
}
}
select - default 經(jīng)典模式,將阻塞形式的 channel 改為了
非阻塞,當(dāng) <-p.resources 不能立即返回時,執(zhí)行 default;當(dāng)然,如果沒有 default,那么還是要阻塞在 <-p.resources 上的
二、具體的資源類
package db
import (
"log"
"io"
"sync/atomic"
)
// 給每個連接分配一個獨(dú)一無二的id
var idCounter int32
// 資源 - 數(shù)據(jù)庫連接
type DBConnection struct {
ID int32
}
// dbConnection 實(shí)現(xiàn)了 io.Closer 接口
// 關(guān)閉資源
func (conn *DBConnection) Close() error {
log.Println("conn closed")
return nil
}
// 創(chuàng)建一個資源 - dbConnection
func CreateConn() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create conn, id:", id)
return &DBConnection{
ID: id,
}, nil
}
三、使用資源池
package main
import (
"sync"
"github.com/zhaojigang/pool/pool"
"github.com/zhaojigang/pool/db"
"log"
"time"
"math/rand"
)
const (
maxGoroutines = 5 // 要使用的goroutine的數(shù)量
pooledResources = 2 // 池中的資源的數(shù)量
)
func performQuery(query int, p *pool.Pool) {
// 1. 獲取連接
conn, err := p.Acquire()
if err != nil {
log.Println("acquire conn error, ", err)
return
}
// 使用結(jié)束后,釋放鏈接
defer p.Release(conn)
// 該 log 模擬對連接的使用
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}
func main() {
var waitGroup sync.WaitGroup
waitGroup.Add(maxGoroutines)
// 1. 創(chuàng)建一個 Pool
p, err := pool.New(db.CreateConn, pooledResources)
if err != nil {
log.Println("create Pool error")
}
// 2. 開啟 goroutine 執(zhí)行任務(wù)
for query := 0; query < maxGoroutines; query++ {
// 每個goroutine需要自己復(fù)制一份要、查詢值的副本,
// 不然所有的查詢會共享同一個查詢變量,即所有的 goroutine 最后的 query 值都是3
go func(q int) {
performQuery(q, p)
waitGroup.Done()
}(query)
//time.Sleep(1000*time.Millisecond) // 用于測試從 resources channel 中獲取資源
}
// 3. 關(guān)閉連接池
waitGroup.Wait()
p.Close()
log.Println("pool closed - main")
}
在高并發(fā)的創(chuàng)建 goroutine 的情況下,從 pool.go # Acquire 方法中可以看到,大家可能都還沒有 Release 資源,此時都會創(chuàng)建資源,資源在一瞬間會大量增加,在實(shí)際系統(tǒng)中,需要根據(jù)需求,做一些措施,例如提前創(chuàng)建好資源放入池中,goroutine 都從池中取資源,資源不夠就等待,使用完之后就放入池中,防止資源意外關(guān)閉,還可以啟用后臺線程監(jiān)控等。