手摸手Go 深入理解sync.Cond

sync.Cond實現(xiàn)了一個條件變量,用于等待一個或一組goroutines滿足條件后喚醒的場景。每個Cond關(guān)聯(lián)一個Locker通常是一個*MutexRWMutex`根據(jù)需求初始化不同的鎖。

基本用法

老規(guī)矩正式剖析源碼前,先來看看sync.Cond如何使用。比如我們實現(xiàn)一個FIFO的隊列

package main

import (
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "time"
)

type FIFO struct {
    lock  sync.Mutex
    cond  *sync.Cond
    queue []int
}

type Queue interface {
    Pop() int
    Offer(num int) error
}

func (f *FIFO) Offer(num int) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.queue = append(f.queue, num)
    f.cond.Broadcast()
    return nil
}
func (f *FIFO) Pop() int {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            f.cond.Wait()
        }
        item := f.queue[0]
        f.queue = f.queue[1:]
        return item
    }
}

func main() {
    l := sync.Mutex{}
    fifo := &FIFO{
        lock:  l,
        cond:  sync.NewCond(&l),
        queue: []int{},
    }
    go func() {
        for {
            fifo.Offer(rand.Int())
        }
    }()
    time.Sleep(time.Second)
    go func() {
        for {
            fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))
        }
    }()
    go func() {
        for {
            fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))
        }
    }()

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

我們定一個FIFO 隊列有OfferPop兩個操作,我們起一個gorountine不斷向隊列投放數(shù)據(jù),另外兩個gorountine不斷取拿數(shù)據(jù)。

  1. Pop操作會判斷如果隊列里沒有數(shù)據(jù)len(f.queue) == 0則調(diào)用f.cond.Wait()goroutine掛起。
  2. 等到Offer操作投放數(shù)據(jù)成功,里面調(diào)用f.cond.Broadcast()來喚醒所有掛起在這個mutex上的goroutine。當然sync.Cond也提供了一個Signal(),有點兒類似Java中的notify()notifyAll()的意思 主要是喚醒一個和喚醒全部的區(qū)別。

總結(jié)一下sync.Mutex的大致用法

  1. 首先聲明一個mutex,這里sync.Mutex/sync.RWMutex可根據(jù)實際情況選用
  2. 調(diào)用sync.NewCond(l Locker) *Cond 使用1中的mutex作為入?yún)?注意 這里傳入的是指針 為了避免c.L.Lock()c.L.Unlock()調(diào)用頻繁復制鎖 導致死鎖
  3. 根據(jù)業(yè)務條件 滿足則調(diào)用cond.Wait()掛起goroutine
  4. cond.Broadcast()喚起所有掛起的gorotune 另一個方法cond.Signal()喚醒一個最先掛起的goroutine

需要注意的是cond.wait()的使用需要參照如下模版 具體為啥我們后續(xù)分析

    c.L.Lock()
    for !condition() {
        c.Wait()
    }
    ... make use of condition ...
   c.L.Unlock()

源碼分析

數(shù)據(jù)結(jié)構(gòu)

分析具體方法前,我們先來了解下sync.Cond的數(shù)據(jù)結(jié)構(gòu)。具體源碼如下:

type Cond struct {
    noCopy noCopy // Cond使用后不允許拷貝
    // L is held while observing or changing the condition
    L Locker
  //通知列表調(diào)用wait()方法的goroutine會被放到notifyList中
    notify  notifyList
    checker copyChecker //檢查Cond實例是否被復制
}

noCopy之前講過 不清楚的可以看下《你真的了解mutex嗎》,除此之外,Locker是我們剛剛談到的mutex,copyChecker是用來檢查Cond實例是否被復制的,就有一個方法 :

func (c *copyChecker) check() {
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}

大致意思是說,初始type copyChecker uintptr默認為0,當?shù)谝淮握{(diào)用check()會將copyChecker自身的地址復制給自己,至于為什么uintptr(*c) != uintptr(unsafe.Pointer(c))會被調(diào)用2次,因為期間goroutine可能已經(jīng)改變copyChecker。二次調(diào)用如果不相等,則說明sync.Cond被復制,重新分配了內(nèi)存地址。

sync.Cond比較有意思的是notifyList

type notifyList struct {
    // wait is the ticket number of the next waiter. It is atomically
    // incremented outside the lock.
    wait uint32 // 等待goroutine操作的數(shù)量

    // notify is the ticket number of the next waiter to be notified. It can
    // be read outside the lock, but is only written to with lock held.
    //
    // Both wait & notify can wrap around, and such cases will be correctly
    // handled as long as their "unwrapped" difference is bounded by 2^31.
    // For this not to be the case, we'd need to have 2^31+ goroutines
    // blocked on the same condvar, which is currently not possible.
    notify uint32 // 喚醒goroutine操作的數(shù)量

    // List of parked waiters.
    lock mutex
    head *sudog
    tail *sudog
}

包含了3類字段:

  • waitnotify兩個無符號整型,分別表示了Wait()操作的次數(shù)和goroutine被喚醒的次數(shù),wait應該是恒大于等于notify
  • lock mutex 這個跟sync.Mutex我們分析信號量阻塞隊列時semaRoot里的mutex一樣,并不是Go提供開發(fā)者使用的sync.Mutex,而是系統(tǒng)內(nèi)部運行時實現(xiàn)的一個簡單版本的互斥鎖。
  • headtail看名字,我們就能腦補出跟鏈表很像 沒錯這里就是維護了阻塞在當前sync.Cond上的goroutine構(gòu)成的鏈表

整體來講sync.Cond大體結(jié)構(gòu)為:

cond architecture

操作方法

Wait()操作

func (c *Cond) Wait() {
  //1. 檢查cond是否被拷貝
    c.checker.check()
  //2. notifyList.wait+1
    t := runtime_notifyListAdd(&c.notify)
  //3. 釋放鎖 讓出資源給其他goroutine
    c.L.Unlock()
  //4. 掛起goroutine
    runtime_notifyListWait(&c.notify, t)
  //5. 嘗試獲得鎖
    c.L.Lock()
}

Wait()方法源碼很容易看出它的操作大概分了5步:

  1. 調(diào)用copyChecker.check()保證sync.Cond不會被拷貝
  2. 每次調(diào)用Wait()會將sync.Cond.notifyList.wait屬性進行加一操作,這也是它完成FIFO的基石,根據(jù)wait來判斷`goroutine1等待的順序
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}
  1. 調(diào)用c.L.Unlock()釋放鎖,因為當前goroutine即將被gopark,讓出鎖給其他goroutine避免死鎖
  2. 調(diào)用runtime_notifyListWait(&c.notify, t)可能稍微復雜一點兒
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
    lockWithRank(&l.lock, lockRankNotifyList)

    // 如果已經(jīng)被喚醒 則立即返回
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
  // 把等待遞增序號賦值給s.ticket 為FIFO打基礎(chǔ)
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
  // 將當前goroutine插入到notifyList鏈表中
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
  // 最終調(diào)用gopark掛起當前goroutine
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
  // goroutine被喚醒后釋放sudog
    releaseSudog(s)
}

