一文讀懂 Go sync.Cond 設計

Go 語言通過 go 關(guān)鍵字開啟 goroutine 讓開發(fā)者可以輕松地實現(xiàn)并發(fā)編程,而并發(fā)程序的有效運行,往往離不開 sync 包的保駕護航。目前,sync 包的賦能列表包括: sync.atomic 下的原子操作、sync.Map 并發(fā)安全 map、sync.Mutexsync.RWMutex 提供的互斥鎖與讀寫鎖、sync.Pool 復用對象池、sync.Once 單例模式、 sync.Waitgroup 的多任務協(xié)作模式、sync.Cond 的監(jiān)視器模式。當然,除了 sync 包,還有封裝層面更高的 channelcontext。

要想寫出合格的 Go 程序,以上的這些并發(fā)原語是必須要掌握的。對于大多數(shù) Gopher 而言,sync.Cond 應該是最為陌生,本文將一探究竟。

初識 sync.Cond

sync.Cond 字面意義就是同步條件變量,它實現(xiàn)的是一種監(jiān)視器(Monitor)模式。

In concurrent programming(also known as parallel programming), a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become false.

對于 Cond 而言,它實現(xiàn)一個條件變量,是 goroutine 間等待和通知的點。條件變量與共享的數(shù)據(jù)隔離,它可以同時阻塞多個 goroutine,直到另外的 goroutine 更改了條件變量,并通知喚醒阻塞著的一個或多個 goroutine。

初次接觸的讀者,可能會不太明白,那么下面我們看一下 GopherCon 2018 上《Rethinking Classical Concurrency Patterns》 中的演示代碼例子。

type Item = int

type Queue struct {
   items     []Item
   itemAdded sync.Cond
}

func NewQueue() *Queue {
   q := new(Queue)
   q.itemAdded.L = &sync.Mutex{} // 為 Cond 綁定鎖
   return q
}

func (q *Queue) Put(item Item) {
   q.itemAdded.L.Lock()
   defer q.itemAdded.L.Unlock()
   q.items = append(q.items, item)
   q.itemAdded.Signal()        // 當 Queue 中加入數(shù)據(jù)成功,調(diào)用 Singal 發(fā)送通知
}

func (q *Queue) GetMany(n int) []Item {
   q.itemAdded.L.Lock()
   defer q.itemAdded.L.Unlock()
   for len(q.items) < n {     // 等待 Queue 中有 n 個數(shù)據(jù)
      q.itemAdded.Wait()      // 阻塞等待 Singal 發(fā)送通知
   }
   items := q.items[:n:n]
   q.items = q.items[n:]
   return items
}

func main() {
   q := NewQueue()

   var wg sync.WaitGroup
   for n := 10; n > 0; n-- {
      wg.Add(1)
      go func(n int) {
         items := q.GetMany(n)
         fmt.Printf("%2d: %2d\n", n, items)
         wg.Done()
      }(n)
   }

   for i := 0; i < 100; i++ {
      q.Put(i)
   }

   wg.Wait()
}

在這個例子中,Queue 是存儲數(shù)據(jù) Item 的結(jié)構(gòu)體,它通過 Cond 類型的 itemAdded 來控制數(shù)據(jù)的輸入與輸出??梢宰⒁獾?,這里通過 10 個 goroutine 來消費數(shù)據(jù),但它們所需的數(shù)據(jù)量并不相等,我們可以稱之為 batch,依次在 1-10 之間。之后,逐步添加 100 個數(shù)據(jù)至 Queue 中。最后,我們能夠看到 10 個 gotoutine 都能被喚醒,得到它想要的數(shù)據(jù)。

程序運行結(jié)果如下

 6: [ 7  8  9 10 11 12]
 5: [50 51 52 53 54]
 9: [14 15 16 17 18 19 20 21 22]
 1: [13]
 2: [33 34]
 4: [35 36 37 38]
 3: [39 40 41]
 7: [ 0  1  2  3  4  5  6]
 8: [42 43 44 45 46 47 48 49]
10: [23 24 25 26 27 28 29 30 31 32]

當然,程序每次運行結(jié)果都不會相同,以上輸出只是某一種情況。

sync.Cond 實現(xiàn)

$GOPATH/src/sync/cond.go 中,Cond 的結(jié)構(gòu)體定義如下

type Cond struct {
   noCopy noCopy
   L Locker
   notify  notifyList
   checker copyChecker
}

其中,noCopychecker 字段均是為了避免 Cond 在使用過程中被復制,詳見小菜刀的 《no copy 機制》 一文。

L 是 Locker 接口,一般該字段的實際對象是 *RWmutex 或者 *Mutex。

type Locker interface {
   Lock()
   Unlock()
}

notifyList 記錄的是一個基于票號的通知列表,這里初次看注釋看不懂沒關(guān)系,和下文來回連貫著看。

