簡書前話:
由于簡書不支持 mermaid 流程圖,所以想看完整的版本,可以到我的個人博客 中查看
01.chan 的數(shù)據(jù)結(jié)構(gòu):
golang 中 chan 的源碼在 src/runtime/chan.go 文件中,hchan 則為 chan 的結(jié)構(gòu)體
hchan:
type hchan struct {
qcount uint // 當(dāng)前緩存數(shù)據(jù)的總量
dataqsiz uint // 緩存數(shù)據(jù)的容量
buf unsafe.Pointer // 緩存數(shù)據(jù),為一個循環(huán)數(shù)組,容量大小為 dataqsiz,當(dāng)前大小為 qcount
elemsize uint16 // 數(shù)據(jù)類型的大小,比如 int 為 4
closed uint32 // 標記是否關(guān)閉
elemtype *_type // 數(shù)據(jù)的類型
sendx uint // 發(fā)送隊列 sendq 的長度
recvx uint // 接收隊列 recvq 的長度
recvq waitq // 阻塞的接收 goroutine 的隊列
sendq waitq // 阻塞的發(fā)送 goroutine 的隊列
lock mutex // 鎖,用于并發(fā)控制隊列操作
}
waitq:
type waitq struct {
first *sudog
last *sudog
}
waitq 為雙向鏈表,sudog 代表一個封裝的 goroutine,其參數(shù) g 為 goroutine 實例結(jié),構(gòu)如下圖:

