sync.mutex 源代碼分析
[TOC]
針對 Golang 1.10.3 的 sync.Mutex 進行分析,代碼位置:sync/mutex.go

結(jié)構(gòu)體定義
type Mutex struct {
state int32 // 指代mutex鎖當前的狀態(tài)
sema uint32 // 信號量,用于喚醒goroutine
}
Mutex 中的 state 用于指代鎖當前的狀態(tài),如下所示
1111 1111 ...... 1111 1111
\_________29__________/|||
存儲等待 goroutine 數(shù)量
[0]第0個bit 表示當前 mutex 是否已被某個goroutine所擁有,1=加鎖
[1]第1個bit 表示當前 mutex 是否被喚醒,也就是有某個喚醒的goroutine要嘗試獲取鎖
[2]第2個bit 表示 mutex 當前是否處于饑餓狀態(tài)
常量定義
const (
mutexLocked = 1 << iota
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
mutexLocked 值為1,根據(jù) mutex.state & mutexLocked 得到 mutex 的加鎖狀態(tài),結(jié)果為1表示已加鎖,0表示未加鎖
mutexWoken 值為2(二進制:10),根據(jù) mutex.state & mutexWoken 得到 mutex 的喚醒狀態(tài),結(jié)果為1表示已喚醒,0表示未喚醒
mutexStarving 值為4(二進制:100),根據(jù) mutex.state & mutexStarving 得到 mutex 的饑餓狀態(tài),結(jié)果為1表示處于饑餓狀態(tài),0表示處于正常狀態(tài)
mutexWaiterShift 值為3,根據(jù) mutex.state >> mutexWaiterShift 得到當前等待的 goroutine 數(shù)目
starvationThresholdNs 值為1e6納秒,也就是1毫秒,當?shù)却犃兄嘘犑?goroutine 等待時間超過 starvationThresholdNs,mutex 進入饑餓模式
饑餓模式與正常模式
根據(jù)Mutex的注釋,當前的Mutex有如下的性質(zhì)。這些注釋將極大的幫助我們理解Mutex的實現(xiàn)。
Mutex 有兩種工作模式:正常模式和饑餓模式
正常模式
在正常模式中,所有等待鎖的 goroutine 按照 FIFO 的順序排隊獲取鎖,但是一個被喚醒的等待者有時候并不能獲取 mutex,它還需要和新到來的 goroutine 們競爭 mutex 的使用權(quán)。新到來的 goroutine 具有優(yōu)勢,它們已經(jīng)在 CPU 上運行且它們數(shù)量很多,因此一個被喚醒的等待者有很大的概率獲取不到鎖。在這種情況下,這個被喚醒的 goroutine 會加入到等待隊列的前面。如果一個等待的 goroutine 超過 1ms 沒有獲取鎖,它就會將 mutex 切換到饑餓模式
饑餓模式
在饑餓模式中,鎖的所有權(quán)將從 unlock 的 gorutine 直接交給交給等待隊列中的第一個。新來的 goroutine 將不會嘗試去獲得鎖,即使鎖看起來是 unlock 狀態(tài), 也不會去嘗試自旋操作,而是放在等待隊列的尾部。
如果一個等待的goroutine獲取了鎖,并且滿足一以下其中的任何一個條件,它會將鎖的狀態(tài)轉(zhuǎn)換為正常狀態(tài)。
- 它是等待隊列中的最后一個;
- 它等待的時間少于1ms。
函數(shù)
在分析源代碼之前, 我們要從多線程(goroutine)的并發(fā)場景去理解為什么實現(xiàn)中有很多的分支。
當一個goroutine獲取這個鎖的時候, 有可能這個鎖根本沒有競爭者, 那么這個goroutine輕輕松松獲取了這個鎖。
而如果這個鎖已經(jīng)被別的goroutine擁有, 就需要考慮怎么處理當前的期望獲取鎖的goroutine。
同時, 當并發(fā)goroutine很多的時候,有可能會有多個競爭者, 而且還會有通過信號量喚醒的等待者。
以下代碼已經(jīng)去掉了與核心代碼無關(guān)的 race 代碼。
Lock
Lock 方法申請對 mutex 加鎖,Lock 執(zhí)行的時候,分三種情況:
- 無沖突 通過 CAS 操作把當前狀態(tài)設(shè)置為加鎖狀態(tài)
- 有沖突 開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時間內(nèi)釋放了該鎖,直接獲得該鎖;如果沒有釋放,進入3
- 有沖突,且已經(jīng)過了自旋階段 通過調(diào)用 semacquire 函數(shù)來讓當前 goroutine 進入等待狀態(tài)
func (m *Mutex) Lock() {
// 如果 mutext 的 state 沒有被鎖,也沒有等待/喚醒的 goroutine , 鎖處于正常狀態(tài),那么獲得鎖,返回.
// 比如鎖第一次被 goroutine 請求時,就是這種狀態(tài)?;蛘哝i處于空閑的時候,也是這種狀態(tài)。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
var waitStartTime int64 // 當前 goroutine 開始等待的時間
starving := false // 當前 goroutine 是否已經(jīng)處于饑餓狀態(tài)
awoke := false // 當前 goroutine 是否被喚醒
iter := 0 // 自旋迭代的次數(shù)
old := m.state // old 保存當前 mutex 的狀態(tài)
for {
// 第一個條件是 state 已被鎖,但是不是饑餓狀態(tài)。如果是饑餓狀態(tài),自旋是沒有用的,鎖的擁有權(quán)直接交給了等待隊列的第一個。
// 第二個條件是還可以自旋,多核、壓力不大并且在一定次數(shù)內(nèi)可以自旋, 具體的條件可以參考`sync_runtime_canSpin`的實現(xiàn)(匯編實現(xiàn),內(nèi)部持續(xù)調(diào)用 PAUSE 指令,消耗 CPU 時間)。
// 如果滿足這兩個條件,不斷自旋來等待鎖被釋放、或者進入饑餓狀態(tài)、或者不能再自旋。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的過程中如果發(fā)現(xiàn) state 還沒有設(shè)置 woken 標識,則設(shè)置它的 woken 標識, 并標記自己為被喚醒。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 到了這一步, state的狀態(tài)可能是:
// 1. 鎖還沒有被釋放,鎖處于正常狀態(tài)
// 2. 鎖還沒有被釋放, 鎖處于饑餓狀態(tài)
// 3. 鎖已經(jīng)被釋放, 鎖處于正常狀態(tài)
// 4. 鎖已經(jīng)被釋放, 鎖處于饑餓狀態(tài)
// 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已經(jīng)設(shè)置了state的woken標識)
// new 復(fù)制 state的當前狀態(tài), 用來設(shè)置新的狀態(tài)。old 是鎖當前的狀態(tài)
new := old
// 如果 old state 狀態(tài)不是饑餓狀態(tài), new state 設(shè)置鎖, 嘗試通過CAS獲取鎖。
// 如果 old state 狀態(tài)是饑餓狀態(tài), 則不設(shè)置 new state 的鎖,因為饑餓狀態(tài)下鎖直接轉(zhuǎn)給等待隊列的第一個.
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 當 mutex 處于加鎖狀態(tài)或饑餓狀態(tài)的時候,新到來的 goroutine 進入等待隊列
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 當前 goroutine 將 mutex 切換為饑餓狀態(tài),但如果當前 mutex 未加鎖,則不需要切換
// Unlock 操作希望饑餓模式存在等待者
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果本 goroutine 已經(jīng)設(shè)置為喚醒狀態(tài), 需要清除 new state 的喚醒標記,
// 因為本 goroutine 要么獲得了鎖,要么進入休眠,總之state的新狀態(tài)不再是woken狀態(tài).
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 調(diào)用 CAS 更新 state 狀態(tài)
// 注意 new 的鎖標記不一定是 true, 也可能只是標記一下鎖的 state 是饑餓狀態(tài).
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果 old state 的狀態(tài)是未被鎖狀態(tài),并且鎖不處于饑餓狀態(tài),
// 那么當前 goroutine 已經(jīng)獲取了鎖的擁有權(quán),返回
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 設(shè)置/計算本 goroutine 的等待時間
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 既然未能獲取到鎖, 那么就使用 sleep 原語阻塞本 goroutine
// 如果是新來的 goroutine, queueLifo=false, 加入到等待隊列的尾部,耐心等待
// 如果是喚醒的 goroutine, queueLifo=true, 加入到等待隊列的頭部
runtime_SemacquireMutex(&m.sema, queueLifo)
// 如果當前 goroutine 等待時間超過 starvationThresholdNs,mutex 進入饑餓模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果當前的 state 已經(jīng)是饑餓狀態(tài)
// 那么鎖應(yīng)該處于 Unlock 狀態(tài),鎖被直接交給了本 goroutine
if old&mutexStarving != 0 {
// 如果當前的 state 已被鎖,或者已標記為喚醒, 或者等待的隊列中不為空,
// 那么 state 是一個非法狀態(tài)
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 等待狀態(tài)的 goroutine - 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不是饑餓模式了或者當前等待著只剩下一個,退出饑餓模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 更新狀態(tài)
// 因為已經(jīng)獲得了鎖,退出、返回
atomic.AddInt32(&m.state, delta)
break
}
// 如果當前的鎖是正常模式,本 goroutine 被喚醒,自旋次數(shù)清零,從 for 循環(huán)開始處重新開始
awoke = true
iter = 0
} else {
// 如果CAS不成功,重新獲取鎖的 state, 從 for 循環(huán)開始處重新開始
old = m.state
}
}
}
Unlock
Unlock方法釋放所申請的鎖
func (m *Mutex) Unlock() {
// 如果 state 不是處于鎖的狀態(tài), 那么就是 Unlock 根本沒有加鎖的 mutex, panic
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 釋放鎖,并通知其它等待者
// 鎖如果處于饑餓狀態(tài),直接交給等待隊列的第一個, 喚醒它,讓它去獲取鎖
// mutex 正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果沒有等待者,或者已經(jīng)存在一個 goroutine 被喚醒或得到鎖、或處于饑餓模式
// 直接返回.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 將等待的 goroutine-1,并設(shè)置 woken 標識
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 設(shè)置新的 state, 這里通過信號量會喚醒一個阻塞的 goroutine 去獲取鎖.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// mutex 饑餓模式,直接將 mutex 擁有權(quán)移交給等待隊列最前端的 goroutine
// 注意此時 state 的 mutex 還沒有加鎖,喚醒的 goroutine 會設(shè)置它。
// 在此期間,如果有新的 goroutine 來請求鎖, 因為 mutex 處于饑餓狀態(tài), mutex 還是被認為處于鎖狀態(tài),
// 新來的 goroutine 不會把鎖搶過去.
runtime_Semrelease(&m.sema, true)
}
}
sync.RWMutex 源碼分析
RWMutex 是讀寫互斥鎖,鎖可以由任意數(shù)量的讀取器或單個寫入器來保持
RWMutex 的零值是一個解鎖的互斥鎖
RWMutex 是搶占式的讀寫鎖,寫鎖之后來的讀鎖是加不上的
代碼位置:sync/rwmutex.go
結(jié)構(gòu)體定義
type RWMutex struct {
w Mutex // 互斥鎖
writerSem uint32 // 寫鎖信號量
readerSem uint32 // 讀鎖信號量
readerCount int32 // 讀鎖計數(shù)器
readerWait int32 // 獲取寫鎖時需要等待的讀鎖釋放數(shù)量
}
常量定義
const rwmutexMaxReaders = 1 << 30 // 支持最多2^30個讀鎖
函數(shù)
以下是 sync.RWMutex 提供的4個方法
Lock
提供寫鎖加鎖操作
func (rw *RWMutex) Lock() {
// 使用 Mutex 鎖
rw.w.Lock()
// 將當前的 readerCount 置為負數(shù),告訴 RUnLock 當前存在寫鎖等待
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 等待讀鎖釋放
runtime_Semacquire(&rw.writerSem)
}
}
Unlock
提供寫鎖釋放操作
func (rw *RWMutex) Unlock() {
// 加上 Lock 的時候減去的 rwmutexMaxReaders
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 沒執(zhí)行Lock調(diào)用Unlock,拋出異常
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 通知當前等待的讀鎖
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// 釋放 Mutex 鎖
rw.w.Unlock()
}
RLock
提供讀鎖操作
func (rw *RWMutex) RLock() {
// 每次 goroutine 獲取讀鎖時,readerCount+1
// 如果寫鎖已經(jīng)被獲取,那么 readerCount 在 -rwmutexMaxReaders 與 0 之間,這時掛起獲取讀鎖的 goroutine
// 如果寫鎖沒有被獲取,那么 readerCount > 0,獲取讀鎖, 不阻塞
// 通過 readerCount 判斷讀鎖與寫鎖互斥, 如果有寫鎖存在就掛起goroutine, 多個讀鎖可以并行
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 將 goroutine 排到G隊列的后面,掛起 goroutine
runtime_Semacquire(&rw.readerSem)
}
}
RUnLock
對讀鎖進行解鎖
func (rw *RWMutex) RUnlock() {
// 寫鎖等待狀態(tài),檢查當前是否可以進行獲取
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// r + 1 == 0表示直接執(zhí)行RUnlock()
// r + 1 == -rwmutexMaxReaders表示執(zhí)行Lock()再執(zhí)行RUnlock()
// 兩總情況均拋出異常
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 當讀鎖釋放完畢后,通知寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
}
總結(jié)
sync.Mutex
- 同一個時刻只有一個線程能夠拿到鎖
注意:
- 不要重復(fù)鎖定互斥鎖
- 不要忘記解鎖互斥鎖
- 不要在多個函數(shù)之間直接傳遞互斥鎖
sync.RWMutex
- 如果設(shè)置了一個寫鎖,那么其它讀的線程以及寫的線程都拿不到鎖,這個時候,與互斥鎖的功能相同
- 如果設(shè)置了一個讀鎖,那么其它寫的線程是拿不到鎖的,但是其它讀的線程是可以拿到鎖
讀寫互斥鎖的實現(xiàn)比較有技巧性一些,需要幾點
- 讀鎖不能阻塞讀鎖,引入readerCount實現(xiàn)
- 讀鎖需要阻塞寫鎖,直到所有讀鎖都釋放,引入readerSem實現(xiàn)
- 寫鎖需要阻塞讀鎖,直到所有寫鎖都釋放,引入wirterSem實現(xiàn)
- 寫鎖需要阻塞寫鎖,引入Metux實現(xiàn)