package controllers
import (
"io"
"sync"
"time"
"errors"
"fmt"
)
var (
ErrInvalidConfig = errors.New("invalid pool config")//error().Error()//errors.New("invalid pool config")
ErrPoolClosed = errors.New("pool closed")
)
// 1、如果連接時數(shù)量已經(jīng)滿了,等待,等待一段時間后重新獲取連接,嘗試幾次后返回連接失敗錯誤,需要開啟線程維護超時連接
// 2、資源釋放,每次連接完后,釋放連接,回歸到連接池,連接池中空閑連接設置最大時間,空閑時間過長,斷開連接,保留最新連接數(shù)
type PoolInterface interface {
Acquire() (io.Closer, error) // 獲取資源
Release(io.Closer) error // 釋放資源
Close(io.Closer) error // 關閉資源
Shutdown() error // 關閉池
}
type factory func()(io.Closer, error)
type GenericPool struct {
s sync.Mutex // 互斥鎖
Pool chan io.Closer // 緩存channel
maxOpen int // 最大連接數(shù)
minOpen int // 最小連接數(shù)
NumOpen int // 當前連接數(shù)
closed bool // 池是否關閉
maxIdle int // 最大空閑數(shù)
maxLifetime time.Duration // 最大生命周期
factory factory // 創(chuàng)建連接
}
func NewGenericPool(maxIdle,minOpen int, maxOpen int, maxLifetime time.Duration, factory factory)(*GenericPool, error) {
if maxOpen <= 0 || minOpen > maxOpen {
return nil, ErrInvalidConfig
}
p := &GenericPool{
sync.Mutex{},
make(chan io.Closer,maxOpen),
maxOpen,
minOpen,
0,
false,
maxIdle,
maxLifetime,
factory,
}
for i:=0;i < minOpen;i++ {
closer,err := factory()
if err != nil {
continue
}
p.NumOpen++
p.Pool<-closer
}
p.closeIdleConn()
return p,nil
}
// 嘗試連接,3次連接失敗返回錯誤
func (p *GenericPool)TryConnect()(io.Closer,error) {
//NewGenericPool(0,0,0,0,factory)
var closer io.Closer
for i:=0;i<3;i++ {
var err error
closer,err = p.Acquire()
if err != nil {
if i >= 3 {
fmt.Println("嘗試連接失敗")
return nil,errors.New("連接失敗")
}
time.Sleep(1)
}
return closer,nil
}
return closer,nil
}
// 獲取連接
func (p *GenericPool)Acquire()(io.Closer,error) {
if p.closed {
return nil,ErrPoolClosed
}
for {
closer,err := p.getOrCreate()
if err != nil {
// 沒有獲取到連接該如何處理,返回失敗,然后等待幾秒后重新Acquire,如果多次Acquire已經(jīng)失敗直接返回報錯信息
fmt.Println("超過最大數(shù),創(chuàng)建失敗")
return nil,err
}
//todo maxlifetime處理
return closer,nil
}
}
func (p *GenericPool)getOrCreate()(io.Closer,error) {
// 監(jiān)聽channel相關的IO操作
select{
case closer:= <- p.Pool:// 如果通道中有數(shù)據(jù)直接返回,沒有數(shù)據(jù)下次訪問時,如果有資源釋放依舊會獲取到
p.NumOpen ++
return closer,nil
default:
//fmt.Println("獲取池中失敗")
}
p.s.Lock()
// 如果池中當前數(shù)量大于等于最大資源,應該直接返回獲取不到的,不明白為何還有返回,即使返回,為何是不先判斷當前是否空閑狀態(tài)
if p.NumOpen >= p.maxOpen {
//closer :=<-p.Pool
p.s.Unlock()
//fmt.Println("當前連接數(shù)大于最大連接數(shù)")
return nil,errors.New("當前連接數(shù)大于最大連接數(shù)")
}
// 創(chuàng)建新連接
closer,err := p.factory()
if err != nil{
p.s.Unlock()
return nil,err
}
p.NumOpen ++
p.s.Unlock()
fmt.Println("創(chuàng)建新連接")
return closer,nil
}
// 釋放單個資源到連接池
func (p *GenericPool)Release(closer io.Closer)error {
if p.closed {
return ErrPoolClosed
}
p.s.Lock()
p.Pool <- closer
p.s.Unlock()
return nil
}
// 關閉池中長期不使用的連接
func (p *GenericPool)closeIdleConn() {
// 開啟子線程監(jiān)控空閑連接
go func() {
for {
time.Sleep(time.Duration(p.maxLifetime)*time.Second)
if len(p.Pool) > 1 && len(p.Pool) > p.minOpen {
fmt.Println("3244:",len(p.Pool))
for i:=0;i<len(p.Pool)-p.minOpen;i++ {
closer := <- p.Pool
fmt.Println("清除超時空閑:",i,closer)
break
}
}
}
}()
}
// 檢測Pool里面空閑的close,取出其中部分數(shù)據(jù)關閉,關閉單個資源
func (p *GenericPool)Close(closer io.Closer)error {
p.s.Lock()
p.Pool<-closer
p.NumOpen --
p.s.Unlock()
return nil
}
// 關閉連接池 釋放所有資源
func (p *GenericPool)ShutDown()error{
//fmt.Println('2')
if p.closed {
return ErrPoolClosed
}
close(p.Pool)
for closer := range p.Pool{
fmt.Println("1")
p.NumOpen --
closer.Close()
// _,ok := <-closer
}
p.closed = true
//p.s.Unlock()
return nil
}
func main() {
GPool,err := controllers.NewGenericPool(10,2,10,5, func() (io.Closer, error){
//a := io.Closer()
return nil,nil
})
if err != nil {
fmt.Println("創(chuàng)建失敗")
}
fmt.Println("創(chuàng)建成功:",len(GPool.Pool))
//for i:=0;i<3 ;i++ {
closer,err :=GPool.TryConnect()
GPool.TryConnect()
GPool.TryConnect()
GPool.TryConnect()
GPool.TryConnect()
GPool.TryConnect()
closer,err = GPool.TryConnect()
closer,err = GPool.TryConnect()
closer,err = GPool.TryConnect()
closer,err = GPool.TryConnect()
//GPool.TryConnect()
//GPool.TryConnect()
//GPool.TryConnect()
//GPool.TryConnect()
//GPool.TryConnect()
//}
fmt.Println("當前空閑:",len(GPool.Pool),"累積創(chuàng)建:",GPool.NumOpen)
time.Sleep(3)
if err != nil {
fmt.Println("獲取失敗")
}
//GPool.ShutDown()
GPool.Close(closer)
GPool.Close(closer)
GPool.Close(closer)
GPool.Close(closer)
GPool.Close(closer)
GPool.Close(closer)
//GPool.Close(closer)
fmt.Println("當前空閑1:",len(GPool.Pool),"當前在用1:",GPool.NumOpen)
//GPool.TryConnect()
fmt.Println("當前空閑2:",len(GPool.Pool),"當前在用2:",GPool.NumOpen)
conn,err := GPool.TryConnect()
if err != nil {
if err != nil {
fmt.Println("獲取失敗")
}
}
fmt.Printf("%T",conn)
time.Sleep(time.Duration(1000)*time.Second)
return
}