channel是golang中特有的一種數據結構,通常與goroutine一起使用,下面我們就介紹一下這種數據結構。
channel數據結構
channel最重要的一個結構體就是hchan,我們創(chuàng)建一個channel的時候,實際上是創(chuàng)建了一個下面結構體的實例。
hchan結構體
// src/runtime/chan.go
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
字段說明
-
qcount當前channel中的元素數量 -
dataqsiz環(huán)形隊列的大小 -
buf指向dataqsize的數組指針,只有緩沖chan有效 -
closed當前channel關閉狀態(tài) -
elemsize存儲元素的大小 -
elemtype存儲元素的數據類型 -
sendx發(fā)送操作處理到的索引位置,最大值為數組buf的最大下標值 -
recvx接收操作處理到的索引位置,最大值為數組buf的最大下標值 -
recvq接收隊列,雙向鏈表,阻塞元素 -
sendq發(fā)送列隊,雙向鏈表,阻塞元素 -
lock鎖,,用來保護sudog里的所的字段

hchan struct
其中elemsize 和 elemtype 表示存儲數據的大小和類型;sendx和recvx是指向底層數據的索引位置,表示當前處理的進度位置;recvq和sendq 是一個由雙向鏈表實現的隊列,它存儲的內容是由于隊列dataqsize過小,而阻塞的數據。
每次進行發(fā)送數據和讀取數據時都需要加鎖。
waitq結構體
// src/runtime/chan.go
type waitq struct {
first *sudog
last *sudog
}
sudog結構體
// src/runtime/runtime2.go
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ? synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
這里 sudog 實際上是對 goroutine 的一個封裝,一個sudog 就是一個goroutine,如在channal上發(fā)送和接收。
sudogs 是通過一個特殊的池來分配的,通過 acquireSudog() 和releaseSudog()進行獲取和釋放。
sudog里的字段是由 hchan.lock 鎖來進行保護。
channel 整體結構圖

<figcaption>hchan 結構圖(來源:互聯網技術窩)</figcaption>
創(chuàng)建
// 無緩沖通道
ch1 := make(chan int)
// 有緩沖通道
ch2 := make(chan int, 10)
通過編譯可以發(fā)現channel的創(chuàng)建是由[makechan()](https://github.com/golang/go/blob/go1.15.6/src/runtime/chan.go#L71-L118)函數來完成的。源碼
// src/runtime/chan.go
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
}
return c
}
函數返回的是一個指針類型,因此我們可以在函數中通過參數直接傳遞,不需要再轉為指針使傳遞。
步驟
- 數據合法性檢查,包括發(fā)送數據的類型和大小
- 根據不同場景分配內存,主要針對buf字段
a. 內存大小為0,注意這時c.buf 的值為c.raceaddr()
b. 元素不包含指針,一次性分配一段內存地址
c. 元素包含指針,分配內存 - 初始化其它字段
第一個參數 *chantype 結構定義
// src/runtime/type.go
type chantype struct {
typ _type
elem *_type
dir uintptr
}
實際上創(chuàng)建一個channel, 只是對一個hchan結構體進行了一些初始化操作,并返回其指針。因此我們在函數傳遞時,不需要傳遞指針,直接使用就可以了,因為它本身就是一個指針的類型。
注意:對于chan內存是在heap上分配的。
發(fā)送數據
對于channel的寫操作是由 chansend() 函數來實現的。
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
在chan為nil的情況下, 如果是非阻塞則直接返回,否則panic。
對于分送數據chan有三種情況,分別是直接發(fā)送,緩存區(qū)發(fā)送 和 阻塞發(fā)送,其中阻塞發(fā)送涉及到GMP 的調度,理解起來有些吃力。
在發(fā)送數據前需要進行加鎖操作,發(fā)送完再解鎖,保證原子性操作。
直接發(fā)送
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
// 直接發(fā)送
// 如果接收隊列中有接收者,則直接將數據發(fā)給接收者,重點在send()函數,并在函數里進行解鎖
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
......
}
如果接收隊列中有接收者,則優(yōu)化從接收者從隊列中取出來sg(sg := c.recvq.dequeue()),然后再通過調用 send() 函數將數據發(fā)送給接收者即可。

<figcaption>channel send</figcaption>
在send()函數里,會執(zhí)行一個回調函數主要用來進行解鎖c.lock。真正的發(fā)送操作是函數 sendDirect(),通過memmove(dst, src, t.size) 將數據復制過去。
緩沖區(qū)發(fā)送
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
// 緩沖區(qū)發(fā)送
// 接收者隊列中沒有接收者goroutine
// 當前channel中的元素<隊列的大小,有緩沖buffer未滿的情況
// 將數據存放在sendx在buf數組中的索引位置,然后再將sendx索引+1
// 由于是一個循環(huán)數組,所以如果達到了dataqsize,則從0開始,同時個數+1
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
......
}
如果當前recvq 隊列里沒有處于等待執(zhí)行的sudog的話,則需要將數據發(fā)送到緩沖隊列中(如果當前隊列為緩沖chan)。
假設當前buffer大小為6(dataqsiz=6),數據個數為0(qcount=0),這里寫入6個數據,如下圖。

