16.并發(fā)

并發(fā)是編程里面一個(gè)非常重要的概念,Go語(yǔ)言在語(yǔ)言層面天生支持并發(fā),這也是Go語(yǔ)言流行的一個(gè)很重要的原因。

Go語(yǔ)言中的并發(fā)編程

并發(fā)與并行

并發(fā):同一時(shí)間段內(nèi)執(zhí)行多個(gè)任務(wù)(你在用微信和兩個(gè)女朋友聊天)。

并行:同一時(shí)刻執(zhí)行多個(gè)任務(wù)(你和你朋友都在用微信和女朋友聊天)。

Go語(yǔ)言的并發(fā)通過(guò)goroutine實(shí)現(xiàn)。goroutine類似于線程,屬于用戶態(tài)的線程,我們可以根據(jù)需要?jiǎng)?chuàng)建成千上萬(wàn)個(gè)goroutine并發(fā)工作。goroutine是由Go語(yǔ)言的運(yùn)行時(shí)(runtime)調(diào)度完成,而線程是由操作系統(tǒng)調(diào)度完成。

Go語(yǔ)言還提供channel在多個(gè)goroutine間進(jìn)行通信。goroutinechannel是 Go 語(yǔ)言秉承的 CSP(Communicating Sequential Process)并發(fā)模式的重要實(shí)現(xiàn)基礎(chǔ)。

goroutine

在java/c++中我們要實(shí)現(xiàn)并發(fā)編程的時(shí)候,我們通常需要自己維護(hù)一個(gè)線程池,并且需要自己去包裝一個(gè)又一個(gè)的任務(wù),同時(shí)需要自己去調(diào)度線程執(zhí)行任務(wù)并維護(hù)上下文切換,這一切通常會(huì)耗費(fèi)程序員大量的心智。那么能不能有一種機(jī)制,程序員只需要定義很多個(gè)任務(wù),讓系統(tǒng)去幫助我們把這些任務(wù)分配到CPU上實(shí)現(xiàn)并發(fā)執(zhí)行呢?

Go語(yǔ)言中的goroutine就是這樣一種機(jī)制,goroutine的概念類似于線程,但 goroutine是由Go的運(yùn)行時(shí)(runtime)調(diào)度和管理的。Go程序會(huì)智能地將 goroutine 中的任務(wù)合理地分配給每個(gè)CPU。Go語(yǔ)言之所以被稱為現(xiàn)代化的編程語(yǔ)言,就是因?yàn)樗谡Z(yǔ)言層面已經(jīng)內(nèi)置了調(diào)度和上下文切換的機(jī)制。

在Go語(yǔ)言編程中你不需要去自己寫進(jìn)程、線程、協(xié)程,你的技能包里只有一個(gè)技能–goroutine,當(dāng)你需要讓某個(gè)任務(wù)并發(fā)執(zhí)行的時(shí)候,你只需要把這個(gè)任務(wù)包裝成一個(gè)函數(shù),開啟一個(gè)goroutine去執(zhí)行這個(gè)函數(shù)就可以了,就是這么簡(jiǎn)單粗暴。

使用goroutine

Go語(yǔ)言中使用goroutine非常簡(jiǎn)單,只需要在調(diào)用函數(shù)的時(shí)候在前面加上go關(guān)鍵字,就可以為一個(gè)函數(shù)創(chuàng)建一個(gè)goroutine。

一個(gè)goroutine必定對(duì)應(yīng)一個(gè)函數(shù),可以創(chuàng)建多個(gè)goroutine去執(zhí)行相同的函數(shù)。

啟動(dòng)單個(gè)goroutine

啟動(dòng)goroutine的方式非常簡(jiǎn)單,只需要在調(diào)用的函數(shù)(普通函數(shù)和匿名函數(shù))前面加上一個(gè)go關(guān)鍵字。

舉個(gè)例子如下:

func hello() {
    fmt.Println("Hello Goroutine!")
}
func main() {
    hello()
    fmt.Println("main goroutine done!")
}

這個(gè)示例中hello函數(shù)和下面的語(yǔ)句是串行的,執(zhí)行的結(jié)果是打印完Hello Goroutine!后打印main goroutine done!。

