golang源碼學(xué)習(xí)之channel

數(shù)據(jù)結(jié)構(gòu)

type hchan struct {
    qcount   uint           //  隊(duì)列中的元素總量
    dataqsiz uint           //  緩存大小
    buf      unsafe.Pointer //  緩存指針 (環(huán)線數(shù)組)
    elemsize uint16         //  數(shù)據(jù)項(xiàng)大小
    closed   uint32         //  close 關(guān)閉標(biāo)記
    elemtype *_type         //  數(shù)據(jù)類(lèi)型
    sendx    uint           //  待發(fā)送元素在緩存中的索引
    recvx    uint           //  待接收元素在緩存中的索引
    recvq    waitq          //  接收等待隊(duì)列,用于阻塞接收協(xié)程
    sendq    waitq          //  發(fā)送等待隊(duì)列,用于阻塞發(fā)送協(xié)程
    lock mutex // 互斥鎖
}

chan 創(chuàng)建

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    // 數(shù)據(jù)項(xiàng)不能超過(guò)64kb
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    // 緩沖槽大小檢查
    if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case size == 0 || elem.size == 0: 
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        // 緩存地址就指向自己
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0: //指針
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // 跳過(guò)chan對(duì)象的大小就是緩沖首地址
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        // chan對(duì)象和緩存是兩個(gè)內(nèi)存
        c = new(hchan)
        c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

消息發(fā)送

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 上面block傳的true,所以下面只考慮true
    if c == nil {
        if !block {
            return false
        }

        // 當(dāng) chan 為 nil時(shí), 阻塞
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 獲取同步鎖
    lock(&c.lock)

    // 當(dāng)向已關(guān)閉的channel發(fā)送消息時(shí)會(huì)產(chǎn)生panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // CASE 1: 如果當(dāng)有 G 在接受隊(duì)列上等待時(shí),直接將消息發(fā)送給 G
    if sg := c.recvq.dequeue(); sg != nil {
        // send中有個(gè)goready(gp, skip+1)可以喚醒 G
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // CASE 2: 緩存隊(duì)列未滿,則將消息復(fù)制到緩存隊(duì)列上
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 計(jì)算加入的地址
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 復(fù)制到緩存中
        typedmemmove(c.elemtype, qp, ep)

        // 發(fā)送的index 加1
        c.sendx++
        // 注意這里被重置為0,所以是環(huán)線數(shù)組
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }

        // 緩存?zhèn)€數(shù) + 1
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // CASE 3: 緩存隊(duì)列已滿, 將 G 加入到 send隊(duì)列中
    // Block on the channel. Some receiver will complete our operation for us.
    // getg()獲取的是當(dāng)前的 g, 那么意味著將當(dāng)前 goroutine 加入 send 隊(duì)列并阻塞。
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 加入send隊(duì)列
    c.sendq.enqueue(mysg)
    // 阻塞
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

    // someone woke us up.
    // 被喚醒
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 當(dāng) G 被喚醒時(shí), 已關(guān)閉的chan也是會(huì)報(bào)錯(cuò)的
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    // 釋放 G
    releaseSudog(mysg)
    return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ....
    gp.param = unsafe.Pointer(sg) //注意這里的param不為空
    .....
    // 喚醒 G
    goready(gp, skip+1)
}

接收消息

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    // 因?yàn)樯厦?block傳到true, 所以下面只考慮true的情況
    // 當(dāng)初 為 nil 的chan中接收消息時(shí),永久阻塞
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 獲取同步鎖
    lock(&c.lock)

    // CASE 1: 向已經(jīng)close且為空的chan中獲取消息時(shí),返回空值。這里的false可以判斷是否close
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        // 釋放同步鎖
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // CASE 2: send 隊(duì)列不為空,什么時(shí)候send隊(duì)列不為空呢? 只有兩種可能 1. 無(wú)緩存chan  2. 緩存隊(duì)列已滿
    if sg := c.sendq.dequeue(); sg != nil {
        // CASE 2.1: 無(wú)緩存chan, 直接從send中獲取消息
        // CASE 2.2: 緩存隊(duì)列已滿, 從隊(duì)列獲取頭元素,喚醒send 將其消息加入隊(duì)列尾部(由于是環(huán)線隊(duì)列,所以尾部和頭部是同一位置)移動(dòng)recvx
        // 同樣在recv中有g(shù)oready(gp, skip+1)可以喚醒G
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // CASE 3: 緩存隊(duì)列不會(huì)空,直接從隊(duì)列獲取元素,移動(dòng)頭索引
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // CASE 4: 緩存隊(duì)列為空, 將當(dāng)前 G 加入接收隊(duì)列中, 休眠
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil

    // 將當(dāng)前 G 加入 接收隊(duì)列中
    c.recvq.enqueue(mysg)
    // 休眠
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // close喚醒時(shí)param為空,在send中喚醒時(shí)param不為空
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

關(guān)閉通道

func closechan(c *hchan) {

    // 關(guān)閉為nil的chan報(bào)錯(cuò)
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 獲取鎖
    lock(&c.lock)
    // 重復(fù)關(guān)閉,報(bào)錯(cuò)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

    c.closed = 1

    var glist *g

    // release all readers
    // 遍歷接收隊(duì)列
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        //對(duì)比send(),這里的param是nil。在接收方法中根據(jù)param是否為空判斷是否為close
        gp.param = nil  
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        gp.schedlink.set(glist)
        glist = gp
    }

    // release all writers (they will panic)
    // 遍歷發(fā)送隊(duì)列
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        gp.schedlink.set(glist)
        glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)  // 這里會(huì)將接收隊(duì)列和發(fā)送隊(duì)列全部喚醒
    }
}

通過(guò)上面的chansend、chanrecv、closechan發(fā)現(xiàn)里面都加了鎖,所以chan是線程安全的

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容