<figcaption>channel send</figcaption>
如果當前緩沖區(qū)的元素數量<隊列的大小,說明緩沖區(qū)還沒有滿,還可以繼續(xù)裝載數據。
這時第一步先計算出 s.sendx 索引位置的內存地址,然后調用 typememmove() 函數將qp復制到內存地址,再將s.sendx索引值+1,同時c.qcount++。
當 sendx = dataqsiz 的時候,說明已到了數組最后一個元素,下次存儲數據的話,則需要重新從0開始了,所以需要重置為0。
buf是一個由數組組成的隊列,滿足隊列的FIFO的機制,最新存儲的數據也先消費,最多可以存儲 dataqsiz 個數量。超出這個數據量就需要使用第三種 阻塞發(fā)送 方式了。
sendx 始終保存的是下次存儲數據的數組索引位置,每次使用完記得+1 。每次存儲以前都需要判斷當前buffer是否有空間可用 c.qcount < c.dataqsiz 。
總結
-
q.sendx最大值為c.dataqsiz -1,即數組的最大索引值。 -
q.count是當前chan 存儲的元素個數,有可能 >c.dataqsiz
阻塞發(fā)送
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
// 阻塞發(fā)送
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
......
}
如果當buff也寫滿的話,再send數據的話,則需要進行阻塞發(fā)送了。