接下來(lái)我們?cè)谡{(diào)用hello函數(shù)前面加上關(guān)鍵字go,也就是啟動(dòng)一個(gè)goroutine去執(zhí)行hello這個(gè)函數(shù)。

func main() {
    go hello() // 啟動(dòng)另外一個(gè)goroutine去執(zhí)行hello函數(shù)
    fmt.Println("main goroutine done!")
}

這一次的執(zhí)行結(jié)果只打印了main goroutine done!,并沒有打印Hello Goroutine!。為什么呢?

在程序啟動(dòng)時(shí),Go程序就會(huì)為main()函數(shù)創(chuàng)建一個(gè)默認(rèn)的goroutine。

當(dāng)main()函數(shù)返回的時(shí)候該goroutine就結(jié)束了,所有在main()函數(shù)中啟動(dòng)的goroutine會(huì)一同結(jié)束,main函數(shù)所在的goroutine就像是權(quán)利的游戲中的夜王,其他的goroutine都是異鬼,夜王一死它轉(zhuǎn)化的那些異鬼也就全部GG了。

所以我們要想辦法讓main函數(shù)等一等hello函數(shù),最簡(jiǎn)單粗暴的方式就是time.Sleep了。

func main() {
    go hello() // 啟動(dòng)另外一個(gè)goroutine去執(zhí)行hello函數(shù)
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}

執(zhí)行上面的代碼你會(huì)發(fā)現(xiàn),這一次先打印main goroutine done!,然后緊接著打印Hello Goroutine!

首先為什么會(huì)先打印main goroutine done!是因?yàn)槲覀冊(cè)趧?chuàng)建新的goroutine的時(shí)候需要花費(fèi)一些時(shí)間,而此時(shí)main函數(shù)所在的goroutine是繼續(xù)執(zhí)行的。

  • goroutine什么時(shí)候結(jié)束?

  • goroutine對(duì)應(yīng)的函數(shù)結(jié)束了,goroutine結(jié)束了。

啟動(dòng)多個(gè)goroutine

在Go語(yǔ)言中實(shí)現(xiàn)并發(fā)就是這樣簡(jiǎn)單,我們還可以啟動(dòng)多個(gè)goroutine。讓我們?cè)賮?lái)一個(gè)例子: (這里使用了sync.WaitGroup來(lái)實(shí)現(xiàn)goroutine的同步)

var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done() // goroutine結(jié)束就登記-1
    fmt.Println("Hello Goroutine!", i)
}
func main() {

    for i := 0; i < 10; i++ {
        wg.Add(1) // 啟動(dòng)一個(gè)goroutine就登記+1
        go hello(i)
    }
    wg.Wait() // 等待所有登記的goroutine都結(jié)束
}

多次執(zhí)行上面的代碼,會(huì)發(fā)現(xiàn)每次打印的數(shù)字的順序都不一致。這是因?yàn)?0個(gè)goroutine是并發(fā)執(zhí)行的,而goroutine的調(diào)度是隨機(jī)的。

goroutine與線程

可增長(zhǎng)的棧

OS線程(操作系統(tǒng)線程)一般都有固定的棧內(nèi)存(通常為2MB),一個(gè)goroutine的棧在其生命周期開始時(shí)只有很小的棧(典型情況下2KB),goroutine的棧不是固定的,他可以按需增大和縮小,goroutine的棧大小限制可以達(dá)到1GB,雖然極少會(huì)用到這個(gè)大。所以在Go語(yǔ)言中一次創(chuàng)建十萬(wàn)左右的goroutine也是可以的。

goroutine調(diào)度

GPM是Go語(yǔ)言運(yùn)行時(shí)(runtime)層面的實(shí)現(xiàn),是go語(yǔ)言自己實(shí)現(xiàn)的一套調(diào)度系統(tǒng)。區(qū)別于操作系統(tǒng)調(diào)度OS線程。

  • G很好理解,就是個(gè)goroutine的,里面除了存放本goroutine信息外 還有與所在P的綁定等信息。
  • P管理著一組goroutine隊(duì)列,P里面會(huì)存儲(chǔ)當(dāng)前goroutine運(yùn)行的上下文環(huán)境(函數(shù)指針,堆棧地址及地址邊界),P會(huì)對(duì)自己管理的goroutine隊(duì)列做一些調(diào)度(比如把占用CPU時(shí)間較長(zhǎng)的goroutine暫停、運(yùn)行后續(xù)的goroutine等等)當(dāng)自己的隊(duì)列消費(fèi)完了就去全局隊(duì)列里取,如果全局隊(duì)列里也消費(fèi)完了會(huì)去其他P的隊(duì)列里搶任務(wù)。
  • M(machine)是Go運(yùn)行時(shí)(runtime)對(duì)操作系統(tǒng)內(nèi)核線程的虛擬, M與內(nèi)核線程一般是一一映射的關(guān)系, 一個(gè)groutine最終是要放到M上執(zhí)行的;

P與M一般也是一一對(duì)應(yīng)的。他們關(guān)系是: P管理著一組G掛載在M上運(yùn)行。當(dāng)一個(gè)G長(zhǎng)久阻塞在一個(gè)M上時(shí),runtime會(huì)新建一個(gè)M,阻塞G所在的P會(huì)把其他的G 掛載在新建的M上。當(dāng)舊的G阻塞完成或者認(rèn)為其已經(jīng)死掉時(shí) 回收舊的M。

