GO sync.RWMutex - 解決并發(fā)讀寫問(wèn)題

image

當(dāng)多個(gè)線程訪問(wèn)共享數(shù)據(jù)時(shí),會(huì)出現(xiàn)并發(fā)讀寫問(wèn)題(reader-writer problems)。有兩種訪問(wèn)數(shù)據(jù)的線程類型:

  • 讀線程 reader:只進(jìn)行數(shù)據(jù)讀取
  • 寫線程 writer:進(jìn)行數(shù)據(jù)修改

當(dāng) writer 獲取到數(shù)據(jù)的訪問(wèn)權(quán)限后,其他任何線程(reader 或 writer)都無(wú)權(quán)限訪問(wèn)此數(shù)據(jù)。這種約束亦存在于現(xiàn)實(shí)中,比如,當(dāng) writer 在修改數(shù)據(jù)無(wú)法保證原子性時(shí)(如數(shù)據(jù)庫(kù)),此時(shí)讀取未完成的修改必須被阻塞,以防止加載臟數(shù)據(jù)(譯者注:數(shù)據(jù)庫(kù)中的臟讀)。還有許多諸如此類的核心問(wèn)題,例如:

  • writer 不能無(wú)限等待
  • reader 不能無(wú)限等待
  • 不允許線程出現(xiàn)無(wú)限等待

多讀/單寫互斥鎖(如sync.RWMutex)的具體實(shí)現(xiàn)解決了一種并發(fā)讀寫問(wèn)題。接下來(lái),讓我們看下在 Go 語(yǔ)言中是如何實(shí)現(xiàn)的,同時(shí)它提供了哪些的數(shù)據(jù)可靠性保證機(jī)制。

作為額外的工作,我們將深入研究分析競(jìng)態(tài)情況下的互斥鎖。

用法

在深入研究實(shí)現(xiàn)細(xì)節(jié)之前,我們先看看sync.RWMutex的使用實(shí)例。下面的程序使用讀寫互斥鎖來(lái)保護(hù)臨界區(qū)–sleep()。為了更好的展示整個(gè)過(guò)程,臨界區(qū)部分計(jì)算了當(dāng)前正在執(zhí)行的 reader 和 writer 的數(shù)量(源碼)。


    package main

    import (
        "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
    )

    func init() {
        rand.Seed(time.Now().Unix())
    }

    func sleep() {
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    }

    func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
        sleep()
        m.RLock()
        c <- 1
        sleep()
        c <- -1
        m.RUnlock()
        wg.Done()
    }

    func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
        sleep()
        m.Lock()
        c <- 1
        sleep()
        c <- -1
        m.Unlock()
        wg.Done()
    }

    func main() {
        var m sync.RWMutex
        var rs, ws int
        rsCh := make(chan int)
        wsCh := make(chan int)
        go func() {
            for {
                select {
                case n := <-rsCh:
                    rs += n
                case n := <-wsCh:
                    ws += n
                }
                fmt.Printf("%s%s\n", strings.Repeat("R", rs),
                    strings.Repeat("W", ws))
            }
        }()
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go reader(rsCh, &m, &wg)
        }
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go writer(wsCh, &m, &wg)
        }
        wg.Wait()
    }

play.golang.org 加載的程序環(huán)境是確定的(比如開始時(shí)間),所以rand.Seed(time.Now().Unix())總是返回相同的數(shù)值,此時(shí)程序的執(zhí)行結(jié)果可能總是相同的。為了避免這種情況,可通過(guò)修改不同的隨機(jī)種子值或者在自己的機(jī)器上執(zhí)行程序。

程序執(zhí)行結(jié)果:

W

R
RR
RRR
RRRR
RRRRR
RRRR
RRR
RRRR
RRR
RR
R

W

R
RR
RRR
RRRR
RRR
RR
R

W

譯者注:不同機(jī)器上運(yùn)行的結(jié)果會(huì)有所不同
每次執(zhí)行完一組 goroutine(reader 和 writer)的臨界區(qū)代碼后,都會(huì)打印新的一行。很顯然,RWMutex 允許至少一個(gè) reader(一個(gè)或多個(gè) reader)存在而 writer 同時(shí)只能存在一個(gè)。

同樣重要且將進(jìn)一步討論的是:writer 調(diào)用到Lock()時(shí),將會(huì)使新的 reader/writer 被阻塞。當(dāng)存在 reader 加了 RLock 時(shí),writer 會(huì)等待這一組 reader 完成正在執(zhí)行的任務(wù),當(dāng)這一組任務(wù)完成后,writer 將開始執(zhí)行。從輸出可以很明顯的看到,每一行的 R 都會(huì)遞減一個(gè),直到?jīng)]有 R 之后將打印一個(gè) W。

...
RRRRR
RRRR
RRR
RR
R

