go-channel初識

了解過go的都知道,go最為突出的優(yōu)點就是它天然支持高并發(fā),但是所有高并發(fā)情況都面臨著一個很明顯的問題,就是并發(fā)的多線程或多協(xié)程之間如何通信,而channel就是go中g(shù)oroutine通信的‘管道’。

channel在go中時如何使用的

package main

import (
  "fmt"
  "os"
  "os/signal"
  "syscall"
  "time"
)

var exit = make(chan string, 1)

func main() {
  go dealSignal()
  exited := make(chan struct{}, 1)
  go channel1(exited)
  count := 0
  t := time.Tick(time.Second)
Loop:
  for {
    select {
    case <-t:
      count++
      fmt.Printf("main run %d\n", count)
    case <-exited:
      fmt.Println("main exit begin")
      break Loop
    }
  }
  fmt.Println("main exit end")
}

func dealSignal() {
  c := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-c
    exit <- "shutdown"
  }()
}

func channel1(exited chan<- struct{}) {
  t := time.Tick(time.Second)
  count := 0
  for {
    select {
    case <-t:
      count++
      fmt.Printf("channel1 run %d\n", count)
    case <-exit:
      fmt.Println("channel1 exit")
      close(exited)
      return
    }
  }
}

這個例子首先并發(fā)出一個dealsign方法,用來接收關(guān)閉信號,如果接收到關(guān)閉信號后往exit channel發(fā)送一條消息,然后并發(fā)運行channel1,channel1中定了一個ticker,正常情況下channel1每秒打印第一個case語句,如果接收到exit的信號,進入第二個case,然后關(guān)閉傳入的exited channel,那么main中的Loop,接收到exited關(guān)閉的信號后,打印“main exit begin”, 然后退出循環(huán),進程成功退出。這個例子演示了channel在goroutine中起到的傳遞消息的作用。這個例子是為了向大家展示channel在多個goroutine之間進行通信。

Channel在底層是什么樣的


type hchan struct {
  qcount   uint           // total data in the queue;chan中的元素總數(shù)
  dataqsiz uint           // size of the circular queue;底層循環(huán)數(shù)組的size
  buf      unsafe.Pointer // points to an array of dataqsiz elements,指向底層循環(huán)數(shù)組的指針,只針對有緩沖的channel
  elemsize uint16  //chan中元素的大小
  closed   uint32  //chan是否關(guān)閉
  elemtype *_type // element type;元素類型
  sendx    uint   // send index;已發(fā)送元素在循環(huán)數(shù)組中的索引
  recvx    uint   // receive index;已接收元素在循環(huán)數(shù)組中的索引
  recvq    waitq  // list of recv waiters,等待接收消息的goroutine隊列
  sendq    waitq  // list of send waiters,等待發(fā)送消息的goroutine隊列

  // lock protects all fields in hchan, as well as several
  // fields in sudogs blocked on this channel.
  //
  // Do not change another G's status while holding this lock
  // (in particular, do not ready a G), as this can deadlock
  // with stack shrinking.
  lock mutex
}

type waitq struct {
  first *sudog
  last  *sudog
}

創(chuàng)建一個底層數(shù)組容量為5,元素類型為int,那么channel的數(shù)據(jù)結(jié)構(gòu)如下圖所示:


創(chuàng)建channel的時候到底發(fā)生了什么

創(chuàng)建channel的時候,其實底層是調(diào)用makechan方法,我們來看下源碼:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem

  // compiler checks this but be safe.
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }

  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }

  // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
  // buf points into the same allocation, elemtype is persistent.
  // SudoG's are referenced from their owning thread so they can't be collected.
  // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  var c *hchan
  switch {
  case mem == 0:
    // Queue or element size is zero.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // Race detector uses this location for synchronization.
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // Elements do not contain pointers.
    // Allocate hchan and buf in one call.
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // Elements contain pointers.
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  if debugChan {
    print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
  }
  return c
}

從函數(shù)原型來看,創(chuàng)建的 chan 是一個指針。所以我們能在函數(shù)間直接傳遞 channel,而不用傳遞 channel 的指針。

具體來看下代碼:
可以看出makechan中其實主要的代碼就是一個switch,針對不同的情況:

1、case mem == 0代表無緩沖型channel,只分配hchan本身結(jié)構(gòu)體大小的內(nèi)存
2、case elem.ptrdata==0 代表元素類型不含指針,只分配hchan本身結(jié)構(gòu)體大小+元素大小*個數(shù)的內(nèi)存,是連續(xù)的內(nèi)存空間
3、default元素類型包括指針,兩次分配內(nèi)存的操作

channel的接收與發(fā)送

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}

首先創(chuàng)建了一個無緩沖型的channel,然后啟動兩個goroutine去消費channel的數(shù)據(jù),緊接著向channel中發(fā)送數(shù)據(jù)。我們一步一步來分析channel是如何接收和發(fā)送數(shù)據(jù)的,首先來看接收,golang中接收channel數(shù)據(jù)有兩種方式:

i <- ch
i, ok <- ch

// 位于 src/runtime/chan.go

// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有數(shù)據(jù)可接收的情況下,返回 (false, false)
// 否則,如果 c 處于關(guān)閉狀態(tài),將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的內(nèi)存地址。返回 (true, true)
// 如果 ep 非空,則應該指向堆或者函數(shù)調(diào)用者的棧

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 內(nèi)容 …………

    // 如果是一個 nil 的 channel
    if c == nil {
        // 如果不阻塞,直接返回 (false, false)
        if !block {
            return
        }
        // 否則,接收一個 nil 的 channel,goroutine 掛起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不會執(zhí)行到這里
        throw("unreachable")
    }

    // 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回
    // 當我們觀察到 channel 沒準備好接收:
    // 1. 非緩沖型,等待發(fā)送列隊 sendq 里沒有 goroutine 在等待
    // 2. 緩沖型,但 buf 里沒有元素
    // 之后,又觀察到 closed == 0,即 channel 未關(guān)閉。
    // 因為 channel 不可能被重復打開,所以前一個觀測的時候 channel 也是未關(guān)閉的,
    // 因此在這種情況下可以直接宣布接收失敗,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
    // 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
    // 也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel,
    // buf 里有元素的情況下還能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解鎖
        unlock(&c.lock)
        if ep != nil {
            // 從一個已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
            // 那么接收的值將是一個該類型的零值
            // typedmemclr 根據(jù)類型清理相應地址的內(nèi)存
            typedmemclr(c.elemtype, ep)
        }
        // 從一個已關(guān)閉的 channel 接收,selected 會返回true
        return true, false
    }

    // 等待發(fā)送隊列里有 goroutine 存在,說明 buf 是滿的
    // 這有可能是:
    // 1. 非緩沖型的 channel
    // 2. 緩沖型的 channel,但 buf 滿了
    // 針對 1,直接進行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
    // 針對 2,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 緩沖型,buf 里有元素,可以正常接收
    if c.qcount > 0 {
        // 直接從循環(huán)數(shù)組里找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 代碼里,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉循環(huán)數(shù)組里相應位置的值
        typedmemclr(c.elemtype, qp)
        // 接收游標向前移動
        c.recvx++
        // 接收游標歸零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 數(shù)組里的元素個數(shù)減 1
        c.qcount--
        // 解鎖
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下來就是要被阻塞的情況了
    // 構(gòu)造一個 sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收數(shù)據(jù)的地址保存下來
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 進入channel 的等待接收隊列
    c.recvq.enqueue(mysg)
    // 將當前 goroutine 掛起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被喚醒了,接著從這里繼續(xù)執(zhí)行一些掃尾工作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
Step1

如果channel是nil:如果是非阻塞模式,直接返回(false,false);如果是阻塞模式,調(diào)用goprak掛起goroutine,會阻塞下去。

if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
Step2

快速操作(不用獲取鎖,快速返回),三組條件全部滿足,快速返(false,false)

條件1:首先是在非阻塞模式下
條件2:如果是非緩沖型(datasiz=0)并且等待發(fā)送goroutine隊列為空(sendq.first=nil,就是沒人往channel寫數(shù)據(jù)),或者緩沖型channel(datasiz>0)并且buf中沒有數(shù)據(jù);
條件3:channel未關(guān)閉


//##################step2####################
  if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
    atomic.Load(&c.closed) == 0 {
    return
  }
Step3

首先加鎖,如果channel已經(jīng)關(guān)閉,并且buf中沒有元素,返回對應類型的0值,但是received為false;兩種情況

情形1:非緩沖型,channel已關(guān)閉
情形2:緩沖型,channel已關(guān)閉,并且buf無元素

也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel,
buf 里有元素的情況下還能接收到元素


//##################step3####################
  lock(&c.lock)

  if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
      raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)
    }
    return true, false
  }
