Go 語言極速入門10 - 并發(fā)模式之資源池

提供一個資源池,類似于數(shù)據(jù)庫連接池的功能;資源池在 go 1.11.1 中有官方實(shí)現(xiàn):sync/pool.go

一、資源池

package pool

import (
    "sync"
    "io"
    "errors"
    "log"
)

// 聲明池類結(jié)構(gòu)體
type Pool struct {
    // 鎖
    lock sync.Mutex
    // 池中存儲的資源
    resources chan io.Closer
    // 資源創(chuàng)建工廠函數(shù)
    factory func() (io.Closer, error)
    // 池是否已經(jīng)被關(guān)閉
    closed bool
}

// 創(chuàng)建池類實(shí)例的工廠函數(shù)
// 工廠函數(shù)名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size too small");
    }

    return &Pool{
        resources: make(chan io.Closer, size),
        factory:   fn,
    }, nil
}

// 從池中獲取一個資源
func (p *Pool) Acquire() (io.Closer, error) {
    // select - default 經(jīng)典模式,將阻塞形式的 channel 改為了非阻塞,當(dāng) <-p.resources 不能立即返回時,執(zhí)行 default
    // 當(dāng)然,如果沒有 default,那么還是要阻塞在 <-p.resources 上的
    select {
    // 檢查是否有空閑的資源
    case r, ok := <-p.resources:
        log.Println("Acquire:", "Shared Resource")
        if !ok {
            return nil, errors.New("pool already closed")
        }
        return r, nil
    default:
        log.Println("Acquire:", "New Resource")
        // 調(diào)用資源創(chuàng)建函數(shù)創(chuàng)建資源
        return p.factory()
    }
}

// 將一個使用后的資源放回池里
func (p *Pool) Release(r io.Closer) {
    // 注意:Release 和 Close 使用的是同一把鎖,就是說二者同時只能執(zhí)行一個,防止資源池已經(jīng)關(guān)閉了,release 還向資源池放資源
    // 向一個已經(jīng)關(guān)閉的 channel 發(fā)送消息,會發(fā)生 panic: send on closed channel
    p.lock.Lock()
    defer p.lock.Unlock()

    // 如果池已經(jīng)被關(guān)閉,銷毀這個資源
    if p.closed {
        r.Close()
        return
    }

    select {
    // 試圖將這個資源放入隊列
    case p.resources <- r:
        log.Println("Release:", "In Queue")
    default:
        log.Println("Release:", "Closing")
        r.Close()
    }
}

// 關(guān)閉資源池,并關(guān)閉所有現(xiàn)有的資源
func (p *Pool) Close() {
    p.lock.Lock()
    defer p.lock.Unlock()

    if p.closed {
        return
    }

    p.closed = true

    // 在清空通道里的資源之前,將通道關(guān)閉
    close(p.resources)

    // 關(guān)閉資源
    for r := range p.resources {
        r.Close()
    }
}

select - default 經(jīng)典模式,將阻塞形式的 channel 改為了非阻塞,當(dāng) <-p.resources 不能立即返回時,執(zhí)行 default;當(dāng)然,如果沒有 default,那么還是要阻塞在 <-p.resources 上的

二、具體的資源類

package db

import (
    "log"
    "io"
    "sync/atomic"
)

// 給每個連接分配一個獨(dú)一無二的id
var idCounter int32

// 資源 - 數(shù)據(jù)庫連接
type DBConnection struct {
    ID int32
}

// dbConnection 實(shí)現(xiàn)了 io.Closer 接口
// 關(guān)閉資源
func (conn *DBConnection) Close() error {
    log.Println("conn closed")
    return nil
}

// 創(chuàng)建一個資源 - dbConnection
func CreateConn() (io.Closer, error) {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create conn, id:", id)
    return &DBConnection{
        ID: id,
    }, nil
}

三、使用資源池

package main

import (
    "sync"
    "github.com/zhaojigang/pool/pool"
    "github.com/zhaojigang/pool/db"
    "log"
    "time"
    "math/rand"
)

const (
    maxGoroutines   = 5 // 要使用的goroutine的數(shù)量
    pooledResources = 2 // 池中的資源的數(shù)量
)

func performQuery(query int, p *pool.Pool) {
    // 1. 獲取連接
    conn, err := p.Acquire()
    if err != nil {
        log.Println("acquire conn error, ", err)
        return
    }

    // 使用結(jié)束后,釋放鏈接
    defer p.Release(conn)

    // 該 log 模擬對連接的使用
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}

func main() {
    var waitGroup sync.WaitGroup
    waitGroup.Add(maxGoroutines)
    // 1. 創(chuàng)建一個 Pool
    p, err := pool.New(db.CreateConn, pooledResources)
    if err != nil {
        log.Println("create Pool error")
    }

    // 2. 開啟 goroutine 執(zhí)行任務(wù)
    for query := 0; query < maxGoroutines; query++ {
        // 每個goroutine需要自己復(fù)制一份要、查詢值的副本,
        // 不然所有的查詢會共享同一個查詢變量,即所有的 goroutine 最后的 query 值都是3
        go func(q int) {
            performQuery(q, p)
            waitGroup.Done()
        }(query)
        //time.Sleep(1000*time.Millisecond) // 用于測試從 resources channel 中獲取資源
    }

    // 3. 關(guān)閉連接池
    waitGroup.Wait()
    p.Close()
    log.Println("pool closed - main")
}

在高并發(fā)的創(chuàng)建 goroutine 的情況下,從 pool.go # Acquire 方法中可以看到,大家可能都還沒有 Release 資源,此時都會創(chuàng)建資源,資源在一瞬間會大量增加,在實(shí)際系統(tǒng)中,需要根據(jù)需求,做一些措施,例如提前創(chuàng)建好資源放入池中,goroutine 都從池中取資源,資源不夠就等待,使用完之后就放入池中,防止資源意外關(guān)閉,還可以啟用后臺線程監(jiān)控等。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容