Go|sync.mutex 源代碼分析

sync.mutex 源代碼分析

[TOC]

針對 Golang 1.10.3 的 sync.Mutex 進行分析,代碼位置:sync/mutex.go

sync_mutex.jpeg

結(jié)構(gòu)體定義


type Mutex struct {
    state int32  // 指代mutex鎖當前的狀態(tài)
    sema  uint32 // 信號量,用于喚醒goroutine
}

Mutex 中的 state 用于指代鎖當前的狀態(tài),如下所示


1111 1111 ...... 1111 1111
\_________29__________/|||
 存儲等待 goroutine 數(shù)量 
 
[0]第0個bit 表示當前 mutex 是否已被某個goroutine所擁有,1=加鎖
[1]第1個bit 表示當前 mutex 是否被喚醒,也就是有某個喚醒的goroutine要嘗試獲取鎖
[2]第2個bit 表示 mutex 當前是否處于饑餓狀態(tài)

常量定義


const (
    mutexLocked = 1 << iota
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
    starvationThresholdNs = 1e6
)

  • mutexLocked 值為1,根據(jù) mutex.state & mutexLocked 得到 mutex 的加鎖狀態(tài),結(jié)果為1表示已加鎖,0表示未加鎖

  • mutexWoken 值為2(二進制:10),根據(jù) mutex.state & mutexWoken 得到 mutex 的喚醒狀態(tài),結(jié)果為1表示已喚醒,0表示未喚醒

  • mutexStarving 值為4(二進制:100),根據(jù) mutex.state & mutexStarving 得到 mutex 的饑餓狀態(tài),結(jié)果為1表示處于饑餓狀態(tài),0表示處于正常狀態(tài)

  • mutexWaiterShift 值為3,根據(jù) mutex.state >> mutexWaiterShift 得到當前等待的 goroutine 數(shù)目

  • starvationThresholdNs 值為1e6納秒,也就是1毫秒,當?shù)却犃兄嘘犑?goroutine 等待時間超過 starvationThresholdNs,mutex 進入饑餓模式

饑餓模式與正常模式

根據(jù)Mutex的注釋,當前的Mutex有如下的性質(zhì)。這些注釋將極大的幫助我們理解Mutex的實現(xiàn)。

Mutex 有兩種工作模式:正常模式和饑餓模式

正常模式

在正常模式中,所有等待鎖的 goroutine 按照 FIFO 的順序排隊獲取鎖,但是一個被喚醒的等待者有時候并不能獲取 mutex,它還需要和新到來的 goroutine 們競爭 mutex 的使用權(quán)。新到來的 goroutine 具有優(yōu)勢,它們已經(jīng)在 CPU 上運行且它們數(shù)量很多,因此一個被喚醒的等待者有很大的概率獲取不到鎖。在這種情況下,這個被喚醒的 goroutine 會加入到等待隊列的前面。如果一個等待的 goroutine 超過 1ms 沒有獲取鎖,它就會將 mutex 切換到饑餓模式

饑餓模式

在饑餓模式中,鎖的所有權(quán)將從 unlock 的 gorutine 直接交給交給等待隊列中的第一個。新來的 goroutine 將不會嘗試去獲得鎖,即使鎖看起來是 unlock 狀態(tài), 也不會去嘗試自旋操作,而是放在等待隊列的尾部。

如果一個等待的goroutine獲取了鎖,并且滿足一以下其中的任何一個條件,它會將鎖的狀態(tài)轉(zhuǎn)換為正常狀態(tài)。

  1. 它是等待隊列中的最后一個;
  2. 它等待的時間少于1ms。

函數(shù)

在分析源代碼之前, 我們要從多線程(goroutine)的并發(fā)場景去理解為什么實現(xiàn)中有很多的分支。

當一個goroutine獲取這個鎖的時候, 有可能這個鎖根本沒有競爭者, 那么這個goroutine輕輕松松獲取了這個鎖。

