Goroutine的創建與銷毀

聲明

下面的分析均基于Golang1.14版本。
go func(){} 只是一個語法糖,編譯器在編譯時會把它替換為對runtime.newproc函數的調用。

一、創建---newproc

閱讀建議:g的創建涉及的數據結構主要有g、p的結構體和全局數據結構allgs、sched,閱讀時請對照這些數據結構閱讀源碼。
newproc的調用過程:在newproc中通過systemstack切換到g0棧執行newproc1。

// newproc creates a new goroutine that runs fn with siz bytes of arguments.
// A `go f(args)` statement is compiled into a call to newproc. It captures
// the caller's context and does the real work in newproc1 on the g0
// (system) stack.
//
// It is nosplit, and it reads fn's arguments relative to &fn, so its own
// frame layout must not change.
//go:nosplit
func newproc(siz int32, fn *funcval) { // fn: function to run; siz: total size in bytes of fn's arguments
    // argp points at fn's arguments, which the caller placed immediately after fn.
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    // gp is the goroutine executing the go statement (not g0).
    gp := getg()
    // pc is the caller's pc, i.e. the instruction after the call to newproc;
    // newproc1 records it as newg.gopc.
    pc := getcallerpc()
    systemstack(func() { // run newproc1 on the g0 stack
        newproc1(fn, argp, siz, gp, pc)
    })
}

// newproc1 creates a new g in state _Grunnable that will start running fn.
// It copies narg bytes of arguments from argp onto the new g's stack.
// callergp is the goroutine that executed the go statement and callerpc is
// the pc of that statement; both are recorded on the new g. newproc1 runs on
// the g0 stack (newproc calls it via systemstack). It puts the new g on the
// current P's run queue and wakes an idle P when appropriate.
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
    _g_ := getg() // the running g; g0 here, because newproc switched stacks

    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    acquirem() // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7 // round the argument size up to a multiple of 8

    // We could allocate a larger initial stack if necessary.
    // Not worth it: this is almost always an error.
    // 4*sizeof(uintreg): extra space added below
    // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
    // Reject arguments that would not fit in a _StackMin-sized initial stack
    // once the reserved words above are subtracted.
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
        throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    // Reuse a dead g from the free lists if one is available; otherwise allocate a new one.
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin) // fresh g whose stack is sized from _StackMin
        casgstatus(newg, _Gidle, _Gdead)
        // Register the g in allgs, the list of all gs.
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    if newg.stack.hi == 0 { // every g obtained above must already own a stack
        throw("newproc1: newg missing stack")
    }

    if readgstatus(newg) != _Gdead { // both gfget and malg paths must yield a dead g
        throw("newproc1: new g is not Gdead")
    }

    // Reserve room at the top of the new stack for the arguments plus some slack.
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    // Round totalSize up to a multiple of sys.SpAlign. Assuming SpAlign is a
    // power of two, -totalSize & (SpAlign-1) is the distance to the next multiple.
    totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
    sp := newg.stack.hi - totalSize // stacks grow downward, so the frame is carved off the high end
    spArg := sp // the arguments will be copied to [spArg, spArg+narg)
    if usesLR {
        // On link-register architectures the word at sp is the caller's LR
        // slot (zeroed here), and the arguments start MinFrameSize above it.
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    if narg > 0 {
        // Copy the arguments from the caller's stack (argp) to the new g's stack.
        memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
        // This is a stack-to-stack copy. If write barriers
        // are enabled and the source stack is grey (the
        // destination is always black), then perform a
        // barrier copy. We do this *after* the memmove
        // because the destination stack may have garbage on
        // it.
        // Only the pointer words described by fn's argument pointer map get barriers.
        if writeBarrier.needed && !_g_.m.curg.gcscandone {
            f := findfunc(fn.fn)
            stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
            if stkmap.nbit > 0 {
                // We're in the prologue, so it's always stack map index 0.
                bv := stackmapdata(stkmap, 0)
                bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
            }
        }
    }

    // Initialize the scheduling context (newg.sched) from scratch.
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    // Start from goexit+PCQuantum; gostartcallfn below turns this value into
    // fn's return address.
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    // Make sched look as if goexit had just called fn: the pc above is pushed
    // as the return address and sched.pc is set to fn's entry.
    gostartcallfn(&newg.sched, fn)
    // Record where the g was created (callerpc), its ancestors, and its start function.
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels // inherit labels from the creating goroutine
    }

    // Goroutines classified by isSystemGoroutine are counted in sched.ngsys.
    if isSystemGoroutine(newg, false) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    casgstatus(newg, _Gdead, _Grunnable)

    // Refill this P's cached block of goroutine IDs when it is used up, so
    // the shared sched.goidgen is only touched once per _GoidCacheBatch gs.
    if _p_.goidcache == _p_.goidcacheend {
        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    if raceenabled {
        newg.racectx = racegostart(callerpc)
    }
    if trace.enabled {
        traceGoCreate(newg, newg.startpc)
    }
    // Put newg on this P's run queue (not a free list). next=true requests
    // the runnext slot so it runs next, though runqput may still append it
    // to the tail instead.
    runqput(_p_, newg, true)

    // Wake a P if there is an idle P, no M is spinning (sched.nmspinning == 0),
    // and the main goroutine has started.
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
    releasem(_g_.m) // re-enable preemption; pairs with acquirem above
}

g的獲取途徑有2種:一種是從g的空閑列表中獲取(gfget函數),另一種是重新分配(malg函數)。

// Get from gfree list.
// If local list is empty, grab a batch from global list.
//
// gfget returns a dead g from _p_'s free list, or nil if none is available.
// When the local list is empty, it first refills it (up to 32 gs) from the
// global sched.gFree lists under sched.gFree.lock, preferring gs that still
// own a stack. A g whose stack was freed by gfput gets a new
// _FixedStack-sized stack before it is returned.
func gfget(_p_ *p) *g {
retry:
    // The local list is empty but a global list is not: move a batch over, then try again.
    if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
        lock(&sched.gFree.lock)
        // Move a batch of free Gs to the P.
        for _p_.gFree.n < 32 {
            // Prefer Gs with stacks.
            gp := sched.gFree.stack.pop()
            if gp == nil {
                gp = sched.gFree.noStack.pop()
                if gp == nil {
                    break
                }
            }
            sched.gFree.n--
            _p_.gFree.push(gp)
            _p_.gFree.n++
        }
        unlock(&sched.gFree.lock)
        goto retry
    }
    gp := _p_.gFree.pop()
    if gp == nil { // nothing free locally or globally; the caller (newproc1) falls back to malg
        return nil
    }
    _p_.gFree.n--
    // gfput may have released this g's stack (stack.lo == 0); if so, allocate a fresh one.
    if gp.stack.lo == 0 {
        // Stack was deallocated in gfput. Allocate a new one.
        systemstack(func() {
            gp.stack = stackalloc(_FixedStack)
        })
        gp.stackguard0 = gp.stack.lo + _StackGuard
    } else {
        // The stack is being reused; presumably these calls mark its memory as
        // newly allocated for the race detector / MSan.
        if raceenabled {
            racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
        }
        if msanenabled {
            msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
        }
    }
    return gp
}

// malg allocates a brand-new g. For a non-negative stacksize it also
// requests a stack of round2(_StackSystem+stacksize) bytes, with stackalloc
// running on the system stack. A negative stacksize yields a g with no stack.
func malg(stacksize int32) *g {
	gp := new(g)
	if stacksize < 0 {
		// The caller asked for a g without a stack.
		return gp
	}

	// Add the OS-reserved area and round up to a power of two.
	size := uint32(round2(_StackSystem + stacksize))
	systemstack(func() {
		gp.stack = stackalloc(size)
	})
	gp.stackguard0 = gp.stack.lo + _StackGuard
	gp.stackguard1 = ^uintptr(0)

	// Zero the lowest word of the stack: on ARM and ARM64 the g is recorded
	// there on the gsignal stack during VDSO calls.
	*(*uintptr)(unsafe.Pointer(gp.stack.lo)) = 0
	return gp
}

將newg.sched.pc設置為goexit+PCQuantum之後,需要調用gostartcallfn調整newg.sched:把這個pc作為返回地址壓入newg的棧,再把pc設置為fn的入口。效果等同於goexit調用了fn並立即執行了一次gosave,因此fn返回時會回到goexit。

// gostartcallfn rewrites gobuf so that resuming it enters the code of the
// closure fv, as if gobuf.pc had called it and then done an immediate gosave.
// If fv is nil, the entry point becomes nilfunc instead (defined elsewhere;
// presumably it faults). fv itself is passed on as the closure context.
func gostartcallfn(gobuf *gobuf, fv *funcval) {
	entry := unsafe.Pointer(funcPC(nilfunc))
	if fv != nil {
		entry = unsafe.Pointer(fv.fn)
	}
	gostartcall(gobuf, entry, unsafe.Pointer(fv))
}
// adjust Gobuf as if it executed a call to fn with context ctxt
// and then did an immediate gosave.
//
// This variant pushes the return address on the stack (the x86 style noted
// in newproc1). It pushes buf.pc at buf.sp, points buf.pc at fn, and stores
// ctxt in buf.ctxt. When buf is resumed, fn runs and, on return, jumps to the
// old buf.pc; for a new goroutine that is goexit+PCQuantum.
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
    sp := buf.sp
    // Only when a register is wider than a pointer: first push one extra
    // zeroed pointer-sized word, presumably so the return-address slot is
    // register-sized. NOTE(review): appears to target ports with 32-bit
    // pointers and 64-bit registers (e.g. amd64p32) — confirm.
    if sys.RegSize > sys.PtrSize {
        sp -= sys.PtrSize
        *(*uintptr)(unsafe.Pointer(sp)) = 0
    }
    sp -= sys.PtrSize // make room for the return address
    *(*uintptr)(unsafe.Pointer(sp)) = buf.pc // push the old pc as fn's return address
    buf.sp = sp
    buf.pc = uintptr(fn) // resume execution at fn's entry
    buf.ctxt = ctxt // closure context for fn (gostartcallfn passes the *funcval)
}

初始化後的g處於runnable狀態,通過runqput放入當前P的本地運行隊列(優先放入runnext槽位),等待被調度運行。

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runqput puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
    // With randomizeScheduler set, ignore next half of the time so that
    // the scheduling order is randomized.
    if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
    }

    if next {
    retryNext:
        // Swap gp into runnext with a CAS, since runnext can apparently change
        // concurrently (presumably via other Ps). Retry until the swap succeeds.
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    // The local ring buffer has room: store at the tail slot, then publish it
    // by advancing runqtail.
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // The local queue looks full: runqputslow moves gp to the global run queue
    // (presumably along with a batch of the local queue).
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}

二、銷毀---goexit

// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
// newproc1 installs goexit+PCQuantum as the return address of every new
// goroutine's start function, so returning from it lands on the BL below.
TEXT runtime·goexit(SB),NOSPLIT|NOFRAME|TOPFRAME,$0-0
    MOVD    R0, R0  // NOP: occupies one instruction (not one byte) so that goexit+PCQuantum hits the BL while the previous instruction is still inside goexit. NOTE(review): MOVD/BL suggests arm64, where an instruction is 4 bytes.
    BL  runtime·goexit1(SB) // does not return

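在newproc1中設置newg.sched.pc = funcPC(goexit) + sys.PCQuantum,隨後gostartcallfn把這個值作為fn的返回地址壓入newg的棧。該地址是goexit的起始地址加上一個指令單位(PCQuantum,例如arm64上為4字節,amd64上為1字節),正好跳過開頭的NOP,指向BL runtime·goexit1(SB)指令。因此fn執行完畢返回時,會直接執行goexit1。開頭保留NOP,是為了讓返回地址的前一條指令仍然屬於goexit函數(即源碼注釋“+PCQuantum so that previous instruction is in same function”的含義)。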

// goexit1 is reached through goexit when a goroutine's start function
// returns. If enabled, it reports the goroutine's end to the race detector
// and the execution tracer. It then finishes the exit in goexit0 on g0.
func goexit1() {
    if raceenabled {
        racegoend()
    }
    if trace.enabled {
        traceGoEnd()
    }

    // mcall switches to the g0 stack and calls goexit0 with the current g;
    // goexit0 ends in schedule(), so control does not come back here.
    mcall(goexit0)
}

// goexit continuation on g0.
//
// goexit0 runs on g0 (via mcall from goexit1); gp is the goroutine that just
// finished. It marks gp _Gdead, clears its per-run state, and detaches it
// from the M. It then returns gp to the P's free list and calls schedule()
// to run something else, so it does not return.
func goexit0(gp *g) {
    _g_ := getg() // g0 of the current M

    // _Grunning -> _Gdead
    casgstatus(gp, _Grunning, _Gdead)
    if isSystemGoroutine(gp, false) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    // Reset gp's per-run fields so that a later reuse via gfget starts clean.
    gp.m = nil
    locked := gp.lockedm != 0 // remember whether gp had locked this thread; handled after gfput below
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.preemptStop = false
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    // If GC assists are enabled (gcBlackenEnabled != 0) and gp still holds
    // positive assist credit, move that credit to the global background pool.
    if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
        // Flush assist credit to the global pool. This gives
        // better information to pacing if the application is
        // rapidly creating an exiting goroutines.
        scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
        atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
        gp.gcAssistBytes = 0
    }

    // Detach gp from the current M.
    dropg()

    // wasm has no threads: just free gp and pick the next g.
    if GOARCH == "wasm" { // no threads yet on wasm
        gfput(_g_.m.p.ptr(), gp)
        schedule() // never returns
    }

    if _g_.m.lockedInt != 0 {
        print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
        throw("internal lockOSThread error")
    }
    // Return gp to the P's free list. gfput frees gp's stack only when its
    // size is not the standard _FixedStack.
    gfput(_g_.m.p.ptr(), gp)
    if locked {
        // The goroutine may have locked this thread because
        // it put it in an unusual kernel state. Kill it
        // rather than returning it to the thread pool.

        // Return to mstart, which will release the P and exit
        // the thread.
        if GOOS != "plan9" { // See golang.org/issue/22227.
            gogo(&_g_.m.g0.sched)
        } else {
            // Clear lockedExt on plan9 since we may end up re-using
            // this thread.
            _g_.m.lockedExt = 0
        }
    }
    // Find and run the next runnable g.
    schedule()
}

其中通過gfput將g放入空閑列表;如果g的棧大小不是標準的_FixedStack,還會先釋放這個棧。

// Put on gfree list.
// If local list is too long, transfer a batch to the global list.
//
// gp must be _Gdead. A stack whose size differs from _FixedStack is freed
// before caching, so cached gs hold either a standard-size stack or none.
func gfput(_p_ *p, gp *g) {
    if readgstatus(gp) != _Gdead {
        throw("gfput: bad status (not Gdead)")
    }

    stksize := gp.stack.hi - gp.stack.lo

    // Keep only standard-size stacks for reuse. gfget allocates _FixedStack
    // for stackless gs, and malg requests round2(_StackSystem+_StackMin).
    // NOTE(review): if _FixedStack equals that rounded value (as in
    // runtime/stack.go), a freshly malg'd stack is standard-size and only
    // stacks resized later are freed here — confirm for the target platform.
    if stksize != _FixedStack {
        // non-standard stack size - free it.
        stackfree(gp.stack)
        gp.stack.lo = 0
        gp.stack.hi = 0
        gp.stackguard0 = 0
    }

    // Push gp onto the P's local free list. Once the list holds 64 gs, move gs
    // to the global lists (split by whether they still own a stack) until
    // fewer than 32 remain locally.
    _p_.gFree.push(gp)
    _p_.gFree.n++
    if _p_.gFree.n >= 64 {
        lock(&sched.gFree.lock)
        for _p_.gFree.n >= 32 {
            _p_.gFree.n--
            gp = _p_.gFree.pop()
            if gp.stack.lo == 0 {
                sched.gFree.noStack.push(gp)
            } else {
                sched.gFree.stack.push(gp)
            }
            sched.gFree.n++
        }
        unlock(&sched.gFree.lock)
    }
}

三、總結

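1.TLS(thread local storage)的思想:從g的分配可看出,p充當了m的一級數據緩存。p中的大部分數據(如gFree、goidcache)同一時刻只會被與其綁定的m訪問,因此可以無鎖使用;只有在本地列表與全局sched列表交換數據時才需要加鎖(sched.gFree.lock)。runq/runnext是例外,runqput中仍需通過load-acquire/store-release和CAS與消費者同步。
2.在創建g時就把goexit+PCQuantum作為fn的返回地址預先寫入棧中的做法值得細細品味:fn執行完畢返回後,會自然地進入goexit1完成清理。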

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容