本文為轉(zhuǎn)載,原文:Golang 學(xué)習(xí)筆記(06)—— 多線程

介紹
線程是cpu調(diào)度的最小單位,只有不同的線程才能同時(shí)在多核cpu上同時(shí)運(yùn)行。但線程太占資源,線程調(diào)度開銷大。go中的goroutine是一個(gè)輕量級(jí)的線程,執(zhí)行時(shí)只需要4-5k的內(nèi)存,比線程更易用,更高效,更輕便,調(diào)度開銷比線程小,可同時(shí)運(yùn)行上千萬個(gè)并發(fā)。
go語言中開啟一個(gè)goroutine非常簡單,go函數(shù)名(),就開啟了個(gè)線程。
默認(rèn)情況下,調(diào)度器僅使用單線程,要想發(fā)揮多核處理器的并行處理能力,必須調(diào)用runtine.GOMAXPROCS(n)來設(shè)置可并發(fā)的線程數(shù),也可以通過設(shè)置環(huán)境變量GOMAXPROCS打到相同的目的。
goroutine
Runtime包中提供了幾個(gè)與goroutine相關(guān)的函數(shù)。Gosched()讓當(dāng)前正在執(zhí)行的goroutine放棄CPU執(zhí)行權(quán)限。調(diào)度器安排其他正在等待的線程運(yùn)行。
請看以下例子:
package main
import (
"runtime"
"fmt"
)
func main(){
go sayHello()
go sayWorld()
var str string
fmt.Scan(&str)
}
func sayHello(){
for i := 0; i < 10; i++{
fmt.Print("hello ")
runtime.Gosched()
}
}
func sayWorld(){
for i := 0; i < 10; i++ {
fmt.Println("world")
runtime.Gosched()
}
}

從上面輸出結(jié)果可知,我們啟動(dòng)了兩個(gè)線程,其中一個(gè)線程輸出一句后調(diào)用Gosched函數(shù),釋放CPU權(quán)限;之后另一個(gè)線程獲得CPU權(quán)限。這樣兩個(gè)線程交替獲得cpu權(quán)限,才輸出了以上結(jié)果。
runtime.NumCPU()返回了cpu核數(shù),runtime.NumGoroutine()返回當(dāng)前進(jìn)程的goroutine線程數(shù)。即便我們沒有開啟新的goroutine。
package main
import (
"runtime"
"fmt"
)
func main(){
fmt.Println(runtime.NumCPU())
fmt.Println(runtime.NumGoroutine())
}

runtime.Goexit()函數(shù)用于終止當(dāng)前的goroutine,單defer函數(shù)將會(huì)繼續(xù)被調(diào)用。
package main
import (
"runtime"
"fmt"
)
func test(){
defer func(){
fmt.Println(" in defer")
}()
for i := 0; i < 10; i++{
fmt.Print(i)
if i > 5{
runtime.Goexit()
}
}
}
func main(){
go test()
var str string
fmt.Scan(&str)
}

在這里大家或許有個(gè)疑問,下面這兩句代碼干嘛的呢
var str string
fmt.Scan(&str)
這兩句代碼是等待輸入的意思,在這里用來阻止主線程關(guān)閉的。如果沒有這兩句的話,會(huì)發(fā)現(xiàn)我們的程序瞬間就結(jié)束了,而且什么都沒有輸出。這是因?yàn)橹骶€程關(guān)閉之后,所有開啟的goroutine都會(huì)強(qiáng)制關(guān)閉,他還沒有來得及輸出,就結(jié)束了。
但是這樣感覺怪怪的。如果有一種機(jī)制,在子線程結(jié)束的時(shí)候通知一下主線程,然后主線程再關(guān)閉,豈不是更好,這樣就不用無休止的等待了。于是就有了channel。
channel
goroutine之間通過channel來通訊,可以認(rèn)為channel是一個(gè)管道或者先進(jìn)先出的隊(duì)列。你可以從一個(gè)goroutine中向channel發(fā)送數(shù)據(jù),在另一個(gè)goroutine中取出這個(gè)值。
使用make創(chuàng)建
var channel chan int = make(chan int)
// 或
channel := make(chan int)
生產(chǎn)者/消費(fèi)者是最經(jīng)典的使用示例。生產(chǎn)者goroutine負(fù)責(zé)將數(shù)據(jù)放入channel,消費(fèi)者goroutine從channel中取出數(shù)據(jù)進(jìn)行處理。
package main
import (
"fmt"
)
func main(){
buf:=make(chan int)
flg := make(chan int)
go producer(buf)
go consumer(buf, flg)
<-flg //等待接受完成
}
func producer(c chan int){
defer close(c) // 關(guān)閉channel
for i := 0; i < 10; i++{
c <- i // 阻塞,直到數(shù)據(jù)被消費(fèi)者取走后,才能發(fā)送下一條數(shù)據(jù)
}
}
func consumer(c, f chan int){
for{
if v, ok := <-c; ok{
fmt.Print(v) // 阻塞,直到生產(chǎn)者放入數(shù)據(jù)后繼續(xù)讀取數(shù)據(jù)
}else{
break
}
}
f<-1 //發(fā)送數(shù)據(jù),通知main函數(shù)已接受完成
}

