Golang并發(fā):除了channel,你還有其他選擇

我們都知道Golang并發(fā)優(yōu)選channel,但channel不是萬能的,Golang為我們提供了另一種選擇:sync。通過這篇文章,你會了解sync包最基礎、最常用的方法,至于sync和channel之爭留給下一篇文章。

sync包提供了基礎的異步操作方法,比如互斥鎖(Mutex)、單次執(zhí)行(Once)和等待組(WaitGroup),這些異步操作主要是為低級庫提供,上層的異步/并發(fā)操作最好選用通道和通信。

sync包提供了:

  1. Mutex:互斥鎖
  2. RWMutex:讀寫鎖
  3. WaitGroup:等待組
  4. Once:單次執(zhí)行
  5. Cond:信號量
  6. Pool:臨時對象池
  7. Map:自帶鎖的map

這篇文章是sync包的入門文章,所以只介紹常用的結(jié)構(gòu)和方法:Mutex、RWMutexWaitGroup、Once,而Cond、PoolMap留給大家自行探索,或有需求再介紹。

互斥鎖

常做并發(fā)工作的朋友對互斥鎖應該不陌生,Golang里互斥鎖需要確保的是某段時間內(nèi),不能有多個協(xié)程同時訪問一段代碼(臨界區(qū))。

互斥鎖被稱為Mutex,它有2個函數(shù),Lock()Unlock()分別是獲取鎖和釋放鎖,如下:

type Mutex
func (m *Mutex) Lock(){}
func (m *Mutex) Unlock(){}

Mutex的初始值為未鎖的狀態(tài),并且Mutex通常作為結(jié)構(gòu)體的匿名成員存在

經(jīng)過了上面這么“官方”的介紹,舉個例子:你在工商銀行有100元存款,這張卡綁定了支付寶和微信,在中午12點你用支付寶支付外賣30元,你在微信發(fā)紅包,搶到10塊。銀行需要按順序執(zhí)行上面兩件事,先減30再加10或者先加10再減30,結(jié)果都是80,但如果同時執(zhí)行,結(jié)果可能是,只減了30或者只加了10,即你有70元或者你有110元。前一個結(jié)果是你賠了,后一個結(jié)果是銀行賠了,銀行可不希望把這種事算錯。

看看實際使用吧:創(chuàng)建一個銀行,銀行里存每個賬戶的錢,存儲查詢都加了鎖操作,這樣銀行就不會算錯賬了。
銀行的定義:

type Bank struct {
    sync.Mutex
    saving map[string]int // 每賬戶的存款金額
}

func NewBank() *Bank {
    b := &Bank{
        saving: make(map[string]int),
    }
    return b
}

銀行的存取錢:

// Deposit 存款
func (b *Bank) Deposit(name string, amount int) {
    b.Lock()
    defer b.Unlock()

    if _, ok := b.saving[name]; !ok {
        b.saving[name] = 0
    }
    b.saving[name] += amount
}

// Withdraw 取款,返回實際取到的金額
func (b *Bank) Withdraw(name string, amount int) int {
    b.Lock()
    defer b.Unlock()

    if _, ok := b.saving[name]; !ok {
        return 0
    }
    if b.saving[name] < amount {
        amount = b.saving[name]
    }
    b.saving[name] -= amount

    return amount
}

// Query 查詢余額
func (b *Bank) Query(name string) int {
    b.Lock()
    defer b.Unlock()

    if _, ok := b.saving[name]; !ok {
        return 0
    }

    return b.saving[name]
}

模擬操作:小米支付寶存了100,并且同時花了20。

func main() {
    b := NewBank()
    go b.Deposit("xiaoming", 100)
    go b.Withdraw("xiaoming", 20)
    go b.Deposit("xiaogang", 2000)

    time.Sleep(time.Second)
    fmt.Printf("xiaoming has: %d\n", b.Query("xiaoming"))
    fmt.Printf("xiaogang has: %d\n", b.Query("xiaogang"))
}

結(jié)果:先存后花。

?  sync_pkg git:(master) ? go run mutex.go
xiaoming has: 80
xiaogang has: 2000

也可能是:先花后存,因為先花20,因為小明沒錢,所以沒花出去。

?  sync_pkg git:(master) ? go run mutex.go
xiaoming has: 100
xiaogang has: 2000