type notifyList struct {
   wait   uint32         // 用于記錄下一個等待者 waiter 的票號
   notify uint32         // 用于記錄下一個應該被通知的 waiter 的票號
   lock   uintptr        // 內(nèi)部鎖
   head   unsafe.Pointer // 指向等待者 waiter 的隊列隊頭
   tail   unsafe.Pointer // 指向等待者 waiter 的隊列隊尾
}

其中,headtail 是指向 sudog 結(jié)構(gòu)體的指針,sudog 是代表的處于等待列表的 goroutine,它本身就是雙向鏈表。值得一提的是,在 sudog 中有一個字段 ticket 就是用于給當前 goroutine 記錄票號使用的。

Cond 實現(xiàn)的核心模式為票務系統(tǒng)(ticket system),每一個想要來買票的 goroutine (調(diào)用Cond.Wait())我們稱之為 waiter,票務系統(tǒng)會給每個 waiter 分配一個取票碼,等供票方有該取票碼的號時,就會喚醒 waiter。賣票的 goroutine 有兩種,第一種是調(diào)用 Cond.Signal() 的,它會按照票號喚醒一個買票的 waiter (如果有的話),第二種是調(diào)用 Cond.Broadcast() 的,它會通知喚醒所有的阻塞 waiter。為了方便讀者能夠比較輕松地理解票務系統(tǒng),下面我們給出圖解示例。

水印圖片.png

在 上文中,我們知道 Cond 字段中 notifyList 結(jié)構(gòu)體是一個記錄票號的通知列表。這里將 notifyList 比作排隊取票買電影票,當 G1 通過 Wait 來買票時,發(fā)現(xiàn)此時并沒有票可買,因此他只能阻塞等待有票之后的通知,此時他手上已經(jīng)取得了專屬取票碼 0。同樣的,G2 和 G3 也同樣無票可買,它們分別取到了自己的取票碼 1和 2。而 G4 是電影票提供商,它是賣票的,它通過兩次 Signal 先后帶來了兩張票,按照票號順序依次通知了 G1 和 G2 來取票,并把 notify 更新為了最新的 1。G5 也是買票的,它發(fā)現(xiàn)此時已經(jīng)無票可買了,拿了自己的取票碼 3 ,就阻塞等待了。G6 是個大票商,它通過 Broadcast 可以滿足所有正在等待的買票者都買到票,此時等待的是 G3 和 G5,因此他直接喚醒了 G3 和 G5,并將 notify 更新到和 wait 值相等。

理解了上述取票系統(tǒng)的運作原理后,我們下面來看 Cond 包下四個實際對外方法函數(shù)的實現(xiàn)。

  • NewCond 方法
func NewCond(l Locker) *Cond {
   return &Cond{L: l}
}

用于初始化 Cond 對象,就是初始化控制鎖。

  • Cond.Wait 方法
func (c *Cond) Wait() {
   c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}

runtime_notifyListAdd 的實現(xiàn)在 runtime/sema.go 的 notifyListAdd ,它用于原子性地增加等待者的 waiter 票號,并返回當前 goroutine 應該取的票號值 t 。runtime_notifyListWait 的實現(xiàn)在runtime/sema.go 的 notifyListWait,它會嘗試去比較此時 goroutine 的應取票號 tnotify 中記錄的當前應該被通知的票號。如果 t 小于當前票號,那么直接能得到返回,否則將會則塞等待,通知取號。

同時,這里需要注意的是,由于在進入 runtime_notifyListWait 時,當前 goroutine 通過 c.L.Unlock() 將鎖解了,這就意味著有可能會有多個 goroutine 來讓條件發(fā)生變化。那么,當前 goroutine 是不能保證在 runtime_notifyListWait 返回后,條件就一定是真的,因此需要循環(huán)判斷條件。正確的 Wait 使用姿勢如下:

//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
  • Cond.Signal 方法
func (c *Cond) Signal() {   c.checker.check()   runtime_notifyListNotifyOne(&c.notify)}

runtime_notifyListNotifyOne 的詳細實現(xiàn)在 runtime/sema.go 的 notifyListNotifyOne,它的目的就是通知 waiter 取票。具體操作是:如果在上一次通知取票之后沒有新的 waiter 取票者,那么該函數(shù)會直接返回。否則,它會將取票號 +1,并通知喚醒等待取票的 waiter。

需要注意的是,調(diào)用 Signal 方法時,并不需要持有 c.L 鎖。

  • Cond.Broadcast 方法
func (c *Cond) Broadcast() {   c.checker.check()   runtime_notifyListNotifyAll(&c.notify)}

runtime_notifyListNotifyAll 的詳細實現(xiàn)在 runtime/sema.go 的 notifyListNotifyAll,它會通知喚醒所有的 waiter,并將 notify 值置為 和 wait 值相等。調(diào)用 Broadcast 方法時,也不需要持有 c.L 鎖。

討論