W
...

一旦 writer 結(jié)束,之前被阻塞的 reader 將恢復(fù)執(zhí)行,然后下一個(gè) writer 也將開始啟動(dòng)。值得一提的是,如果一個(gè) writer 完成,并且有 reader 和 writer 都在等待,那么首個(gè) reader 將解除阻塞,然后才輪到 writer。這種交替執(zhí)行的方式使得 writer 需等待當(dāng)前這組 reader 完成,所以無(wú)論 reader 還是 writer 都不會(huì)有無(wú)限等待的情況。

實(shí)現(xiàn)

注意,本文針對(duì)的RWMutex實(shí)現(xiàn)(Go commit: 718d6c58)在 Go 不同版本中可能隨時(shí)有修改。
RWMutex為 reader 提供兩個(gè)方法(RLockRUnlock)、也為 writer 提供了兩個(gè)方法(LockUnlock

讀鎖 RLock

為了簡(jiǎn)潔起見(jiàn),我們先跳過(guò)源碼中競(jìng)態(tài)檢測(cè)相關(guān)部分(它們將被...代替)。

func (rw *RWMutex) RLock() {
    ...
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {    
        runtime_SemacquireMutex(&rw.ReadeSem, false)
    }
    ...
}

readerCount字段是int32類型的值,表示待處理的 reader 數(shù)量(正在讀取數(shù)據(jù)或被 writer 阻塞)。這基本上是已調(diào)用 RLock 函數(shù),但尚未調(diào)用 RUnlock 函數(shù)的 reader 數(shù)量。

atomic.AddInt32等價(jià)于如下原子性表達(dá):

*addr += delta
return *addr

addr*int32類型變量,deltaint32類型。因?yàn)榇瞬僮骶哂性有?,所以累?code>delta操作不會(huì)影響其他線程(更多詳見(jiàn)Fetch-and-add)。

如果沒(méi)有 writer,則readerCount總是會(huì)大于或等于 0(譯者注:因?yàn)?writer 會(huì)把 readerCount 置為負(fù)數(shù),通過(guò) Lock 函數(shù)的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders),此時(shí) reader 是一種運(yùn)行速度很快的非阻塞方式,因?yàn)橹恍枰{(diào)用atomic.AddInt32。

信號(hào)量 Semaphore

信號(hào)量是 Edsger Dijkstra 發(fā)明的數(shù)據(jù)結(jié)構(gòu),在解決多種同步問(wèn)題時(shí)很有用。其本質(zhì)是一個(gè)整數(shù),并關(guān)聯(lián)兩個(gè)操作:

  • 申請(qǐng)acquire(也稱為 wait、decrementP 操作)
  • 釋放release(也稱 signalincrementV 操作)

acquire操作將信號(hào)量減 1,如果結(jié)果值為負(fù)則線程阻塞,且直到其他線程進(jìn)行了信號(hào)量累加為正數(shù)才能恢復(fù)。如結(jié)果為正數(shù),線程則繼續(xù)執(zhí)行。

release操作將信號(hào)量加 1,如存在被阻塞的線程,此時(shí)他們中的一個(gè)線程將解除阻塞。

Go 運(yùn)行時(shí)提供的runtime_SemacquireMutexruntime_Semrelease函數(shù)可用來(lái)實(shí)現(xiàn)sync.RWMutex互斥鎖。

鎖 Lock

實(shí)現(xiàn)源碼:

func (rw *RWMutex) Lock() {
    ...
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {     
        runtime_SemacquireMutex(&rw.writerSem, false)
    }
    ...
}

writer 通過(guò)Lock方法獲取共享數(shù)據(jù)的獨(dú)占權(quán)限。首先,它會(huì)申請(qǐng)阻塞其他寫操作的互斥鎖(rw.w.Lock()),此互斥鎖在Unlock函數(shù)的最后才會(huì)進(jìn)行解鎖。下一步,將readerCount減去rwmutexMaxreader(值為 1 左移 30 位, 1<<30)使其為負(fù)數(shù)。當(dāng)readerCount變?yōu)樨?fù)數(shù)時(shí),Rlock 將阻塞接下來(lái)的所有讀請(qǐng)求。

再回過(guò)頭來(lái)看下Rlock()函數(shù)中邏輯:

if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.    
    runtime_SemacquireMutex(&rw.SeadeSem, false)
}

