問(wèn)題
當(dāng)main goroutine為了等待work goroutine都運(yùn)行完畢,不得不在程序末尾使用time.Sleep()來(lái)休眠一段時(shí)間,等待work goroutine充分運(yùn)行。
$ vim ./test/goroutine_test.go
package test
import (
"fmt"
"testing"
"time"
)
func TestGoRoutine(t *testing.T) {
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
time.Sleep(time.Second)
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN TestGoRoutine
9
3
1
2
4
5
6
7
8
0
--- PASS: TestGoRoutine (1.00s)
PASS
ok command-line-arguments 1.291s
但對(duì)于實(shí)際應(yīng)用中,休眠1秒是完全不夠的,同時(shí)大部分時(shí)間都無(wú)法預(yù)知for循環(huán)內(nèi)代碼運(yùn)行時(shí)間的長(zhǎng)短,此時(shí)就不能使用time.Sleep()來(lái)完成等待操作。
可以使用管道來(lái)完成上述操作
func TestGoRoutine(t *testing.T) {
count := 10
ch := make(chan bool, count)
for i := 0; i < count; i++ {
go func(i int) {
fmt.Println(i)
ch <- true
}(i)
}
for i := 0; i < count; i++ {
<-ch
}
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN TestGoRoutine
9
0
5
6
7
8
2
1
4
3
--- PASS: TestGoRoutine (0.00s)
PASS
ok command-line-arguments 0.304s
使用管道可以達(dá)到目的,但有些大材小用,因?yàn)楣艿辣辉O(shè)計(jì)出來(lái)不僅僅只是在這里做簡(jiǎn)單的同步處理的,因此這里使用管道實(shí)際上是不合適的。假如有上萬(wàn)、上十萬(wàn)、上百萬(wàn)的循環(huán),也要申請(qǐng)同樣數(shù)量大小的管道,對(duì)內(nèi)存會(huì)是一個(gè)不小的開(kāi)銷(xiāo)。
對(duì)于這種情況,Golang中有一種工具sync.WaitGroup能更加方便地幫助達(dá)到目的。
sync.WaitGroup
Golang中除了使用Channel通道和Mutex互斥鎖實(shí)現(xiàn)兩個(gè)并發(fā)程序之間的同步外,還可以通過(guò)WaitGroup等待組實(shí)現(xiàn)多個(gè)任務(wù)的同步,WaitGroup可以保證在并發(fā)環(huán)境中完成指定數(shù)量的任務(wù)。
-
WaitGroup在Golang中用于goroutine同步,解決同步阻塞等外的問(wèn)題。
通俗來(lái)講goroutine分為兩類(lèi)角色,一種gorouine作為一個(gè)worker小弟,老老實(shí)實(shí)的干活。另一種goroutine作為master管理者來(lái)監(jiān)督小弟干活,當(dāng)然master自身也是一個(gè)worker。
當(dāng)有很多worker干活時(shí),master沒(méi)事干歇著,但同時(shí)master又希望得到一個(gè)通知,了解所有worker們什么時(shí)候干完。
從程序開(kāi)發(fā)角度來(lái)看,就是維護(hù)一個(gè)worker總數(shù)和一個(gè)channel,每個(gè)worker干完就向channel發(fā)送一個(gè)空message。master阻塞在channel的監(jiān)聽(tīng)上,來(lái)一個(gè)message就說(shuō)明有一個(gè)worker干完活了,記錄下有多少message,message和worker總數(shù)一致則說(shuō)明全干完活。master就可以關(guān)閉channel,驗(yàn)收worker的工作成果。
-
WaitGroup是指等待(Wait)一系列執(zhí)行(Group)完成后才會(huì)繼續(xù)向下執(zhí)行 -
WaitGroup能一直等到所有的work goroutine執(zhí)行完畢,同時(shí)阻塞main goroutine的執(zhí)行,直到所有的goroutine執(zhí)行完成。 -
WaitGroup類(lèi)似發(fā)布訂閱,只不過(guò)訂閱者接收到的不是消息,而是一種事件信號(hào)。
計(jì)數(shù)器
WaitGroup內(nèi)部擁有一個(gè)計(jì)數(shù)器,最初從0開(kāi)始。
type WaitGroup struct{
noCopy noCopy
state1 [3]byte
}

- Counter:Worker計(jì)數(shù)器
master gortouine調(diào)用WaitGroup.Add(delta int)時(shí)會(huì)增加delta,調(diào)用WaitGroup.Done()時(shí)會(huì)減少1。 - Waiter:Waiter計(jì)數(shù)器
調(diào)用WaitGroup.Wait()時(shí)Waiter計(jì)數(shù)器加1,worker goroutine計(jì)數(shù)器降低到0時(shí),會(huì)重置Waiter計(jì)數(shù)器。 - Sema:信號(hào)量
用于阻塞master goroutine,調(diào)用WaitGroup.Wait()時(shí)會(huì)通過(guò)runtime_Semacquire獲取信號(hào)量。降低Waiter計(jì)數(shù)器時(shí),通過(guò)runtime_Semrelease釋放信號(hào)量。
方法
WaitGroup擁有三個(gè)方法分別是Add()、Done()、Wait()用來(lái)控制計(jì)數(shù)器的數(shù)量

-
Add()將計(jì)數(shù)器設(shè)置為n,用于增加或減少worker goroutine的數(shù)量。
func (wg *WaitGroup) Add(delta int)
-
Done()每次會(huì)將計(jì)數(shù)器減少1
func (wg *WaitGroup) Done()
WaitGroup.Done()和WaitGroup.Add(-1)完全等價(jià)
-
Wait()會(huì)阻塞代碼的運(yùn)行,直到計(jì)數(shù)器的值減少為0。
func (wg *WaitGroup) Wait()
使用方法
-
master goroutine通過(guò)調(diào)用WaitGroup.Add(delta int)來(lái)設(shè)置worker goroutine的個(gè)數(shù),然后創(chuàng)建work goroutine。 -
worker goroutine執(zhí)行結(jié)束后需調(diào)用WaitGroup.Done() -
master goroutine調(diào)用WaitGroup.Wait()且被block阻塞,直到所有的worker goroutine全部執(zhí)行結(jié)束后返回。
例如:
$ vim ./test/sync_test.go
package test
import (
"fmt"
"sync"
"testing"
)
func TestWaitGroup(t *testing.T) {
count := 10
//添加goroutine數(shù)量
wg := sync.WaitGroup{}
wg.Add(count)
//循環(huán)模擬并發(fā)
for i := 0; i < count; i++ {
go func(i int) {
fmt.Println(i)
wg.Done() //設(shè)置gorooutine為-1
}(i)
}
//執(zhí)行main goroutine阻塞,直到所有WaitGroup數(shù)量為0。
wg.Wait()
}
$ go test -v -run TestWaitGroup sync_test.go
=== RUN TestWaitGroup
9
4
5
6
7
8
2
3
1
0
--- PASS: TestWaitGroup (0.00s)
PASS
ok command-line-arguments 0.294s
注意
- WaitGroup對(duì)象不是一個(gè)引用類(lèi)型,函數(shù)傳值時(shí)需使用地址(地址傳值)。
- WaitGroup的計(jì)數(shù)器不能為負(fù)數(shù),不能使用
Add()給WaitGroup對(duì)象設(shè)置一個(gè)負(fù)值。
應(yīng)用
需要一個(gè)用戶(hù)的畫(huà)像服務(wù),當(dāng)一個(gè)請(qǐng)求到來(lái)時(shí)需要
- 從請(qǐng)求中解析出用戶(hù)ID和用戶(hù)畫(huà)像維度參數(shù)
- 根據(jù)用戶(hù)ID從五個(gè)服務(wù)比如數(shù)據(jù)庫(kù)、存儲(chǔ)、RPC等拉取不同維度的數(shù)據(jù)
- 將讀取到的數(shù)據(jù)進(jìn)行整合返回給調(diào)用方
假如每個(gè)服務(wù)的響應(yīng)時(shí)間是20ms到50ms,如果順序調(diào)用服務(wù)讀取數(shù)據(jù)不考慮數(shù)據(jù)整合消耗的時(shí)間,服務(wù)端整體的響應(yīng)時(shí)間將會(huì)在100ms到250ms。先不說(shuō)業(yè)務(wù)能不能接受,響應(yīng)時(shí)間顯然存在很大的優(yōu)化空間。最直接的優(yōu)化方向是取數(shù)邏輯總時(shí)間應(yīng)該是單個(gè)服務(wù)最大消耗時(shí)間。
func TestTask(t *testing.T) {
var wg sync.WaitGroup
for _,task := range tasks{
task := task
wg.Add(1)
go func(){
defer wg.Done()
task()
}()
}
wg.Wait()
}
使用注意
-
WaitGroup.Done()必須在所有WaitGroup.Add()之后執(zhí)行,要保證兩個(gè)函數(shù)都在master goroutine中調(diào)用。 -
WaitGroup.Done()在worker goroutine中調(diào)用,尤其要保證調(diào)用一次,不能因?yàn)?code>panic或任何原因?qū)е聸](méi)有執(zhí)行,因此建議使用defer WaitGroup.Done()。 -
WaitGroup.Done()和WaitGroup.Wait()在時(shí)序上沒(méi)有先后順序
task := task
由于Golang對(duì)切片遍歷時(shí)runtime會(huì)將tasks[i]拷貝到task的內(nèi)存地址中,下標(biāo)i會(huì)變化,而task的內(nèi)存地址是不會(huì)改變的。如果不做此次賦值操作,所有的goroutine可能讀取到的都是最后一個(gè)task。
例如:
func TestTask(t *testing.T) {
tasks := []func(){
func() { fmt.Printf("task1 ") },
func() { fmt.Printf("task2 ") },
}
for index, task := range tasks {
task()
fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
}
}
$ go test -v -run TestTask sync_test.go
=== RUN TestTask
task1 0xc000006040 0xc00003c500
task2 0xc000006040 0xc00003c508
--- PASS: TestTask (0.00s)
PASS
ok command-line-arguments 0.296s
執(zhí)行結(jié)果說(shuō)明
- 遍歷時(shí)數(shù)據(jù)的內(nèi)存地址不變
unsafe.Pointer(&task) - 遍歷時(shí)通過(guò)下標(biāo)獲取數(shù)據(jù)時(shí)內(nèi)存地址不同
unsafe.Pointer(&tasks[index])
func TestTask(t *testing.T) {
tasks := []func(){
func() { fmt.Printf("task1 ") },
func() { fmt.Printf("task2 ") },
}
for index, task := range tasks {
task := task
task()
fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
}
}
$ go test -v -run TestTask sync_test.go
=== RUN TestTask
task1 0xc0000c0030 0xc0000884f0
task2 0xc0000c0038 0xc0000884f8
--- PASS: TestTask (0.00s)
PASS
ok command-line-arguments 0.320s
執(zhí)行結(jié)果說(shuō)明
- 遍歷內(nèi)部創(chuàng)建的局部變量,即使名稱(chēng)相同,內(nèi)存地址也不會(huì)復(fù)用。
- 遍歷時(shí)數(shù)據(jù)的內(nèi)存地址不同
unsafe.Pointer(&task) - 遍歷時(shí)通過(guò)下標(biāo)獲取數(shù)據(jù)時(shí)內(nèi)存地址不同
unsafe.Pointer(&tasks[index])