golang channel 源碼剖析

channel 在 golang 中是一個(gè)非常重要的特性,它為我們提供了一個(gè)并發(fā)模型。對(duì)比鎖,通過(guò) chan 在多個(gè) goroutine 之間完成數(shù)據(jù)交互,可以讓代碼更簡(jiǎn)潔、更容易實(shí)現(xiàn)、更不容易出錯(cuò)。golang 的 channel 設(shè)計(jì)模型遵循 CSP(Communicating Sequential Processes,序列通信處理) 的設(shè)計(jì)理念。

本文將從源碼角度來(lái)分析 golang 的 channel 是怎樣實(shí)現(xiàn)的。先看一下 *channel8 給我們提供的一些特性。

1. channel 的使用

關(guān)于這一小節(jié),熟悉 channel 使用的讀者可以快速瀏覽一下這一部分,這里沒(méi)有什么特別的東西。

1.1 使用通道傳輸數(shù)據(jù)

func main() {
    c := make(chan int, 8)
    go func() {
        c <- 1
    }()
    fmt.Println(<-c)
}

上面的代碼中,make(chan int, 8) 創(chuàng)建并返回一個(gè)緩沖區(qū)大小為 8 的通道,通道的元素類(lèi)型為 int。如果把這個(gè) 8 去掉,像這樣:c := make(chan int),那么創(chuàng)建的通道就是沒(méi)有緩沖區(qū)的通道。如果你熟悉 go,你一定知道我們可以一直向緩沖區(qū)發(fā)送數(shù)據(jù),直到緩沖區(qū)變滿為止才會(huì)阻塞。而如果我們向無(wú)緩沖區(qū)的通道發(fā)送數(shù)據(jù),就有存在其它的接收者正在等待,發(fā)送才不會(huì)不阻塞。

在創(chuàng)建通道之后,接下來(lái)使用 go 語(yǔ)句啟動(dòng)一個(gè) goroutine,這個(gè) goroutine 中,將 1 寫(xiě)入通道 c。最后使用 <-c 讀取通道數(shù)據(jù)并且打印。

這很簡(jiǎn)單,但是我們需要思考幾個(gè)問(wèn)題:

  • 創(chuàng)建通道的時(shí)候發(fā)生了什么事情?我們創(chuàng)建了一個(gè)什么樣的數(shù)據(jù)結(jié)構(gòu)?
  • 向通道發(fā)送數(shù)據(jù)的時(shí)候發(fā)生了什么事情?緩沖區(qū)滿了就會(huì)阻塞是怎么實(shí)現(xiàn)的?
  • 從通道中接收數(shù)據(jù)時(shí)發(fā)生了什么事情?
  • 帶緩沖區(qū)的通道和不帶緩沖區(qū)的通道有什么不同嗎?

1.2 select

然后讓我們看一個(gè)稍微復(fù)雜一點(diǎn)的: select。select 會(huì)從所有的 case 中挑選出一個(gè)不會(huì)阻塞的通道讀操作、寫(xiě)操作或者是 default 操作執(zhí)行。如果都會(huì)阻塞,那么 select 就會(huì)等待,對(duì)應(yīng)的 goroutine 也會(huì)被掛起。

如下面的代碼, c1c2 是兩個(gè)通道, go 啟動(dòng)一個(gè) goroutine,如果 c1 可讀且 c2 不可寫(xiě),那么就會(huì)執(zhí)行第一個(gè) case, 如果 c1 不可讀但 c2 可寫(xiě),那么就會(huì)執(zhí)行第二個(gè) case。如果 c1 可讀而且 c2 可寫(xiě),那么就會(huì)隨機(jī)執(zhí)行第一個(gè) case 或者第二個(gè) case。如果 c1 不可讀而且 c2 不可寫(xiě),那么就會(huì)執(zhí)行 default。這里,如果我們沒(méi)有實(shí)現(xiàn) default 分支,那么 select 就會(huì)阻塞。

package main

import (
    "fmt"
    "math/rand"
)

func main() {

    c1 := make(chan int)
    c2 := make(chan int)

    go func() {
        for {
            select {
            case x := <-c1:
                fmt.Println("從 c1 接受數(shù)據(jù);", x)
            case c2 <- 100:
                fmt.Println("向 c2 發(fā)送數(shù)據(jù)")
            default:
                fmt.Println("c1 和 c2 都沒(méi)什么可操作的")
            }
        }
    }()

    for i := 0; i < 500; i++ {
        rd := rand.Intn(2)
        switch rd {
        case 0:
            c1 <- 200
        case 1:
            <-c2
        }
    }
}

只是稍微復(fù)雜了一點(diǎn)點(diǎn),但是還是有很多東西我們需要去探索:

  • select 的工作原理是什么?它是怎么選出一個(gè)可執(zhí)行的語(yǔ)句的?
  • select 為什么可以在多個(gè)通道上阻塞?
  • 為什么沒(méi)有 default 分支時(shí)會(huì)阻塞,有 default 時(shí)會(huì)執(zhí)行 default 的內(nèi)容?
  • 有多個(gè)可執(zhí)行的語(yǔ)句時(shí),為什么會(huì)是隨機(jī)選的,而不是按照我們代碼的順序?

