一、什么是CSP
Do not communicate by sharing memory; instead, share memory by communicating.
不要通過共享內(nèi)存來通信,而要通過通信來實現(xiàn)內(nèi)存共享。
這就是 Go 的并發(fā)哲學(xué),它依賴 CSP 模型,基于 channel 實現(xiàn)。
CSP 經(jīng)常被認(rèn)為是 Go 在并發(fā)編程上成功的關(guān)鍵因素。CSP 全稱是 “Communicating Sequential Processes”,這也是 Tony Hoare 在 1978 年發(fā)表在 ACM 的一篇論文。論文里指出一門編程語言應(yīng)該重視 input 和 output 的原語,尤其是并發(fā)編程的代碼。
Go 一開始就把 CSP 的思想融入到語言的核心里,所以并發(fā)編程成為 Go 的一個獨特的優(yōu)勢,而且很容易理解。
Go 的并發(fā)原則非常優(yōu)秀,目標(biāo)就是簡單:盡量使用 channel;把 goroutine 當(dāng)作免費的資源,隨便用。
二、channel底層數(shù)據(jù)結(jié)構(gòu)
底層數(shù)據(jù)結(jié)構(gòu)源碼:
type hchan struct {
// chan 里元素數(shù)量
qcount uint
// chan 底層循環(huán)數(shù)組的長度
dataqsiz uint
// 指向底層循環(huán)數(shù)組的指針
// 只針對有緩沖的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被關(guān)閉的標(biāo)志
closed uint32
// chan 中元素類型
elemtype *_type // element type
//有緩沖channel內(nèi)的緩沖數(shù)組會被作為一個“環(huán)型”來使用。
//當(dāng)下標(biāo)超過數(shù)組容量后會回到第一個位置,所以需要有兩個字段記錄當(dāng)前讀和寫的下標(biāo)位置
sendx uint // 下一次發(fā)送數(shù)據(jù)的下標(biāo)位置
recvx uint // 下一次讀取數(shù)據(jù)的下標(biāo)位置
//當(dāng)循環(huán)數(shù)組中沒有數(shù)據(jù)時,收到了接收請求,那么接收數(shù)據(jù)的變量地址將會寫入讀等待隊列
//當(dāng)循環(huán)數(shù)組中數(shù)據(jù)已滿時,收到了發(fā)送請求,那么發(fā)送數(shù)據(jù)的變量地址將寫入寫等待隊列
recvq waitq // 讀等待隊列
sendq waitq // 寫等待隊列
// 保護(hù) hchan 中所有字段
lock mutex
}
waitq 是 sudog 的一個雙向鏈表,而 sudog 實際上是對 goroutine 的一個封裝:
type waitq struct {
first *sudog
last *sudog
}
例如,創(chuàng)建一個容量為 6 的,元素為 int 型的 channel 數(shù)據(jù)結(jié)構(gòu)如下 :

總結(jié)hchan結(jié)構(gòu)體的主要組成部分有四個:
- 用來保存goroutine之間傳遞數(shù)據(jù)的循環(huán)鏈表。=====> buf。
- 用來記錄此循環(huán)鏈表當(dāng)前發(fā)送或接收數(shù)據(jù)的下標(biāo)值。=====> sendx和recvx。
- 用于保存向該chan發(fā)送和從改chan接收數(shù)據(jù)的goroutine的隊列。=====> sendq 和 recvq
- 保證channel寫入和讀取數(shù)據(jù)時線程安全的鎖。 =====> lock
創(chuàng)建
我們知道,通道有兩個方向,發(fā)送和接收。理論上來說,我們可以創(chuàng)建一個只發(fā)送或只接收的通道,但是這種通道創(chuàng)建出來后,怎么使用呢?一個只能發(fā)的通道,怎么接收呢?同樣,一個只能收的通道,如何向其發(fā)送數(shù)據(jù)呢?
創(chuàng)建 chan 的函數(shù)是 makechan:
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 省略了檢查 channel size,align 的代碼
// ……
var c *hchan
// 如果元素類型不含指針 或者 size 大小為 0(無緩沖類型)
// 只進(jìn)行一次內(nèi)存分配
if elem.kind&kindNoPointers != 0 || size == 0 {
// 如果 hchan 結(jié)構(gòu)體中不含指針,GC 就不會掃描 chan 中的元素
// 只分配 "hchan 結(jié)構(gòu)體大小 + 元素大小*個數(shù)" 的內(nèi)存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
// 如果是緩沖型 channel 且元素大小不等于 0(大小等于 0的元素類型:struct{})
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
// 1. 非緩沖型的,buf 沒用,直接指向 chan 起始地址處
// 2. 緩沖型的,能進(jìn)入到這里,說明元素?zé)o指針且元素類型為 struct{},也無影響
// 因為只會用到接收和發(fā)送游標(biāo),不會真正拷貝東西到 c.buf 處(這會覆蓋 chan的內(nèi)容)
c.buf = unsafe.Pointer(c)
}
} else {
// 進(jìn)行兩次內(nèi)存分配操作
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 循環(huán)數(shù)組長度
c.dataqsiz = uint(size)
// 返回 hchan 指針
return c
}
從函數(shù)原型來看,創(chuàng)建的 chan 是一個指針。所以我們能在函數(shù)間直接傳遞 channel,而不用傳遞 channel 的指針。
新建一個 chan 后,內(nèi)存在堆上分配,大概長這樣:

三、向channel發(fā)送數(shù)據(jù)
源碼分析
發(fā)送操作最終轉(zhuǎn)化為 chansend 函數(shù),直接上源碼,同樣大部分都注釋了,可以看懂主流程:
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞,直接返回 false,表示未發(fā)送成功
if !block {
return false
}
// 當(dāng)前 goroutine 被掛起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// 省略 debug 相關(guān)……
// 對于不阻塞的 send,快速檢測失敗場景
//
// 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間。這可能是:
// 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
// 2. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 鎖住 channel,并發(fā)安全
lock(&c.lock)
// 如果 channel 關(guān)閉了
if c.closed != 0 {
// 解鎖
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}
// 如果接收隊列里有 goroutine,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 對于緩沖型的 channel,如果還有緩沖空間
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// ……
// 將數(shù)據(jù)從 ep 處拷貝到 qp
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送游標(biāo)值加 1
c.sendx++
// 如果發(fā)送游標(biāo)值等于容量值,游標(biāo)值歸 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖區(qū)的元素數(shù)量加一
c.qcount++
// 解鎖
unlock(&c.lock)
return true
}
// 如果不需要阻塞,則直接返回錯誤
if !block {
unlock(&c.lock)
return false
}
// channel 滿了,發(fā)送方會被阻塞。接下來會構(gòu)造一個 sudog
// 獲取當(dāng)前 goroutine 的指針
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 當(dāng)前 goroutine 進(jìn)入發(fā)送等待隊列
c.sendq.enqueue(mysg)
// 當(dāng)前 goroutine 被掛起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 從這里開始被喚醒了(channel 有機會可以發(fā)送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被喚醒后,channel 關(guān)閉了。坑爹啊,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上綁定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
上面的代碼注釋地比較詳細(xì)了,我們來詳細(xì)看看。
- 如果檢測到 channel 是空的,當(dāng)前 goroutine 會被掛起。
- 對于不阻塞的發(fā)送操作,如果 channel 未關(guān)閉并且沒有多余的緩沖空間(說明:a. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine;b. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素)
對于這一點,runtime 源碼里注釋了很多。這一條判斷語句是為了在不阻塞發(fā)送的場景下快速檢測到發(fā)送失敗,好快速返回。
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
- 如果檢測到 channel 已經(jīng)關(guān)閉,直接 panic。
- 如果能從等待接收隊列 recvq 里出隊一個 sudog(代表一個 goroutine),說明此時 channel 是空的,沒有元素,所以才會有等待接收者。這時會調(diào)用 send 函數(shù)將元素直接從發(fā)送者的棧拷貝到接收者的棧,關(guān)鍵操作由 sendDirect 函數(shù)完成。
// send 函數(shù)處理向一個空的 channel 發(fā)送操作
// ep 指向被發(fā)送的元素,會被直接拷貝到接收的 goroutine
// 之后,接收的 goroutine 會被喚醒
// c 必須是空的(因為等待隊列里有 goroutine,肯定是空的)
// c 必須被上鎖,發(fā)送操作執(zhí)行完后,會使用 unlockf 函數(shù)解鎖
// sg 必須已經(jīng)從等待隊列里取出來了
// ep 必須是非空,并且它指向堆或調(diào)用者的棧
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 省略一些用不到的
// ……
// sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
if sg.elem != nil {
// 直接拷貝內(nèi)存(從發(fā)送者到接收者)
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// sudog 上綁定的 goroutine
gp := sg.g
// 解鎖
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒接收的 goroutine. skip 和打印棧相關(guān),暫時不理會
goready(gp, skip+1)
}
繼續(xù)看 sendDirect 函數(shù):
// 向一個非緩沖型的 channel 發(fā)送數(shù)據(jù)、從一個無元素的(非緩沖型或緩沖型但空)的 channel
// 接收數(shù)據(jù),都會導(dǎo)致一個 goroutine 直接操作另一個 goroutine 的棧
// 由于 GC 假設(shè)對棧的寫操作只能發(fā)生在 goroutine 正在運行中并且由當(dāng)前 goroutine 來寫
// 所以這里實際上違反了這個假設(shè)。可能會造成一些問題,所以需要用到寫屏障來規(guī)避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 在當(dāng)前 goroutine 的棧上,dst 是另一個 goroutine 的棧
// 直接進(jìn)行內(nèi)存"搬遷"
// 如果目標(biāo)地址的棧發(fā)生了棧收縮,當(dāng)我們讀出了 sg.elem 后
// 就不能修改真正的 dst 位置的值了
// 因此需要在讀和寫之前加上一個屏障
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
這里涉及到一個 goroutine 直接寫另一個 goroutine 棧的操作,一般而言,不同 goroutine 的棧是各自獨有的。而這也違反了 GC 的一些假設(shè)。為了不出問題,寫的過程中增加了寫屏障,保證正確地完成寫操作。這樣做的好處是減少了一次內(nèi)存 copy:不用先拷貝到 channel 的 buf,直接由發(fā)送者到接收者,沒有中間商賺差價,效率得以提高,完美。
然后,解鎖、喚醒接收者,等待調(diào)度器的光臨,接收者也得以重見天日,可以繼續(xù)執(zhí)行接收操作之后的代碼了。
- 如果 c.qcount < c.dataqsiz,說明緩沖區(qū)可用(肯定是緩沖型的 channel)。先通過函數(shù)取出待發(fā)送元素應(yīng)該去到的位置:
qp := chanbuf(c, c.sendx)
// 返回循環(huán)隊列里第 i 個元素的地址處
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
c.sendx 指向下一個待發(fā)送元素在循環(huán)數(shù)組中的位置,然后調(diào)用 typedmemmove 函數(shù)將其拷貝到循環(huán)數(shù)組中。之后 c.sendx 加 1,元素總量加 1 :c.qcount++,最后,解鎖并返回。
- 如果沒有命中以上條件的,說明 channel 已經(jīng)滿了。不管這個 channel 是緩沖型的還是非緩沖型的,都要將這個 sender “關(guān)起來”(goroutine 被阻塞)。如果 block 為 false,直接解鎖,返回 false。
- 最后就是真的需要被阻塞的情況。先構(gòu)造一個 sudog,將其入隊(channel 的 sendq 字段)。然后調(diào)用 goparkunlock 將當(dāng)前 goroutine 掛起,并解鎖,等待合適的時機再喚醒。
喚醒之后,從 goparkunlock 下一行代碼開始繼續(xù)往下執(zhí)行。
這里有一些綁定操作,sudog 通過 g 字段綁定 goroutine,而 goroutine 通過 waiting 綁定 sudog,sudog 還通過 elem 字段綁定待發(fā)送元素的地址,以及 c 字段綁定被“坑”在此處的 channel。
所以,待發(fā)送的元素地址其實是存儲在 sudog 結(jié)構(gòu)體里,也就是當(dāng)前 goroutine 里。
四、從channel接收數(shù)據(jù)
接收操作有兩種寫法,一種帶 “ok”,反應(yīng) channel 是否關(guān)閉;一種不帶 “ok”,這種寫法,當(dāng)接收到相應(yīng)類型的零值時無法知道是真實的發(fā)送者發(fā)送過來的值,還是 channel 被關(guān)閉后,返回給接收者的默認(rèn)類型的零值。兩種寫法,都有各自的應(yīng)用場景。
經(jīng)過編譯器的處理后,這兩種寫法最后對應(yīng)源碼里的這兩個函數(shù):
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv1 函數(shù)處理不帶 “ok” 的情形,chanrecv2 則通過返回 “received” 這個字段來反應(yīng) channel 是否被關(guān)閉。接收值則比較特殊,會“放到”參數(shù) elem 所指向的地址了,這很像 C/C++ 里的寫法。如果代碼里忽略了接收值,這里的 elem 為 nil。
無論如何,最終轉(zhuǎn)向了 chanrecv 函數(shù):
// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有數(shù)據(jù)可接收的情況下,返回 (false, false)
// 否則,如果 c 處于關(guān)閉狀態(tài),將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的內(nèi)存地址。返回 (true, true)
// 如果 ep 非空,則應(yīng)該指向堆或者函數(shù)調(diào)用者的棧
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 省略 debug 內(nèi)容 …………
// 如果是一個 nil 的 channel
if c == nil {
// 如果不阻塞,直接返回 (false, false)
if !block {
return
}
// 否則,接收一個 nil 的 channel,goroutine 掛起
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
// 不會執(zhí)行到這里
throw("unreachable")
}
// 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回
// 當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
// 1. 非緩沖型,等待發(fā)送列隊 sendq 里沒有 goroutine 在等待
// 2. 緩沖型,但 buf 里沒有元素
// 之后,又觀察到 closed == 0,即 channel 未關(guān)閉。
// 因為 channel 不可能被重復(fù)打開,所以前一個觀測的時候 channel 也是未關(guān)閉的,
// 因此在這種情況下可以直接宣布接收失敗,返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
// 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
// 也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel,
// buf 里有元素的情況下還能接收到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
// 解鎖
unlock(&c.lock)
if ep != nil {
// 從一個已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
// 那么接收的值將是一個該類型的零值
// typedmemclr 根據(jù)類型清理相應(yīng)地址的內(nèi)存
typedmemclr(c.elemtype, ep)
}
// 從一個已關(guān)閉的 channel 接收,selected 會返回true
return true, false
}
// 等待發(fā)送隊列里有 goroutine 存在,說明 buf 是滿的
// 這有可能是:
// 1. 非緩沖型的 channel
// 2. 緩沖型的 channel,但 buf 滿了
// 針對 1,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
// 針對 2,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 緩沖型,buf 里有元素,可以正常接收
if c.qcount > 0 {
// 直接從循環(huán)數(shù)組里找到要接收的元素
qp := chanbuf(c, c.recvx)
// …………
// 代碼里,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循環(huán)數(shù)組里相應(yīng)位置的值
typedmemclr(c.elemtype, qp)
// 接收游標(biāo)向前移動
c.recvx++
// 接收游標(biāo)歸零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 數(shù)組里的元素個數(shù)減 1
c.qcount--
// 解鎖
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
unlock(&c.lock)
return false, false
}
// 接下來就是要被阻塞的情況了
// 構(gòu)造一個 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 待接收數(shù)據(jù)的地址保存下來
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
// 進(jìn)入channel 的等待接收隊列
c.recvq.enqueue(mysg)
// 將當(dāng)前 goroutine 掛起
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 被喚醒了,接著從這里繼續(xù)執(zhí)行一些掃尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
- 如果 channel 是一個空值(nil),在非阻塞模式下,會直接返回。在阻塞模式下,會調(diào)用 gopark 函數(shù)掛起 goroutine,這個會一直阻塞下去。因為在 channel 是 nil 的情況下,要想不阻塞,只有關(guān)閉它,但關(guān)閉一個 nil 的 channel 又會發(fā)生 panic,所以沒有機會被喚醒了。更詳細(xì)地可以在 closechan 函數(shù)的時候再看。
- 和發(fā)送函數(shù)一樣,接下來搞了一個在非阻塞模式下,不用獲取鎖,快速檢測到失敗并且返回的操作。順帶插一句,我們平時在寫代碼的時候,找到一些邊界條件,快速返回,能讓代碼邏輯更清晰,因為接下來的正常情況就比較少,更聚焦了,看代碼的人也更能專注地看核心代碼邏輯了。
// 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
- 非緩沖型,等待發(fā)送列隊里沒有 goroutine 在等待
- 緩沖型,但 buf 里沒有元素
之后,又觀察到 closed == 0,即 channel 未關(guān)閉。
因為 channel 不可能被重復(fù)打開,所以前一個觀測的時候, channel 也是未關(guān)閉的,因此在這種情況下可以直接宣布接收失敗,快速返回。因為沒被選中,也沒接收到數(shù)據(jù),所以返回值為 (false, false)。
接下來的操作,首先會上一把鎖,粒度比較大。如果 channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素。對應(yīng)非緩沖型關(guān)閉和緩沖型關(guān)閉但 buf 無元素的情況,返回對應(yīng)類型的零值,但 received 標(biāo)識是 false,告訴調(diào)用者此 channel 已關(guān)閉,你取出來的值并不是正常由發(fā)送者發(fā)送過來的數(shù)據(jù)。但是如果處于 select 語境下,這種情況是被選中了的。很多將 channel 用作通知信號的場景就是命中了這里。
接下來,如果有等待發(fā)送的隊列,說明 channel 已經(jīng)滿了,要么是非緩沖型的 channel,要么是緩沖型的 channel,但 buf 滿了。這兩種情況下都可以正常接收數(shù)據(jù)。
于是,調(diào)用 recv 函數(shù):
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是非緩沖型的 channel
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 未忽略接收的數(shù)據(jù)
if ep != nil {
// 直接拷貝數(shù)據(jù),從 sender goroutine -> receiver goroutine
recvDirect(c.elemtype, sg, ep)
}
} else {
// 緩沖型的 channel,但 buf 已滿。
// 將循環(huán)數(shù)組 buf 隊首的元素拷貝到接收數(shù)據(jù)的地址
// 將發(fā)送者的數(shù)據(jù)入隊。實際上這時 revx 和 sendx 值相等
// 找到接收游標(biāo)
qp := chanbuf(c, c.recvx)
// …………
// 將接收游標(biāo)處的數(shù)據(jù)拷貝給接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 將發(fā)送者數(shù)據(jù)拷貝到 buf
typedmemmove(c.elemtype, qp, sg.elem)
// 更新游標(biāo)值
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
// 解鎖
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒發(fā)送的 goroutine。需要等到調(diào)度器的光臨
goready(gp, skip+1)
}
如果是非緩沖型的,就直接從發(fā)送者的??截惖浇邮照叩臈!?/p>
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
否則,就是緩沖型 channel,而 buf 又滿了的情形。說明發(fā)送游標(biāo)和接收游標(biāo)重合了,因此需要先找到接收游標(biāo):
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
將該處的元素拷貝到接收地址。然后將發(fā)送者待發(fā)送的數(shù)據(jù)拷貝到接收游標(biāo)處。這樣就完成了接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作。接著,分別將發(fā)送游標(biāo)和接收游標(biāo)向前進(jìn)一,如果發(fā)生“環(huán)繞”,再從 0 開始。
最后,取出 sudog 里的 goroutine,調(diào)用 goready 將其狀態(tài)改成 “runnable”,待發(fā)送者被喚醒,等待調(diào)度器的調(diào)度。
然后,如果 channel 的 buf 里還有數(shù)據(jù),說明可以比較正常地接收。注意,這里,即使是在 channel 已經(jīng)關(guān)閉的情況下,也是可以走到這里的。這一步比較簡單,正常地將 buf 里接收游標(biāo)處的數(shù)據(jù)拷貝到接收數(shù)據(jù)的地址。
到了最后一步,走到這里來的情形是要阻塞的。當(dāng)然,如果 block 傳進(jìn)來的值是 false,那就不阻塞,直接返回就好了。
先構(gòu)造一個 sudog,接著就是保存各種值了。注意,這里會將接收數(shù)據(jù)的地址存儲到了 elem 字段,當(dāng)被喚醒時,接收到的數(shù)據(jù)就會保存到這個字段指向的地址。然后將 sudog 添加到 channel 的 recvq 隊列里。調(diào)用 goparkunlock 函數(shù)將 goroutine 掛起。
接下來的代碼就是 goroutine 被喚醒后的各種收尾工作了。
channel操作總結(jié):

注意:關(guān)閉已經(jīng)關(guān)閉的channel也會引發(fā)panic。
References:
https://cloud.tencent.com/developer/article/1750350
https://golang.design/go-questions/channel
https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/channel.html
https://juejin.cn/post/7037656471210819614