Goroutine
Go在語(yǔ)言層面對(duì)并發(fā)編程提供支持,采用輕量級(jí)線程(協(xié)程)實(shí)現(xiàn)。只需要在函數(shù)調(diào)用語(yǔ)句前添加go關(guān)鍵字,就可以創(chuàng)建并發(fā)執(zhí)行單元。開(kāi)發(fā)人員無(wú)需了解任何執(zhí)行細(xì)節(jié),調(diào)度器會(huì)自動(dòng)將其安排到合適的系統(tǒng)線程上執(zhí)行。goroutine是一種非常輕量級(jí)的實(shí)現(xiàn),可在單個(gè)進(jìn)程里執(zhí)行成千上萬(wàn)的并發(fā)任務(wù)。事實(shí)上,入口函數(shù)main就以goroutine運(yùn)行。另有與之配套的channel類型,用以實(shí)現(xiàn)“以通訊來(lái)共享內(nèi)存”的CSP模式。
go func() {
println("Hello, World!")
}
調(diào)度器不能保證多個(gè)goroutine執(zhí)行次序,且進(jìn)程退出時(shí)不會(huì)等待它們結(jié)束。默認(rèn)情況下,進(jìn)程啟動(dòng)后僅允許一個(gè)系統(tǒng)線程服務(wù)于goroutine??墒褂铆h(huán)境變量或標(biāo)準(zhǔn)函數(shù)runtime.GOMAXPROCS修改(Go 1.5默認(rèn)方式)。讓高度器用多個(gè)線程實(shí)現(xiàn)多核并行,而不僅僅是并發(fā)。
func sum(id int) {
var x int64
for i := 0; i < math.MaxUint32; i++ {
x += int64(i)
}
println(id, x)
}
func main() {
wg := new(sync.WaitGroup)
wg.Add(2)
for i := 0; i < 2; i++ {
go func(id int) {
defer wg.Done()
sum(id)
}(i)
}
wg.Wait()
}
輸出:
$ go build -o test
$ time -p ./test
0 9223372030412324865
1 9223372030412324865
real 7.70 // 程序開(kāi)始到結(jié)束時(shí)間差 (非非 CPU 時(shí)間)
user 7.66 // 用用戶態(tài)所使用用 CPU 時(shí)間片片 (多核累加)
sys 0.01 // 內(nèi)核態(tài)所使用用 CPU 時(shí)間片片
$ GOMAXPROCS=2 time -p ./test
0 9223372030412324865
1 9223372030412324865
real 4.18
user 7.61// 雖然總時(shí)間差不多,但由 2 個(gè)核并行行,real 時(shí)間自自然少了許多。
sys 0.02
調(diào)用 runtime.Goexit 將立即終止當(dāng)前 goroutine 執(zhí)行,調(diào)度器確保所有已注冊(cè) defer延遲調(diào)用被執(zhí)行。
func main() {
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer wg.Done()
defer println("A.defer")
func() {
defer println("B.defer")
runtime.Goexit() // 終止止當(dāng)前 goroutine
println("B") // 不會(huì)執(zhí)行行
}()
println("A") // 不會(huì)執(zhí)行行
}()
wg.Wait()
}
//輸出:
B.defer
A.defer
和協(xié)程 yield 作用類似,Gosched 讓出底層線程,將當(dāng)前 goroutine 暫停,放回隊(duì)列等待下次被調(diào)度執(zhí)行。
func main() {
wg := new(sync.WaitGroup)
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 6; i++ {
println(i)
if i == 3 { runtime.Gosched() }
}
}()
go func() {
defer wg.Done()
println("Hello, World!")
}()
wg.Wait()
}
//輸出
$ go run main.go
0
1
2
3
Hello, World!
4
5
Channel
引用類型Channel是CSP模型的具體實(shí)現(xiàn),用于多個(gè)goruntine之間進(jìn)行通訊。其內(nèi)部實(shí)現(xiàn)了同步,確保了并發(fā)安全。默認(rèn)為同步模式,需要發(fā)送和接收配對(duì)。否則會(huì)被阻塞,直到另一方準(zhǔn)備好后被喚醒。
func main() {
data := make(chan int) // 數(shù)據(jù)交換隊(duì)列
exit := make(chan bool) // 退出通知
go func() {
for d := range data {// 從隊(duì)列迭代接收數(shù)據(jù),直到 close 。
fmt.Println(d)
}
fmt.Println("recv over.")
exit <- true// 發(fā)出退出通知。
}()
data <- 1// 發(fā)送數(shù)據(jù)。
data <- 2
data <- 3
close(data)// 關(guān)閉隊(duì)列。
fmt.Println("send over.")
<-exit// 等待退出通知。
}
//輸出:
1
2
3
send over.
recv over.
異步方式通過(guò)判斷緩沖區(qū)來(lái)決定是否阻塞。如果緩沖區(qū)已滿,發(fā)送被阻塞;緩沖區(qū)為空,接收被阻塞。通常情況下,異步channel可減少排隊(duì)阻塞,具備更高的效率。但應(yīng)該考慮使用指針規(guī)避大對(duì)象拷貝,將多個(gè)元素打包,減少緩沖區(qū)大小等。
func main() {
data := make(chan int, 3)// 緩沖區(qū)可以存儲(chǔ) 3 個(gè)元素
exit := make(chan bool)
data <- 1// 在緩沖區(qū)未滿前,不會(huì)阻塞。
data <- 2
data <- 3
go func() {
for d := range data {// 在緩沖區(qū)未空前,不會(huì)阻塞。
fmt.Println(d)
}
exit <- true
}()
data <- 4// 如果緩沖區(qū)已滿,阻塞。
data <- 5
close(data)
<-exit
}
緩沖區(qū)是內(nèi)部屬性,并非類型構(gòu)成要素。
var a, b chan int = make(chan int), make(chan int, 3)
除用用 range 外,還可用 ok-idiom 模式判斷 channel 是否關(guān)閉。
for {
if d, ok := <-data; ok {
fmt.Println(d)
} else {
break
}
}
向 closed channel 發(fā)送數(shù)據(jù)引發(fā) panic 錯(cuò)誤,接收立即返回零值。而 nil channel,無(wú)論收發(fā)都會(huì)被阻塞。內(nèi)置函數(shù) len 返回未被讀取的緩沖元素?cái)?shù)量,cap 返回緩沖區(qū)大小。
單向
可以將channel隱式轉(zhuǎn)換為單身隊(duì)列,只收或只發(fā)。
c := make(chan int, 3)
var send chan <- int = c // send only
var recv <- chan int = c // receiver only
選擇
如果需要同時(shí)處理多個(gè)channel,可以用select語(yǔ)句,它隨機(jī)選擇一個(gè)可用的channel做收發(fā)操作,或執(zhí)行default case。
func main() {
a, b := make(chan int, 3), make(chan int)
go func() {
v, ok, s := 0, false, ""
for {
select {// 隨機(jī)選擇可用用 channel,接收數(shù)據(jù)。
case v, ok = <-a: s = "a"
case v, ok = <-b: s = "b"
}
if ok {
fmt.Println(s, v)
} else {
os.Exit(0)
}
}
}()
for i := 0; i < 5; i++ {
select {// 隨機(jī)選擇可用用 channel,發(fā)送數(shù)據(jù)。
case a <- i:
case b <- i:
}
}
close(a)
select {}// 沒(méi)有可用用 channel,阻塞 main goroutine。
}
//輸出:
b 3
a 0
a 1
a 2
b 4
模式
用簡(jiǎn)單工廠模式打包并發(fā)任務(wù)和channel。
func NewTest() chan int {
c := make(chan int)
rand.Seed(time.Now().UnixNano())
go func() {
time.Sleep(time.Second)
c <- rant.Int()
}()
return c
}
func main() {
t := NewTest()
println(<-t) //等待gorountime結(jié)束返回。
}
用channel實(shí)現(xiàn)信號(hào)量(semaphore)。
func main() {
wg := sync.WaitGroup{}
wg.Add(3)
sem := make(chan int, 1)
for i := 0; i < 3; i++ {
go func(id int) {
defer wg.Done()
sem <- 1 // 向 sem 發(fā)送數(shù)據(jù),阻塞或者成功。
for x := 0; x < 3; x++ {
fmt.Println(id, x)
}
<- sem // 接收數(shù)據(jù),使得其他阻塞 goroutine 可以發(fā)送數(shù)據(jù)。
}(i)
}
wg.Wait()
}
用closed channel發(fā)出退出通知。
func main() {
var wg sync.WaitGroup
quit := make(chan bool)
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task := func() {
println(id, time.Now().Nanosecond())
time.Sleep(time.Second)
}
for {
select {
case <- quit: // closed channel 不會(huì)阻塞,因此可用作退出通知。
return
default://執(zhí)行正常任務(wù)
task()
}
}
}(i)
}
time.Sleep(time.Second * 5) // 讓測(cè)試 goroutine 運(yùn)行一會(huì)。
close(quit) // 發(fā)出退出通知。
wg.Wait()
}
用select 實(shí)現(xiàn)超時(shí) (timeout)。channel 是第一類對(duì)象,可傳參 (內(nèi)部實(shí)現(xiàn)為指針) 或者作為結(jié)構(gòu)成員。
type Request struct {
data []int
ret chan int
}
func NewRequest(data ...int) *Request {
return &Request{ data, make(chan int, 1) }
}
func Process(req *Request) {
x := 0
for _, i := range req.data {
x += i
}
req.ret <- x
}
func main() {
req := NewRequest(10, 20, 30)
Process(req)
fmt.Println(<-req.ret)
}