主要完成兩個任務:

  • 將當前goroutine插入到notifyList鏈表中
  • 調(diào)用gopark將當前goroutine掛起
  1. 當其他goroutine調(diào)用了SignalBroadcast方法,當前goroutine被喚醒后 再次嘗試獲得鎖

Signal操作

Signal喚醒一個等待時間最長的goroutine,調(diào)用時不要求持有鎖。

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

具體實現(xiàn)也不復雜,先判斷sync.Cond是否被復制,然后調(diào)用runtime_notifyListNotifyOne

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
  // wait==notify 說明沒有等待的goroutine了
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }
    lockWithRank(&l.lock, lockRankNotifyList)
    // 鎖下二次檢查
    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    // 更新下一個需要被喚醒的ticket number
    atomic.Store(&l.notify, t+1)

    // Try to find the g that needs to be notified.
    // If it hasn't made it to the list yet we won't find it,
    // but it won't park itself once it sees the new notify number.
    //
    // This scan looks linear but essentially always stops quickly.
    // Because g's queue separately from taking numbers,
    // there may be minor reorderings in the list, but we
    // expect the g we're looking for to be near the front.
    // The g has others in front of it on the list only to the
    // extent that it lost the race, so the iteration will not
    // be too long. This applies even when the g is missing:
    // it hasn't yet gotten to sleep and has lost the race to
    // the (few) other g's that we find on the list.
  //這里是FIFO實現(xiàn)的核心 其實就是遍歷鏈表 sudog.ticket查找指定需要喚醒的節(jié)點
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}