假如我們有一個緩沖chan,但緩沖大小已經使用完,再次發(fā)送數據的話,則需要進入sendq隊列了(將sudog綁定到一個goroutine,并放在sendq,等待讀?。?/p>
對于阻塞的情況,理解起來有些吃力,因為涉及到GMP的關系和調度。
- 調用 getg() 函數獲取當前運行的goroutine
- 調用 acquireSudog() 函數獲取一個sudog,并進行數據綁定
- 將mysg 添加到發(fā)送隊列sendq,并設置為gp.waiting
- 更改goroutine狀態(tài)
- 設置goroutine為等待喚醒狀態(tài),調用 atomic.Store8(&gp.parkingOnChan, 1)函數?
- 通過keepAlive()函數可以保證發(fā)送的值一直有效,直到被接收者取走
- 進行清理工作
- 釋放 sudog 結構體
總結
- 阻塞發(fā)送并不會更新
c.qcount數量個數 - acquireSudog() 和 releaseSudog(mysg) 是配對一起使用。
讀取數據
對于channel的讀取方式:
v <- ch
v, ok <- ch
其中 v<-ch 對應的是 runtime.chanrecv1(), v, ok <-ch 對應的是`runtime.chanrecv2()。但這兩個函數最終調用的還是同一個函數,即 chanrecv()。
我們先看一下官方文檔對這個函數的說明
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
- chanrecv 用來從chan 中接收數據,并將接收的數據寫入到ep
- 如果ep為 nil 的話,則接收的數據將被忽略
- 如果非阻塞的且沒有可接收的數據將返回 (false ,false)
- 如果chan已關閉,零值 ep 和返回值將是true, false,否則使用一個元素代替ep并返回 (true, true)
- 一個非nil的 ep, 必須指向heap或者調用stack
// src/runtime/chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 如果c為nil,表示非法操作,則直接gopark(),表示出讓當前GMP中的P的使用權,允許其它G使用
if c == nil {
// 如果非阻塞的話,直接返回;如果是阻塞的話,直接panic
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
// 如果chan已關閉且元素個數為0
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// 設置內存內容為類型 c.elemtype 的零值
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
如果當前讀取的 chan 為nil的話,且非阻塞的情況,則會產生死鎖,最終提示
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive (nil chan)]:
否則返回零值。
同時出讓自己占用的P,允許其它goroutine搶占使用。
如果讀取的chan已關閉,則讀取出來的值為零值(函數說明第四條)。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 在沒有獲取鎖的情況下,檢查非阻塞操作失敗
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
// 如果當前chan未關閉
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}
這段代碼主要是對重排讀的情況,進行了雙重檢測,暫是未明白code中考慮的情況,改天再消化消化。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖,下面才是真正要讀取的邏輯
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
...
}
讀取之前先加鎖。
對chan的讀取與發(fā)送一樣,同樣有三種方式,為直接讀取、緩沖區(qū)讀取和阻塞讀取。
直接讀取
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 直接讀取
// 從c.sendq隊列中取sudog, 將數據復制到sg
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
獲取一個待發(fā)送者,如果buffer大小為0,則直接從發(fā)送者接收數據。否則從隊列頭部接收,并將發(fā)送者發(fā)送的數據放在隊列尾部。

從c.sendq隊列里讀取一個 *sudog,通過調用 recv() 函數,將數據從發(fā)送者復制到ep中,并返回true,true,表示讀取成功。真正讀取函數為 recvDirect()。
緩沖區(qū)讀取
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 如果c.qcount>0,說明緩沖區(qū)有元素可直接讀取
if c.qcount > 0 {
// Receive directly from queue
// 直接從隊列中讀取
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
如果c.qcount > 0,則說明緩沖區(qū)里有內容可以讀取。則
直接獲取 c.recvx 數組索引位置的內存地址,則
- 將
r.recvx索引地址的值讀取出來復制給 ep, - 然后更新接收數組索引
c.recvx++, 如果>數組索引最大索引值 ,重置為0 - 減少元素個數
- 釋放鎖 c.qcount--
- 最后unlock返回。

<figcaption>chan recv</figcaption>
阻塞讀取
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
// c.sendq沒有sender,buffer里也是空的,直接阻塞讀取
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
- 通過getg()獲取一個goroutine
- 獲取一個sudog結構體
- 綁定兩者關系
- 加入 c.recvq 隊列
- 設置goroutine為等待喚醒狀態(tài)
- 清理狀態(tài)

關閉chan
關閉chan語句
close(ch)
對于已關閉的chan,是不允許再次關閉的,否則會產生panic。對應的函數為 runtime.closechan() 。
// src/runtime/chan.go
func closechan(c *hchan) {
// 如果chan未初始化,觸發(fā)panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 關閉已關閉的chan,觸發(fā)panicc
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
......
}
對于一個未初始化的chan,或者已關閉的chan,如果再次關閉則會觸發(fā)panic。
func closechan(c *hchan) {
......
// 設置chan關閉狀態(tài)
c.closed = 1
// 聲明一個結構體鏈表gList,主要用來調度使用
var glist gList
// release all readers
// 釋放所有readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 設置元素為nil
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 釋放所有writers,會引起panic,見下面說明
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 設置元素為nil
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放鎖
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 調度所有g
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 喚醒goroutine
goready(gp, 3)
}
}
- 聲明一個
gList鏈表結構體 - 將接收隊列
c.recvq中的所有元素添加到gList中,并將原來的值設置為零值 - 將發(fā)送隊列
c.sendq中的所有元素添加到gList中,并將原來的值設置為零值 - 將所有的阻塞goroutine通過函數
goready()進行調度
文章里提到在對c.sendq 處理的時候可能會觸發(fā)panic。這是因為關閉chan后,執(zhí)行了 goready() 對原來sendq里的sudogs 進行了進行了重新調度,這時候發(fā)現chan已經關閉了,所以會panic。那么又是如何調度的呢?
package main
import (
"fmt"
"time"
)
var ch chan int
func f() {
}
func main() {
ch := make(chan int, 10)
// buffer大小為10,這里發(fā)送11個,使最后一個進入到c.sendq里面
for i := 0; i < 11; i++ { // i < 10 則正常
go func(v int) {
ch <- v
}(i)
}
time.Sleep(time.Second)
fmt.Println("發(fā)送完畢")
// 關閉chan,將對sendq里的g進行喚醒,喚醒后發(fā)現chan關閉狀態(tài),直接panic
close(ch)
for v := range ch {
fmt.Println(v)
}
time.Sleep(time.Second)
}
有一條廣泛流傳的關閉 channel 的原則:
don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.
不要從一個 receiver 側關閉 channel,也不要在有多個 sender 時,關閉 channel。對于只有一個sender的話,直接在sender端關閉就可以。但對于多個sender的話,則需要通過一個信號量進行關閉,參考這里。
總結
close 操作會觸發(fā)goroutine的調度行為。
總結
- 在發(fā)送和讀取 chan的時候,如果chan為nil的話,這時候就根據是否阻塞進行判斷是否會發(fā)生panic。如果阻塞狀態(tài)的話,則會發(fā)生panic,否則會直接返回
- 對chan 發(fā)送或接收數據的時候要保證已初始化狀態(tài)
- 對于已關閉的chan再次關閉會觸發(fā)panic
- 對于發(fā)送和讀取數據都有三種處理情況,分別是直接讀寫,緩存區(qū)讀寫和阻塞讀寫
- 發(fā)送和接收數據的本質上是對值的
復制操作。All transfer of value on the go channels happens with the copy of value. - close(ch)會觸發(fā)goroutine 的調度行為
- 內部使用 sudogs對goroutine進行了一次封裝。
- 如果buffer中的元素無法保證消費完的話,則會產生內存泄漏的危險,這時gc是無法對這些元素時間清理的,過多的 chan就會占用大量的資源
- 對于chan的分配的內存是在哪里,heap還是stack?
參考
- https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#
- https://studygolang.com/articles/20714
- https://github.com/qcrao/Go-Questions/tree/master/channel
本文如有錯誤,歡迎大家在下方留言指出。