golang 調(diào)度器

今天來講一下調(diào)度器,我本來寫了兩個版本,后面發(fā)現(xiàn)都好像不太好,其實核心差不太多,就是層次不夠清晰,然后在度娘上又啃了幾篇相關(guān)的文章,又進(jìn)行了綜合一下,文章末尾有引用的文章鏈接。不得不說,大佬們畫圖還是非常厲害的。其實突然在這個期間發(fā)現(xiàn)一些問題:就是markdown模式下的話容易讓人看不清重點,最近在找替代簡書的地方,有推薦的可以留言推薦

(一)調(diào)度器的核心點:

  • 1.復(fù)用線程
    避免頻繁的創(chuàng)建銷毀線程,我們知道線程的啟停銷毀是很耗費性能的一件事情,我們就要reuse thread(線程復(fù)用) ,那么具體該怎么來處理。golang :”create threads when needed ;keep them around for reuse“,當(dāng)我們需要創(chuàng)建的時候就創(chuàng)建,然后保留它來復(fù)用,看過M結(jié)構(gòu)的人就知道M有個allM的字段就是保存所有的M的鏈表。
  • 2.利用并行
    設(shè)置GOMAXPROCS 來進(jìn)行設(shè)置go program的核心數(shù),程序的并行
  • 3.stealing working
    利用小偷算法,當(dāng)本地的隊列沒有g(shù)了,去別的地方偷一半的g進(jìn)行運行,保證任務(wù)的公平性
  • 4.handoff
    利用移交算法,當(dāng)本線程因為系統(tǒng)調(diào)用進(jìn)行阻塞的時候,線程釋放綁定的P,把P給其他的M執(zhí)行

值得一說的是:Go1.1之前只有G-M模型,沒有P,Dmitry Vyukov在Scalable Go Scheduler Design Doc提出該模型在并發(fā)伸縮性方面的問題,并通過加入P(Processors)來改進(jìn)該問題。

(二)重要結(jié)構(gòu)體

G:goroutine

每次go調(diào)用的時候,都會創(chuàng)建一個G對象,它包括棧、指令指針以及對于調(diào)用goroutines很重要的其它信息,比如阻塞它的任何channel,其主要數(shù)據(jù)結(jié)構(gòu)

// Go1.11版本默認(rèn)stack大小為2KB

_StackMin = 2048
 
// 創(chuàng)建一個g對象,然后放到g隊列
// 等待被執(zhí)行
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
    _g_ := getg()

    _g_.m.locks++
    siz := narg
    siz = (siz + 7) &^ 7

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)    
    if newg == nil {        
       // 初始化g stack大小
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg)
    }    
    // 以下省略}
M:mechine

代表一個線程,每次創(chuàng)建一個M的時候,都會有一個底層線程創(chuàng)建;所有的G任務(wù),最終還是在M上執(zhí)行,其主要數(shù)據(jù)結(jié)構(gòu)

