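sync.Cond implements a condition variable, used when one goroutine or a group of goroutines must wait until a condition is satisfied and then be woken. Each Cond is associated with a Locker, usually a *Mutex or *RWMutex, chosen according to your needs.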
Basic usage
As usual, before dissecting the source code, let's first look at how sync.Cond is used. As an example, we'll implement a FIFO queue:
package main

import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"time"
)

type FIFO struct {
	lock  sync.Mutex
	cond  *sync.Cond
	queue []int
}

type Queue interface {
	Pop() int
	Offer(num int) error
}

func (f *FIFO) Offer(num int) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.queue = append(f.queue, num)
	// Wake every goroutine blocked in Pop; each one re-checks the queue length.
	f.cond.Broadcast()
	return nil
}

func (f *FIFO) Pop() int {
	f.lock.Lock()
	defer f.lock.Unlock()
	// Wait releases f.lock while parked and re-acquires it before returning,
	// so the emptiness check must be repeated in a loop.
	for len(f.queue) == 0 {
		f.cond.Wait()
	}
	item := f.queue[0]
	f.queue = f.queue[1:]
	return item
}

func main() {
	fifo := &FIFO{queue: []int{}}
	// The Cond must wrap the very mutex that Offer and Pop lock. Copying a
	// local sync.Mutex into the struct and passing &local to NewCond would
	// give the Cond a different lock, and Wait would then unlock a mutex
	// that was never locked.
	fifo.cond = sync.NewCond(&fifo.lock)
	go func() {
		for {
			fifo.Offer(rand.Int())
		}
	}()
	time.Sleep(time.Second)
	go func() {
		for {
			fmt.Printf("goroutine1 pop-->%d\n", fifo.Pop())
		}
	}()
	go func() {
		for {
			fmt.Printf("goroutine2 pop-->%d\n", fifo.Pop())
		}
	}()
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
}
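We define a FIFO queue with two operations, Offer and Pop. One goroutine keeps putting data into the queue while two other goroutines keep taking data out.
- Pop checks whether the queue is empty (len(f.queue) == 0); if it is, Pop calls f.cond.Wait() to suspend the goroutine.
- Once Offer has successfully added data, it calls f.cond.Broadcast() to wake all goroutines waiting on this Cond. sync.Cond also provides Signal(); the pair is roughly analogous to Java's notify() and notifyAll(), the difference being whether one waiter or all waiters are woken.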
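To summarize the general usage of sync.Cond:
- First declare a mutex; choose sync.Mutex or sync.RWMutex depending on the situation.
- Call sync.NewCond(l Locker) *Cond with the mutex from the previous step as the argument. Note that a pointer is passed: Lock and Unlock are defined on *sync.Mutex, and c.L must refer to the very mutex the caller locks. A copy would be an independent lock, leading to deadlock or an "unlock of unlocked mutex" failure.
- While the business condition is not yet satisfied, call cond.Wait() to suspend the goroutine.
- Call cond.Broadcast() to wake all suspended goroutines; the other method, cond.Signal(), wakes the goroutine that has been waiting the longest.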
Note that cond.Wait() must be used according to the following template; we'll see why during the source analysis:
c.L.Lock()
for !condition() {
	c.Wait()
}
... make use of condition ...
c.L.Unlock()
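The template matters most when several goroutines wait on the same Cond for different conditions. The following sketch (the function waitForThresholds and its thresholds are illustrative, not from the original) has each goroutine wait until a shared counter reaches its own threshold. Because Broadcast wakes every waiter, each one must re-check its own condition in a for loop rather than an if.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// waitForThresholds starts one goroutine per threshold. Each goroutine blocks
// until the shared counter reaches its own threshold, following the
// Lock / for !condition() { Wait() } / Unlock template.
func waitForThresholds(thresholds []int) []int {
	var (
		mu      sync.Mutex
		counter int
		woken   []int
		wg      sync.WaitGroup
	)
	cond := sync.NewCond(&mu)

	for _, th := range thresholds {
		wg.Add(1)
		go func(th int) {
			defer wg.Done()
			mu.Lock()
			// A for loop, not an if: Broadcast wakes every waiter, but only
			// some of them have their own condition satisfied.
			for counter < th {
				cond.Wait()
			}
			woken = append(woken, th)
			mu.Unlock()
		}(th)
	}

	// Raise the counter to the largest threshold, broadcasting after each step.
	maxTh := 0
	for _, th := range thresholds {
		if th > maxTh {
			maxTh = th
		}
	}
	for i := 0; i < maxTh; i++ {
		mu.Lock()
		counter++
		cond.Broadcast()
		mu.Unlock()
	}

	wg.Wait()
	sort.Ints(woken)
	return woken
}

func main() {
	fmt.Println(waitForThresholds([]int{3, 1, 2})) // [1 2 3]
}
```

Goroutines that start after the counter has already passed their threshold never call Wait at all, which is exactly why the condition is checked before waiting and not only after a wake-up.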
Source code analysis
Data structure
Before analyzing the individual methods, let's look at sync.Cond's data structure. The source is as follows:
type Cond struct {
	noCopy noCopy // a Cond must not be copied after first use
	// L is held while observing or changing the condition
	L Locker
	// notify list: goroutines that call Wait() are placed in notifyList
	notify  notifyList
	checker copyChecker // detects whether the Cond instance has been copied
}
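noCopy was covered in an earlier post; if it is unfamiliar, see "Do You Really Understand Mutex?". Besides that, Locker is the mutex we just discussed, and copyChecker checks whether the Cond instance has been copied. It has a single method: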
func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}
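In short: copyChecker is declared as type copyChecker uintptr, so its zero value is 0. The first call to check() uses a CAS to store copyChecker's own address into itself. Why is uintptr(*c) != uintptr(unsafe.Pointer(c)) evaluated twice? Because another goroutine may have changed copyChecker in between: if a concurrent first call won the CAS, our CAS fails even though the stored address is correct, so the value is compared again. If the second comparison still fails, the stored address differs from the current one, meaning the sync.Cond was copied to a new memory location, and check() panics.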
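To watch the three-step check in action, here is a standalone re-implementation of the idea. It is a sketch, not the sync package's code: the ok method, the guarded type, the original variable and demo are hypothetical names, and ok returns a bool instead of panicking so the result can be printed. The checked value lives in a package-level variable so its address is stable.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// copyChecker re-creates the idea behind sync's unexported type of the same
// name: it remembers its own address on first use.
type copyChecker uintptr

// ok reports whether the checker still lives at the address it recorded.
func (c *copyChecker) ok() bool {
	self := uintptr(unsafe.Pointer(c))
	return uintptr(*c) == self || // already initialised at this address
		atomic.CompareAndSwapUintptr((*uintptr)(c), 0, self) || // first use: record our address
		uintptr(*c) == self // lost the CAS race to a goroutine that stored the same address
}

// guarded is a hypothetical type embedding the checker, standing in for sync.Cond.
type guarded struct {
	checker copyChecker
}

// original is a package-level variable, so its address never changes.
var original guarded

// demo reports the result of a first check, a repeated check, and a check on a copy.
func demo() (first, again, copyOK bool) {
	first = original.checker.ok()
	again = original.checker.ok()
	dup := original // copies the stored address along with the struct
	copyOK = dup.checker.ok()
	return
}

func main() {
	fmt.Println(demo()) // true true false
}
```

The copy fails because it carries the original's recorded address while living somewhere else, and its CAS from 0 cannot succeed since the field is already non-zero.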
The most interesting part of sync.Cond is notifyList:
type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32 // number of Wait operations (the next ticket to hand out)
	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32 // number of notified goroutines (the next ticket to wake)
	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}
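It contains three kinds of fields:
- wait and notify are unsigned integers that count, respectively, Wait() calls and notified goroutines. In unwrapped terms, wait is always greater than or equal to notify.
- lock mutex is, like the mutex in semaRoot that we met when analyzing the semaphore wait queue behind sync.Mutex, not the sync.Mutex offered to developers but a simple mutex implemented inside the runtime.
- head and tail: as the names suggest, this is a linked list, namely the list of goroutines (wrapped in sudog) blocked on this sync.Cond.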
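Because wait and notify are uint32 and may wrap around, tickets cannot be compared with a plain <. notifyListWait (shown later) uses a runtime helper named less; to my understanding it is defined as int32(a-b) < 0, and the sketch below reproduces that idea as an assumption rather than quoting the runtime. The counter values are made up to force a wraparound.

```go
package main

import (
	"fmt"
	"math"
)

// less reports whether ticket a comes before ticket b, tolerating uint32
// wraparound as long as the real distance between them is below 2^31.
func less(a, b uint32) bool {
	return int32(a-b) < 0
}

func main() {
	var wait uint32 = math.MaxUint32 // next ticket to hand out
	t := wait                        // a waiter takes ticket 4294967295
	wait++                           // wraps around to 0

	var notify uint32 = t        // nobody has been notified yet
	fmt.Println(less(t, notify)) // false: the waiter must park

	notify++                                     // one Signal: notify also wraps to 0
	fmt.Println(notify == wait, less(t, notify)) // true true: already notified
}
```

A plain t < notify would report false after the wrap (4294967295 < 0), wrongly making the waiter park even though it has already been notified.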
Overall, the structure of sync.Cond looks roughly like this:
[Figure: overall structure of sync.Cond]
Methods
Wait() operation
func (c *Cond) Wait() {
	// 1. check whether the Cond has been copied
	c.checker.check()
	// 2. notifyList.wait+1: take a ticket
	t := runtime_notifyListAdd(&c.notify)
	// 3. release the lock so other goroutines can proceed
	c.L.Unlock()
	// 4. park the goroutine
	runtime_notifyListWait(&c.notify, t)
	// 5. re-acquire the lock
	c.L.Lock()
}
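From the source of Wait() it is easy to see that it consists of five steps:
- Call copyChecker.check() to make sure the sync.Cond has not been copied.
- Each call to Wait() increments the wait field of the Cond's notifyList (c.notify.wait). This is the foundation of the FIFO behavior: the returned value, the ticket, records the order in which goroutines started waiting: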
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}
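- Call c.L.Unlock() to release the lock: the goroutine is about to be parked by gopark, so it must hand the lock to other goroutines; otherwise nobody could change the condition and the program would deadlock.
- Call runtime_notifyListWait(&c.notify, t), which is slightly more involved: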
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)
	// Return immediately if this ticket has already been notified.
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}
	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	// Store the ticket in s.ticket; this lays the groundwork for FIFO.
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	// Append the current goroutine to the notifyList linked list.
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	// Finally, call gopark to park the current goroutine.
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	// After being woken, release the sudog.
	releaseSudog(s)
}
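Unless its ticket has already been notified, in which case it returns immediately, it does two main things:
- Append the current goroutine to the notifyList linked list.
- Call gopark to suspend the current goroutine.

Finally, once another goroutine calls Signal or Broadcast and the current goroutine is woken, Wait() calls c.L.Lock() to reacquire the lock.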
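Steps 3 and 5 can be observed with the real sync.Cond. In the sketch below (waitReleasesLock is a hypothetical helper), the main goroutine can only acquire the mutex because Wait released it, and after Wait returns the waiter holds the mutex again. It uses Mutex.TryLock, which exists since Go 1.18.

```go
package main

import (
	"fmt"
	"sync"
)

// waitReleasesLock reports whether Wait returned with the mutex re-acquired.
func waitReleasesLock() bool {
	var (
		mu       sync.Mutex
		ready    bool
		relocked bool
	)
	cond := sync.NewCond(&mu)
	locked := make(chan struct{})
	done := make(chan struct{})

	go func() {
		mu.Lock()
		close(locked) // tell main that we hold the lock
		for !ready {
			cond.Wait() // takes a ticket, then unlocks mu and parks
		}
		// Step 5: Wait has re-acquired mu, so TryLock must fail here.
		relocked = !mu.TryLock()
		mu.Unlock()
		close(done)
	}()

	<-locked
	// This Lock can only succeed once the waiter is inside Wait and has
	// released mu (step 3); if Wait kept the lock, main would block forever.
	mu.Lock()
	ready = true
	// The waiter took its ticket before unlocking, so this Signal is not lost.
	cond.Signal()
	mu.Unlock()

	<-done
	return relocked
}

func main() {
	fmt.Println(waitReleasesLock()) // true
}
```

The ordering inside Wait is what makes this safe: the ticket is taken before c.L.Unlock(), so a Signal sent between the unlock and the park still matches this waiter, and notifyListWait returns immediately.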
Signal operation
Signal wakes the goroutine that has been waiting the longest; the caller is not required to hold the lock.
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}
The implementation is not complicated: it first checks whether the sync.Cond has been copied, then calls runtime_notifyListNotifyOne:
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// wait == notify means there are no goroutines left to notify.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}
	lockWithRank(&l.lock, lockRankNotifyList)
	// Check again under the lock.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}
	// Advance to the ticket number of the next waiter to be woken.
	atomic.Store(&l.notify, t+1)
	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	// Core of the FIFO implementation: walk the list looking for the sudog whose ticket matches.
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}
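Main logic:
- Check whether any goroutine is waiting to be woken; if not, return immediately.
- Increment notify. The goroutine to wake is found by matching notify against sudog.ticket, and since tickets are handed out in increasing order, this gives FIFO semantics.
- Walk the linked list held by notifyList, starting at head and following the next pointers. The scan is linear, so its worst case is O(n), but as the official comment puts it, in practice it is fast: "This scan looks linear but essentially always stops quickly."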
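One small detail: in Wait(), atomically incrementing wait and inserting the goroutine into the wait list are two separate steps, so under contention the nodes in the list can end up slightly out of order. This is harmless: tickets are handed out by atomic increment and Signal looks up the node whose ticket equals notify, so goroutines are still woken in ticket order.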
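The following single-threaded model illustrates why an out-of-order list does not break FIFO wake-ups. It is a simplification under stated assumptions, not runtime code: waiter stands in for sudog, and ticketList, enqueue, notifyOne and signalOrder are hypothetical names. Locking, atomics and parking are dropped; only the ticket matching and unlinking logic of notifyListNotifyOne is kept.

```go
package main

import "fmt"

// waiter is a simplified stand-in for the runtime's sudog.
type waiter struct {
	ticket uint32
	next   *waiter
}

// ticketList is a hypothetical, single-threaded model of notifyList.
type ticketList struct {
	wait, notify uint32
	head, tail   *waiter
}

// enqueue appends a waiter holding ticket t, like the tail insert in notifyListWait.
func (l *ticketList) enqueue(t uint32) {
	w := &waiter{ticket: t}
	if l.tail == nil {
		l.head = w
	} else {
		l.tail.next = w
	}
	l.tail = w
}

// notifyOne bumps notify, then unlinks the node whose ticket matches the old value.
// It returns that ticket and whether a matching node was found in the list.
func (l *ticketList) notifyOne() (uint32, bool) {
	if l.wait == l.notify {
		return 0, false // every ticket handed out has already been notified
	}
	t := l.notify
	l.notify = t + 1
	for p, s := (*waiter)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			s.next = nil
			return t, true
		}
	}
	return t, false // the owner of ticket t has not enqueued yet
}

// signalOrder hands out n tickets, enqueues them in the given order,
// then calls notifyOne n times and records which tickets were woken.
func signalOrder(n uint32, enqueueOrder []uint32) []uint32 {
	l := &ticketList{wait: n}
	for _, t := range enqueueOrder {
		l.enqueue(t)
	}
	var woken []uint32
	for i := uint32(0); i < n; i++ {
		if t, ok := l.notifyOne(); ok {
			woken = append(woken, t)
		}
	}
	return woken
}

func main() {
	// Tickets 1 and 2 reached the list in the "wrong" order.
	fmt.Println(signalOrder(4, []uint32{0, 2, 1, 3})) // [0 1 2 3]
}
```

When the owner of a ticket has not reached the list yet, notifyOne finds nothing but still advances notify; in the runtime that late goroutine then sees less(t, l.notify) in notifyListWait and returns without parking.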
Broadcast operation
Broadcast() differs from Signal() mainly in that it wakes all waiting goroutines, and it assigns the value of wait directly to notify.
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast path: return immediately if there are no waiters.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}
	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil
	// Set notify = wait directly.
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)
	// Wake each goroutine in turn via goready.
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}
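The logic is simple: under the lock it detaches the whole list and sets notify to wait, so any goroutine that has taken a ticket but not yet parked will return immediately from notifyListWait; then, outside the lock, it readies each detached goroutine.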
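A compact, single-threaded model can show both halves of this behavior. It is an illustrative sketch, not runtime code: notifyModel, add, park and notifyAll are hypothetical names, the list is reduced to a slice of tickets, and less is assumed to be the wraparound-tolerant comparison int32(a-b) < 0.

```go
package main

import "fmt"

// notifyModel is a hypothetical, single-threaded model of notifyList that keeps
// only the counters and the parked tickets (no locks, atomics, or sudogs).
type notifyModel struct {
	wait, notify uint32
	parked       []uint32
}

// add hands out the next ticket, like notifyListAdd.
func (m *notifyModel) add() uint32 {
	t := m.wait
	m.wait++
	return t
}

// less is the wraparound-tolerant ticket comparison.
func less(a, b uint32) bool { return int32(a-b) < 0 }

// park mirrors the start of notifyListWait: an already-notified ticket
// returns immediately (false); otherwise it joins the list (true).
func (m *notifyModel) park(t uint32) bool {
	if less(t, m.notify) {
		return false
	}
	m.parked = append(m.parked, t)
	return true
}

// notifyAll mirrors notifyListNotifyAll: detach the whole list and set notify = wait.
func (m *notifyModel) notifyAll() []uint32 {
	if m.wait == m.notify {
		return nil // fast path: nobody is waiting
	}
	woken := m.parked
	m.parked = nil
	m.notify = m.wait
	return woken
}

func main() {
	var m notifyModel
	t0, t1, t2 := m.add(), m.add(), m.add()
	m.park(t0)
	m.park(t1)
	// t2 has its ticket but has not parked yet when Broadcast runs.
	fmt.Println(m.notifyAll())        // [0 1]
	fmt.Println(m.park(t2), m.notify) // false 3
}
```

The late waiter holding ticket 2 never parks: because notify was set to wait (3), it sees that its ticket is already covered by the broadcast.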
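Summary
- Once created and used, a sync.Cond must not be copied; this is enforced by noCopy (checked by go vet) and copyChecker (checked at runtime).
- Wait() first increments notifyList.wait, wraps the goroutine in a sudog, stores the pre-increment value of wait in sudog.ticket, and inserts the sudog into the notifyList linked list.
- Signal() determines which goroutine to wake by matching notifyList.notify against the ticket of each node in the list; because notifyList.wait and notifyList.notify both only ever increase (wait atomically, notify under the lock), wake-ups follow FIFO order.
- Broadcast() is relatively simple: it sets notify to wait and wakes all waiting goroutines.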