帶著上面的所有問(wèn)題,我們來(lái)看一看 channel 的源碼。

2. 預(yù)備知識(shí)

在深入 channel 源碼之前,先了解一下需要有哪些預(yù)備知識(shí)

2.1 goroutine 的表示

runtime 庫(kù)中,goroutine 用一個(gè)叫做 g 的結(jié)構(gòu)表示,每個(gè) g 對(duì)象表示一個(gè) goroutine

type g struct {
  // ...
  atomicstatus   uint32  // 表示 goroutine 的狀態(tài)
  param          unsafe.Pointer // 喚醒時(shí)參數(shù)
  waiting        *sudog // 等待隊(duì)列,后文會(huì)說(shuō)到
  // ...
}

通過(guò) getg() 函數(shù)可以拿到當(dāng)前 goroutineg 對(duì)象:

func getg() *g

2.2 sudog

g 對(duì)象中,有一個(gè)名字為 waiting 的 *sudog 指針,它表示這個(gè) goroutine 正在等待什么東西或者正在等待哪些東西。

sudog 是一個(gè)鏈表形式的類(lèi)型,waitlink 表示它的下一個(gè)節(jié)點(diǎn)。對(duì)于 cisSelect、 elem 字段,我們后文會(huì)說(shuō)到。

type sudog struct {
        // ....
        isSelect bool
        elem     unsafe.Pointer // data element (may point to stack)      
        waitlink    *sudog // g.waiting list or semaRoot
        c           *hchan // channel
}

acquireSudog 申請(qǐng)一個(gè) sudog 對(duì)象。 releaseSudog 釋放 sudog 對(duì)象

func acquireSudog() *sudog {}
func releaseSudog(s *sudog) {}

2.3 gopark 和 goready

gopark 將當(dāng)前的 goroutine 修改成等待狀態(tài),然后等待被喚醒。

func gopark(unlockf func(*g, unsafe.Pointer) bool, 
  lock unsafe.Pointer, 
  reason waitReason, 
  traceEv byte, 
  traceskip int)

goready 函數(shù)用來(lái)喚醒一個(gè) goroutine,它將 goroutine 的狀態(tài)修改為可運(yùn)行狀態(tài),隨后會(huì)被調(diào)度器運(yùn)行。當(dāng)被調(diào)度時(shí),對(duì)應(yīng)的 gopark 函數(shù)返回。

2.4 race***

在編譯時(shí),使用 -race 參數(shù),可以執(zhí)行競(jìng)態(tài)檢查,在我們即將要分析的源碼中,有相當(dāng)部分代碼為 race 提供了支持。分析時(shí)會(huì)跳過(guò)這一部分,有興趣的讀者可以參考: https://blog.golang.org/race-detector

3. 基本數(shù)據(jù)結(jié)構(gòu)

chan 使用 hchan 表示,它的傳參與賦值始終都是指針形式,每個(gè) hchan 對(duì)象代表著一個(gè) chan。

  • hchan 中包含一個(gè)緩沖區(qū) buf,它表示已經(jīng)發(fā)送但是還未被接收的數(shù)據(jù)緩存。buf 的大小由創(chuàng)建 chan 時(shí)的參數(shù)來(lái)決定。qcount 表示當(dāng)前緩沖區(qū)中有效數(shù)據(jù)的總量,dataqsiz 表示緩沖區(qū)的大小,對(duì)于無(wú)緩沖區(qū)通道而言 dataqsiz 的值為 0。如果 qcount 和 dataqsiz 的值相同,則表示緩沖區(qū)用完了。
  • 緩沖區(qū)表示的是一個(gè)環(huán)形隊(duì)列 (如果你不熟悉環(huán)形隊(duì)列,可以看一下 https://www.geeksforgeeks.org/circular-queue-set-1-introduction-array-implementation/)。其中 sendx 表示下一個(gè)發(fā)送的地址,recvx 表示下一個(gè)接收的地址。
  • recvq 表示等待接收的 sudog 列表,一個(gè)接收語(yǔ)句執(zhí)行時(shí),如果緩沖區(qū)沒(méi)有數(shù)據(jù)而且當(dāng)前沒(méi)有別的發(fā)送者在等待,那么執(zhí)行者 goroutine 會(huì)被掛起,并且將對(duì)應(yīng)的 sudog 對(duì)象放到 recvq 中。
  • sendq 類(lèi)似于 recvq,一個(gè)發(fā)送語(yǔ)句執(zhí)行時(shí),如果緩沖區(qū)已經(jīng)滿了,而且沒(méi)有接收者在等待,那么執(zhí)行者 goroutine 會(huì)被掛起,并且將對(duì)應(yīng)的 sudog 放到 sendq 中。
  • closed 表示通道是否已經(jīng)被關(guān)閉,0 代表沒(méi)有被關(guān)閉,非 0 值代表已經(jīng)被關(guān)閉。
  • lock 用于對(duì) hchan 加鎖
type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}

4. 創(chuàng)建通道