后續(xù)的 reader 將會(huì)被阻塞,那么已運(yùn)行的 reader 會(huì)怎樣呢?readerWait字段用來(lái)記錄當(dāng)前 reader 執(zhí)行的數(shù)量。writer 被信號(hào)量writerSem阻塞,直到最后一個(gè) reader 在使用后面討論的RUnlock方法解鎖后會(huì)把writerSem加 1,此時(shí)信號(hào)量將變成 0,writer被解除阻塞(譯者注:RUnlock 函數(shù)中的runtime_Semrelease(&rw.writerSem, false)

如果沒(méi)有有效的 reader,那么 writer 將繼續(xù)其執(zhí)行。

最大 reader 數(shù) rwmutexMaxreader

rwmutex.go中定義的常量:

const rwmutexMaxreader = 1 << 30

那么,其用途是什么,以及1<<30表示什么意義呢?

readerCount字段是int32類型,其范圍為:

[-1 << 31, (1 << 31) — 1] or [-2147483648, 2147483647]
RWMutext使用此字段來(lái)計(jì)算掛起的 reader 和 writer 的標(biāo)記(置為負(fù)數(shù))。在Lock方法中:
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader
Lock會(huì)將readerCount字段減去1<<30,當(dāng)readerCount負(fù)值時(shí)表示 writer 調(diào)用了Lock正等待被處理,atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders) + rwmutexMaxreaders這個(gè)操作既讓readerCount變?yōu)樨?fù)數(shù)又使r存儲(chǔ)回了 readerCount。rwmutexMaxreaders也可以限制被掛起 reader 的數(shù)量。如果有rwmutexMaxreader個(gè)或更多掛起的 reader,那么readerCount將是非負(fù)值,此時(shí)將導(dǎo)致整個(gè)機(jī)制的崩潰。所以,reader 實(shí)際的限制數(shù)是:rwmutexMaxreader - 1,此值1073741823超過(guò)了10億

解讀鎖 RUnlock

實(shí)現(xiàn)源碼:

func (rw *RWMutex) RUnlock() {
    ...
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if r+1 == 0 || r+1 == -rwmutexMaxreader {
            race.Enable()
            thrSw("sync: RUnlock of unlocked RWMutex")
        }
        // A writer is pending.
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            // The last reader unblocks the writer.       
            runtime_Semrelease(&rw.WriteSem, false)
        }
    }
    ...
}

每次調(diào)用此方法將使readerCount減 1(RLock 方法中增加 1)。如果減完后readerCount值為負(fù),則表示當(dāng)前存在 writer 正在等待或運(yùn)行。這是因?yàn)樵?code>Lock()方法中readerCount減去了rwmutexMaxreader。然后,當(dāng)檢查到將完成的 reader 數(shù)量(readerWait 數(shù)值)最終為 0 時(shí),則表示 writer 可以最終申請(qǐng)信號(hào)量。(譯者注:r < 0時(shí),存在兩個(gè)分支,當(dāng)走 r+1 == 0 的分支時(shí),表示 readerCount 此時(shí)為 0 即沒(méi)有 RLock,所以 throw 了。當(dāng)走下面那個(gè)分支時(shí),r < 0則是因?yàn)榇嬖?writer 把 readerCount 置為了負(fù)數(shù)在等待 reader 結(jié)束,那么當(dāng)最后一個(gè) reader 解鎖時(shí)需要將 WriteSem 信號(hào)量加 1,喚醒 writer)

解鎖 Unlock

實(shí)現(xiàn)源碼:

func (rw *RWMutex) Unlock() {
    ...
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxreader)
    if r >= rwmutexMaxreader {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    rw.w.Unlock()
    ...
}

解鎖被 writer 持有的互斥鎖時(shí),首先通過(guò)atomic.AddInt32readerCount加上rwmutexMaxreader,這時(shí)readerCount將變成非負(fù)值。如readerCount比 0 大,則表示存在 reader 正在等待 writer 執(zhí)行完成,此時(shí)應(yīng)喚醒這些等待的 reader。之后寫鎖將被釋放,從而允許其他 writer 為了寫入而鎖定互斥鎖。(譯者注:如果還存在掛起的 reader,則在 writer 解鎖之前需要通過(guò)信號(hào)量 readerSem 喚醒這些 reader 執(zhí)行)

如果 reader 或 writer 嘗試解鎖未鎖定的互斥鎖時(shí),調(diào)用UnlockRunlock方法將拋出錯(cuò)誤(示例源碼)。

m := sync.RWMutex{}
m.Unlock()
fatal error: sync: Unlock of unlocked RWMutex
...

遞歸讀鎖定 Recursive read locking

文檔描述:

如果一個(gè) reader goroutine 持有了讀鎖,而此時(shí)另一個(gè) writer goroutine 調(diào)用Lock申請(qǐng)加寫鎖,此后在最初的讀鎖被釋放前其他 goroutine 不能獲取到讀鎖。特定情況下,這能防止遞歸讀鎖,這種策略保證了鎖的可用性,Lock的調(diào)用會(huì)阻止其他新的 reader 來(lái)獲得鎖。

