golang workerpool 源碼閱讀

今天讀了一下 fasthttp 的源碼,其中讀到了 workpool ,做了一些注釋。

package fasthttp

import (
    "net"
    "runtime"
    "strings"
    "sync"
    "time"
)

// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type workerPool struct {
    // Function for serving server connections.
    // It must leave c unclosed.
    WorkerFunc func(c net.Conn) error //注冊的conn 處理函數(shù)

    MaxWorkersCount int //最大的工作協(xié)程數(shù)

    LogAllErrors bool

    MaxIdleWorkerDuration time.Duration //協(xié)程最大的空閑時間,超過了就清理掉,其實就是退出協(xié)程函數(shù) ,退出 go

    Logger Logger

    lock         sync.Mutex
    workersCount int  //當前的工作協(xié)程數(shù)
    mustStop     bool //workpool 停止標記

    ready []*workerChan //準備工作的協(xié)程,記當時還在空閑的協(xié)程

    stopCh chan struct{} //workpool 停止信號

    workerChanPool sync.Pool //避免每次頻繁分配workerChan,使用pool
}

type workerChan struct { //工作協(xié)程
    lastUseTime time.Time
    ch          chan net.Conn // 帶緩沖區(qū) chan 處理完了一個conn 通過for range 再處理下一個,都在一個協(xié)程里面
}

func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch) //定時清理掉協(xié)程  (workerChan)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

func (wp *workerPool) Stop() {
    if wp.stopCh == nil {
        panic("BUG: workerPool wasn't started")
    }
    close(wp.stopCh) //停止
    wp.stopCh = nil

    // Stop all the workers waiting for incoming connections.
    // Do not wait for busy workers - they will stop after
    // serving the connection and noticing wp.mustStop = true.
    wp.lock.Lock()
    ready := wp.ready
    for i, ch := range ready { //清空
        ch.ch <- nil  //使用nil 值來關閉,而不是close,因為 ch 是池化了的,會循環(huán)使用,所以不能close
        ready[i] = nil
    }
    wp.ready = ready[:0]
    wp.mustStop = true
    wp.lock.Unlock()
}

func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
    if wp.MaxIdleWorkerDuration <= 0 {
        return 10 * time.Second
    }
    return wp.MaxIdleWorkerDuration
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
    // 傳入scratch ,要淘汰的ch, 避免每次分配
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

    // Clean least recently used workers if they didn't serve connections
    // for more than maxIdleWorkerDuration.
    currentTime := time.Now()

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++ //過期的ch 個數(shù)
    }
    *scratch = append((*scratch)[:0], ready[:i]...) //淘汰的ch,放到scratch
    if i > 0 {
        m := copy(ready, ready[i:]) //把需要保留的ch,平移到前面,并且?guī)紫乱A舻臄?shù)量 m
        for i = m; i < n; i++ {
            ready[i] = nil //把ready 后面的ch淘汰 賦值nil
        }
        wp.ready = ready[:m] //保留的ch到ready
    }
    wp.lock.Unlock()

    // Notify obsolete workers to stop.
    // This notification must be outside the wp.lock, since ch.ch
    // may be blocking and may consume a lot of time if many workers
    // are located on non-local CPUs.
    tmp := *scratch
    for i, ch := range tmp { //淘汰的ch 賦值nil
        ch.ch <- nil
        tmp[i] = nil
    }
}

func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh() //獲取一個協(xié)程
    if ch == nil {
        return false
    }
    ch.ch <- c //傳入 conn 到協(xié)程
    return true
}

var workerChanCap = func() int {
    // Use blocking workerChan if GOMAXPROCS=1.
    // This immediately switches Serve to WorkerFunc, which results
    // in higher performance (under go1.5 at least).
    if runtime.GOMAXPROCS(0) == 1 {
        return 0
    }

    // Use non-blocking workerChan if GOMAXPROCS>1,
    // since otherwise the Serve caller (Acceptor) may lag accepting
    // new connections if WorkerFunc is CPU-bound.
    return 1
}()

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan //ch 是一個conn chan 阻塞的,通過for range 不停的處理不同的conn,可以看做是一個協(xié)程,不停的處理不同的鏈接
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++ //沒有可用的了需要 new
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n] //獲取ch 并且ready - 1
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get() //new 一個,這里的new 其實是在 pool 拿一個workerChan,從這里可以看出基本上只要是頻繁要分配的變量,都使用pool
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() { //新建一個協(xié)程處理
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch) //歸還 workerChan
        }()
    }
    return ch
}

func (wp *workerPool) release(ch *workerChan) bool {
    ch.lastUseTime = CoarseTimeNow() //更新最后使用這個協(xié)程的時間
    wp.lock.Lock()
    if wp.mustStop {
        wp.lock.Unlock()
        return false //如果停止了,則上層 停止協(xié)程
    }
    wp.ready = append(wp.ready, ch) //歸還 ch 到ready,這里很巧妙,這樣 getch 的時候就又可以把新的conn放到這個協(xié)程處理
    wp.lock.Unlock()
    return true
}

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch { //不停的獲取 ch(阻塞的chan) ,處理不同的conn,在一個協(xié)程里面
        if c == nil { //接受到nil 值 就break
            break
        }

        if err = wp.WorkerFunc(c); err != nil && err != errHijacked { //注冊的conn鏈接處理函數(shù)
            errStr := err.Error()
            if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
                strings.Contains(errStr, "reset by peer") ||
                strings.Contains(errStr, "i/o timeout")) {
                wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
            }
        }
        if err != errHijacked {
            c.Close()
        }
        c = nil //釋放conn

        if !wp.release(ch) {
            break
        } //如果stop 了就 break
    }

    wp.lock.Lock()
    wp.workersCount-- //釋放此協(xié)程
    wp.lock.Unlock()
}

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容