當(dāng)你在代碼里面寫(xiě)了一句 c := make(chan int, 8) 時(shí),編譯器就會(huì)把它翻譯成

t := typeof(chan int) // 編譯器給你生成了 chan int 的類(lèi)型描述信息,然后 t 指向這個(gè)類(lèi)型描述信息
c := makechan(t, 8)

沒(méi)錯(cuò),makechan 就是創(chuàng)建通道的入口。它的目的就是構(gòu)建 hchan 對(duì)象并返回。由于 hchan 在程序中始終以引用的形式存在,通過(guò)賦值或者是傳參,它指向的都是同一個(gè)對(duì)象,所以 hchan 在標(biāo)準(zhǔn)庫(kù)中都是以指針形式呈現(xiàn)給外部的。對(duì)于 makechan 的邏輯,這里分 3 種情況:

  1. 緩沖區(qū)所需大小為 0。對(duì)于這種情況,在為 hchan 分配內(nèi)存時(shí),只需要分配 sizeof(hchan) 大小的內(nèi)存。這很好理解。
  2. 緩沖區(qū)所需大小不為 0,而且數(shù)據(jù)類(lèi)型不包含指針。
    我們先來(lái)理解下 不包含指針 這個(gè)東西,對(duì)于指針類(lèi)型或者成員中有指針的類(lèi)型,那就是包含指針的,否則就是不包含指針的。如下代碼,A{}是不包含指針的,&A{}、B{}、&B{} 是包含指針的。
type A struct {
    a int
    b int
}

type B struct {
    a *int
    b *int
}

對(duì)于不包含指針的這種情況,分配一塊連續(xù)內(nèi)存容納 hchan 和緩沖區(qū)對(duì)象。

  1. 緩沖區(qū)所需大小不為 0,而且數(shù)據(jù)類(lèi)型包含指針。對(duì)于這種情況,分配兩塊內(nèi)存,其中一塊表示 hchan 對(duì)象,另外一塊用來(lái)表示 buf。

下面是 makechan 的核心代碼:


func makechan(t *chantype, size int) *hchan {
    // ...

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))

    var c *hchan
    switch {
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
    case elem.kind&kindNoPointers != 0:
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    // ...

    return c
}

至于為什么要區(qū)分包含指針和不包含指針這兩種情況,makechan 的注釋給出了一段解釋?zhuān)?/p>

Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.

下面是我的猜想,如果不對(duì),歡迎高人指正:

GC 不會(huì)知道 unsafe.Pointer 里面存儲(chǔ)的是什么類(lèi)型,因此如果實(shí)際元素類(lèi)型里面包含指針,就要通過(guò) mallocgc 將分配什么類(lèi)型的數(shù)據(jù)告訴 gc,這樣 gc 就不會(huì)回收這塊內(nèi)存中存儲(chǔ)的指針?biāo)赶虻膬?nèi)存。反之, buf 不包含指針,可以用一塊大的內(nèi)存來(lái)存儲(chǔ) hchan 對(duì)象和緩沖區(qū),這樣可以減輕 gc 壓力。

5. 發(fā)送數(shù)據(jù)

向通道發(fā)送數(shù)據(jù),runtime 中通過(guò) chansend 實(shí)現(xiàn),它的聲明如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 

參數(shù) c 表示要向哪個(gè) chan 發(fā)送數(shù)據(jù), ep 表示要發(fā)送的數(shù)據(jù)的地址,block 表示是否需要阻塞, callerpc 表示調(diào)用地址。返回值 bool 表示數(shù)據(jù)是否成功發(fā)送。

block 是為了實(shí)現(xiàn)如下代碼的語(yǔ)義:

    c := make(chan int)
        // ...
    select {
    case <-c:
        // ...
    default:
        // ...
    }

上面這段代碼被編譯成對(duì) selectnbsend 的調(diào)用:

if selectnbsend(c, v) {
    ... foo
} else {
    ... bar
}

selectnbsend 的實(shí)現(xiàn)如下

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc()) // 非阻塞的發(fā)送
}

它與擁有多個(gè) case 的 select 不同(多個(gè) case 的 select 將在后文分析)。

chansend 按照下面的邏輯執(zhí)行:

  1. 如果通道是空的,對(duì)于非阻塞的發(fā)送,直接返回 false。對(duì)于阻塞的通道,將 goroutine 掛起,并且永遠(yuǎn)不會(huì)返回
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
  1. 非阻塞的情況下,如果通道沒(méi)有關(guān)閉,而且當(dāng)前沒(méi)有接收者,緩沖區(qū)也已經(jīng)滿了或者沒(méi)有緩沖區(qū)(即不可以發(fā)送數(shù)據(jù))。那么直接返回 false
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

注意:前兩步中都沒(méi)有加鎖。第 1 步中,沒(méi)有訪問(wèn) hchan 的任何成員,所以無(wú)需加鎖。第 2 步中,可能被寫(xiě)的變量只有 closed 、qcount 和 c.recvq.first ,這些變量都是單字長(zhǎng)的,所以對(duì)它們的單個(gè)值的讀操作是原子性的。

