golang筆記——channel底層原理

一、什么是CSP

Do not communicate by sharing memory; instead, share memory by communicating.

不要通過共享內(nèi)存來通信,而要通過通信來實現(xiàn)內(nèi)存共享。
這就是 Go 的并發(fā)哲學(xué),它依賴 CSP 模型,基于 channel 實現(xiàn)。

CSP 經(jīng)常被認(rèn)為是 Go 在并發(fā)編程上成功的關(guān)鍵因素。CSP 全稱是 “Communicating Sequential Processes”,這也是 Tony Hoare 在 1978 年發(fā)表在 ACM 的一篇論文。論文里指出一門編程語言應(yīng)該重視 input 和 output 的原語,尤其是并發(fā)編程的代碼。

Go 一開始就把 CSP 的思想融入到語言的核心里,所以并發(fā)編程成為 Go 的一個獨特的優(yōu)勢,而且很容易理解。

Go 的并發(fā)原則非常優(yōu)秀,目標(biāo)就是簡單:盡量使用 channel;把 goroutine 當(dāng)作免費的資源,隨便用。

二、channel底層數(shù)據(jù)結(jié)構(gòu)

底層數(shù)據(jù)結(jié)構(gòu)源碼:

type hchan struct {
    // chan 里元素數(shù)量
    qcount   uint
    // chan 底層循環(huán)數(shù)組的長度
    dataqsiz uint
    // 指向底層循環(huán)數(shù)組的指針
    // 只針對有緩沖的 channel
    buf      unsafe.Pointer
    // chan 中元素大小
    elemsize uint16
    // chan 是否被關(guān)閉的標(biāo)志
    closed   uint32
    // chan 中元素類型
    elemtype *_type // element type
    //有緩沖channel內(nèi)的緩沖數(shù)組會被作為一個“環(huán)型”來使用。
    //當(dāng)下標(biāo)超過數(shù)組容量后會回到第一個位置,所以需要有兩個字段記錄當(dāng)前讀和寫的下標(biāo)位置
    sendx    uint   // 下一次發(fā)送數(shù)據(jù)的下標(biāo)位置
    recvx    uint   // 下一次讀取數(shù)據(jù)的下標(biāo)位置
    //當(dāng)循環(huán)數(shù)組中沒有數(shù)據(jù)時,收到了接收請求,那么接收數(shù)據(jù)的變量地址將會寫入讀等待隊列
    //當(dāng)循環(huán)數(shù)組中數(shù)據(jù)已滿時,收到了發(fā)送請求,那么發(fā)送數(shù)據(jù)的變量地址將寫入寫等待隊列
    recvq    waitq  // 讀等待隊列
    sendq    waitq  // 寫等待隊列

    // 保護(hù) hchan 中所有字段
    lock mutex
}

waitqsudog 的一個雙向鏈表,而 sudog 實際上是對 goroutine 的一個封裝:

type waitq struct {
    first *sudog
    last  *sudog
}

例如,創(chuàng)建一個容量為 6 的,元素為 int 型的 channel 數(shù)據(jù)結(jié)構(gòu)如下 :


總結(jié)hchan結(jié)構(gòu)體的主要組成部分有四個:

  • 用來保存goroutine之間傳遞數(shù)據(jù)的循環(huán)鏈表。=====> buf。
  • 用來記錄此循環(huán)鏈表當(dāng)前發(fā)送或接收數(shù)據(jù)的下標(biāo)值。=====> sendx和recvx。
  • 用于保存向該chan發(fā)送和從改chan接收數(shù)據(jù)的goroutine的隊列。=====> sendq 和 recvq
  • 保證channel寫入和讀取數(shù)據(jù)時線程安全的鎖。 =====> lock

創(chuàng)建

我們知道,通道有兩個方向,發(fā)送和接收。理論上來說,我們可以創(chuàng)建一個只發(fā)送或只接收的通道,但是這種通道創(chuàng)建出來后,怎么使用呢?一個只能發(fā)的通道,怎么接收呢?同樣,一個只能收的通道,如何向其發(fā)送數(shù)據(jù)呢?

創(chuàng)建 chan 的函數(shù)是 makechan:

const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // 省略了檢查 channel size,align 的代碼
    // ……

    var c *hchan
    // 如果元素類型不含指針 或者 size 大小為 0(無緩沖類型)
    // 只進(jìn)行一次內(nèi)存分配
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // 如果 hchan 結(jié)構(gòu)體中不含指針,GC 就不會掃描 chan 中的元素
        // 只分配 "hchan 結(jié)構(gòu)體大小 + 元素大小*個數(shù)" 的內(nèi)存
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // 如果是緩沖型 channel 且元素大小不等于 0(大小等于 0的元素類型:struct{})
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            // 1. 非緩沖型的,buf 沒用,直接指向 chan 起始地址處
            // 2. 緩沖型的,能進(jìn)入到這里,說明元素?zé)o指針且元素類型為 struct{},也無影響
            // 因為只會用到接收和發(fā)送游標(biāo),不會真正拷貝東西到 c.buf 處(這會覆蓋 chan的內(nèi)容)
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // 進(jìn)行兩次內(nèi)存分配操作
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    // 循環(huán)數(shù)組長度
    c.dataqsiz = uint(size)

    // 返回 hchan 指針
    return c
}

從函數(shù)原型來看,創(chuàng)建的 chan 是一個指針。所以我們能在函數(shù)間直接傳遞 channel,而不用傳遞 channel 的指針。

新建一個 chan 后,內(nèi)存在堆上分配,大概長這樣:


三、向channel發(fā)送數(shù)據(jù)

源碼分析

發(fā)送操作最終轉(zhuǎn)化為 chansend 函數(shù),直接上源碼,同樣大部分都注釋了,可以看懂主流程:

// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果 channel 是 nil
    if c == nil {
        // 不能阻塞,直接返回 false,表示未發(fā)送成功
        if !block {
            return false
        }
        // 當(dāng)前 goroutine 被掛起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相關(guān)……

    // 對于不阻塞的 send,快速檢測失敗場景
    //
    // 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間。這可能是:
    // 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
    // 2. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 鎖住 channel,并發(fā)安全
    lock(&c.lock)

    // 如果 channel 關(guān)閉了
    if c.closed != 0 {
        // 解鎖
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 如果接收隊列里有 goroutine,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 對于緩沖型的 channel,如果還有緩沖空間
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 將數(shù)據(jù)從 ep 處拷貝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 發(fā)送游標(biāo)值加 1
        c.sendx++
        // 如果發(fā)送游標(biāo)值等于容量值,游標(biāo)值歸 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 緩沖區(qū)的元素數(shù)量加一
        c.qcount++

        // 解鎖
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞,則直接返回錯誤
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 滿了,發(fā)送方會被阻塞。接下來會構(gòu)造一個 sudog

    // 獲取當(dāng)前 goroutine 的指針
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 當(dāng)前 goroutine 進(jìn)入發(fā)送等待隊列
    c.sendq.enqueue(mysg)

    // 當(dāng)前 goroutine 被掛起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 從這里開始被喚醒了(channel 有機會可以發(fā)送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被喚醒后,channel 關(guān)閉了。坑爹啊,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上綁定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

上面的代碼注釋地比較詳細(xì)了,我們來詳細(xì)看看。

  • 如果檢測到 channel 是空的,當(dāng)前 goroutine 會被掛起。
  • 對于不阻塞的發(fā)送操作,如果 channel 未關(guān)閉并且沒有多余的緩沖空間(說明:a. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine;b. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素)

對于這一點,runtime 源碼里注釋了很多。這一條判斷語句是為了在不阻塞發(fā)送的場景下快速檢測到發(fā)送失敗,好快速返回。

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
}
  1. 如果檢測到 channel 已經(jīng)關(guān)閉,直接 panic。
  2. 如果能從等待接收隊列 recvq 里出隊一個 sudog(代表一個 goroutine),說明此時 channel 是空的,沒有元素,所以才會有等待接收者。這時會調(diào)用 send 函數(shù)將元素直接從發(fā)送者的棧拷貝到接收者的棧,關(guān)鍵操作由 sendDirect 函數(shù)完成。
// send 函數(shù)處理向一個空的 channel 發(fā)送操作

// ep 指向被發(fā)送的元素,會被直接拷貝到接收的 goroutine
// 之后,接收的 goroutine 會被喚醒
// c 必須是空的(因為等待隊列里有 goroutine,肯定是空的)
// c 必須被上鎖,發(fā)送操作執(zhí)行完后,會使用 unlockf 函數(shù)解鎖
// sg 必須已經(jīng)從等待隊列里取出來了
// ep 必須是非空,并且它指向堆或調(diào)用者的棧
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 省略一些用不到的
    // ……

    // sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
    if sg.elem != nil {
        // 直接拷貝內(nèi)存(從發(fā)送者到接收者)
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // sudog 上綁定的 goroutine
    gp := sg.g
    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 喚醒接收的 goroutine. skip 和打印棧相關(guān),暫時不理會
    goready(gp, skip+1)
}

繼續(xù)看 sendDirect 函數(shù):

// 向一個非緩沖型的 channel 發(fā)送數(shù)據(jù)、從一個無元素的(非緩沖型或緩沖型但空)的 channel
// 接收數(shù)據(jù),都會導(dǎo)致一個 goroutine 直接操作另一個 goroutine 的棧
// 由于 GC 假設(shè)對棧的寫操作只能發(fā)生在 goroutine 正在運行中并且由當(dāng)前 goroutine 來寫
// 所以這里實際上違反了這個假設(shè)。可能會造成一些問題,所以需要用到寫屏障來規(guī)避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src 在當(dāng)前 goroutine 的棧上,dst 是另一個 goroutine 的棧

    // 直接進(jìn)行內(nèi)存"搬遷"
    // 如果目標(biāo)地址的棧發(fā)生了棧收縮,當(dāng)我們讀出了 sg.elem 后
    // 就不能修改真正的 dst 位置的值了
    // 因此需要在讀和寫之前加上一個屏障
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

這里涉及到一個 goroutine 直接寫另一個 goroutine 棧的操作,一般而言,不同 goroutine 的棧是各自獨有的。而這也違反了 GC 的一些假設(shè)。為了不出問題,寫的過程中增加了寫屏障,保證正確地完成寫操作。這樣做的好處是減少了一次內(nèi)存 copy:不用先拷貝到 channel 的 buf,直接由發(fā)送者到接收者,沒有中間商賺差價,效率得以提高,完美。

然后,解鎖、喚醒接收者,等待調(diào)度器的光臨,接收者也得以重見天日,可以繼續(xù)執(zhí)行接收操作之后的代碼了。

  • 如果 c.qcount < c.dataqsiz,說明緩沖區(qū)可用(肯定是緩沖型的 channel)。先通過函數(shù)取出待發(fā)送元素應(yīng)該去到的位置:
qp := chanbuf(c, c.sendx)

// 返回循環(huán)隊列里第 i 個元素的地址處
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

c.sendx 指向下一個待發(fā)送元素在循環(huán)數(shù)組中的位置,然后調(diào)用 typedmemmove 函數(shù)將其拷貝到循環(huán)數(shù)組中。之后 c.sendx 加 1,元素總量加 1 :c.qcount++,最后,解鎖并返回。

  • 如果沒有命中以上條件的,說明 channel 已經(jīng)滿了。不管這個 channel 是緩沖型的還是非緩沖型的,都要將這個 sender “關(guān)起來”(goroutine 被阻塞)。如果 block 為 false,直接解鎖,返回 false。
  • 最后就是真的需要被阻塞的情況。先構(gòu)造一個 sudog,將其入隊(channel 的 sendq 字段)。然后調(diào)用 goparkunlock 將當(dāng)前 goroutine 掛起,并解鎖,等待合適的時機再喚醒。

喚醒之后,從 goparkunlock 下一行代碼開始繼續(xù)往下執(zhí)行。

這里有一些綁定操作,sudog 通過 g 字段綁定 goroutine,而 goroutine 通過 waiting 綁定 sudog,sudog 還通過 elem 字段綁定待發(fā)送元素的地址,以及 c 字段綁定被“坑”在此處的 channel。

所以,待發(fā)送的元素地址其實是存儲在 sudog 結(jié)構(gòu)體里,也就是當(dāng)前 goroutine 里。

四、從channel接收數(shù)據(jù)

接收操作有兩種寫法,一種帶 “ok”,反應(yīng) channel 是否關(guān)閉;一種不帶 “ok”,這種寫法,當(dāng)接收到相應(yīng)類型的零值時無法知道是真實的發(fā)送者發(fā)送過來的值,還是 channel 被關(guān)閉后,返回給接收者的默認(rèn)類型的零值。兩種寫法,都有各自的應(yīng)用場景。

經(jīng)過編譯器的處理后,這兩種寫法最后對應(yīng)源碼里的這兩個函數(shù):

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

chanrecv1 函數(shù)處理不帶 “ok” 的情形,chanrecv2 則通過返回 “received” 這個字段來反應(yīng) channel 是否被關(guān)閉。接收值則比較特殊,會“放到”參數(shù) elem 所指向的地址了,這很像 C/C++ 里的寫法。如果代碼里忽略了接收值,這里的 elem 為 nil。

無論如何,最終轉(zhuǎn)向了 chanrecv 函數(shù):

// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有數(shù)據(jù)可接收的情況下,返回 (false, false)
// 否則,如果 c 處于關(guān)閉狀態(tài),將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的內(nèi)存地址。返回 (true, true)
// 如果 ep 非空,則應(yīng)該指向堆或者函數(shù)調(diào)用者的棧

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 內(nèi)容 …………

    // 如果是一個 nil 的 channel
    if c == nil {
        // 如果不阻塞,直接返回 (false, false)
        if !block {
            return
        }
        // 否則,接收一個 nil 的 channel,goroutine 掛起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不會執(zhí)行到這里
        throw("unreachable")
    }

    // 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回
    // 當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
    // 1. 非緩沖型,等待發(fā)送列隊 sendq 里沒有 goroutine 在等待
    // 2. 緩沖型,但 buf 里沒有元素
    // 之后,又觀察到 closed == 0,即 channel 未關(guān)閉。
    // 因為 channel 不可能被重復(fù)打開,所以前一個觀測的時候 channel 也是未關(guān)閉的,
    // 因此在這種情況下可以直接宣布接收失敗,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
    // 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
    // 也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel,
    // buf 里有元素的情況下還能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解鎖
        unlock(&c.lock)
        if ep != nil {
            // 從一個已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
            // 那么接收的值將是一個該類型的零值
            // typedmemclr 根據(jù)類型清理相應(yīng)地址的內(nèi)存
            typedmemclr(c.elemtype, ep)
        }
        // 從一個已關(guān)閉的 channel 接收,selected 會返回true
        return true, false
    }

    // 等待發(fā)送隊列里有 goroutine 存在,說明 buf 是滿的
    // 這有可能是:
    // 1. 非緩沖型的 channel
    // 2. 緩沖型的 channel,但 buf 滿了
    // 針對 1,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
    // 針對 2,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 緩沖型,buf 里有元素,可以正常接收
    if c.qcount > 0 {
        // 直接從循環(huán)數(shù)組里找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 代碼里,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉循環(huán)數(shù)組里相應(yīng)位置的值
        typedmemclr(c.elemtype, qp)
        // 接收游標(biāo)向前移動
        c.recvx++
        // 接收游標(biāo)歸零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 數(shù)組里的元素個數(shù)減 1
        c.qcount--
        // 解鎖
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下來就是要被阻塞的情況了
    // 構(gòu)造一個 sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收數(shù)據(jù)的地址保存下來
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 進(jìn)入channel 的等待接收隊列
    c.recvq.enqueue(mysg)
    // 將當(dāng)前 goroutine 掛起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被喚醒了,接著從這里繼續(xù)執(zhí)行一些掃尾工作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
  1. 如果 channel 是一個空值(nil),在非阻塞模式下,會直接返回。在阻塞模式下,會調(diào)用 gopark 函數(shù)掛起 goroutine,這個會一直阻塞下去。因為在 channel 是 nil 的情況下,要想不阻塞,只有關(guān)閉它,但關(guān)閉一個 nil 的 channel 又會發(fā)生 panic,所以沒有機會被喚醒了。更詳細(xì)地可以在 closechan 函數(shù)的時候再看。
  2. 和發(fā)送函數(shù)一樣,接下來搞了一個在非阻塞模式下,不用獲取鎖,快速檢測到失敗并且返回的操作。順帶插一句,我們平時在寫代碼的時候,找到一些邊界條件,快速返回,能讓代碼邏輯更清晰,因為接下來的正常情況就比較少,更聚焦了,看代碼的人也更能專注地看核心代碼邏輯了。
// 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:

  • 非緩沖型,等待發(fā)送列隊里沒有 goroutine 在等待
  • 緩沖型,但 buf 里沒有元素

之后,又觀察到 closed == 0,即 channel 未關(guān)閉。
因為 channel 不可能被重復(fù)打開,所以前一個觀測的時候, channel 也是未關(guān)閉的,因此在這種情況下可以直接宣布接收失敗,快速返回。因為沒被選中,也沒接收到數(shù)據(jù),所以返回值為 (false, false)。

接下來的操作,首先會上一把鎖,粒度比較大。如果 channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素。對應(yīng)非緩沖型關(guān)閉和緩沖型關(guān)閉但 buf 無元素的情況,返回對應(yīng)類型的零值,但 received 標(biāo)識是 false,告訴調(diào)用者此 channel 已關(guān)閉,你取出來的值并不是正常由發(fā)送者發(fā)送過來的數(shù)據(jù)。但是如果處于 select 語境下,這種情況是被選中了的。很多將 channel 用作通知信號的場景就是命中了這里。

接下來,如果有等待發(fā)送的隊列,說明 channel 已經(jīng)滿了,要么是非緩沖型的 channel,要么是緩沖型的 channel,但 buf 滿了。這兩種情況下都可以正常接收數(shù)據(jù)。

于是,調(diào)用 recv 函數(shù):

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果是非緩沖型的 channel
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        // 未忽略接收的數(shù)據(jù)
        if ep != nil {
            // 直接拷貝數(shù)據(jù),從 sender goroutine -> receiver goroutine
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 緩沖型的 channel,但 buf 已滿。
        // 將循環(huán)數(shù)組 buf 隊首的元素拷貝到接收數(shù)據(jù)的地址
        // 將發(fā)送者的數(shù)據(jù)入隊。實際上這時 revx 和 sendx 值相等
        // 找到接收游標(biāo)
        qp := chanbuf(c, c.recvx)
        // …………
        // 將接收游標(biāo)處的數(shù)據(jù)拷貝給接收者
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }

        // 將發(fā)送者數(shù)據(jù)拷貝到 buf
        typedmemmove(c.elemtype, qp, sg.elem)
        // 更新游標(biāo)值
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx
    }
    sg.elem = nil
    gp := sg.g

    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }

    // 喚醒發(fā)送的 goroutine。需要等到調(diào)度器的光臨
    goready(gp, skip+1)
}