P的個(gè)數(shù)是通過(guò)runtime.GOMAXPROCS設(shè)定(最大256),Go1.5版本之后默認(rèn)為物理線程數(shù)。 在并發(fā)量大的時(shí)候會(huì)增加一些P和M,但不會(huì)太多,切換太頻繁的話得不償失。

單從線程調(diào)度講,Go語(yǔ)言相比起其他語(yǔ)言的優(yōu)勢(shì)在于OS線程是由OS內(nèi)核來(lái)調(diào)度的,goroutine則是由Go運(yùn)行時(shí)(runtime)自己的調(diào)度器調(diào)度的,這個(gè)調(diào)度器使用一個(gè)稱為m:n調(diào)度的技術(shù)(復(fù)用/調(diào)度m個(gè)goroutine到n個(gè)OS線程)。 其一大特點(diǎn)是goroutine的調(diào)度是在用戶態(tài)下完成的, 不涉及內(nèi)核態(tài)與用戶態(tài)之間的頻繁切換,包括內(nèi)存的分配與釋放,都是在用戶態(tài)維護(hù)著一塊大的內(nèi)存池, 不直接調(diào)用系統(tǒng)的malloc函數(shù)(除非內(nèi)存池需要改變),成本比調(diào)度OS線程低很多。 另一方面充分利用了多核的硬件資源,近似的把若干goroutine均分在物理線程上, 再加上本身goroutine的超輕量,以上種種保證了go調(diào)度方面的性能。

點(diǎn)我了解更多

GOMAXPROCS

Go運(yùn)行時(shí)的調(diào)度器使用GOMAXPROCS參數(shù)來(lái)確定需要使用多少個(gè)OS線程來(lái)同時(shí)執(zhí)行Go代碼。默認(rèn)值是機(jī)器上的CPU核心數(shù)。例如在一個(gè)8核心的機(jī)器上,調(diào)度器會(huì)把Go代碼同時(shí)調(diào)度到8個(gè)OS線程上(GOMAXPROCS是m:n調(diào)度中的n)。

Go語(yǔ)言中可以通過(guò)runtime.GOMAXPROCS()函數(shù)設(shè)置當(dāng)前程序并發(fā)時(shí)占用的CPU邏輯核心數(shù)。

Go1.5版本之前,默認(rèn)使用的是單核心執(zhí)行。Go1.5版本之后,默認(rèn)使用全部的CPU邏輯核心數(shù)。

我們可以通過(guò)將任務(wù)分配到不同的CPU邏輯核心上實(shí)現(xiàn)并行的效果,這里舉個(gè)例子:

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}

兩個(gè)任務(wù)只有一個(gè)邏輯核心,此時(shí)是做完一個(gè)任務(wù)再做另一個(gè)任務(wù)。 將邏輯核心數(shù)設(shè)為2,此時(shí)兩個(gè)任務(wù)并行執(zhí)行,代碼如下。

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(time.Second)
}

Go語(yǔ)言中的操作系統(tǒng)線程和goroutine的關(guān)系:

  1. 一個(gè)操作系統(tǒng)線程對(duì)應(yīng)用戶態(tài)多個(gè)goroutine。
  2. go程序可以同時(shí)使用多個(gè)操作系統(tǒng)線程。
  3. goroutine和OS線程是多對(duì)多的關(guān)系,即m:n。

channel

單純地將函數(shù)并發(fā)執(zhí)行是沒有意義的。函數(shù)與函數(shù)間需要交換數(shù)據(jù)才能體現(xiàn)并發(fā)執(zhí)行函數(shù)的意義。

雖然可以使用共享內(nèi)存進(jìn)行數(shù)據(jù)交換,但是共享內(nèi)存在不同的goroutine中容易發(fā)生競(jìng)態(tài)問題。為了保證數(shù)據(jù)交換的正確性,必須使用互斥量對(duì)內(nèi)存進(jìn)行加鎖,這種做法勢(shì)必造成性能問題。

Go語(yǔ)言的并發(fā)模型是CSP(Communicating Sequential Processes),提倡通過(guò)通信共享內(nèi)存而不是通過(guò)共享內(nèi)存而實(shí)現(xiàn)通信

如果說(shuō)goroutine是Go程序并發(fā)的執(zhí)行體,channel就是它們之間的連接。channel是可以讓一個(gè)goroutine發(fā)送特定值到另一個(gè)goroutine的通信機(jī)制。

