下載Go源碼后,根目錄結(jié)構(gòu)如下:
VERSION-- 文件,當(dāng)前Go版本
api-- 目錄,包含所有API列表
doc-- 目錄,Go語(yǔ)言的各種文檔,官網(wǎng)上有的,這里基本會(huì)有
favicon.ico-- 文件,官網(wǎng)logo
include-- 目錄,Go 基本工具依賴的庫(kù)的頭文件
lib-- 目錄,文檔模板
misc-- 目錄,其他的一些工具,大部分是各種編輯器的Go語(yǔ)言支持,還有cgo的例子等
robots.txt-- 文件,搜索引擎 robots文件
src -- 目錄,Go語(yǔ)言源碼:基本工具(編譯器等)、標(biāo)準(zhǔn)庫(kù)
test-- 目錄,包含很多測(cè)試程序(并非_test.go方式的單元測(cè)試,而是包含main包的測(cè)試)包括一些fixbug測(cè)試;可以通過這個(gè)學(xué)到一些特性的使用
channel實(shí)現(xiàn)文件目錄:
/go/src/runtime/chan.go
channel 數(shù)據(jù)結(jié)構(gòu)
channel 是 goroutine 之間通信的一種方式,CSP 模型中 消息通道對(duì)應(yīng)的就是 channel;channel 結(jié)構(gòu)體定義如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
結(jié)構(gòu)體元素解析:
- qcount 緩沖通道中的元素個(gè)數(shù)
- dataqsiz 緩沖通道中的容量
- buf 有緩沖channel的緩沖區(qū),一個(gè)定長(zhǎng)環(huán)形數(shù)組
- elemsize 通道中存儲(chǔ)元素的長(zhǎng)度
- closed 關(guān)閉通道使用非0表示關(guān)閉
- elemtype 通道中存儲(chǔ)元素的類型
- sendx 當(dāng)前發(fā)送元素指向buf環(huán)形數(shù)組的下標(biāo)指針
- recvx 當(dāng)前接收元素指向buf環(huán)形數(shù)組的下標(biāo)指針
- recvq 因消費(fèi)者而阻塞的等待隊(duì)列
- sendq 因生產(chǎn)者而阻塞的等待隊(duì)列
- lock 鎖保護(hù) hchan 中的所有字段
核心的部分是存放 channel 數(shù)據(jù)的環(huán)形隊(duì)列,dataqsiz、qcount 分別指定了隊(duì)列的容量和當(dāng)前使用量;另一個(gè)重要部分就是recvq 和 sendq 兩個(gè)鏈表,recvq 是因讀這個(gè)通道而導(dǎo)致阻塞的 goroutine,sendq 是因?qū)戇@個(gè)通道而阻塞的 goroutine;如果一個(gè) goroutine 阻塞于 channel 了,那么它就被掛在 recvq 或 sendq 中;waitq是鏈表的定義,包含一個(gè)頭結(jié)點(diǎn)和一個(gè)尾結(jié)點(diǎn):
type waitq struct {
first *sudog
last *sudog
}
鏈表中每個(gè)元素都是sudog結(jié)構(gòu)體如下:
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
結(jié)構(gòu)中主要的就是 g、elem:
- g 代表著 G-M-P模型中的 G,sudog 是對(duì)g的封裝便于在 csp 模型中 g 可以同時(shí)阻塞在不同的 channel 上
- elem 用于存儲(chǔ) goroutine 的數(shù)據(jù);讀通道時(shí),數(shù)據(jù)會(huì)從 hchan 的隊(duì)列中拷貝到 sudog 的 elem 域;寫通道時(shí),數(shù)據(jù)則是由 sudog 的elem 域拷貝到 hchan 的隊(duì)列中
創(chuàng)建channel實(shí)現(xiàn)
創(chuàng)建方法:
ch := make(chan string,5)
實(shí)現(xiàn)函數(shù)如下:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 異常判斷 元素類型大小限制
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 異常判斷 對(duì)齊限制
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// maxAlloc 是 Arena 區(qū)域的最大值,緩沖元素的大小與hchan相加不能超過 緩沖槽大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
// 無緩沖channel
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// buf 是不分配空間 緩存地址就指向自己
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 分配一整塊內(nèi)存 存儲(chǔ)hchan和 buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// 是指針類型 分配hchan結(jié)構(gòu)體 buf單獨(dú)分配
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化元素類型的大小
c.elemsize = uint16(elem.size)
// 初始化元素的類型
c.elemtype = elem
// 初始化 channel 的容量
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
通道緩沖chanbuf實(shí)現(xiàn)
實(shí)現(xiàn)函數(shù)如下:
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
傳入 hchan 對(duì)象及元素在緩沖區(qū)環(huán)形數(shù)組中的下標(biāo)計(jì)算該下標(biāo)槽點(diǎn)內(nèi)存地址并返回
發(fā)送數(shù)據(jù)channelsend實(shí)現(xiàn)
實(shí)現(xiàn)函數(shù)如下:
// 傳入?yún)?shù) hchan ,發(fā)送數(shù)據(jù)地址,是否阻塞發(fā)送, select中的通道操作使用
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 判斷 channel 為空 向其中發(fā)送數(shù)據(jù)將會(huì)永久阻塞
if c == nil {
// 如果非阻塞返回 false
if !block {
return false
}
// 如果阻塞
// gopark 會(huì)使當(dāng)前 goroutine 掛起,通過 unlockf 喚醒;調(diào)用gopark時(shí)傳入的unlockf為nil,會(huì)被一直休眠
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
// 如果開啟競(jìng)爭(zhēng)檢測(cè)
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
// 寫入數(shù)據(jù)到 channel
// 1.非阻塞寫 2.沒有關(guān)閉channel 3.無緩沖channel并且消費(fèi)者環(huán)形隊(duì)列頭結(jié)點(diǎn)為空 或 有緩沖channel中存儲(chǔ)的元素?cái)?shù)量與容量相等 返回false
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
// 計(jì)時(shí)器
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 獲取同步鎖
lock(&c.lock)
// 判斷 channel 關(guān)閉,解鎖并 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 當(dāng)有 goroutine 在 recvq 隊(duì)列上等待時(shí),跳過緩存隊(duì)列,將消息直接發(fā)給 reciever goroutine;dequeue 從等待接受的 goroutine 隊(duì)列鏈表獲取一個(gè)sudog,goready 喚醒阻塞的 goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 緩存隊(duì)列未滿,將消息復(fù)制到緩存隊(duì)列上并移動(dòng) sendx 下標(biāo),hchan buf 數(shù)據(jù)量增加
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 數(shù)據(jù)拷貝到 buf 中
typedmemmove(c.elemtype, qp, ep)
// index 移動(dòng)
c.sendx++
// 環(huán)形隊(duì)列如果已經(jīng)加到最大就置 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖元素?cái)?shù)量加 1
c.qcount++
// 解鎖返回
unlock(&c.lock)
return true
}
// 阻塞 解鎖直接返回 false
if !block {
unlock(&c.lock)
return false
}
// chan隊(duì)列已滿,阻塞 將本協(xié)程放入等待協(xié)程中,同時(shí)休眠此協(xié)程
// Block on the channel. Some receiver will complete our operation for us.
// 創(chuàng)建 goroutine
gp := getg()
// 創(chuàng)建 sudog
mysg := acquireSudog()
// 初始化 釋放時(shí)間
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 初始化寫入的數(shù)據(jù)
mysg.elem = ep
mysg.waitlink = nil
// 初始化 goroutine
mysg.g = gp
mysg.isSelect = false
// 初始化 hchan
mysg.c = c
// goroutine 設(shè)置的休眠 sudog
gp.waiting = mysg
gp.param = nil
// 加入到寫阻塞的等待隊(duì)列
c.sendq.enqueue(mysg)
// 掛起休眠
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// 保證數(shù)據(jù)不被回收
KeepAlive(ep)
// 此時(shí)被喚醒 gp.waiting不是當(dāng)前的 mysg 直接 panic
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 等待的 sudog 置為 nil
gp.waiting = nil
// 喚醒時(shí)傳遞的參數(shù)為 nil 說明出問題了直接 panic
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 將 hchan 置為 nil
mysg.c = nil
// 釋放 sudog
releaseSudog(mysg)
return true
}
生產(chǎn)者數(shù)據(jù)發(fā)送send實(shí)現(xiàn)如下:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
// 寫入的數(shù)據(jù)不為空
if sg.elem != nil {
// 將數(shù)據(jù)拷貝到 hchan
sendDirect(c.elemtype, sg, ep)
// sudog 中數(shù)據(jù)置為 nil
sg.elem = nil
}
// 取數(shù) goroutine
gp := sg.g
unlockf()
// 傳入 sudug 使 param 不為空
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒 goroutine
goready(gp, skip+1)
}
接收數(shù)據(jù)chanrecv實(shí)現(xiàn)
實(shí)現(xiàn)函數(shù)如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// hchan 為 nil
if c == nil {
if !block {
return
}
// hchan 中接收消息永久阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
// 1.非阻塞讀 2.無緩沖channel并且消費(fèi)者環(huán)形隊(duì)列頭結(jié)點(diǎn)為空 或 有緩沖channel中存儲(chǔ)的元素?cái)?shù)量為0 3.沒有關(guān)閉channel 直接返回
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
// 計(jì)時(shí)器
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 獲取同步鎖
lock(&c.lock)
// channel 關(guān)閉 且 緩沖元素為0 返回空值
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// typedmemclr 使返回值 ep 變成了零值
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果有 send 生產(chǎn)者阻塞在隊(duì)列中,直接從 send 生產(chǎn)者取數(shù)據(jù)
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 緩存隊(duì)列不為空,從隊(duì)列頭取出元素
if c.qcount > 0 {
// Receive directly from queue
// 根據(jù)hchan 索引獲取數(shù)據(jù)地址
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 數(shù)據(jù)拷貝到 ep 中
typedmemmove(c.elemtype, ep, qp)
}
// 清空環(huán)形數(shù)組己經(jīng)讀取的 gp
typedmemclr(c.elemtype, qp)
// 移動(dòng)索引
c.recvx++
// 環(huán)形隊(duì)列如果已經(jīng)加到最大就置 0
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 緩存隊(duì)列元素?cái)?shù)量減 1
c.qcount--
unlock(&c.lock)
return true, true
}
// 沒有數(shù)據(jù) 讀非阻塞 直接解鎖返回
if !block {
unlock(&c.lock)
return false, false
}
// chan隊(duì)列為空,阻塞 將本協(xié)程放入等待協(xié)程中,同時(shí)休眠此協(xié)程
// no sender available: block on this channel.
// 獲取 goroutine
gp := getg()
// 獲取 SudoG
mysg := acquireSudog()
// 初始化釋放時(shí)間
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// goroutine 加入到讀阻塞等待隊(duì)列
c.recvq.enqueue(mysg)
// 休眠
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 此時(shí)被喚醒 gp.waiting不是當(dāng)前的 mysg 直接 panic
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
// 將 goroutine 中 param 參數(shù)置為 nil
gp.param = nil
// SudoG 中的 hchan 置為 nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
消費(fèi)者數(shù)據(jù)接收recv實(shí)現(xiàn)如下:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 緩存隊(duì)列不為空,直接從生產(chǎn)者獲取數(shù)據(jù)
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 有 send 阻塞在這里,從 buf 中獲取數(shù)據(jù)
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
// 將 buf 中未讀的當(dāng)前位置數(shù)據(jù)拷貝給消費(fèi)者
typedmemmove(c.elemtype, ep, qp)
}
// 將阻塞的生產(chǎn)者數(shù)據(jù)拷貝此位置
typedmemmove(c.elemtype, qp, sg.elem)
// 接收元素索引向后移動(dòng)
c.recvx++
// 環(huán)形隊(duì)列如果已經(jīng)加到最大就置 0
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 環(huán)形隊(duì)列讀取的索引位置就是寫入數(shù)據(jù)環(huán)形的末端
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 數(shù)據(jù)置為 nil
sg.elem = nil
// 獲取 SudoG 中的 goroutine 傳遞給 param 參數(shù)
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒 sendq 里面 SudoG 對(duì)應(yīng)的 g
goready(gp, skip+1)
}
closechan 實(shí)現(xiàn)
關(guān)閉通道設(shè)置chan關(guān)閉標(biāo)志位,closed=1;函數(shù)如下:
func closechan(c *hchan) {
// 關(guān)閉為 nil 的 hchan 直接 panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 獲取同步鎖
lock(&c.lock)
// 已關(guān)閉 hchan 釋放鎖 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 將 closed 置為 1
c.closed = 1
var glist gList
// 遍歷接收隊(duì)列
// release all readers
for {
// 取出讀阻塞隊(duì)列中的 SudoG
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
// typedmemclr 使返回值 ep 變成了零值
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 獲取 goroutine 將參數(shù) param 值為空,在接收方法中根據(jù) param 是否為空判斷是否為close
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 將 goroutine 加入到 glist
glist.push(gp)
}
// 遍歷發(fā)送隊(duì)列
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 將接收隊(duì)列和發(fā)送隊(duì)列全部喚醒
goready(gp, 3)
}
}