Golang 源碼導(dǎo)讀 —— channel

簡書前話:

由于簡書不支持 mermaid 流程圖,所以想看完整的版本,可以到我的個人博客 中查看

01.chan 的數(shù)據(jù)結(jié)構(gòu):

golang 中 chan 的源碼在 src/runtime/chan.go 文件中,hchan 則為 chan 的結(jié)構(gòu)體

hchan:

type hchan struct {
    qcount   uint // 當(dāng)前緩存數(shù)據(jù)的總量  
    dataqsiz uint // 緩存數(shù)據(jù)的容量      
    buf      unsafe.Pointer // 緩存數(shù)據(jù),為一個循環(huán)數(shù)組,容量大小為 dataqsiz,當(dāng)前大小為 qcount
    elemsize uint16 // 數(shù)據(jù)類型的大小,比如 int 為 4
    closed   uint32 // 標記是否關(guān)閉
    elemtype *_type // 數(shù)據(jù)的類型
    sendx    uint  // 發(fā)送隊列 sendq 的長度
    recvx    uint  // 接收隊列 recvq 的長度
    recvq    waitq // 阻塞的接收 goroutine 的隊列
    sendq    waitq // 阻塞的發(fā)送 goroutine 的隊列
    lock mutex     // 鎖,用于并發(fā)控制隊列操作
}

waitq:

type waitq struct {
    first *sudog
    last  *sudog
}

waitq 為雙向鏈表,sudog 代表一個封裝的 goroutine,其參數(shù) g 為 goroutine 實例結(jié),構(gòu)如下圖:

image.png

02. 新建 chan:

在 go 中,通過如下代碼創(chuàng)建 chan

c := make(chan int, 4)

以上代碼,對應(yīng)的是源碼:

func makechan(t *chantype, size int) *hchan

邏輯流程如下:

graph TD
A[makechan] -->|t, size| B{安全檢查}
B -->|N| ZR[ERROR]
B -->|Y| E{size 或 t.elem.size 是否為0?}
E -->|Y| F[mallocgc 默認大小 hchanSize 內(nèi)存]
E -->|N| G{數(shù)據(jù)類型是否為指針?}
G -->|Y| H[通過new單獨分配chan內(nèi)存]
G -->|N| I[mallocgc 內(nèi)存 hchanSize + mem]
H --> Z
F --> Z
I --> Z
Z[chan 賦值屬性]
Z --> ZB[END]
func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // 安全檢查,數(shù)據(jù)項大小不超過 16K
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    // 獲取要分配的內(nèi)存
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
    case mem == 0:
        // size 為 0 的情況,分配 hchan 結(jié)構(gòu)體大小的內(nèi)存,64位系統(tǒng)為 96 Byte.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        // 數(shù)據(jù)項不為指針類型,調(diào)用 mallocgc 一次性分配內(nèi)存大小,hchan 結(jié)構(gòu)體大小 + 數(shù)據(jù)總量大小
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 數(shù)據(jù)項為指針類型,hchan 和 buf 分開分配內(nèi)存,GC 中指針類型判斷 reachable and unreadchable.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    // chan 賦值屬性, 數(shù)據(jù)項大小、數(shù)據(jù)項類型、緩存數(shù)據(jù)的容量
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

03.讀寫chan

在 go 中,寫入 chan 的代碼如下:

v := 1
c := make(chan int)
c <- v

讀取 chan 的代碼如下:

var v int
c := make(chan int)
c -> v

c <- v 操作對應(yīng)的源碼為 runtime 中的

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

c -> v 操作對應(yīng)源碼為 runtime 中的

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

其中 c 為 chansend 的 c, v 的地址為 chansend 的 ep.

邏輯流程如下:

graph TD
A[chansend 或 chanrecv] -->|hchan, ep| B{校驗}
B --> |Y| C[加鎖 lock]
B --> |N| D["gopark(), 阻塞當(dāng)前 goroutine 和 throw error"]
C --> E{chan close?}
E --> |Y| F[unlock和panic]
E --> |N| G[取出 recvq 或 sendq 隊列]
G --> H{是否等待的sudog?}
H --> |Y| I["send()或recv()"]
I --> J["goready(), 運行 sudog 的 goroutine"]
H --> |N| K{存在剩余緩沖區(qū)?}
K --> |Y| L["數(shù)據(jù)放入緩沖區(qū) buf, unlock"]
K --> |N| M["打包成sudog,加入sendq或recvq隊列"]
M --> O["gopark(),阻塞當(dāng)前goroutine等待被接受者喚醒"]
O -."如果被喚醒,說明數(shù)據(jù)已經(jīng)被接收,回收sudog".-> P["保存context,運行別的 goroutine"]
P --> Z
L --> Z
Z[End]

由于發(fā)送和接收的邏輯都是差不多的,所以這里就直接放上發(fā)送的邏輯代碼來分析就好了

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 校驗
    if c == nil {
        if !block {
            return false
        }
        // 參數(shù)異常,block == true, 進行阻塞 goroutine.
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }
    
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    
    // 加鎖,并發(fā)讀寫控制
    lock(&c.lock)

    // 查看 chan 是否關(guān)閉
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    
    // 從等待接收列隊 recvq 中試圖獲取獲取封裝的 goroutine sudog.
    if sg := c.recvq.dequeue(); sg != nil {
        // 找到等待接收 chan 的 goroutine sudog,直接發(fā)送 value 給接收者,并通過 goready() 喚醒接受者 goroutine
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 查看查看緩存空間是否 buf 是否還有剩余
    if c.qcount < c.dataqsiz {
        // 將數(shù)據(jù)移動到 qp 中并放入 chan 緩存,sendx++
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    
    // chan 如果為非阻塞,unlock 后直接返回
    if !block {
        unlock(&c.lock)
        return false
    }
    
    // 將當(dāng)前 goroutine 封裝 sudog,并放入到等待發(fā)送隊列 sendq 中
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // 阻塞當(dāng)前 goroutine,等待被接受者 chanrecv() 的喚醒
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    
    // KeepAlive 方法,由于 GC 的緣故,而調(diào)用
    KeepAlive(ep)

    // goroutine 被喚醒,重置 gorotuine 狀態(tài) 和 sudog
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

代碼的部分詳解:

gopark:M(工作線程) 會保存 goroutine 的上下文,而調(diào)度器會讓當(dāng)前工作線程線程 M 綁定執(zhí)行其他的 goroutine.

KeepAlive(ep): 由于 GC 的機制,當(dāng) ep 不再被上下文引用的時候,GC 會主動回收 eq,導(dǎo)致 buf 被回收,所以調(diào)用 KeepAlive,告訴 GC 不需要對 eq 變量進行內(nèi)存回收,具體可以查看 runtime.SetFinalizer 方法部分有詳細介紹.

喚醒:goroutine 會在 chanrecv 這個 chan 接收接收函數(shù)中,從 hchan.sendq 被取出,執(zhí)行 goready(), 通知調(diào)度器去喚醒,然后放入 P(邏輯處理器) 的執(zhí)行等待隊列中,等待被下一次調(diào)用.

send()/recv(): 通過 memmove() 的方式從發(fā)送方拷貝 buf 到接收方.


下面我們通過一個使用channel做生產(chǎn)/消費的模型來試圖分解一下 chan 的步驟:

func main(){
    //初始化任務(wù)隊列 channel
    ch := make(chan Data, 4)
    //生產(chǎn)者往channel丟數(shù)據(jù)
    for _, task := range  {
        ch <- task
    }
    //初始化消費者
    for i := 0; i< ConsumerNum; i++ {
        go consumer(ch)
    }
    ...
}

// 消費者
func consumer(ch chan Data){
    for {
        //收取任務(wù)并處理
        data := <- ch
        process(data)
    }
}

從 main 函數(shù)開始,golang 就會開啟一個 goroutine 來執(zhí)行代碼,我們可以將其記作生產(chǎn)者 G'p, 代碼中 go consumer 標記 consumer 函數(shù)也開啟一個 goroutine 來進行,我們記其為消費者 G'c.

  • 初始化任務(wù)隊列channel
    此時會在堆區(qū)域分配一塊內(nèi)存,用于存儲 hchan 結(jié)構(gòu)體和 buf 的緩存數(shù)據(jù)。hchan.buf指向一個大小為4的數(shù)組,并且hchan.sendx、hchan.recvx置0,hchan.dataqsiz置4。

  • 生產(chǎn)者往channel丟數(shù)據(jù)
    G'p 往 ch 發(fā)送數(shù)據(jù)的時候,會執(zhí)行 lock(&hchan.lock) 對 buf 加鎖,把要發(fā)送的數(shù)據(jù)拷貝到 buf 里,hchan.sendx++,之后 unlock(hchan.lock) 釋放鎖。

  • 消費者執(zhí)行消費行為
    G'c 從 ch 中獲取數(shù)據(jù)的時候,會執(zhí)行 lock(&hchan.lock) 對 buf 加鎖,將 buf 里面的一條數(shù)據(jù)拷貝到接收變量 data 對應(yīng)的空間中,hchan.recvx++,之后釋放鎖。


0.4 關(guān)閉 chan

在 go 中,關(guān)閉 chan 的代碼如下:

ch := make(chan int ,10)
close(ch)

close(ch) 對應(yīng)的runtime的函數(shù):

func closechan(c *hchan)

邏輯流程如下:

graph TD
A[closechan] --c--> B{檢查}
B --> |Y| C[加鎖 lock]
B --> |N| D[panic]
C --> E{chan是否已close}
E --> |Y| D
E --> |N| F[置 hchan.close = 1]
F --> G[釋放recvq的所有等待接收者]
G --> H[釋放sendq的所有等待發(fā)送者]
H -.發(fā)送者會panic.-> D
H --> I[unlock]
I --> J[喚醒recvq和sendq的所有g(shù)oroutine]
J --> End
func closechan(c *hchan) {
    // 檢查,chan 是否為空
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 加鎖,防止資源競爭
    lock(&c.lock)
    
    // chan 如果已關(guān)閉,則 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }
    
    // 置 hchan.close = 1, 標記已關(guān)閉
    c.closed = 1
    
    var glist gList
    
    // 釋放recvq的所有等待接收者
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    
    // 釋放sendq的所有等待發(fā)送者
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    
    // 解鎖,unlock
    unlock(&c.lock)
    
    // 喚醒recvq和sendq的所有g(shù)oroutine
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

close 的主要作用是用于喚醒所有監(jiān)測 chann 的 goroutine,但是要注意的是:

  • 如果 sendq 的緩沖區(qū)還有發(fā)送者,這些發(fā)送者都會 panic
  • 如果兩次 close chan,會導(dǎo)致 panic

0.5 關(guān)于 chan 的面試問題

  • chan 如何處理并發(fā)讀寫問題
    hchan 結(jié)構(gòu)體中通過鎖 lock mutex 參數(shù)進行對公共緩存資源 buf 的控制達到并發(fā)讀寫的 race 問題.

  • 如果往 chan 發(fā)送數(shù)據(jù),size 滿了,或者往 chan 獲取數(shù)據(jù),buf 空。這會導(dǎo)致阻塞,此時runtime的行為是怎么樣的呢?
    由于兩者邏輯一樣,我們就直接講往 chan 發(fā)送數(shù)據(jù),size 滿了的情況.
    如果往 chan 發(fā)送數(shù)據(jù),size 滿了,此時 goroutine 和 buf 會被打包成 sudog,通過 gopark 將 goroutine 狀態(tài)置為等待, 同時把 sudog 放入 hchan.sendq 等待發(fā)送隊列中,等待接收者接收并調(diào)用 goready() 重新調(diào)度 goroutine. 此時 goroutine 被阻塞后,M(工作線程) 會與 goroutine 解綁,通過 P(邏輯處理器) 重新進行調(diào)度,M 與新的 goroutine 重新綁定執(zhí)行.

感悟

還是有一部分以目前的知識還是無法看懂,以后慢慢積累后再回來補坑,或大佬們可以幫我指出一下,謝謝.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容