Go 語(yǔ)言中的通道(channel)是一種特殊的類型。通道像一個(gè)傳送帶或者隊(duì)列,總是遵循先入先出(First In First Out)的規(guī)則,保證收發(fā)數(shù)據(jù)的順序。每一個(gè)通道都是一個(gè)具體類型的導(dǎo)管,也就是聲明channel的時(shí)候需要為其指定元素類型。

channel類型

channel是一種類型,一種引用類型。聲明通道類型的格式如下:

var 變量 chan 元素類型

舉幾個(gè)例子:

var ch1 chan int   // 聲明一個(gè)傳遞整型的通道
var ch2 chan bool  // 聲明一個(gè)傳遞布爾型的通道
var ch3 chan []int // 聲明一個(gè)傳遞int切片的通道

創(chuàng)建channel

通道是引用類型,通道類型的空值是nil。

var ch chan int
fmt.Println(ch) // <nil>

聲明的通道后需要使用make函數(shù)初始化之后才能使用。

創(chuàng)建channel的格式如下:

make(chan 元素類型, [緩沖大小])

channel的緩沖大小是可選的。

舉幾個(gè)例子:

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

channel操作

通道有發(fā)送(send)、接收(receive)和關(guān)閉(close)三種操作。

發(fā)送和接收都使用<-符號(hào)。

現(xiàn)在我們先使用以下語(yǔ)句定義一個(gè)通道:

ch := make(chan int)

發(fā)送

將一個(gè)值發(fā)送到通道中。

ch <- 10 // 把10發(fā)送到ch中

接收

從一個(gè)通道中接收值。

x := <- ch // 從ch中接收值并賦值給變量x
<-ch       // 從ch中接收值,忽略結(jié)果

關(guān)閉

我們通過(guò)調(diào)用內(nèi)置的close函數(shù)來(lái)關(guān)閉通道。

close(ch)

關(guān)于關(guān)閉通道需要注意的事情是,只有在通知接收方goroutine所有的數(shù)據(jù)都發(fā)送完畢的時(shí)候才需要關(guān)閉通道。通道是可以被垃圾回收機(jī)制回收的,它和關(guān)閉文件是不一樣的,在結(jié)束操作之后關(guān)閉文件是必須要做的,但關(guān)閉通道不是必須的。

關(guān)閉后的通道有以下特點(diǎn):

  1. 對(duì)一個(gè)關(guān)閉的通道再發(fā)送值就會(huì)導(dǎo)致panic。
  2. 對(duì)一個(gè)關(guān)閉的通道進(jìn)行接收會(huì)一直獲取值直到通道為空。
  3. 對(duì)一個(gè)關(guān)閉的并且沒有值的通道執(zhí)行接收操作會(huì)得到對(duì)應(yīng)類型的零值。
  4. 關(guān)閉一個(gè)已經(jīng)關(guān)閉的通道會(huì)導(dǎo)致panic。

無(wú)緩沖的通道

無(wú)緩沖的通道又稱為阻塞的通道。我們來(lái)看一下下面的代碼:

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("發(fā)送成功")
}

上面這段代碼能夠通過(guò)編譯,但是執(zhí)行的時(shí)候會(huì)出現(xiàn)以下錯(cuò)誤:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54

為什么會(huì)出現(xiàn)deadlock錯(cuò)誤呢?

因?yàn)槲覀兪褂?code>ch := make(chan int)創(chuàng)建的是無(wú)緩沖的通道,無(wú)緩沖的通道只有在有人接收值的時(shí)候才能發(fā)送值。就像你住的小區(qū)沒有快遞柜和代收點(diǎn),快遞員給你打電話必須要把這個(gè)物品送到你的手中,簡(jiǎn)單來(lái)說(shuō)就是無(wú)緩沖的通道必須有接收才能發(fā)送。

上面的代碼會(huì)阻塞在ch <- 10這一行代碼形成死鎖,那如何解決這個(gè)問題呢?

一種方法是啟用一個(gè)goroutine去接收值,例如:

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 啟用goroutine從通道接收值
    ch <- 10
    fmt.Println("發(fā)送成功")
}

無(wú)緩沖通道上的發(fā)送操作會(huì)阻塞,直到另一個(gè)goroutine在該通道上執(zhí)行接收操作,這時(shí)值才能發(fā)送成功,兩個(gè)goroutine將繼續(xù)執(zhí)行。相反,如果接收操作先執(zhí)行,接收方的goroutine將阻塞,直到另一個(gè)goroutine在該通道上發(fā)送一個(gè)值。

使用無(wú)緩沖通道進(jìn)行通信將導(dǎo)致發(fā)送和接收的goroutine同步化。因此,無(wú)緩沖通道也被稱為同步通道

有緩沖的通道