然而,我們應(yīng)該要更仔細(xì)的分析,對(duì)單個(gè)值的讀操作是原子性的,但是對(duì)多個(gè)值的讀操作就不一定是原子性的了。因?yàn)樵谂袛嗤?closed 之后,通道可能在這一瞬間從未關(guān)閉狀態(tài)轉(zhuǎn)變成關(guān)閉狀態(tài)(closed 不會(huì)從非 0 變成 0,但有可能從 0 變成非 0,所以在判斷 closed==0 之后,通道可能還會(huì)轉(zhuǎn)變成關(guān)閉狀態(tài)),也就是說(shuō)這里的 if 測(cè)試通過(guò)的那一瞬間,可能有兩種情況:

  • 通道沒(méi)有關(guān)閉,而且已經(jīng)滿了。那么這段邏輯運(yùn)行 ok,應(yīng)該返回 false。
  • 通道已經(jīng)關(guān)閉,而且已經(jīng)滿了。按照發(fā)送數(shù)據(jù)的語(yǔ)義來(lái)說(shuō),此時(shí)應(yīng)該 panic。但實(shí)際上這段邏輯的實(shí)現(xiàn),它會(huì)返回 false。

但我們還要注意到的是,第 2 種情況的發(fā)生,肯定意味著第 1 種情況發(fā)生過(guò)。而且它取決與通道的 close 是何時(shí)被調(diào)用的,至少在 if 之前 close 還沒(méi)有完成調(diào)用。所以我們認(rèn)為第 2 種情況的邏輯也是正確的。

(嗯,確實(shí)有點(diǎn)難理解,也很難描述)

  1. 調(diào)用 lock 對(duì)通道加鎖

  2. 如果此時(shí)通道被關(guān)閉,那么發(fā)生 panic

// 第 3 步,加鎖
lock(&c.lock)  

// 第 4 步,如果通道已經(jīng)被關(guān)閉了,那么 panic
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}
  1. 從 recvq 中取出一個(gè)接收者,如果接收者存在,直接向該接收者發(fā)送數(shù)據(jù)。
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

send 函數(shù)將 ep 作為參數(shù)傳送給接收方的 sg 對(duì)象,然后使用 goready 將其喚醒。sg.elem 如果非空,則將 ep 的內(nèi)容直接 copy 到 elem 指向的地址。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ...
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    memmove(dst, src, t.size)
}

注意:如果有接收者在隊(duì)列中等待,則說(shuō)明此時(shí)的緩沖區(qū)是空的。

  1. 如果緩沖區(qū)還有多余的空間,那么將數(shù)據(jù)寫(xiě)入緩沖區(qū)。寫(xiě)入緩沖區(qū)后,將發(fā)送位置往后移動(dòng)一個(gè)單位,然后將 qcount 加 1
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

其中 chanbuf 函數(shù)從 buf 中取出第 i 個(gè)元素的存放地址:

func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

typedmemmove 函數(shù)將類(lèi)型為 c.elemtype 的 ep 的內(nèi)容 拷貝到 qp 中。

  1. 如果執(zhí)行前面的所有步驟還沒(méi)有成功發(fā)送,那么就表示緩沖區(qū)沒(méi)有空間了,而且也沒(méi)有任何接收者在等待。所以后面必須要將 goroutine 掛起然后等待新的接收者了。但對(duì)于非阻塞的調(diào)用,不能等待,返回 false 表示數(shù)據(jù)發(fā)送不成功。
if !block {
        unlock(&c.lock)
        return false
    }
  1. 創(chuàng)建 sudog 對(duì)象,然后入隊(duì)并且讓 goroutine 進(jìn)入等待狀態(tài)。直到被喚醒時(shí) goparkunlock 才會(huì)返回。
gp := getg()

mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c

gp.waiting = mysg
gp.param = nil

c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
  1. goparkunlock 返回后,代表已經(jīng)發(fā)送完數(shù)據(jù)了,此時(shí)做一些清理工作,如將 sudog 對(duì)象釋放,將 g 的 waiting 置空等。

6. 接收數(shù)據(jù)

接收數(shù)據(jù)的操作和發(fā)送數(shù)據(jù)的操作大同小異,它的實(shí)現(xiàn)函數(shù)為 chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 
  • chanrecv 從 c 中接收數(shù)據(jù),并且將接收到的數(shù)據(jù)存到 ep 中,block 表示是否需要阻塞。
  • 如果沒(méi)有數(shù)據(jù)可以接收,而且是非阻塞的情況,則返回 (false,flase)。如果 c 已經(jīng)關(guān)閉了,將 ep 指向的值置為 0值,并且返回 (true, false)。其它情況返回值為 (true,true),表示成功從 c 中獲取到了數(shù)據(jù)。

同樣地,block 是為了實(shí)現(xiàn)以下語(yǔ)義:

select {
case v = <-c:
    ... foo
default:
    ... bar
}

它被編譯成:

if selectnbrecv(&v, c) {
    ... foo
} else {
    ... bar
}

其中 selectnbrecv 的實(shí)現(xiàn)為:

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    selected, _ = chanrecv(c, elem, false)  // 非阻塞接收
    return
}

