Golang連接池

package controllers

import (
    "io"
    "sync"
    "time"
    "errors"
    "fmt"
)

var (
    ErrInvalidConfig = errors.New("invalid pool config")//error().Error()//errors.New("invalid pool config")
    ErrPoolClosed    = errors.New("pool closed")
)

// 1、如果連接時數(shù)量已經(jīng)滿了,等待,等待一段時間后重新獲取連接,嘗試幾次后返回連接失敗錯誤,需要開啟線程維護超時連接
// 2、資源釋放,每次連接完后,釋放連接,回歸到連接池,連接池中空閑連接設置最大時間,空閑時間過長,斷開連接,保留最新連接數(shù)
type PoolInterface interface {
    Acquire() (io.Closer, error) // 獲取資源
    Release(io.Closer) error     // 釋放資源
    Close(io.Closer) error       // 關閉資源
    Shutdown() error             // 關閉池
}

type factory func()(io.Closer, error)

type GenericPool struct {
    s sync.Mutex // 互斥鎖
    Pool chan io.Closer  // 緩存channel
    maxOpen int   // 最大連接數(shù)
    minOpen int   // 最小連接數(shù)
    NumOpen int   // 當前連接數(shù)
    closed bool   // 池是否關閉
    maxIdle int  // 最大空閑數(shù)
    maxLifetime time.Duration // 最大生命周期
    factory factory  // 創(chuàng)建連接
}



func NewGenericPool(maxIdle,minOpen int, maxOpen int, maxLifetime time.Duration, factory factory)(*GenericPool, error) {
    if maxOpen <= 0 || minOpen > maxOpen {
        return nil, ErrInvalidConfig
    }
    p := &GenericPool{
        sync.Mutex{},
        make(chan io.Closer,maxOpen),
        maxOpen,
        minOpen,
        0,
        false,
        maxIdle,
        maxLifetime,
        factory,
    }
    for i:=0;i < minOpen;i++ {
        closer,err := factory()
        if err != nil {
            continue
        }
        p.NumOpen++
        p.Pool<-closer
        
    }
    p.closeIdleConn()
    return p,nil
}

// 嘗試連接,3次連接失敗返回錯誤
func (p *GenericPool)TryConnect()(io.Closer,error) {
    //NewGenericPool(0,0,0,0,factory)
    var closer io.Closer
    for i:=0;i<3;i++ {
        var err error
        closer,err = p.Acquire()
        if err != nil {
            if i >= 3 {
                fmt.Println("嘗試連接失敗")
                return nil,errors.New("連接失敗")
            }
            time.Sleep(1)
        }
        return closer,nil
    }
    return closer,nil
}

// 獲取連接
func (p *GenericPool)Acquire()(io.Closer,error) {
    if p.closed {
        return nil,ErrPoolClosed
    }

    for {

        closer,err := p.getOrCreate()
        if err != nil {
            // 沒有獲取到連接該如何處理,返回失敗,然后等待幾秒后重新Acquire,如果多次Acquire已經(jīng)失敗直接返回報錯信息
            fmt.Println("超過最大數(shù),創(chuàng)建失敗")
            return nil,err
        }

        //todo maxlifetime處理

        return closer,nil
    }
}

func (p *GenericPool)getOrCreate()(io.Closer,error)  {
    // 監(jiān)聽channel相關的IO操作
    select{
    case closer:= <- p.Pool:// 如果通道中有數(shù)據(jù)直接返回,沒有數(shù)據(jù)下次訪問時,如果有資源釋放依舊會獲取到
        p.NumOpen ++
        return closer,nil
    default:
        //fmt.Println("獲取池中失敗")
    }

    p.s.Lock()

    // 如果池中當前數(shù)量大于等于最大資源,應該直接返回獲取不到的,不明白為何還有返回,即使返回,為何是不先判斷當前是否空閑狀態(tài)
    if p.NumOpen >= p.maxOpen {

        //closer :=<-p.Pool
        p.s.Unlock()
        //fmt.Println("當前連接數(shù)大于最大連接數(shù)")
        return nil,errors.New("當前連接數(shù)大于最大連接數(shù)")

    }
    // 創(chuàng)建新連接
    closer,err := p.factory()
    if err != nil{
        p.s.Unlock()
        return nil,err
    }
    p.NumOpen ++
    p.s.Unlock()
    fmt.Println("創(chuàng)建新連接")
    return closer,nil
}

