控制并發(fā)有三種種經(jīng)典的方式,一種是通過channel通知實現(xiàn)并發(fā)控制 一種是WaitGroup,另外一種就是Context。
1. 使用最基本通過channel通知實現(xiàn)并發(fā)控制
無緩沖通道
無緩沖的通道指的是通道的大小為0,也就是說,這種類型的通道在接收前沒有能力保存任何值,它要求發(fā)送 goroutine 和接收 goroutine 同時準備好,才可以完成發(fā)送和接收操作。
從上面無緩沖的通道定義來看,發(fā)送 goroutine 和接收 gouroutine 必須是同步的,同時準備后,如果沒有同時準備好的話,先執(zhí)行的操作就會阻塞等待,直到另一個相對應(yīng)的操作準備好為止。這種無緩沖的通道我們也稱之為同步通道。
正式通過無緩沖通道來實現(xiàn)多 goroutine 并發(fā)控制
func main() {
ch := make(chan struct{})
go func() {
fmt.Println("do something..")
time.Sleep(time.Second * 1)
ch <- struct{}{}
}()
<-ch
fmt.Println("I am finished")
}
當主 goroutine 運行到 <-ch 接受 channel 的值的時候,如果該 channel 中沒有數(shù)據(jù),就會一直阻塞等待,直到有值。 這樣就可以簡單實現(xiàn)并發(fā)控制
2. 通過sync包中的WaitGroup實現(xiàn)并發(fā)控制
在 sync 包中,提供了 WaitGroup ,它會等待它收集的所有 goroutine 任務(wù)全部完成。在WaitGroup里主要有三個方法
- Add, 可以添加或減少 goroutine的數(shù)量
- Done, 相當于Add(-1)
- Wait, 執(zhí)行后會堵塞主線程,直到WaitGroup 里的值減至0
在主 goroutine 中 Add(delta int) 索要等待goroutine 的數(shù)量。在每一個 goroutine 完成后 Done() 表示這一個goroutine 已經(jīng)完成,當所有的 goroutine 都完成后,在主 goroutine 中 WaitGroup 返回返回。
func main(){
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
http.Get(url)
}(url)
}
wg.Wait()
}
但是在Golang官網(wǎng)中,有這么一句話
- A WaitGroup must not be copied after first use.
翻譯夠來過來就是,在 WaitGroup 第一次使用后,不能被拷貝,因為會出現(xiàn)一下問題
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg sync.WaitGroup, i int) {
log.Printf("i:%d", i)
wg.Done()
}(wg, i)
}
wg.Wait()
log.Println("exit")
}
運行結(jié)果如下
2009/11/10 23:00:00 i:4
2009/11/10 23:00:00 i:0
2009/11/10 23:00:00 i:1
2009/11/10 23:00:00 i:2
2009/11/10 23:00:00 i:3
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x1040a13c, 0x44bc)
/usr/local/go/src/runtime/sema.go:47 +0x40
sync.(*WaitGroup).Wait(0x1040a130, 0x121460)
/usr/local/go/src/sync/waitgroup.go:131 +0x80
main.main()
/tmp/sandbox894380819/main.go:19 +0x120
它提示我所有的 goroutine 都已經(jīng)睡眠了,出現(xiàn)了死鎖。這是因為 wg 給拷貝傳遞到了 goroutine 中,導(dǎo)致只有 Add 操作,其實 Done操作是在 wg 的副本執(zhí)行的。因此 Wait 就死鎖了。
改正方法一:
將匿名函數(shù)中wg的傳入類型改為*sync.WaitGrou,這樣就能引用到正確的WaitGroup了。改正方法二:
將匿名函數(shù)中的wg的傳入?yún)?shù)去掉,因為Go支持閉包類型,在匿名函數(shù)中可以直接使用外面的wg變量
go 中五種引用類型有 slice, channel, function, map, interface
interface是Go語言中最成功的設(shè)計之一,空的interface可以被當作“鴨子”類型使用,它使得Go這樣的靜態(tài)語言擁有了一定的動態(tài)性,但卻又不損失靜態(tài)語言在類型安全方面擁有的編譯時檢查的優(yōu)勢。依賴于接口而不是實現(xiàn),優(yōu)先使用組合而不是繼承,這是程序抽象的基本原則。但是長久以來以C++為代表的“面向?qū)ο蟆闭Z言曲解了這些原則,讓人們走入了誤區(qū)。為什么要將方法和數(shù)據(jù)綁死?為什么要有多重繼承這么變態(tài)的設(shè)計?面向?qū)ο笾凶顝娬{(diào)的應(yīng)該是對象間的消息傳遞,卻為什么被演繹成了封裝繼承和多態(tài)。面向?qū)ο笫欠駥崿F(xiàn)程序程序抽象的合理途徑,又或者是因為它存在我們就認為它合理了。歷史原因,中間出現(xiàn)了太多的錯誤。不管怎么樣,Go的interface給我們打開了一扇新的窗。
3. 在Go 1.7 以后引進的強大的Context上下文,實現(xiàn)并發(fā)控制
3.1 簡介
在一些簡單場景下使用 channel 和 WaitGroup 已經(jīng)足夠了,但是當面臨一些復(fù)雜多變的網(wǎng)絡(luò)并發(fā)場景下 channel 和 WaitGroup 顯得有些力不從心了。比如一個網(wǎng)絡(luò)請求 Request,每個 Request 都需要開啟一個 goroutine 做一些事情,這些 goroutine 又可能會開啟其他的 goroutine,比如數(shù)據(jù)庫和RPC服務(wù)。所以我們需要一種可以跟蹤 goroutine 的方案,才可以達到控制他們的目的,這就是Go語言為我們提供的 Context,稱之為上下文非常貼切,它就是goroutine 的上下文。它是包括一個程序的運行環(huán)境、現(xiàn)場和快照等。每個程序要運行時,都需要知道當前程序的運行狀態(tài),通常Go 將這些封裝在一個 Context 里,再將它傳給要執(zhí)行的 goroutine 。
context 包主要是用來處理多個 goroutine 之間共享數(shù)據(jù),及多個 goroutine 的管理。
3.2 package context
context 包的核心是 struct Context,接口聲明如下:
// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
// Done returns a channel that is closed when this `Context` is canceled
// or times out.
Done() <-chan struct{}
// Err indicates why this Context was canceled, after the Done channel
// is closed.
Err() error
// Deadline returns the time when this Context will be canceled, if any.
Deadline() (deadline time.Time, ok bool)
// Value returns the value associated with key or nil if none.
Value(key interface{}) interface{}
}
-
Done()返回一個只能接受數(shù)據(jù)的channel類型,當該context關(guān)閉或者超時時間到了的時候,該channel就會有一個取消信號 -
Err()在Done()之后,返回context取消的原因。 -
Deadline()設(shè)置該context cancel的時間點 -
Value()方法允許Context對象攜帶request作用域的數(shù)據(jù),該數(shù)據(jù)必須是線程安全的。
Context 對象是線程安全的,你可以把一個 Context 對象傳遞給任意個數(shù)的 gorotuine,對它執(zhí)行 取消 操作時,所有 goroutine 都會接收到取消信號。
一個 Context 不能擁有 Cancel 方法,同時我們也只能 Done channel 接收數(shù)據(jù)。
背后的原因是一致的:接收取消信號的函數(shù)和發(fā)送信號的函數(shù)通常不是一個。
一個典型的場景是:父操作為子操作操作啟動 goroutine,子操作也就不能取消父操作。
3.3 繼承 context
context 包提供了一些函數(shù),協(xié)助用戶從現(xiàn)有的 Context 對象創(chuàng)建新的 Context 對象。
這些 Context 對象形成一棵樹:當一個 Context 對象被取消時,繼承自它的所有 Context 都會被取消。
Background 是所有 Context 對象樹的根,它不能被取消。它的聲明如下:
// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level `Context` for incoming requests.
func Background() Context
WithCancel 和 WithTimeout 函數(shù) 會返回繼承的 Context 對象, 這些對象可以比它們的父 Context 更早地取消。
當請求處理函數(shù)返回時,與該請求關(guān)聯(lián)的 Context 會被取消。 當使用多個副本發(fā)送請求時,可以使用 WithCancel 取消多余的請求。 WithTimeout 在設(shè)置對后端服務(wù)器請求超時時間時非常有用。 下面是這三個函數(shù)的聲明:
// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// A CancelFunc cancels a Context.
type CancelFunc func()
// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithValue 函數(shù)能夠?qū)⒄埱笞饔糜虻臄?shù)據(jù)與 Context 對象建立關(guān)系。聲明如下:
// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context
3.4 context例子
當然,想要知道 Context 包是如何工作的,最好的方法是看一個例子。
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Message struct {
netId int
Data string
}
type ServerConn struct {
sendCh chan Message
handleCh chan Message
wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
netId int
}
func main() {
conn := &ServerConn{
sendCh: make(chan Message),
handleCh: make(chan Message),
wg: &sync.WaitGroup{},
netId: 100,
}
conn.ctx, conn.cancel = context.WithCancel(context.WithValue(context.Background(), "key", conn.netId))
loopers := []func(*ServerConn, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
for _, looper := range loopers {
conn.wg.Add(1)
go looper(conn, conn.wg)
}
go func() {
time.Sleep(time.Second * 3)
conn.cancel()
}()
conn.wg.Wait()
}
func readLoop(c *ServerConn, wg *sync.WaitGroup) {
netId, _ := c.ctx.Value("key").(int)
handlerCh := c.handleCh
ctx, _ := context.WithCancel(c.ctx)
cDone := ctx.Done()
defer wg.Done()
for {
time.Sleep(time.Second * 1)
select {
case <-cDone:
fmt.Println("readLoop close")
return
default:
handlerCh <- Message{netId, "Hello world"}
}
}
}
func handleLoop(c *ServerConn, wg *sync.WaitGroup) {
handlerCh := c.handleCh
sendCh := c.sendCh
ctx, _ := context.WithCancel(c.ctx)
cDone := ctx.Done()
defer wg.Done()
for {
select {
case handleData, ok := <-handlerCh:
if ok {
handleData.netId++
handleData.Data = "I am whole world"
sendCh <- handleData
}
case <-cDone:
fmt.Println("handleLoop close")
return
}
}
}
func writeLoop(c *ServerConn, wg *sync.WaitGroup) {
sendCh := c.sendCh
ctx, _ := context.WithCancel(c.ctx)
cDone := ctx.Done()
defer wg.Done()
for {
select {
case sendData, ok := <-sendCh:
if ok {
fmt.Println(sendData)
}
case <-cDone:
fmt.Println("writeLoop close")
return
}
}
}
在上面的例子中,?模仿了Golang后臺程序主要業(yè)務(wù)流程, 當一個TCP連接到來時通過啟動三個goroutine來分別處理收發(fā)和處理數(shù)據(jù)。而這三個goroutine的是并發(fā)運行的,通過channel、sync.WaitGroup和context控制數(shù)據(jù)的處理。
在?每一個循環(huán)中產(chǎn)生一個goroutine,每一個goroutine中都傳入context,在每個goroutine中通過傳入ctx創(chuàng)建一個子Context,并且通過select一直監(jiān)控該Context的運行情況,當在父Context退出的時候,代碼中并沒有?明顯調(diào)用子Context的Cancel函數(shù),但是分析結(jié)果,子Context還是被正確合理的關(guān)閉了,這是因為,所有基于這個Context或者衍生的子Context都會收到通知,這時就可以進行清理操作了,最終釋放goroutine,這就優(yōu)雅的解決了goroutine啟動后不可控的問題。
下面是運行結(jié)果:

3.5 Context 使用原則
- 不要把
Context放在結(jié)構(gòu)體中,要以參數(shù)的方式傳遞 - 以
Context作為參數(shù)的函數(shù)方法,應(yīng)該把Context作為第一個參數(shù),放在第一位。 - 給一個函數(shù)方法傳遞
Context的時候,不要傳遞nil,如果不知道傳遞什么,就使用context.TODO -
Context的Value相關(guān)方法應(yīng)該傳遞必須的數(shù)據(jù),不要什么數(shù)據(jù)都使用這個傳遞 -
Context是線程安全的,可以放心的在多個goroutine中傳遞