聲明
下面的分析均基于Golang1.14版本。
go func(){} 只是一個(gè)語法糖,在編譯時(shí)會替換為newproc函數(shù)。
一、創(chuàng)建---newproc
閱讀建議:g的創(chuàng)建涉及的數(shù)據(jù)結(jié)構(gòu)主要有g(shù), p的結(jié)構(gòu)體和全局?jǐn)?shù)據(jù)結(jié)構(gòu)allgs, sched,閱讀時(shí)對照這些數(shù)據(jù)結(jié)構(gòu)閱讀源碼。
newproc的調(diào)用過程:在newproc中切換到g0棧執(zhí)行newproc1.
//go:nosplit
func newproc(siz int32, fn *funcval) { // fn表示要運(yùn)行的函數(shù) siz表示要運(yùn)行的函數(shù)的總參數(shù)的大小
argp := add(unsafe.Pointer(&fn), sys.PtrSize) // argp表示fn參數(shù)存放的指針 該指針緊跟在fn后面
gp := getg() // gp 當(dāng)前正在運(yùn)行的g 非g0
pc := getcallerpc() // 獲取調(diào)用當(dāng)前函數(shù)的函數(shù)的pc值 即當(dāng)前函數(shù)返回后的下一個(gè)指令
systemstack(func() { // 切換到g0棧
newproc1(fn, argp, siz, gp, pc)
})
}
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
_g_ := getg() // 當(dāng)前正在運(yùn)行的g 即g0
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
acquirem() // disable preemption because it can be holding p in a local var 為當(dāng)前m加鎖 避免preempt
siz := narg
siz = (siz + 7) &^ 7 // narg 表示總參數(shù)大小 siz為narg內(nèi)存對齊后的值
// We could allocate a larger initial stack if necessary.
// Not worth it: this is almost always an error.
// 4*sizeof(uintreg): extra space added below
// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
if siz >= _StackMin-4*sys.RegSize-sys.RegSize { // 判斷參數(shù)長度是否溢出 g的出事大小為_StackMin并且初始化時(shí)還需要部分內(nèi)存存儲其他參數(shù)
throw("newproc: function arguments too large for new goroutine")
}
_p_ := _g_.m.p.ptr()
// 獲取新的g 先嘗試從當(dāng)前p空閑列表獲取 如果獲取不到則創(chuàng)建 具體的不深究
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin) // 從空閑列表獲取g失敗 嘗試創(chuàng)建新的g 傳入的參數(shù)_StackMin表示g的棧大小
casgstatus(newg, _Gidle, _Gdead)
// 將g放入 allgs切片(allgs管理所有的g)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
if newg.stack.hi == 0 { // 如果棧的高地址為0 說明g的棧內(nèi)存分配失敗 拋出異常
throw("newproc1: newg missing stack")
}
if readgstatus(newg) != _Gdead { // 判斷g的狀態(tài)
throw("newproc1: new g is not Gdead")
}
// 計(jì)算fn參數(shù)所需的空間大小 額外申請了一些空間 具體作用未知
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
// 對齊的原理 以8位對齊舉例 sys.SpAlign - 1表示對齊位后面的bit均為1 即0111 減去 得到的數(shù)&totalSize 表示將totalSize中小于SpAlign的部分減掉
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
sp := newg.stack.hi - totalSize // 根據(jù)棧的起始地址和傳入?yún)?shù)大小計(jì)算g棧的sp的值 注意棧增長是高地址向低地址增長
spArg := sp // 此時(shí) newg.stack.hi -> sp(spArg)
if usesLR { // 這部分代碼暫時(shí)看不懂 略過
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
if narg > 0 {
// 將傳入的參數(shù)拷貝到g的棧中
memmove(unsafe.Pointer(spArg), argp, uintptr(narg)) //將參數(shù)從argp拷貝到 spArg --> spArg + narg
// This is a stack-to-stack copy. If write barriers
// are enabled and the source stack is grey (the
// destination is always black), then perform a
// barrier copy. We do this *after* the memmove
// because the destination stack may have garbage on
// it.
// 如果正在GC copy stack會觸發(fā)寫屏障 具體的操作在GC中分析
if writeBarrier.needed && !_g_.m.curg.gcscandone {
f := findfunc(fn.fn)
stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
if stkmap.nbit > 0 {
// We're in the prologue, so it's always stack map index 0.
bv := stackmapdata(stkmap, 0)
bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
}
}
}
// 初始化g的調(diào)度的上下文信息sched
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
// 設(shè)置g的 pc為goexit函數(shù)+1 +1的原因參考goexit實(shí)現(xiàn)
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 相當(dāng)于執(zhí)行一次gosave 相當(dāng)于是有個(gè)虛擬的函數(shù)f f先執(zhí)行fn再執(zhí)行g(shù)oexit 在執(zhí)行fn前保存上下文
gostartcallfn(&newg.sched, fn)
// 記錄調(diào)用newproc的 pc值 父g 和初始函數(shù)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels // 從父g中繼承l(wèi)abels labels作用?
}
// 根據(jù)函數(shù)名稱是否以runtime開頭判斷是否為系統(tǒng)函數(shù)
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
casgstatus(newg, _Gdead, _Grunnable)
// 如果當(dāng)前p的goid緩存用完 則再分配
if _p_.goidcache == _p_.goidcacheend {
// Sched.goidgen is the last allocated id,
// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
// At startup sched.goidgen=0, so main goroutine receives goid=1.
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
if raceenabled {
newg.racectx = racegostart(callerpc)
}
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
runqput(_p_, newg, true) // 將g放入p的空閑列表中 true表示放入p中的next中 會在下一次調(diào)度中被執(zhí)行
// 如果當(dāng)前有p空閑 并且當(dāng)前沒有正在自旋的m(執(zhí)行findrunnable的m) 且maingoroutine已經(jīng)初始化完成
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
releasem(_g_.m) // 釋放m上的鎖
}
g的獲取途徑有2種,一種是從g的空閑列表中獲取(gfget函數(shù)),一種是重新分配(malg函數(shù))。
// Get from gfree list.
// If local list is empty, grab a batch from global list.
func gfget(_p_ *p) *g {
retry:
// 如果P緩存的g為空 且全局的g空閑列表不為空 則嘗試在全局的空閑列表中獲取g
if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
lock(&sched.gFree.lock)
// Move a batch of free Gs to the P.
for _p_.gFree.n < 32 {
// Prefer Gs with stacks.
gp := sched.gFree.stack.pop()
if gp == nil {
gp = sched.gFree.noStack.pop()
if gp == nil {
break
}
}
sched.gFree.n--
_p_.gFree.push(gp)
_p_.gFree.n++
}
unlock(&sched.gFree.lock)
goto retry
}
gp := _p_.gFree.pop()
if gp == nil { // 如果獲取g失敗 則返回nil 在外面調(diào)用malg函數(shù)分配g
return nil
}
_p_.gFree.n--
// 如果分配的g中棧為空 則為其分配棧 (舊的g中的??赡芪瘁尫?
if gp.stack.lo == 0 {
// Stack was deallocated in gfput. Allocate a new one.
systemstack(func() {
gp.stack = stackalloc(_FixedStack)
})
gp.stackguard0 = gp.stack.lo + _StackGuard
} else {
if raceenabled {
racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
if msanenabled {
msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
}
return gp
}
// Allocate a new g, with a stack big enough for stacksize bytes.
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 {
// round2把數(shù)值向上調(diào)整為2的冪次
stacksize = round2(_StackSystem + stacksize)
systemstack(func() {
// 主要是棧大小的調(diào)整和棧內(nèi)存的實(shí)際分配
newg.stack = stackalloc(uint32(stacksize))
})
newg.stackguard0 = newg.stack.lo + _StackGuard
newg.stackguard1 = ^uintptr(0)
// Clear the bottom word of the stack. We record g
// there on gsignal stack during VDSO on ARM and ARM64.
*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
}
return newg
}
將當(dāng)前的pc設(shè)置為goexit函數(shù)的pc值后,需要調(diào)用gostartcallfn保存一次當(dāng)前的上下文。
// adjust Gobuf as if it executed a call to fn
// and then did an immediate gosave.
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
// 獲取函數(shù)指針
if fv != nil {
fn = unsafe.Pointer(fv.fn)
} else {
fn = unsafe.Pointer(funcPC(nilfunc))
}
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
// adjust Gobuf as if it executed a call to fn with context ctxt
// and then did an immediate gosave.
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
sp := buf.sp
if sys.RegSize > sys.PtrSize { // 這一段if沒看懂
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = 0
}
sp -= sys.PtrSize // 存入goexit指令后 偏移sp
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc // 在這里將當(dāng)前pc存入棧中
buf.sp = sp
buf.pc = uintptr(fn) // 將pc設(shè)置為fn函數(shù)的入口
buf.ctxt = ctxt // ctxt表示當(dāng)前正在執(zhí)行的 funcval
}
初始化后的g處于runnable狀態(tài),通過runqput放入到隊(duì)列中運(yùn)行。
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
// 如果是 randomizeScheduler 狀態(tài)且50%概率隨機(jī) 即使next為true也設(shè)置為false
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
if next {
retryNext:
// 將p的next設(shè)置為新生成的g 如果當(dāng)前的next不為空 則嘗試放入p中緩存的隊(duì)列
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
// 如果p中可運(yùn)行g(shù)的緩存隊(duì)列未滿 則放入緩存隊(duì)列中
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 放入全局的可運(yùn)行的g的緩存隊(duì)列中
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
二、銷毀---goexit
// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT|NOFRAME|TOPFRAME,$0-0
MOVD R0, R0 // NOP 空操作符 占一個(gè)機(jī)器字節(jié)
BL runtime·goexit1(SB) // does not return
在newproc1中設(shè)置返回的pc值時(shí),newg.sched.pc = funcPC(goexit) + sys.PCQuantum。該pc值取的是goexit+一個(gè)指令大小,應(yīng)當(dāng)是BL runtime.goexit1(SB)指令,說明實(shí)際執(zhí)行g(shù)oexit1。
func goexit1() {
if raceenabled {
racegoend()
}
if trace.enabled {
traceGoEnd()
}
// 使用mcall調(diào)用goexit0 切換當(dāng)前的棧為g0棧
mcall(goexit0)
}
// goexit continuation on g0.
func goexit0(gp *g) {
_g_ := getg()
// g 狀態(tài)轉(zhuǎn)換
casgstatus(gp, _Grunning, _Gdead)
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
// 將引用的參數(shù)設(shè)置為0
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
// 如果正在GC且和GC相關(guān)的數(shù)據(jù)不為0
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
gp.gcAssistBytes = 0
}
// 解除g->m m->g的引用
dropg()
// 一些特殊的狀態(tài)判斷
if GOARCH == "wasm" { // no threads yet on wasm
gfput(_g_.m.p.ptr(), gp)
schedule() // never returns
}
if _g_.m.lockedInt != 0 {
print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
throw("internal lockOSThread error")
}
// 釋放g 將g放回空閑列表并且釋放g->stack指向的內(nèi)存
gfput(_g_.m.p.ptr(), gp)
if locked {
// The goroutine may have locked this thread because
// it put it in an unusual kernel state. Kill it
// rather than returning it to the thread pool.
// Return to mstart, which will release the P and exit
// the thread.
if GOOS != "plan9" { // See golang.org/issue/22227.
gogo(&_g_.m.g0.sched)
} else {
// Clear lockedExt on plan9 since we may end up re-using
// this thread.
_g_.m.lockedExt = 0
}
}
// 調(diào)度尋找下一個(gè)可執(zhí)行的g
schedule()
}
其中通過gfput將g放入空閑列表并且嘗試釋放g使用的棧。
// Put on gfree list.
// If local list is too long, transfer a batch to the global list.
func gfput(_p_ *p, gp *g) {
if readgstatus(gp) != _Gdead {
throw("gfput: bad status (not Gdead)")
}
stksize := gp.stack.hi - gp.stack.lo
// 由注釋可猜測 非標(biāo)準(zhǔn)的棧大小則釋放 棧在分配時(shí)malg傳入大小為StackMin(進(jìn)一步計(jì)算處理過)
//gfget時(shí)棧大小為_FixedStack 非標(biāo)準(zhǔn)的應(yīng)該是malg分配的棧或者擴(kuò)容后的棧
if stksize != _FixedStack {
// non-standard stack size - free it.
stackfree(gp.stack)
gp.stack.lo = 0
gp.stack.hi = 0
gp.stackguard0 = 0
}
// 將g放入p的gFree鏈表中 如果p的鏈表數(shù)據(jù)達(dá)到64 則釋放32個(gè)到全局的sched空閑列表中
_p_.gFree.push(gp)
_p_.gFree.n++
if _p_.gFree.n >= 64 {
lock(&sched.gFree.lock)
for _p_.gFree.n >= 32 {
_p_.gFree.n--
gp = _p_.gFree.pop()
if gp.stack.lo == 0 {
sched.gFree.noStack.push(gp)
} else {
sched.gFree.stack.push(gp)
}
sched.gFree.n++
}
unlock(&sched.gFree.lock)
}
}
三、總結(jié)
1.TLS(thread local storage)的思想,從g的分配可看出,p充當(dāng)了m的數(shù)據(jù)的一級緩存角色,因?yàn)閜中的數(shù)據(jù)同一時(shí)刻只會被和其綁定的m訪問,因此可以無鎖使用。
2.goexit在初始化時(shí)寫入棧中的做法值得細(xì)細(xì)品味。