sync/rwmutex go讀寫鎖學(xué)習(xí)

本文結(jié)構(gòu)

背景
源碼
思考
問題

背景

最近寫go,看到學(xué)習(xí)一些go源碼記錄上,另外相關(guān)依賴代碼這里簡(jiǎn)單說明一下

讀寫鎖

概念去搜一下就好,可以上多個(gè)讀鎖,一個(gè)寫鎖
上寫鎖時(shí)要等之前的讀鎖釋放

代碼前提

信號(hào)量的獲取與釋放,看注釋就夠了,這個(gè)是runtime的


// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)

// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
func runtime_Semrelease(s *uint32, handoff bool)

源碼

變量

    w           Mutex    互斥鎖
    writerSem   uint32   寫鎖信號(hào)量
    readerSem   uint32   讀鎖信號(hào)量
    readerCount int32    還未釋放讀鎖的reader數(shù)量(偏差2^30,有write就是原有reader數(shù)量-2^30)
    readerWait  int32    寫鎖要等待的reader數(shù)量(之前的reader釋放了鎖,writer才能跑)

源碼

下面的源碼沒有分析鎖競(jìng)爭(zhēng)(非公平鎖)相關(guān)代碼邏輯

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
    "internal/race"
    "sync/atomic"
    "unsafe"
)

// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
    w           Mutex  // held if there are pending writers  互斥鎖
    writerSem   uint32 // semaphore for writers to wait for completing readers  寫鎖信號(hào)量
    readerSem   uint32 // semaphore for readers to wait for completing writers  讀鎖信號(hào)量
    readerCount int32  // number of pending readers  還未釋放讀鎖的reader數(shù)量(偏差2^30,有write就是原有reader數(shù)量-2^30)
    readerWait  int32  // number of departing readers  寫鎖要等待的reader數(shù)量(之前的reader釋放了鎖,writer才能跑)
}

const rwmutexMaxReaders = 1 << 30  //假定最多2^30個(gè)reader

// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() { //讀鎖
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    if atomic.AddInt32(&rw.readerCount, 1) < 0 { //當(dāng)前有寫鎖了
        // A writer is pending, wait for it.
        runtime_Semacquire(&rw.readerSem) //等待reader信號(hào)量
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {  //釋放寫鎖
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { //reader數(shù)量-1,如果<0的話
        if r+1 == 0 || r+1 == -rwmutexMaxReaders { //如果已經(jīng)沒有讀鎖的,還去釋放(如釋放多次)
            race.Enable()
            throw("sync: RUnlock of unlocked RWMutex")
        }
        // A writer is pending. 下面的情況代表有寫鎖
        if atomic.AddInt32(&rw.readerWait, -1) == 0 { //寫鎖的reader wait數(shù)量-1
            // The last reader unblocks the writer.
            runtime_Semrelease(&rw.writerSem, false)//如果wait數(shù)量到0,釋放writer信號(hào)量
        }
    }
    if race.Enabled {
        race.Enable()
    }
}

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // First, resolve competition with other writers.
    rw.w.Lock() //上互斥鎖
    // Announce to readers there is a pending writer.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders  //readerCount變成負(fù)數(shù),代表有寫鎖,r就是計(jì)算前readCount的絕對(duì)值(存疑,atomic計(jì)算)
    // Wait for active readers.
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {//當(dāng)前readCount不為0,并且上一步r計(jì)算之后RUnlock的次數(shù)和之前readerCount相同(上一步計(jì)算后可能有多次RUnlock,readerWait會(huì)變成負(fù)數(shù))
        runtime_Semacquire(&rw.writerSem)//等待writer信號(hào)量
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Release(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }

    // Announce to readers there is no active writer.
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)//readerCount + 2^30變成正數(shù)
    if r >= rwmutexMaxReaders {//釋放寫鎖多次
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    for i := 0; i < int(r); i++ {//發(fā)送多次reader信號(hào)量
        runtime_Semrelease(&rw.readerSem, false)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
    if race.Enabled {
        race.Enable()
    }
}

// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
    return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

思考

如何區(qū)分當(dāng)前是被讀鎖還是寫鎖占有

readerCount>0則是讀鎖,<0則是寫鎖

整體思路

通過readerCount和readerWait進(jìn)行計(jì)算
對(duì)讀寫鎖的獲取進(jìn)行判斷,是否等待reader或者writer信號(hào)量
對(duì)讀寫鎖的釋放也進(jìn)行判斷,釋放reader以及writer相關(guān)信號(hào)量

多次調(diào)用寫鎖不會(huì)成功的原理

里面有rw.w.lock(),這個(gè)是互斥鎖,釋放寫鎖的時(shí)候釋放這個(gè)互斥鎖

獲取寫鎖里面if的第二個(gè)判斷邏輯何時(shí)生效

if條件第二句什么時(shí)候滿足

在上面計(jì)算r之后,readerCount可能變化了,即再計(jì)算r之后,其他線程又執(zhí)行了Rlock和Runlock,并且整體Runlock次數(shù)>Rlock次數(shù),就有可能出現(xiàn)readerWait為負(fù)數(shù)了
考慮場(chǎng)景

1.計(jì)算r,當(dāng)前readerCount為1,r為1,計(jì)算后readerCount為1-2^30
2.Runlock一次,readerCount為-2^30,滿足下面條件
image.png
3.此時(shí)readerWait設(shè)置成-1
4.Lock里面,計(jì)算if條件,第一個(gè)條件r!=0(滿足) 第二個(gè)條件readerWait=-1,r=1,相加為0,滿足條件,不用處理

問題

代碼假定最多有2^30個(gè)readers

所以代碼里面readerCount在寫模式會(huì)-2^30

如何和java讀寫鎖比較

印象里面java里面是有AQS隊(duì)列,公平非公平鎖的概念更清晰,并且記錄了隊(duì)列結(jié)構(gòu)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容