可以將channel指定為單向通信。比如
<-chan int僅能接收,chan<-int僅能發(fā)送。之前的生產(chǎn)者消費(fèi)者可以改為一下方式:
func producer(c chan<-int){
defer close(c) // 關(guān)閉channel
for i := 0; i < 10; i++{
c <- i // 阻塞,直到數(shù)據(jù)被消費(fèi)者取走后,才能發(fā)送下一條數(shù)據(jù)
}
}
func consumer(c <-chan int, f chan<-int){
for{
if v, ok := <-c; ok{
fmt.Print(v) // 阻塞,直到生產(chǎn)者放入數(shù)據(jù)后繼續(xù)讀取數(shù)據(jù)
}else{
break
}
}
f<-1 //發(fā)送數(shù)據(jù),通知main函數(shù)已接受完成
}
channle可以是帶緩沖的。make的第二個(gè)參數(shù)作為緩沖長度來初始化一個(gè)帶緩沖的channel:
c := make(chan int, 5)
向帶緩沖的channel發(fā)送數(shù)據(jù)時(shí),只有緩沖區(qū)滿時(shí),發(fā)送操作才會(huì)被阻塞。當(dāng)緩沖區(qū)空時(shí),接收才會(huì)阻塞。
可以通過以下程序調(diào)整發(fā)送和接收的順序調(diào)試
package main
import (
"fmt"
)
func main(){
c := make(chan int, 2)
c <- 1
c <- 2
fmt.Println(<-c)
fmt.Println(<-c)
}
select
如果有多個(gè)channel需要監(jiān)聽,可以考慮用select,隨機(jī)處理一個(gè)可用的channel
package main
import (
"fmt"
)
func main(){
c := make(chan int)
quit := make(chan int)
go func(){
for i := 0; i < 10; i++{
fmt.Printf("%d ", <-c)
}
quit <- 1
}()
testMuti(c, quit)
}
func testMuti(c, quit chan int){
x, y := 0, 1
for {
select{
case c<-x:
x, y = y, x+y
case <-quit:
fmt.Print("\nquit")
return
}
}
}

channle超時(shí)機(jī)制
當(dāng)一個(gè)channel被read/write阻塞時(shí),會(huì)被一直阻塞下去,直到channel關(guān)閉。產(chǎn)生一個(gè)異常退出程序。channel內(nèi)部沒有超時(shí)的定時(shí)器。但我們可以用select來實(shí)現(xiàn)channel的超時(shí)機(jī)制
package main
import (
"time"
"fmt"
)
func main(){
c := make(chan int)
select{
case <- c:
fmt.Println("沒有數(shù)據(jù)")
case <-time.After(5* time.Second):
fmt.Println("超時(shí)退出")
}
}

線程同步
假設(shè)現(xiàn)在我們有兩個(gè)線程,一個(gè)線程寫文件,一個(gè)線程讀文件。如果在讀文件的同時(shí),寫文件的線程向文件中寫數(shù)據(jù),就會(huì)出現(xiàn)問題。為了保證能夠正確的讀寫文件,在讀文件的時(shí)候,不能進(jìn)行寫入文件的操作,在寫入時(shí),不能進(jìn)行讀的操作。這就需要互斥鎖?;コ怄i是線程間同步的一種機(jī)制,用了保證在同一時(shí)刻只用一個(gè)線程訪問共享資源。go中的互斥鎖在sync包中。下面是個(gè)線程安全的map:
package main
import (
"errors"
"sync"
"fmt"
)
func main(){
m := &MyMap{mp:make(map[string]int), mutex:new(sync.Mutex)}
go SetValue(m)
go m.Display()
var str string
fmt.Scan(&str)
}
type MyMap struct{
mp map[string]int
mutex *sync.Mutex
}
func (this *MyMap)Get(key string)(int, error){
this.mutex.Lock()
i, ok := this.mp[key]
this.mutex.Unlock()
if !ok{
return i, errors.New("不存在")
}
return i, nil
}
func (this *MyMap)Set(key string, val int){
this.mutex.Lock()
defer this.mutex.Unlock()
this.mp[key] = val
}
func (this *MyMap)Display(){
this.mutex.Lock()
defer this.mutex.Unlock()
for key, val := range this.mp{
fmt.Println(key, "=", val)
}
}
func SetValue(m *MyMap){
var a rune
a = 'a'
for i := 0; i< 10; i++{
m.Set(string(a+rune(i)), i)
}
}

完
轉(zhuǎn)載請注明出處
Golang 學(xué)習(xí)筆記(06)—— 多線程
目錄
上一節(jié):Golang 學(xué)習(xí)筆記(05)—— 面向?qū)ο缶幊?/a>
下一節(jié):Golang 學(xué)習(xí)筆記(07)—— 錯(cuò)誤及異常處理