鎖
type Locker interface {
Lock()
Unlock()
}
Mutex 互斥鎖
互斥即不可同時運行。即使用了互斥鎖的兩個代碼片段互相排斥,只有其中一個代碼片段執(zhí)行完成后,另一個才能執(zhí)行。
type Mutex struct {
state int32
sema uint32
}
state 表示當(dāng)前互斥鎖的狀態(tài),而 sema 是用于控制鎖狀態(tài)的信號量。
互斥鎖的狀態(tài)比較復(fù)雜,如下圖所示,最低三位分別表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用來表示當(dāng)前有多少個 Goroutine 在等待互斥鎖的釋放

在默認(rèn)情況下,互斥鎖的所有狀態(tài)位都是 0,int32 中的不同位分別表示了不同的狀態(tài):
- mutexLocked — 表示互斥鎖的鎖定狀態(tài);
- mutexWoken — 表示從正常模式被從喚醒;
- mutexStarving — 當(dāng)前的互斥鎖進(jìn)入饑餓狀態(tài);
- waitersCount — 當(dāng)前互斥鎖上等待的 Goroutine 個數(shù);
Mutex 正常模式和饑餓模式
- 在正常模式下,鎖的等待者會按照先進(jìn)先出的順序獲取鎖。
- 但是剛被喚起的 Goroutine 與新創(chuàng)建的 Goroutine 競爭時,大概率會獲取不到鎖,為了減少這種情況的出現(xiàn),一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當(dāng)前互斥鎖切換饑餓模式,防止部分 Goroutine 被『餓死』。
- 在饑餓模式中,互斥鎖會直接交給等待隊列最前面的 Goroutine。新的 Goroutine 在該狀態(tài)下不能獲取鎖、也不會進(jìn)入自旋狀態(tài),它們只會在隊列的末尾等待。
- 如果一個 Goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當(dāng)前的互斥鎖就會切換回正常模式。
- 與饑餓模式相比,正常模式下的互斥鎖能夠提供更好地性能,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時。
const (
mutexLocked = 1 << iota //1 互斥鎖被鎖定 + 未被喚醒
mutexWoken //2 互斥鎖未鎖定 + 被喚醒
mutexStarving //4 互斥鎖未鎖定 + 未被喚醒 + 饑餓模式啟動
mutexWaiterShift = iota //3 互斥鎖被鎖定 + 被喚醒
starvationThresholdNs = 1e6
)
加鎖和解鎖
func (m *Mutex) Lock() {
// 當(dāng)鎖的狀態(tài)是 0 時,將state置位 mutexLocked 為 1:
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 如果互斥鎖的狀態(tài)不是 0 時就會調(diào)用 sync.Mutex.lockSlow 嘗試通過自旋(Spinnig)
m.lockSlow()
}
func (m *Mutex) lockSlow() {
該方法的主體是一個非常大 for 循環(huán),這里將它分成幾個部分介紹獲取鎖的過程:
- 判斷當(dāng)前 Goroutine 能否進(jìn)入自旋;
- 通過自旋等待互斥鎖的釋放;
- 計算互斥鎖的最新狀態(tài);
- 更新互斥鎖的狀態(tài)并獲取鎖;
自旋是一種多線程同步機制,當(dāng)前的進(jìn)程在進(jìn)入自旋的過程中會一直保持 CPU 的占用,持續(xù)檢查某個條件是否為真
在多核的 CPU 上,自旋可以避免 Goroutine 的切換,使用恰當(dāng)會對性能帶來很大的增益,但是使用的不恰當(dāng)就會拖慢整個程序,所以 Goroutine 進(jìn)入自旋的條件非常苛刻
- 互斥鎖只有在普通模式才能進(jìn)入自旋;
- runtime.sync_runtime_canSpin 需要返回 true:
- 運行在多 CPU 的機器上;
- 當(dāng)前 Goroutine 為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次;
- 當(dāng)前機器上至少存在一個正在運行的處理器 P 并且處理的運行隊列為空;
下面代碼為 進(jìn)入自旋 邏輯
// 開始等待時間戳
var waitStartTime int64
// 饑餓模式標(biāo)識
starving := false
// 喚醒標(biāo)識
awoke := false
// 自旋次數(shù)
iter := 0
// 保存當(dāng)前對象鎖狀態(tài)
old := m.state
for {
// 鎖是非饑餓狀態(tài),鎖還沒被釋放,嘗試自旋
// 判斷相當(dāng)于xxxx...x0xx & 0101 = 01,當(dāng)前對象狀態(tài)為:xxxx…x0xx 當(dāng)前對象鎖被使用
// runtime_canSpin(iter) 根據(jù)iter自旋次數(shù)判斷
// 互斥鎖被鎖定時 進(jìn)入自旋狀態(tài) continue
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke &&
// 判斷相當(dāng)于: xxxx...xx0x & 0010 = 0
// 當(dāng)前對象狀態(tài)為未被喚醒時
old&mutexWoken == 0 &&
// 當(dāng)前有 Goroutine 在等待互斥鎖的釋放時,也就是有g(shù)oroution在排隊時 old>>mutexWaiterShift != 0 為 true
old>>mutexWaiterShift != 0 &&
// 將對象 改為喚醒狀態(tài):xxxx...xx0x | 0010 = xxxx...xx1x
// 在將 m.state 改為被喚醒狀態(tài)
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)
{
// 告知解鎖,不要喚醒其他阻塞的goroutines
awoke = true
}
// 進(jìn)入自旋狀態(tài) 當(dāng)前goroutine并不掛起,仍然在占用cpu資源,重試一定次數(shù)后,退出自旋狀態(tài)
runtime_doSpin()
//表示自旋次數(shù)
iter++
// 保存mutex對象即將被設(shè)置成的狀態(tài)
//再次獲取鎖的狀態(tài),之后會檢查是否鎖被釋放了
old = m.state
continue
}
一旦當(dāng)前 Goroutine 能夠進(jìn)入自旋就會調(diào)用runtime.sync_runtime_doSpin 和 runtime.procyield 并執(zhí)行 30 次的 PAUSE 指令,該指令只會占用 CPU 并消耗 CPU 時間:
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
處理了自旋相關(guān)的特殊邏輯之后,互斥鎖會根據(jù)上下文計算當(dāng)前互斥鎖最新的狀態(tài)。幾個不同的條件分別會更新 state 字段中存儲的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
new := old
// old 后三位為 mutexLocked,mutexWaiterShift,mutexWoken 時 old&mutexStarving == 0 為true
// xxxx...x0xx & 0100 = 0 xxxx...x0xx
// old&mutexStarving == 0 篩選當(dāng)前狀態(tài)為正常模式(非饑餓模式)
// 將新來的goroutines 互斥鎖鎖定
if old&mutexStarving == 0 {
// 將未鎖定狀態(tài)改為鎖定狀態(tài)
// 非饑餓狀態(tài),加鎖
new |= mutexLocked
}
// xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;
// old 互斥鎖被鎖定時但是狀態(tài)為饑餓模式時,mutex的等待goroutine數(shù)目加1
if old&(mutexLocked|mutexStarving) != 0 {
// new + 8 在等待互斥鎖的釋放的Goroutine數(shù)量 +1
// 更新阻塞goroutine的數(shù)量,表示mutex的等待goroutine數(shù)目加1
new += 1 << mutexWaiterShift
}
// xxxx...xxx1 & 0001 != 0;鎖狀態(tài)為鎖定狀態(tài)
if starving && old&mutexLocked != 0 {
// new + 4 表示從正常模式進(jìn)入饑餓模式
// xxxx...xxx | 0100 => xxxx...x1xx
new |= mutexStarving
}
// 當(dāng) 喚醒標(biāo)志為 true時
if awoke {
// xxxx...xx1x & 0010 = 0, 如果喚醒標(biāo)志為0 ,表示為被喚醒時,panic
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :設(shè)置喚醒狀態(tài)位0,goroutine是被喚醒的,新狀態(tài)清除喚醒標(biāo)志
new &^= mutexWoken
}
計算了新的互斥鎖狀態(tài)之后,會使用 CAS 函數(shù) sync/atomic.compareandswapint32 更新狀態(tài)
//判斷cas鎖是否 設(shè)置新狀態(tài) 成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//原來鎖的狀態(tài)已釋放,并且不是饑餓狀態(tài),正常請求到了鎖,返回
// xxxx...x0x0 & 0101 = 0, 互斥鎖未加鎖且為正常模式(非饑餓模式)
if old&(mutexLocked|mutexStarving) == 0 {
//結(jié)束cas
break
}
// 如果以前就在隊列里面,加入到隊列頭
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
//獲取當(dāng)前運行納秒時間戳
waitStartTime = runtime_nanotime()
}
//通過信號量保證資源不會被兩個 Goroutine 獲取
//runtime.sync_runtime_SemacquireMutex 會在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放,一旦當(dāng)前 Goroutine 可以獲取信號量,它就會立刻返回
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 判斷 等待時間 是否超出限制 1e6 將starving 標(biāo)志為 true
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// xxxx...x1xx & 0100 != 0 判斷鎖狀態(tài)是否 進(jìn)入饑餓狀態(tài)
if old&mutexStarving != 0 {
//當(dāng)前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當(dāng)前 Goroutine,互斥鎖還會從饑餓模式中退出
// xxxx...xx00 & 0011 = 0
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加鎖并且將waiter數(shù)減1
// delta -7 7 = 0111
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//判斷 等待隊列中 是否只存在當(dāng)前 Goroutine
if !starving || old>>mutexWaiterShift == 1 {
// delta -11 11 = 1011
delta -= mutexStarving
}
// m.state - 7 表示當(dāng)前 Goroutine 會獲得互斥鎖
// m.state - 11 表示當(dāng)前 Goroutine 會獲得互斥鎖還會從饑餓模式中退出
atomic.AddInt32(&m.state, delta)
break
}
// 設(shè)置喚醒標(biāo)記 為true
awoke = true
// 重置迭代次數(shù)
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
如果沒有通過 CAS 獲得鎖,會調(diào)用 runtime.sync_runtime_SemacquireMutex 通過信號量保證資源不會被兩個 Goroutine 獲取
runtime.sync_runtime_SemacquireMutex 會在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放,一旦當(dāng)前 Goroutine 可以獲取信號量,它就會立刻返回,sync.Mutex.Lock 的剩余代碼也會繼續(xù)執(zhí)行。
- 在正常模式下,這段代碼會設(shè)置喚醒和饑餓標(biāo)記、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán);
- 在饑餓模式下,當(dāng)前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當(dāng)前 Goroutine,互斥鎖還會從饑餓模式中退出;
互斥鎖的解鎖過程 sync.Mutex.Unlock 與加鎖過程相比就很簡單,該過程會先使用 sync/atomic.AddInt32 函數(shù)快速解鎖,這時會發(fā)生下面的兩種情況:
- 如果該函數(shù)返回的新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖;
- 如果該函數(shù)返回的新狀態(tài)不等于 0,這段代碼會調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// m.state - 1
// 新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖
// 新狀態(tài)不等于 0,這段代碼會調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
sync.Mutex.unlockSlow 會先校驗鎖狀態(tài)的合法性 — 如果當(dāng)前互斥鎖已經(jīng)被解鎖過了會直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序。
在正常情況下會根據(jù)當(dāng)前互斥鎖的狀態(tài),分別處理正常模式和饑餓模式下的互斥鎖:
func (m *Mutex) unlockSlow(new int32) {
// 校驗鎖狀態(tài)的合法性
if (new+mutexLocked)&mutexLocked == 0 {
//如果 當(dāng)前互斥鎖已經(jīng)被解鎖過了會直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序
throw("sync: unlock of unlocked mutex")
}
// 分別處理正常模式和饑餓模式下的互斥鎖
if new&mutexStarving == 0 {
// 正常狀態(tài)
old := new
for {
// 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked、mutexStarving、mutexWoken 狀態(tài)不都為 0(有喚醒者或鎖已經(jīng)加鎖),那么當(dāng)前方法可以直接返回,不需要喚醒其他等待者;
//如果沒有其它的 waiter,說明對這個鎖的競爭的 goroutine 只有一個,那就可以直接返回了;如果這個時候有喚醒的 goroutine,或者是又被別人加了鎖,那么,無需我們操勞,其它 goroutine 自己干得都很好,當(dāng)前的這個 goroutine 就可以放心返回了。
//xxxx…x000 & (0001 | 0010 | 0100) => xxxx…x000 & 0111 = 0
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 如果互斥鎖存在等待者,會通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán);
//如果有等待者,并且沒有喚醒的 waiter,那就需要喚醒一個等待的 waiter。在喚醒之前,需要將 waiter 數(shù)量減 1,并且將 mutexWoken 標(biāo)志設(shè)置上,這樣,Unlock 就可以返回了。
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饑餓狀態(tài)
// 當(dāng)前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖,在這時互斥鎖還不會退出饑餓狀態(tài)
runtime_Semrelease(&m.sema, true, 1)
}
}
在正常模式下,上述代碼會使用如下所示的處理過程:
- 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked、mutexStarving、mutexWoken 狀態(tài)不都為 0,那么當(dāng)前方法可以直接返回,不需要喚醒其他等待者;
- 如果互斥鎖存在等待者,會通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán);
在饑餓模式下,上述代碼會直接調(diào)用 sync.runtime_Semrelease 將當(dāng)前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖,在這時互斥鎖還不會退出饑餓狀態(tài);
總結(jié)
互斥鎖的加鎖過程比較復(fù)雜,它涉及自旋、信號量以及調(diào)度等概念:
- 如果互斥鎖處于初始化狀態(tài),會通過置位 mutexLocked 加鎖;
- 如果互斥鎖處于 mutexLocked 狀態(tài)并且在普通模式下工作,會進(jìn)入自旋,執(zhí)行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放;
- 如果當(dāng)前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會切換到饑餓模式;
- 互斥鎖在正常情況下會通過 runtime.sync_runtime_SemacquireMutex 將嘗試獲取鎖的 Goroutine 切換至休眠狀態(tài),等待鎖的持有者喚醒;
- 如果當(dāng)前 Goroutine 是互斥鎖上的最后一個等待的協(xié)程或者等待的時間小于 1ms,那么它會將互斥鎖切換回正常模式;
互斥鎖的解鎖過程與之相比就比較簡單,其代碼行數(shù)不多、邏輯清晰,也比較容易理解:
- 當(dāng)互斥鎖已經(jīng)被解鎖時,調(diào)用 sync.Mutex.Unlock 會直接拋出異常;
- 當(dāng)互斥鎖處于饑餓模式時,將鎖的所有權(quán)交給隊列中的下一個等待者,等待者會負(fù)責(zé)設(shè)置 mutexLocked 標(biāo)志位;
- 當(dāng)互斥鎖處于普通模式時,如果沒有 Goroutine 等待鎖的釋放或者已經(jīng)有被喚醒的 Goroutine 獲得了鎖,會直接返回;在其他情況下會通過 sync.runtime_Semrelease 喚醒對應(yīng)的 Goroutine;
RWMutex 讀寫鎖
讀寫互斥鎖 sync.RWMutex 是細(xì)粒度的互斥鎖,它不限制資源的并發(fā)讀,但是讀寫、寫寫操作無法并行執(zhí)行。
sync.RWMutex 中總共包含以下 5 個字段:
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
- w — 復(fù)用互斥鎖提供的能力;
- writerSem 和 readerSem — 分別用于寫等待讀和讀等待寫:
- readerCount 存儲了當(dāng)前正在執(zhí)行的讀操作數(shù)量;
- readerWait 表示當(dāng)寫操作被阻塞時等待的讀操作個數(shù);
寫鎖
當(dāng)資源的使用者想要獲取寫鎖時,需要調(diào)用 sync.RWMutex.Lock 方法:
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
//進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
- 調(diào)用結(jié)構(gòu)體持有的 sync.Mutex 結(jié)構(gòu)體的 sync.Mutex.Lock 阻塞后續(xù)的寫操作;因為互斥鎖已經(jīng)被獲取,其他 Goroutine 在獲取寫鎖時會進(jìn)入自旋或者休眠;
- 調(diào)用 sync/atomic.AddInt32 函數(shù)阻塞后續(xù)的讀操作:
- 如果仍然有其他 Goroutine 持有互斥鎖的讀鎖,該 Goroutine 會調(diào)用 runtime.sync_runtime_SemacquireMutex 進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒;
寫鎖的釋放會調(diào)用 sync.RWMutex.Unlock:
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
與加鎖的過程正好相反,寫鎖的釋放分以下幾個執(zhí)行:
- 調(diào)用 sync/atomic.AddInt32 函數(shù)將 readerCount 變回正數(shù),釋放讀鎖;
- 通過 for 循環(huán)釋放所有因為獲取讀鎖而陷入等待的 Goroutine:
- 調(diào)用 sync.Mutex.Unlock 釋放寫鎖;
獲取寫鎖時會先阻塞寫鎖的獲取,后阻塞讀鎖的獲取,這種策略能夠保證讀操作不會被連續(xù)的寫操作『餓死』。
讀鎖
讀鎖的加鎖方法 sync.RWMutex.RLock 很簡單,該方法會通過 sync/atomic.AddInt32 將 readerCount 加一:
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
- 如果該方法返回負(fù)數(shù) — 其他 Goroutine 獲得了寫鎖,當(dāng)前 Goroutine 就會調(diào)用 runtime.sync_runtime_SemacquireMutex 陷入休眠等待鎖的釋放;
- 如果該方法的結(jié)果為非負(fù)數(shù) — 沒有 Goroutine 獲得寫鎖,當(dāng)前方法會成功返回;
當(dāng) Goroutine 想要釋放讀鎖時,會調(diào)用如下所示的 sync.RWMutex.RUnlock 方法:
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
該方法會先減少正在讀資源的 readerCount 整數(shù),根據(jù) sync/atomic.AddInt32 的返回值不同會分別進(jìn)行處理:
- 如果返回值大于等于零 — 讀鎖直接解鎖成功;
- 如果返回值小于零 — 有一個正在執(zhí)行的寫操作,在這時會調(diào)用sync.RWMutex.rUnlockSlow 方法;
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
sync.RWMutex.rUnlockSlow 會減少獲取鎖的寫操作等待的讀操作數(shù) readerWait 并在所有讀操作都被釋放之后觸發(fā)寫操作的信號量 writerSem,該信號量被觸發(fā)時,調(diào)度器就會喚醒嘗試獲取寫鎖的 Goroutine。
雖然讀寫互斥鎖 sync.RWMutex 提供的功能比較復(fù)雜,但是因為它建立在 sync.Mutex 上,所以實現(xiàn)會簡單很多。我們總結(jié)一下讀鎖和寫鎖的關(guān)系:
- 調(diào)用 sync.RWMutex.Lock 嘗試獲取寫鎖時;
- 每次 sync.RWMutex.RUnlock 都會將 readerCount 其減一,當(dāng)它歸零時該 Goroutine 會獲得寫鎖;
- 將 readerCount 減少 rwmutexMaxReaders 個數(shù)以阻塞后續(xù)的讀操作;
- 調(diào)用 sync.RWMutex.Unlock 釋放寫鎖時,會先通知所有的讀操作,然后才會釋放持有的互斥鎖;
讀寫互斥鎖在互斥鎖之上提供了額外的更細(xì)粒度的控制,能夠在讀操作遠(yuǎn)遠(yuǎn)多于寫操作時提升性能。
WaitGroup
sync.WaitGroup 可以等待一組 Goroutine 的返回,一個比較常見的使用場景是批量發(fā)出 RPC 或者 HTTP 請求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我們可以通過 sync.WaitGroup 將原本順序執(zhí)行的代碼在多個 Goroutine 中并發(fā)執(zhí)行,加快程序處理的速度。

