Go channel 實(shí)現(xiàn)源碼解析

下載Go源碼后,根目錄結(jié)構(gòu)如下:
VERSION-- 文件,當(dāng)前Go版本
api-- 目錄,包含所有API列表
doc-- 目錄,Go語(yǔ)言的各種文檔,官網(wǎng)上有的,這里基本會(huì)有
favicon.ico-- 文件,官網(wǎng)logo
include-- 目錄,Go 基本工具依賴的庫(kù)的頭文件
lib-- 目錄,文檔模板
misc-- 目錄,其他的一些工具,大部分是各種編輯器的Go語(yǔ)言支持,還有cgo的例子等
robots.txt-- 文件,搜索引擎 robots文件
src -- 目錄,Go語(yǔ)言源碼:基本工具(編譯器等)、標(biāo)準(zhǔn)庫(kù)
test-- 目錄,包含很多測(cè)試程序(并非_test.go方式的單元測(cè)試,而是包含main包的測(cè)試)包括一些fixbug測(cè)試;可以通過這個(gè)學(xué)到一些特性的使用

channel實(shí)現(xiàn)文件目錄:

 /go/src/runtime/chan.go
channel 數(shù)據(jù)結(jié)構(gòu)

channel 是 goroutine 之間通信的一種方式,CSP 模型中 消息通道對(duì)應(yīng)的就是 channel;channel 結(jié)構(gòu)體定義如下:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

結(jié)構(gòu)體元素解析:

  • qcount 緩沖通道中的元素個(gè)數(shù)
  • dataqsiz 緩沖通道中的容量
  • buf 有緩沖channel的緩沖區(qū),一個(gè)定長(zhǎng)環(huán)形數(shù)組
  • elemsize 通道中存儲(chǔ)元素的長(zhǎng)度
  • closed 關(guān)閉通道使用非0表示關(guān)閉
  • elemtype 通道中存儲(chǔ)元素的類型
  • sendx 當(dāng)前發(fā)送元素指向buf環(huán)形數(shù)組的下標(biāo)指針
  • recvx 當(dāng)前接收元素指向buf環(huán)形數(shù)組的下標(biāo)指針
  • recvq 因消費(fèi)者而阻塞的等待隊(duì)列
  • sendq 因生產(chǎn)者而阻塞的等待隊(duì)列
  • lock 鎖保護(hù) hchan 中的所有字段

核心的部分是存放 channel 數(shù)據(jù)的環(huán)形隊(duì)列,dataqsiz、qcount 分別指定了隊(duì)列的容量和當(dāng)前使用量;另一個(gè)重要部分就是recvq 和 sendq 兩個(gè)鏈表,recvq 是因讀這個(gè)通道而導(dǎo)致阻塞的 goroutine,sendq 是因?qū)戇@個(gè)通道而阻塞的 goroutine;如果一個(gè) goroutine 阻塞于 channel 了,那么它就被掛在 recvq 或 sendq 中;waitq是鏈表的定義,包含一個(gè)頭結(jié)點(diǎn)和一個(gè)尾結(jié)點(diǎn):

type waitq struct {
    first *sudog
    last  *sudog
}

鏈表中每個(gè)元素都是sudog結(jié)構(gòu)體如下:

type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

結(jié)構(gòu)中主要的就是 g、elem:

  • g 代表著 G-M-P模型中的 G,sudog 是對(duì)g的封裝便于在 csp 模型中 g 可以同時(shí)阻塞在不同的 channel 上
  • elem 用于存儲(chǔ) goroutine 的數(shù)據(jù);讀通道時(shí),數(shù)據(jù)會(huì)從 hchan 的隊(duì)列中拷貝到 sudog 的 elem 域;寫通道時(shí),數(shù)據(jù)則是由 sudog 的elem 域拷貝到 hchan 的隊(duì)列中
創(chuàng)建channel實(shí)現(xiàn)

創(chuàng)建方法:

ch := make(chan string,5)

實(shí)現(xiàn)函數(shù)如下:

func makechan(t *chantype, size int) *hchan {
    elem := t.elem 

    // compiler checks this but be safe.
  // 異常判斷 元素類型大小限制
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
  // 異常判斷 對(duì)齊限制
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
  // maxAlloc 是 Arena 區(qū)域的最大值,緩沖元素的大小與hchan相加不能超過 緩沖槽大小
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
    // 無緩沖channel 
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
    //  buf 是不分配空間 緩存地址就指向自己
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
    // 分配一整塊內(nèi)存 存儲(chǔ)hchan和 buf
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
    // 是指針類型 分配hchan結(jié)構(gòu)體 buf單獨(dú)分配
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    // 初始化元素類型的大小
    c.elemsize = uint16(elem.size)
  // 初始化元素的類型
    c.elemtype = elem
  // 初始化 channel 的容量
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}
通道緩沖chanbuf實(shí)現(xiàn)

實(shí)現(xiàn)函數(shù)如下:

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

