當(dāng)多個(gè)線程訪問(wèn)共享數(shù)據(jù)時(shí),會(huì)出現(xiàn)并發(fā)讀寫問(wèn)題(reader-writer problems)。有兩種訪問(wèn)數(shù)據(jù)的線程類型:
- 讀線程 reader:只進(jìn)行數(shù)據(jù)讀取
- 寫線程 writer:進(jìn)行數(shù)據(jù)修改
當(dāng) writer 獲取到數(shù)據(jù)的訪問(wèn)權(quán)限后,其他任何線程(reader 或 writer)都無(wú)權(quán)限訪問(wèn)此數(shù)據(jù)。這種約束亦存在于現(xiàn)實(shí)中,比如,當(dāng) writer 在修改數(shù)據(jù)無(wú)法保證原子性時(shí)(如數(shù)據(jù)庫(kù)),此時(shí)讀取未完成的修改必須被阻塞,以防止加載臟數(shù)據(jù)(譯者注:數(shù)據(jù)庫(kù)中的臟讀)。還有許多諸如此類的核心問(wèn)題,例如:
- writer 不能無(wú)限等待
- reader 不能無(wú)限等待
- 不允許線程出現(xiàn)無(wú)限等待
多讀/單寫互斥鎖(如sync.RWMutex)的具體實(shí)現(xiàn)解決了一種并發(fā)讀寫問(wèn)題。接下來(lái),讓我們看下在 Go 語(yǔ)言中是如何實(shí)現(xiàn)的,同時(shí)它提供了哪些的數(shù)據(jù)可靠性保證機(jī)制。
作為額外的工作,我們將深入研究分析競(jìng)態(tài)情況下的互斥鎖。
用法
在深入研究實(shí)現(xiàn)細(xì)節(jié)之前,我們先看看sync.RWMutex的使用實(shí)例。下面的程序使用讀寫互斥鎖來(lái)保護(hù)臨界區(qū)–sleep()。為了更好的展示整個(gè)過(guò)程,臨界區(qū)部分計(jì)算了當(dāng)前正在執(zhí)行的 reader 和 writer 的數(shù)量(源碼)。
package main
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().Unix())
}
func sleep() {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
sleep()
m.RLock()
c <- 1
sleep()
c <- -1
m.RUnlock()
wg.Done()
}
func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
sleep()
m.Lock()
c <- 1
sleep()
c <- -1
m.Unlock()
wg.Done()
}
func main() {
var m sync.RWMutex
var rs, ws int
rsCh := make(chan int)
wsCh := make(chan int)
go func() {
for {
select {
case n := <-rsCh:
rs += n
case n := <-wsCh:
ws += n
}
fmt.Printf("%s%s\n", strings.Repeat("R", rs),
strings.Repeat("W", ws))
}
}()
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go reader(rsCh, &m, &wg)
}
for i := 0; i < 3; i++ {
wg.Add(1)
go writer(wsCh, &m, &wg)
}
wg.Wait()
}
play.golang.org 加載的程序環(huán)境是確定的(比如開始時(shí)間),所以rand.Seed(time.Now().Unix())總是返回相同的數(shù)值,此時(shí)程序的執(zhí)行結(jié)果可能總是相同的。為了避免這種情況,可通過(guò)修改不同的隨機(jī)種子值或者在自己的機(jī)器上執(zhí)行程序。
程序執(zhí)行結(jié)果:
W
R
RR
RRR
RRRR
RRRRR
RRRR
RRR
RRRR
RRR
RR
R
W
R
RR
RRR
RRRR
RRR
RR
R
W
譯者注:不同機(jī)器上運(yùn)行的結(jié)果會(huì)有所不同
每次執(zhí)行完一組 goroutine(reader 和 writer)的臨界區(qū)代碼后,都會(huì)打印新的一行。很顯然,RWMutex 允許至少一個(gè) reader(一個(gè)或多個(gè) reader)存在而 writer 同時(shí)只能存在一個(gè)。
同樣重要且將進(jìn)一步討論的是:writer 調(diào)用到Lock()時(shí),將會(huì)使新的 reader/writer 被阻塞。當(dāng)存在 reader 加了 RLock 時(shí),writer 會(huì)等待這一組 reader 完成正在執(zhí)行的任務(wù),當(dāng)這一組任務(wù)完成后,writer 將開始執(zhí)行。從輸出可以很明顯的看到,每一行的 R 都會(huì)遞減一個(gè),直到?jīng)]有 R 之后將打印一個(gè) W。
...
RRRRR
RRRR
RRR
RR
R
W
...
一旦 writer 結(jié)束,之前被阻塞的 reader 將恢復(fù)執(zhí)行,然后下一個(gè) writer 也將開始啟動(dòng)。值得一提的是,如果一個(gè) writer 完成,并且有 reader 和 writer 都在等待,那么首個(gè) reader 將解除阻塞,然后才輪到 writer。這種交替執(zhí)行的方式使得 writer 需等待當(dāng)前這組 reader 完成,所以無(wú)論 reader 還是 writer 都不會(huì)有無(wú)限等待的情況。
實(shí)現(xiàn)
注意,本文針對(duì)的RWMutex實(shí)現(xiàn)(Go commit: 718d6c58)在 Go 不同版本中可能隨時(shí)有修改。
RWMutex為 reader 提供兩個(gè)方法(RLock和RUnlock)、也為 writer 提供了兩個(gè)方法(Lock和Unlock)
讀鎖 RLock
為了簡(jiǎn)潔起見(jiàn),我們先跳過(guò)源碼中競(jìng)態(tài)檢測(cè)相關(guān)部分(它們將被...代替)。
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.ReadeSem, false)
}
...
}
readerCount字段是int32類型的值,表示待處理的 reader 數(shù)量(正在讀取數(shù)據(jù)或被 writer 阻塞)。這基本上是已調(diào)用 RLock 函數(shù),但尚未調(diào)用 RUnlock 函數(shù)的 reader 數(shù)量。
atomic.AddInt32等價(jià)于如下原子性表達(dá):
*addr += delta
return *addr
addr是*int32類型變量,delta是int32類型。因?yàn)榇瞬僮骶哂性有?,所以累?code>delta操作不會(huì)影響其他線程(更多詳見(jiàn)Fetch-and-add)。
如果沒(méi)有 writer,則readerCount總是會(huì)大于或等于 0(譯者注:因?yàn)?writer 會(huì)把 readerCount 置為負(fù)數(shù),通過(guò) Lock 函數(shù)的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders),此時(shí) reader 是一種運(yùn)行速度很快的非阻塞方式,因?yàn)橹恍枰{(diào)用atomic.AddInt32。
信號(hào)量 Semaphore
信號(hào)量是 Edsger Dijkstra 發(fā)明的數(shù)據(jù)結(jié)構(gòu),在解決多種同步問(wèn)題時(shí)很有用。其本質(zhì)是一個(gè)整數(shù),并關(guān)聯(lián)兩個(gè)操作:
- 申請(qǐng)
acquire(也稱為wait、decrement或P操作) - 釋放
release(也稱signal、increment或V操作)
acquire操作將信號(hào)量減 1,如果結(jié)果值為負(fù)則線程阻塞,且直到其他線程進(jìn)行了信號(hào)量累加為正數(shù)才能恢復(fù)。如結(jié)果為正數(shù),線程則繼續(xù)執(zhí)行。
release操作將信號(hào)量加 1,如存在被阻塞的線程,此時(shí)他們中的一個(gè)線程將解除阻塞。
Go 運(yùn)行時(shí)提供的runtime_SemacquireMutex和runtime_Semrelease函數(shù)可用來(lái)實(shí)現(xiàn)sync.RWMutex互斥鎖。
鎖 Lock
實(shí)現(xiàn)源碼:
func (rw *RWMutex) Lock() {
...
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
...
}
writer 通過(guò)Lock方法獲取共享數(shù)據(jù)的獨(dú)占權(quán)限。首先,它會(huì)申請(qǐng)阻塞其他寫操作的互斥鎖(rw.w.Lock()),此互斥鎖在Unlock函數(shù)的最后才會(huì)進(jìn)行解鎖。下一步,將readerCount減去rwmutexMaxreader(值為 1 左移 30 位, 1<<30)使其為負(fù)數(shù)。當(dāng)readerCount變?yōu)樨?fù)數(shù)時(shí),Rlock 將阻塞接下來(lái)的所有讀請(qǐng)求。
再回過(guò)頭來(lái)看下Rlock()函數(shù)中邏輯:
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.SeadeSem, false)
}
后續(xù)的 reader 將會(huì)被阻塞,那么已運(yùn)行的 reader 會(huì)怎樣呢?readerWait字段用來(lái)記錄當(dāng)前 reader 執(zhí)行的數(shù)量。writer 被信號(hào)量writerSem阻塞,直到最后一個(gè) reader 在使用后面討論的RUnlock方法解鎖后會(huì)把writerSem加 1,此時(shí)信號(hào)量將變成 0,writer被解除阻塞(譯者注:RUnlock 函數(shù)中的runtime_Semrelease(&rw.writerSem, false))
如果沒(méi)有有效的 reader,那么 writer 將繼續(xù)其執(zhí)行。
最大 reader 數(shù) rwmutexMaxreader
在rwmutex.go中定義的常量:
const rwmutexMaxreader = 1 << 30
那么,其用途是什么,以及1<<30表示什么意義呢?
readerCount字段是int32類型,其范圍為:
[-1 << 31, (1 << 31) — 1] or [-2147483648, 2147483647]
RWMutext使用此字段來(lái)計(jì)算掛起的 reader 和 writer 的標(biāo)記(置為負(fù)數(shù))。在Lock方法中:
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader
Lock會(huì)將readerCount字段減去1<<30,當(dāng)readerCount負(fù)值時(shí)表示 writer 調(diào)用了Lock正等待被處理,atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders) + rwmutexMaxreaders這個(gè)操作既讓readerCount變?yōu)樨?fù)數(shù)又使r存儲(chǔ)回了 readerCount。rwmutexMaxreaders也可以限制被掛起 reader 的數(shù)量。如果有rwmutexMaxreader個(gè)或更多掛起的 reader,那么readerCount將是非負(fù)值,此時(shí)將導(dǎo)致整個(gè)機(jī)制的崩潰。所以,reader 實(shí)際的限制數(shù)是:rwmutexMaxreader - 1,此值1073741823超過(guò)了10億。
解讀鎖 RUnlock
實(shí)現(xiàn)源碼:
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxreader {
race.Enable()
thrSw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.WriteSem, false)
}
}
...
}
每次調(diào)用此方法將使readerCount減 1(RLock 方法中增加 1)。如果減完后readerCount值為負(fù),則表示當(dāng)前存在 writer 正在等待或運(yùn)行。這是因?yàn)樵?code>Lock()方法中readerCount減去了rwmutexMaxreader。然后,當(dāng)檢查到將完成的 reader 數(shù)量(readerWait 數(shù)值)最終為 0 時(shí),則表示 writer 可以最終申請(qǐng)信號(hào)量。(譯者注:r < 0時(shí),存在兩個(gè)分支,當(dāng)走 r+1 == 0 的分支時(shí),表示 readerCount 此時(shí)為 0 即沒(méi)有 RLock,所以 throw 了。當(dāng)走下面那個(gè)分支時(shí),r < 0則是因?yàn)榇嬖?writer 把 readerCount 置為了負(fù)數(shù)在等待 reader 結(jié)束,那么當(dāng)最后一個(gè) reader 解鎖時(shí)需要將 WriteSem 信號(hào)量加 1,喚醒 writer)
解鎖 Unlock
實(shí)現(xiàn)源碼:
func (rw *RWMutex) Unlock() {
...
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxreader)
if r >= rwmutexMaxreader {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
rw.w.Unlock()
...
}
解鎖被 writer 持有的互斥鎖時(shí),首先通過(guò)atomic.AddInt32將readerCount加上rwmutexMaxreader,這時(shí)readerCount將變成非負(fù)值。如readerCount比 0 大,則表示存在 reader 正在等待 writer 執(zhí)行完成,此時(shí)應(yīng)喚醒這些等待的 reader。之后寫鎖將被釋放,從而允許其他 writer 為了寫入而鎖定互斥鎖。(譯者注:如果還存在掛起的 reader,則在 writer 解鎖之前需要通過(guò)信號(hào)量 readerSem 喚醒這些 reader 執(zhí)行)
如果 reader 或 writer 嘗試解鎖未鎖定的互斥鎖時(shí),調(diào)用Unlock或Runlock方法將拋出錯(cuò)誤(示例源碼)。
m := sync.RWMutex{}
m.Unlock()
fatal error: sync: Unlock of unlocked RWMutex
...
遞歸讀鎖定 Recursive read locking
文檔描述:
如果一個(gè) reader goroutine 持有了讀鎖,而此時(shí)另一個(gè) writer goroutine 調(diào)用
Lock申請(qǐng)加寫鎖,此后在最初的讀鎖被釋放前其他 goroutine 不能獲取到讀鎖。特定情況下,這能防止遞歸讀鎖,這種策略保證了鎖的可用性,Lock的調(diào)用會(huì)阻止其他新的 reader 來(lái)獲得鎖。
RWMutex 的工作方式是,如果有一個(gè) writer 調(diào)用了 Lock,則所有調(diào)用 RLock 都將被鎖定,無(wú)論是否已經(jīng)獲得了讀鎖定(示例源碼):
示例代碼:
package main
import (
"fmt"
"sync"
"time"
)
var m sync.RWMutex
func f(n int) int {
if n < 1 {
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return f(n-1) + n
}
func main() {
done := make(chan int)
go func() {
time.Sleep(200 * time.Millisecond)
fmt.Println("Lock")
m.Lock()
fmt.Println("Unlock")
m.Unlock()
done <- 1
}()
f(4)
<-done
}
輸出:
RLock RLock RLock Lock RLock fatal error: all goroutines are asleep - deadlock!
譯者注(至下一節(jié)以前均為譯者注):為什么會(huì)發(fā)送死鎖呢?原作者用遞歸函數(shù)在 defer 里面解鎖,那么在加第三層讀鎖的時(shí)候,還沒(méi)有讀鎖解鎖。這時(shí),readCount 是 3,此時(shí)正好加了一個(gè) Lock 寫鎖,由于 readCount 是 3
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
由上可知,此時(shí) writer 需要等待所有進(jìn)行中的 reader 完成,此時(shí)又調(diào)用了 RLock,
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}
由于在第四個(gè) RLock 前,加了 Lock 操作,使得 readerCount 為負(fù)數(shù)。所以就造成了死鎖,即 reader 在等待 readerSem,writer 在等待 writerSem*
復(fù)制鎖 Copying locks
go tool vet可以檢測(cè)鎖是否被復(fù)制了,因?yàn)閺?fù)制鎖會(huì)導(dǎo)致死鎖。更多關(guān)于此問(wèn)題可參考:Detect locks passed by value in Go
性能 Performance
之前有人發(fā)現(xiàn),在 CPU 核數(shù)增多時(shí) RWMutex 的性能會(huì)有下降,詳見(jiàn):sync: RWMutex scales poorly with CPU count
爭(zhēng)用 Contention
Go 版本 ≥ 1.8 之后,支持分析爭(zhēng)用的互斥鎖(runtime: Profile goroutines holding contended mutexes.)。我們來(lái)看下如何做:
package main
import (
"net/http"
_ "net/http/pprof"
"runtime"
"sync"
"time"
)
func main() {
var mu sync.Mutex
runtime.SetMutexProfileFraction(5)
for i := 0; i < 10; i++ {
go func() {
for {
mu.Lock()
time.Sleep(100 * time.Millisecond)
mu.Unlock()
}
}()
}
http.ListenAndServe(":8888", nil)
}
> go build mutexcontention.go
> ./mutexcontention
當(dāng)mutexcontention程序運(yùn)行時(shí),執(zhí)行 pprof:
> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list main
Total: 57.28s
ROUTINE main.main.func1 in .../src/github.com/mlowicki/mutexcontention/mutexcontention.go
0 57.28s (flat, cum) 100% of Total
. . 14: for i := 0; i < 10; i++ {
. . 15: go func() {
. . 16: for {
. . 17: mu.Lock()
. . 18: time.Sleep(100 * time.Millisecond)
. 57.28s 19: mu.Unlock()
. . 20: }
. . 21: }()
. . 22: }
. . 23:
. . 24: http.ListenAndServe(":8888", nil)
注意,為什么這里耗時(shí) 57.28s,且指向了mu.Unlock()呢?
當(dāng) goroutine 調(diào)用Lock而阻塞時(shí),會(huì)記錄當(dāng)前發(fā)生的準(zhǔn)確時(shí)間–叫做acquiretime。當(dāng)另一個(gè) groutine 解鎖,至少存在一個(gè) goroutine 在等待獲得鎖,則其中一個(gè)解除阻塞并調(diào)用其mutexevent函數(shù)。該mutexevent函數(shù)通過(guò)檢查SetMutexProfileFraction設(shè)置的速率來(lái)決定此事件應(yīng)被保留還是丟棄。此事件包含整個(gè)等待的時(shí)間(當(dāng)前時(shí)間 - 獲得時(shí)間)。從上面的例子可以看出,所有阻塞在特定互斥鎖的 goroutines 的總等待時(shí)間會(huì)被收集和展示。
在 Go 1.11(sync: enable profiling of RWMutex)中將增加讀鎖(Rlock 和 RUnlock)的爭(zhēng)用。
資料 Resources
Allen B. Downey: The Little Book of Semaphores
critical section:臨界區(qū)
Mutual exclusion,縮寫 Mutex:互斥鎖
轉(zhuǎn)載:姜姜和張張
原文地址
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。