主要邏輯:

  1. 判斷是否存在等待需要被喚醒的goroutine 沒有直接返回
  2. 遞增notify屬性,因為是根據(jù)notifysudog.ticket匹配來查找需要喚醒的goroutine,因為其是遞增生成的,故而有了FIFO語義。
  3. 遍歷notifyList持有的鏈表,從head開始依據(jù)next指針依次遍歷。這個過程是線性的,故而時間復雜度為O(n),不過官方說法這個過程實際比較快This scan looks linear but essentially always stops quickly.

有個小細節(jié):還記得我們Wait()操作中,wait屬性原子更新和goroutine插入等待鏈表是兩個單獨的步驟,所以存在競爭的情況下,鏈表中的節(jié)點可能會輕微的亂序產(chǎn)生。但是不要擔心,因為ticket是原子遞增的 所以喚醒順序不會亂。

Broadcast操作

Broadcast()Singal()區(qū)別主要是它可以喚醒全部等待的goroutine,并直接將wait屬性的值賦值給notify。

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
    // Fast-path 無等待goroutine直接返回
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    // 直接更新notify=wait
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // 依次調(diào)用goready喚醒goroutine
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

邏輯比較簡單不再贅述

總結(jié)

  1. sync.Cond一旦創(chuàng)建使用 不允許被拷貝,由noCopycopyChecker來限制保護。
  2. Wait()操作先是遞增notifyList.wait屬性 然后將goroutine封裝進sudog,將notifyList.wait賦值給sudog.ticket,然后將sudog插入notifyList鏈表中
  3. Singal()實際是按照notifyList.notifynotifyList鏈表中節(jié)點的ticket匹配 來確定喚醒的goroutine,因為notifyList.notifynotifyList.wait都是原子遞增的,故而有了FIFO的語義
  4. Broadcast()相對簡單 就是喚醒全部等待的goroutine
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文從上下文Context、同步原語與鎖、Channel、調(diào)度器四個方面介紹Go語言是如何實現(xiàn)并發(fā)的。本文絕大部分...
    彥幀閱讀 1,713評論 1 3
  • Select select 可見監(jiān)聽 Channel 上的數(shù)據(jù)流動; select 結(jié)構(gòu)與 switch 的結(jié)構(gòu)類...
    hellomyshadow閱讀 242評論 0 0
  • 版本 go version 1.10.1 使用方法 數(shù)據(jù)結(jié)構(gòu) noCopy:noCopy對象,擁有一個Lock方法...
    不就是個名字么不要在意閱讀 1,276評論 0 1
  • 如果能夠?qū)⑺袃?nèi)存都分配到棧上無疑性能是最佳的,但不幸的是我們不可避免需要使用堆上分配的內(nèi)存。我們可以優(yōu)化使用堆內(nèi)...
    光華路程序猿閱讀 598評論 0 1
  • 推薦指數(shù): 6.0 書籍主旨關(guān)鍵詞:特權(quán)、焦點、注意力、語言聯(lián)想、情景聯(lián)想 觀點: 1.統(tǒng)計學現(xiàn)在叫數(shù)據(jù)分析,社會...
    Jenaral閱讀 5,996評論 0 5

友情鏈接更多精彩內(nèi)容