golang - channel

1. 原理

hchan

通過(guò)var聲明或者make函數(shù)創(chuàng)建的channel變量是一個(gè)存儲(chǔ)在函數(shù)棧幀上的指針,占用8個(gè)字節(jié),指向堆上的hchan結(jié)構(gòu)體
源碼包中src/runtime/chan.go定義了hchan的數(shù)據(jù)結(jié)構(gòu)如下:


hchan
type hchan struct {
    qcount   uint           // total data in the queue   循環(huán)數(shù)組中的元素?cái)?shù)量
    dataqsiz uint           // size of the circular queue  循環(huán)數(shù)組的長(zhǎng)度
        //channel分為無(wú)緩沖和有緩沖兩種。
       // 對(duì)于有緩沖的channel存儲(chǔ)數(shù)據(jù),使用了 ring buffer(環(huán)形緩沖區(qū)) 來(lái)緩存寫(xiě)入的數(shù)據(jù),本質(zhì)是循環(huán)數(shù)組
       // 為啥是循環(huán)數(shù)組?普通數(shù)組不行嗎,普通數(shù)組容量固定更適合指定的空間,彈出元素時(shí),普通數(shù)組需要全部都前移
        // 當(dāng)下標(biāo)超過(guò)數(shù)組容量后會(huì)回到第一個(gè)位置,所以需要有兩個(gè)字段記錄當(dāng)前讀和寫(xiě)的下標(biāo)位置
    buf      unsafe.Pointer // points to an array of dataqsiz elements  指向底層循環(huán)數(shù)組的指針(環(huán)形緩沖區(qū))
    elemsize uint16     //元素的大小
    closed   uint32       //channel是否關(guān)閉的標(biāo)志
    elemtype *_type // element type  channel中的元素類(lèi)型
    sendx    uint   // send index   // 下一次寫(xiě)下標(biāo)的位置
    recvx    uint   // receive index    // 下一次讀下標(biāo)的位置
    recvq    waitq  // list of recv waiters  // 讀等待隊(duì)列
    sendq    waitq  // list of send waiters  // 寫(xiě)等待隊(duì)列

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex    //互斥鎖,保證讀寫(xiě)channel時(shí)不存在并發(fā)競(jìng)爭(zhēng)問(wèn)題
}

hchan結(jié)構(gòu)體的主要組成部分有四個(gè):
用來(lái)保存goroutine之間傳遞數(shù)據(jù)的循環(huán)數(shù)組:buf
用來(lái)記錄此循環(huán)數(shù)組當(dāng)前發(fā)送或接收數(shù)據(jù)的下標(biāo)值:sendx和recvx
用于保存向該chan發(fā)送和從該chan接收數(shù)據(jù)被阻塞的goroutine隊(duì)列: sendq 和 recvq
保證channel寫(xiě)入和讀取數(shù)據(jù)時(shí)線程安全的鎖:lock

環(huán)形數(shù)組

環(huán)形數(shù)組作為channel 的緩沖區(qū) 數(shù)組的長(zhǎng)度就是定義channnel 時(shí)channel 的緩沖大小


等待隊(duì)列 waitq

在hchan 中包括了讀/寫(xiě) 等待隊(duì)列, waitq是一個(gè)雙向隊(duì)列,包括了一個(gè)頭結(jié)點(diǎn)和尾節(jié)點(diǎn)。 每個(gè)節(jié)點(diǎn)是一個(gè)sudog結(jié)構(gòu)體變量

type waitq struct {
    first *sudog
    last  *sudog
}


type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    // success indicates whether communication over channel c
    // succeeded. It is true if the goroutine was awoken because a
    // value was delivered over channel c, and false if awoken
    // because c was closed.
    success bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}