// 釋放單個資源到連接池
func (p *GenericPool)Release(closer io.Closer)error  {
    if p.closed {
        return ErrPoolClosed
    }
    p.s.Lock()
    p.Pool <- closer
    p.s.Unlock()
    return nil
}

// 關閉池中長期不使用的連接
func (p *GenericPool)closeIdleConn()  {
    // 開啟子線程監(jiān)控空閑連接
    go func() {
        for {
            time.Sleep(time.Duration(p.maxLifetime)*time.Second)
            if len(p.Pool) > 1 && len(p.Pool) > p.minOpen {
                fmt.Println("3244:",len(p.Pool))
                    for i:=0;i<len(p.Pool)-p.minOpen;i++  {
                        closer := <- p.Pool
                        fmt.Println("清除超時空閑:",i,closer)
                        break
                    }

            }
        }
    }()
}

// 檢測Pool里面空閑的close,取出其中部分數(shù)據(jù)關閉,關閉單個資源
func (p *GenericPool)Close(closer io.Closer)error  {
    p.s.Lock()
    p.Pool<-closer
    p.NumOpen --
    p.s.Unlock()
    return nil
}

// 關閉連接池 釋放所有資源
func (p *GenericPool)ShutDown()error{
    //fmt.Println('2')
    if p.closed {
        return ErrPoolClosed
    }
    close(p.Pool)

    for closer := range p.Pool{
        fmt.Println("1")
            p.NumOpen --
            closer.Close()
            // _,ok := <-closer
    }

    p.closed = true
    //p.s.Unlock()
    return nil
}

func main() {

    GPool,err := controllers.NewGenericPool(10,2,10,5, func() (io.Closer, error){
        //a := io.Closer()
        return nil,nil
    })
    if err != nil {
        fmt.Println("創(chuàng)建失敗")
    }
    fmt.Println("創(chuàng)建成功:",len(GPool.Pool))

    //for i:=0;i<3 ;i++  {
    closer,err :=GPool.TryConnect()
        GPool.TryConnect()
        GPool.TryConnect()
    GPool.TryConnect()
    GPool.TryConnect()
    GPool.TryConnect()
    closer,err = GPool.TryConnect()
    closer,err = GPool.TryConnect()
    closer,err = GPool.TryConnect()
    closer,err = GPool.TryConnect()
    //GPool.TryConnect()
    //GPool.TryConnect()
    //GPool.TryConnect()
    //GPool.TryConnect()
    //GPool.TryConnect()
    //}
    fmt.Println("當前空閑:",len(GPool.Pool),"累積創(chuàng)建:",GPool.NumOpen)
    time.Sleep(3)
    if err != nil {
        fmt.Println("獲取失敗")
    }
    //GPool.ShutDown()
    GPool.Close(closer)
    GPool.Close(closer)
    GPool.Close(closer)
    GPool.Close(closer)
    GPool.Close(closer)
    GPool.Close(closer)
    //GPool.Close(closer)
    fmt.Println("當前空閑1:",len(GPool.Pool),"當前在用1:",GPool.NumOpen)
    //GPool.TryConnect()
    fmt.Println("當前空閑2:",len(GPool.Pool),"當前在用2:",GPool.NumOpen)
    
    conn,err := GPool.TryConnect()
    if err != nil {
        if err != nil {
            fmt.Println("獲取失敗")
        }
    }
    fmt.Printf("%T",conn)
    time.Sleep(time.Duration(1000)*time.Second)
    return
}
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容