解決上面問題的方法還有一種就是使用有緩沖區(qū)的通道。我們可以在使用make函數(shù)初始化通道的時(shí)候?yàn)槠渲付ㄍǖ赖娜萘浚纾?/p>

func main() {
    ch := make(chan int, 1) // 創(chuàng)建一個(gè)容量為1的有緩沖區(qū)通道
    ch <- 10
    fmt.Println("發(fā)送成功")
}

只要通道的容量大于零,那么該通道就是有緩沖的通道,通道的容量表示通道中能存放元素的數(shù)量。就像你小區(qū)的快遞柜只有那么個(gè)多格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個(gè)快遞員就能往里面放一個(gè)。

我們可以使用內(nèi)置的len函數(shù)獲取通道內(nèi)元素的數(shù)量,使用cap函數(shù)獲取通道的容量,雖然我們很少會(huì)這么做。

for range從通道循環(huán)取值

當(dāng)向通道中發(fā)送完數(shù)據(jù)時(shí),我們可以通過(guò)close函數(shù)來(lái)關(guān)閉通道。

當(dāng)通道被關(guān)閉時(shí),再往該通道發(fā)送值會(huì)引發(fā)panic,從該通道取值的操作會(huì)先取完通道中的值,再然后取到的值一直都是對(duì)應(yīng)類型的零值。那如何判斷一個(gè)通道是否被關(guān)閉了呢?

我們來(lái)看下面這個(gè)例子:

// channel 練習(xí)
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    // 開啟goroutine將0~100的數(shù)發(fā)送到ch1中
    go func() {
        for i := 0; i < 100; i++ {
            ch1 <- i
        }
        close(ch1)
    }()
    // 開啟goroutine從ch1中接收值,并將該值的平方發(fā)送到ch2中
    go func() {
        for {
            i, ok := <-ch1 // 通道關(guān)閉后再取值ok=false
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()
    // 在主goroutine中從ch2中接收值打印
    for i := range ch2 { // 通道關(guān)閉后會(huì)退出for range循環(huán)
        fmt.Println(i)
    }
}

從上面的例子中我們看到有兩種方式在接收值的時(shí)候判斷該通道是否被關(guān)閉,不過(guò)我們通常使用的是for range的方式。使用for range遍歷通道,當(dāng)通道被關(guān)閉的時(shí)候就會(huì)退出for range。

單向通道

有的時(shí)候我們會(huì)將通道作為參數(shù)在多個(gè)任務(wù)函數(shù)間傳遞,很多時(shí)候我們?cè)诓煌娜蝿?wù)函數(shù)中使用通道都會(huì)對(duì)其進(jìn)行限制,比如限制通道在函數(shù)中只能發(fā)送或只能接收。

Go語(yǔ)言中提供了單向通道來(lái)處理這種情況。例如,我們把上面的例子改造如下:

func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中,

  • chan<- int是一個(gè)只寫單向通道(只能對(duì)其寫入int類型值),可以對(duì)其執(zhí)行發(fā)送操作但是不能執(zhí)行接收操作;
  • <-chan int是一個(gè)只讀單向通道(只能從其讀取int類型值),可以對(duì)其執(zhí)行接收操作但是不能執(zhí)行發(fā)送操作。

在函數(shù)傳參及任何賦值操作中可以將雙向通道轉(zhuǎn)換為單向通道,但反過(guò)來(lái)是不可以的。

通道總結(jié)

channel常見的異??偨Y(jié),如下圖:

channel01.png

關(guān)閉已經(jīng)關(guān)閉的channel也會(huì)引發(fā)panic。

worker pool(goroutine池)

在工作中我們通常會(huì)使用可以指定啟動(dòng)的goroutine數(shù)量–worker pool模式,控制goroutine的數(shù)量,防止goroutine泄漏和暴漲。

一個(gè)簡(jiǎn)易的work pool示例代碼如下:

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("worker:%d start job:%d\n", id, j)
        time.Sleep(time.Second)
        fmt.Printf("worker:%d end job:%d\n", id, j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    // 開啟3個(gè)goroutine
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    // 5個(gè)任務(wù)
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)
    // 輸出結(jié)果
    for a := 1; a <= 5; a++ {
        <-results
    }
}

取值,沒有值,接收值,滿了,channel中如果沒有位置,則阻塞。
通道被關(guān)閉的時(shí)候,取值為0,false。

select多路復(fù)用

在某些場(chǎng)景下我們需要同時(shí)從多個(gè)通道接收數(shù)據(jù)。通道在接收數(shù)據(jù)時(shí),如果沒有數(shù)據(jù)可以接收將會(huì)發(fā)生阻塞。你也許會(huì)寫出如下代碼使用遍歷的方式來(lái)實(shí)現(xiàn):

