sync包 rwmutex源碼閱讀

借鑒于Go夜讀,加了個人理解:https://reading.developerlearning.cn/articles/sync/sync_rwmutex_source_code_analysis/

go版本:go1.12 windows/amd64

sync/rwmutex.go是對 runtime/rwmutex.go文件的拷貝。

當(dāng)一個協(xié)程獲取到讀鎖,同時存在另一個協(xié)程調(diào)用寫鎖,這時不允許新的協(xié)程來獲取讀鎖直到最新的讀鎖被釋放;這能確保在遞歸循環(huán)讀鎖的情況下,寫鎖也能被調(diào)用,寫鎖阻止新的reader來獲取lock.

也就是說,上寫鎖時要等之前的讀鎖釋放,新的讀操作會卡住,等待舊的讀操作完,寫鎖運(yùn)行完,卡住的讀操作才會繼續(xù)進(jìn)行。

結(jié)構(gòu)體

// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
    w           Mutex  // 互斥鎖
    writerSem   uint32 // 寫鎖信號量
    readerSem   uint32 // 讀鎖信號量
    readerCount int32  // 還未釋放讀鎖的reader數(shù)
    readerWait  int32  // 獲取寫鎖時需要等待的讀鎖釋放數(shù)量
}

常量

const rwmutexMaxReaders = 1 << 30   // 支持最多2^30個讀鎖

方法

以下是 sync.RWMutex 提供的4個方法

RLock

// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock
func (rw *RWMutex) RLock() { //讀鎖
   if race.Enabled {
       _ = rw.w.state
       race.Disable()
   }
   // 每次 goroutine獲取讀鎖時, readCount+1
   // 如果寫鎖已經(jīng)被獲取,那么 readCount 在 -rwmutexMaxReaders 與 0 之間(當(dāng)為0的時候,代表有2^30個讀鎖在等待,應(yīng)該會出錯,但是極端條件不會出現(xiàn))
   // 通過readCount 判斷讀鎖與寫鎖 是否互斥,如果有寫鎖存在就掛起 goroutine,多個讀鎖可以并行
   if atomic.AddInt32(&rw.readerCount, 1) < 0 { 
       // A writer is pending, wait for it.
       runtime_Semacquire(&rw.readerSem) //等待reader信號量
   }
   if race.Enabled {
       race.Enable()
       race.Acquire(unsafe.Pointer(&rw.readerSem))
   }
}

RUnlock

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {  //釋放讀鎖
   if race.Enabled {
       _ = rw.w.state
       race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
       race.Disable()
   }
  //  檢查當(dāng)前是否可以進(jìn)行釋放鎖
   if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { 
       // 1.r+1==0時,rw.readerCount -1= -1,rw.readerCount  = 0則不存在讀鎖,表示直接執(zhí)行RUnlock()
       // 2.r+1=-rwmutexMaxReaders,rw.readerCount = -rwmutexMaxReaders ,
       // 這種情況出現(xiàn)在獲取Lock()方法,atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders),這時rw.readerCount  = 0 也不存在讀鎖,表示執(zhí)行Lock()再執(zhí)行RUnlock()
       if r+1 == 0 || r+1 == -rwmutexMaxReaders { //如果已經(jīng)沒有讀鎖的,還去釋放(如釋放多次)
           race.Enable()
           throw("sync: RUnlock of unlocked RWMutex")
       }
       // A writer is pending. 下面的情況代表有寫鎖
       if atomic.AddInt32(&rw.readerWait, -1) == 0 { //寫鎖的reader wait數(shù)量-1
           // The last reader unblocks the writer. 
           runtime_Semrelease(&rw.writerSem, false)//如果wait數(shù)量到0,釋放writer信號量 
       }
   }
   if race.Enabled {
       race.Enable()
   }
}

Lock

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() { // 寫鎖加鎖操作
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // First, resolve competition with other writers.
    rw.w.Lock() //上互斥鎖
    // Announce to readers there is a pending writer.
    // 將當(dāng)前的 readerCount 置為負(fù)數(shù),告訴 RUnLock 當(dāng)前存在寫鎖等待
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) +  rwmutexMaxReaders  
    // Wait for active readers. 
   // 等待讀鎖釋放
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {//當(dāng)前readCount不為0,并且上一步r計算之后RUnlock的次數(shù)和之前readerCount相同(上一步計算后可能有多次RUnlock,readerWait會變成負(fù)數(shù))
        runtime_Semacquire(&rw.writerSem)//等待writer信號量
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

Unlock

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock. 
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Release(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }

    // 加上 Lock 的時候減去的 rwmutexMaxReaders
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    // 1.沒執(zhí)行Lock調(diào)用Unlock 2.釋放寫鎖多次 拋出異常
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // 通知當(dāng)前等待的讀鎖
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    // 釋放 Mutex 鎖
    rw.w.Unlock()
    if race.Enabled {
        race.Enable()
    }
}

RLocker
只對讀操作加解鎖

// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
    return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

思考

當(dāng)調(diào)用寫鎖時,新的讀鎖會掛起,等待已經(jīng)執(zhí)行的讀鎖執(zhí)行完,然后才執(zhí)行寫鎖,鎖的顆粒度應(yīng)盡量小。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容