1. 原理
hchan
通過(guò)var聲明或者make函數(shù)創(chuàng)建的channel變量是一個(gè)存儲(chǔ)在函數(shù)棧幀上的指針,占用8個(gè)字節(jié),指向堆上的hchan結(jié)構(gòu)體
源碼包中src/runtime/chan.go定義了hchan的數(shù)據(jù)結(jié)構(gòu)如下:

type hchan struct {
qcount uint // total data in the queue 循環(huán)數(shù)組中的元素?cái)?shù)量
dataqsiz uint // size of the circular queue 循環(huán)數(shù)組的長(zhǎng)度
//channel分為無(wú)緩沖和有緩沖兩種。
// 對(duì)于有緩沖的channel存儲(chǔ)數(shù)據(jù),使用了 ring buffer(環(huán)形緩沖區(qū)) 來(lái)緩存寫(xiě)入的數(shù)據(jù),本質(zhì)是循環(huán)數(shù)組
// 為啥是循環(huán)數(shù)組?普通數(shù)組不行嗎,普通數(shù)組容量固定更適合指定的空間,彈出元素時(shí),普通數(shù)組需要全部都前移
// 當(dāng)下標(biāo)超過(guò)數(shù)組容量后會(huì)回到第一個(gè)位置,所以需要有兩個(gè)字段記錄當(dāng)前讀和寫(xiě)的下標(biāo)位置
buf unsafe.Pointer // points to an array of dataqsiz elements 指向底層循環(huán)數(shù)組的指針(環(huán)形緩沖區(qū))
elemsize uint16 //元素的大小
closed uint32 //channel是否關(guān)閉的標(biāo)志
elemtype *_type // element type channel中的元素類(lèi)型
sendx uint // send index // 下一次寫(xiě)下標(biāo)的位置
recvx uint // receive index // 下一次讀下標(biāo)的位置
recvq waitq // list of recv waiters // 讀等待隊(duì)列
sendq waitq // list of send waiters // 寫(xiě)等待隊(duì)列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥鎖,保證讀寫(xiě)channel時(shí)不存在并發(fā)競(jìng)爭(zhēng)問(wèn)題
}
hchan結(jié)構(gòu)體的主要組成部分有四個(gè):
用來(lái)保存goroutine之間傳遞數(shù)據(jù)的循環(huán)數(shù)組:buf
用來(lái)記錄此循環(huán)數(shù)組當(dāng)前發(fā)送或接收數(shù)據(jù)的下標(biāo)值:sendx和recvx
用于保存向該chan發(fā)送和從該chan接收數(shù)據(jù)被阻塞的goroutine隊(duì)列: sendq 和 recvq
保證channel寫(xiě)入和讀取數(shù)據(jù)時(shí)線程安全的鎖:lock
環(huán)形數(shù)組
環(huán)形數(shù)組作為channel 的緩沖區(qū) 數(shù)組的長(zhǎng)度就是定義channnel 時(shí)channel 的緩沖大小