for{
    // 嘗試從ch1接收值
    data, ok := <-ch1
    // 嘗試從ch2接收值
    data, ok := <-ch2
    …
}

這種方式雖然可以實(shí)現(xiàn)從多個(gè)通道接收值的需求,但是運(yùn)行性能會(huì)差很多。為了應(yīng)對(duì)這種場(chǎng)景,Go內(nèi)置了select關(guān)鍵字,可以同時(shí)響應(yīng)多個(gè)通道的操作。

select的使用類似于switch語(yǔ)句,它有一系列case分支和一個(gè)默認(rèn)的分支。每個(gè)case會(huì)對(duì)應(yīng)一個(gè)通道的通信(接收或發(fā)送)過(guò)程。select會(huì)一直等待,直到某個(gè)case的通信操作完成時(shí),就會(huì)執(zhí)行case分支對(duì)應(yīng)的語(yǔ)句。具體格式如下:

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默認(rèn)操作
}

舉個(gè)小例子來(lái)演示下select的使用:

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i++ {
        select {
        case x := <-ch:
            fmt.Println(x)
        case ch <- i:
        }
    }
}

使用select語(yǔ)句能提高代碼的可讀性。

  • 可處理一個(gè)或多個(gè)channel的發(fā)送/接收操作。
  • 如果多個(gè)case同時(shí)滿足,select會(huì)隨機(jī)選擇一個(gè)。
  • 對(duì)于沒有caseselect{}會(huì)一直等待,可用于阻塞main函數(shù)。

并發(fā)安全和鎖

有時(shí)候在Go代碼中可能會(huì)存在多個(gè)goroutine同時(shí)操作一個(gè)資源(臨界區(qū)),這種情況會(huì)發(fā)生競(jìng)態(tài)問題(數(shù)據(jù)競(jìng)態(tài))。類比現(xiàn)實(shí)生活中的例子有十字路口被各個(gè)方向的的汽車競(jìng)爭(zhēng);還有火車上的衛(wèi)生間被車廂里的人競(jìng)爭(zhēng)。

舉個(gè)例子:

var x int64
var wg sync.WaitGroup

