segmentfault Go 語言基礎(chǔ)——協(xié)程(goroutine)&共享內(nèi)存線程安全
github 并發(fā)理念
go 中協(xié)成一些方法
- sync.WaitGroup()
- channel
- channel select
一些術(shù)語
串行
即按照指定的順序一個(gè)個(gè)執(zhí)行,是最古老的執(zhí)行方式
并發(fā)
采用調(diào)度算法,來回切換執(zhí)行,造成宏觀上的一起執(zhí)行(發(fā)現(xiàn)后切換運(yùn)行)
并行
多核實(shí)現(xiàn), 真正的一起執(zhí)行。齊頭并進(jìn)
程序執(zhí)行的狀態(tài)
操作系統(tǒng)會(huì)分為兩大區(qū)域,一個(gè)是內(nèi)核區(qū),一個(gè)是用戶區(qū)
- 內(nèi)核區(qū)
大量的系統(tǒng)底層函數(shù),比如 open(),write(), 上層語言基于這些函數(shù)接口開發(fā)自己的庫函數(shù) (c 的 fopen(),fwrite()),方便開發(fā)人員使用。
- 用戶區(qū)
用戶自己的函數(shù)區(qū)域,不是調(diào)用內(nèi)核函數(shù)的狀態(tài)
程序執(zhí)行的時(shí)候,先執(zhí)行用戶自己寫的函數(shù),這個(gè)狀態(tài)為用戶態(tài),當(dāng)調(diào)用內(nèi)核函數(shù)的時(shí)候,程序就進(jìn)入了 內(nèi)核態(tài)。
之所以這樣,就是因?yàn)閮?nèi)核區(qū)域太重要了,所有的上層語言,都是基于這些內(nèi)核函數(shù)開發(fā)的,所以內(nèi)核內(nèi)存只有內(nèi)核函數(shù)才能訪問(內(nèi)存隔離)
線程 進(jìn)程,協(xié)程的區(qū)別
線程,協(xié)程 內(nèi)存共享,進(jìn)程內(nèi)存隔離
線程,協(xié)程 有資源競(jìng)爭(zhēng),進(jìn)程沒有
線程需要調(diào)度分配,不聽切換,爭(zhēng)搶模式,協(xié)程是程序自己調(diào)度切換或者遇到 I/O ,協(xié)調(diào)模式,不需要再消耗調(diào)度的資源了。
go 語言本身就實(shí)現(xiàn)了 協(xié)程,通過管道進(jìn)行通信(因?yàn)楣艿赖讓邮羌湘i的,安全,當(dāng)然你也可以直接用變量,需要考慮資源競(jìng)爭(zhēng)的關(guān)系)
go 語言中的 進(jìn)程,協(xié)成操作
-
進(jìn)程
一個(gè)cpu 內(nèi)核,同一時(shí)刻只能運(yùn)行一個(gè)進(jìn)程,但是CPU可以在多個(gè)進(jìn)程間進(jìn)行來回切換,我們稱之為上下文切換。 context 在 go 中用處非常廣泛。
操作系統(tǒng)會(huì)按照調(diào)度算法為每個(gè)進(jìn)程分配一定的CPU運(yùn)行時(shí)間,稱之為時(shí)間輪片,每個(gè)進(jìn)程在運(yùn)行時(shí)都會(huì)認(rèn)為自己獨(dú)占了CPU,如圖所示

父進(jìn)程無法預(yù)測(cè)子進(jìn)程什么時(shí)候結(jié)束,只有進(jìn)程完成工作后,父進(jìn)程才會(huì)調(diào)用子進(jìn)程的終止態(tài)
進(jìn)程回收
一個(gè)進(jìn)程結(jié)束,能回收自己的用戶空間的內(nèi)存,但是不能回收內(nèi)核區(qū)的資源, 內(nèi)核區(qū)的資源必須父進(jìn)程調(diào)用 wait 函數(shù)回收。
- 孤兒進(jìn)程:父結(jié)束,子進(jìn)程還在運(yùn)行。這時(shí)子進(jìn)程會(huì)被 init 進(jìn)程 管理
- 僵尸進(jìn)程
子進(jìn)程結(jié)束,但未被父進(jìn)程回收。
(可以殺死父進(jìn)程,然后子進(jìn)程就可以被 init 回收)
進(jìn)程通信
文件、管道、信號(hào)、共享內(nèi)存、消息隊(duì)列、套接字
go 支持的ipc 方式:
管道、信號(hào)、socket (http, rpc, ws 等等 tcp 的應(yīng)用層協(xié)議)
-
管道
管道 實(shí)質(zhì)是 內(nèi)核緩沖區(qū)
數(shù)據(jù)從寫端流向 讀取端,只可讀取一次數(shù)據(jù)就刪除了。
讀寫默認(rèn)都是阻塞的。
進(jìn)程同步
進(jìn)程是內(nèi)存隔離的,但是如果是兩個(gè)進(jìn)程同時(shí)操作一個(gè)文件,那也會(huì)產(chǎn)生競(jìng)爭(zhēng)的。所以也需要同步
使用 互斥鎖 吧!Golang的sync包也有對(duì)互斥的支持
— — —— — — — — 進(jìn)程結(jié)束— — — — — — — — — —
-
線程
多進(jìn)程示意圖:


