golang channel源碼分析

channel是golang中特有的一種數據結構,通常與goroutine一起使用,下面我們就介紹一下這種數據結構。

channel數據結構

channel最重要的一個結構體就是hchan,我們創(chuàng)建一個channel的時候,實際上是創(chuàng)建了一個下面結構體的實例。

hchan結構體

// src/runtime/chan.go

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

字段說明

  • qcount 當前channel中的元素數量
  • dataqsiz 環(huán)形隊列的大小
  • buf 指向dataqsize的數組指針,只有緩沖chan有效
  • closed 當前channel關閉狀態(tài)
  • elemsize 存儲元素的大小
  • elemtype 存儲元素的數據類型
  • sendx 發(fā)送操作處理到的索引位置,最大值為數組buf的最大下標值
  • recvx 接收操作處理到的索引位置,最大值為數組buf的最大下標值
  • recvq 接收隊列,雙向鏈表,阻塞元素
  • sendq 發(fā)送列隊,雙向鏈表,阻塞元素
  • lock 鎖,,用來保護sudog里的所的字段
image

hchan struct

其中elemsizeelemtype 表示存儲數據的大小和類型;sendxrecvx是指向底層數據的索引位置,表示當前處理的進度位置;recvqsendq 是一個由雙向鏈表實現的隊列,它存儲的內容是由于隊列dataqsize過小,而阻塞的數據。

每次進行發(fā)送數據和讀取數據時都需要加鎖。

waitq結構體

// src/runtime/chan.go

type waitq struct {
    first *sudog
    last  *sudog
}

sudog結構體

// src/runtime/runtime2.go

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ? synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}

這里 sudog 實際上是對 goroutine 的一個封裝,一個sudog 就是一個goroutine,如在channal上發(fā)送和接收。

sudogs 是通過一個特殊的池來分配的,通過 acquireSudog()releaseSudog()進行獲取和釋放。

sudog里的字段是由 hchan.lock 鎖來進行保護。

channel 整體結構圖

image

<figcaption>hchan 結構圖(來源:互聯網技術窩)</figcaption>

創(chuàng)建

// 無緩沖通道
ch1 := make(chan int)
// 有緩沖通道
ch2 := make(chan int, 10)

通過編譯可以發(fā)現channel的創(chuàng)建是由[makechan()](https://github.com/golang/go/blob/go1.15.6/src/runtime/chan.go#L71-L118)函數來完成的。源碼

// src/runtime/chan.go

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
    }
    return c
}

函數返回的是一個指針類型,因此我們可以在函數中通過參數直接傳遞,不需要再轉為指針使傳遞。

步驟

  1. 數據合法性檢查,包括發(fā)送數據的類型和大小
  2. 根據不同場景分配內存,主要針對buf字段
    a. 內存大小為0,注意這時c.buf 的值為c.raceaddr()
    b. 元素不包含指針,一次性分配一段內存地址
    c. 元素包含指針,分配內存
  3. 初始化其它字段

第一個參數 *chantype 結構定義

// src/runtime/type.go

type chantype struct {
    typ  _type
    elem *_type
    dir  uintptr
}

實際上創(chuàng)建一個channel, 只是對一個hchan結構體進行了一些初始化操作,并返回其指針。因此我們在函數傳遞時,不需要傳遞指針,直接使用就可以了,因為它本身就是一個指針的類型。

注意:對于chan內存是在heap上分配的。

發(fā)送數據

對于channel的寫操作是由 chansend() 函數來實現的。

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    ...
}

在chan為nil的情況下, 如果是非阻塞則直接返回,否則panic。

對于分送數據chan有三種情況,分別是直接發(fā)送,緩存區(qū)發(fā)送阻塞發(fā)送,其中阻塞發(fā)送涉及到GMP 的調度,理解起來有些吃力。

在發(fā)送數據前需要進行加鎖操作,發(fā)送完再解鎖,保證原子性操作。