操作
  • 創(chuàng)建
    使用 make(chan T, cap) 來(lái)創(chuàng)建 channel,channel 可分為帶緩沖和不帶緩沖的, cap 就是緩沖區(qū)的大小

    // 帶緩沖,緩沖大小為3
    ch := make(chan int, 3)
    // 不帶緩沖
    ch := make(chan int)
    

    make 語(yǔ)法會(huì)在編譯時(shí),轉(zhuǎn)換為 makechan64 和 makechan

    // 源碼
    func makechan64(t *chantype, size int64) *hchan {
      if int64(int(size)) != size {
          panic(plainError("makechan: size out of range"))
      }
    
      return makechan(t, int(size))
    }
    
    func makechan(t *chantype, size int) *hchan {
      elem := t.elem
    
      // compiler checks this but be safe.
      if elem.size >= 1<<16 {
          throw("makechan: invalid channel element type")
      }
      if hchanSize%maxAlign != 0 || elem.align > maxAlign {
          throw("makechan: bad alignment")
      }
    
      mem, overflow := math.MulUintptr(elem.size, uintptr(size))
      if overflow || mem > maxAlloc-hchanSize || size < 0 {
          panic(plainError("makechan: size out of range"))
      }
    
      // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
      // buf points into the same allocation, elemtype is persistent.
      // SudoG's are referenced from their owning thread so they can't be collected.
      // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
      var c *hchan
      switch {
      case mem == 0:
          // Queue or element size is zero.
          c = (*hchan)(mallocgc(hchanSize, nil, true))
          // Race detector uses this location for synchronization.
          c.buf = c.raceaddr()
      case elem.ptrdata == 0:
          // Elements do not contain pointers.
          // Allocate hchan and buf in one call.
          c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
          c.buf = add(unsafe.Pointer(c), hchanSize)
      default:
          // Elements contain pointers.
          c = new(hchan)
          c.buf = mallocgc(mem, elem, true)
      }
    
      c.elemsize = uint16(elem.size)
      c.elemtype = elem
      c.dataqsiz = uint(size)
      lockInit(&c.lock, lockRankHchan)
    
      if debugChan {
          print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
      }
      return c
    }
    

    創(chuàng)建時(shí)的會(huì)做一些檢查:

    • 元素大小不能超過(guò) 64K
    • 元素的對(duì)齊大小不能超過(guò) maxAlign 也就是 8 字節(jié)
    • 計(jì)算出來(lái)的內(nèi)存是否超過(guò)限制

    創(chuàng)建時(shí)的策略:

    • 如果是無(wú)緩沖的 channel,會(huì)直接給 hchan 分配內(nèi)存
    • 如果是有緩沖的 channel,并且元素不包含指針,那么會(huì)為 hchan 和底層數(shù)組分配一段連續(xù)的地址
    • 如果是有緩沖的 channel,并且元素包含指針,那么會(huì)為 hchan 和底層數(shù)組分別分配地址
  • 發(fā)送
    發(fā)送操作,編譯時(shí)轉(zhuǎn)換為runtime.chansend函數(shù)

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 
    

    通過(guò)block 參數(shù),確認(rèn)是否是阻塞式發(fā)送
    阻塞式:調(diào)用chansend 函數(shù)時(shí) block=true

        ch<- 1
    

    非阻塞式:調(diào)用chansend 函數(shù)時(shí) block=false

    select {
     case ch <- 10:
     ...
    
    default
    }
    

    向 channel 中發(fā)送數(shù)據(jù)時(shí)大概分為兩大塊:檢查和數(shù)據(jù)發(fā)送,數(shù)據(jù)發(fā)送流程如下:

    • 如果 channel 的讀等待隊(duì)列存在接收者goroutine
      將數(shù)據(jù)直接發(fā)送給第一個(gè)等待的 goroutine, 喚醒接收的 goroutine
    • 如果 channel 的讀等待隊(duì)列不存在接收者goroutine
      a. 如果循環(huán)數(shù)組buf未滿,那么將會(huì)把數(shù)據(jù)發(fā)送到循環(huán)數(shù)組buf的隊(duì)尾
      b. 如果循環(huán)數(shù)組buf已滿,這個(gè)時(shí)候就會(huì)走阻塞發(fā)送的流程,將當(dāng)前 goroutine 加入寫(xiě)等待隊(duì)列,并掛起等待喚醒
  • 接收
    接收操作,編譯時(shí)轉(zhuǎn)換為runtime.chanrecv函數(shù)

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 
    

    阻塞式:

    調(diào)用chanrecv函數(shù),并且block=true

    <ch
    
    v := <ch
    
    v, ok := <ch
    
    // 當(dāng)channel關(guān)閉時(shí),for循環(huán)會(huì)自動(dòng)退出,無(wú)需主動(dòng)監(jiān)測(cè)channel是否關(guān)閉,可以防止讀取已經(jīng)關(guān)閉的  channel,造成讀到數(shù)據(jù)為通道所存儲(chǔ)的數(shù)據(jù)類(lèi)型的零值
    for i := range ch {
        fmt.Println(i)
    }
    

    非阻塞式:

    調(diào)用chanrecv函數(shù),并且block=false

    select {
        case <-ch:
        ...
    
      default
    }
    

    向 channel 中接收數(shù)據(jù)時(shí)大概分為兩大塊,檢查和數(shù)據(jù)發(fā)送,而數(shù)據(jù)接收流程如下:

    • 如果 channel 的寫(xiě)等待隊(duì)列存在發(fā)送者goroutine
      如果是無(wú)緩沖 channel,直接從第一個(gè)發(fā)送者goroutine那里把數(shù)據(jù)拷貝給接收變量,喚醒發(fā)送的 goroutine
      如果是有緩沖 channel(已滿),將循環(huán)數(shù)組buf的隊(duì)首元素拷貝給接收變量,將第一個(gè)發(fā)送者goroutine的數(shù)據(jù)拷貝到 buf循環(huán)數(shù)組隊(duì)尾,喚醒發(fā)送的 goroutine
    • 如果 channel 的寫(xiě)等待隊(duì)列不存在發(fā)送者goroutine
      如果循環(huán)數(shù)組buf非空,將循環(huán)數(shù)組buf的隊(duì)首元素拷貝給接收變量
      如果循環(huán)數(shù)組buf為空,這個(gè)時(shí)候就會(huì)走阻塞接收的流程,將當(dāng)前 goroutine 加入讀等待隊(duì)列,并掛起等待喚醒
  • 關(guān)閉

    關(guān)閉操作,調(diào)用close函數(shù),編譯時(shí)轉(zhuǎn)換為runtime.closechan函數(shù)

    close(ch)
    func closechan(c *hchan) 
    

