了解過go的都知道,go最為突出的優(yōu)點就是它天然支持高并發(fā),但是所有高并發(fā)情況都面臨著一個很明顯的問題,就是并發(fā)的多線程或多協(xié)程之間如何通信,而channel就是go中g(shù)oroutine通信的‘管道’。
channel在go中時如何使用的
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
var exit = make(chan string, 1)
func main() {
go dealSignal()
exited := make(chan struct{}, 1)
go channel1(exited)
count := 0
t := time.Tick(time.Second)
Loop:
for {
select {
case <-t:
count++
fmt.Printf("main run %d\n", count)
case <-exited:
fmt.Println("main exit begin")
break Loop
}
}
fmt.Println("main exit end")
}
func dealSignal() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
exit <- "shutdown"
}()
}
func channel1(exited chan<- struct{}) {
t := time.Tick(time.Second)
count := 0
for {
select {
case <-t:
count++
fmt.Printf("channel1 run %d\n", count)
case <-exit:
fmt.Println("channel1 exit")
close(exited)
return
}
}
}
這個例子首先并發(fā)出一個dealsign方法,用來接收關(guān)閉信號,如果接收到關(guān)閉信號后往exit channel發(fā)送一條消息,然后并發(fā)運行channel1,channel1中定了一個ticker,正常情況下channel1每秒打印第一個case語句,如果接收到exit的信號,進入第二個case,然后關(guān)閉傳入的exited channel,那么main中的Loop,接收到exited關(guān)閉的信號后,打印“main exit begin”, 然后退出循環(huán),進程成功退出。這個例子演示了channel在goroutine中起到的傳遞消息的作用。這個例子是為了向大家展示channel在多個goroutine之間進行通信。
Channel在底層是什么樣的
type hchan struct {
qcount uint // total data in the queue;chan中的元素總數(shù)
dataqsiz uint // size of the circular queue;底層循環(huán)數(shù)組的size
buf unsafe.Pointer // points to an array of dataqsiz elements,指向底層循環(huán)數(shù)組的指針,只針對有緩沖的channel
elemsize uint16 //chan中元素的大小
closed uint32 //chan是否關(guān)閉
elemtype *_type // element type;元素類型
sendx uint // send index;已發(fā)送元素在循環(huán)數(shù)組中的索引
recvx uint // receive index;已接收元素在循環(huán)數(shù)組中的索引
recvq waitq // list of recv waiters,等待接收消息的goroutine隊列
sendq waitq // list of send waiters,等待發(fā)送消息的goroutine隊列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
創(chuàng)建一個底層數(shù)組容量為5,元素類型為int,那么channel的數(shù)據(jù)結(jié)構(gòu)如下圖所示:

創(chuàng)建channel的時候到底發(fā)生了什么
創(chuàng)建channel的時候,其實底層是調(diào)用makechan方法,我們來看下源碼:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
從函數(shù)原型來看,創(chuàng)建的 chan 是一個指針。所以我們能在函數(shù)間直接傳遞 channel,而不用傳遞 channel 的指針。
具體來看下代碼:
可以看出makechan中其實主要的代碼就是一個switch,針對不同的情況:
1、case mem == 0代表無緩沖型channel,只分配hchan本身結(jié)構(gòu)體大小的內(nèi)存
2、case elem.ptrdata==0 代表元素類型不含指針,只分配hchan本身結(jié)構(gòu)體大小+元素大小*個數(shù)的內(nèi)存,是連續(xù)的內(nèi)存空間
3、default元素類型包括指針,兩次分配內(nèi)存的操作
channel的接收與發(fā)送
func goroutineA(a <-chan int) {
val := <- a
fmt.Println("G1 received data: ", val)
return
}
func goroutineB(b <-chan int) {
val := <- b
fmt.Println("G2 received data: ", val)
return
}
func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
首先創(chuàng)建了一個無緩沖型的channel,然后啟動兩個goroutine去消費channel的數(shù)據(jù),緊接著向channel中發(fā)送數(shù)據(jù)。我們一步一步來分析channel是如何接收和發(fā)送數(shù)據(jù)的,首先來看接收,golang中接收channel數(shù)據(jù)有兩種方式:
i <- ch
i, ok <- ch
// 位于 src/runtime/chan.go
// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有數(shù)據(jù)可接收的情況下,返回 (false, false)
// 否則,如果 c 處于關(guān)閉狀態(tài),將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的內(nèi)存地址。返回 (true, true)
// 如果 ep 非空,則應該指向堆或者函數(shù)調(diào)用者的棧
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 省略 debug 內(nèi)容 …………
// 如果是一個 nil 的 channel
if c == nil {
// 如果不阻塞,直接返回 (false, false)
if !block {
return
}
// 否則,接收一個 nil 的 channel,goroutine 掛起
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
// 不會執(zhí)行到這里
throw("unreachable")
}
// 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回
// 當我們觀察到 channel 沒準備好接收:
// 1. 非緩沖型,等待發(fā)送列隊 sendq 里沒有 goroutine 在等待
// 2. 緩沖型,但 buf 里沒有元素
// 之后,又觀察到 closed == 0,即 channel 未關(guān)閉。
// 因為 channel 不可能被重復打開,所以前一個觀測的時候 channel 也是未關(guān)閉的,
// 因此在這種情況下可以直接宣布接收失敗,返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
// 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
// 也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel,
// buf 里有元素的情況下還能接收到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
// 解鎖
unlock(&c.lock)
if ep != nil {
// 從一個已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
// 那么接收的值將是一個該類型的零值
// typedmemclr 根據(jù)類型清理相應地址的內(nèi)存
typedmemclr(c.elemtype, ep)
}
// 從一個已關(guān)閉的 channel 接收,selected 會返回true
return true, false
}
// 等待發(fā)送隊列里有 goroutine 存在,說明 buf 是滿的
// 這有可能是:
// 1. 非緩沖型的 channel
// 2. 緩沖型的 channel,但 buf 滿了
// 針對 1,直接進行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
// 針對 2,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 緩沖型,buf 里有元素,可以正常接收
if c.qcount > 0 {
// 直接從循環(huán)數(shù)組里找到要接收的元素
qp := chanbuf(c, c.recvx)
// …………
// 代碼里,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循環(huán)數(shù)組里相應位置的值
typedmemclr(c.elemtype, qp)
// 接收游標向前移動
c.recvx++
// 接收游標歸零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 數(shù)組里的元素個數(shù)減 1
c.qcount--
// 解鎖
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
unlock(&c.lock)
return false, false
}
// 接下來就是要被阻塞的情況了
// 構(gòu)造一個 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 待接收數(shù)據(jù)的地址保存下來
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
// 進入channel 的等待接收隊列
c.recvq.enqueue(mysg)
// 將當前 goroutine 掛起
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 被喚醒了,接著從這里繼續(xù)執(zhí)行一些掃尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
Step1
如果channel是nil:如果是非阻塞模式,直接返回(false,false);如果是阻塞模式,調(diào)用goprak掛起goroutine,會阻塞下去。
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
Step2
快速操作(不用獲取鎖,快速返回),三組條件全部滿足,快速返(false,false)
條件1:首先是在非阻塞模式下
條件2:如果是非緩沖型(datasiz=0)并且等待發(fā)送goroutine隊列為空(sendq.first=nil,就是沒人往channel寫數(shù)據(jù)),或者緩沖型channel(datasiz>0)并且buf中沒有數(shù)據(jù);
條件3:channel未關(guān)閉
//##################step2####################
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
Step3
首先加鎖,如果channel已經(jīng)關(guān)閉,并且buf中沒有元素,返回對應類型的0值,但是received為false;兩種情況
情形1:非緩沖型,channel已關(guān)閉
情形2:緩沖型,channel已關(guān)閉,并且buf無元素
也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel,
buf 里有元素的情況下還能接收到元素
//##################step3####################
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
step4
如果等待發(fā)送隊列中有元素,證明channel已經(jīng)滿了,兩種情形
情形1:非緩沖型,無buf
情形2:緩沖型,buf滿了
//##################step4####################
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
兩種情形都正常進入recv方法,我們來看下源碼:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//##################step4-1####################
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
//##################step4-2####################
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
針對 1,直接進行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)(從發(fā)送者的棧copy到接收者的棧)
針對 2,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部.
然后喚醒等待發(fā)送隊列中的goroutine,等待調(diào)度器調(diào)度。
step5
沒有等待發(fā)送的隊列,并且buf中有元素,直接把接收游標處的數(shù)據(jù)copy到接收數(shù)據(jù)的地址,然后改變hchan中元素數(shù)據(jù)。
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
step6
如果是非阻塞,那么直接返回;如果是阻塞的,構(gòu)造sudog,保存各種值;將sudog保存到channel的recvq中,調(diào)用goparkunlock將goroutine掛起
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
我們繼續(xù)之前的例子。前面說到第 14 行,創(chuàng)建了一個非緩沖型的 channel,接著,第 15、16 行分別創(chuàng)建了一個 goroutine,各自執(zhí)行了一個接收操作。通過前面的源碼分析,我們知道,這兩個 goroutine (后面稱為 G1 和 G2 好了)都會被阻塞在接收操作。G1 和 G2 會掛在 channel 的 recq 隊列中,形成一個雙向循環(huán)鏈表。
在程序的 17 行之前,chan 的整體數(shù)據(jù)結(jié)構(gòu)如下:

buf 指向一個長度為 0 的數(shù)組,qcount 為 0,表示 channel 中沒有元素。重點關(guān)注 recvq 和 sendq,它們是 waitq 結(jié)構(gòu)體,而 waitq 實際上就是一個雙向鏈表,鏈表的元素是 sudog,里面包含 g 字段,g 表示一個 goroutine,所以 sudog 可以看成一個 goroutine。recvq 存儲那些嘗試讀取 channel 但被阻塞的 goroutine,sendq 則存儲那些嘗試寫入 channel,但被阻塞的 goroutine。
此時,我們可以看到,recvq 里掛了兩個 goroutine,也就是前面啟動的 G1 和 G2。因為沒有 goroutine 接收,而 channel 又是無緩沖類型,所以 G1 和 G2 被阻塞。sendq 沒有被阻塞的 goroutine。
再從整體上來看一下 chan 此時的狀態(tài):

當一個channel關(guān)閉后,我們依然可以從中讀出數(shù)據(jù),如果chan的buf中有元素,則讀出的是chan中buf的數(shù)據(jù),如果buf為空,則輸出對應元素類型的零值。那么我們來看下如下的一段程序:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
var exit1 = make(chan struct{}, 1)
func main() {
go dealSignal1()
count := 0
t := time.Tick(time.Second)
for {
select {
case <-t:
count++
fmt.Printf("main run %d\n", count)
case <-exit1:
fmt.Println("main exit begin")
}
}
fmt.Println("main exit over")
}
func dealSignal1() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
close(exit1)
}()
}
發(fā)送
接著上面的例子,G1 和 G2 現(xiàn)在都在 recvq 隊列里了。
17 行向 channel 發(fā)送了一個元素 3。
發(fā)送操作最終轉(zhuǎn)化為 chansend 函數(shù),直接上源碼,同樣大部分都注釋了,可以看懂主流程:
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞,直接返回 false,表示未發(fā)送成功
if !block {
return false
}
// 當前 goroutine 被掛起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// 省略 debug 相關(guān)……
// 對于不阻塞的 send,快速檢測失敗場景
//
// 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間。這可能是:
// 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
// 2. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 鎖住 channel,并發(fā)安全
lock(&c.lock)
// 如果 channel 關(guān)閉了
if c.closed != 0 {
// 解鎖
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}
// 如果接收隊列里有 goroutine,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 對于緩沖型的 channel,如果還有緩沖空間
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// ……
// 將數(shù)據(jù)從 ep 處拷貝到 qp
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送游標值加 1
c.sendx++
// 如果發(fā)送游標值等于容量值,游標值歸 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖區(qū)的元素數(shù)量加一
c.qcount++
// 解鎖
unlock(&c.lock)
return true
}
// 如果不需要阻塞,則直接返回錯誤
if !block {
unlock(&c.lock)
return false
}
// channel 滿了,發(fā)送方會被阻塞。接下來會構(gòu)造一個 sudog
// 獲取當前 goroutine 的指針
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 當前 goroutine 進入發(fā)送等待隊列
c.sendq.enqueue(mysg)
// 當前 goroutine 被掛起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 從這里開始被喚醒了(channel 有機會可以發(fā)送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被喚醒后,channel 關(guān)閉了??拥?,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上綁定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
我們繼續(xù)往下走,G1、G2被掛起后,往channel中發(fā)送一個數(shù)據(jù)3,其實調(diào)用的是chansend方法,我們還是逐步的去講解
step1
如果channel=nil,當前goroutine會被掛起
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
step2
依然是一個不加鎖的快速操作,三組條件
條件1:非阻塞
條件2:channel未關(guān)閉
條件3:channel是非緩沖型,并且等待接收隊列為空;或者緩沖型,并且循環(huán)數(shù)組已經(jīng)滿了
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
step3
加鎖,如果channel已經(jīng)關(guān)閉,直接panic
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
step4
如果等待接收隊列不為空,說明什么?
情形1:非緩沖型,等待接收隊列不為空
情形2:緩沖型,等待接收隊列不為空(說明buf為空)
兩種情形,都是直接將待發(fā)送數(shù)據(jù)直接copy到接收處
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)//直接從ep copy到sg
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
兩種情形,都直接從一個用一個goroutine操作另一個goroutine的棧,因此在sendDirect方法中會有一次寫屏障
step5
如果等待隊列為空,并且緩沖區(qū)未滿,肯定是緩沖型的channel
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
將元素放在sendx處,然后sendx加1,channel總量加1
step6
如果以上情況都沒有命中,說明什么?說明channel已經(jīng)滿了,如果是非阻塞的直接返回,否則需要調(diào)用gopack將這個goroutine掛起,等待被喚醒。
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
我們對照程序分析下,在前一個小節(jié)G1、G2被掛起來了,等待sender的解救;這時候往ch中發(fā)送了一個3,(step4)這時sender發(fā)現(xiàn)ch的等待接收隊列recvq中有receiver,就會出隊一個sudog,然后將元素直接copy到sudog的elem處,然后調(diào)用goready將G1喚醒,繼續(xù)執(zhí)行G1原來的代碼,打印出結(jié)果。如下圖:

當調(diào)度器光顧 G1 時,將 G1 變成 running 狀態(tài),執(zhí)行 goroutineA 接下來的代碼。G 表示其他可能有的 goroutine。
這里其實涉及到一個協(xié)程寫另一個協(xié)程棧的操作。有兩個 receiver 在 channel 的一邊虎視眈眈地等著,這時 channel 另一邊來了一個 sender 準備向 channel 發(fā)送數(shù)據(jù),為了高效,用不著通過 channel 的 buf “中轉(zhuǎn)”一次,直接從源地址把數(shù)據(jù) copy 到目的地址就可以了,效率高?。?/p>
關(guān)閉
close一個channel會調(diào)用closechan方法,比較簡單,我們也來看下
func closechan(c *hchan) {
// 關(guān)閉一個 nil channel,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 上鎖
lock(&c.lock)
// 如果 channel 已經(jīng)關(guān)閉
if c.closed != 0 {
unlock(&c.lock)
// panic
panic(plainError("close of closed channel"))
}
// …………
// 修改關(guān)閉狀態(tài)
c.closed = 1
var glist *g
// 將 channel 所有等待接收隊列的里 sudog 釋放
for {
// 從接收隊列里出隊一個 sudog
sg := c.recvq.dequeue()
// 出隊完畢,跳出循環(huán)
if sg == nil {
break
}
// 如果 elem 不為空,說明此 receiver 未忽略接收數(shù)據(jù)
// 給它賦一個相應類型的零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 取出 goroutine
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 相連,形成鏈表
gp.schedlink.set(glist)
glist = gp
}
// 將 channel 等待發(fā)送隊列里的 sudog 釋放
// 如果存在,這些 goroutine 將會 panic
for {
// 從發(fā)送隊列里出隊一個 sudog
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 發(fā)送者會 panic
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 形成鏈表
gp.schedlink.set(glist)
glist = gp
}
// 解鎖
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 遍歷鏈表
for glist != nil {
// 取最后一個
gp := glist
// 向前走一步,下一個喚醒的 g
glist = glist.schedlink.ptr()
gp.schedlink = 0
// 喚醒相應 goroutine
goready(gp, 3)
}
}
step1
如果channel為nil,會直接panic
if c == nil {
panic(plainError("close of nil channel"))
}
step2
加鎖,如果channel已經(jīng)關(guān)閉,再次關(guān)閉會panic
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
step3
首選將hchan對應close標志置為1,然后聲明一個鏈表;將等待接收隊列中的所有sudog加入到鏈表,并將其elem賦予一個相應類型的0值;
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
step4
向所有等待發(fā)送隊列的sudog加入鏈表
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
step5
喚醒sudog所有g(shù)oroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
close 邏輯比較簡單,對于一個 channel,recvq 和 sendq 中分別保存了阻塞的發(fā)送者和接收者。關(guān)閉 channel 后,對于等待接收者而言,會收到一個相應類型的零值。對于等待發(fā)送者,會直接 panic。所以,在不了解 channel 還有沒有接收者的情況下,不能貿(mào)然關(guān)閉 channel。
close 函數(shù)先上一把大鎖,接著把所有掛在這個 channel 上的 sender 和 receiver 全都連成一個 sudog 鏈表,再解鎖。最后,再將所有的 sudog 全都喚醒。
喚醒之后,該干嘛干嘛。sender 會繼續(xù)執(zhí)行 chansend 函數(shù)里 goparkunlock 函數(shù)之后的代碼,很不幸,檢測到 channel 已經(jīng)關(guān)閉了,panic。receiver 則比較幸運,進行一些掃尾工作后,返回。這里,selected 返回 true,而返回值 received 則要根據(jù) channel 是否關(guān)閉,返回不同的值。如果 channel 關(guān)閉,received 為 false,否則為 true。
總結(jié)
總結(jié)一下,發(fā)生 panic 的情況有三種:
1.向一個關(guān)閉的 channel 進行寫操作;
- 關(guān)閉一個 nil 的 channel;
- 重復關(guān)閉一個 channel。
讀、寫一個 nil channel 都會被阻塞。
channel發(fā)送和接收元素的本質(zhì)還是值得拷貝
channel是并發(fā)安全的(加鎖)