接下來(lái),我們分析以下 recv 的邏輯:

  1. 如果 c 為空且是非阻塞模式,那么直接返回 (false,false)。否則永遠(yuǎn)等待
if c == nil {
    if !block {
        return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
}
  1. 對(duì)于非阻塞的情況,如果當(dāng)前沒(méi)有數(shù)據(jù)可以接收了,那么返回 (false,false)。
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
    atomic.Load(&c.closed) == 0 {
    return
}

和非阻塞發(fā)送有兩個(gè)不同的地方:

  • 對(duì) closed 的判斷放到了后面。
  • 使用了 atomic。

我們先來(lái)看一下下面這段代碼:

c := make(chan int, 1)
c <- 1

go func() {
    select {
    case <-c:
        println("recv from c")
    default:
        println("c is not ready - BUG!")
    }
}()

close(c)
<-c

從 go 的語(yǔ)義上來(lái)說(shuō),不論何時(shí),default 都不應(yīng)該被執(zhí)行:如果 select 發(fā)生在 close 之前,那么從 c 中取出來(lái)的數(shù)據(jù)應(yīng)該是 1。 如果 select 發(fā)生在 close 之后但是在 <-c 之前,那么也應(yīng)該從 c 中取出 1。如果 select 發(fā)生在 <-c 之后,從 c 中取出的數(shù)據(jù)是 0 ,而且接收數(shù)據(jù)是失敗的,但是不會(huì)執(zhí)行 default。

那么,如果把對(duì) closed 的判斷放到通道是否有數(shù)據(jù)可接收的判斷之前,像這樣:

if !block && atomic.Load(&c.closed) == 0 && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0)  {
    return
}

這意味著 if 測(cè)試通過(guò)后的一瞬間存在兩種情況:

  • 通道未關(guān)閉,但是不存在數(shù)據(jù)可接收,也沒(méi)有發(fā)送者在等待。對(duì)于這種情況,應(yīng)該返回 (false,false)。執(zhí)行 default 段的代碼。
  • 通道已關(guān)閉,且不存在數(shù)據(jù)可接收,也沒(méi)有發(fā)送者在等待。對(duì)于這種情況,根據(jù) go 語(yǔ)義,應(yīng)該返回 (true, false),并且執(zhí)行 case 段的代碼。但是我們的這個(gè)實(shí)現(xiàn)顯然是錯(cuò)誤的,它返回了 (false,false)。就上面的接收例子而言, close(c)<-c 正好發(fā)生在 atomic.Load(&c.closed) == 0 執(zhí)行完成之后,但還沒(méi)有執(zhí)行后面的判斷,那 if 再執(zhí)行后面的判斷,顯然也是通過(guò)的。所以問(wèn)題就出來(lái)了。

再來(lái)看一下正確的實(shí)現(xiàn),它也會(huì)在 if 測(cè)試通過(guò)后的一瞬間存在兩種情況:

  • 不存在數(shù)據(jù)可接收,而且通道沒(méi)有關(guān)閉。此時(shí)返回 (false,false)
  • 存在數(shù)據(jù)可接收,而且通道沒(méi)有關(guān)閉。此時(shí)應(yīng)該返回 (true,true)。但是,這種情況意味著上一種情況曾今存在過(guò), 而且至少在 if 執(zhí)行前的那一瞬間還存在。所以我們認(rèn)為它返回 (false,false) 是合理的。

另外 atomic 在這里是為了保證內(nèi)存順序的正確性。

  1. 加鎖,然后判斷如果通道已經(jīng)關(guān)閉而且沒(méi)有剩余的數(shù)據(jù)可以讀取了,那么就返回 (true,false)。
lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
    unlock(&c.lock)
    if ep != nil {
        typedmemclr(c.elemtype, ep)
    }
    return true, false
}

typedmemclr 的作用是將 ep 指向的類(lèi)型為 elemtype 的內(nèi)存塊置為 0 值。

  1. 如果有發(fā)送者在隊(duì)列等待,那么直接從發(fā)送者那里提取數(shù)據(jù),并且喚醒這個(gè)發(fā)送者。當(dāng)然對(duì)于帶緩沖區(qū)的 chan,它會(huì)先將緩沖區(qū)的數(shù)據(jù)提取出來(lái),然后將等待中的發(fā)送者的數(shù)據(jù)拷貝到緩沖區(qū)中。
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx 
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

recv 函數(shù)判斷 chan 是否帶有緩沖區(qū),如果不帶緩沖區(qū),直接從發(fā)送者那里復(fù)制數(shù)據(jù)到 ep。如果帶緩沖區(qū),那么你應(yīng)該能夠理解,由于有發(fā)送者在等待,所以緩沖區(qū)一定是滿的。它將緩沖區(qū)的第一個(gè)數(shù)據(jù)復(fù)制到 ep,然后將發(fā)送者的數(shù)據(jù)復(fù)制到緩沖區(qū)。這是為了盡量滿足先來(lái)后到的需求(當(dāng)然,由于并發(fā)的存在,這樣做實(shí)際上不能完全確定)。

