声明
下面的分析均基于 Go 1.14 版本。
M 的状态
M 只有 Running 和 Stop 两个状态，另外还有一个 spinning 中间态：当 M 从 Running 转为 Stop 时，会先进入 spinning 状态寻找可运行的 G，如果找不到才进入 Stop。

主要流程
1. mstart：Go 程序初始化时，第一个 M（m0）从 mstart 开始运行；新的物理线程创建后，执行的入口函数也是 mstart。
2. startm：当有新的 G 被创建，或者有 G 从 waiting 变为 runnable，并且还有空闲的 P 时，会调用 startm，获取一个 M 与空闲的 P 绑定来执行 G。
3. newm：调用 startm 时，如果没有空闲的 M，则会通过 newm 创建新的 M。
4. stopm：有两种情况会执行 stopm。一是 M 绑定的 P 上没有可运行的 G，且无法从其它 P 窃取可运行的 G 时，M 会先进入 spinning 状态，仍然找不到就释放 P 并调用 stopm 休眠。二是 M 和 G 进入系统调用后长时间未返回，P 被 retake 走，系统调用返回时 M 又找不到空闲的 P 可以绑定，此时 M 会调用 stopm。
5. spinning 状态：M 在 findrunnable 函数中会短暂进入 spinning 状态，如果找不到可运行的 G，则调用 stopm。
PS：上述主要流程说明了各函数在什么时候、由谁触发调用，后文不再赘述。
线程的休眠和唤醒
1. M 绑定了一个物理线程，M 的 running 和 stop 就代表了物理线程的状态。那么物理线程是如何休眠和唤醒的呢？下面以 Linux 操作系统为例介绍物理线程的状态变化。
2. Linux 下的线程同步通过 futex 系统调用实现，futex 的详细介绍可参考 man 2 futex。
#include <linux/futex.h>
#include <sys/time.h>
// futex(2) prototype as listed in the man page. Only the first three
// arguments matter for the Go runtime code that follows:
//   uaddr    - address of the 32-bit word the threads synchronize on.
//   futex_op - the operation. Go uses FUTEX_WAIT and FUTEX_WAKE, in their
//              *_PRIVATE (process-private) variants.
//   val      - for FUTEX_WAIT: sleep only if *uaddr still equals val; the
//              kernel performs this comparison atomically with going to
//              sleep and returns immediately if the values differ.
//              For FUTEX_WAKE: wake at most val threads waiting on uaddr
//              (the Go runtime passes 1).
// NOTE(review): glibc provides no futex() wrapper; real C code issues
// syscall(SYS_futex, ...) — confirm against your futex(2) man page.
int futex(int *uaddr, int futex_op, int val,
          const struct timespec *timeout, /* or: uint32_t val2 */
          int *uaddr2, int val3);