一個(gè)進(jìn)程會(huì)有一個(gè)主線程,這個(gè)是時(shí)間輪片的最小單元
線程同步
互斥量(常見互斥鎖)
死鎖:資源沒有綁在一起,導(dǎo)致互斥量拿到的資源不全,一直阻塞
解決辦法:
試鎖定: 即拿到一個(gè)資源后,嘗試鎖定后續(xù)需要的資源,如果不能全部鎖定,則解除已經(jīng)鎖定的資源。然后重新爭(zhēng)奪鎖
(其實(shí)這個(gè)操作就是相當(dāng)于吧所有資源做一個(gè)綁定,有點(diǎn)原子的意思, 比較復(fù)雜)
差分資源:
就是把需要的資源差分一個(gè)個(gè)的,這樣針對(duì)單個(gè)資源鎖定,自然不會(huì)死鎖了( 不靈活 )
條件變量
互斥量有時(shí)候也不能完美解決問題,比如最常見的生產(chǎn)消費(fèi)模型中
由于生產(chǎn)者線程和消費(fèi)者線程都會(huì)對(duì)數(shù)據(jù)隊(duì)列進(jìn)行并發(fā)訪問,那么我們肯定會(huì)為數(shù)據(jù)隊(duì)列進(jìn)行加鎖操作,以實(shí)現(xiàn)同步
此時(shí)如果生產(chǎn)者線程獲得互斥量,發(fā)現(xiàn)數(shù)據(jù)隊(duì)列已滿,無法添加新數(shù)據(jù),生產(chǎn)者線程就可能在臨界區(qū)一直等待,直到有空閑區(qū)間。這種做法明顯是錯(cuò)誤的,因?yàn)樵摼€程一直阻塞在臨界區(qū),直接影響了其他消費(fèi)者線程的使用!生產(chǎn)者線程應(yīng)該在發(fā)現(xiàn)沒有空閑區(qū)間時(shí)直接解鎖退出
條件變量有三種操作:
等待通知 單發(fā)通知 廣播通知
就是說條件變量就是通知線程,滿足條件了,可以操作數(shù)據(jù)了,不用等待,然后線程直接使用互斥量操作數(shù)據(jù)。
艸, 直奔主題, goroutine
channel
其實(shí)除了 channel , 全局變量也可以交換數(shù)據(jù),只不過要自己枷鎖
- 無緩沖channel
make(chan int) // 不加長度,默認(rèn)為 0 長度
無緩沖的管道,讀寫至少有兩個(gè) goroutine ,否則報(bào)錯(cuò)
func nocache_chan(){
ch := make(chan int)
go func(){ch <- 10}() // go 開啟的另一個(gè)協(xié)成
<-ch // 主協(xié)成
}
下面報(bào)錯(cuò):
func wrong(){
ch := make(chan int)
ch <- 10
<-ch
}
>>
fatal error: all goroutines are asleep - deadlock!
- 有緩存channel
make(chan int, 10) // 管道長度 > 0
首先,無緩沖上面報(bào)錯(cuò)的例子這里就正常運(yùn)行拉了
func wrong(){
ch := make(chan int, 2)
ch <- 10
fmt.Println(<-ch)
}
>> 10
同樣的,當(dāng)數(shù)據(jù)全部讀取完畢后,再次讀取也會(huì)造成阻塞,如下所示
func main() {
ch := make(chan int, 1)
ch <- 10
// ch <- 10 加這個(gè)也會(huì)報(bào)錯(cuò)
<-ch
// <-ch 加上這個(gè)依然報(bào)錯(cuò)
}
很顯然,如果在一個(gè) 協(xié)成(上個(gè)例子為主協(xié)成,要注意寫入讀取按照順序,如果有阻塞則匯報(bào)錯(cuò))
- channel的相關(guān)操作
遍歷: // 用 range 遍歷等 channel 關(guān)閉,就自動(dòng)退出循環(huán),不會(huì)報(bào)錯(cuò),這里不用手動(dòng)取值判斷 管道是否關(guān)閉
for data := range ch {
fmt.Println("data==", data)
if data == 3 {
break
}
}
這樣就省得 <- chan 取了
通道關(guān)閉 可以不用管主動(dòng)回收,也可以自己關(guān)閉
ch := make(chan int)
close(ch) // 關(guān)閉通道
ch <- 1 // 報(bào)錯(cuò):send on closed channel
從通道中接收數(shù)據(jù)時(shí),可以利用多返回值判斷通道是否已經(jīng)關(guān)閉
func close_chan(){
var c = make(chan int, 2)
go func(){ c <- 1; c <- 2 ; close(c) }()
go func(){fmt.Println(<-c, "\n", <-c, "\n", <-c)}()
time.Sleep(time.Second)
x, ok := <- c
fmt.Println(x, ok)
}
>>
1
2
0
0 false
channel已經(jīng)關(guān)閉則:
不能再向其寫入數(shù)據(jù), 可以讀數(shù)據(jù),如果沒有多余數(shù)據(jù),則取到的是 類型零值
通道讀寫
有的時(shí)候分為只讀只寫的管道(默認(rèn)為雙向管道)
var chan1 chan<- int // 聲明 只寫channel
var chan2 <-chan int // 聲明 只讀channel
這樣記:
(chan)<- (chan) type // <- 代表左邊是數(shù)據(jù)進(jìn)入方向
默認(rèn)是 chan type
單向管道不能轉(zhuǎn)雙向,但雙向可以轉(zhuǎn)單向
隱式轉(zhuǎn)換
var ch chan int // 聲明一個(gè)雙向
ch = make(chan int, 10) // 初始化
func write(ch chan<- int) {}
func read(ch <-chan int) {}
go write(ch)
go read(ch)
// 這樣 write 函數(shù),的chan 就只可以放數(shù)據(jù),read 函數(shù)的chan只可以取數(shù)據(jù)
**顯示轉(zhuǎn)換 (這個(gè)好像有問題) **
ch := make(chan int) // 聲明普通channel
ch1 := <-chan int(ch) // 轉(zhuǎn)換為 只讀channel
ch2 := chan<- int(ch) // 轉(zhuǎn)換為 只寫channel
** 等待組 sync.WaitGroup 同步數(shù)據(jù)**
sync.WaitGroup類型的值也是并發(fā)安全的
(wg *WaitGroup) Add(delta int) 等待組計(jì)數(shù)器+1,該方法也可以傳
入負(fù)值讓等待計(jì)數(shù)
(wg *WaitGroup) Done() 等待組計(jì)數(shù)器-1,等同于Add傳入負(fù)值
(wg *WaitGroup) Wait() 等待組計(jì)數(shù)器!=0時(shí)阻塞,直到為0
應(yīng)用場(chǎng)景:WaitGroup一般用于協(xié)調(diào)多個(gè)goroutine運(yùn)行, 當(dāng)然你可以用 一個(gè) channel 計(jì)數(shù)阻塞,但是沒有 WaitGroup 輕便
實(shí)例:
var mt sync.Mutex
var wg sync.WaitGroup
var money = 10000
// 開啟10個(gè)協(xié)程,每個(gè)協(xié)程內(nèi)部 循環(huán)1000次,每次循環(huán)值+10
for i := 0; i < 10; i++ {
wg.Add(1)
go func(index int) {
mt.Lock()
fmt.Printf("協(xié)程 %d 搶到鎖\n", index)
for j := 0; j < 100; j++ {
money += 10 // 多個(gè)協(xié)程對(duì) money產(chǎn)生了競(jìng)爭(zhēng)
}
fmt.Printf("協(xié)程 %d 準(zhǔn)備解鎖\n", index)
mt.Unlock()
wg.Done()
}(i)
}
wg.Wait()
fmt.Println("最終的monet = ", money) // 應(yīng)該輸出20000才正確
就是說 穿件 WaitGroup -> add -> done -> wait
channel select
即滿足一個(gè)條件就執(zhí)行,不會(huì)從上倒下阻塞(switch 是從上倒下順序判斷的)
select {
case 操作1:
響應(yīng)操作1
case 操作2:
響應(yīng)操作2
...
default:
沒有操作的情況
}
以下例子:兩個(gè)管道中只要有一個(gè)管道能夠取出數(shù)據(jù),那么就使用該數(shù)據(jù)
(select中的case必須是I/O操作)
func fn1(ch chan string) {
time.Sleep(time.Second * 3)
ch <- "fn1111"
}
func fn2(ch chan string) {
time.Sleep(time.Second * 6)
ch <- "fn2222"
}
func main() {
ch1 := make(chan string)
go fn1(ch1)
ch2 := make(chan string)
go fn2(ch2)
select {
case r1 := <-ch1:
fmt.Println("r1=", r1)
case r2 := <-ch2:
fmt.Println("r2=", r2)
}
}
利用select()可以實(shí)現(xiàn)超時(shí)處理:
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // 等待1秒鐘
timeout <- true
}()
select {
case <-ch: // 能取到數(shù)據(jù)
case <-timeout: // 沒有從-cha中取到數(shù)據(jù),此時(shí)能從timeout中取得數(shù)據(jù)
}
// 就是人為 弄一個(gè)一定時(shí)間后 管道有值的select 語句