WaitGroup 等待多個 Goroutine
結(jié)構(gòu)體
sync.WaitGroup 結(jié)構(gòu)體中只包含兩個成員變量:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
- noCopy — 保證 sync.WaitGroup 不會被開發(fā)者通過再賦值的方式拷貝;
- state1 — 存儲著狀態(tài)和信號量;
sync.noCopy 是一個特殊的私有結(jié)構(gòu)體,tools/go/analysis/passes/copylock 包中的分析器會在編譯期間檢查被拷貝的變量中是否包含 sync.noCopy 或者實現(xiàn)了 Lock 和 Unlock 方法
sync.WaitGroup 結(jié)構(gòu)體中包含一個總共占用 12 字節(jié)的數(shù)組,這個數(shù)組會存儲當(dāng)前結(jié)構(gòu)體的狀態(tài),在 64 位與 32 位的機器上表現(xiàn)也非常不同。

WaitGroup 在 64 位和 32 位機器的不同狀態(tài)
- 64 位機器上本身就能保證 64 位對齊,所以按照 64 位對齊來取數(shù)據(jù),拿到 state1[0], state1[1] 本身就是64 位對齊的。但是 32 位機器上并不能保證 64 位對齊,因為 32 位機器是 4 字節(jié)對齊,如果也按照 64 位機器取 state[0],state[1] 就有可能會造成 atmoic 的使用錯誤。
- 32 位機器上空出第一個 32 位,也就使后面 64 位天然滿足 64 位對齊,第一個 32 位放入 sema 剛好合適
WaitGroup.state1 其實代表三個字段:counter,waiter,sema
- counter :可以理解為一個計數(shù)器,計算經(jīng)過 wg.Add(N), wg.Done() 后的值。
- waiter :當(dāng)前等待 WaitGroup 任務(wù)結(jié)束的等待者數(shù)量。其實就是調(diào)用 wg.Wait() 的次數(shù),所以通常這個值是 1 。
- sema : 信號量,用來喚醒 Wait() 函數(shù)。
sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能夠幫我們從 state1 字段中取出它的狀態(tài)和信號量。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
接口
sync.WaitGroup 對外暴露了三個方法 — sync.WaitGroup.Add、sync.WaitGroup.Wait 和 sync.WaitGroup.Done.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
//更新worker計數(shù)器
state := atomic.AddUint64(statep, uint64(delta)<<32)
//worker計數(shù)器:v 是 statep *uint64 的左32位
//waiter計數(shù)器:w 是 statep *uint64 的右32位
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// worker 大于 0 或者 waiter 等于 0 說明還有Goroutine沒有執(zhí)行完,直接返回
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 狀態(tài)設(shè)置為0
*statep = 0
// 通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine(喚醒 Wait())
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的計數(shù)器 counter。
雖然 sync.WaitGroup.Add 方法傳入的參數(shù)可以為負(fù)數(shù),但是計數(shù)器只能是非負(fù)數(shù),一旦出現(xiàn)負(fù)數(shù)就會發(fā)生程序崩潰。
當(dāng)調(diào)用計數(shù)器歸零,即所有任務(wù)都執(zhí)行完成時,才會通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine。
sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法傳入了 -1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
sync.WaitGroup 的另一個方法 sync.WaitGroup.Wait 會在計數(shù)器大于 0 并且不存在等待的 Goroutine 時,調(diào)用 runtime.sync_runtime_Semacquire 陷入睡眠。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
// 循環(huán)判斷 是否滿足退出條件
for {
//worker計數(shù)器:v 是 statep *uint64 的左32位
//waiter計數(shù)器:w 是 statep *uint64 的右32位
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// 當(dāng)計數(shù)器為0時,表示所有Goroutine 都執(zhí)行完成,立即返回
if v == 0 {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 更新waiter計數(shù)器 atomic.CompareAndSwapUint64 對uint64值執(zhí)行比較和交換操作
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(semap))
}
// Goroutine 進(jìn)入睡眠狀態(tài),等待信號量喚醒
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
當(dāng) sync.WaitGroup 的計數(shù)器歸零時,陷入睡眠狀態(tài)的 Goroutine 會被喚醒,上述方法也會立刻返回。
通過對 sync.WaitGroup 的分析和研究,我們能夠得出以下結(jié)論:
- sync.WaitGroup 必須在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;
- sync.WaitGroup.Done 只是對 sync.WaitGroup.Add 方法的簡單封裝,我們可以向 sync.WaitGroup.Add 方法傳入任意負(fù)數(shù)(需要保證計數(shù)器非負(fù))快速將計數(shù)器歸零以喚醒等待的 Goroutine;
- 可以同時有多個 Goroutine 等待當(dāng)前 sync.WaitGroup 計數(shù)器的歸零,這些 Goroutine 會被同時喚醒;
once
Go 語言標(biāo)準(zhǔn)庫中 sync.Once 可以保證在 Go 程序運行期間的某段代碼只會執(zhí)行一次
結(jié)構(gòu)體
每一個 sync.Once 結(jié)構(gòu)體中都只包含一個用于標(biāo)識代碼塊是否執(zhí)行過的 done 以及一個互斥鎖 sync.Mutex:
type Once struct {
done uint32
m Mutex
}
接口
sync.Once.Do 是 sync.Once 結(jié)構(gòu)體對外唯一暴露的方法,該方法會接收一個入?yún)榭盏暮瘮?shù):
- 如果傳入的函數(shù)已經(jīng)執(zhí)行過,會直接返回;
- 如果傳入的函數(shù)沒有執(zhí)行過,會調(diào)用 sync.Once.doSlow 執(zhí)行傳入的函數(shù):
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
- 為當(dāng)前 Goroutine 獲取互斥鎖;
- 執(zhí)行傳入的無入?yún)⒑瘮?shù);
- 運行延遲函數(shù)調(diào)用,將成員變量 done 更新成 1;
sync.Once 會通過成員變量 done 確保函數(shù)不會執(zhí)行第二次。
作為用于保證函數(shù)執(zhí)行次數(shù)的 sync.Once 結(jié)構(gòu)體,它使用互斥鎖和 sync/atomic 包提供的方法實現(xiàn)了某個函數(shù)在程序運行期間只能執(zhí)行一次的語義。在使用該結(jié)構(gòu)體時,我們也需要注意以下的問題:
- sync.Once.Do方法中傳入的函數(shù)只會被執(zhí)行一次,哪怕函數(shù)中發(fā)生了 panic;
- 兩次調(diào)用 sync.Once.Do 方法傳入不同的函數(shù)只會執(zhí)行第一次調(diào)傳入的函數(shù);
Cond
sync.Cond 用來協(xié)調(diào)想要訪問共享資源的 goroutine。
sync.Cond 經(jīng)常用在多個 goroutine 等待,一個 goroutine 通知(事件發(fā)生)的場景。如果是一個通知,一個等待,使用互斥鎖或 channel 就能搞定了。
使用場景
- 有一個協(xié)程在異步地接收數(shù)據(jù),剩下的多個協(xié)程必須等待這個協(xié)程接收完數(shù)據(jù),才能讀取到正確的數(shù)據(jù)。在這種情況下,如果單純使用 chan 或互斥鎖,那么只能有一個協(xié)程可以等待,并讀取到數(shù)據(jù),沒辦法通知其他的協(xié)程也讀取數(shù)據(jù)。
- 這個時候,就需要有個全局的變量來標(biāo)志第一個協(xié)程數(shù)據(jù)是否接受完畢,剩下的協(xié)程,反復(fù)檢查該變量的值,直到滿足要求。或者創(chuàng)建多個 channel,每個協(xié)程阻塞在一個 channel 上,由接收數(shù)據(jù)的協(xié)程在數(shù)據(jù)接收完畢后,逐個通知??傊?,需要額外的復(fù)雜度來完成這件事
結(jié)構(gòu)體
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
- noCopy — 用于保證結(jié)構(gòu)體不會在編譯期間拷貝;
- copyChecker — 用于禁止運行期間發(fā)生的拷貝;
- L — 用于保護內(nèi)部的 notify 字段,Locker 接口類型的變量;
- notify — 一個 Goroutine 的鏈表,它是實現(xiàn)同步機制的核心結(jié)構(gòu);
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
在 sync.notifyList 結(jié)構(gòu)體中,head 和 tail 分別指向的鏈表的頭和尾,wait 和 notify 分別表示當(dāng)前正在等待的和已經(jīng)通知到的 Goroutine 的索引。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
newCond
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
NewCond 創(chuàng)建 Cond 實例時,需要關(guān)聯(lián)一個鎖。
接口
wait
sync.Cond 對外暴露的 sync.Cond.Wait 方法會將當(dāng)前 Goroutine 陷入休眠狀態(tài),它的執(zhí)行過程分成以下兩個步驟:
- 調(diào)用 runtime.notifyListAdd 將等待計數(shù)器加一并解鎖;
- 調(diào)用 runtime.notifyListWait 等待其他 Goroutine 的喚醒并加鎖:
func (c *Cond) Wait() {
// 檢查c是否是被復(fù)制的,如果是就panic
c.checker.check()
// 將當(dāng)前goroutine加入等待隊列
t := runtime_notifyListAdd(&c.notify)
// 解鎖
c.L.Unlock()
// 等待隊列中的所有的goroutine執(zhí)行等待喚醒操作
runtime_notifyListWait(&c.notify, t)
// 鎖
c.L.Lock()
}
調(diào)用 Wait 會自動釋放鎖 c.L,并掛起調(diào)用者所在的 goroutine,因此當(dāng)前協(xié)程會阻塞在 Wait 方法調(diào)用的地方。
如果其他協(xié)程調(diào)用了 Signal 或 Broadcast 喚醒了該協(xié)程,那么 Wait 方法在結(jié)束阻塞時,會重新給 c.L 加鎖,并且繼續(xù)執(zhí)行 Wait 后面的代碼。
runtime.notifyListWait 會獲取當(dāng)前 Goroutine 并將它追加到 Goroutine 通知鏈表的最末端:
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
除了將當(dāng)前 Goroutine 追加到鏈表的末端之外,我們還會調(diào)用 runtime.goparkunlock 將當(dāng)前 Goroutine 陷入休眠,該函數(shù)也是在 Go 語言切換 Goroutine 時經(jīng)常會使用的方法,它會直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒。