如果是非緩沖型的,就直接從發(fā)送者的??截惖浇邮照叩臈!?/p>

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

否則,就是緩沖型 channel,而 buf 又滿了的情形。說明發(fā)送游標(biāo)和接收游標(biāo)重合了,因此需要先找到接收游標(biāo):

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

將該處的元素拷貝到接收地址。然后將發(fā)送者待發(fā)送的數(shù)據(jù)拷貝到接收游標(biāo)處。這樣就完成了接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作。接著,分別將發(fā)送游標(biāo)和接收游標(biāo)向前進(jìn)一,如果發(fā)生“環(huán)繞”,再從 0 開始。

最后,取出 sudog 里的 goroutine,調(diào)用 goready 將其狀態(tài)改成 “runnable”,待發(fā)送者被喚醒,等待調(diào)度器的調(diào)度。

然后,如果 channel 的 buf 里還有數(shù)據(jù),說明可以比較正常地接收。注意,這里,即使是在 channel 已經(jīng)關(guān)閉的情況下,也是可以走到這里的。這一步比較簡單,正常地將 buf 里接收游標(biāo)處的數(shù)據(jù)拷貝到接收數(shù)據(jù)的地址。

到了最后一步,走到這里來的情形是要阻塞的。當(dāng)然,如果 block 傳進(jìn)來的值是 false,那就不阻塞,直接返回就好了。

先構(gòu)造一個 sudog,接著就是保存各種值了。注意,這里會將接收數(shù)據(jù)的地址存儲到了 elem 字段,當(dāng)被喚醒時,接收到的數(shù)據(jù)就會保存到這個字段指向的地址。然后將 sudog 添加到 channel 的 recvq 隊列里。調(diào)用 goparkunlock 函數(shù)將 goroutine 掛起。

接下來的代碼就是 goroutine 被喚醒后的各種收尾工作了。

channel操作總結(jié):


注意:關(guān)閉已經(jīng)關(guān)閉的channel也會引發(fā)panic。


References:
https://cloud.tencent.com/developer/article/1750350
https://golang.design/go-questions/channel
https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/channel.html
https://juejin.cn/post/7037656471210819614

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容