而如果這個鎖已經(jīng)被別的goroutine擁有, 就需要考慮怎么處理當前的期望獲取鎖的goroutine。

同時, 當并發(fā)goroutine很多的時候,有可能會有多個競爭者, 而且還會有通過信號量喚醒的等待者。

以下代碼已經(jīng)去掉了與核心代碼無關(guān)的 race 代碼。

Lock

Lock 方法申請對 mutex 加鎖,Lock 執(zhí)行的時候,分三種情況:

  1. 無沖突 通過 CAS 操作把當前狀態(tài)設(shè)置為加鎖狀態(tài)
  2. 有沖突 開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時間內(nèi)釋放了該鎖,直接獲得該鎖;如果沒有釋放,進入3
  3. 有沖突,且已經(jīng)過了自旋階段 通過調(diào)用 semacquire 函數(shù)來讓當前 goroutine 進入等待狀態(tài)

func (m *Mutex) Lock() {
    // 如果 mutext 的 state 沒有被鎖,也沒有等待/喚醒的 goroutine , 鎖處于正常狀態(tài),那么獲得鎖,返回.
    // 比如鎖第一次被 goroutine 請求時,就是這種狀態(tài)?;蛘哝i處于空閑的時候,也是這種狀態(tài)。
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    var waitStartTime int64 // 當前 goroutine 開始等待的時間
    starving := false       // 當前 goroutine 是否已經(jīng)處于饑餓狀態(tài)
    awoke := false          // 當前 goroutine 是否被喚醒
    iter := 0               // 自旋迭代的次數(shù)
    old := m.state          // old 保存當前 mutex 的狀態(tài)
    for {
        // 第一個條件是 state 已被鎖,但是不是饑餓狀態(tài)。如果是饑餓狀態(tài),自旋是沒有用的,鎖的擁有權(quán)直接交給了等待隊列的第一個。
        // 第二個條件是還可以自旋,多核、壓力不大并且在一定次數(shù)內(nèi)可以自旋, 具體的條件可以參考`sync_runtime_canSpin`的實現(xiàn)(匯編實現(xiàn),內(nèi)部持續(xù)調(diào)用 PAUSE 指令,消耗 CPU 時間)。
        // 如果滿足這兩個條件,不斷自旋來等待鎖被釋放、或者進入饑餓狀態(tài)、或者不能再自旋。
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 自旋的過程中如果發(fā)現(xiàn) state 還沒有設(shè)置 woken 標識,則設(shè)置它的 woken 標識, 并標記自己為被喚醒。
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        // 到了這一步, state的狀態(tài)可能是:
        // 1. 鎖還沒有被釋放,鎖處于正常狀態(tài)
        // 2. 鎖還沒有被釋放, 鎖處于饑餓狀態(tài)
        // 3. 鎖已經(jīng)被釋放, 鎖處于正常狀態(tài)
        // 4. 鎖已經(jīng)被釋放, 鎖處于饑餓狀態(tài)
        // 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已經(jīng)設(shè)置了state的woken標識)

        // new 復(fù)制 state的當前狀態(tài), 用來設(shè)置新的狀態(tài)。old 是鎖當前的狀態(tài)
        new := old
        // 如果 old state 狀態(tài)不是饑餓狀態(tài), new state 設(shè)置鎖, 嘗試通過CAS獲取鎖。
        // 如果 old state 狀態(tài)是饑餓狀態(tài), 則不設(shè)置 new state 的鎖,因為饑餓狀態(tài)下鎖直接轉(zhuǎn)給等待隊列的第一個.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // 當 mutex 處于加鎖狀態(tài)或饑餓狀態(tài)的時候,新到來的 goroutine 進入等待隊列
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // 當前 goroutine 將 mutex 切換為饑餓狀態(tài),但如果當前 mutex 未加鎖,則不需要切換
        // Unlock 操作希望饑餓模式存在等待者
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        // 如果本 goroutine 已經(jīng)設(shè)置為喚醒狀態(tài), 需要清除 new state 的喚醒標記,
        // 因為本 goroutine 要么獲得了鎖,要么進入休眠,總之state的新狀態(tài)不再是woken狀態(tài).
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        // 調(diào)用 CAS 更新 state 狀態(tài)
        // 注意 new 的鎖標記不一定是 true, 也可能只是標記一下鎖的 state 是饑餓狀態(tài).
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果 old state 的狀態(tài)是未被鎖狀態(tài),并且鎖不處于饑餓狀態(tài),
            // 那么當前 goroutine 已經(jīng)獲取了鎖的擁有權(quán),返回
            if old&(mutexLocked|mutexStarving) == 0 {
                break
            }
            // 設(shè)置/計算本 goroutine 的等待時間
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 既然未能獲取到鎖, 那么就使用 sleep 原語阻塞本 goroutine
            // 如果是新來的 goroutine, queueLifo=false, 加入到等待隊列的尾部,耐心等待
            // 如果是喚醒的 goroutine, queueLifo=true, 加入到等待隊列的頭部
            runtime_SemacquireMutex(&m.sema, queueLifo)
            // 如果當前 goroutine 等待時間超過 starvationThresholdNs,mutex 進入饑餓模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果當前的 state 已經(jīng)是饑餓狀態(tài)
            // 那么鎖應(yīng)該處于 Unlock 狀態(tài),鎖被直接交給了本 goroutine
            if old&mutexStarving != 0 {
                // 如果當前的 state 已被鎖,或者已標記為喚醒, 或者等待的隊列中不為空,
                // 那么 state 是一個非法狀態(tài)
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 等待狀態(tài)的 goroutine - 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 如果不是饑餓模式了或者當前等待著只剩下一個,退出饑餓模式
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                // 更新狀態(tài)
                // 因為已經(jīng)獲得了鎖,退出、返回
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 如果當前的鎖是正常模式,本 goroutine 被喚醒,自旋次數(shù)清零,從 for 循環(huán)開始處重新開始
            awoke = true
            iter = 0
        } else {
            // 如果CAS不成功,重新獲取鎖的 state, 從 for 循環(huán)開始處重新開始
            old = m.state
        }
    }
}