func add() {
    for i := 0; i < 5000; i++ {
        x = x + 1
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

上面的代碼中我們開啟了兩個(gè)goroutine去累加變量x的值,這兩個(gè)goroutine在訪問和修改x變量的時(shí)候就會(huì)存在數(shù)據(jù)競(jìng)爭(zhēng),導(dǎo)致最后的結(jié)果與期待的不符。

互斥鎖

sync.Mutex:是一個(gè)結(jié)構(gòu)體,在傳參是一定要傳指針。
互斥鎖是一種常用的控制共享資源訪問的方法,它能夠保證同時(shí)只有一個(gè)goroutine可以訪問共享資源。Go語(yǔ)言中使用sync包的Mutex類型來(lái)實(shí)現(xiàn)互斥鎖。 使用互斥鎖來(lái)修復(fù)上面代碼的問題:

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 5000; i++ {
        lock.Lock() // 加鎖
        x = x + 1
        lock.Unlock() // 解鎖
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

使用互斥鎖能夠保證同一時(shí)間有且只有一個(gè)goroutine進(jìn)入臨界區(qū),其他的goroutine則在等待鎖;當(dāng)互斥鎖釋放后,等待的goroutine才可以獲取鎖進(jìn)入臨界區(qū),多個(gè)goroutine同時(shí)等待一個(gè)鎖時(shí),喚醒的策略是隨機(jī)的。

讀寫互斥鎖

互斥鎖是完全互斥的,但是有很多實(shí)際的場(chǎng)景下是讀多寫少的,當(dāng)我們并發(fā)的去讀取一個(gè)資源不涉及資源修改的時(shí)候是沒有必要加鎖的,這種場(chǎng)景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語(yǔ)言中使用sync包中的RWMutex類型。

讀寫鎖分為兩種:讀鎖和寫鎖。當(dāng)一個(gè)goroutine獲取讀鎖之后,其他的goroutine如果是獲取讀鎖會(huì)繼續(xù)獲得鎖,如果是獲取寫鎖就會(huì)等待;當(dāng)一個(gè)goroutine獲取寫鎖之后,其他的goroutine無(wú)論是獲取讀鎖還是寫鎖都會(huì)等待。

讀寫鎖示例:

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    // lock.Lock()   // 加互斥鎖
    rwlock.Lock() // 加寫鎖
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假設(shè)讀操作耗時(shí)10毫秒
    rwlock.Unlock()                   // 解寫鎖
    // lock.Unlock()                     // 解互斥鎖
    wg.Done()
}

func read() {
    // lock.Lock()                  // 加互斥鎖
    rwlock.RLock()               // 加讀鎖
    time.Sleep(time.Millisecond) // 假設(shè)讀操作耗時(shí)1毫秒
    rwlock.RUnlock()             // 解讀鎖
    // lock.Unlock()                // 解互斥鎖
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

需要注意的是讀寫鎖非常適合讀多寫少的場(chǎng)景,如果讀和寫的操作差別不大,讀寫鎖的優(yōu)勢(shì)就發(fā)揮不出來(lái)。

sync.WaitGroup

在代碼中生硬的使用time.Sleep肯定是不合適的,Go語(yǔ)言中可以使用sync.WaitGroup來(lái)實(shí)現(xiàn)并發(fā)任務(wù)的同步。 sync.WaitGroup有以下幾個(gè)方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 計(jì)數(shù)器+delta
(wg *WaitGroup) Done() 計(jì)數(shù)器-1
(wg *WaitGroup) Wait() 阻塞直到計(jì)數(shù)器變?yōu)?

sync.WaitGroup內(nèi)部維護(hù)著一個(gè)計(jì)數(shù)器,計(jì)數(shù)器的值可以增加和減少。例如當(dāng)我們啟動(dòng)了N 個(gè)并發(fā)任務(wù)時(shí),就將計(jì)數(shù)器值增加N。每個(gè)任務(wù)完成時(shí)通過(guò)調(diào)用Done()方法將計(jì)數(shù)器減1。通過(guò)調(diào)用Wait()來(lái)等待并發(fā)任務(wù)執(zhí)行完,當(dāng)計(jì)數(shù)器值為0時(shí),表示所有并發(fā)任務(wù)已經(jīng)完成。

我們利用sync.WaitGroup將上面的代碼優(yōu)化一下:

var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println("Hello Goroutine!")
}
func main() {
    wg.Add(1)
    go hello() // 啟動(dòng)另外一個(gè)goroutine去執(zhí)行hello函數(shù)
    fmt.Println("main goroutine done!")
    wg.Wait()
}

需要注意sync.WaitGroup是一個(gè)結(jié)構(gòu)體,傳遞的時(shí)候要傳遞指針。

sync.Once

說(shuō)在前面的話:這是一個(gè)進(jìn)階知識(shí)點(diǎn)。

在編程的很多場(chǎng)景下我們需要確保某些操作在高并發(fā)的場(chǎng)景下只執(zhí)行一次,例如只加載一次配置文件、只關(guān)閉一次通道等。

Go語(yǔ)言中的sync包中提供了一個(gè)針對(duì)只執(zhí)行一次場(chǎng)景的解決方案–sync.Once。

sync.Once只有一個(gè)Do方法,其簽名如下:

func (o *Once) Do(f func()) {}

備注:如果要執(zhí)行的函數(shù)f需要傳遞參數(shù)就需要搭配閉包來(lái)使用。

加載配置文件示例

延遲一個(gè)開銷很大的初始化操作到真正用到它的時(shí)候再執(zhí)行是一個(gè)很好的實(shí)踐。因?yàn)轭A(yù)先初始化一個(gè)變量(比如在init函數(shù)中完成初始化)會(huì)增加程序的啟動(dòng)耗時(shí),而且有可能實(shí)際執(zhí)行過(guò)程中這個(gè)變量沒有用上,那么這個(gè)初始化操作就不是必須要做的。我們來(lái)看一個(gè)例子:

var icons map[string]image.Image

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 被多個(gè)goroutine調(diào)用時(shí)不是并發(fā)安全的
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}

多個(gè)goroutine并發(fā)調(diào)用Icon函數(shù)時(shí)不是并發(fā)安全的,現(xiàn)代的編譯器和CPU可能會(huì)在保證每個(gè)goroutine都滿足串行一致的基礎(chǔ)上自由地重排訪問內(nèi)存的順序。loadIcons函數(shù)可能會(huì)被重排為以下結(jié)果:

func loadIcons() {
    icons = make(map[string]image.Image)
    icons["left"] = loadIcon("left.png")
    icons["up"] = loadIcon("up.png")
    icons["right"] = loadIcon("right.png")
    icons["down"] = loadIcon("down.png")
}

在這種情況下就會(huì)出現(xiàn)即使判斷了icons不是nil也不意味著變量初始化完成了??紤]到這種情況,我們能想到的辦法就是添加互斥鎖,保證初始化icons的時(shí)候不會(huì)被其他的goroutine操作,但是這樣做又會(huì)引發(fā)性能問題。

使用sync.Once改造的示例代碼如下:

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 是并發(fā)安全的
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

并發(fā)安全的單例模式

下面是借助sync.Once實(shí)現(xiàn)的并發(fā)安全的單例模式:

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