直接發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 直接發(fā)送
    // 如果接收隊列中有接收者,則直接將數據發(fā)給接收者,重點在send()函數,并在函數里進行解鎖
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    ......
}

如果接收隊列中有接收者,則優(yōu)化從接收者從隊列中取出來sg(sg := c.recvq.dequeue()),然后再通過調用 send() 函數將數據發(fā)送給接收者即可。

image

<figcaption>channel send</figcaption>

在send()函數里,會執(zhí)行一個回調函數主要用來進行解鎖c.lock。真正的發(fā)送操作是函數 sendDirect(),通過memmove(dst, src, t.size) 將數據復制過去。

緩沖區(qū)發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 緩沖區(qū)發(fā)送
    // 接收者隊列中沒有接收者goroutine
    // 當前channel中的元素<隊列的大小,有緩沖buffer未滿的情況
    // 將數據存放在sendx在buf數組中的索引位置,然后再將sendx索引+1
    // 由于是一個循環(huán)數組,所以如果達到了dataqsize,則從0開始,同時個數+1
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }   

    ......
}

如果當前recvq 隊列里沒有處于等待執(zhí)行的sudog的話,則需要將數據發(fā)送到緩沖隊列中(如果當前隊列為緩沖chan)。

假設當前buffer大小為6(dataqsiz=6),數據個數為0(qcount=0),這里寫入6個數據,如下圖。

image

<figcaption>channel send</figcaption>

如果當前緩沖區(qū)的元素數量<隊列的大小,說明緩沖區(qū)還沒有滿,還可以繼續(xù)裝載數據。

這時第一步先計算出 s.sendx 索引位置的內存地址,然后調用 typememmove() 函數將qp復制到內存地址,再將s.sendx索引值+1,同時c.qcount++。

sendx = dataqsiz 的時候,說明已到了數組最后一個元素,下次存儲數據的話,則需要重新從0開始了,所以需要重置為0。

buf是一個由數組組成的隊列,滿足隊列的FIFO的機制,最新存儲的數據也先消費,最多可以存儲 dataqsiz 個數量。超出這個數據量就需要使用第三種 阻塞發(fā)送 方式了。

sendx 始終保存的是下次存儲數據的數組索引位置,每次使用完記得+1 。每次存儲以前都需要判斷當前buffer是否有空間可用 c.qcount < c.dataqsiz 。

總結

  • q.sendx 最大值為 c.dataqsiz -1,即數組的最大索引值。
  • q.count 是當前chan 存儲的元素個數,有可能 > c.dataqsiz

阻塞發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 阻塞發(fā)送
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true 

    ......
}

如果當buff也寫滿的話,再send數據的話,則需要進行阻塞發(fā)送了。

channel send

