數(shù)據(jù)結(jié)構(gòu)
type hchan struct {
qcount uint // 隊(duì)列中的元素總量
dataqsiz uint // 緩存大小
buf unsafe.Pointer // 緩存指針 (環(huán)線數(shù)組)
elemsize uint16 // 數(shù)據(jù)項(xiàng)大小
closed uint32 // close 關(guān)閉標(biāo)記
elemtype *_type // 數(shù)據(jù)類(lèi)型
sendx uint // 待發(fā)送元素在緩存中的索引
recvx uint // 待接收元素在緩存中的索引
recvq waitq // 接收等待隊(duì)列,用于阻塞接收協(xié)程
sendq waitq // 發(fā)送等待隊(duì)列,用于阻塞發(fā)送協(xié)程
lock mutex // 互斥鎖
}
chan 創(chuàng)建
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 數(shù)據(jù)項(xiàng)不能超過(guò)64kb
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 緩沖槽大小檢查
if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case size == 0 || elem.size == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// 緩存地址就指向自己
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0: //指針
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
// 跳過(guò)chan對(duì)象的大小就是緩沖首地址
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// chan對(duì)象和緩存是兩個(gè)內(nèi)存
c = new(hchan)
c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
消息發(fā)送
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 上面block傳的true,所以下面只考慮true
if c == nil {
if !block {
return false
}
// 當(dāng) chan 為 nil時(shí), 阻塞
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 獲取同步鎖
lock(&c.lock)
// 當(dāng)向已關(guān)閉的channel發(fā)送消息時(shí)會(huì)產(chǎn)生panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// CASE 1: 如果當(dāng)有 G 在接受隊(duì)列上等待時(shí),直接將消息發(fā)送給 G
if sg := c.recvq.dequeue(); sg != nil {
// send中有個(gè)goready(gp, skip+1)可以喚醒 G
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// CASE 2: 緩存隊(duì)列未滿,則將消息復(fù)制到緩存隊(duì)列上
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 計(jì)算加入的地址
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 復(fù)制到緩存中
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送的index 加1
c.sendx++
// 注意這里被重置為0,所以是環(huán)線數(shù)組
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩存?zhèn)€數(shù) + 1
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// CASE 3: 緩存隊(duì)列已滿, 將 G 加入到 send隊(duì)列中
// Block on the channel. Some receiver will complete our operation for us.
// getg()獲取的是當(dāng)前的 g, 那么意味著將當(dāng)前 goroutine 加入 send 隊(duì)列并阻塞。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 加入send隊(duì)列
c.sendq.enqueue(mysg)
// 阻塞
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// someone woke us up.
// 被喚醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 當(dāng) G 被喚醒時(shí), 已關(guān)閉的chan也是會(huì)報(bào)錯(cuò)的
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
// 釋放 G
releaseSudog(mysg)
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
....
gp.param = unsafe.Pointer(sg) //注意這里的param不為空
.....
// 喚醒 G
goready(gp, skip+1)
}
接收消息
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 因?yàn)樯厦?block傳到true, 所以下面只考慮true的情況
// 當(dāng)初 為 nil 的chan中接收消息時(shí),永久阻塞
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 獲取同步鎖
lock(&c.lock)
// CASE 1: 向已經(jīng)close且為空的chan中獲取消息時(shí),返回空值。這里的false可以判斷是否close
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
// 釋放同步鎖
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// CASE 2: send 隊(duì)列不為空,什么時(shí)候send隊(duì)列不為空呢? 只有兩種可能 1. 無(wú)緩存chan 2. 緩存隊(duì)列已滿
if sg := c.sendq.dequeue(); sg != nil {
// CASE 2.1: 無(wú)緩存chan, 直接從send中獲取消息
// CASE 2.2: 緩存隊(duì)列已滿, 從隊(duì)列獲取頭元素,喚醒send 將其消息加入隊(duì)列尾部(由于是環(huán)線隊(duì)列,所以尾部和頭部是同一位置)移動(dòng)recvx
// 同樣在recv中有g(shù)oready(gp, skip+1)可以喚醒G
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// CASE 3: 緩存隊(duì)列不會(huì)空,直接從隊(duì)列獲取元素,移動(dòng)頭索引
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// CASE 4: 緩存隊(duì)列為空, 將當(dāng)前 G 加入接收隊(duì)列中, 休眠
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 將當(dāng)前 G 加入 接收隊(duì)列中
c.recvq.enqueue(mysg)
// 休眠
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// close喚醒時(shí)param為空,在send中喚醒時(shí)param不為空
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
關(guān)閉通道
func closechan(c *hchan) {
// 關(guān)閉為nil的chan報(bào)錯(cuò)
if c == nil {
panic(plainError("close of nil channel"))
}
// 獲取鎖
lock(&c.lock)
// 重復(fù)關(guān)閉,報(bào)錯(cuò)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist *g
// release all readers
// 遍歷接收隊(duì)列
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
//對(duì)比send(),這里的param是nil。在接收方法中根據(jù)param是否為空判斷是否為close
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
gp.schedlink.set(glist)
glist = gp
}
// release all writers (they will panic)
// 遍歷發(fā)送隊(duì)列
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
gp.schedlink.set(glist)
glist = gp
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3) // 這里會(huì)將接收隊(duì)列和發(fā)送隊(duì)列全部喚醒
}
}
通過(guò)上面的chansend、chanrecv、closechan發(fā)現(xiàn)里面都加了鎖,所以chan是線程安全的