Go-Timer源碼解讀

image

前言

在初學Go定時任務之時,腦海中始終有一個問題在徘徊,究竟是每個任務都有一個goroutine去監(jiān)控,還是多個任務處于同一個隊列,讓同一個goroutine去輪詢檢查。這里大家可以帶著這個問題去進行接下來的閱讀。

Example

先來看一個簡單的例子,這里我選擇了NewTicker去進行測試,它和NewTimer唯一的區(qū)別是:前者定時循環(huán)執(zhí)行,后者只會執(zhí)行一次。

func main() {
    t := time.NewTicker(5 * time.Second)
    for {
        select {
        case <-t.C:
            log.Println("xxxxxxxxxx")
        }
    }
}

這里會每隔5秒就會執(zhí)行一次打印,但這個究竟是怎么實現(xiàn)的,咱們一步步的去探索。

源碼部分

NewTicker

func NewTicker(d Duration) *Ticker {
    if d <= 0 {
        panic(errors.New("non-positive interval for NewTicker"))
    }
    c := make(chan Time, 1)
    t := &Ticker{
        C: c,
        r: runtimeTimer{
            when:   when(d),
            period: int64(d),
            f:      sendTime,
            arg:    c,
        },
    }
    startTimer(&t.r)
    return t
}

簡單的看一下第一個函數(shù),不算太復雜,進行簡單的異常判斷,創(chuàng)建一個緩沖為1的channel,并構(gòu)建核心的結(jié)構(gòu)體runtimeTimer。下面我們關(guān)注一下這個結(jié)構(gòu)體的幾個屬性。(由于runtimeTimertimer底層結(jié)構(gòu)一致,我這里截取timer結(jié)構(gòu)體的源碼進行解釋一下相關(guān)屬性)

type timer struct {
    tb *timersBucket // 在哪個桶中存在,這里是根據(jù)goroutine所屬的p確定的
    i  int           // 在堆結(jié)構(gòu)中的索引位置
    when   int64 // 啥時候去執(zhí)行函數(shù)f
    period int64 // 間隔多久去執(zhí)行函數(shù)f,該值為0時表示只會執(zhí)行一次函數(shù)f
    f      func(interface{}, uintptr)
    arg    interface{} // 函數(shù)f的第一個參數(shù)
    seq    uintptr // 函數(shù)f的第二個參數(shù)
}

startTimer

func startTimer(*runtimeTimer)

接著看startTimer這個函數(shù),初學者看這段源碼時可能會覺得奇怪,因為它根本沒有body。其實類似的情況并不少見,像這種沒有方法體的大多都會在runtime包給其提供實現(xiàn)。如下所示:

// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
    if raceenabled {
        racerelease(unsafe.Pointer(t))
    }
    addtimer(t)
}

注意一下這個方法上面有一句注釋//go:linkname startTimer time.startTimer,這句注釋可不是一個無用的注釋,簡單的來說go:linkname這個指令告訴編譯器為當前源文件中私有函數(shù)或者變量在編譯時鏈接到指定的方法或變量。所以在這里大家可以把這個理解為runtime.startTimertime.startTimer的具體實現(xiàn)。

addtimer

func addtimer(t *timer) {
    tb := t.assignBucket()
    lock(&tb.lock)
    ok := tb.addtimerLocked(t)
    unlock(&tb.lock)
    if !ok {
        badTimer()
    }
}

這個函數(shù)主要干了兩件事:

  • 獲取這個timer屬于哪個bucket,這里是根據(jù)goroutine所屬的p的id來進行計算。
  • timer其添加到對應的bucket中。

這里可以保證在大部分情況下同一個p上創(chuàng)建的timer可以放到同一個bucket中,除非你的機器CPU核數(shù)超過了64個。每個核上維護著一個隊列,在某種程度上也是提升了定時任務的性能。

addtimerLocked

func (tb *timersBucket) addtimerLocked(t *timer) bool {
    // 保證when的值是正數(shù)
    if t.when < 0 {
        t.when = 1<<63 - 1
    }
    t.i = len(tb.t)
    tb.t = append(tb.t, t)
    // 根據(jù)when的值去調(diào)整堆中的順序
    if !siftupTimer(tb.t, t.i) {
        return false
    }
    if t.i == 0 {
    
        if !tb.created {
            tb.created = true
            // 進行對當前bucket監(jiān)控的goroutine的創(chuàng)建
            go timerproc(tb)
        }
    }
    return true
}

