本文結(jié)構(gòu)
背景
源碼
思考
問題
背景
最近寫go,看到學(xué)習(xí)一些go源碼記錄上,另外相關(guān)依賴代碼這里簡(jiǎn)單說明一下
讀寫鎖
概念去搜一下就好,可以上多個(gè)讀鎖,一個(gè)寫鎖
上寫鎖時(shí)要等之前的讀鎖釋放
代碼前提
信號(hào)量的獲取與釋放,看注釋就夠了,這個(gè)是runtime的
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
func runtime_Semrelease(s *uint32, handoff bool)
源碼
變量
w Mutex 互斥鎖
writerSem uint32 寫鎖信號(hào)量
readerSem uint32 讀鎖信號(hào)量
readerCount int32 還未釋放讀鎖的reader數(shù)量(偏差2^30,有write就是原有reader數(shù)量-2^30)
readerWait int32 寫鎖要等待的reader數(shù)量(之前的reader釋放了鎖,writer才能跑)
源碼
下面的源碼沒有分析鎖競(jìng)爭(zhēng)(非公平鎖)相關(guān)代碼邏輯
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
"internal/race"
"sync/atomic"
"unsafe"
)
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.
// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
w Mutex // held if there are pending writers 互斥鎖
writerSem uint32 // semaphore for writers to wait for completing readers 寫鎖信號(hào)量
readerSem uint32 // semaphore for readers to wait for completing writers 讀鎖信號(hào)量
readerCount int32 // number of pending readers 還未釋放讀鎖的reader數(shù)量(偏差2^30,有write就是原有reader數(shù)量-2^30)
readerWait int32 // number of departing readers 寫鎖要等待的reader數(shù)量(之前的reader釋放了鎖,writer才能跑)
}
const rwmutexMaxReaders = 1 << 30 //假定最多2^30個(gè)reader
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() { //讀鎖
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 { //當(dāng)前有寫鎖了
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem) //等待reader信號(hào)量
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() { //釋放寫鎖
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { //reader數(shù)量-1,如果<0的話
if r+1 == 0 || r+1 == -rwmutexMaxReaders { //如果已經(jīng)沒有讀鎖的,還去釋放(如釋放多次)
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending. 下面的情況代表有寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 { //寫鎖的reader wait數(shù)量-1
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)//如果wait數(shù)量到0,釋放writer信號(hào)量
}
}
if race.Enabled {
race.Enable()
}
}
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock() //上互斥鎖
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders //readerCount變成負(fù)數(shù),代表有寫鎖,r就是計(jì)算前readCount的絕對(duì)值(存疑,atomic計(jì)算)
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {//當(dāng)前readCount不為0,并且上一步r計(jì)算之后RUnlock的次數(shù)和之前readerCount相同(上一步計(jì)算后可能有多次RUnlock,readerWait會(huì)變成負(fù)數(shù))
runtime_Semacquire(&rw.writerSem)//等待writer信號(hào)量
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Release(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)//readerCount + 2^30變成正數(shù)
if r >= rwmutexMaxReaders {//釋放寫鎖多次
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {//發(fā)送多次reader信號(hào)量
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
思考
如何區(qū)分當(dāng)前是被讀鎖還是寫鎖占有
readerCount>0則是讀鎖,<0則是寫鎖
整體思路
通過readerCount和readerWait進(jìn)行計(jì)算
對(duì)讀寫鎖的獲取進(jìn)行判斷,是否等待reader或者writer信號(hào)量
對(duì)讀寫鎖的釋放也進(jìn)行判斷,釋放reader以及writer相關(guān)信號(hào)量
多次調(diào)用寫鎖不會(huì)成功的原理
里面有rw.w.lock(),這個(gè)是互斥鎖,釋放寫鎖的時(shí)候釋放這個(gè)互斥鎖
獲取寫鎖里面if的第二個(gè)判斷邏輯何時(shí)生效

if條件第二句什么時(shí)候滿足
在上面計(jì)算r之后,readerCount可能變化了,即再計(jì)算r之后,其他線程又執(zhí)行了Rlock和Runlock,并且整體Runlock次數(shù)>Rlock次數(shù),就有可能出現(xiàn)readerWait為負(fù)數(shù)了
考慮場(chǎng)景
1.計(jì)算r,當(dāng)前readerCount為1,r為1,計(jì)算后readerCount為1-2^30
2.Runlock一次,readerCount為-2^30,滿足下面條件

image.png
3.此時(shí)readerWait設(shè)置成-1
4.Lock里面,計(jì)算if條件,第一個(gè)條件r!=0(滿足) 第二個(gè)條件readerWait=-1,r=1,相加為0,滿足條件,不用處理
問題
代碼假定最多有2^30個(gè)readers
所以代碼里面readerCount在寫模式會(huì)-2^30
如何和java讀寫鎖比較
印象里面java里面是有AQS隊(duì)列,公平非公平鎖的概念更清晰,并且記錄了隊(duì)列結(jié)構(gòu)