2. 特點(diǎn)

channel有2種類(lèi)型:無(wú)緩沖、有緩沖, 在創(chuàng)建時(shí)make(chan type cap) 通過(guò)cap 設(shè)定緩沖大小
channel有3種模式:寫(xiě)操作模式(單向通道)、讀操作模式(單向通道)、讀寫(xiě)操作模式(雙向通道)

寫(xiě)操作模式 讀操作模式 讀寫(xiě)操作模式
創(chuàng)建 make(chan<- int) make(<-chan int) make(chan int)

channel有3種狀態(tài):未初始化、正常、關(guān)閉

未初始化 關(guān)閉 正常
關(guān)閉 panic panic 正常關(guān)閉
發(fā)送 永遠(yuǎn)阻塞導(dǎo)致死鎖 panic 阻塞或者成功發(fā)送
接收 永遠(yuǎn)阻塞導(dǎo)致死鎖 緩沖區(qū)為空則為零值, 否則可以繼續(xù)讀 阻塞或者成功接收

如下幾種狀態(tài)會(huì)引發(fā)panic

1.關(guān)閉未初始化的channel 和已經(jīng)關(guān)閉的channel

  1. 向已經(jīng)關(guān)閉的channel 中發(fā)送數(shù)據(jù)
3. 線程安全

channel 是線程安全的,channel的底層實(shí)現(xiàn)中,hchan結(jié)構(gòu)體中采用Mutex鎖來(lái)保證數(shù)據(jù)讀寫(xiě)安全。在對(duì)循環(huán)數(shù)組buf中的數(shù)據(jù)進(jìn)行入隊(duì)和出隊(duì)操作時(shí),必須先獲取互斥鎖,才能操作channel數(shù)據(jù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 大綱 [toc] chan 是 golang 的最重要的一個(gè)結(jié)構(gòu),是區(qū)別于其他高級(jí)語(yǔ)言的最重要的特色之一,也是 g...
    奇伢云存儲(chǔ)閱讀 960評(píng)論 0 2
  • 前言 Golang在并發(fā)編程上有兩大利器,分別是channel和goroutine,這篇文章我們先聊聊channe...
    即將禿頭的Java程序員閱讀 1,216評(píng)論 0 2
  • channel 在 golang 中是一個(gè)非常重要的特性,它為我們提供了一個(gè)并發(fā)模型。對(duì)比鎖,通過(guò) chan 在多...
    安佳瑋閱讀 750評(píng)論 1 4
  • channel是golang中特有的一種數(shù)據(jù)結(jié)構(gòu),通常與goroutine一起使用,下面我們就介紹一下這種數(shù)據(jù)結(jié)構(gòu)...
    cfanbo閱讀 333評(píng)論 0 0
  • channel是什么? 使用場(chǎng)景 使用方式##無(wú)緩沖區(qū)的channel創(chuàng)建 var NoRoutChannel c...
    哈哈_dfde閱讀 2,435評(píng)論 0 1

友情鏈接更多精彩內(nèi)容