這里存儲timer的數(shù)據(jù)結(jié)構(gòu)是四叉樹。相同的數(shù)據(jù)而言,四叉樹比二叉樹的深度要低,查詢時效率要高一點。在實現(xiàn)定時器時為啥要選擇四叉樹而不是二叉樹,大家可以參考一下這篇文章定時器:4叉堆與2叉堆的效率比較。

這個方法大概分三步:

  • tb.t = append(tb.t, t),將其插入到數(shù)組的最后一個。
  • siftupTimer(tb.t, t.i),將最后一個timer和它的parent進行比較,由于這里的數(shù)據(jù)結(jié)構(gòu)是四叉樹,所以它的parent計算公式為p := (i - 1) / 4 // parent,如果比parent小,則進行交換。這里會遞歸執(zhí)行,直到取到符合timer的位置為止。
  • 判斷監(jiān)控該bucket的goroutine是否已經(jīng)創(chuàng)建,如果沒有,則進行創(chuàng)建。

timerproc

func timerproc(tb *timersBucket) {
    tb.gp = getg()
    for {
        lock(&tb.lock)
        tb.sleeping = false
        now := nanotime()
        delta := int64(-1)
        for {
            if len(tb.t) == 0 {
                delta = -1
                break
            }
            // 由于這里是最小堆,取出堆頂元素也就是最靠近執(zhí)行時間的那個timer
            t := tb.t[0]
            delta = t.when - now
            if delta > 0 {
                break
            }
            ok := true
            if t.period > 0 {
                // 這里表示這個timer是一個定時輪詢的任務,所以加上執(zhí)行周期重新
                // 調(diào)整在堆中的位置
                // leave in heap but adjust next time to fire
                t.when += t.period * (1 + -delta/t.period)
                // 調(diào)整該timer在堆中的位置
                if !siftdownTimer(tb.t, 0) {
                    ok = false
                }
            } else {
                // 只執(zhí)行一次的任務,執(zhí)行后直接從堆中移除
                last := len(tb.t) - 1
                if last > 0 {
                    tb.t[0] = tb.t[last]
                    tb.t[0].i = 0
                }
                tb.t[last] = nil
                tb.t = tb.t[:last]
                if last > 0 {
                    if !siftdownTimer(tb.t, 0) {
                        ok = false
                    }
                }
                t.i = -1 // mark as removed
            }
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&tb.lock)
            if !ok {
                badTimer()
            }
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            // 執(zhí)行timer結(jié)構(gòu)中的f函數(shù)
            f(arg, seq)
            lock(&tb.lock)
        }
        if delta < 0 || faketime > 0 {
            // No timers left - put goroutine to sleep.
            tb.rescheduling = true
            goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
            continue
        }
        // At least one timer pending. Sleep until then.
        tb.sleeping = true
        tb.sleepUntil = now + delta
        noteclear(&tb.waitnote)
        unlock(&tb.lock)
        notetsleepg(&tb.waitnote, delta)
    }
}

執(zhí)行流程:

  1. 由于是最小堆,從堆頂取出的timer就是最近一個將要執(zhí)行的任務,與當前時間進行對比,判斷是否已經(jīng)到了執(zhí)行任務的時間。
  2. 如果是定時輪詢?nèi)蝿眨〕鰜碜龊糜涗浐笮枰{(diào)整該timer的屬性when的值,并在堆中進行重新排序。方便下一次的執(zhí)行。
  3. 如果是執(zhí)行一次的任務,取出來做好記錄后需要從堆中進行移除。
  4. 執(zhí)行特定的函數(shù),例如sendTimegoFunc等等函數(shù)。

總結(jié)

通過上面的了解咱們可以完美解決咱們在文章開始的時候提出的那個問題,究竟開了多少個goroutine去維護咱們的定時任務隊列?答案是:比如你的機器有n個CPU,那么就會有n個bucket,同樣就會有n個goroutine去監(jiān)控這些bucket,由于存儲結(jié)構(gòu)采用的是最小堆,這里咱們也不用輪詢檢查,只用檢查堆中的第一個元素即可。當然最后得出的結(jié)論并不屬于咱們上面兩個猜測的其中任何一個。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容