這個例子只是介紹了mutex的基本使用,如果你想多研究下mutex,那就去我的Github(閱讀原文)下載下來代碼,自己修改測試。Github中還提供了沒有鎖的例子,運行多次總能碰到錯誤:

fatal error: concurrent map writes
這是由于并發(fā)訪問map造成的。

讀寫鎖

讀寫鎖是互斥鎖的特殊變種,如果是計算機基本知識扎實的朋友會知道,讀寫鎖來自于讀者和寫者的問題,這個問題就不介紹了,介紹下我們的重點:讀寫鎖要達到的效果是同一時間可以允許多個協(xié)程讀數(shù)據(jù),但只能有且只有1個協(xié)程寫數(shù)據(jù)。

也就是說,讀和寫是互斥的,寫和寫也是互斥的,但讀和讀并不互斥。具體講,當有至少1個協(xié)程讀時,如果需要進行寫,就必須等待所有已經(jīng)在讀的協(xié)程結(jié)束讀操作,寫操作的協(xié)程才獲得鎖進行寫數(shù)據(jù)。當寫數(shù)據(jù)的協(xié)程已經(jīng)在進行時,有其他協(xié)程需要進行讀或者寫,就必須等待已經(jīng)在寫的協(xié)程結(jié)束寫操作。

讀寫鎖是RWMutex,它有5個函數(shù),它需要為讀操作和寫操作分別提供鎖操作,這樣就4個了:

  • Lock()Unlock()是給寫操作用的。
  • RLock()RUnlock()是給讀操作用的。

RLocker()能獲取讀鎖,然后傳遞給其他協(xié)程使用。使用較少

type RWMutex
func (rw *RWMutex) Lock(){}
func (rw *RWMutex) RLock(){}
func (rw *RWMutex) RLocker() Locker{}
func (rw *RWMutex) RUnlock(){}
func (rw *RWMutex) Unlock(){}

上面的銀行實現(xiàn)不合理:大家都是拿手機APP查余額,可以同時幾個人一起查呀,這根本不影響,銀行的鎖可以換成讀寫鎖。存、取錢是寫操作,查詢金額是讀操作,代碼修改如下,其他不變:

type Bank struct {
    sync.RWMutex
    saving map[string]int // 每賬戶的存款金額
}

// Query 查詢余額
func (b *Bank) Query(name string) int {
    b.RLock()
    defer b.RUnlock()

    if _, ok := b.saving[name]; !ok {
        return 0
    }

    return b.saving[name]
}

func main() {
    b := NewBank()
    go b.Deposit("xiaoming", 100)
    go b.Withdraw("xiaoming", 20)
    go b.Deposit("xiaogang", 2000)

    time.Sleep(time.Second)
    print := func(name string) {
        fmt.Printf("%s has: %d\n", name, b.Query(name))
    }

    nameList := []string{"xiaoming", "xiaogang", "xiaohong", "xiaozhang"}
    for _, name := range nameList {
        go print(name)
    }

    time.Sleep(time.Second)
}

結(jié)果,可能不一樣,因為協(xié)程都是并發(fā)執(zhí)行的,執(zhí)行順序不固定

?  sync_pkg git:(master) ? go run rwmutex.go
xiaohong has: 0
xiaozhang has: 0
xiaogang has: 2000
xiaoming has: 100

等待組

互斥鎖和讀寫鎖大多數(shù)人可能比較熟悉,而對等待組(WaitGroup)可能就不那么熟悉,甚至有點陌生,所以先來介紹下等待組在現(xiàn)實中的例子。

你們團隊有5個人,你作為隊長要帶領大家打開藏有寶藏的箱子,但這個箱子需要4把鑰匙才能同時打開,你把尋找4把鑰匙的任務,分配給4個隊員,讓他們分別去尋找,而你則守著寶箱,在這等待,等他們都找到回來后,一起插進鑰匙打開寶箱。

這其中有個很重要的過程叫等待:等待一些工作完成后,再進行下一步的工作。如果使用Golang實現(xiàn),就得使用等待組。

等待組是WaitGroup,它有3個函數(shù):

  • Add():在被等待的協(xié)程啟動前加1,代表要等待1個協(xié)程。
  • Done():被等待的協(xié)程執(zhí)行Done,代表該協(xié)程已經(jīng)完成任務,通知等待協(xié)程。
  • Wait(): 等待其他協(xié)程的協(xié)程,使用Wait進行等待。
type WaitGroup
func (wg *WaitGroup) Add(delta int){}
func (wg *WaitGroup) Done(){}
func (wg *WaitGroup) Wait(){}

