今天來講一下調(diào)度器,我本來寫了兩個版本,后面發(fā)現(xiàn)都好像不太好,其實核心差不太多,就是層次不夠清晰,然后在度娘上又啃了幾篇相關(guān)的文章,又進(jìn)行了綜合一下,文章末尾有引用的文章鏈接。不得不說,大佬們畫圖還是非常厲害的。其實突然在這個期間發(fā)現(xiàn)一些問題:就是markdown模式下的話容易讓人看不清重點,最近在找替代簡書的地方,有推薦的可以留言推薦
(一)調(diào)度器的核心點:
- 1.復(fù)用線程
避免頻繁的創(chuàng)建銷毀線程,我們知道線程的啟停銷毀是很耗費性能的一件事情,我們就要reuse thread(線程復(fù)用) ,那么具體該怎么來處理。golang :”create threads when needed ;keep them around for reuse“,當(dāng)我們需要創(chuàng)建的時候就創(chuàng)建,然后保留它來復(fù)用,看過M結(jié)構(gòu)的人就知道M有個allM的字段就是保存所有的M的鏈表。 - 2.利用并行
設(shè)置GOMAXPROCS 來進(jìn)行設(shè)置go program的核心數(shù),程序的并行 - 3.stealing working
利用小偷算法,當(dāng)本地的隊列沒有g(shù)了,去別的地方偷一半的g進(jìn)行運行,保證任務(wù)的公平性 - 4.handoff
利用移交算法,當(dāng)本線程因為系統(tǒng)調(diào)用進(jìn)行阻塞的時候,線程釋放綁定的P,把P給其他的M執(zhí)行
值得一說的是:Go1.1之前只有G-M模型,沒有P,Dmitry Vyukov在Scalable Go Scheduler Design Doc提出該模型在并發(fā)伸縮性方面的問題,并通過加入P(Processors)來改進(jìn)該問題。
(二)重要結(jié)構(gòu)體
G:goroutine
每次go調(diào)用的時候,都會創(chuàng)建一個G對象,它包括棧、指令指針以及對于調(diào)用goroutines很重要的其它信息,比如阻塞它的任何channel,其主要數(shù)據(jù)結(jié)構(gòu)
// Go1.11版本默認(rèn)stack大小為2KB
_StackMin = 2048
// 創(chuàng)建一個g對象,然后放到g隊列
// 等待被執(zhí)行
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
_g_ := getg()
_g_.m.locks++
siz := narg
siz = (siz + 7) &^ 7
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
// 初始化g stack大小
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
// 以下省略}
M:mechine
代表一個線程,每次創(chuàng)建一個M的時候,都會有一個底層線程創(chuàng)建;所有的G任務(wù),最終還是在M上執(zhí)行,其主要數(shù)據(jù)結(jié)構(gòu)
type m struct {
/*
1. 所有調(diào)用棧的Goroutine,這是一個比較特殊的Goroutine。
2. 普通的Goroutine棧是在Heap分配的可增長的stack,而g0的stack是M對應(yīng)的線程棧。
3. 所有調(diào)度相關(guān)代碼,會先切換到該Goroutine的棧再執(zhí)行。
*/
g0 *g
curg *g // M當(dāng)前綁定的結(jié)構(gòu)體G
// SP、PC寄存器用于現(xiàn)場保護(hù)和現(xiàn)場恢復(fù)
vdsoSP uintptr
vdsoPC uintptr
// 省略…}
P:Processor
代表一個處理器,每一個運行的M都必須綁定一個P,就像線程必須在么一個CPU核上執(zhí)行一樣,由P來調(diào)度G在M上的運行,P的個數(shù)就是GOMAXPROCS(最大256),啟動時固定的,一般不修改;M的個數(shù)和P的個數(shù)不一定一樣多(會有休眠的M或者不需要太多的M)(最大10000);每一個P保存著本地G任務(wù)隊列,也有一個全局G任務(wù)隊列。P的數(shù)據(jù)結(jié)構(gòu)
// 自定義設(shè)置GOMAXPROCS數(shù)量
func GOMAXPROCS(n int) int {
/*
1. GOMAXPROCS設(shè)置可執(zhí)行的CPU的最大數(shù)量,同時返回之前的設(shè)置。
2. 如果n < 1,則不更改當(dāng)前的值。
*/
ret := int(gomaxprocs)
stopTheWorld("GOMAXPROCS")
// startTheWorld啟動時,使用newprocs。
newprocs = int32(n)
startTheWorld()
return ret
}
// 默認(rèn)P被綁定到所有CPU核上
// P == cpu.cores
func getproccount() int32 {
const maxCPUs = 64 * 1024
var buf [maxCPUs / 8]byte
// 獲取CPU Core
r := sched_getaffinity(0, unsafe.Sizeof(buf), &buf[0])
n := int32(0)
for _, v := range buf[:r] {
for v != 0 {
n += int32(v & 1)
v >>= 1
}
}
if n == 0 {
n = 1
}
return n
}
// 一個進(jìn)程默認(rèn)被綁定在所有CPU核上,返回所有CPU core。
// 獲取進(jìn)程的CPU親和性掩碼系統(tǒng)調(diào)用
// rax 204 ; 系統(tǒng)調(diào)用碼
// system_call sys_sched_getaffinity; 系統(tǒng)調(diào)用名稱
// rid pid ; 進(jìn)程號
// rsi unsigned int len
// rdx unsigned long *user_mask_ptr
sys_linux_amd64.s:
TEXT runtime·sched_getaffinity(SB),NOSPLIT,$0
MOVQ pid+0(FP), DI
MOVQ len+8(FP), SI
MOVQ buf+16(FP), DX
MOVL $SYS_sched_getaffinity, AX
SYSCALL
MOVL AX, ret+24(FP)
RET
(三)調(diào)度過程

我們通過 go func()來創(chuàng)建一個goroutine;g 的結(jié)構(gòu)是可復(fù)用的,對于可復(fù)用的g也是有l(wèi)ocal隊列和global隊列的,用:p.freeg 這個參數(shù),全局隊列就是sched.pfree,獲取參數(shù)都是差不多的,優(yōu)先從p.gfree中獲取,這一步是無鎖的,否者就從sched.pfree中獲取一部分過來這是有鎖的一個操作
有兩個存儲G的隊列,一個是局部調(diào)度器P的本地隊列、一個是全局G隊列。新創(chuàng)建的G會優(yōu)先嘗試放到p的runnext中,作為下一個執(zhí)行G,如果不行就得放到我們的本地隊列中,如果P的本地隊列已經(jīng)滿了就會保存在全局的隊列中;
G只能運行在M中,一個M必須持有一個P,M與P是1:1的關(guān)系。M會從P的本地隊列彈出一個可執(zhí)行狀態(tài)的G來
執(zhí)行,如果P的本地隊列為空,就會想其他的MP組合偷取一個可執(zhí)行的G來執(zhí)行;一個M調(diào)度G執(zhí)行的過程是一個循環(huán)機制;
當(dāng)M執(zhí)行某一個G時候如果發(fā)生了syscall或則其余阻塞操作,M會阻塞,如果當(dāng)前有一些G在執(zhí)行,runtime會把
這個線程M從P中摘除(detach),然后再創(chuàng)建一個新的操作系統(tǒng)的線程(如果有空閑的線程可用就復(fù)用空閑線程)來
服務(wù)于這個P;當(dāng)M系統(tǒng)調(diào)用結(jié)束時候,這個G會嘗試獲取一個空閑的P執(zhí)行,并放入到這個P的本地隊列。如果獲取不到P,
那么這個線程M變成休眠狀態(tài), 加入到空閑線程中,然后這個G會被放入全局隊列中
3.1G的幾種暫停方式
- gosched: 將當(dāng)前的G暫停,保存堆棧狀態(tài),以_GRunnable狀態(tài)放入Global隊列中,讓當(dāng)前M繼續(xù)執(zhí)行其它任務(wù)。無需對G進(jìn)行喚醒操作,因為總會有M從Global隊列取得并執(zhí)行該G。搶占調(diào)度即使用該方式
- 2.gopark: 與goched的最大區(qū)別在于gopark沒有將G放回執(zhí)行隊列,而是位于某個等待隊列中(如channel的waitq,此時G狀態(tài)為_Gwaitting),因此G必須被手動喚醒(通過goready),否則會丟失任務(wù)。應(yīng)用層阻塞通常使用這種方式。
- 3.notesleep: 既不讓出M,也不讓G和P重新調(diào)度,直接讓線程休眠直到被喚醒(notewakeup),該方式更快,通常用于gcMark,stopm這類自旋場景
- 4.notesleepg: 阻塞G和M,放飛P,P可以和其它M綁定繼續(xù)執(zhí)行,比如可能阻塞的系統(tǒng)調(diào)用會主動調(diào)用entersyscallblock,則會觸發(fā) notesleepg
- 5.goexit: 立即終止G任務(wù),不管其處于調(diào)用堆棧的哪個層次,在終止前,確保所有defer正確執(zhí)行。
(四)調(diào)度源碼
// go1.9.1 src/runtime/proc.go
// 省略了GC檢查等其它細(xì)節(jié),只保留了主要流程
// g: G結(jié)構(gòu)體定義
// sched: Global隊列
// 獲取一個待執(zhí)行的G
func findrunnable() (gp *g, inheritTime bool) {
// 獲取當(dāng)前的G對象
_g_ := getg()
top:
// 獲取當(dāng)前P對象
_p_ := _g_.m.p.ptr()
// 1. 嘗試從P的Local隊列中取得G 優(yōu)先_p_.runnext 然后再從Local隊列中取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 2. 嘗試從Global隊列中取得G
if sched.runqsize != 0 {
lock(&sched.lock)
// globrunqget從Global隊列中獲取G 并轉(zhuǎn)移一批G到_p_的Local隊列
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 3. 檢查netpoll任務(wù):檢測是否存在M阻塞
if netpollinited() && sched.lastpoll != 0 {
if gp := netpoll(false); gp != nil { // non-blocking
// netpoll返回的是G鏈表,將其它G放回Global隊列
injectglist(gp.schedlink.ptr())
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// 4. 嘗試從其它P竊取任務(wù)
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ {
// 隨機P的遍歷順序
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// runqsteal執(zhí)行實際的steal工作,從目標(biāo)P的Local隊列轉(zhuǎn)移一般的G過來
// stealRunNextG指是否steal目標(biāo)P的p.runnext G
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
...
}
4.1用戶態(tài)的阻塞
當(dāng)Goroutine因為Channel操作而阻塞(通過gopark)時,對應(yīng)的G會被放置到某個wait隊列(如channel的waitq),該G的狀態(tài)由_Gruning變?yōu)開Gwaitting,而M會跳過該G嘗試獲取并執(zhí)行下一個G。
當(dāng)阻塞的G被G2喚醒(通過goready)時(比如channel可讀/寫),G會嘗試加入G2所在P的runnext,然后再是P Local隊列和Global隊列。簡單解釋一下:當(dāng)G是chan<-的接收消息,被阻塞了,如果G2是Chan的寫消息,當(dāng)G阻塞,G2寫入了一個數(shù)據(jù),那么G就被G2喚醒了,G就被放到了G2的P的runnext,如果放成功了,就是跳過了排隊,然后執(zhí)行,如果失敗了就丟入local隊列
4.2系統(tǒng)調(diào)用阻塞:syscall
當(dāng)G被阻塞在某個系統(tǒng)調(diào)用上時,此時G會阻塞在_Gsyscall狀態(tài),M也處于block on syscall狀態(tài),此時仍然可被搶占調(diào)度: 執(zhí)行該G的M會與P解綁,而P則嘗試與其它idle的M綁定,繼續(xù)執(zhí)行其它G。如果沒有其它idle的M,但隊列中仍然有G需要執(zhí)行,則創(chuàng)建一個新的M。
當(dāng)系統(tǒng)調(diào)用完成后,G會重新嘗試獲取一個idle的P,并恢復(fù)執(zhí)行,如果沒有idle的P,G將加入到Global隊列。
系統(tǒng)調(diào)用能被調(diào)度的關(guān)鍵有兩點:
runtime/syscall包中,將系統(tǒng)調(diào)用分為SysCall和RawSysCall,前者和后者的區(qū)別是前者會在系統(tǒng)調(diào)用前后分別調(diào)用entersyscall和exitsyscall(位于src/runtime/proc.go),做一些現(xiàn)場保存和恢復(fù)操作,這樣才能使P安全地與M解綁,并在其它M上繼續(xù)執(zhí)行其它G。某些系統(tǒng)調(diào)用本身可以確定會長時間阻塞(比如鎖),會調(diào)用entersyscallblock在發(fā)起系統(tǒng)調(diào)用前直接讓P和M解綁(handoffp)。
4.3GMP的幾個狀態(tài)
- P的幾個狀態(tài):
(五)sysmon
sysmon是一個由runtime啟動的M,也叫監(jiān)控線程,它無需P也可以運行,它每20us~10ms喚醒一次,主要執(zhí)行:
釋放閑置超過5分鐘的span物理內(nèi)存;
如果超過2分鐘沒有垃圾回收,強制執(zhí)行;
將長時間未處理的netpoll結(jié)果添加到任務(wù)隊列;
向長時間運行的G任務(wù)發(fā)出搶占調(diào)度;
收回因syscall長時間阻塞的P;
入口在src/runtime/proc.go:sysmon函數(shù),它通過retake實現(xiàn)對syscall和長時間運行的G進(jìn)行調(diào)度:
func retake(now int64) uint32 {
n := 0
for i := int32(0); i < gomaxprocs; i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// 如果當(dāng)前P Local隊列沒有其它G,當(dāng)前有其它P處于Idle狀態(tài),并且syscall執(zhí)行事件不超過10ms,則不用解綁當(dāng)前P(handoffp)
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// handoffp
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
} else if s == _Prunning {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
// 如果當(dāng)前G執(zhí)行時間超過10ms,則搶占(preemptone)
if pd.schedwhen+forcePreemptNS > now {
continue
}
// 執(zhí)行搶占
preemptone(_p_)
}
}
return uint32(n)
}
搶占式調(diào)度
當(dāng)某個goroutine執(zhí)行超過10ms,sysmon會向其發(fā)起搶占調(diào)度請求,由于Go調(diào)度不像OS調(diào)度那樣有時間片的概念,因此實際搶占機制要弱很多: Go中的搶占實際上是為G設(shè)置搶占標(biāo)記(g.stackguard0),當(dāng)G調(diào)用某函數(shù)時(更確切說,在通過newstack分配函數(shù)棧時),被編譯器安插的指令會檢查這個標(biāo)記,并且將當(dāng)前G以runtime.Goched的方式暫停,并加入到全局隊列。源代碼如下:
// src/runtime/stack.go
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
func newstack(ctxt unsafe.Pointer) {
...
// gp為當(dāng)前G
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
if preempt {
...
// Act like goroutine called runtime.Gosched.
// G狀態(tài)由_Gwaiting變?yōu)?_Grunning 這是為了能以Gosched的方式暫停Go
casgstatus(gp, _Gwaiting, _Grunning)
gopreempt_m(gp) // never return
}
}
// 以goched的方式將G重新放入
func goschedImpl(gp *g) {
status := readgstatus(gp)
// 由Running變?yōu)镽unnable
casgstatus(gp, _Grunning, _Grunnable)
// 與M解除綁定
dropg()
lock(&sched.lock)
// 將G放入Global隊列
globrunqput(gp)
unlock(&sched.lock)
// 重新調(diào)度
schedule()
}
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}
netpoll
前面的findrunnable,G的獲取除了p.runnext,p.runq和sched.runq外,還有一中G從netpoll中獲取,netpoll是Go針對網(wǎng)絡(luò)IO的一種優(yōu)化,本質(zhì)上為了避免網(wǎng)絡(luò)IO陷入系統(tǒng)調(diào)用之中,這樣使得即便G發(fā)起網(wǎng)絡(luò)I/O操作也不會導(dǎo)致M被阻塞(僅阻塞G),從而不會導(dǎo)致大量M被創(chuàng)建出來。