golang源碼閱讀之定時(shí)器以及避坑指南

歡迎訪問我的個(gè)人網(wǎng)站獲取更佳閱讀排版體驗(yàn): golang源碼閱讀之定時(shí)器以及避坑指南 | yoko blog (https://pengrl.com/p/62835/)

本文分為三部分:
第一部分為閱讀源碼后的總結(jié)。
第二部分為高性能場(chǎng)景使用定時(shí)器需要注意的地方。
第三部分為系統(tǒng)庫(kù)源碼以及我寫的注釋。

本文基于go version 1.11.4

先放總結(jié)

所有業(yè)務(wù)層的timer對(duì)象都被底層的全局容器變量所持有及管理。這里說的全局容器是一個(gè)桶(bucket)數(shù)組,數(shù)組大小固定為64,數(shù)組的每個(gè)元素為一個(gè)桶對(duì)象,每個(gè)桶內(nèi)包含一個(gè)最小堆和一個(gè)loop循環(huán)協(xié)程(以下簡(jiǎn)稱桶協(xié)程)。

timer對(duì)象歸哪個(gè)桶管理取決于申請(qǐng)?jiān)搕imer對(duì)象時(shí)G所在的P(通過P的id取余64作為桶數(shù)組下標(biāo))。
(關(guān)于golang線程調(diào)度模型中G P M的概念超出了本文的討論范圍。這里只簡(jiǎn)單理解G為當(dāng)前goroutine,P為當(dāng)前goroutine所屬的任務(wù)隊(duì)列。)

由于hash算法和P的id相關(guān),所以一個(gè)程序最多有min(64, GOMAXPROCS)個(gè)桶在使用。
另外,和桶一對(duì)一關(guān)聯(lián)的桶協(xié)程是懶開啟的,只在桶被初次使用時(shí)(即有timer對(duì)象hash到了這個(gè)桶)才開啟,開啟后桶協(xié)程內(nèi)部的循環(huán)永遠(yuǎn)不會(huì)退出。

不將桶數(shù)量直接設(shè)置為GOMAXPROCS是因?yàn)槟菢拥脑挃?shù)組需要?jiǎng)討B(tài)申請(qǐng)。
桶數(shù)量設(shè)置為64是權(quán)衡在不同環(huán)境下(GOMAXPROCS不同)內(nèi)存使用以及性能間的一種經(jīng)驗(yàn)值。

每個(gè)桶都有一個(gè)最小堆,根據(jù)桶內(nèi)所有timer的超時(shí)觸發(fā)絕對(duì)時(shí)間點(diǎn)做調(diào)整。
關(guān)于數(shù)據(jù)結(jié)構(gòu)最小堆的詳細(xì)介紹讀者可以自行查找資料,這里你只需要知道堆的底層使用數(shù)組實(shí)現(xiàn),插入和刪除的時(shí)間復(fù)雜度都是O(logn),并且插入和刪除后,最小堆始終保持最小的元素在堆頂位置,所以獲取最小元素是O(1)的。
事實(shí)上,golang定時(shí)器中的最小堆使用的是四叉樹實(shí)現(xiàn),相較于常見的二叉樹實(shí)現(xiàn),在節(jié)點(diǎn)數(shù)量比較多時(shí),四叉樹對(duì)底層數(shù)組的訪問路徑的局部性更好,CPU cache更友好些。

當(dāng)桶內(nèi)沒timer時(shí),桶協(xié)程被掛起。即rescheduling狀態(tài)。
當(dāng)桶內(nèi)還有timer時(shí),桶內(nèi)協(xié)程睡眠直到最小超時(shí)觸發(fā)時(shí)間點(diǎn)后再喚醒。即sleeping狀態(tài)。
當(dāng)往桶內(nèi)加入新timer而該timer的超時(shí)觸發(fā)時(shí)間點(diǎn)正好是當(dāng)前桶內(nèi)最小的,則喚醒桶協(xié)程。讓桶協(xié)程重新判斷,設(shè)置新的最小超時(shí)觸發(fā)時(shí)間點(diǎn)后進(jìn)入sleeping狀態(tài)。

