go自從出生就身帶“高并發(fā)”的標簽,其并發(fā)編程就是由groutine實現(xiàn)的,因其消耗資源低,性能高效,開發(fā)成本低的特性而被廣泛應用到各種場景,例如服務端開發(fā)中使用的HTTP服務,在golang net/http包中,每一個被監(jiān)聽到的tcp鏈接都是由一個groutine去完成處理其上下文的,由此使得其擁有極其優(yōu)秀的并發(fā)量吞吐量
for {
? ? ? ? // 監(jiān)聽tcp
? ? ? ? rw, e := l.Accept()
? ? ? ? if e != nil {
? ? ? ? ? ? .......
? ? ? ? }
? ? ? ? tempDelay = 0
? ? ? ? c := srv.newConn(rw)
? ? ? ? c.setState(c.rwc, StateNew) // before Serve can return
? ? ? ? // 啟動協(xié)程處理上下文
? ? ? ? go c.serve(ctx)
}
雖然創(chuàng)建一個groutine占用的內(nèi)存極小(大約2KB左右,線程通常2M左右),但是在實際生產(chǎn)環(huán)境無限制的開啟協(xié)程顯然是不科學的,比如上圖的邏輯,如果來幾千萬個請求就會開啟幾千萬個groutine,當沒有更多內(nèi)存可用時,go的調(diào)度器就會阻塞groutine最終導致內(nèi)存溢出乃至嚴重的崩潰,所以本文將通過實現(xiàn)一個簡單的協(xié)程池,以及剖析幾個開源的協(xié)程池源碼來探討一下對groutine的并發(fā)控制以及多路復用的設計和實現(xiàn)。
一個簡單的協(xié)程池
主播管理系統(tǒng)中信息不完整的主播找出來然后再到其相對應的直播平臺爬取完整信息并補全,當時考慮到每一個主播的數(shù)據(jù)都要訪問一次直播平臺所以就用應對每一個主播開啟一個groutine去抓取數(shù)據(jù),雖然這個業(yè)務量還遠遠遠遠達不到能造成groutine性能瓶頸的地步,但是心里總是不舒服,于是將其優(yōu)化成從協(xié)程池中控制groutine數(shù)量再開啟爬蟲進行數(shù)據(jù)抓取。思路其實非常簡單,用一個channel當做任務隊列,初始化groutine池時確定好并發(fā)量,然后以設置好的并發(fā)量開啟groutine同時讀取channel中的任務并執(zhí)行,
實現(xiàn)
type SimplePool struct {
? ? wg? sync.WaitGroup
? ? work chan func() //任務隊列
}
func NewSimplePoll(workers int) *SimplePool {
? ? p := &SimplePool{
? ? ? ? wg:? sync.WaitGroup{},
? ? ? ? work: make(chan func()),
? ? }
? ? p.wg.Add(workers)
? ? //根據(jù)指定的并發(fā)量去讀取管道并執(zhí)行
? ? for i := 0; i < workers; i++ {
? ? ? ? go func() {
? ? ? ? ? ? defer func() {
? ? ? ? ? ? ? ? // 捕獲異常 防止waitGroup阻塞
? ? ? ? ? ? ? ? if err := recover(); err != nil {
? ? ? ? ? ? ? ? ? ? fmt.Println(err)
? ? ? ? ? ? ? ? ? ? p.wg.Done()
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }()
? ? ? ? ? ? // 從workChannel中取出任務執(zhí)行
? ? ? ? ? ? for fn := range p.work {
? ? ? ? ? ? ? ? fn()
? ? ? ? ? ? }
? ? ? ? ? ? p.wg.Done()
? ? ? ? }()
? ? }
? ? return p
}
// 添加任務
func (p *SimplePool) Add(fn func()) {
? ? p.work <- fn
}
// 執(zhí)行
func (p *SimplePool) Run() {
? ? close(p.work)
? ? p.wg.Wait()
}