sync.Cond.Signal 和 sync.Cond.Broadcast 就是用來喚醒陷入休眠的 Goroutine 的方法,它們的實現(xiàn)有一些細(xì)微的差別:
- sync.Cond.Signal 方法會喚醒隊列最前面的 Goroutine;
- sync.Cond.Broadcast 方法會喚醒隊列中全部的 Goroutine;
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
Signal 只喚醒任意 1 個等待條件變量 c 的 goroutine,無需鎖保護。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
Broadcast 喚醒所有等待條件變量 c 的 goroutine,無需鎖保護。
runtime.notifyListNotifyOne 只會從 sync.notifyList 鏈表中找到滿足 sudog.ticket == l.notify 條件的 Goroutine 并通過 runtime.readyWithTime 喚醒:
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
runtime.notifyListNotifyAll 會依次通過 runtime.readyWithTime 喚醒鏈表中 Goroutine:
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Goroutine 的喚醒順序也是按照加入隊列的先后順序,先加入的會先被喚醒,而后加入的可能 Goroutine 需要等待調(diào)度器的調(diào)度。
在一般情況下,我們都會先調(diào)用 sync.Cond.Wait 陷入休眠等待滿足期望條件,當(dāng)滿足喚醒條件時,就可以選擇使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 喚醒一個或者全部的 Goroutine。
Cond使用例子
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
sync.Cond 不是一個常用的同步機制,但是在條件長時間無法滿足時,與使用 for {} 進(jìn)行忙碌等待相比,sync.Cond 能夠讓出處理器的使用權(quán),提高 CPU 的利用率。使用時我們也需要注意以下問題:
- sync.Cond.Wait 在調(diào)用之前一定要使用獲取互斥鎖,否則會觸發(fā)程序崩潰;
- sync.Cond.Signal 喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;
- sync.Cond.Broadcast 會按照一定順序廣播通知等待的全部 Goroutine;