由于桶數(shù)量是固定的,所以hash桶的操作是無(wú)鎖的。
但是桶內(nèi)有互斥鎖,因?yàn)?code>桶協(xié)程和業(yè)務(wù)層調(diào)用Timer的接口可能并行操作桶內(nèi)的最小堆和各種標(biāo)志等變量。

使用timer時(shí),以下幾點(diǎn)開銷要做到心里有數(shù),桶內(nèi)互斥鎖的開銷,最小堆容器管理的開銷,協(xié)程調(diào)度的開銷,創(chuàng)建timer對(duì)象時(shí)、超時(shí)觸發(fā)返回當(dāng)前時(shí)間時(shí)、桶協(xié)程內(nèi)部都會(huì)有獲取當(dāng)前時(shí)間調(diào)用的開銷。

高性能場(chǎng)景如何使用

閱讀源碼的目的,是學(xué)習(xí)別人寫的好的地方,以及保證正確的使用姿勢(shì)。

你能看出下面這段偽代碼存在的問題嗎?

func consume() {
  t := new time.NewTimer(5 * time.Second)
  select {
    case <- ch:
      // 做相關(guān)的業(yè)務(wù)
    case <- t:
      // 超時(shí)了,做超時(shí)處理
  }
}

這是timer常見的一種用法,為某個(gè)消費(fèi)者設(shè)置消費(fèi)超時(shí)時(shí)間。
如果在超時(shí)時(shí)間內(nèi)消費(fèi)ch成功了,則timer對(duì)象在業(yè)務(wù)層沒有被觸發(fā)。
那么問題來了,底層從最小堆中刪除timer只有兩種情況,要么在業(yè)務(wù)層顯式調(diào)用Stop方法停止定時(shí)器,要么底層判斷timer已經(jīng)到達(dá)超時(shí)觸發(fā)時(shí)間。剛才這種情況,底層只能等到超時(shí)觸發(fā)時(shí)間(偽代碼中為5秒后)才能從容器中移除該timer。即資源被延時(shí)釋放了。
作為寫業(yè)務(wù)層代碼的人,很可能會(huì)誤認(rèn)為業(yè)務(wù)層已經(jīng)不再使用且不再持有該timer了,資源就被釋放了。
如果我們的生產(chǎn)消費(fèi)非常的頻繁,底層容器將堆積大量的timer,從而浪費(fèi)大量?jī)?nèi)存和CPU資源。

另外,假設(shè)你在其它場(chǎng)景使用了time.Ticker(不同于Timer只在超時(shí)后觸發(fā)一次,Ticker將周期性觸發(fā)超時(shí))而沒有調(diào)用Stop(即使業(yè)務(wù)層已不再持有Ticker對(duì)象了),情況將更糟糕,底層容器將一直持有Ticker對(duì)象,并周期性觸發(fā)超時(shí),然后修改下次超時(shí)時(shí)間點(diǎn)。資源將永遠(yuǎn)得不到釋放,內(nèi)存和CPU將永久性的泄漏。

正確的做法應(yīng)該是:

Ticker對(duì)象不再使用后,顯式調(diào)用Stop方法。

Timer對(duì)象不再使用后,在高性能場(chǎng)景下,也應(yīng)該顯式調(diào)用Stop方法,及時(shí)釋放資源。
那么這又分為兩種情況,Timer是否已經(jīng)在業(yè)務(wù)層觸發(fā)超時(shí)了。
通過閱讀系統(tǒng)庫(kù)源碼我們可以得知,對(duì)已超時(shí)的Timer調(diào)用Stop方法內(nèi)部有變量保護(hù),是安全的。但是這種保護(hù)需要拿一次桶內(nèi)的互斥鎖,高性能場(chǎng)景下也需要考慮這個(gè)消耗。
所以正確釋放Timer對(duì)象的做法是,簡(jiǎn)單點(diǎn)就在上面?zhèn)未a的select結(jié)束后統(tǒng)一調(diào)用Stop,精細(xì)點(diǎn)就在ch得到消費(fèi)時(shí)調(diào)用Stop。

