歡迎訪問我的個(gè)人網(wǎng)站獲取更佳閱讀排版體驗(yàn): golang源碼閱讀之定時(shí)器以及避坑指南 | yoko blog (https://pengrl.com/p/62835/)
本文分為三部分:
第一部分為閱讀源碼后的總結(jié)。
第二部分為高性能場(chǎng)景使用定時(shí)器需要注意的地方。
第三部分為系統(tǒng)庫(kù)源碼以及我寫的注釋。
本文基于go version 1.11.4
先放總結(jié)
所有業(yè)務(wù)層的timer對(duì)象都被底層的全局容器變量所持有及管理。這里說的全局容器是一個(gè)桶(bucket)數(shù)組,數(shù)組大小固定為64,數(shù)組的每個(gè)元素為一個(gè)桶對(duì)象,每個(gè)桶內(nèi)包含一個(gè)最小堆和一個(gè)loop循環(huán)協(xié)程(以下簡(jiǎn)稱桶協(xié)程)。
timer對(duì)象歸哪個(gè)桶管理取決于申請(qǐng)?jiān)搕imer對(duì)象時(shí)G所在的P(通過P的id取余64作為桶數(shù)組下標(biāo))。
(關(guān)于golang線程調(diào)度模型中G P M的概念超出了本文的討論范圍。這里只簡(jiǎn)單理解G為當(dāng)前goroutine,P為當(dāng)前goroutine所屬的任務(wù)隊(duì)列。)
由于hash算法和P的id相關(guān),所以一個(gè)程序最多有min(64, GOMAXPROCS)個(gè)桶在使用。
另外,和桶一對(duì)一關(guān)聯(lián)的桶協(xié)程是懶開啟的,只在桶被初次使用時(shí)(即有timer對(duì)象hash到了這個(gè)桶)才開啟,開啟后桶協(xié)程內(nèi)部的循環(huán)永遠(yuǎn)不會(huì)退出。
不將桶數(shù)量直接設(shè)置為GOMAXPROCS是因?yàn)槟菢拥脑挃?shù)組需要?jiǎng)討B(tài)申請(qǐng)。
桶數(shù)量設(shè)置為64是權(quán)衡在不同環(huán)境下(GOMAXPROCS不同)內(nèi)存使用以及性能間的一種經(jīng)驗(yàn)值。
每個(gè)桶都有一個(gè)最小堆,根據(jù)桶內(nèi)所有timer的超時(shí)觸發(fā)絕對(duì)時(shí)間點(diǎn)做調(diào)整。
關(guān)于數(shù)據(jù)結(jié)構(gòu)最小堆的詳細(xì)介紹讀者可以自行查找資料,這里你只需要知道堆的底層使用數(shù)組實(shí)現(xiàn),插入和刪除的時(shí)間復(fù)雜度都是O(logn),并且插入和刪除后,最小堆始終保持最小的元素在堆頂位置,所以獲取最小元素是O(1)的。
事實(shí)上,golang定時(shí)器中的最小堆使用的是四叉樹實(shí)現(xiàn),相較于常見的二叉樹實(shí)現(xiàn),在節(jié)點(diǎn)數(shù)量比較多時(shí),四叉樹對(duì)底層數(shù)組的訪問路徑的局部性更好,CPU cache更友好些。
當(dāng)桶內(nèi)沒timer時(shí),桶協(xié)程被掛起。即rescheduling狀態(tài)。
當(dāng)桶內(nèi)還有timer時(shí),桶內(nèi)協(xié)程睡眠直到最小超時(shí)觸發(fā)時(shí)間點(diǎn)后再喚醒。即sleeping狀態(tài)。
當(dāng)往桶內(nèi)加入新timer而該timer的超時(shí)觸發(fā)時(shí)間點(diǎn)正好是當(dāng)前桶內(nèi)最小的,則喚醒桶協(xié)程。讓桶協(xié)程重新判斷,設(shè)置新的最小超時(shí)觸發(fā)時(shí)間點(diǎn)后進(jìn)入sleeping狀態(tài)。
由于桶數(shù)量是固定的,所以hash桶的操作是無(wú)鎖的。
但是桶內(nèi)有互斥鎖,因?yàn)?code>桶協(xié)程和業(yè)務(wù)層調(diào)用Timer的接口可能并行操作桶內(nèi)的最小堆和各種標(biāo)志等變量。
使用timer時(shí),以下幾點(diǎn)開銷要做到心里有數(shù),桶內(nèi)互斥鎖的開銷,最小堆容器管理的開銷,協(xié)程調(diào)度的開銷,創(chuàng)建timer對(duì)象時(shí)、超時(shí)觸發(fā)返回當(dāng)前時(shí)間時(shí)、桶協(xié)程內(nèi)部都會(huì)有獲取當(dāng)前時(shí)間調(diào)用的開銷。
高性能場(chǎng)景如何使用
閱讀源碼的目的,是學(xué)習(xí)別人寫的好的地方,以及保證正確的使用姿勢(shì)。
你能看出下面這段偽代碼存在的問題嗎?
func consume() {
t := new time.NewTimer(5 * time.Second)
select {
case <- ch:
// 做相關(guān)的業(yè)務(wù)
case <- t:
// 超時(shí)了,做超時(shí)處理
}
}
這是timer常見的一種用法,為某個(gè)消費(fèi)者設(shè)置消費(fèi)超時(shí)時(shí)間。
如果在超時(shí)時(shí)間內(nèi)消費(fèi)ch成功了,則timer對(duì)象在業(yè)務(wù)層沒有被觸發(fā)。
那么問題來了,底層從最小堆中刪除timer只有兩種情況,要么在業(yè)務(wù)層顯式調(diào)用Stop方法停止定時(shí)器,要么底層判斷timer已經(jīng)到達(dá)超時(shí)觸發(fā)時(shí)間。剛才這種情況,底層只能等到超時(shí)觸發(fā)時(shí)間(偽代碼中為5秒后)才能從容器中移除該timer。即資源被延時(shí)釋放了。
作為寫業(yè)務(wù)層代碼的人,很可能會(huì)誤認(rèn)為業(yè)務(wù)層已經(jīng)不再使用且不再持有該timer了,資源就被釋放了。
如果我們的生產(chǎn)消費(fèi)非常的頻繁,底層容器將堆積大量的timer,從而浪費(fèi)大量?jī)?nèi)存和CPU資源。
另外,假設(shè)你在其它場(chǎng)景使用了time.Ticker(不同于Timer只在超時(shí)后觸發(fā)一次,Ticker將周期性觸發(fā)超時(shí))而沒有調(diào)用Stop(即使業(yè)務(wù)層已不再持有Ticker對(duì)象了),情況將更糟糕,底層容器將一直持有Ticker對(duì)象,并周期性觸發(fā)超時(shí),然后修改下次超時(shí)時(shí)間點(diǎn)。資源將永遠(yuǎn)得不到釋放,內(nèi)存和CPU將永久性的泄漏。
正確的做法應(yīng)該是:
Ticker對(duì)象不再使用后,顯式調(diào)用Stop方法。
Timer對(duì)象不再使用后,在高性能場(chǎng)景下,也應(yīng)該顯式調(diào)用Stop方法,及時(shí)釋放資源。
那么這又分為兩種情況,Timer是否已經(jīng)在業(yè)務(wù)層觸發(fā)超時(shí)了。
通過閱讀系統(tǒng)庫(kù)源碼我們可以得知,對(duì)已超時(shí)的Timer調(diào)用Stop方法內(nèi)部有變量保護(hù),是安全的。但是這種保護(hù)需要拿一次桶內(nèi)的互斥鎖,高性能場(chǎng)景下也需要考慮這個(gè)消耗。
所以正確釋放Timer對(duì)象的做法是,簡(jiǎn)單點(diǎn)就在上面?zhèn)未a的select結(jié)束后統(tǒng)一調(diào)用Stop,精細(xì)點(diǎn)就在ch得到消費(fèi)時(shí)調(diào)用Stop。
我之后會(huì)再寫一篇文章,關(guān)于在某些特定場(chǎng)景下如何自己實(shí)現(xiàn)一個(gè)簡(jiǎn)易timer,犧牲部分我們不需要的精確度來大幅提高超時(shí)業(yè)務(wù)邏輯的性能。
部分源碼的說明
涉及到文件為:
- src/time/sleep.go
- src/time/tick.go
- src/runtime/time.go
- 其它一些runtime中的代碼
首先看time/sleep.go,里面有time.Timer的實(shí)現(xiàn),time.Timer比較簡(jiǎn)單,只是對(duì)runtime包中timer的一層wrap。這層自身實(shí)現(xiàn)的最核心功能是將底層的超時(shí)回調(diào)轉(zhuǎn)換為發(fā)送channel消息。
// 這里可以看到是對(duì)runtimeTimer的wrap
type Timer struct {
C <-chan Time
r runtimeTimer
}
func NewTimer(d Duration) *Timer {
// 注意,這里的channel是帶緩沖的,保證了業(yè)務(wù)層如果不接收這個(gè)channel,底層的
// 桶協(xié)程不會(huì)因?yàn)榘l(fā)送channel而被阻塞
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
// 向底層timer傳入sendTime回調(diào)函數(shù)
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
// 將底層的超時(shí)回調(diào)轉(zhuǎn)化為channel發(fā)送,并寫入了當(dāng)前時(shí)間
func sendTime(c interface{}, seq uintptr) {
// Non-blocking send of time on c.
// Used in NewTimer, it cannot block anyway (buffer).
// Used in NewTicker, dropping sends on the floor is
// the desired behavior when the reader gets behind,
// because the sends are periodic.
select {
case c.(chan Time) <- Now():
default:
}
}
// After就是匿名Timer
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
接下來我們看runtime/time.go
// timer結(jié)構(gòu)體
type timer struct {
tb *timersBucket // timer所屬的桶
i int // 最小堆中的下標(biāo),為-1時(shí)則不可用了
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(arg, now) in the timer goroutine, so f must be
// a well-behaved function and not block.
when int64 // 超時(shí)時(shí)間點(diǎn)
period int64 // 如果是Ticker,會(huì)有這個(gè)值,周期性觸發(fā)
f func(interface{}, uintptr) // 回調(diào)
arg interface{} // time.Timer會(huì)傳入channel變量,一會(huì)回調(diào)時(shí)把channel帶回去
seq uintptr // 這個(gè)變量目前沒有用
}
// 桶數(shù)量固定為64
const timersLen = 64
// 全局桶數(shù)組,還對(duì)cache偽共享做了優(yōu)化
var timers [timersLen]struct {
timersBucket
// The padding should eliminate false sharing
// between timersBucket values.
pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}
// addTimer時(shí),首先P id取余64獲取timer所屬的bucket
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen
t.tb = &timers[id].timersBucket
return t.tb
}
func (tb *timersBucket) addtimerLocked(t *timer) bool {
// 負(fù)數(shù)參數(shù)保護(hù)性代碼
if t.when < 0 {
t.when = 1<<63 - 1
}
// 最小堆插入操作
t.i = len(tb.t)
tb.t = append(tb.t, t)
if !siftupTimer(tb.t, t.i) {
return false
}
// 下標(biāo)為0,說明該timer的觸發(fā)時(shí)間為當(dāng)前桶中最早的
if t.i == 0 {
// 桶協(xié)程在sleep,喚醒它
if tb.sleeping {
tb.sleeping = false
notewakeup(&tb.waitnote)
}
// 桶協(xié)程被掛起了,重新調(diào)度
if tb.rescheduling {
tb.rescheduling = false
goready(tb.gp, 0)
}
}
// 如果timer所屬的桶還沒有創(chuàng)建,創(chuàng)建并開啟桶協(xié)程
if !tb.created {
tb.created = true
go timerproc(tb)
}
return true
}
// 桶協(xié)程,注意,這里有兩層for循環(huán),最外面的for是永遠(yuǎn)不會(huì)退出的
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
// 進(jìn)互斥鎖
lock(&tb.lock)
// 睡眠標(biāo)志修改
tb.sleeping = false
// 獲取當(dāng)前時(shí)間
now := nanotime()
delta := int64(-1)
for {
// 如果桶內(nèi)沒有timer,直接退出內(nèi)層for
if len(tb.t) == 0 {
delta = -1
break
}
// 獲取最早觸發(fā)timer,并檢查是否到達(dá)觸發(fā)時(shí)間
t := tb.t[0]
delta = t.when - now
// 還沒到時(shí)間,直接退出內(nèi)層for
if delta > 0 {
break
}
ok := true
// 如果是period有值,說明需要周期性觸發(fā),我們將該timer修改觸發(fā)時(shí)間后,重新
// 插入最小堆中
if t.period > 0 {
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
if !siftdownTimer(tb.t, 0) {
ok = false
}
} else {
// 從最小堆中刪除
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
if !siftdownTimer(tb.t, 0) {
ok = false
}
}
// 下標(biāo)設(shè)置為-1,deltimer時(shí)發(fā)現(xiàn)下標(biāo)為-1則不用刪除了
t.i = -1 // mark as removed
}
// 把t中變量拷貝出來,就可以出鎖了
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
// 堆調(diào)整時(shí)如果下標(biāo)設(shè)置越界了,則丟到這里來處理,badTimer會(huì)直接panic
if !ok {
badTimer()
}
// 如果開了race檢查的話
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
lock(&tb.lock)
}
// 如果桶內(nèi)沒有timer了,把協(xié)程掛起
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
tb.rescheduling = true
goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
// 如果還有協(xié)程,睡眠直到桶內(nèi)最早觸發(fā)時(shí)間點(diǎn)到達(dá)后喚醒
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
notetsleepg(&tb.waitnote, delta)
}
}
// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
if t.tb == nil {
// t.tb can be nil if the user created a timer
// directly, without invoking startTimer e.g
// time.Ticker{C: c}
// In this case, return early without any deletion.
// See Issue 21874.
return false
}
tb := t.tb
lock(&tb.lock)
// t may not be registered anymore and may have
// a bogus i (typically 0, if generated by Go).
// Verify it before proceeding.
i := t.i
last := len(tb.t) - 1
// 如果已經(jīng)觸發(fā)過或已經(jīng)被刪除了,則返回false告知調(diào)用方
if i < 0 || i > last || tb.t[i] != t {
unlock(&tb.lock)
return false
}
if i != last {
tb.t[i] = tb.t[last]
tb.t[i].i = i
}
tb.t[last] = nil
tb.t = tb.t[:last]
ok := true
if i != last {
if !siftupTimer(tb.t, i) {
ok = false
}
if !siftdownTimer(tb.t, i) {
ok = false
}
}
unlock(&tb.lock)
if !ok {
badTimer()
}
return true
}