type m struct {    
    /*
        1.  所有調(diào)用棧的Goroutine,這是一個比較特殊的Goroutine。
        2.  普通的Goroutine棧是在Heap分配的可增長的stack,而g0的stack是M對應(yīng)的線程棧。
        3.  所有調(diào)度相關(guān)代碼,會先切換到該Goroutine的棧再執(zhí)行。
    */
    g0       *g
    curg     *g         // M當(dāng)前綁定的結(jié)構(gòu)體G

    // SP、PC寄存器用于現(xiàn)場保護(hù)和現(xiàn)場恢復(fù)
    vdsoSP uintptr
    vdsoPC uintptr

    // 省略…}
P:Processor

代表一個處理器,每一個運行的M都必須綁定一個P,就像線程必須在么一個CPU核上執(zhí)行一樣,由P來調(diào)度G在M上的運行,P的個數(shù)就是GOMAXPROCS(最大256),啟動時固定的,一般不修改;M的個數(shù)和P的個數(shù)不一定一樣多(會有休眠的M或者不需要太多的M)(最大10000);每一個P保存著本地G任務(wù)隊列,也有一個全局G任務(wù)隊列。P的數(shù)據(jù)結(jié)構(gòu)

// 自定義設(shè)置GOMAXPROCS數(shù)量
func GOMAXPROCS(n int) int {    
    /*
        1.  GOMAXPROCS設(shè)置可執(zhí)行的CPU的最大數(shù)量,同時返回之前的設(shè)置。
        2.  如果n < 1,則不更改當(dāng)前的值。
    */
    ret := int(gomaxprocs)

    stopTheWorld("GOMAXPROCS")    
    // startTheWorld啟動時,使用newprocs。
    newprocs = int32(n)
    startTheWorld()    
    return ret
}

// 默認(rèn)P被綁定到所有CPU核上
// P == cpu.cores

func getproccount() int32 {    
    const maxCPUs = 64 * 1024
    var buf [maxCPUs / 8]byte


    // 獲取CPU Core
    r := sched_getaffinity(0, unsafe.Sizeof(buf), &buf[0])

    n := int32(0)    
    for _, v := range buf[:r] {        
       for v != 0 {
            n += int32(v & 1)
            v >>= 1
        }
    }    
    if n == 0 {
       n = 1
    }    
    return n
}
// 一個進(jìn)程默認(rèn)被綁定在所有CPU核上,返回所有CPU core。
// 獲取進(jìn)程的CPU親和性掩碼系統(tǒng)調(diào)用
// rax 204                          ; 系統(tǒng)調(diào)用碼
// system_call sys_sched_getaffinity; 系統(tǒng)調(diào)用名稱
// rid  pid                         ; 進(jìn)程號
// rsi unsigned int len             
// rdx unsigned long *user_mask_ptr
sys_linux_amd64.s:
TEXT runtime·sched_getaffinity(SB),NOSPLIT,$0
    MOVQ    pid+0(FP), DI
    MOVQ    len+8(FP), SI
    MOVQ    buf+16(FP), DX
    MOVL    $SYS_sched_getaffinity, AX
    SYSCALL
    MOVL    AX, ret+24(FP)
    RET

(三)調(diào)度過程

image
  • 我們通過 go func()來創(chuàng)建一個goroutine;g 的結(jié)構(gòu)是可復(fù)用的,對于可復(fù)用的g也是有l(wèi)ocal隊列和global隊列的,用:p.freeg 這個參數(shù),全局隊列就是sched.pfree,獲取參數(shù)都是差不多的,優(yōu)先從p.gfree中獲取,這一步是無鎖的,否者就從sched.pfree中獲取一部分過來這是有鎖的一個操作

  • 有兩個存儲G的隊列,一個是局部調(diào)度器P的本地隊列、一個是全局G隊列。新創(chuàng)建的G會優(yōu)先嘗試放到p的runnext中,作為下一個執(zhí)行G,如果不行就得放到我們的本地隊列中,如果P的本地隊列已經(jīng)滿了就會保存在全局的隊列中;

  • G只能運行在M中,一個M必須持有一個P,M與P是1:1的關(guān)系。M會從P的本地隊列彈出一個可執(zhí)行狀態(tài)的G來
    執(zhí)行,如果P的本地隊列為空,就會想其他的MP組合偷取一個可執(zhí)行的G來執(zhí)行;

  • 一個M調(diào)度G執(zhí)行的過程是一個循環(huán)機制;

  • 當(dāng)M執(zhí)行某一個G時候如果發(fā)生了syscall或則其余阻塞操作,M會阻塞,如果當(dāng)前有一些G在執(zhí)行,runtime會把
    這個線程M從P中摘除(detach),然后再創(chuàng)建一個新的操作系統(tǒng)的線程(如果有空閑的線程可用就復(fù)用空閑線程)來
    服務(wù)于這個P;

  • 當(dāng)M系統(tǒng)調(diào)用結(jié)束時候,這個G會嘗試獲取一個空閑的P執(zhí)行,并放入到這個P的本地隊列。如果獲取不到P,
    那么這個線程M變成休眠狀態(tài), 加入到空閑線程中,然后這個G會被放入全局隊列中

3.1G的幾種暫停方式
    1. gosched: 將當(dāng)前的G暫停,保存堆棧狀態(tài),以_GRunnable狀態(tài)放入Global隊列中,讓當(dāng)前M繼續(xù)執(zhí)行其它任務(wù)。無需對G進(jìn)行喚醒操作,因為總會有M從Global隊列取得并執(zhí)行該G。搶占調(diào)度即使用該方式
  • 2.gopark: 與goched的最大區(qū)別在于gopark沒有將G放回執(zhí)行隊列,而是位于某個等待隊列中(如channel的waitq,此時G狀態(tài)為_Gwaitting),因此G必須被手動喚醒(通過goready),否則會丟失任務(wù)。應(yīng)用層阻塞通常使用這種方式。
  • 3.notesleep: 既不讓出M,也不讓G和P重新調(diào)度,直接讓線程休眠直到被喚醒(notewakeup),該方式更快,通常用于gcMark,stopm這類自旋場景
  • 4.notesleepg: 阻塞G和M,放飛P,P可以和其它M綁定繼續(xù)執(zhí)行,比如可能阻塞的系統(tǒng)調(diào)用會主動調(diào)用entersyscallblock,則會觸發(fā) notesleepg
  • 5.goexit: 立即終止G任務(wù),不管其處于調(diào)用堆棧的哪個層次,在終止前,確保所有defer正確執(zhí)行。

(四)調(diào)度源碼


// go1.9.1  src/runtime/proc.go
// 省略了GC檢查等其它細(xì)節(jié),只保留了主要流程
// g:       G結(jié)構(gòu)體定義
// sched:   Global隊列
// 獲取一個待執(zhí)行的G
func findrunnable() (gp *g, inheritTime bool) {
    // 獲取當(dāng)前的G對象
    _g_ := getg()

top:
    // 獲取當(dāng)前P對象
    _p_ := _g_.m.p.ptr()

    // 1. 嘗試從P的Local隊列中取得G 優(yōu)先_p_.runnext 然后再從Local隊列中取
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // 2. 嘗試從Global隊列中取得G
    if sched.runqsize != 0 {
        lock(&sched.lock)
        // globrunqget從Global隊列中獲取G 并轉(zhuǎn)移一批G到_p_的Local隊列
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // 3. 檢查netpoll任務(wù):檢測是否存在M阻塞
    if netpollinited() && sched.lastpoll != 0 {
        if gp := netpoll(false); gp != nil { // non-blocking
            // netpoll返回的是G鏈表,將其它G放回Global隊列
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // 4. 嘗試從其它P竊取任務(wù)
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    for i := 0; i < 4; i++ {
        // 隨機P的遍歷順序
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // runqsteal執(zhí)行實際的steal工作,從目標(biāo)P的Local隊列轉(zhuǎn)移一般的G過來
            // stealRunNextG指是否steal目標(biāo)P的p.runnext G
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
    ...
}

4.1用戶態(tài)的阻塞

當(dāng)Goroutine因為Channel操作而阻塞(通過gopark)時,對應(yīng)的G會被放置到某個wait隊列(如channel的waitq),該G的狀態(tài)由_Gruning變?yōu)開Gwaitting,而M會跳過該G嘗試獲取并執(zhí)行下一個G。

當(dāng)阻塞的G被G2喚醒(通過goready)時(比如channel可讀/寫),G會嘗試加入G2所在P的runnext,然后再是P Local隊列和Global隊列。簡單解釋一下:當(dāng)G是chan<-的接收消息,被阻塞了,如果G2是Chan的寫消息,當(dāng)G阻塞,G2寫入了一個數(shù)據(jù),那么G就被G2喚醒了,G就被放到了G2的P的runnext,如果放成功了,就是跳過了排隊,然后執(zhí)行,如果失敗了就丟入local隊列

4.2系統(tǒng)調(diào)用阻塞:syscall

當(dāng)G被阻塞在某個系統(tǒng)調(diào)用上時,此時G會阻塞在_Gsyscall狀態(tài),M也處于block on syscall狀態(tài),此時仍然可被搶占調(diào)度: 執(zhí)行該G的M會與P解綁,而P則嘗試與其它idle的M綁定,繼續(xù)執(zhí)行其它G。如果沒有其它idle的M,但隊列中仍然有G需要執(zhí)行,則創(chuàng)建一個新的M。

當(dāng)系統(tǒng)調(diào)用完成后,G會重新嘗試獲取一個idle的P,并恢復(fù)執(zhí)行,如果沒有idle的P,G將加入到Global隊列。

系統(tǒng)調(diào)用能被調(diào)度的關(guān)鍵有兩點:

runtime/syscall包中,將系統(tǒng)調(diào)用分為SysCall和RawSysCall,前者和后者的區(qū)別是前者會在系統(tǒng)調(diào)用前后分別調(diào)用entersyscall和exitsyscall(位于src/runtime/proc.go),做一些現(xiàn)場保存和恢復(fù)操作,這樣才能使P安全地與M解綁,并在其它M上繼續(xù)執(zhí)行其它G。某些系統(tǒng)調(diào)用本身可以確定會長時間阻塞(比如鎖),會調(diào)用entersyscallblock在發(fā)起系統(tǒng)調(diào)用前直接讓P和M解綁(handoffp)。

4.3GMP的幾個狀態(tài)
  • P的幾個狀態(tài):

(五)sysmon

sysmon是一個由runtime啟動的M,也叫監(jiān)控線程,它無需P也可以運行,它每20us~10ms喚醒一次,主要執(zhí)行:

釋放閑置超過5分鐘的span物理內(nèi)存;
如果超過2分鐘沒有垃圾回收,強制執(zhí)行;
將長時間未處理的netpoll結(jié)果添加到任務(wù)隊列;
向長時間運行的G任務(wù)發(fā)出搶占調(diào)度;
收回因syscall長時間阻塞的P;

入口在src/runtime/proc.go:sysmon函數(shù),它通過retake實現(xiàn)對syscall和長時間運行的G進(jìn)行調(diào)度:

func retake(now int64) uint32 {
    n := 0
    for i := int32(0); i < gomaxprocs; i++ {
        _p_ := allp[i]
        if _p_ == nil {
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // 如果當(dāng)前P Local隊列沒有其它G,當(dāng)前有其它P處于Idle狀態(tài),并且syscall執(zhí)行事件不超過10ms,則不用解綁當(dāng)前P(handoffp)
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // handoffp
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_)
            }
            incidlelocked(1)
        } else if s == _Prunning {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            // 如果當(dāng)前G執(zhí)行時間超過10ms,則搶占(preemptone)
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            // 執(zhí)行搶占
            preemptone(_p_)
        }
    }
    return uint32(n)
}

搶占式調(diào)度

當(dāng)某個goroutine執(zhí)行超過10ms,sysmon會向其發(fā)起搶占調(diào)度請求,由于Go調(diào)度不像OS調(diào)度那樣有時間片的概念,因此實際搶占機制要弱很多: Go中的搶占實際上是為G設(shè)置搶占標(biāo)記(g.stackguard0),當(dāng)G調(diào)用某函數(shù)時(更確切說,在通過newstack分配函數(shù)棧時),被編譯器安插的指令會檢查這個標(biāo)記,并且將當(dāng)前G以runtime.Goched的方式暫停,并加入到全局隊列。源代碼如下:

// src/runtime/stack.go
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
func newstack(ctxt unsafe.Pointer) {
    ...
    // gp為當(dāng)前G
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
    if preempt {
        ...

        // Act like goroutine called runtime.Gosched.
        // G狀態(tài)由_Gwaiting變?yōu)?_Grunning 這是為了能以Gosched的方式暫停Go
        casgstatus(gp, _Gwaiting, _Grunning)
        gopreempt_m(gp) // never return
    }
}

// 以goched的方式將G重新放入
func goschedImpl(gp *g) {
    status := readgstatus(gp)
    // 由Running變?yōu)镽unnable
    casgstatus(gp, _Grunning, _Grunnable)
    // 與M解除綁定
    dropg()
    lock(&sched.lock)
    // 將G放入Global隊列
    globrunqput(gp)
    unlock(&sched.lock)
    // 重新調(diào)度
    schedule()
}


func gopreempt_m(gp *g) {
    if trace.enabled {
        traceGoPreempt()
    }
    goschedImpl(gp)
}

netpoll

前面的findrunnable,G的獲取除了p.runnext,p.runq和sched.runq外,還有一中G從netpoll中獲取,netpoll是Go針對網(wǎng)絡(luò)IO的一種優(yōu)化,本質(zhì)上為了避免網(wǎng)絡(luò)IO陷入系統(tǒng)調(diào)用之中,這樣使得即便G發(fā)起網(wǎng)絡(luò)I/O操作也不會導(dǎo)致M被阻塞(僅阻塞G),從而不會導(dǎo)致大量M被創(chuàng)建出來。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容