3. 线程休眠
notesleep 中的 asmcgocall(*cgo_yield, nil) 用于执行 cgo_yield 函数，这里不深究。
// notesleep blocks the calling thread until n is signalled by notewakeup,
// i.e. until n.key becomes non-zero.
//
// It must run on g0 and throws otherwise. Thread sleep only happens on
// scheduler paths (schedule / returning from a syscall), which run on g0;
// in this article it is reached from stopm parking an idle M.
func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	// -1 means "no timeout": futexsleep waits until explicitly woken.
	ns := int64(-1)
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		// With cgo_yield installed, wake up every 10ms (10e6 ns) to call it.
		ns = 10e6
	}
	// futexsleep can return before notewakeup (e.g. after the 10ms cgo
	// timeout above), so keep re-checking: only a non-zero key ends the wait.
	for atomic.Load(key32(&n.key)) == 0 {
		gp.m.blocked = true
		// Sleeps only while the key is still 0.
		futexsleep(key32(&n.key), 0, ns)
		if *cgo_yield != nil {
			// Run the cgo_yield hook through the cgo call bridge.
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false
	}
}
// futexsleep puts the calling thread to sleep on addr with
// FUTEX_WAIT_PRIVATE, provided *addr still equals val.
//
// A negative ns means no timeout; otherwise the sleep is bounded by ns
// nanoseconds. It may return without a wakeup, so callers such as
// notesleep re-check their condition in a loop.
func futexsleep(addr *uint32, val uint32, ns int64) {
	if ns < 0 {
		// No timeout: pass a nil timespec and wait until woken.
		futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
		return
	}
	var ts timespec
	// Convert the relative timeout in nanoseconds into a timespec.
	ts.setNsec(ns)
	futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
4. 线程唤醒
// notewakeup signals n: it sets n.key to 1 and wakes at most one thread
// sleeping on it. Waking the same note twice without an intervening
// noteclear is a fatal error.
func notewakeup(n *note) {
	// Atomically publish key = 1 (0 means "not signalled", the state a
	// sleeper waits on) and fetch the previous value to detect a double wakeup.
	old := atomic.Xchg(key32(&n.key), 1)
	if old != 0 {
		print("notewakeup - double wakeup (", old, ")\n")
		throw("notewakeup - double wakeup")
	}
	// Wake one sleeper. Because the key was set first, a thread that has not
	// yet reached futexsleep sees key != 0 in notesleep's loop and never sleeps.
	futexwakeup(key32(&n.key), 1)
}
// futexwakeup wakes at most cnt threads sleeping on addr using
// FUTEX_WAKE_PRIVATE. A negative return from futex is treated as fatal.
func futexwakeup(addr *uint32, cnt uint32) {
	ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
	if ret >= 0 {
		return
	}
	// Not expected in normal operation: report the failure (printing on the
	// system stack) and then crash.
	systemstack(func() {
		print("futexwakeup addr=", addr, " returned ", ret, "\n")
	})
	// Deliberate fault: store to address 0x1006 to crash the process.
	*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}
5. 总结：M 的休眠和唤醒都通过 m.park（一个 note）的 key 字段进行同步，对 M 的休眠和唤醒操作，实际上都是在操作 m.park.key 所在的内存。
mstart
程序启动时执行 mstart 的 M 为 m0，是 Go 进程的第一个 M；之后新创建的物理线程同样从 mstart 开始执行。
// mstart is the entry point of every M: m0 runs it at program start and
// every new OS thread begins executing here. It sets up g0's stack bounds
// (adopting the OS thread's stack when g0 has none), then calls mstart1.
// Control only comes back here when the thread is being terminated (see
// the save() call in mstart1), after which mexit exits the thread.
func mstart() {
	_g_ := getg()

	// stack.lo == 0 means g0 was created without a Go-allocated stack
	// (malg(-1) in allocm), so it runs on the stack the OS gave the thread.
	osStack := _g_.stack.lo == 0
	if osStack {
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		// The address of the local size serves as the stack top; 1024 bytes
		// of the assumed size are left unused at the bottom.
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
	// Initialize stack guard so that we can start calling regular
	// Go code.
	_g_.stackguard0 = _g_.stack.lo + _StackGuard
	// This is the g0, so we can also call go:systemstack
	// functions, which check stackguard1.
	_g_.stackguard1 = _g_.stackguard0
	mstart1()

	// Exit this thread.
	switch GOOS {
	case "windows", "solaris", "illumos", "plan9", "darwin", "aix":
		// Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
		// the stack, but put it in _g_.stack before mstart,
		// so the logic above hasn't set osStack yet.
		osStack = true
	}
	mexit(osStack)
}
// mstart1 performs the per-thread initialization of an M on its g0 and then
// enters the scheduler. It never returns to mstart in normal operation;
// mstart only resumes when the thread is torn down.
func mstart1() {
	_g_ := getg()

	if _g_ != _g_.m.g0 {
		throw("bad runtime·mstart")
	}

	// Record the caller for use as the top of stack in mcall and
	// for terminating the thread.
	// We're never coming back to mstart1 after we call schedule,
	// so other calls can reuse the current frame.
	// The saved pc/sp are those of mstart's call to mstart1; they are stored in g0.
	save(getcallerpc(), getcallersp())
	// Architecture-specific per-thread initialization (not examined here).
	asminit()
	// Per-thread OS initialization (signal stack and signal mask).
	minit()

	// Install signal handlers; after minit so that minit can
	// prepare the thread to be able to handle the signals.
	if _g_.m == &m0 {
		// m0 only: create the extra M if needed and install signal handlers.
		mstartm0()
	}

	// Optional start function stored by allocm (startm passes mspinning
	// when the new M should start out spinning).
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}

	// Every M except m0 was handed its P in m.nextp; bind it now.
	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	// Enter the scheduling loop.
	schedule()
}
// minit runs per-M initialization on the new thread itself: it installs the
// alternate signal stack (skipped on arm/arm64), sets up the thread's signal
// mask, and records the thread identifier in m.procid.
//
// NOTE(review): this variant (arm/arm64 check, pthread_self) looks like the
// Darwin minit; the Linux one records gettid() instead — confirm against the
// Go 1.14 sources, since the article otherwise uses Linux as its example.
func minit() {
	switch GOARCH {
	case "arm", "arm64":
		// The alternate signal stack is buggy on these architectures; the
		// signal handler deals with that directly, so none is installed.
	default:
		// Stack used by signal handlers.
		minitSignalStack()
	}
	// Signal-mask setup for this thread (not examined further here).
	minitSignalMask()
	mp := getg().m
	mp.procid = uint64(pthread_self())
}
// mstartm0 performs the start-up work that only m0 does. When cgo is in use
// (or on Windows) and no extra M has been created yet, it creates one, so that
// threads not created by Go (and, on Windows, syscall.NewCallback callbacks;
// see issue #6751) can call into Go. It then installs the runtime's signal
// handlers.
func mstartm0() {
	needsExtraM := iscgo || GOOS == "windows"
	if needsExtraM && !cgoHasExtraM {
		cgoHasExtraM = true
		newextram()
	}
	// Register the signal handlers (not examined further here).
	initsig(false)
}
startm
寻找空闲的 M 和 P，将 P 保存到 M 的 m.nextp 中，并通过 m.park 唤醒 M；M 被唤醒后，会与 m.nextp 指定的 P 绑定。如果没有空闲的 M，则通过 newm 新建一个 M。
// startm arranges for some M to run _p_; if _p_ is nil an idle P is taken.
//
// If no idle P is available it simply returns. In that case, when spinning is
// true, it first undoes the caller's nmspinning increment. Otherwise it
// reuses an idle M and wakes it through its park note, or creates a new M
// with newm when none is idle. Either way the P is handed over via m.nextp,
// and the M binds it when it runs.
func startm(_p_ *p, spinning bool) {
	lock(&sched.lock)
	if _p_ == nil {
		// Take an idle P.
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	// Take an idle M (nil if there is none).
	mp := mget()
	unlock(&sched.lock)
	if mp == nil {
		// No idle M: create a new one. newm stores _p_ in its nextp, and the
		// new thread starts running by itself, so no wakeup is needed.
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_)
		return
	}
	// Sanity checks on the state of the idle M and the P.
	if mp.spinning {
		throw("startm: m is spinning")
	}
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	// The M binds this P after waking (see stopm).
	mp.nextp.set(_p_)
	// Wake the M parked in stopm via its park note.
	notewakeup(&mp.park)
}
stopm
调用 stopm 时，P 和 M 已经解绑。stopm 将 M 放入全局的空闲 M 列表，并让 M 对应的物理线程进入休眠。
// stopm parks the current M. It puts the M on the global idle M list and
// sleeps on m.park until startm wakes it. It then binds the P that startm
// left in m.nextp. The M must have m.locks == 0, hold no P and not be
// spinning; otherwise stopm throws.
func stopm() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}

	lock(&sched.lock)
	// Put this M on the global idle M list.
	mput(_g_.m)
	unlock(&sched.lock)
	// The thread blocks here until notewakeup(&m.park).
	notesleep(&_g_.m.park)
	// park.key is non-zero after the wakeup; reset it so the note can be reused.
	noteclear(&_g_.m.park)
	// Bind the P handed over in m.nextp, then clear nextp.
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}
newm
在剖析源码前，先分析 newm 要做什么：
1. 创建 M 对应的结构体。
2. 创建与 M 绑定的 g0。
3. 创建物理线程并与 M 绑定；新线程从 mstart 开始执行，直接进入调度，而不是先休眠。
// newm creates a new M that will first run fn (which may be nil) and then
// bind _p_, and starts the M's OS thread through newm1.
//
// Excerpt: code dealing with the case where the current thread may have been
// created by C (Go calling C calling Go) is omitted and not covered here.
func newm(fn func(), _p_ *p) {
	newMP := allocm(_p_, fn)
	newMP.sigmask = initSigmask
	// The new M picks _p_ up from nextp when it starts (mstart1).
	newMP.nextp.set(_p_)
	newm1(newMP)
}
// allocm allocates a new M with fn as its start function and a fresh g0.
// The M is not yet attached to an OS thread. If the current M has no P, _p_
// is borrowed for the duration of the allocations and released again.
//
// Excerpt: the code that frees the list of exited Ms is omitted.
func allocm(_p_ *p, fn func()) *m {
	_g_ := getg()
	acquirem() // disable GC because it can be called from sysmon
	if _g_.m.p == 0 {
		// The current M has no P: temporarily take _p_.
		acquirep(_p_) // temporarily borrow p for mallocs in this function
	}

	// Release the free M list. We need to do this somewhere and
	// this may free up a stack we can use.
	// (Omitted in this excerpt.)

	mp := new(m)
	mp.mstartfn = fn
	// Common M initialization (not examined here).
	mcommoninit(mp)

	// In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
	// Windows and Plan 9 will layout sched stack on OS stack.
	// In those cases g0 gets no Go stack (malg(-1)) and mstart later adopts
	// the OS thread's stack. Otherwise g0 gets a Go-allocated stack of
	// 8192 * StackGuardMultiplier bytes.
	if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
		mp.g0 = malg(-1)
	} else {
		mp.g0 = malg(8192 * sys.StackGuardMultiplier)
	}
	mp.g0.m = mp

	// Release _p_ if the current M now holds it (the borrow above).
	if _p_ == _g_.m.p.ptr() {
		releasep()
	}
	releasem(_g_.m)

	return mp
}
// malg allocates a new g. If stacksize >= 0 it also allocates a stack of
// _StackSystem+stacksize bytes rounded up to a power of two. A negative
// stacksize (used by allocm for g0 under cgo and on some OSes) leaves the g
// without a stack.
func malg(stacksize int32) *g {
	newg := new(g)
	if stacksize >= 0 {
		// round2 rounds the size up to a power of two.
		stacksize = round2(_StackSystem + stacksize)
		systemstack(func() {
			// Allocate the stack memory, running on the system stack.
			newg.stack = stackalloc(uint32(stacksize))
		})
		newg.stackguard0 = newg.stack.lo + _StackGuard
		// stackguard1 is what go:systemstack functions check (mstart resets
		// it for g0). The maximum value here makes that check fail on this g.
		newg.stackguard1 = ^uintptr(0)
		// Clear the bottom word of the stack. We record g
		// there on gsignal stack during VDSO on ARM and ARM64.
		*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
	}
	return newg
}
// newm1 starts the OS thread for mp. With cgo the thread is created through
// _cgo_thread_start, which is handed mstart as the entry point together with
// mp's g0 and TLS. Otherwise newosproc creates the thread directly. execLock
// is read-locked around thread creation.
func newm1(mp *m) {
	if iscgo {
		// cgo: describe the thread to start (g0, TLS, entry function) and let
		// the C side create it; it ends up running mstart.
		var ts cgothreadstart
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		ts.g.set(mp.g0)
		ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
		ts.fn = unsafe.Pointer(funcPC(mstart)) // thread entry point
		if msanenabled {
			msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
		}
		execLock.rlock() // Prevent process clone.
		asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
		execLock.runlock()
		return
	}
	execLock.rlock() // Prevent process clone.
	// Create the OS thread.
	newosproc(mp)
	execLock.runlock()
}
// mstart、mstart1 部分节选
func mstart() {
osStack := _g_.stack.lo == 0 // cgo時(shí) 未初始化g0的棧 使用os的棧
if osStack {
// Initialize stack bounds from system stack.
// Cgo may have left stack size in stack.hi.
// minit may update the stack bounds.
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
// g0的棧綁定到物理線程的棧上
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
mstart1()
}
func mstart1() {
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule()
}
// newosproc creates a new OS thread with clone that starts executing mstart
// for mp. The top of mp.g0's stack is used as the new thread's stack.
func newosproc(mp *m) {
	// Top of g0's stack becomes the new thread's stack.
	stk := unsafe.Pointer(mp.g0.stack.hi)
	/*
	 * note: strace gets confused if we use CLONE_PTRACE here.
	 */
	if false {
		print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", funcPC(clone), " id=", mp.id, " ostk=", &mp, "\n")
	}

	// Disable signals during clone, so that the new thread starts
	// with signals disabled. It will enable them in minit.
	var oset sigset
	sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
	// clone: the new thread runs on stk with mp and mp.g0, starting in mstart.
	ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
	// Restore this thread's previous signal mask.
	sigprocmask(_SIG_SETMASK, &oset, nil)

	if ret < 0 {
		print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
		if ret == -_EAGAIN {
			println("runtime: may need to increase max user processes (ulimit -u)")
		}
		throw("newosproc")
	}
}
总结：
1. g0 的栈和物理线程使用的栈是同一块内存。
2. cgo 情况下，g0 使用物理线程分配的栈（由 pthread_create 创建）。原因是 cgo 调用的是 C 库，C 代码直接运行在物理线程的栈上；如果不使用物理线程大小的栈，同一段 C 代码可能在其它语言中调用正常，在 Go 中调用却因栈溢出而失败。
3. newm 创建的 M 会直接开始执行 schedule 函数，不需要再次唤醒。
findrunnable——spinning
spinning 阶段主要做了以下事情：
1. GC、网络、timer 相关的处理（下面的节选中已省略）。
2. 尝试从全局队列以及其它 P 中窃取可运行的 G。
3. 找不到可运行的 G 则调用 stopm。
// findrunnable (excerpt, Go 1.14) finds a runnable goroutine for the current
// M's P. It tries, in order:
//   - the P's local run queue;
//   - the global run queue;
//   - stealing from other Ps (the "spinning" phase).
// If everything fails, it gives up its P and leaves the spinning state. It
// then checks every run queue once more and finally parks the M with stopm.
// When the M is woken it starts over at top.
//
// The GC-worker, netpoll and timer handling of the full function is omitted;
// only the parts relevant to M spinning and parking are kept.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

top:
	_p_ := _g_.m.p.ptr()

	// Local run queue of the current P.
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// Global run queue. Reading runqsize without the lock is only a cheap
	// pre-check; the dequeue itself happens under sched.lock.
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	// Steal work from other Ps.
	procs := uint32(gomaxprocs)
	// Limit spinning: if this M is not spinning yet and the spinning Ms
	// already number at least half of the busy Ps (procs - npidle), don't
	// start spinning; go straight to stop.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	// Four passes over all Ps; each pass starts at a random P.
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			p2 := allp[enum.position()]
			if _p_ == p2 {
				continue
			}
			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:
	// delta bounds how long beforeIdle may pause execution (wasm only). The
	// full function derives it from the next timer when the netpoller is
	// initialized; this excerpt uses -1 (no deadline).
	delta := int64(-1)

	// wasm only:
	// If a callback returned and no other goroutine is awake,
	// then pause execution until a callback was triggered.
	if beforeIdle(delta) {
		// At least one goroutine got woken.
		goto top
	}

	// Snapshot allp before giving up the P. In the full runtime, allp may
	// change once this M no longer blocks safe-points.
	allpSnapshot := allp

	// Return the P and block. sched.lock guards the final global-queue check
	// together with releasep/pidleput.
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	// Re-check the global run queue, this time under the lock.
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	// Put the P on the idle P list.
	pidleput(_p_)
	unlock(&sched.lock)

	// This M is about to park, so it stops counting as spinning.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// Check all run queues once more after leaving the spinning state.
	// Presumably this catches work readied while this M still counted as
	// spinning, which therefore did not trigger a wakeup; see the scheduler
	// design comment in runtime/proc.go.
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				// Got an idle P: take it, restore the spinning state this M
				// had before, and search again.
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			// Work exists but no P is idle: give up and park.
			break
		}
	}

	// Park the M; after startm wakes it (with a P in nextp), search again.
	stopm()
	goto top
}
总结
1. M 包含物理线程运行所需要的数据，P 包含调度 G 所需要的数据。