等待隊(duì)列 waitq
在hchan 中包括了讀/寫(xiě) 等待隊(duì)列, waitq是一個(gè)雙向隊(duì)列,包括了一個(gè)頭結(jié)點(diǎn)和尾節(jié)點(diǎn)。 每個(gè)節(jié)點(diǎn)是一個(gè)sudog結(jié)構(gòu)體變量
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
操作
-
創(chuàng)建
使用 make(chan T, cap) 來(lái)創(chuàng)建 channel,channel 可分為帶緩沖和不帶緩沖的, cap 就是緩沖區(qū)的大小// 帶緩沖,緩沖大小為3 ch := make(chan int, 3) // 不帶緩沖 ch := make(chan int)make 語(yǔ)法會(huì)在編譯時(shí),轉(zhuǎn)換為 makechan64 和 makechan
// 源碼 func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError("makechan: size out of range")) } return makechan(t, int(size)) } func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }創(chuàng)建時(shí)的會(huì)做一些檢查:
- 元素大小不能超過(guò) 64K
- 元素的對(duì)齊大小不能超過(guò) maxAlign 也就是 8 字節(jié)
- 計(jì)算出來(lái)的內(nèi)存是否超過(guò)限制
創(chuàng)建時(shí)的策略:
- 如果是無(wú)緩沖的 channel,會(huì)直接給 hchan 分配內(nèi)存
- 如果是有緩沖的 channel,并且元素不包含指針,那么會(huì)為 hchan 和底層數(shù)組分配一段連續(xù)的地址
- 如果是有緩沖的 channel,并且元素包含指針,那么會(huì)為 hchan 和底層數(shù)組分別分配地址
-
發(fā)送
發(fā)送操作,編譯時(shí)轉(zhuǎn)換為runtime.chansend函數(shù)func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool通過(guò)block 參數(shù),確認(rèn)是否是阻塞式發(fā)送
阻塞式:調(diào)用chansend 函數(shù)時(shí) block=truech<- 1非阻塞式:調(diào)用chansend 函數(shù)時(shí) block=false
select { case ch <- 10: ... default }向 channel 中發(fā)送數(shù)據(jù)時(shí)大概分為兩大塊:檢查和數(shù)據(jù)發(fā)送,數(shù)據(jù)發(fā)送流程如下:
- 如果 channel 的讀等待隊(duì)列存在接收者goroutine
將數(shù)據(jù)直接發(fā)送給第一個(gè)等待的 goroutine, 喚醒接收的 goroutine - 如果 channel 的讀等待隊(duì)列不存在接收者goroutine
a. 如果循環(huán)數(shù)組buf未滿,那么將會(huì)把數(shù)據(jù)發(fā)送到循環(huán)數(shù)組buf的隊(duì)尾
b. 如果循環(huán)數(shù)組buf已滿,這個(gè)時(shí)候就會(huì)走阻塞發(fā)送的流程,將當(dāng)前 goroutine 加入寫(xiě)等待隊(duì)列,并掛起等待喚醒
- 如果 channel 的讀等待隊(duì)列存在接收者goroutine
-
接收
接收操作,編譯時(shí)轉(zhuǎn)換為runtime.chanrecv函數(shù)func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)阻塞式:
調(diào)用chanrecv函數(shù),并且block=true
<ch v := <ch v, ok := <ch // 當(dāng)channel關(guān)閉時(shí),for循環(huán)會(huì)自動(dòng)退出,無(wú)需主動(dòng)監(jiān)測(cè)channel是否關(guān)閉,可以防止讀取已經(jīng)關(guān)閉的 channel,造成讀到數(shù)據(jù)為通道所存儲(chǔ)的數(shù)據(jù)類(lèi)型的零值 for i := range ch { fmt.Println(i) }非阻塞式:
調(diào)用chanrecv函數(shù),并且block=false
select { case <-ch: ... default }向 channel 中接收數(shù)據(jù)時(shí)大概分為兩大塊,檢查和數(shù)據(jù)發(fā)送,而數(shù)據(jù)接收流程如下:
- 如果 channel 的寫(xiě)等待隊(duì)列存在發(fā)送者goroutine
如果是無(wú)緩沖 channel,直接從第一個(gè)發(fā)送者goroutine那里把數(shù)據(jù)拷貝給接收變量,喚醒發(fā)送的 goroutine
如果是有緩沖 channel(已滿),將循環(huán)數(shù)組buf的隊(duì)首元素拷貝給接收變量,將第一個(gè)發(fā)送者goroutine的數(shù)據(jù)拷貝到 buf循環(huán)數(shù)組隊(duì)尾,喚醒發(fā)送的 goroutine - 如果 channel 的寫(xiě)等待隊(duì)列不存在發(fā)送者goroutine
如果循環(huán)數(shù)組buf非空,將循環(huán)數(shù)組buf的隊(duì)首元素拷貝給接收變量
如果循環(huán)數(shù)組buf為空,這個(gè)時(shí)候就會(huì)走阻塞接收的流程,將當(dāng)前 goroutine 加入讀等待隊(duì)列,并掛起等待喚醒
- 如果 channel 的寫(xiě)等待隊(duì)列存在發(fā)送者goroutine
-
關(guān)閉
關(guān)閉操作,調(diào)用close函數(shù),編譯時(shí)轉(zhuǎn)換為runtime.closechan函數(shù)
close(ch) func closechan(c *hchan)
2. 特點(diǎn)
channel有2種類(lèi)型:無(wú)緩沖、有緩沖, 在創(chuàng)建時(shí)make(chan type cap) 通過(guò)cap 設(shè)定緩沖大小
channel有3種模式:寫(xiě)操作模式(單向通道)、讀操作模式(單向通道)、讀寫(xiě)操作模式(雙向通道)
| 寫(xiě)操作模式 | 讀操作模式 | 讀寫(xiě)操作模式 | |
|---|---|---|---|
| 創(chuàng)建 | make(chan<- int) | make(<-chan int) | make(chan int) |
channel有3種狀態(tài):未初始化、正常、關(guān)閉
| 未初始化 | 關(guān)閉 | 正常 | |
|---|---|---|---|
| 關(guān)閉 | panic | panic | 正常關(guān)閉 |
| 發(fā)送 | 永遠(yuǎn)阻塞導(dǎo)致死鎖 | panic | 阻塞或者成功發(fā)送 |
| 接收 | 永遠(yuǎn)阻塞導(dǎo)致死鎖 | 緩沖區(qū)為空則為零值, 否則可以繼續(xù)讀 | 阻塞或者成功接收 |
如下幾種狀態(tài)會(huì)引發(fā)panic
1.關(guān)閉未初始化的channel 和已經(jīng)關(guān)閉的channel
- 向已經(jīng)關(guān)閉的channel 中發(fā)送數(shù)據(jù)
3. 線程安全
channel 是線程安全的,channel的底層實(shí)現(xiàn)中,hchan結(jié)構(gòu)體中采用Mutex鎖來(lái)保證數(shù)據(jù)讀寫(xiě)安全。在對(duì)循環(huán)數(shù)組buf中的數(shù)據(jù)進(jìn)行入隊(duì)和出隊(duì)操作時(shí),必須先獲取互斥鎖,才能操作channel數(shù)據(jù)