傳入 hchan 對(duì)象及元素在緩沖區(qū)環(huán)形數(shù)組中的下標(biāo)計(jì)算該下標(biāo)槽點(diǎn)內(nèi)存地址并返回

發(fā)送數(shù)據(jù)channelsend實(shí)現(xiàn)

實(shí)現(xiàn)函數(shù)如下:

// 傳入?yún)?shù) hchan ,發(fā)送數(shù)據(jù)地址,是否阻塞發(fā)送, select中的通道操作使用
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // 判斷 channel 為空  向其中發(fā)送數(shù)據(jù)將會(huì)永久阻塞
    if c == nil {
    // 如果非阻塞返回 false
        if !block {
            return false
        }
    // 如果阻塞 
    // gopark 會(huì)使當(dāng)前 goroutine 掛起,通過 unlockf 喚醒;調(diào)用gopark時(shí)傳入的unlockf為nil,會(huì)被一直休眠
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }
    // 如果開啟競(jìng)爭(zhēng)檢測(cè)
    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation.
  // 寫入數(shù)據(jù)到 channel
  // 1.非阻塞寫 2.沒有關(guān)閉channel 3.無緩沖channel并且消費(fèi)者環(huán)形隊(duì)列頭結(jié)點(diǎn)為空 或 有緩沖channel中存儲(chǔ)的元素?cái)?shù)量與容量相等 返回false
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }
    // 計(jì)時(shí)器 
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // 獲取同步鎖
    lock(&c.lock)
    // 判斷 channel 關(guān)閉,解鎖并 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // 當(dāng)有 goroutine 在 recvq 隊(duì)列上等待時(shí),跳過緩存隊(duì)列,將消息直接發(fā)給 reciever goroutine;dequeue 從等待接受的 goroutine 隊(duì)列鏈表獲取一個(gè)sudog,goready 喚醒阻塞的 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // 緩存隊(duì)列未滿,將消息復(fù)制到緩存隊(duì)列上并移動(dòng) sendx 下標(biāo),hchan buf 數(shù)據(jù)量增加
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
    // 數(shù)據(jù)拷貝到 buf 中
        typedmemmove(c.elemtype, qp, ep)
    // index 移動(dòng)
        c.sendx++
    // 環(huán)形隊(duì)列如果已經(jīng)加到最大就置 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
    // 緩沖元素?cái)?shù)量加 1 
        c.qcount++
    // 解鎖返回
        unlock(&c.lock)
        return true
    }
  // 阻塞 解鎖直接返回 false
    if !block {
        unlock(&c.lock)
        return false
    }
    // chan隊(duì)列已滿,阻塞 將本協(xié)程放入等待協(xié)程中,同時(shí)休眠此協(xié)程
    // Block on the channel. Some receiver will complete our operation for us.
  // 創(chuàng)建 goroutine 
    gp := getg()
  // 創(chuàng)建 sudog
    mysg := acquireSudog()
  // 初始化 釋放時(shí)間
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
  // 初始化寫入的數(shù)據(jù)
    mysg.elem = ep
    mysg.waitlink = nil
  // 初始化 goroutine
    mysg.g = gp
    mysg.isSelect = false
  // 初始化 hchan
    mysg.c = c
  // goroutine 設(shè)置的休眠 sudog
    gp.waiting = mysg
    gp.param = nil
  // 加入到寫阻塞的等待隊(duì)列
    c.sendq.enqueue(mysg)
  // 掛起休眠
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
  // 保證數(shù)據(jù)不被回收
    KeepAlive(ep)

  // 此時(shí)被喚醒 gp.waiting不是當(dāng)前的 mysg 直接 panic
    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
  // 等待的 sudog 置為 nil
    gp.waiting = nil
  // 喚醒時(shí)傳遞的參數(shù)為 nil 說明出問題了直接 panic
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
  // 將 hchan 置為 nil
    mysg.c = nil
  // 釋放 sudog
    releaseSudog(mysg)
    return true
}

生產(chǎn)者數(shù)據(jù)發(fā)送send實(shí)現(xiàn)如下:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            qp := chanbuf(c, c.recvx)
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
  // 寫入的數(shù)據(jù)不為空
    if sg.elem != nil {
    // 將數(shù)據(jù)拷貝到 hchan
        sendDirect(c.elemtype, sg, ep)
    // sudog 中數(shù)據(jù)置為 nil
        sg.elem = nil
    }
  // 取數(shù) goroutine
    gp := sg.g
    unlockf()
  // 傳入 sudug 使 param 不為空
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
  // 喚醒 goroutine 
    goready(gp, skip+1)
}
接收數(shù)據(jù)chanrecv實(shí)現(xiàn)