step4

如果等待發(fā)送隊列中有元素,證明channel已經(jīng)滿了,兩種情形

情形1:非緩沖型,無buf
情形2:緩沖型,buf滿了

//##################step4####################
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }

兩種情形都正常進入recv方法,我們來看下源碼:


func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  //##################step4-1####################
  if c.dataqsiz == 0 {
    if raceenabled {
      racesync(c, sg)
    }
    if ep != nil {
      // copy data from sender
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
     //##################step4-2####################
    // Queue is full. Take the item at the
    // head of the queue. Make the sender enqueue
    // its item at the tail of the queue. Since the
    // queue is full, those are both the same slot.
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
    }
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

針對 1,直接進行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)(從發(fā)送者的棧copy到接收者的棧)
針對 2,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部.
然后喚醒等待發(fā)送隊列中的goroutine,等待調(diào)度器調(diào)度。

step5

沒有等待發(fā)送的隊列,并且buf中有元素,直接把接收游標處的數(shù)據(jù)copy到接收數(shù)據(jù)的地址,然后改變hchan中元素數(shù)據(jù)。

if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
step6

如果是非阻塞,那么直接返回;如果是阻塞的,構(gòu)造sudog,保存各種值;將sudog保存到channel的recvq中,調(diào)用goparkunlock將goroutine掛起

if !block {
    unlock(&c.lock)
    return false, false
  }
// no sender available: block on this channel.
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
  c.recvq.enqueue(mysg)
  goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

  // someone woke us up
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  closed := gp.param == nil
  gp.param = nil
  mysg.c = nil
  releaseSudog(mysg)
  return true, !closed

非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值

我們繼續(xù)之前的例子。前面說到第 14 行,創(chuàng)建了一個非緩沖型的 channel,接著,第 15、16 行分別創(chuàng)建了一個 goroutine,各自執(zhí)行了一個接收操作。通過前面的源碼分析,我們知道,這兩個 goroutine (后面稱為 G1 和 G2 好了)都會被阻塞在接收操作。G1 和 G2 會掛在 channel 的 recq 隊列中,形成一個雙向循環(huán)鏈表。

在程序的 17 行之前,chan 的整體數(shù)據(jù)結(jié)構(gòu)如下:


buf 指向一個長度為 0 的數(shù)組,qcount 為 0,表示 channel 中沒有元素。重點關(guān)注 recvq 和 sendq,它們是 waitq 結(jié)構(gòu)體,而 waitq 實際上就是一個雙向鏈表,鏈表的元素是 sudog,里面包含 g 字段,g 表示一個 goroutine,所以 sudog 可以看成一個 goroutine。recvq 存儲那些嘗試讀取 channel 但被阻塞的 goroutine,sendq 則存儲那些嘗試寫入 channel,但被阻塞的 goroutine。

此時,我們可以看到,recvq 里掛了兩個 goroutine,也就是前面啟動的 G1 和 G2。因為沒有 goroutine 接收,而 channel 又是無緩沖類型,所以 G1 和 G2 被阻塞。sendq 沒有被阻塞的 goroutine。

再從整體上來看一下 chan 此時的狀態(tài):


當一個channel關(guān)閉后,我們依然可以從中讀出數(shù)據(jù),如果chan的buf中有元素,則讀出的是chan中buf的數(shù)據(jù),如果buf為空,則輸出對應元素類型的零值。那么我們來看下如下的一段程序:

package main

import (
  "fmt"
  "os"
  "os/signal"
  "syscall"
  "time"
)

var exit1 = make(chan struct{}, 1)

func main() {
  go dealSignal1()
  count := 0
  t := time.Tick(time.Second)
  for {
    select {
    case <-t:
      count++
      fmt.Printf("main run %d\n", count)
    case <-exit1:
      fmt.Println("main exit begin")
    }
  }
  fmt.Println("main exit over")
}

func dealSignal1() {
  c := make(chan os.Signal, 2)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-c
    close(exit1)
  }()
}

發(fā)送

接著上面的例子,G1 和 G2 現(xiàn)在都在 recvq 隊列里了。
17 行向 channel 發(fā)送了一個元素 3。