$GOPATH/src/sync/cond.go 下,我們可以發(fā)現(xiàn)其代碼量非常之少,但它呈現(xiàn)的只是核心邏輯,其實現(xiàn)細節(jié)位于 runtime/sema.go 之中,依賴的是 runtime 層的調(diào)度原語,對細節(jié)感興趣的讀者可以深入學習。

問題來了,為什么在日常開發(fā)中,我們很少會使用到 sync.Cond ?

  • 無效喚醒

前文中我們提到,使用 Cond.Wait 正確姿勢如下

    c.L.Lock()    for !condition() {        c.Wait()    }    ... make use of condition ...    c.L.Unlock()

以文章開頭的例子而言,如果在每次調(diào)用 Put 方法時,使用 Broadcast 方法喚醒所有的 waiter,那么很大概率上被喚醒的 waiter 醒來發(fā)現(xiàn)條件并不滿足,又會重新進入等待。盡管是調(diào)用 Signal 方法喚醒指定的 waiter,但是它也不能保證喚醒的 waiter 條件一定滿足。因此,在實際的使用中,我們需要盡量保證喚醒操作是有效地,為了做到這點,代碼的復雜度難免會增加。

  • 饑餓問題

還是以文章開頭例子為例,如果同時有多個 goroutine 執(zhí)行 GetMany(3) 和 GetMany(3000),執(zhí)行 GetMany(3) 與執(zhí)行 GetMany(3000) 的 goroutine 被喚醒的概率是一樣的,但是由于 GetMany(3) 只需要 3個數(shù)據(jù)就能滿足條件,那么如果一直存在 GetMany(3) 的 goroutine,執(zhí)行 GetMany(3000) 的 goroutine 將永遠拿不到數(shù)據(jù),一直被無效喚醒。

  • 不能響應其他事件

條件變量的意義在于讓 goroutine 等待某種條件發(fā)生時進入睡眠狀態(tài)。但是這會讓 goroutine 在等待條件時,可能會錯過一些需要注意的其他事件。例如,調(diào)用 Cond.Wait 的函數(shù)中包含了 context 上下文,當 context 傳來取消信號時,它并不能像我們期望的一樣,獲取到取消信號并退出。Cond 的使用,讓我們不能同時選擇(select)條件和其他事件。

  • 可替代性

通過對 sync.Cond 幾個對外方法的分析,我們不難看到,它的使用場景是可以被 channel 所代替的,但是這也會增加代碼的復雜性。上文中的例子,可以使用 channel 改寫如下。

type Item = inttype waiter struct { n int   c chan []Item}type state struct {   items []Item    wait  []waiter}type Queue struct {  s chan state}func NewQueue() *Queue {   s := make(chan state, 1)    s <- state{}    return &Queue{s}}func (q *Queue) Put(item Item) {   s := <-q.s  s.items = append(s.items, item) for len(s.wait) > 0 {       w := s.wait[0]      if len(s.items) < w.n {         break       }       w.c <- s.items[:w.n:w.n]        s.items = s.items[w.n:]     s.wait = s.wait[1:] }   q.s <- s}func (q *Queue) GetMany(n int) []Item {    s := <-q.s  if len(s.wait) == 0 && len(s.items) >= n {      items := s.items[:n:n]      s.items = s.items[n:]       q.s <- s        return items    }   c := make(chan []Item)  s.wait = append(s.wait, waiter{n, c})   q.s <- s    return <-c}

最后,雖然在上文的討論中都是列出的 sync.Cond 潛在問題,但是如果開發(fā)者能夠在使用中考慮到以上的幾點問題,對于監(jiān)視器模型的實現(xiàn)而言,在代碼的語義邏輯上,sync.Cond 的使用會比 channel 的模式更易理解和維護。記住一點,通俗易懂的代碼模型總是比深奧的炫技要接地氣。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • 鎖 Mutex 互斥鎖 互斥即不可同時運行。即使用了互斥鎖的兩個代碼片段互相排斥,只有其中一個代碼片段執(zhí)行完成后,...
    Xuenqlve閱讀 1,049評論 0 1
  • 官方包的注釋: sync包提供基礎的同步原語,sync.Mutext、sync.RWMutex、sync.Wait...
    thepoy閱讀 776評論 0 1
  • 本文從上下文Context、同步原語與鎖、Channel、調(diào)度器四個方面介紹Go語言是如何實現(xiàn)并發(fā)的。本文絕大部分...
    彥幀閱讀 1,711評論 1 3
  • 版本 go version 1.10.1 使用方法 數(shù)據(jù)結(jié)構(gòu) noCopy:noCopy對象,擁有一個Lock方法...
    不就是個名字么不要在意閱讀 1,271評論 0 1
  • 16宿命:用概率思維提高你的勝算 以前的我是風險厭惡者,不喜歡去冒險,但是人生放棄了冒險,也就放棄了無數(shù)的可能。 ...
    yichen大刀閱讀 7,803評論 0 4

友情鏈接更多精彩內(nèi)容