Unlock

Unlock方法釋放所申請的鎖


func (m *Mutex) Unlock() {
    // 如果 state 不是處于鎖的狀態(tài), 那么就是 Unlock 根本沒有加鎖的 mutex, panic
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

    // 釋放鎖,并通知其它等待者
    // 鎖如果處于饑餓狀態(tài),直接交給等待隊列的第一個, 喚醒它,讓它去獲取鎖
    // mutex 正常模式
    if new&mutexStarving == 0 {
        old := new
        for {
            // 如果沒有等待者,或者已經(jīng)存在一個 goroutine 被喚醒或得到鎖、或處于饑餓模式
            // 直接返回.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 將等待的 goroutine-1,并設(shè)置 woken 標識
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            // 設(shè)置新的 state, 這里通過信號量會喚醒一個阻塞的 goroutine 去獲取鎖.
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // mutex 饑餓模式,直接將 mutex 擁有權(quán)移交給等待隊列最前端的 goroutine
        // 注意此時 state 的 mutex 還沒有加鎖,喚醒的 goroutine 會設(shè)置它。
        // 在此期間,如果有新的 goroutine 來請求鎖, 因為 mutex 處于饑餓狀態(tài), mutex 還是被認為處于鎖狀態(tài),
        // 新來的 goroutine 不會把鎖搶過去.
        runtime_Semrelease(&m.sema, true)
    }
}

sync.RWMutex 源碼分析

RWMutex 是讀寫互斥鎖,鎖可以由任意數(shù)量的讀取器或單個寫入器來保持

RWMutex 的零值是一個解鎖的互斥鎖

RWMutex 是搶占式的讀寫鎖,寫鎖之后來的讀鎖是加不上的

代碼位置:sync/rwmutex.go

結(jié)構(gòu)體定義


type RWMutex struct {
    w           Mutex  // 互斥鎖
    writerSem   uint32 // 寫鎖信號量
    readerSem   uint32 // 讀鎖信號量
    readerCount int32  // 讀鎖計數(shù)器
    readerWait  int32  // 獲取寫鎖時需要等待的讀鎖釋放數(shù)量
}

常量定義


const rwmutexMaxReaders = 1 << 30   // 支持最多2^30個讀鎖

函數(shù)

以下是 sync.RWMutex 提供的4個方法

Lock

提供寫鎖加鎖操作


func (rw *RWMutex) Lock() {
    // 使用 Mutex 鎖
    rw.w.Lock()
    // 將當前的 readerCount 置為負數(shù),告訴 RUnLock 當前存在寫鎖等待
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        // 等待讀鎖釋放
        runtime_Semacquire(&rw.writerSem)
    }
}