假如我們有一個緩沖chan,但緩沖大小已經使用完,再次發(fā)送數據的話,則需要進入sendq隊列了(將sudog綁定到一個goroutine,并放在sendq,等待讀?。?/p>

對于阻塞的情況,理解起來有些吃力,因為涉及到GMP的關系和調度。

  1. 調用 getg() 函數獲取當前運行的goroutine
  2. 調用 acquireSudog() 函數獲取一個sudog,并進行數據綁定
  3. 將mysg 添加到發(fā)送隊列sendq,并設置為gp.waiting
  4. 更改goroutine狀態(tài)
  5. 設置goroutine為等待喚醒狀態(tài),調用 atomic.Store8(&gp.parkingOnChan, 1)函數?
  6. 通過keepAlive()函數可以保證發(fā)送的值一直有效,直到被接收者取走
  7. 進行清理工作
  8. 釋放 sudog 結構體

總結

讀取數據

對于channel的讀取方式:

v <- ch
v, ok <- ch

其中 v<-ch 對應的是 runtime.chanrecv1(), v, ok <-ch 對應的是`runtime.chanrecv2()。但這兩個函數最終調用的還是同一個函數,即 chanrecv()。

我們先看一下官方文檔對這個函數的說明

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
  • chanrecv 用來從chan 中接收數據,并將接收的數據寫入到ep
  • 如果ep為 nil 的話,則接收的數據將被忽略
  • 如果非阻塞的且沒有可接收的數據將返回 (false ,false)
  • 如果chan已關閉,零值 ep 和返回值將是true, false,否則使用一個元素代替ep并返回 (true, true)
  • 一個非nil的 ep, 必須指向heap或者調用stack
// src/runtime/chan.go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 如果c為nil,表示非法操作,則直接gopark(),表示出讓當前GMP中的P的使用權,允許其它G使用
    if c == nil {
        // 如果非阻塞的話,直接返回;如果是阻塞的話,直接panic
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    ...

    // 如果chan已關閉且元素個數為0
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            // 設置內存內容為類型 c.elemtype 的零值
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

}

如果當前讀取的 chan 為nil的話,且非阻塞的情況,則會產生死鎖,最終提示

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:

否則返回零值。

同時出讓自己占用的P,允許其它goroutine搶占使用。

如果讀取的chan已關閉,則讀取出來的值為零值(函數說明第四條)。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    // 在沒有獲取鎖的情況下,檢查非阻塞操作失敗
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock.  This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.

        // 如果當前chan未關閉
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    ...

}

這段代碼主要是對重排讀的情況,進行了雙重檢測,暫是未明白code中考慮的情況,改天再消化消化。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖,下面才是真正要讀取的邏輯
    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    ...
}

讀取之前先加鎖。

對chan的讀取與發(fā)送一樣,同樣有三種方式,為直接讀取、緩沖區(qū)讀取和阻塞讀取。

直接讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 直接讀取 
    // 從c.sendq隊列中取sudog, 將數據復制到sg
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
}

獲取一個待發(fā)送者,如果buffer大小為0,則直接從發(fā)送者接收數據。否則從隊列頭部接收,并將發(fā)送者發(fā)送的數據放在隊列尾部。

chan recv

從c.sendq隊列里讀取一個 *sudog,通過調用 recv() 函數,將數據從發(fā)送者復制到ep中,并返回true,true,表示讀取成功。真正讀取函數為 recvDirect()。

緩沖區(qū)讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 如果c.qcount>0,說明緩沖區(qū)有元素可直接讀取
    if c.qcount > 0 {
        // Receive directly from queue
        // 直接從隊列中讀取
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
}

如果c.qcount > 0,則說明緩沖區(qū)里有內容可以讀取。則

直接獲取 c.recvx 數組索引位置的內存地址,則

  1. r.recvx 索引地址的值讀取出來復制給 ep,
  2. 然后更新接收數組索引c.recvx++, 如果>數組索引最大索引值 ,重置為0
  3. 減少元素個數
  4. 釋放鎖 c.qcount--
  5. 最后unlock返回。
image

<figcaption>chan recv</figcaption>

阻塞讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......

    // c.sendq沒有sender,buffer里也是空的,直接阻塞讀取
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
  1. 通過getg()獲取一個goroutine
  2. 獲取一個sudog結構體
  3. 綁定兩者關系
  4. 加入 c.recvq 隊列
  5. 設置goroutine為等待喚醒狀態(tài)
  6. 清理狀態(tài)
chan recv

關閉chan

關閉chan語句

close(ch)

對于已關閉的chan,是不允許再次關閉的,否則會產生panic。對應的函數為 runtime.closechan() 。

// src/runtime/chan.go

func closechan(c *hchan) {
    // 如果chan未初始化,觸發(fā)panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    // 關閉已關閉的chan,觸發(fā)panicc
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    ......

}

對于一個未初始化的chan,或者已關閉的chan,如果再次關閉則會觸發(fā)panic。

func closechan(c *hchan) {
    ......
    // 設置chan關閉狀態(tài)
    c.closed = 1

    // 聲明一個結構體鏈表gList,主要用來調度使用
    var glist gList

    // release all readers
    // 釋放所有readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }

        // 設置元素為nil
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    // 釋放所有writers,會引起panic,見下面說明
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 設置元素為nil
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 釋放鎖
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 調度所有g
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        // 喚醒goroutine
        goready(gp, 3)
    }
}
  1. 聲明一個gList 鏈表結構體
  2. 將接收隊列 c.recvq 中的所有元素添加到gList 中,并將原來的值設置為
  3. 將發(fā)送隊列 c.sendq 中的所有元素添加到 gList 中,并將原來的值設置為
  4. 將所有的阻塞goroutine通過函數goready() 進行調度

文章里提到在對c.sendq 處理的時候可能會觸發(fā)panic。這是因為關閉chan后,執(zhí)行了 goready() 對原來sendq里的sudogs 進行了進行了重新調度,這時候發(fā)現chan已經關閉了,所以會panic。那么又是如何調度的呢?

package main

import (
    "fmt"
    "time"
)

var ch chan int

func f() {
}

func main() {
    ch := make(chan int, 10)
    // buffer大小為10,這里發(fā)送11個,使最后一個進入到c.sendq里面
    for i := 0; i < 11; i++ { // i < 10 則正常
        go func(v int) {
            ch <- v
        }(i)
    }
    time.Sleep(time.Second)
    fmt.Println("發(fā)送完畢")
    // 關閉chan,將對sendq里的g進行喚醒,喚醒后發(fā)現chan關閉狀態(tài),直接panic
    close(ch)
    for v := range ch {
        fmt.Println(v)
    }
    time.Sleep(time.Second)
}

有一條廣泛流傳的關閉 channel 的原則:

don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

不要從一個 receiver 側關閉 channel,也不要在有多個 sender 時,關閉 channel。對于只有一個sender的話,直接在sender端關閉就可以。但對于多個sender的話,則需要通過一個信號量進行關閉,參考這里

總結

close 操作會觸發(fā)goroutine的調度行為。

總結

  1. 在發(fā)送和讀取 chan的時候,如果chan為nil的話,這時候就根據是否阻塞進行判斷是否會發(fā)生panic。如果阻塞狀態(tài)的話,則會發(fā)生panic,否則會直接返回
  2. 對chan 發(fā)送或接收數據的時候要保證已初始化狀態(tài)
  3. 對于已關閉的chan再次關閉會觸發(fā)panic
  4. 對于發(fā)送和讀取數據都有三種處理情況,分別是直接讀寫,緩存區(qū)讀寫和阻塞讀寫
  5. 發(fā)送和接收數據的本質上是對值的復制操作。All transfer of value on the go channels happens with the copy of value.
  6. close(ch)會觸發(fā)goroutine 的調度行為
  7. 內部使用 sudogs對goroutine進行了一次封裝。
  8. 如果buffer中的元素無法保證消費完的話,則會產生內存泄漏的危險,這時gc是無法對這些元素時間清理的,過多的 chan就會占用大量的資源
  9. 對于chan的分配的內存是在哪里,heap還是stack?

參考

本文如有錯誤,歡迎大家在下方留言指出。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 基礎用法 channel chan T是雙向channel類型,編譯器允許對雙向channel同時進行發(fā)送和接收。...
    杰克慢閱讀 373評論 0 0
  • channel 在 golang 中是一個非常重要的特性,它為我們提供了一個并發(fā)模型。對比鎖,通過 chan 在多...
    安佳瑋閱讀 749評論 1 4
  • 簡書前話: 由于簡書不支持 mermaid 流程圖,所以想看完整的版本,可以到我的個人博客 中查看 01.chan...
    Abson在簡書閱讀 1,111評論 0 0
  • 簡介 熟悉Go的人都知道,它提倡著不要通過共享內存來通訊,而要通過通訊來共享內存。Go提供了一種獨特的并發(fā)同步技術...
    marsjhe閱讀 3,052評論 0 2
  • Golang channel 作為Go的核心的數據結構和Goroutine之間的通信,是支撐Go語言高并發(fā)的關鍵 ...
    LegendGo閱讀 526評論 0 1

友情鏈接更多精彩內容