sync.Once其實(shí)內(nèi)部包含一個(gè)互斥鎖和一個(gè)布爾值,互斥鎖保證布爾值和數(shù)據(jù)的安全,而布爾值用來(lái)記錄初始化是否完成。這樣設(shè)計(jì)就能保證初始化操作的時(shí)候是并發(fā)安全的并且初始化操作也不會(huì)被執(zhí)行多次。

sync.Map

Go語(yǔ)言中內(nèi)置的map不是并發(fā)安全的。請(qǐng)看下面的示例:

var m = make(map[string]int)

func get(key string) int {
    return m[key]
}

func set(key string, value int) {
    m[key] = value
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            set(key, n)
            fmt.Printf("k=:%v,v:=%v\n", key, get(key))
            wg.Done()
        }(i)
    }
    wg.Wait()
}

上面的代碼開啟少量幾個(gè)goroutine的時(shí)候可能沒什么問題,當(dāng)并發(fā)多了之后執(zhí)行上面的代碼就會(huì)報(bào)fatal error: concurrent map writes錯(cuò)誤。

像這種場(chǎng)景下就需要為map加鎖來(lái)保證并發(fā)的安全性了,Go語(yǔ)言的sync包中提供了一個(gè)開箱即用的并發(fā)安全版map–sync.Map。開箱即用表示不用像內(nèi)置的map一樣使用make函數(shù)初始化就能直接使用。同時(shí)sync.Map內(nèi)置了諸如StoreLoad、LoadOrStoreDelete、Range等操作方法。

var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

原子操作

代碼中的加鎖操作因?yàn)樯婕皟?nèi)核態(tài)的上下文切換會(huì)比較耗時(shí)、代價(jià)比較高。針對(duì)基本數(shù)據(jù)類型我們還可以使用原子操作來(lái)保證并發(fā)安全,因?yàn)樵硬僮魇荊o語(yǔ)言提供的方法它在用戶態(tài)就可以完成,因此性能比加鎖操作更好。Go語(yǔ)言中原子操作由內(nèi)置的標(biāo)準(zhǔn)庫(kù)sync/atomic提供。

atomic包

方法 解釋
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) 讀取操作
方法 解釋
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) 寫入操作
方法 解釋
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) 修改操作
方法 解釋
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) 交換操作
方法 解釋
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) 比較并交換操作

示例

我們填寫一個(gè)示例來(lái)比較下互斥鎖和原子操作的性能。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type Counter interface {
    Inc()
    Load() int64
}

// 普通版
type CommonCounter struct {
    counter int64
}

func (c CommonCounter) Inc() {
    c.counter++
}

func (c CommonCounter) Load() int64 {
    return c.counter
}

// 互斥鎖版
type MutexCounter struct {
    counter int64
    lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
    m.lock.Lock()
    defer m.lock.Unlock()
    m.counter++
}

func (m *MutexCounter) Load() int64 {
    m.lock.Lock()
    defer m.lock.Unlock()
    return m.counter
}

// 原子操作版
type AtomicCounter struct {
    counter int64
}

func (a *AtomicCounter) Inc() {
    atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
    return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
    var wg sync.WaitGroup
    start := time.Now()
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            c.Inc()
            wg.Done()
        }()
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(c.Load(), end.Sub(start))
}

func main() {
    c1 := CommonCounter{} // 非并發(fā)安全
    test(c1)
    c2 := MutexCounter{} // 使用互斥鎖實(shí)現(xiàn)并發(fā)安全
    test(&c2)
    c3 := AtomicCounter{} // 并發(fā)安全且比互斥鎖效率更高
    test(&c3)
}

atomic包提供了底層的原子級(jí)內(nèi)存操作,對(duì)于同步算法的實(shí)現(xiàn)很有用。這些函數(shù)必須謹(jǐn)慎地保證正確使用。除了某些特殊的底層應(yīng)用,使用通道或者sync包的函數(shù)/類型實(shí)現(xiàn)同步更好。

練習(xí)題

  1. 使用goroutinechannel實(shí)現(xiàn)一個(gè)計(jì)算int64隨機(jī)數(shù)各位數(shù)和的程序。
    1. 開啟一個(gè)goroutine循環(huán)生成int64類型的隨機(jī)數(shù),發(fā)送到jobChan
    2. 開啟24個(gè)goroutinejobChan中取出隨機(jī)數(shù)計(jì)算各位數(shù)的和,將結(jié)果發(fā)送到resultChan
    3. goroutineresultChan取出結(jié)果并打印到終端輸出
  2. 為了保證業(yè)務(wù)代碼的執(zhí)行性能將之前寫的日志庫(kù)改寫為異步記錄日志方式。


    image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容