M State Transitions

Note

All of the analysis below is based on Go 1.14.

M states

An M has only two states, Running and Stop, plus an intermediate spinning state. When an M goes from Running to Stop, it first spins looking for a runnable G and stops only if it finds none.

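M state transitions

Main flows

1. mstart: when a Go program starts, the first M (m0) begins running in mstart. Every newly created OS thread also starts by executing mstart.
2. startm: when a new G is created, or a G goes from waiting back to runnable, and there is still an idle P, startm is called to get an M, bind it to the idle P and run Gs.
3. newm: if startm finds no idle M, it creates one with newm.
4. stopm: stopm runs in two cases. First, when the M's P has no runnable G and the M cannot steal one from another P: the M first spins, and if that fails it releases the P and stops. Second, when an M and its G stay in a system call for a long time, the P is retaken (by sysmon's retake), and when the M returns from the syscall it cannot find an idle P to bind to; the M then calls stopm.
5. spinning: findrunnable briefly puts the M into the spinning state; if no runnable G turns up, it calls stopm.
PS: The list above explains when each function is triggered and by whom; this is not repeated below.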
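The flows above can be condensed into a small state machine. This is a minimal sketch for illustration only: the mState type, the event strings and the transition table are hypothetical. The runtime keeps no explicit M state enum; it tracks these states through fields such as m.spinning, the idle list sched.midle and the m.park note.

```go
package main

import "fmt"

// mState is a hypothetical enum; the real runtime has no such type.
type mState int

const (
	running  mState = iota // M owns a P and runs Gs
	spinning               // M owns a P but is searching for work in findrunnable
	stopped                // M is parked on m.park inside stopm
)

func (s mState) String() string {
	return [...]string{"running", "spinning", "stopped"}[s]
}

// event names the runtime step that drives each transition.
type event string

// transitions maps (state, event) to the next state, following the flows
// described above. Pairs missing from the table are invalid.
var transitions = map[mState]map[event]mState{
	running:  {"findrunnable": spinning, "stopm": stopped}, // stopm here: syscall exit with no idle P
	spinning: {"found G": running, "stopm": stopped},
	stopped:  {"startm": running, "startm(spinning)": spinning},
}

// step applies one event and returns the new state, or an error for an
// invalid transition.
func step(s mState, e event) (mState, error) {
	next, ok := transitions[s][e]
	if !ok {
		return s, fmt.Errorf("invalid transition %v --%s-->", s, e)
	}
	return next, nil
}

func main() {
	s := running
	for _, e := range []event{"findrunnable", "stopm", "startm"} {
		next, err := step(s, e)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("%v --%s--> %v\n", s, e, next)
		s = next
	}
}
```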

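Thread sleep and wakeup

1. An M is bound to an OS thread, so an M being running or stopped really describes the state of that thread. How is the thread itself put to sleep and woken up? Linux is used as the example below.
2. On Linux, thread synchronization is built on the futex system call (see the futex(2) man page for a detailed description).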

#include <linux/futex.h>
#include <sys/time.h>

// Only the first 3 parameters matter here. uaddr is the address of the word used for synchronization.
// futex_op is the operation; Go uses two of them (in their _PRIVATE variants): FUTEX_WAIT and FUTEX_WAKE.
// For FUTEX_WAIT, val means: sleep only if the value at uaddr still equals val.
// For FUTEX_WAKE, val is the maximum number of threads sleeping on uaddr to wake (notewakeup passes 1).
int futex(int *uaddr, int futex_op, int val,
          const struct timespec *timeout,   /* or: uint32_t val2 */
          int *uaddr2, int val3);

3. Thread sleep
asmcgocall(*cgo_yield, nil) runs the cgo_yield hook; we won't go into its details.

func notesleep(n *note) {
    gp := getg() // in the paths discussed here (syscall exit, schedule) a thread sleeps on g0
    if gp != gp.m.g0 {
        throw("notesleep not on g0")
    }
    ns := int64(-1) // -1: no timeout, sleep until explicitly woken
    if *cgo_yield != nil {
        // Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
        ns = 10e6 // with cgo, wake up every 10 ms
    }
    for atomic.Load(key32(&n.key)) == 0 { // sleep while note.key == 0; re-check after every (possibly spurious) wakeup
        gp.m.blocked = true
        futexsleep(key32(&n.key), 0, ns) // enter the futex wait
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil) // with cgo, run the cgo_yield hook
        }
        gp.m.blocked = false
    }
}

func futexsleep(addr *uint32, val uint32, ns int64) {
    if ns < 0 { // ns < 0: no timeout, sleep until woken
        futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
        return
    }

    var ts timespec
    ts.setNsec(ns) // set the relative timeout
    futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

4. Thread wakeup

func notewakeup(n *note) {
    old := atomic.Xchg(key32(&n.key), 1) // set note.key to 1 (it is 0 while the M sleeps)
    if old != 0 {
        print("notewakeup - double wakeup (", old, ")\n")
        throw("notewakeup - double wakeup")
    }
    futexwakeup(key32(&n.key), 1) // wake at most one thread sleeping on note.key
}

func futexwakeup(addr *uint32, cnt uint32) {
    ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
    if ret >= 0 {
        return
    }
    // Not reached in normal operation.
    systemstack(func() {
        print("futexwakeup addr=", addr, " returned ", ret, "\n")
    })

    *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006 // deliberate crash at a recognizable address
}

5. Summary: an M sleeps and is woken through its park note (m.park), so both operations act on the memory that holds m.park.key.
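The note protocol can be sketched in ordinary Go. Assumptions: the note type and functions below are hypothetical stand-ins that reuse the runtime's names for readability, and a sync.Mutex plus sync.Cond replace the futex. The sketch keeps the three rules of the runtime code: key 0 means "not signaled", notewakeup flips it to 1 exactly once (a second wakeup is a fatal error), and the sleeper must call noteclear before the note can be reused, as stopm does.

```go
package main

import (
	"fmt"
	"sync"
)

// note is a hypothetical stand-in for runtime.note: key has the same meaning
// (0 = not signaled, 1 = signaled), but a mutex and condition variable
// replace the futex.
type note struct {
	mu   sync.Mutex
	cond *sync.Cond
	key  uint32
}

func newNote() *note {
	n := &note{}
	n.cond = sync.NewCond(&n.mu)
	return n
}

// notesleep blocks until key != 0; the loop also absorbs spurious wakeups.
func notesleep(n *note) {
	n.mu.Lock()
	for n.key == 0 {
		n.cond.Wait()
	}
	n.mu.Unlock()
}

// notewakeup sets key to 1 and wakes the sleeper. A second wakeup without
// an intervening noteclear is a bug, as in the runtime.
func notewakeup(n *note) {
	n.mu.Lock()
	old := n.key
	n.key = 1
	n.mu.Unlock()
	if old != 0 {
		panic("notewakeup - double wakeup")
	}
	n.cond.Signal()
}

// noteclear resets the note so it can be slept on again.
func noteclear(n *note) {
	n.mu.Lock()
	n.key = 0
	n.mu.Unlock()
}

func main() {
	park := newNote()
	done := make(chan struct{})
	go func() {
		notesleep(park) // like stopm: block until notewakeup
		noteclear(park) // reset the note for the next stopm
		close(done)
	}()
	notewakeup(park) // like startm
	<-done
	fmt.Println("key after noteclear:", park.key)
}
```

Because key is sticky, the order of notewakeup and notesleep does not matter: a wakeup that arrives first simply makes the later notesleep return at once.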

mstart

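When the Go process starts, the M being started here is m0, the first M of the process. Ms created later by newm also enter mstart when their threads start (see the newm section).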

func mstart() {
    _g_ := getg()

    osStack := _g_.stack.lo == 0
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    // Initialize stack guard so that we can start calling regular
    // Go code.
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    // This is the g0, so we can also call go:systemstack
    // functions, which check stackguard1.
    _g_.stackguard1 = _g_.stackguard0
    mstart1()

    // Exit this thread.
    switch GOOS {
    case "windows", "solaris", "illumos", "plan9", "darwin", "aix":
        // Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
        // the stack, but put it in _g_.stack before mstart,
        // so the logic above hasn't set osStack yet.
        osStack = true
    }
    mexit(osStack)
}

func mstart1() {
    _g_ := getg()

    if _g_ != _g_.m.g0 {
        throw("bad runtime·mstart")
    }

    // Record the caller for use as the top of stack in mcall and
    // for terminating the thread.
    // We're never coming back to mstart1 after we call schedule,
    // so other calls can reuse the current frame.
    save(getcallerpc(), getcallersp()) // save pc and sp into g0.sched: the pc/sp at the point where mstart calls mstart1
    asminit() // architecture-specific init; ignored here
    minit() // per-thread init: signal stack and signal mask

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    if _g_.m == &m0 {
        mstartm0() // install signal handlers (m0 only)
    }

    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }

    if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()
}

func minit() {
    // The alternate signal stack is buggy on arm and arm64.
    // The signal handler handles it directly.
    if GOARCH != "arm" && GOARCH != "arm64" {
        minitSignalStack() // alternate signal stack used by the signal handlers
    }
    minitSignalMask() // set up this thread's signal mask; not covered here
    getg().m.procid = uint64(pthread_self())
}

func mstartm0() {
    // Create an extra M for callbacks on threads not created by Go.
    // An extra M is also needed on Windows for callbacks created by
    // syscall.NewCallback. See issue #6751 for details.
    if (iscgo || GOOS == "windows") && !cgoHasExtraM {
        cgoHasExtraM = true
        newextram()
    }
    initsig(false) // install signal handlers; not followed further
}
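The osStack branch of mstart is plain arithmetic: hi is the address of a local variable (roughly the current top of the OS thread's stack), and lo is placed size - 1024 bytes below it; stackguard0 then sits _StackGuard bytes above lo. The sketch isolates that arithmetic. osStackBounds is a hypothetical helper, the addresses are made-up numbers, and stackGuard = 928 is an assumption (the Go 1.14 _StackGuard value on linux/amd64 without -race); the real value depends on platform and build flags.

```go
package main

import "fmt"

// stackBounds mirrors the (lo, hi) pair of g.stack for illustration.
type stackBounds struct{ lo, hi uintptr }

// osStackBounds is a hypothetical helper reproducing mstart's computation.
// size is what cgo may have left in stack.hi; 0 means "use the default".
func osStackBounds(localAddr, size, multiplier uintptr) stackBounds {
	if size == 0 {
		size = 8192 * multiplier // default when cgo provided no size
	}
	hi := localAddr        // address of a local: top of the usable stack
	lo := hi - size + 1024 // leave 1 KB of slack at the bottom
	return stackBounds{lo: lo, hi: hi}
}

const stackGuard = 928 // assumed: Go 1.14 _StackGuard on linux/amd64, no -race

func main() {
	b := osStackBounds(0x7fff0000, 0, 1)
	fmt.Printf("lo=%#x hi=%#x usable=%d stackguard0=%#x\n",
		b.lo, b.hi, b.hi-b.lo, b.lo+stackGuard)
}
```

With the default size, 8192 - 1024 = 7168 bytes lie between lo and hi.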

startm

Find an idle M and an idle P, store the P in the M's m.nextp, and wake the M through its park note (m.park). Once the M wakes up, it binds to the P given in m.nextp.

func startm(_p_ *p, spinning bool) {
    lock(&sched.lock) // lock sched
    if _p_ == nil {
        _p_ = pidleget() // take an idle P
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    mp := mget() // take an idle M
    unlock(&sched.lock) // release sched.lock
    if mp == nil { // no idle M: create a new one
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    
    // sanity checks: each of these is a fatal error
    if mp.spinning {
        throw("startm: m is spinning")
    }
    if mp.nextp != 0 {
        throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {
        throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning
    mp.nextp.set(_p_) // set m.nextp; the M binds to this P after waking up
    // wake the M through its park note
    notewakeup(&mp.park)
}
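startm's decision logic reduces to: take an idle P (or give up), then either hand the P to an idle M and wake it, or create a new M when none is idle. The sketch models only that decision. The types (sched, m, p), their fields and the "created" list are hypothetical simplifications; locking and the sanity checks are left out, and the woken flag stands in for notewakeup(&mp.park).

```go
package main

import "fmt"

type p struct{ id int }

// m is a hypothetical, heavily reduced M.
type m struct {
	id       int
	nextp    *p
	spinning bool
	woken    bool // stands in for notewakeup(&mp.park)
}

// sched holds hypothetical idle lists, like sched.pidle and sched.midle.
type sched struct {
	pidle      []*p
	midle      []*m
	nmspinning int
	created    []*m // Ms created through the newm path
}

func (s *sched) startm(pp *p, spinning bool) {
	if pp == nil {
		if len(s.pidle) == 0 {
			if spinning {
				s.nmspinning-- // the caller incremented it: undo and give up
			}
			return
		}
		pp, s.pidle = s.pidle[len(s.pidle)-1], s.pidle[:len(s.pidle)-1] // pidleget
	}
	if len(s.midle) == 0 {
		// No idle M: newm creates one whose thread starts with nextp already set.
		nm := &m{id: 100 + len(s.created), nextp: pp, spinning: spinning}
		s.created = append(s.created, nm)
		return
	}
	mp := s.midle[len(s.midle)-1] // mget
	s.midle = s.midle[:len(s.midle)-1]
	mp.spinning = spinning
	mp.nextp = pp   // the M acquires this P after waking up
	mp.woken = true // notewakeup(&mp.park)
}

func main() {
	idle := &m{id: 7}
	s := &sched{pidle: []*p{{id: 1}, {id: 2}}, midle: []*m{idle}}
	s.startm(nil, false) // idle M 7 gets P 2 and is woken
	s.startm(nil, false) // no idle M left: a new M is created for P 1
	s.nmspinning = 1     // pretend the caller (e.g. wakep) incremented it
	s.startm(nil, true)  // no idle P left: the increment is undone
	fmt.Println(idle.nextp.id, idle.woken, len(s.created), s.created[0].nextp.id, s.nmspinning)
}
```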

stopm

When stopm is called, the M has already released its P. stopm puts the M on the global idle M list and then puts it, together with its OS thread, to sleep.

func stopm() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("stopm holding locks")
    }
    if _g_.m.p != 0 {
        throw("stopm holding p")
    }
    if _g_.m.spinning {
        throw("stopm spinning")
    }

    lock(&sched.lock)
    mput(_g_.m) // put the M on the global idle M list (sched.midle)
    unlock(&sched.lock)
    // the thread blocks inside notesleep
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park) // m.park.key is 0 while sleeping and non-zero once woken; reset it to 0
    acquirep(_g_.m.nextp.ptr()) // bind to the P handed over in m.nextp
    _g_.m.nextp = 0 // clear m.nextp
}

newm

Before reading the source, consider what newm has to do:
1. Allocate the m struct.
2. Create the g0 that belongs to the M.
3. Create an OS thread, bind it to the M, and have it start running mstart.

func newm(fn func(), _p_ *p) {
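    mp := allocm(_p_, fn) // allocate the M for _p_ and fn
    mp.nextp.set(_p_) // set the M's nextp
    mp.sigmask = initSigmask
    // (code omitted: if the current M is locked to its thread or its thread may have been started by C, thread creation is handed off to a template thread; not considered here)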
    newm1(mp)
}

func allocm(_p_ *p, fn func()) *m {
    _g_ := getg()
    acquirem() // disable GC because it can be called from sysmon
    if _g_.m.p == 0 { // the current M has no P: temporarily borrow _p_
        acquirep(_p_) // temporarily borrow p for mallocs in this function
    }

    // Release the free M list. We need to do this somewhere and
    // this may free up a stack we can use.
    // (code omitted: frees the Ms queued on sched.freem, for the reason given in the comment above)

    mp := new(m)
    mp.mstartfn = fn
    mcommoninit(mp) // common M initialization; not covered here

    // In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
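    // Initialize g0. With cgo and on the OSes below, g0 gets no Go-allocated stack (malg(-1)); the thread's own stack is used instead. The reason is discussed in the summary below.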
    if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
        mp.g0 = malg(-1)
    } else {
        mp.g0 = malg(8192 * sys.StackGuardMultiplier)
    }
    mp.g0.m = mp

    if _p_ == _g_.m.p.ptr() {
        releasep()
    }
    releasem(_g_.m)

    return mp
}

func malg(stacksize int32) *g {
    newg := new(g)
    if stacksize >= 0 {
        // round2 rounds the size up to a power of two
        stacksize = round2(_StackSystem + stacksize)
        systemstack(func() {
            // adjust the size and actually allocate the stack memory
            newg.stack = stackalloc(uint32(stacksize))
        })
        newg.stackguard0 = newg.stack.lo + _StackGuard
        newg.stackguard1 = ^uintptr(0)
        // Clear the bottom word of the stack. We record g
        // there on gsignal stack during VDSO on ARM and ARM64.
        *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
    }
    return newg
}

func newm1(mp *m) {
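    if iscgo { // cgo build
        // with cgo, g0 and mstart are handed to _cgo_thread_start, which creates the thread with pthread_create and eventually runs mstart on it (see the mstart excerpt below)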
        var ts cgothreadstart
        if _cgo_thread_start == nil {
            throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(funcPC(mstart)) // entry point of the new thread
        if msanenabled {
            msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock() // Prevent process clone.
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock() // Prevent process clone.
    newosproc(mp) // create the OS thread
    execLock.runlock()
}

// Excerpt of mstart and mstart1
func mstart() {
    _g_ := getg()

    osStack := _g_.stack.lo == 0 // with cgo, g0's stack was not allocated (malg(-1)), so use the OS thread's stack
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        // point g0's stack at the OS thread's stack
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    mstart1()
}

func mstart1() {
    _g_ := getg()

    if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr()) // bind to the P set by newm
        _g_.m.nextp = 0
    }
    schedule()
}

// Create an OS thread that runs mstart
func newosproc(mp *m) {
    stk := unsafe.Pointer(mp.g0.stack.hi) // the top of g0's stack becomes the thread's stack
    /*
     * note: strace gets confused if we use CLONE_PTRACE here.
     */
    if false {
        print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", funcPC(clone), " id=", mp.id, " ostk=", &mp, "\n")
    }

    // Disable signals during clone, so that the new thread starts
    // with signals disabled. It will enable them in minit.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    // create the thread with clone: g0's stack as its stack, mstart as its entry function
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)

    if ret < 0 {
        print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
        if ret == -_EAGAIN {
            println("runtime: may need to increase max user processes (ulimit -u)")
        }
        throw("newosproc")
    }
}
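malg, shown above, rounds the requested stack size up to a power of two with round2 before calling stackalloc. The helper below is a sketch of that rounding; treat it as an illustration of the behavior rather than a copy of the runtime source. On Linux _StackSystem is 0, so the non-cgo g0 request of 8192 * sys.StackGuardMultiplier (8192 in a normal build, assuming a multiplier of 1) is already a power of two and is left unchanged.

```go
package main

import "fmt"

// round2 returns the smallest power of two >= x (1 for x <= 1).
// x must not exceed 1<<30, or the shift would overflow int32.
func round2(x int32) int32 {
	s := uint(0)
	for 1<<s < x {
		s++
	}
	return 1 << s
}

func main() {
	for _, n := range []int32{1, 1000, 8192, 8193} {
		fmt.Println(n, "->", round2(n))
	}
}
```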

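Summary:
1. g0's stack and the stack the OS thread runs on are one and the same.
2. With cgo (and on Solaris, illumos, Windows, Plan 9 and Darwin), the stack allocated for the thread by the OS is used, because the thread is created by pthread_create (or the OS's own API), which allocates the stack itself. This also matters for cgo calls: C code runs on g0's stack, so g0 needs a full-size thread stack. With an 8 KB Go-allocated stack, C code that works fine when called from other languages could overflow its stack when called from Go.
3. An M created by newm goes straight into schedule (mstart → mstart1 → schedule), so unlike an idle M it does not need to be woken through notewakeup.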

findrunnable--spinning

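Spinning mainly consists of (most of item 1 is omitted from the excerpt below):
1. GC-, network- and timer-related work.
2. Taking runnable Gs from the global run queue, or stealing them from other Ps.
3. Calling stopm if no runnable G can be found.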
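For step 2, the findrunnable excerpt below walks the other Ps via stealOrder.start(fastrand()), so each spinning M starts at a random P yet still visits every P exactly once per pass. The runtime achieves this by stepping through positions with an increment that is coprime to the number of Ps. The sketch reimplements that idea; randomOrder and order are illustrative names loosely following the runtime's randomOrder/randomEnum, and math/rand replaces fastrand.

```go
package main

import (
	"fmt"
	"math/rand"
)

func gcd(a, b uint32) uint32 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

// randomOrder holds every increment in [1, count] that is coprime to count.
type randomOrder struct {
	count    uint32
	coprimes []uint32
}

func newRandomOrder(count uint32) *randomOrder {
	ord := &randomOrder{count: count}
	for i := uint32(1); i <= count; i++ {
		if gcd(i, count) == 1 {
			ord.coprimes = append(ord.coprimes, i)
		}
	}
	return ord
}

// order returns the P positions visited for random seed r: start at
// r%count and step by a coprime increment, which visits each position once.
func (ord *randomOrder) order(r uint32) []uint32 {
	pos := r % ord.count
	inc := ord.coprimes[r%uint32(len(ord.coprimes))]
	out := make([]uint32, 0, ord.count)
	for i := uint32(0); i < ord.count; i++ {
		out = append(out, pos)
		pos = (pos + inc) % ord.count
	}
	return out
}

func main() {
	ord := newRandomOrder(6) // e.g. GOMAXPROCS=6
	fmt.Println(ord.order(rand.Uint32()))
}
```

Because the increment is coprime to the count, the walk cannot cycle back early; with only a random start and increment 1, every M starting near the same P would contend for the same victims in the same order.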

func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()
top:
    _p_ := _g_.m.p.ptr()
    // local runq: this P's own run queue
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq: the global run queue
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    ranTimer := false // used by the timer-stealing code omitted from this excerpt
    // If the spinning Ms already number at least half of the busy Ps (procs - npidle), stop instead of stealing
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    for i := 0; i < 4; i++ { // up to 4 passes, each starting from a random P
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            p2 := allp[enum.position()]
            if _p_ == p2 {
                continue
            }
            if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
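stop:
    // (code omitted: idle-time GC mark work and timer handling; in the full source delta is derived from the next timer's deadline)
    delta := int64(-1)

    // wasm only:
    // If a callback returned and no other goroutine is awake,
    // then pause execution until a callback was triggered.
    if beforeIdle(delta) { // hook called before going idle; only does something on wasm
        // At least one goroutine got woken.
        goto top
    }

    // Snapshot allp before dropping the P; the final run-queue check below uses it.
    allpSnapshot := allp

    // Return the P and block.
    lock(&sched.lock)
    // check the global run queue once more, now under sched.lock
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)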

    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again: if some P still has work, grab an idle P and retry from the top
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }
    // (code omitted: idle-time GC work and a blocking netpoll)
    stopm()
    goto top
}
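The check before the steal loop caps spinning: an M that is not already spinning skips stealing (goto stop) once 2*nmspinning >= gomaxprocs - npidle, that is, once the spinning Ms number at least half of the busy Ps. The Go source motivates this as avoiding excessive CPU use when GOMAXPROCS is much larger than the program's actual parallelism. The predicate is isolated below; shouldSkipSpinning is a hypothetical name.

```go
package main

import "fmt"

// shouldSkipSpinning reports whether an M that is not yet spinning should
// skip work stealing, mirroring the condition in findrunnable:
// 2*nmspinning >= procs - npidle. Values are uint32 as in the runtime.
func shouldSkipSpinning(alreadySpinning bool, nmspinning, procs, npidle uint32) bool {
	return !alreadySpinning && 2*nmspinning >= procs-npidle
}

func main() {
	// GOMAXPROCS=8 with 4 idle Ps leaves 4 busy Ps.
	for _, n := range []uint32{0, 1, 2, 3} {
		fmt.Printf("nmspinning=%d skip=%v\n", n, shouldSkipSpinning(false, n, 8, 4))
	}
}
```

An M that is already spinning is never turned away by this check, so the existing spinners keep looking while new ones are throttled.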

Summary

1. An M holds the data an OS thread needs to run; a P holds the data needed to schedule Gs.