來,一起看下怎么用WaitGroup實現(xiàn)上面的問題。

隊長先創(chuàng)建一個WaitGroup對象wg,每個隊員都是1個協(xié)程, 隊長讓隊員出發(fā)前,使用wg.Add(),隊員出發(fā)尋找鑰匙,隊長使用wg.Wait()等待(阻塞)所有隊員完成,某個隊員完成時執(zhí)行wg.Done(),等所有隊員找到鑰匙,wg.Wait()則返回,完成了等待的過程,接下來就是開箱。

結(jié)合之前的協(xié)程池的例子,修改成WG等待協(xié)程池協(xié)程退出,實例代碼:

func leader() {
    var wg sync.WaitGroup
    wg.Add(4)
    for i := 0; i < 4; i++ {
        go follower(&wg, i)
    }
    wg.Wait()
    
    fmt.Println("open the box together")
}

func follower(wg *sync.WaitGroup, id int) {
    fmt.Printf("follwer %d find key\n", id)
    wg.Done()
}

結(jié)果:

?  sync_pkg git:(master) ? go run waitgroup.go
follwer 3 find key
follwer 1 find key
follwer 0 find key
follwer 2 find key
open the box together

WaitGroup也常用在協(xié)程池的處理上,協(xié)程池等待所有協(xié)程退出,把上篇文章《Golang并發(fā)模型:輕松入門協(xié)程池》的例子改下:

func workerPool(n int, jobCh <-chan int, retCh chan<- string) {
    var wg sync.WaitGroup
    wg.Add(n)
    for i := 0; i < n; i++ {
        go worker(&wg, i, jobCh, retCh)
    }

    wg.Wait()
    close(retCh)
}

func worker(wg *sync.WaitGroup, id int, jobCh <-chan int, retCh chan<- string) {
    cnt := 0
    for job := range jobCh {
        cnt++
        ret := fmt.Sprintf("worker %d processed job: %d, it's the %dth processed by me.", id, job, cnt)
        retCh <- ret
    }

    wg.Done()
}

單次執(zhí)行

在程序執(zhí)行前,通常需要做一些初始化操作,但觸發(fā)初始化操作的地方是有多處的,但是這個初始化又只能執(zhí)行1次,怎么辦呢?

使用Once就能輕松解決,once對象是用來存放1個無入?yún)o返回值的函數(shù),once可以確保這個函數(shù)只被執(zhí)行1次

type Once
func (o *Once) Do(f func()){}

直接把官方代碼給大家搬過來看下,once在10個協(xié)程中調(diào)用,但once中的函數(shù)onceBody()只執(zhí)行了1次:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    onceBody := func() {
        fmt.Println("Only once")
    }
    done := make(chan bool)
    for i := 0; i < 10; i++ {
        go func() {
            once.Do(onceBody)
            done <- true
        }()
    }
    for i := 0; i < 10; i++ {
        <-done
    }
}

結(jié)果:

?  sync_pkg git:(master) ? go run once.go
Only once

示例源碼

本文所有示例源碼,及歷史文章、代碼都存儲在Github:https://github.com/Shitaibin/golang_step_by_step/tree/master/sync_pkg

下期預告

這次先介紹入門的知識,下次再介紹一些深入思考、最佳實踐,不能一口吃個胖子,咱們慢慢來,順序漸進。

下一篇我以這些主題進行介紹,歡迎關注:

  1. 哪個協(xié)程先獲取鎖
  2. 一定要用鎖嗎
  3. 鎖與通道的選擇

文章推薦

  1. Golang并發(fā)模型:輕松入門流水線模型
  2. Golang并發(fā)模型:輕松入門流水線FAN模式
  3. Golang并發(fā)模型:并發(fā)協(xié)程的優(yōu)雅退出
  4. Golang并發(fā)模型:輕松入門select
  5. Golang并發(fā)模型:select進階
  6. Golang并發(fā)模型:輕松入門協(xié)程池
  7. Golang并發(fā)的次優(yōu)選擇:sync包
  1. 如果這篇文章對你有幫助,請點個贊/喜歡,感謝。
  2. 本文作者:大彬
  3. 如果喜歡本文,隨意轉(zhuǎn)載,但請保留此原文鏈接:http://lessisbetter.site/2019/01/04/golang-pkg-sync/
一起學Golang-分享有料的Go語言技術(shù)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容