02. 新建 chan:
在 go 中,通過如下代碼創(chuàng)建 chan
c := make(chan int, 4)
以上代碼,對應(yīng)的是源碼:
func makechan(t *chantype, size int) *hchan
邏輯流程如下:
graph TD
A[makechan] -->|t, size| B{安全檢查}
B -->|N| ZR[ERROR]
B -->|Y| E{size 或 t.elem.size 是否為0?}
E -->|Y| F[mallocgc 默認大小 hchanSize 內(nèi)存]
E -->|N| G{數(shù)據(jù)類型是否為指針?}
G -->|Y| H[通過new單獨分配chan內(nèi)存]
G -->|N| I[mallocgc 內(nèi)存 hchanSize + mem]
H --> Z
F --> Z
I --> Z
Z[chan 賦值屬性]
Z --> ZB[END]
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 安全檢查,數(shù)據(jù)項大小不超過 16K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 獲取要分配的內(nèi)存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// size 為 0 的情況,分配 hchan 結(jié)構(gòu)體大小的內(nèi)存,64位系統(tǒng)為 96 Byte.
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 數(shù)據(jù)項不為指針類型,調(diào)用 mallocgc 一次性分配內(nèi)存大小,hchan 結(jié)構(gòu)體大小 + 數(shù)據(jù)總量大小
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 數(shù)據(jù)項為指針類型,hchan 和 buf 分開分配內(nèi)存,GC 中指針類型判斷 reachable and unreadchable.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// chan 賦值屬性, 數(shù)據(jù)項大小、數(shù)據(jù)項類型、緩存數(shù)據(jù)的容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
03.讀寫chan
在 go 中,寫入 chan 的代碼如下:
v := 1
c := make(chan int)
c <- v
讀取 chan 的代碼如下:
var v int
c := make(chan int)
c -> v
c <- v 操作對應(yīng)的源碼為 runtime 中的
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
而 c -> v 操作對應(yīng)源碼為 runtime 中的
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
其中 c 為 chansend 的 c, v 的地址為 chansend 的 ep.
邏輯流程如下:
graph TD
A[chansend 或 chanrecv] -->|hchan, ep| B{校驗}
B --> |Y| C[加鎖 lock]
B --> |N| D["gopark(), 阻塞當(dāng)前 goroutine 和 throw error"]
C --> E{chan close?}
E --> |Y| F[unlock和panic]
E --> |N| G[取出 recvq 或 sendq 隊列]
G --> H{是否等待的sudog?}
H --> |Y| I["send()或recv()"]
I --> J["goready(), 運行 sudog 的 goroutine"]
H --> |N| K{存在剩余緩沖區(qū)?}
K --> |Y| L["數(shù)據(jù)放入緩沖區(qū) buf, unlock"]
K --> |N| M["打包成sudog,加入sendq或recvq隊列"]
M --> O["gopark(),阻塞當(dāng)前goroutine等待被接受者喚醒"]
O -."如果被喚醒,說明數(shù)據(jù)已經(jīng)被接收,回收sudog".-> P["保存context,運行別的 goroutine"]
P --> Z
L --> Z
Z[End]
由于發(fā)送和接收的邏輯都是差不多的,所以這里就直接放上發(fā)送的邏輯代碼來分析就好了
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 校驗
if c == nil {
if !block {
return false
}
// 參數(shù)異常,block == true, 進行阻塞 goroutine.
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖,并發(fā)讀寫控制
lock(&c.lock)
// 查看 chan 是否關(guān)閉
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 從等待接收列隊 recvq 中試圖獲取獲取封裝的 goroutine sudog.
if sg := c.recvq.dequeue(); sg != nil {
// 找到等待接收 chan 的 goroutine sudog,直接發(fā)送 value 給接收者,并通過 goready() 喚醒接受者 goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 查看查看緩存空間是否 buf 是否還有剩余
if c.qcount < c.dataqsiz {
// 將數(shù)據(jù)移動到 qp 中并放入 chan 緩存,sendx++
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// chan 如果為非阻塞,unlock 后直接返回
if !block {
unlock(&c.lock)
return false
}
// 將當(dāng)前 goroutine 封裝 sudog,并放入到等待發(fā)送隊列 sendq 中
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 阻塞當(dāng)前 goroutine,等待被接受者 chanrecv() 的喚醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// KeepAlive 方法,由于 GC 的緣故,而調(diào)用
KeepAlive(ep)
// goroutine 被喚醒,重置 gorotuine 狀態(tài) 和 sudog
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
代碼的部分詳解:
gopark:M(工作線程) 會保存 goroutine 的上下文,而調(diào)度器會讓當(dāng)前工作線程線程 M 綁定執(zhí)行其他的 goroutine.
KeepAlive(ep): 由于 GC 的機制,當(dāng) ep 不再被上下文引用的時候,GC 會主動回收 eq,導(dǎo)致 buf 被回收,所以調(diào)用 KeepAlive,告訴 GC 不需要對 eq 變量進行內(nèi)存回收,具體可以查看 runtime.SetFinalizer 方法部分有詳細介紹.
喚醒:goroutine 會在 chanrecv 這個 chan 接收接收函數(shù)中,從 hchan.sendq 被取出,執(zhí)行 goready(), 通知調(diào)度器去喚醒,然后放入 P(邏輯處理器) 的執(zhí)行等待隊列中,等待被下一次調(diào)用.
send()/recv(): 通過 memmove() 的方式從發(fā)送方拷貝 buf 到接收方.
下面我們通過一個使用channel做生產(chǎn)/消費的模型來試圖分解一下 chan 的步驟:
func main(){
//初始化任務(wù)隊列 channel
ch := make(chan Data, 4)
//生產(chǎn)者往channel丟數(shù)據(jù)
for _, task := range {
ch <- task
}
//初始化消費者
for i := 0; i< ConsumerNum; i++ {
go consumer(ch)
}
...
}
// 消費者
func consumer(ch chan Data){
for {
//收取任務(wù)并處理
data := <- ch
process(data)
}
}
從 main 函數(shù)開始,golang 就會開啟一個 goroutine 來執(zhí)行代碼,我們可以將其記作生產(chǎn)者 G'p, 代碼中 go consumer 標記 consumer 函數(shù)也開啟一個 goroutine 來進行,我們記其為消費者 G'c.
初始化任務(wù)隊列channel
此時會在堆區(qū)域分配一塊內(nèi)存,用于存儲 hchan 結(jié)構(gòu)體和 buf 的緩存數(shù)據(jù)。hchan.buf指向一個大小為4的數(shù)組,并且hchan.sendx、hchan.recvx置0,hchan.dataqsiz置4。生產(chǎn)者往channel丟數(shù)據(jù)
G'p 往 ch 發(fā)送數(shù)據(jù)的時候,會執(zhí)行 lock(&hchan.lock) 對 buf 加鎖,把要發(fā)送的數(shù)據(jù)拷貝到 buf 里,hchan.sendx++,之后 unlock(hchan.lock) 釋放鎖。消費者執(zhí)行消費行為
G'c 從 ch 中獲取數(shù)據(jù)的時候,會執(zhí)行 lock(&hchan.lock) 對 buf 加鎖,將 buf 里面的一條數(shù)據(jù)拷貝到接收變量 data 對應(yīng)的空間中,hchan.recvx++,之后釋放鎖。
0.4 關(guān)閉 chan
在 go 中,關(guān)閉 chan 的代碼如下:
ch := make(chan int ,10)
close(ch)
close(ch) 對應(yīng)的runtime的函數(shù):
func closechan(c *hchan)
邏輯流程如下:
graph TD
A[closechan] --c--> B{檢查}
B --> |Y| C[加鎖 lock]
B --> |N| D[panic]
C --> E{chan是否已close}
E --> |Y| D
E --> |N| F[置 hchan.close = 1]
F --> G[釋放recvq的所有等待接收者]
G --> H[釋放sendq的所有等待發(fā)送者]
H -.發(fā)送者會panic.-> D
H --> I[unlock]
I --> J[喚醒recvq和sendq的所有g(shù)oroutine]
J --> End
func closechan(c *hchan) {
// 檢查,chan 是否為空
if c == nil {
panic(plainError("close of nil channel"))
}
// 加鎖,防止資源競爭
lock(&c.lock)
// chan 如果已關(guān)閉,則 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 置 hchan.close = 1, 標記已關(guān)閉
c.closed = 1
var glist gList
// 釋放recvq的所有等待接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放sendq的所有等待發(fā)送者
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 解鎖,unlock
unlock(&c.lock)
// 喚醒recvq和sendq的所有g(shù)oroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
close 的主要作用是用于喚醒所有監(jiān)測 chann 的 goroutine,但是要注意的是:
- 如果 sendq 的緩沖區(qū)還有發(fā)送者,這些發(fā)送者都會 panic
- 如果兩次 close chan,會導(dǎo)致 panic
0.5 關(guān)于 chan 的面試問題
chan 如何處理并發(fā)讀寫問題
hchan 結(jié)構(gòu)體中通過鎖lock mutex參數(shù)進行對公共緩存資源 buf 的控制達到并發(fā)讀寫的 race 問題.如果往 chan 發(fā)送數(shù)據(jù),size 滿了,或者往 chan 獲取數(shù)據(jù),buf 空。這會導(dǎo)致阻塞,此時runtime的行為是怎么樣的呢?
由于兩者邏輯一樣,我們就直接講往 chan 發(fā)送數(shù)據(jù),size 滿了的情況.
如果往 chan 發(fā)送數(shù)據(jù),size 滿了,此時 goroutine 和 buf 會被打包成 sudog,通過 gopark 將 goroutine 狀態(tài)置為等待, 同時把 sudog 放入 hchan.sendq 等待發(fā)送隊列中,等待接收者接收并調(diào)用 goready() 重新調(diào)度 goroutine. 此時 goroutine 被阻塞后,M(工作線程) 會與 goroutine 解綁,通過 P(邏輯處理器) 重新進行調(diào)度,M 與新的 goroutine 重新綁定執(zhí)行.
感悟
還是有一部分以目前的知識還是無法看懂,以后慢慢積累后再回來補坑,或大佬們可以幫我指出一下,謝謝.