Unlock

提供寫鎖釋放操作


func (rw *RWMutex) Unlock() {
    // 加上 Lock 的時候減去的 rwmutexMaxReaders
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    // 沒執(zhí)行Lock調(diào)用Unlock,拋出異常
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // 通知當前等待的讀鎖
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    // 釋放 Mutex 鎖
    rw.w.Unlock()
}

RLock

提供讀鎖操作


func (rw *RWMutex) RLock() {
    // 每次 goroutine 獲取讀鎖時,readerCount+1
    // 如果寫鎖已經(jīng)被獲取,那么 readerCount 在 -rwmutexMaxReaders 與 0 之間,這時掛起獲取讀鎖的 goroutine
    // 如果寫鎖沒有被獲取,那么 readerCount > 0,獲取讀鎖, 不阻塞
    // 通過 readerCount 判斷讀鎖與寫鎖互斥, 如果有寫鎖存在就掛起goroutine, 多個讀鎖可以并行
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 將 goroutine 排到G隊列的后面,掛起 goroutine
        runtime_Semacquire(&rw.readerSem)
    }
}

RUnLock

對讀鎖進行解鎖


func (rw *RWMutex) RUnlock() {
    // 寫鎖等待狀態(tài),檢查當前是否可以進行獲取
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // r + 1 == 0表示直接執(zhí)行RUnlock()
        // r + 1 == -rwmutexMaxReaders表示執(zhí)行Lock()再執(zhí)行RUnlock()
        // 兩總情況均拋出異常
        if r+1 == 0 || r+1 == -rwmutexMaxReaders {
            race.Enable()
            throw("sync: RUnlock of unlocked RWMutex")
        }
        // 當讀鎖釋放完畢后,通知寫鎖
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            // The last reader unblocks the writer.
            runtime_Semrelease(&rw.writerSem, false)
        }
    }
}

總結(jié)

sync.Mutex

  • 同一個時刻只有一個線程能夠拿到鎖

注意:

  1. 不要重復(fù)鎖定互斥鎖
  2. 不要忘記解鎖互斥鎖
  3. 不要在多個函數(shù)之間直接傳遞互斥鎖

sync.RWMutex

  • 如果設(shè)置了一個寫鎖,那么其它讀的線程以及寫的線程都拿不到鎖,這個時候,與互斥鎖的功能相同
  • 如果設(shè)置了一個讀鎖,那么其它寫的線程是拿不到鎖的,但是其它讀的線程是可以拿到鎖

讀寫互斥鎖的實現(xiàn)比較有技巧性一些,需要幾點

  • 讀鎖不能阻塞讀鎖,引入readerCount實現(xiàn)
  • 讀鎖需要阻塞寫鎖,直到所有讀鎖都釋放,引入readerSem實現(xiàn)
  • 寫鎖需要阻塞讀鎖,直到所有寫鎖都釋放,引入wirterSem實現(xiàn)
  • 寫鎖需要阻塞寫鎖,引入Metux實現(xiàn)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容