我之后會(huì)再寫一篇文章,關(guān)于在某些特定場(chǎng)景下如何自己實(shí)現(xiàn)一個(gè)簡(jiǎn)易timer,犧牲部分我們不需要的精確度來大幅提高超時(shí)業(yè)務(wù)邏輯的性能。

部分源碼的說明

涉及到文件為:

  • src/time/sleep.go
  • src/time/tick.go
  • src/runtime/time.go
  • 其它一些runtime中的代碼

首先看time/sleep.go,里面有time.Timer的實(shí)現(xiàn),time.Timer比較簡(jiǎn)單,只是對(duì)runtime包中timer的一層wrap。這層自身實(shí)現(xiàn)的最核心功能是將底層的超時(shí)回調(diào)轉(zhuǎn)換為發(fā)送channel消息。

// 這里可以看到是對(duì)runtimeTimer的wrap
type Timer struct {
    C <-chan Time
    r runtimeTimer
}

func NewTimer(d Duration) *Timer {
    // 注意,這里的channel是帶緩沖的,保證了業(yè)務(wù)層如果不接收這個(gè)channel,底層的
    // 桶協(xié)程不會(huì)因?yàn)榘l(fā)送channel而被阻塞
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{
            when: when(d),
            // 向底層timer傳入sendTime回調(diào)函數(shù)
            f:    sendTime,
            arg:  c,
        },
    }
    startTimer(&t.r)
    return t
}

// 將底層的超時(shí)回調(diào)轉(zhuǎn)化為channel發(fā)送,并寫入了當(dāng)前時(shí)間
func sendTime(c interface{}, seq uintptr) {
    // Non-blocking send of time on c.
    // Used in NewTimer, it cannot block anyway (buffer).
    // Used in NewTicker, dropping sends on the floor is
    // the desired behavior when the reader gets behind,
    // because the sends are periodic.
    select {
    case c.(chan Time) <- Now():
    default:
    }
}

// After就是匿名Timer
func After(d Duration) <-chan Time {
    return NewTimer(d).C
}

接下來我們看runtime/time.go

// timer結(jié)構(gòu)體
type timer struct {
    tb *timersBucket // timer所屬的桶
    i  int           // 最小堆中的下標(biāo),為-1時(shí)則不可用了

    // Timer wakes up at when, and then at when+period, ... (period > 0 only)
    // each time calling f(arg, now) in the timer goroutine, so f must be
    // a well-behaved function and not block.
    when   int64 // 超時(shí)時(shí)間點(diǎn)
    period int64 // 如果是Ticker,會(huì)有這個(gè)值,周期性觸發(fā)
    f      func(interface{}, uintptr) // 回調(diào)
    arg    interface{} // time.Timer會(huì)傳入channel變量,一會(huì)回調(diào)時(shí)把channel帶回去
    seq    uintptr // 這個(gè)變量目前沒有用
}

// 桶數(shù)量固定為64
const timersLen = 64

// 全局桶數(shù)組,還對(duì)cache偽共享做了優(yōu)化
var timers [timersLen]struct {
    timersBucket

    // The padding should eliminate false sharing
    // between timersBucket values.
    pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}

// addTimer時(shí),首先P id取余64獲取timer所屬的bucket
func (t *timer) assignBucket() *timersBucket {
    id := uint8(getg().m.p.ptr().id) % timersLen
    t.tb = &timers[id].timersBucket
    return t.tb
}

func (tb *timersBucket) addtimerLocked(t *timer) bool {
    // 負(fù)數(shù)參數(shù)保護(hù)性代碼
    if t.when < 0 {
        t.when = 1<<63 - 1
    }
    // 最小堆插入操作
    t.i = len(tb.t)
    tb.t = append(tb.t, t)
    if !siftupTimer(tb.t, t.i) {
        return false
    }
    // 下標(biāo)為0,說明該timer的觸發(fā)時(shí)間為當(dāng)前桶中最早的
    if t.i == 0 {
        // 桶協(xié)程在sleep,喚醒它
        if tb.sleeping {
            tb.sleeping = false
            notewakeup(&tb.waitnote)
        }
        // 桶協(xié)程被掛起了,重新調(diào)度
        if tb.rescheduling {
            tb.rescheduling = false
            goready(tb.gp, 0)
        }
    }
    // 如果timer所屬的桶還沒有創(chuàng)建,創(chuàng)建并開啟桶協(xié)程
    if !tb.created {
        tb.created = true
        go timerproc(tb)
    }
    return true
}