接下來(lái),通過(guò) goready 將發(fā)送者喚醒。

  1. 如果緩沖區(qū)中有數(shù)據(jù),那么從緩沖區(qū)復(fù)制數(shù)據(jù)到 ep,并且修改下次接收位置和 qcount
if c.qcount > 0 {
    qp := chanbuf(c, c.recvx)
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}
  1. 在執(zhí)行完成上面的流程后,仍然沒(méi)有返回,說(shuō)明緩沖區(qū)內(nèi)已經(jīng)沒(méi)有數(shù)據(jù)了,而且也沒(méi)有發(fā)送者在等待中。所以如果是非阻塞接收,那么直接返回 (false,false)。
if !block {
    unlock(&c.lock)
    return false, false
}
  1. 對(duì)于阻塞接收的情況,將調(diào)用者 goroutine 掛起,并且等待被喚醒。
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
  1. goparkunlock 返回后,說(shuō)明已經(jīng)接收到數(shù)據(jù)了,或者是通道已經(jīng)被關(guān)閉了。此時(shí)和發(fā)送一樣,做一些清理工作。然后根據(jù)是否為關(guān)閉導(dǎo)致的返回對(duì)應(yīng)的 bool 值。

7. 關(guān)閉通道

closechan 函數(shù)實(shí)現(xiàn)了通道的關(guān)閉,它的聲明如下:

func closechan(c *hchan)

closechan 按照如下的流程執(zhí)行:

  1. 加鎖,然后判斷如果通道早已關(guān)閉了,就 panic。(你不能對(duì)一個(gè)被關(guān)閉的通道再執(zhí)行關(guān)閉操作)
lock(&c.lock)
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
}
  1. 將關(guān)閉標(biāo)志置為 1.
c.closed = 1
  1. 喚醒所有的接收者,并且將接收數(shù)據(jù)置為 0 值。喚醒所有發(fā)送者,令其 panic。 gList 就是一個(gè) g 對(duì)象的列表。
var glist gList

for {
    sg := c.recvq.dequeue()
    if sg == nil {
        break
    }
    if sg.elem != nil {
        typedmemclr(c.elemtype, sg.elem)
        sg.elem = nil
    }
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    glist.push(gp)
}

for {
    sg := c.sendq.dequeue()
    if sg == nil {
        break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    glist.push(gp)
}
unlock(&c.lock)

for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
}

8. select

select 函數(shù)是本文的最后一部分,也是最復(fù)雜的一部分。它的實(shí)現(xiàn)函數(shù)是 selectgo

8.1 selectgo 的聲明

runtime 通過(guò)遍歷+等待的方式實(shí)現(xiàn) select 語(yǔ)義,遍歷時(shí)判斷如果 有可執(zhí)行的 case 或者 select 中帶有 default,那么就執(zhí)行之。如果沒(méi)有,就通過(guò) gopark 將調(diào)用者轉(zhuǎn)換為等待狀態(tài),使用 sudog 鏈表表示它在多個(gè)通道上等待。其中任意一個(gè)通道對(duì)應(yīng)的 sudog 都可以喚醒調(diào)用者。

函數(shù) selectgo 實(shí)現(xiàn)了 select 語(yǔ)義。它的第一個(gè)返回值表示需要執(zhí)行哪個(gè) case, 第 2 個(gè)返回值表示如果要執(zhí)行的 case 是 caseRecv,那么接收數(shù)據(jù)是否成功(對(duì)于已經(jīng)關(guān)閉的通道來(lái)說(shuō),這個(gè)返回值會(huì)是 false,這個(gè)我們?cè)?chanrecv 函數(shù)中已經(jīng)看到了)。

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool)
  • 參數(shù) cas0 指向 scase 數(shù)組的第一個(gè)元素, 每個(gè) scase 表示一個(gè) case 分支, scase 的定義如下:
type scase struct {
    c           *hchan         // chan
    elem        unsafe.Pointer // data element
    kind        uint16
      // ...
}

const (
    caseNil = iota
    caseRecv
    caseSend
    caseDefault
)

c 表示這個(gè) case 對(duì)應(yīng)的通道 ,elem 表示接收數(shù)據(jù)的地址或者要發(fā)送的數(shù)據(jù)的地址。kind 取值為 caseNil 表示一個(gè) 0 值,在真實(shí)的 select 中沒(méi)有任何東西和它對(duì)應(yīng),它用于表示無(wú)效的的意思。caseRecv 和 caseSend 分別表示接收和發(fā)送的 case。caseDefault 對(duì)應(yīng) default 分支。

  • order0 參數(shù)指向的是一個(gè) 2 倍 case 數(shù)量大小的數(shù)組,它用來(lái)為 selectgo 提供額外的空間用來(lái)使用堆排序和隨機(jī)順序執(zhí)行。你可能在想,這個(gè)空間它自己也能分配,為什么要讓外部提供?其實(shí)這樣做是有它的目的的,首先在 selectgo 中,它不知道調(diào)用者的 case 究竟有多少個(gè),那么它無(wú)法分配棧內(nèi)存,它只能分配堆內(nèi)存,而我們的代碼中 for + select 的用法是很常見(jiàn)的,這樣小而且頻繁的堆內(nèi)存分配勢(shì)必給 gc 帶來(lái)非常大的壓力。其次,在 select 的調(diào)用處,編譯器能夠知道你有多少個(gè) case,所以它可以給你分配固定大小的棧內(nèi)存。(對(duì)于這一段,如果你覺(jué)得難以理解,可以先跳過(guò),不影響你理解后文)。
  • ncases 表示的是 case 的數(shù)量,包括 default。