RWMutex 的工作方式是,如果有一個(gè) writer 調(diào)用了 Lock,則所有調(diào)用 RLock 都將被鎖定,無(wú)論是否已經(jīng)獲得了讀鎖定(示例源碼):
示例代碼:

package main

import (
    "fmt"
    "sync"
    "time"
)

var m sync.RWMutex

func f(n int) int {
    if n < 1 {
        return 0
    }
    fmt.Println("RLock")
    m.RLock()
    defer func() {
        fmt.Println("RUnlock")
        m.RUnlock()
    }()
    time.Sleep(100 * time.Millisecond)
    return f(n-1) + n
}

func main() {
    done := make(chan int)
    go func() {
        time.Sleep(200 * time.Millisecond)
        fmt.Println("Lock")
        m.Lock()
        fmt.Println("Unlock")
        m.Unlock()
        done <- 1
    }()
    f(4)
    <-done
}

輸出:

RLock RLock RLock Lock RLock fatal error: all goroutines are asleep - deadlock!
譯者注(至下一節(jié)以前均為譯者注):為什么會(huì)發(fā)送死鎖呢?原作者用遞歸函數(shù)在 defer 里面解鎖,那么在加第三層讀鎖的時(shí)候,還沒(méi)有讀鎖解鎖。這時(shí),readCount 是 3,此時(shí)正好加了一個(gè) Lock 寫鎖,由于 readCount 是 3

if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
         runtime_Semacquire(&rw.writerSem)
        }

由上可知,此時(shí) writer 需要等待所有進(jìn)行中的 reader 完成,此時(shí)又調(diào)用了 RLock,

if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_Semacquire(&rw.readerSem)
}

由于在第四個(gè) RLock 前,加了 Lock 操作,使得 readerCount 為負(fù)數(shù)。所以就造成了死鎖,即 reader 在等待 readerSem,writer 在等待 writerSem*

復(fù)制鎖 Copying locks

go tool vet可以檢測(cè)鎖是否被復(fù)制了,因?yàn)閺?fù)制鎖會(huì)導(dǎo)致死鎖。更多關(guān)于此問(wèn)題可參考:Detect locks passed by value in Go

性能 Performance

之前有人發(fā)現(xiàn),在 CPU 核數(shù)增多時(shí) RWMutex 的性能會(huì)有下降,詳見(jiàn):sync: RWMutex scales poorly with CPU count

爭(zhēng)用 Contention

Go 版本 ≥ 1.8 之后,支持分析爭(zhēng)用的互斥鎖(runtime: Profile goroutines holding contended mutexes.)。我們來(lái)看下如何做:

package main

import (
    "net/http"
    _ "net/http/pprof"
    "runtime"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    runtime.SetMutexProfileFraction(5)
    for i := 0; i < 10; i++ {
        go func() {
            for {
                mu.Lock()
                time.Sleep(100 * time.Millisecond)
                mu.Unlock()
            }
        }()
    }
    http.ListenAndServe(":8888", nil)
}
> go build mutexcontention.go
> ./mutexcontention

當(dāng)mutexcontention程序運(yùn)行時(shí),執(zhí)行 pprof:

> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list main
Total: 57.28s
ROUTINE main.main.func1 in .../src/github.com/mlowicki/mutexcontention/mutexcontention.go
0     57.28s (flat, cum)   100% of Total
.          .     14:   for i := 0; i < 10; i++ {
.          .     15:           go func() {
.          .     16:                   for {
.          .     17:                           mu.Lock()
.          .     18:                           time.Sleep(100 * time.Millisecond)
.     57.28s     19:                           mu.Unlock()
.          .     20:                   }
.          .     21:           }()
.          .     22:   }
.          .     23:
.          .     24:   http.ListenAndServe(":8888", nil)

注意,為什么這里耗時(shí) 57.28s,且指向了mu.Unlock()呢?

當(dāng) goroutine 調(diào)用Lock而阻塞時(shí),會(huì)記錄當(dāng)前發(fā)生的準(zhǔn)確時(shí)間–叫做acquiretime。當(dāng)另一個(gè) groutine 解鎖,至少存在一個(gè) goroutine 在等待獲得鎖,則其中一個(gè)解除阻塞并調(diào)用其mutexevent函數(shù)。該mutexevent函數(shù)通過(guò)檢查SetMutexProfileFraction設(shè)置的速率來(lái)決定此事件應(yīng)被保留還是丟棄。此事件包含整個(gè)等待的時(shí)間(當(dāng)前時(shí)間 - 獲得時(shí)間)。從上面的例子可以看出,所有阻塞在特定互斥鎖的 goroutines 的總等待時(shí)間會(huì)被收集和展示。

在 Go 1.11(sync: enable profiling of RWMutex)中將增加讀鎖(Rlock 和 RUnlock)的爭(zhēng)用。

資料 Resources

轉(zhuǎn)載:姜姜和張張
原文地址
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容