發(fā)送操作最終轉(zhuǎn)化為 chansend 函數(shù),直接上源碼,同樣大部分都注釋了,可以看懂主流程:

// 位于 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果 channel 是 nil
    if c == nil {
        // 不能阻塞,直接返回 false,表示未發(fā)送成功
        if !block {
            return false
        }
        // 當前 goroutine 被掛起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相關(guān)……

    // 對于不阻塞的 send,快速檢測失敗場景
    //
    // 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間。這可能是:
    // 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
    // 2. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 鎖住 channel,并發(fā)安全
    lock(&c.lock)

    // 如果 channel 關(guān)閉了
    if c.closed != 0 {
        // 解鎖
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 如果接收隊列里有 goroutine,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 對于緩沖型的 channel,如果還有緩沖空間
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 將數(shù)據(jù)從 ep 處拷貝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 發(fā)送游標值加 1
        c.sendx++
        // 如果發(fā)送游標值等于容量值,游標值歸 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 緩沖區(qū)的元素數(shù)量加一
        c.qcount++

        // 解鎖
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞,則直接返回錯誤
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 滿了,發(fā)送方會被阻塞。接下來會構(gòu)造一個 sudog

    // 獲取當前 goroutine 的指針
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 當前 goroutine 進入發(fā)送等待隊列
    c.sendq.enqueue(mysg)

    // 當前 goroutine 被掛起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 從這里開始被喚醒了(channel 有機會可以發(fā)送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被喚醒后,channel 關(guān)閉了??拥?,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上綁定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

我們繼續(xù)往下走,G1、G2被掛起后,往channel中發(fā)送一個數(shù)據(jù)3,其實調(diào)用的是chansend方法,我們還是逐步的去講解

step1

如果channel=nil,當前goroutine會被掛起


if c == nil {
    if !block {
      return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
step2

依然是一個不加鎖的快速操作,三組條件

條件1:非阻塞
條件2:channel未關(guān)閉
條件3:channel是非緩沖型,并且等待接收隊列為空;或者緩沖型,并且循環(huán)數(shù)組已經(jīng)滿了

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
  }
step3

加鎖,如果channel已經(jīng)關(guān)閉,直接panic

lock(&c.lock)

if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}
step4

如果等待接收隊列不為空,說明什么?

情形1:非緩沖型,等待接收隊列不為空
情形2:緩沖型,等待接收隊列不為空(說明buf為空)

兩種情形,都是直接將待發(fā)送數(shù)據(jù)直接copy到接收處

if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)//直接從ep copy到sg
    return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if raceenabled {
    if c.dataqsiz == 0 {
      racesync(c, sg)
    } else {
      // Pretend we go through the buffer, even though
      // we copy directly. Note that we need to increment
      // the head/tail locations only when raceenabled.
      qp := chanbuf(c, c.recvx)
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
        c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
  }
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

兩種情形,都直接從一個用一個goroutine操作另一個goroutine的棧,因此在sendDirect方法中會有一次寫屏障

step5

如果等待隊列為空,并且緩沖區(qū)未滿,肯定是緩沖型的channel


if c.qcount < c.dataqsiz {
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }

將元素放在sendx處,然后sendx加1,channel總量加1

step6

如果以上情況都沒有命中,說明什么?說明channel已經(jīng)滿了,如果是非阻塞的直接返回,否則需要調(diào)用gopack將這個goroutine掛起,等待被喚醒。

if !block {
    unlock(&c.lock)
    return false
  }

  // Block on the channel. Some receiver will complete our operation for us.
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
  c.sendq.enqueue(mysg)
  goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
  // Ensure the value being sent is kept alive until the
  // receiver copies it out. The sudog has a pointer to the
  // stack object, but sudogs aren't considered as roots of the
  // stack tracer.
  KeepAlive(ep)

我們對照程序分析下,在前一個小節(jié)G1、G2被掛起來了,等待sender的解救;這時候往ch中發(fā)送了一個3,(step4)這時sender發(fā)現(xiàn)ch的等待接收隊列recvq中有receiver,就會出隊一個sudog,然后將元素直接copy到sudog的elem處,然后調(diào)用goready將G1喚醒,繼續(xù)執(zhí)行G1原來的代碼,打印出結(jié)果。如下圖:



當調(diào)度器光顧 G1 時,將 G1 變成 running 狀態(tài),執(zhí)行 goroutineA 接下來的代碼。G 表示其他可能有的 goroutine。

這里其實涉及到一個協(xié)程寫另一個協(xié)程棧的操作。有兩個 receiver 在 channel 的一邊虎視眈眈地等著,這時 channel 另一邊來了一個 sender 準備向 channel 發(fā)送數(shù)據(jù),為了高效,用不著通過 channel 的 buf “中轉(zhuǎn)”一次,直接從源地址把數(shù)據(jù) copy 到目的地址就可以了,效率高?。?/p>

關(guān)閉

close一個channel會調(diào)用closechan方法,比較簡單,我們也來看下

func closechan(c *hchan) {
    // 關(guān)閉一個 nil channel,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 上鎖
    lock(&c.lock)
    // 如果 channel 已經(jīng)關(guān)閉
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }

    // …………

    // 修改關(guān)閉狀態(tài)
    c.closed = 1

    var glist *g

    // 將 channel 所有等待接收隊列的里 sudog 釋放
    for {
        // 從接收隊列里出隊一個 sudog
        sg := c.recvq.dequeue()
        // 出隊完畢,跳出循環(huán)
        if sg == nil {
            break
        }

        // 如果 elem 不為空,說明此 receiver 未忽略接收數(shù)據(jù)
        // 給它賦一個相應類型的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相連,形成鏈表
        gp.schedlink.set(glist)
        glist = gp
    }

    // 將 channel 等待發(fā)送隊列里的 sudog 釋放
    // 如果存在,這些 goroutine 將會 panic
    for {
        // 從發(fā)送隊列里出隊一個 sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 發(fā)送者會 panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 形成鏈表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解鎖
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 遍歷鏈表
    for glist != nil {
        // 取最后一個
        gp := glist
        // 向前走一步,下一個喚醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 喚醒相應 goroutine
        goready(gp, 3)
    }
}
step1

如果channel為nil,會直接panic

if c == nil {
    panic(plainError("close of nil channel"))
  }
step2

加鎖,如果channel已經(jīng)關(guān)閉,再次關(guān)閉會panic


lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
step3

首選將hchan對應close標志置為1,然后聲明一個鏈表;將等待接收隊列中的所有sudog加入到鏈表,并將其elem賦予一個相應類型的0值;


c.closed = 1

  var glist gList

  // release all readers
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
step4

向所有等待發(fā)送隊列的sudog加入鏈表


// release all writers (they will panic)
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  unlock(&c.lock)
step5

喚醒sudog所有g(shù)oroutine

for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
  }

close 邏輯比較簡單,對于一個 channel,recvq 和 sendq 中分別保存了阻塞的發(fā)送者和接收者。關(guān)閉 channel 后,對于等待接收者而言,會收到一個相應類型的零值。對于等待發(fā)送者,會直接 panic。所以,在不了解 channel 還有沒有接收者的情況下,不能貿(mào)然關(guān)閉 channel。

close 函數(shù)先上一把大鎖,接著把所有掛在這個 channel 上的 sender 和 receiver 全都連成一個 sudog 鏈表,再解鎖。最后,再將所有的 sudog 全都喚醒。

喚醒之后,該干嘛干嘛。sender 會繼續(xù)執(zhí)行 chansend 函數(shù)里 goparkunlock 函數(shù)之后的代碼,很不幸,檢測到 channel 已經(jīng)關(guān)閉了,panic。receiver 則比較幸運,進行一些掃尾工作后,返回。這里,selected 返回 true,而返回值 received 則要根據(jù) channel 是否關(guān)閉,返回不同的值。如果 channel 關(guān)閉,received 為 false,否則為 true。

總結(jié)

總結(jié)一下,發(fā)生 panic 的情況有三種:

1.向一個關(guān)閉的 channel 進行寫操作;

  1. 關(guān)閉一個 nil 的 channel;
  2. 重復關(guān)閉一個 channel。

讀、寫一個 nil channel 都會被阻塞。

channel發(fā)送和接收元素的本質(zhì)還是值得拷貝
channel是并發(fā)安全的(加鎖)

參考:博客園-深度解密Go語言只channel
好未來Golang源碼系列三:Channel實現(xiàn)原理分析

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容