8.2 避免死鎖

在繼續(xù)探索這個(gè)函數(shù)之前,可能還需要了解一個(gè)東西。那就是對(duì)多個(gè)鎖的占有和釋放。
在 selectgo 中,毫無(wú)疑問(wèn)要同時(shí)訪問(wèn)多個(gè)通道,每個(gè)通道都應(yīng)該加鎖才能訪問(wèn)。那么要獲得多個(gè)鎖的所有權(quán),為了不造成死鎖,需要按照固定的順序加鎖和解鎖(我想你應(yīng)該知道死鎖是什么,而且這種按順序的加鎖和解鎖方式可以避免死鎖)。
runtime 中的 sellock 和 selunlock 用于對(duì) scase 數(shù)組加鎖和解鎖。注意解鎖的時(shí)候順序和加鎖的順序是相反的。
另外由于一個(gè) select 語(yǔ)句中可能存在多個(gè) case 對(duì)同一個(gè)通道的操作,而對(duì)于同一個(gè)通道來(lái)說(shuō),只能加鎖一次,也只能解鎖一次。所以加鎖迭代中需要判斷是否和上次加鎖的通道一樣,解鎖迭代中需要判斷下個(gè)要解鎖的通道是否和當(dāng)前通道一樣。 lockorder 是要保證同一個(gè)通道存在多次,那么它們需要是相鄰的。

func sellock(scases []scase, lockorder []uint16) {
    var c *hchan
    for _, o := range lockorder {
        c0 := scases[o].c
        if c0 != nil && c0 != c {
            c = c0
            lock(&c.lock)
        }
    }
}

func selunlock(scases []scase, lockorder []uint16) {
    for i := len(scases) - 1; i >= 0; i-- {
        c := scases[lockorder[i]].c
        if c == nil {
            break
        }
        if i > 0 && c == scases[lockorder[i-1]].c {
            continue // will unlock it on the next iteration
        }
        unlock(&c.lock)
    }
}

接下來(lái)我們深入探索 selectgo 這個(gè)函數(shù)的實(shí)現(xiàn),根據(jù)代碼結(jié)構(gòu),本節(jié)將按照分段的方式對(duì)這個(gè)函數(shù)進(jìn)行講解。

8.3 pollorder 和 lockorder

pollorder 表示輪詢(xún)順序,為了實(shí)現(xiàn) select 中的隨機(jī)語(yǔ)義,輪詢(xún)應(yīng)該是隨機(jī)的。 pollorder 對(duì)應(yīng)參數(shù) order0 指針的前半部分。pollorder 包含 0~ncases-1 中的所有數(shù)字,下面是隨機(jī)生成 pollorder 的代碼

for i := 1; i < ncases; i++ {
    j := fastrandn(uint32(i + 1))
    pollorder[i] = pollorder[j]
    pollorder[j] = uint16(i)
}

這個(gè)很有意思,它假設(shè)第一個(gè)元素初始為 0,而且沒(méi)有對(duì)后面的元素做任何假設(shè)。每次迭代中,從前面的所有元素中隨機(jī)挑選一個(gè),然后將當(dāng)前索引和它置換。從而生成 0~ncases-1 的值。

它只要求第一個(gè)元素初始值為 0 ,這樣編譯器可以為我們對(duì) select 的調(diào)用生成更加高效的代碼。

lockorder 表示加鎖順序,用以傳給 sellock 和 selunlock 加鎖和解鎖。它最后存儲(chǔ)的值為按照地址排序的通道的。利用 pollorder 構(gòu)建一個(gè)最大堆:

for i := 0; i < ncases; i++ {
    j := i
    c := scases[pollorder[i]].c
    for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
        k := (j - 1) / 2
        lockorder[j] = lockorder[k]
        j = k
    }
    lockorder[j] = pollorder[i]
}

注意和常規(guī)的最小堆構(gòu)建稍有不同,因?yàn)樗鼘⑵渌鼉?nèi)存的內(nèi)容構(gòu)建成最小堆放到了當(dāng)前內(nèi)存中,并且使用插入法建堆。這種方式的時(shí)間復(fù)雜度是 O(nlogn)。相比常規(guī)的建堆時(shí)間復(fù)雜度是 O(n)??此坡耍珜?shí)際上在數(shù)據(jù)量比較小的時(shí)候,插入法建堆更快,而且如果在這里使用的是常規(guī)建堆方法,需要先執(zhí)行一次內(nèi)存拷貝操作。

