go語言以并發(fā)作為其特性之一,并發(fā)必然會帶來對于資源的競爭,這時候我們就需要使用go提供的sync.Mutex這把互斥鎖來保證臨界資源的訪問互斥。
既然經(jīng)常會用這把鎖,那么了解一下其內(nèi)部實現(xiàn),就能了解這把鎖適用什么場景,特性如何了。
引子
在我第一次看這段代碼的時候,感覺真的是驚為天人,特別是整個Mutex只用到了兩個私有字段,以及一次CAS就加鎖的過程,這其中設(shè)計以及編程的理念真的讓我感覺自愧不如。
在看sync.Mutex的代碼的時候,一定要記住,同時會有多個goroutine會來要這把鎖,所以鎖的狀態(tài)state是可能會一直更改的。
鎖的性質(zhì)
先說結(jié)論:sync.Mutex是把公平鎖。
在源代碼中,有一段注釋:
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
看懂這段注釋對于我們理解mutex這把鎖有很大的幫助,這里面講了這把鎖的設(shè)計理念。大致意思如下:
// 公平鎖
//
// 鎖有兩種模式:正常模式和饑餓模式。
// 在正常模式下,所有的等待鎖的goroutine都會存在一個先進先出的隊列中(輪流被喚醒)
// 但是一個被喚醒的goroutine并不是直接獲得鎖,而是仍然需要和那些新請求鎖的(new arrivial)
// 的goroutine競爭,而這其實是不公平的,因為新請求鎖的goroutine有一個優(yōu)勢——它們正在CPU上
// 運行,并且數(shù)量可能會很多。所以一個被喚醒的goroutine拿到鎖的概率是很小的。在這種情況下,
// 這個被喚醒的goroutine會加入到隊列的頭部。如果一個等待的goroutine有超過1ms(寫死在代碼中)
// 都沒獲取到鎖,那么就會把鎖轉(zhuǎn)變?yōu)轲囸I模式。
//
// 在饑餓模式中,鎖的所有權(quán)會直接從釋放鎖(unlock)的goroutine轉(zhuǎn)交給隊列頭的goroutine,
// 新請求鎖的goroutine就算鎖是空閑狀態(tài)也不會去獲取鎖,并且也不會嘗試自旋。它們只是排到隊列的尾部。
//
// 如果一個goroutine獲取到了鎖之后,它會判斷以下兩種情況:
// 1. 它是隊列中最后一個goroutine;
// 2. 它拿到鎖所花的時間小于1ms;
// 以上只要有一個成立,它就會把鎖轉(zhuǎn)變回正常模式。
// 正常模式會有比較好的性能,因為即使有很多阻塞的等待鎖的goroutine,
// 一個goroutine也可以嘗試請求多次鎖。
// 饑餓模式對于防止尾部延遲來說非常的重要。
在下一步真正看源代碼之前,我們必須要理解一點:當一個goroutine獲取到鎖的時候,有可能沒有競爭者,也有可能會有很多競爭者,那么我們就需要站在不同的goroutine的角度上去考慮goroutine看到的鎖的狀態(tài)和實際狀態(tài)、期望狀態(tài)之間的轉(zhuǎn)化。
字段定義
sync.Mutex只包含兩個字段:
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
其中state是一個表示鎖的狀態(tài)的字段,這個字段會同時被多個goroutine所共用(使用atomic.CAS來保證原子性),第0個bit(1)表示鎖已被獲取,也就是已加鎖,被某個goroutine擁有;第1個bit(2)表示有g(shù)oroutine被喚醒,嘗試獲取鎖;第2個bit(4)標記這把鎖是否為饑餓狀態(tài)。
sema字段就是用來喚醒goroutine所用的信號量。
Lock
在看代碼之前,我們需要有一個概念:每個goroutine也有自己的狀態(tài),存在局部變量里面(也就是函數(shù)棧里面),goroutine有可能是新到的、被喚醒的、正常的、饑餓的。
atomic.CAS
先瞻仰一下驚為天人的一行代碼加鎖的CAS操作:
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
...
}
這是第一段代碼,這段代碼調(diào)用了atomic包中的CompareAndSwapInt32這個方法來嘗試快速獲取鎖,這個方法的簽名如下:
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
意思是,如果addr指向的地址中存的值和old一樣,那么就把addr中的值改為new并返回true;否則什么都不做,返回false。由于是atomic中的函數(shù),所以是保證了原子性的。
我們來具體看看CAS的實現(xiàn)(src/runtime/internal/atomic/asm_amd64.s):
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// } else
// return 0;
// 這里參數(shù)及返回值大小加起來是17,是因為一個指針在amd64下是8字節(jié),
// 然后int32分別是占用4字節(jié),最后的返回值是bool占用1字節(jié),所以加起來是17
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
// 為什么不把*val指針放到AX中呢?因為AX有特殊用處,
// 在下面的CMPXCHGL里面,會從AX中讀取要比較的其中一個數(shù)
MOVQ ptr+0(FP), BX
// 所以AX要用來存參數(shù)old
MOVL old+8(FP), AX
// 把new中的數(shù)存到寄存器CX中
MOVL new+12(FP), CX
// 注意這里了,這里使用了LOCK前綴,所以保證操作是原子的
LOCK
// 0(BX) 可以理解為 *val
// 把 AX中的數(shù) 和 第二個操作數(shù) 0(BX)——也就是BX寄存器所指向的地址中存的值 進行比較
// 如果相等,就把 第一個操作數(shù) CX寄存器中存的值 賦給 第二個操作數(shù) BX寄存器所指向的地址
// 并將標志寄存器ZF設(shè)為1
// 否則將標志寄存器ZF清零
CMPXCHGL CX, 0(BX)
// SETE的作用是:
// 如果Zero Flag標志寄存器為1,那么就把操作數(shù)設(shè)為1
// 否則把操作數(shù)設(shè)為0
// 也就是說,如果上面的比較相等了,就返回true,否則為false
// ret+16(FP)代表了返回值的地址
SETEQ ret+16(FP)
RET
如果看不懂也沒太大關(guān)系,只要知道這個函數(shù)的作用,以及這個函數(shù)是原子性的即可。
那么這段代碼的意思就是:先看看這把鎖是不是空閑狀態(tài),如果是的話,直接原子性地修改一下state為已被獲取就行了。多么簡潔(雖然后面的代碼并不是……)!
主流程
接下來具體看主流程的代碼,代碼中有一些位運算看起來比較暈,我會試著用偽代碼在邊上注釋。
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 用來存當前goroutine等待的時間
var waitStartTime int64
// 用來存當前goroutine是否饑餓
starving := false
// 用來存當前goroutine是否已喚醒
awoke := false
// 用來存當前goroutine的循環(huán)次數(shù)(想一想一個goroutine如果循環(huán)了2147483648次咋辦……)
iter := 0
// 復制一下當前鎖的狀態(tài)
old := m.state
// 自旋
for {
// 如果是饑餓情況之下,就不要自旋了,因為鎖會直接交給隊列頭部的goroutine
// 如果鎖是被獲取狀態(tài),并且滿足自旋條件(canSpin見后文分析),那么就自旋等鎖
// 偽代碼:if isLocked() and isNotStarving() and canSpin()
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 將自己的狀態(tài)以及鎖的狀態(tài)設(shè)置為喚醒,這樣當Unlock的時候就不會去喚醒其它被阻塞的goroutine了
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 進行自旋(分析見后文)
runtime_doSpin()
iter++
// 更新鎖的狀態(tài)(有可能在自旋的這段時間之內(nèi)鎖的狀態(tài)已經(jīng)被其它goroutine改變)
old = m.state
continue
}
// 當走到這一步的時候,可能會有以下的情況:
// 1. 鎖被獲取+饑餓
// 2. 鎖被獲取+正常
// 3. 鎖空閑+饑餓
// 4. 鎖空閑+正常
// goroutine的狀態(tài)可能是喚醒以及非喚醒
// 復制一份當前的狀態(tài),目的是根據(jù)當前狀態(tài)設(shè)置出期望的狀態(tài),存在new里面,
// 并且通過CAS來比較以及更新鎖的狀態(tài)
// old用來存鎖的當前狀態(tài)
new := old
// 如果說鎖不是饑餓狀態(tài),就把期望狀態(tài)設(shè)置為被獲取(獲取鎖)
// 也就是說,如果是饑餓狀態(tài),就不要把期望狀態(tài)設(shè)置為被獲取
// 新到的goroutine乖乖排隊去
// 偽代碼:if isNotStarving()
if old&mutexStarving == 0 {
// 偽代碼:newState = locked
new |= mutexLocked
}
// 如果鎖是被獲取狀態(tài),或者饑餓狀態(tài)
// 就把期望狀態(tài)中的等待隊列的等待者數(shù)量+1(實際上是new + 8)
// (會不會可能有三億個goroutine等待拿鎖……)
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果說當前的goroutine是饑餓狀態(tài),并且鎖被其它goroutine獲取
// 那么將期望的鎖的狀態(tài)設(shè)置為饑餓狀態(tài)
// 如果鎖是釋放狀態(tài),那么就不用切換了
// Unlock期望一個饑餓的鎖會有一些等待拿鎖的goroutine,而不只是一個
// 這種情況下不會成立
if starving && old&mutexLocked != 0 {
// 期望狀態(tài)設(shè)置為饑餓狀態(tài)
new |= mutexStarving
}
// 如果說當前goroutine是被喚醒狀態(tài),我們需要reset這個狀態(tài)
// 因為goroutine要么是拿到鎖了,要么是進入sleep了
if awoke {
// 如果說期望狀態(tài)不是woken狀態(tài),那么肯定出問題了
// 這里看不懂沒關(guān)系,wake的邏輯在下面
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 這句就是把new設(shè)置為非喚醒狀態(tài)
// &^的意思是and not
new &^= mutexWoken
}
// 通過CAS來嘗試設(shè)置鎖的狀態(tài)
// 這里可能是設(shè)置鎖,也有可能是只設(shè)置為饑餓狀態(tài)和等待數(shù)量
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果說old狀態(tài)不是饑餓狀態(tài)也不是被獲取狀態(tài)
// 那么代表當前goroutine已經(jīng)通過CAS成功獲取了鎖
// (能進入這個代碼塊表示狀態(tài)已改變,也就是說狀態(tài)是從空閑到被獲取)
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 如果之前已經(jīng)等待過了,那么就要放到隊列頭
queueLifo := waitStartTime != 0
// 如果說之前沒有等待過,就初始化設(shè)置現(xiàn)在的等待時間
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 既然獲取鎖失敗了,就使用sleep原語來阻塞當前goroutine
// 通過信號量來排隊獲取鎖
// 如果是新來的goroutine,就放到隊列尾部
// 如果是被喚醒的等待鎖的goroutine,就放到隊列頭部
runtime_SemacquireMutex(&m.sema, queueLifo)
// 這里sleep完了,被喚醒
// 如果當前goroutine已經(jīng)是饑餓狀態(tài)了
// 或者當前goroutine已經(jīng)等待了1ms(在上面定義常量)以上
// 就把當前goroutine的狀態(tài)設(shè)置為饑餓
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 再次獲取一下鎖現(xiàn)在的狀態(tài)
old = m.state
// 如果說鎖現(xiàn)在是饑餓狀態(tài),就代表現(xiàn)在鎖是被釋放的狀態(tài),當前goroutine是被信號量所喚醒的
// 也就是說,鎖被直接交給了當前goroutine
if old&mutexStarving != 0 {
// 如果說當前鎖的狀態(tài)是被喚醒狀態(tài)或者被獲取狀態(tài),或者說等待的隊列為空
// 那么是不可能的,肯定是出問題了,因為當前狀態(tài)肯定應該有等待的隊列,鎖也一定是被釋放狀態(tài)且未喚醒
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 當前的goroutine獲得了鎖,那么就把等待隊列-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果當前goroutine非饑餓狀態(tài),或者說當前goroutine是隊列中最后一個goroutine
// 那么就退出饑餓模式,把狀態(tài)設(shè)置為正常
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
// 原子性地加上改動的狀態(tài)
atomic.AddInt32(&m.state, delta)
break
}
// 如果鎖不是饑餓模式,就把當前的goroutine設(shè)為被喚醒
// 并且重置iter(重置spin)
awoke = true
iter = 0
} else {
// 如果CAS不成功,也就是說沒能成功獲得鎖,鎖被別的goroutine獲得了或者鎖一直沒被釋放
// 那么就更新狀態(tài),重新開始循環(huán)嘗試拿鎖
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
以上為什么CAS能拿到鎖呢?因為CAS會原子性地判斷old state和當前鎖的狀態(tài)是否一致;而總有一個goroutine會滿足以上條件成功拿鎖。
canSpin
接下來我們來看看上文提到的canSpin條件如何:
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// 這里的active_spin是個常量,值為4
// 簡單來說,sync.Mutex是有可能被多個goroutine競爭的,所以不應該大量自旋(消耗CPU)
// 自旋的條件如下:
// 1. 自旋次數(shù)小于active_spin(這里是4)次;
// 2. 在多核機器上;
// 3. GOMAXPROCS > 1并且至少有一個其它的處于運行狀態(tài)的P;
// 4. 當前P沒有其它等待運行的G;
// 滿足以上四個條件才可以進行自旋。
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
所以可以看出來,并不是一直無限自旋下去的,當自旋次數(shù)到達4次或者其它條件不符合的時候,就改為信號量拿鎖了。
doSpin
然后我們來看看doSpin的實現(xiàn)(其實也沒啥好看的):
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
這是一個匯編實現(xiàn)的函數(shù),簡單看兩眼amd64上的實現(xiàn):
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
看起來沒啥好看的,直接跳過吧。
Unlock
接下來我們來看看Unlock的實現(xiàn),對于Unlock來說,有兩個比較關(guān)鍵的特性:
- 如果說鎖不是處于locked狀態(tài),那么對鎖執(zhí)行Unlock會導致panic;
- 鎖和goroutine沒有對應關(guān)系,所以我們完全可以在goroutine 1中獲取到鎖,然后在goroutine 2中調(diào)用Unlock來釋放鎖(這是什么騷操作?。m然不推薦大家這么干……)
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
// 這里獲取到鎖的狀態(tài),然后將狀態(tài)減去被獲取的狀態(tài)(也就是解鎖),稱為new(期望)狀態(tài)
// 注意以上兩個操作是原子的,所以不用擔心多個goroutine并發(fā)的問題
new := atomic.AddInt32(&m.state, -mutexLocked)
// 如果說,期望狀態(tài)加上被獲取的狀態(tài),不是被獲取的話
// 那么就panic
// 在這里給大家提一個問題:干嘛要這么大費周章先減去再加上,直接比較一下原來鎖的狀態(tài)是否被獲取不就完事了?
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果說new狀態(tài)(也就是鎖的狀態(tài))不是饑餓狀態(tài)
if new&mutexStarving == 0 {
// 復制一下原先狀態(tài)
old := new
for {
// 如果說鎖沒有等待拿鎖的goroutine
// 或者鎖被獲取了(在循環(huán)的過程中被其它goroutine獲取了)
// 或者鎖是被喚醒狀態(tài)(表示有g(shù)oroutine被喚醒,不需要再去嘗試喚醒其它goroutine)
// 或者鎖是饑餓模式(會直接轉(zhuǎn)交給隊列頭的goroutine)
// 那么就直接返回,啥都不用做了
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 走到這一步的時候,說明鎖目前還是空閑狀態(tài),并且沒有g(shù)oroutine被喚醒且隊列中有g(shù)oroutine等待拿鎖
// 那么我們就要把鎖的狀態(tài)設(shè)置為被喚醒,等待隊列-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 又是熟悉的CAS
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果狀態(tài)設(shè)置成功了,我們就通過信號量去喚醒goroutine
runtime_Semrelease(&m.sema, false)
return
}
// 循環(huán)結(jié)束的時候,更新一下狀態(tài),因為有可能在執(zhí)行的過程中,狀態(tài)被修改了(比如被Lock改為了饑餓狀態(tài))
old = m.state
}
} else {
// 如果是饑餓狀態(tài)下,那么我們就直接把鎖的所有權(quán)通過信號量移交給隊列頭的goroutine就好了
// handoff = true表示直接把鎖交給隊列頭部的goroutine
// 注意:在這個時候,鎖被獲取的狀態(tài)沒有被設(shè)置,會由被喚醒的goroutine在喚醒后設(shè)置
// 但是當鎖處于饑餓狀態(tài)的時候,我們也認為鎖是被獲取的(因為我們手動指定了獲取的goroutine)
// 所以說新來的goroutine不會嘗試去獲取鎖(在Lock中有體現(xiàn))
runtime_Semrelease(&m.sema, true)
}
}
總結(jié)
根據(jù)以上代碼的分析,可以看出,sync.Mutex這把鎖在你的工作負載(所需時間)比較低,比如只是對某個關(guān)鍵變量賦值的時候,性能還是比較好的,但是如果說對于臨界資源的操作耗時很長(特別是單個操作就大于1ms)的話,實際上性能上會有一定的問題,這也就是我們經(jīng)??吹健暗逆i一直處于饑餓狀態(tài)”的問題,對于這種情況,可能就需要另尋他法了。
好了,至此整個sync.Mutex的分析就此結(jié)束了,雖然只有短短200行代碼(包括150行注釋,實際代碼估計就50行),但是其中的算法、設(shè)計的思想、編程的理念卻是值得感悟,所謂大道至簡、少即是多可能就是如此吧。