實(shí)現(xiàn)函數(shù)如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }
    // hchan 為 nil 
    if c == nil {
        if !block {
            return
        }
    // hchan 中接收消息永久阻塞
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not ready for receiving, we observe that the
    // channel is not closed. Each of these observations is a single word-sized read
    // (first c.sendq.first or c.qcount, and second c.closed).
    // Because a channel cannot be reopened, the later observation of the channel
    // being not closed implies that it was also not closed at the moment of the
    // first observation. We behave as if we observed the channel at that moment
    // and report that the receive cannot proceed.
    //
    // The order of operations is important here: reversing the operations can lead to
    // incorrect behavior when racing with a close.
  // 1.非阻塞讀 2.無緩沖channel并且消費(fèi)者環(huán)形隊(duì)列頭結(jié)點(diǎn)為空 或 有緩沖channel中存儲(chǔ)的元素?cái)?shù)量為0  3.沒有關(guān)閉channel 直接返回
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }
    // 計(jì)時(shí)器 
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // 獲取同步鎖
    lock(&c.lock)
    // channel 關(guān)閉 且 緩沖元素為0 返回空值
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
      // typedmemclr 使返回值 ep 變成了零值
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    //  如果有 send 生產(chǎn)者阻塞在隊(duì)列中,直接從 send 生產(chǎn)者取數(shù)據(jù)
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
    // 緩存隊(duì)列不為空,從隊(duì)列頭取出元素 
    if c.qcount > 0 {
        // Receive directly from queue
    // 根據(jù)hchan 索引獲取數(shù)據(jù)地址
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
      // 數(shù)據(jù)拷貝到 ep 中
            typedmemmove(c.elemtype, ep, qp)
        }
    // 清空環(huán)形數(shù)組己經(jīng)讀取的 gp
        typedmemclr(c.elemtype, qp)
    // 移動(dòng)索引
        c.recvx++
     // 環(huán)形隊(duì)列如果已經(jīng)加到最大就置 0
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
    // 緩存隊(duì)列元素?cái)?shù)量減 1
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

  // 沒有數(shù)據(jù) 讀非阻塞 直接解鎖返回
    if !block {
        unlock(&c.lock)
        return false, false
    }
    // chan隊(duì)列為空,阻塞 將本協(xié)程放入等待協(xié)程中,同時(shí)休眠此協(xié)程
    // no sender available: block on this channel.
  // 獲取 goroutine
    gp := getg()
  // 獲取 SudoG
    mysg := acquireSudog()
  // 初始化釋放時(shí)間
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
  // goroutine 加入到讀阻塞等待隊(duì)列
    c.recvq.enqueue(mysg)
  // 休眠
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

  // 此時(shí)被喚醒 gp.waiting不是當(dāng)前的 mysg 直接 panic
    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
  // 將 goroutine 中 param 參數(shù)置為 nil
    gp.param = nil
  // SudoG 中的 hchan 置為 nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

消費(fèi)者數(shù)據(jù)接收recv實(shí)現(xiàn)如下:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  // 緩存隊(duì)列不為空,直接從生產(chǎn)者獲取數(shù)據(jù)
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
    // 有 send 阻塞在這里,從 buf 中獲取數(shù)據(jù)
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
      // 將 buf 中未讀的當(dāng)前位置數(shù)據(jù)拷貝給消費(fèi)者
            typedmemmove(c.elemtype, ep, qp)
        }
        // 將阻塞的生產(chǎn)者數(shù)據(jù)拷貝此位置
        typedmemmove(c.elemtype, qp, sg.elem)
    // 接收元素索引向后移動(dòng)
        c.recvx++
    // 環(huán)形隊(duì)列如果已經(jīng)加到最大就置 0
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
    // 環(huán)形隊(duì)列讀取的索引位置就是寫入數(shù)據(jù)環(huán)形的末端
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
  // 數(shù)據(jù)置為 nil
    sg.elem = nil
  // 獲取 SudoG 中的 goroutine 傳遞給 param 參數(shù)
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
  // 喚醒 sendq 里面 SudoG 對(duì)應(yīng)的 g
    goready(gp, skip+1)
}
closechan 實(shí)現(xiàn)

關(guān)閉通道設(shè)置chan關(guān)閉標(biāo)志位,closed=1;函數(shù)如下:

func closechan(c *hchan) {
  // 關(guān)閉為 nil 的 hchan 直接 panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    // 獲取同步鎖
    lock(&c.lock)
  // 已關(guān)閉 hchan 釋放鎖 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }
    // 將 closed 置為 1 
    c.closed = 1

    var glist gList
    // 遍歷接收隊(duì)列  
    // release all readers
    for {
    // 取出讀阻塞隊(duì)列中的 SudoG
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
      // typedmemclr 使返回值 ep 變成了零值
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
    // 獲取 goroutine 將參數(shù) param 值為空,在接收方法中根據(jù) param 是否為空判斷是否為close
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
    // 將 goroutine 加入到 glist
        glist.push(gp)
    }

  // 遍歷發(fā)送隊(duì)列  
    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
    // 將接收隊(duì)列和發(fā)送隊(duì)列全部喚醒
        goready(gp, 3)
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容