接下來(lái)就是使用大根堆的排序了:

    for i := ncases - 1; i >= 0; i-- {
        o := lockorder[i]
        c := scases[o].c
        lockorder[i] = lockorder[0]
        j := 0
        for {
            k := j*2 + 1
            if k >= i {
                break
            }
            if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
                k++
            }
            if c.sortkey() < scases[lockorder[k]].c.sortkey() {
                lockorder[j] = lockorder[k]
                j = k
                continue
            }
            break
        }
        lockorder[j] = o
    }

每次外層迭代,都將最大的元素移到后面,然后重新調(diào)整位置滿足堆的屬性。

8.3 loop 段

在 loop 段開(kāi)始之前,selectgo 先使用了 sellock 對(duì)所有的通道加鎖,注意 lockorder 在這里的作用。

sellock(scases, lockorder)

loop 段是 selectgo 函數(shù)的核心部分,它的目的是先遍歷一次所有的 case 和 default 語(yǔ)句,看一下是否有可執(zhí)行的分支,如果有,那么就轉(zhuǎn)移到對(duì)應(yīng)的段去處理。否則就阻塞并且等待被喚醒。

我們先看循環(huán)部分:

loop:
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
for i := 0; i < ncases; i++ {
    casi = int(pollorder[i])
    cas = &scases[casi]
    c = cas.c

    switch cas.kind {
    case caseNil:
        continue

    case caseRecv:
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }

    case caseSend:
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }

    case caseDefault:
        dfli = casi
        dfl = cas
    }
}

它遍歷了所有的 case+default,然后按照 case 的類(lèi)別做如下處理:

  • 無(wú)效的 case,不處理
  • 接收 case,根據(jù)不同的情況分別跳轉(zhuǎn)到 recv, bufrecv, rclose 段。注意這里的順序,rclose 是放在最后面的。
  • 發(fā)送 case,根據(jù)不同的情況分別跳轉(zhuǎn)到 sclose,send, bufsend 段。這里是要把 sclose 放在最前面的,因?yàn)橄蛞粋€(gè)已經(jīng)關(guān)閉的通道發(fā)送數(shù)據(jù),就應(yīng)該 panic
  • 對(duì)于 default,selectgo 簡(jiǎn)單的將這個(gè) case 信息保存下來(lái),留給后面處理。

當(dāng)循環(huán)結(jié)束后,如果有 default 語(yǔ)句存在,那么執(zhí)行 default 的內(nèi)容。

if dfl != nil {
    selunlock(scases, lockorder)
    casi = dfli
    cas = dfl
    goto retc
}

selectgo 用 casi 表示要執(zhí)行哪個(gè) case 的內(nèi)容, cas 表示要執(zhí)行的分支的 scase 對(duì)象。這里它簡(jiǎn)單的對(duì)這兩個(gè)變量賦值,然后轉(zhuǎn)移到 retc 段。

8.4 loop 之后

當(dāng)上面的流程都執(zhí)行完了,還沒(méi)有 goto 出去,說(shuō)明沒(méi)有任何 case 當(dāng)前可以執(zhí)行。那么就掛起并等待被喚醒。

gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    if cas.kind == caseNil {
        continue
    }
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    switch cas.kind {
    case caseRecv:
        c.recvq.enqueue(sg)

    case caseSend:
        c.sendq.enqueue(sg)
    }
}
gp.param = nil
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

它按照鎖順序一次遍歷每個(gè) case,然后將其放到 g.waitlink 這個(gè) sudog 鏈表中,表明是在等待多個(gè) case 。并且對(duì)于每個(gè) case,都往 recvq 或者 sendq 里面插入這個(gè) sudog,用以表示這個(gè)等待者。
然后使用 gopark 將當(dāng)前 goroutine 切換到等待狀態(tài)。

當(dāng) gopark 返回時(shí),說(shuō)明已經(jīng)被某個(gè) channel 喚醒了,后面主要是一些清理工作。

8.5 bufrecv 段

bufrecv 段從帶 buf 的通道中接收數(shù)據(jù)。執(zhí)行到 bufrecv 段了,說(shuō)明對(duì)應(yīng)的通道緩沖區(qū)有數(shù)據(jù)可以接收了

bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
    c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc

這一段的實(shí)現(xiàn)和之前討論的 recv 函數(shù)類(lèi)似,但是最后它把所有權(quán)交給 retc

8.6 bufsend 段

bufsend 段向緩沖區(qū)寫(xiě)入數(shù)據(jù),與 send 函數(shù)類(lèi)似,但是最后把所有權(quán)讓給了 retc

bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
    c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc

后面還有 recv段,rclose段,send 段,sclose 段等,這些邏輯基本上都可以在 chansend 與 chanrecv 中找到共通點(diǎn)。

8.7 retc 段

retc:
  return casi, recvOK

它簡(jiǎn)單的做了一個(gè)返回工作 (當(dāng)然還有其它的部分,但這部分已經(jīng)超出本文范圍)

9. 寫(xiě)在最后

本文只展示了最核心的邏輯部分,完整的源碼請(qǐng)參考 $GOROOT/src/runtime/chan.go$GOROOT/src/runtime/select.go

本文如有錯(cuò)誤,歡迎大家指出。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容