// 桶協(xié)程,注意,這里有兩層for循環(huán),最外面的for是永遠(yuǎn)不會(huì)退出的
func timerproc(tb *timersBucket) {
    tb.gp = getg()
    for {
        // 進(jìn)互斥鎖
        lock(&tb.lock)
        // 睡眠標(biāo)志修改
        tb.sleeping = false
        // 獲取當(dāng)前時(shí)間
        now := nanotime()
        delta := int64(-1)
        for {
            // 如果桶內(nèi)沒有timer,直接退出內(nèi)層for
            if len(tb.t) == 0 {
                delta = -1
                break
            }
            // 獲取最早觸發(fā)timer,并檢查是否到達(dá)觸發(fā)時(shí)間
            t := tb.t[0]
            delta = t.when - now
            // 還沒到時(shí)間,直接退出內(nèi)層for
            if delta > 0 {
                break
            }
            ok := true
            // 如果是period有值,說明需要周期性觸發(fā),我們將該timer修改觸發(fā)時(shí)間后,重新
            // 插入最小堆中
            if t.period > 0 {
                // leave in heap but adjust next time to fire
                t.when += t.period * (1 + -delta/t.period)
                if !siftdownTimer(tb.t, 0) {
                    ok = false
                }
            } else {
                // 從最小堆中刪除
                last := len(tb.t) - 1
                if last > 0 {
                    tb.t[0] = tb.t[last]
                    tb.t[0].i = 0
                }
                tb.t[last] = nil
                tb.t = tb.t[:last]
                if last > 0 {
                    if !siftdownTimer(tb.t, 0) {
                        ok = false
                    }
                }
                // 下標(biāo)設(shè)置為-1,deltimer時(shí)發(fā)現(xiàn)下標(biāo)為-1則不用刪除了
                t.i = -1 // mark as removed
            }
            // 把t中變量拷貝出來,就可以出鎖了
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&tb.lock)
            // 堆調(diào)整時(shí)如果下標(biāo)設(shè)置越界了,則丟到這里來處理,badTimer會(huì)直接panic
            if !ok {
                badTimer()
            }
            // 如果開了race檢查的話
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq)
            lock(&tb.lock)
        }
        // 如果桶內(nèi)沒有timer了,把協(xié)程掛起
        if delta < 0 || faketime > 0 {
            // No timers left - put goroutine to sleep.
            tb.rescheduling = true
            goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
            continue
        }
        // At least one timer pending. Sleep until then.
        // 如果還有協(xié)程,睡眠直到桶內(nèi)最早觸發(fā)時(shí)間點(diǎn)到達(dá)后喚醒
        tb.sleeping = true
        tb.sleepUntil = now + delta
        noteclear(&tb.waitnote)
        unlock(&tb.lock)
        notetsleepg(&tb.waitnote, delta)
    }
}

// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
    if t.tb == nil {
        // t.tb can be nil if the user created a timer
        // directly, without invoking startTimer e.g
        //    time.Ticker{C: c}
        // In this case, return early without any deletion.
        // See Issue 21874.
        return false
    }

    tb := t.tb

    lock(&tb.lock)
    // t may not be registered anymore and may have
    // a bogus i (typically 0, if generated by Go).
    // Verify it before proceeding.
    i := t.i
    last := len(tb.t) - 1
    // 如果已經(jīng)觸發(fā)過或已經(jīng)被刪除了,則返回false告知調(diào)用方
    if i < 0 || i > last || tb.t[i] != t {
        unlock(&tb.lock)
        return false
    }
    if i != last {
        tb.t[i] = tb.t[last]
        tb.t[i].i = i
    }
    tb.t[last] = nil
    tb.t = tb.t[:last]
    ok := true
    if i != last {
        if !siftupTimer(tb.t, i) {
            ok = false
        }
        if !siftdownTimer(tb.t, i) {
            ok = false
        }
    }
    unlock(&tb.lock)
    if !ok {
        badTimer()
    }
    return true
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容