多線程同步問(wèn)題
- 互斥鎖
- 互斥鎖的本質(zhì)是當(dāng)一個(gè)goroutine訪問(wèn)的時(shí)候, 其它goroutine都不能訪問(wèn)
- 這樣就能實(shí)現(xiàn)資源同步, 但是在避免資源競(jìng)爭(zhēng)的同時(shí)也降低了程序的并發(fā)性能. 程序由原來(lái)的并發(fā)執(zhí)行變成了串行
- 案例:
- 有一個(gè)打印函數(shù), 用于逐個(gè)打印字符串中的字符, 有兩個(gè)人都開(kāi)啟了goroutine去打印
- 如果沒(méi)有添加互斥鎖, 那么兩個(gè)人都有機(jī)會(huì)輸出自己的內(nèi)容
- 如果添加了互斥鎖, 那么會(huì)先輸出某一個(gè)的, 輸出完畢之后再輸出另外一個(gè)人的
package main
import (
"fmt"
"sync"
"time"
)
// 創(chuàng)建一把互斥鎖
var lock sync.Mutex
func printer(str string) {
// 讓先來(lái)的人拿到鎖, 把當(dāng)前函數(shù)鎖住, 其它人都無(wú)法執(zhí)行
// 上廁所關(guān)門
lock.Lock()
for _, v := range str{
fmt.Printf("%c", v)
time.Sleep(time.Millisecond * 500)
}
// 先來(lái)的人執(zhí)行完畢之后, 把鎖釋放掉, 讓其它人可以繼續(xù)使用當(dāng)前函數(shù)
// 上廁所開(kāi)門
lock.Unlock()
}
func person1() {
printer("hello")
}
func person2() {
printer("world")
}
func main() {
go person1()
go person2()
for{
;
}
}
生產(chǎn)者消費(fèi)者問(wèn)題
- 所謂的生產(chǎn)者消費(fèi)者模型就是
- 某個(gè)模塊(函數(shù))負(fù)責(zé)生產(chǎn)數(shù)據(jù), 這些數(shù)據(jù)由另一個(gè)模塊來(lái)負(fù)責(zé)處理
- 一般生產(chǎn)者消費(fèi)者模型包含三個(gè)部分"生產(chǎn)者"、"緩沖區(qū)"、"消費(fèi)者"
- 為什么生產(chǎn)者消費(fèi)者模型要含三個(gè)部分? 直接生產(chǎn)和消費(fèi)不行么?
- 一個(gè)案例說(shuō)明一切
- 生產(chǎn)者好比現(xiàn)實(shí)生活中的某個(gè)人
- 緩沖區(qū)好比現(xiàn)實(shí)生活中的郵箱
- 消費(fèi)者好比現(xiàn)實(shí)生活中的郵遞員
- 如果只有生產(chǎn)者和消費(fèi)者, 那么相當(dāng)于只有寫信的人和郵遞員, 那么如果將來(lái)過(guò)去的郵遞員離職了, 你想郵寄信件必須想辦法結(jié)識(shí)新的郵遞員(消費(fèi)者發(fā)生變化, 會(huì)直接影響生產(chǎn)者, 耦合性太強(qiáng))
- 如果在生產(chǎn)者和消費(fèi)者之間添加一個(gè)緩沖區(qū), 那么就好比有了郵箱, 以后郵寄信件不是找郵遞員, 只需把信件投遞到郵箱中即可, 寫信的人不需要關(guān)心郵遞員是誰(shuí)(解耦)
- 如果只有生產(chǎn)者和消費(fèi)者, 那么每個(gè)人郵寄信件都需要直接找郵遞員(1對(duì)1關(guān)系), 如果有10個(gè)人要郵寄信件, 那么郵遞員只能依次找到每個(gè)人, 然后才能取件(效率低下)
- 如果在生產(chǎn)者和消費(fèi)者之間添加一個(gè)緩沖區(qū), 那么所有的人只需要將信件投遞到郵箱即可, 郵遞員不用關(guān)心有多少人要郵寄信件, 也不用依次取件, 只需要找到郵箱從郵箱中統(tǒng)一取件即可(效率提高)
- 如果只有生產(chǎn)者和消費(fèi)者, 那么如果郵寄信件太多郵遞員無(wú)法一次拿走, 這個(gè)時(shí)候非常難辦
- 如果在生產(chǎn)者和消費(fèi)者之間添加一個(gè)緩沖區(qū), 那么如果信件太多可以先拿走一部分, 剩下的繼續(xù)放到郵箱中下次再拿
... ...
生產(chǎn)者和消費(fèi)者資源競(jìng)爭(zhēng)問(wèn)題
- 例如生產(chǎn)比較慢, 而消費(fèi)比較快, 就會(huì)導(dǎo)致消費(fèi)者消費(fèi)到錯(cuò)誤數(shù)據(jù)
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 創(chuàng)建一把互斥鎖
var lock = sync.Mutex{}
// 定義緩沖區(qū)
var sce []int = make([]int, 10)
// 定義生產(chǎn)者
func producer(){
// 加鎖, 注意是lock就是我們的鎖, 全局公用一把鎖
lock.Lock()
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
sce[i] = num
fmt.Println("生產(chǎn)者生產(chǎn)了: ", num)
time.Sleep(time.Millisecond * 500)
}
// 解鎖
lock.Unlock()
}
// 定義消費(fèi)者
func consumer() {
// 加鎖, 注意和生產(chǎn)者中用的是同一把鎖
// 如果生產(chǎn)者中已加過(guò)了, 則阻塞直到解鎖后再重新加鎖
lock.Lock()
for i:=0;i<10;i++{
num := sce[i]
fmt.Println("---消費(fèi)者消費(fèi)了", num)
}
lock.Unlock()
}
func main() {
go producer()
go consumer()
for{
;
}
}
- 思考: 那如果是一對(duì)多, 或者多對(duì)多的關(guān)系, 上述代碼有問(wèn)題么?
管道(Channel)
- 上述實(shí)現(xiàn)并發(fā)的代碼中為了保持主線程不掛掉, 我們都會(huì)在最后寫上一個(gè)死循環(huán)或者寫上一個(gè)定時(shí)器來(lái)實(shí)現(xiàn)等待goroutine執(zhí)行完畢
- 上述實(shí)現(xiàn)并發(fā)的代碼中為了解決生產(chǎn)者消費(fèi)者資源同步問(wèn)題, 我們利用加鎖來(lái)解決, 但是這僅僅是一對(duì)一的情況, 如果是一對(duì)多或者多對(duì)多, 上述代碼還是會(huì)出現(xiàn)問(wèn)題
- 綜上所述, 企業(yè)開(kāi)發(fā)中需要一種更牛X的技術(shù)來(lái)解決上述問(wèn)題, 那就是
管道(Channel)
- Channel的本質(zhì)是一個(gè)隊(duì)列
- Channel是線程安全的, 也就是自帶鎖定功能
- Channel聲明和初始化
- 聲明:
var 變量名chan 數(shù)據(jù)類型 - 初始化:
mych := make(chan 數(shù)據(jù)類型, 容量) - Channel和切片還有字典一樣, 必須make之后才能使用
- Channel和切片還有字典一樣, 是引用類型
- 聲明:
package main
import "fmt"
func main() {
// 1.聲明一個(gè)管道
var mych chan int
// 2.初始化一個(gè)管道
mych = make(chan int, 3)
// 3.查看管道的長(zhǎng)度和容量
fmt.Println("長(zhǎng)度是", len(mych), "容量是", cap(mych))
// 4.像管道中寫入數(shù)據(jù)
mych<- 666
fmt.Println("長(zhǎng)度是", len(mych), "容量是", cap(mych))
// 5.取出管道中寫入的數(shù)據(jù)
num := <-mych
fmt.Println("num = ", num)
fmt.Println("長(zhǎng)度是", len(mych), "容量是", cap(mych))
}
- 注意點(diǎn):
- 管道中只能存放聲明的數(shù)據(jù)類型, 不能存放其它數(shù)據(jù)類型
- 管道中如果已經(jīng)沒(méi)有數(shù)據(jù), 再取就會(huì)報(bào)錯(cuò)
- 如果管道中數(shù)據(jù)已滿, 再寫入就會(huì)報(bào)錯(cuò)
package main
import "fmt"
func main() {
// 1.聲明一個(gè)管道
var mych chan int
// 2.初始化一個(gè)管道
mych = make(chan int, 3)
// 注意點(diǎn): 管道中只能存放聲明的數(shù)據(jù)類型, 不能存放其它數(shù)據(jù)類型
//mych<-3.14
// 注意點(diǎn): 管道中如果已經(jīng)沒(méi)有數(shù)據(jù),
// 并且檢測(cè)不到有其它協(xié)程再往管道中寫入數(shù)據(jù), 那么再取就會(huì)報(bào)錯(cuò)
//num = <-mych
//fmt.Println("num = ", num)
// 注意點(diǎn): 如果管道中數(shù)據(jù)已滿, 再寫入就會(huì)報(bào)錯(cuò)
mych<- 666
mych<- 777
mych<- 888
mych<- 999
}
- 管道的關(guān)閉和遍歷
package main
import "fmt"
func main() {
// 1.創(chuàng)建一個(gè)管道
mych := make(chan int, 3)
// 2.往管道中存入數(shù)據(jù)
mych<-666
mych<-777
mych<-888
// 3.遍歷管道
// 第一次遍歷i等于0, len = 3,
// 第二次遍歷i等于1, len = 2
// 第三次遍歷i等于2, len = 1
//for i:=0; i<len(mych); i++{
// fmt.Println(<-mych) // 輸出結(jié)果不正確
//}
// 3.寫入完數(shù)據(jù)之后先關(guān)閉管道
// 注意點(diǎn): 管道關(guān)閉之后只能讀不能寫
close(mych)
//mych<- 999 // 報(bào)錯(cuò)
// 4.遍歷管道
// 利用for range遍歷, 必須先關(guān)閉管道, 否則會(huì)報(bào)錯(cuò)
//for value := range mych{
// fmt.Println(value)
//}
// close主要用途:
// 在企業(yè)開(kāi)發(fā)中我們可能不確定管道有還沒(méi)有有數(shù)據(jù), 所以我們可能一直獲取
// 但是我們可以通過(guò)ok-idiom模式判斷管道是否關(guān)閉, 如果關(guān)閉會(huì)返回false給ok
for{
if num, ok:= <-mych; ok{
fmt.Println(num)
}else{
break;
}
}
fmt.Println("數(shù)據(jù)讀取完畢")
}
- Channel阻塞現(xiàn)象
- 單獨(dú)在主線程中操作管道, 寫滿了會(huì)報(bào)錯(cuò), 沒(méi)有數(shù)據(jù)去獲取也會(huì)報(bào)錯(cuò)
- 只要在協(xié)程中操作管道過(guò), 寫滿了就會(huì)阻塞, 沒(méi)有就數(shù)據(jù)去獲取也會(huì)阻塞
package main
import (
"fmt"
"time"
)
// 創(chuàng)建一個(gè)管道
var myCh = make(chan int, 5)
func demo() {
var myCh = make(chan int, 5)
//myCh<-111
//myCh<-222
//myCh<-333
//myCh<-444
//myCh<-555
//fmt.Println("我是第六次添加之前代碼")
//myCh<-666
//fmt.Println("我是第六次添加之后代碼")
fmt.Println("我是第六次直接獲取之前代碼")
<-myCh
fmt.Println("我是第六次直接獲取之后代碼")
}
func test() {
//myCh<-111
//myCh<-222
//myCh<-333
//myCh<-444
//myCh<-555
//fmt.Println("我是第六次添加之前代碼")
//myCh<-666
//fmt.Println("我是第六次添加之后代碼")
//fmt.Println("我是第六次直接獲取之前代碼")
//<-myCh
//fmt.Println("我是第六次直接獲取之后代碼")
}
func example() {
time.Sleep(time.Second * 2)
myCh<-666
}
func main() {
// 1.同一個(gè)go程中操作管道
// 寫滿了會(huì)報(bào)錯(cuò)
//myCh<-111
//myCh<-222
//myCh<-333
//myCh<-444
//myCh<-555
//myCh<-666
// 沒(méi)有了去取也會(huì)報(bào)錯(cuò)
//<-myCh
// 2.在協(xié)程中操作管道
// 寫滿了不會(huì)報(bào)錯(cuò), 但是會(huì)阻塞
//go test()
// 沒(méi)有了去取也不會(huì)報(bào)錯(cuò), 也會(huì)阻塞
//go test()
//go demo()
//go demo()
// 3.只要在協(xié)程中操作了管道, 就會(huì)發(fā)生阻塞現(xiàn)象
go example()
fmt.Println("myCh之前代碼")
<-myCh
fmt.Println("myCh之后代碼")
//for{
// ;
//}
}
- 利用Channel實(shí)現(xiàn)生產(chǎn)者消費(fèi)者
package main
import (
"fmt"
"math/rand"
"time"
)
// 定義緩沖區(qū)
var myCh = make(chan int, 5)
var exitCh = make(chan bool, 1)
// 定義生產(chǎn)者
func producer(){
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
fmt.Println("生產(chǎn)者生產(chǎn)了: ", num)
// 往管道中寫入數(shù)據(jù)
myCh<-num
//time.Sleep(time.Millisecond * 500)
}
// 生產(chǎn)完畢之后關(guān)閉管道
close(myCh)
fmt.Println("生產(chǎn)者停止生產(chǎn)")
}
// 定義消費(fèi)者
func consumer() {
// 不斷從管道中獲取數(shù)據(jù), 直到管道關(guān)閉位置
for{
if num, ok := <-myCh; !ok{
break
}else{
fmt.Println("---消費(fèi)者消費(fèi)了", num)
}
}
fmt.Println("消費(fèi)者停止消費(fèi)")
exitCh<-true
}
func main() {
go producer()
go consumer()
fmt.Println("exitCh之前代碼")
<-exitCh
fmt.Println("exitCh之后代碼")
}
- 無(wú)緩沖Channel
package main
import "fmt"
var myCh1 = make(chan int, 5)
var myCh2 = make(chan int, 0)
func main() {
// 有緩沖管道
// 只寫入, 不讀取不會(huì)報(bào)錯(cuò)
//myCh1<-1
//myCh1<-2
//myCh1<-3
//myCh1<-4
//myCh1<-5
//fmt.Println("len =",len(myCh1), "cap =", cap(myCh1))
// 無(wú)緩沖管道
// 只有兩端同時(shí)準(zhǔn)備好才不會(huì)報(bào)錯(cuò)
go func() {
fmt.Println(<-myCh2)
}()
// 只寫入, 不讀取會(huì)報(bào)錯(cuò)
myCh2<-1
//fmt.Println("len =",len(myCh2), "cap =", cap(myCh2))
// 寫入之后在同一個(gè)線程讀取也會(huì)報(bào)錯(cuò)
//fmt.Println(<-myCh2)
// 在主程中先寫入, 在子程中后讀取也會(huì)報(bào)錯(cuò)
//go func() {
// fmt.Println(<-myCh2)
//}()
}
- 無(wú)緩沖Channel和有緩沖Channel
- 有緩沖管道具備異步的能力(寫幾個(gè)讀一個(gè)或讀幾個(gè))
- 無(wú)緩沖管道具備同步的能力(寫一個(gè)讀一個(gè))
package main
import (
"fmt"
"math/rand"
"time"
)
// 定義緩沖區(qū)
//var myCh = make(chan int, 0)
var myCh = make(chan int)
var exitCh = make(chan bool, 1)
// 定義生產(chǎn)者
func producer(){
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
fmt.Println("生產(chǎn)者生產(chǎn)了: ", num)
// 往管道中寫入數(shù)據(jù)
myCh<-num
//time.Sleep(time.Millisecond * 500)
}
// 生產(chǎn)完畢之后關(guān)閉管道
close(myCh)
fmt.Println("生產(chǎn)者停止生產(chǎn)")
}
// 定義消費(fèi)者
func consumer() {
// 不斷從管道中獲取數(shù)據(jù), 直到管道關(guān)閉位置
for{
if num, ok := <-myCh; !ok{
break
}else{
fmt.Println("---消費(fèi)者消費(fèi)了", num)
}
}
fmt.Println("消費(fèi)者停止消費(fèi)")
exitCh<-true
}
func main() {
go producer()
go consumer()
fmt.Println("exitCh之前代碼")
<-exitCh
fmt.Println("exitCh之后代碼")
}
IO的延遲說(shuō)明:
看到的輸出結(jié)果和我們想象的不太一樣, 是因?yàn)镮O輸出非常消耗性能, 輸出之后還沒(méi)來(lái)得及賦值可能就跑去執(zhí)行別的協(xié)程了
- 單向管道和雙向管道
- 默認(rèn)情況下所有管道都是雙向了(可讀可寫)
- 但是在企業(yè)開(kāi)發(fā)中, 我們經(jīng)常需要用到將一個(gè)管道作為參數(shù)傳遞
- 在傳遞的過(guò)程中希望對(duì)方只能單向使用, 要么只能寫,要么只能讀
- 雙向管道
- var myCh chan int = make(chan int, 0)
- 單向管道
- var myCh chan<- int = make(chan<- int, 0)
- var myCh <-chan int = make(<-chan int, 0)
- 注意點(diǎn):
- 雙向管道可以自動(dòng)轉(zhuǎn)換為任意一種單向管道
- 單向管道不能轉(zhuǎn)換為雙向管道
package main
import "fmt"
func main() {
// 1.定義一個(gè)雙向管道
var myCh chan int = make(chan int, 5)
// 2.將雙向管道轉(zhuǎn)換單向管道
var myCh2 chan<- int
myCh2 = myCh
fmt.Println(myCh2)
var myCh3 <-chan int
myCh3 = myCh
fmt.Println(myCh3)
// 3.雙向管道,可讀可寫
myCh<-1
myCh<-2
myCh<-3
fmt.Println(<-myCh)
// 3.只寫管道,只能寫, 不能讀
// myCh2<-666
// fmt.Println(<-myCh2)
// 4.指讀管道, 只能讀,不能寫
fmt.Println(<-myCh3)
//myCh3<-666
// 注意點(diǎn): 管道之間賦值是地址傳遞, 以上三個(gè)管道底層指向相同容器
}
- 單向管道作為函數(shù)參數(shù)
package main
import (
"fmt"
"math/rand"
"time"
)
// 定義生產(chǎn)者
func producer(myCh chan<- int){
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
fmt.Println("生產(chǎn)者生產(chǎn)了: ", num)
// 往管道中寫入數(shù)據(jù)
myCh<-num
//time.Sleep(time.Millisecond * 500)
}
// 生產(chǎn)完畢之后關(guān)閉管道
close(myCh)
fmt.Println("生產(chǎn)者停止生產(chǎn)")
}
// 定義消費(fèi)者
func consumer(myCh <-chan int) {
// 不斷從管道中獲取數(shù)據(jù), 直到管道關(guān)閉位置
for{
if num, ok := <-myCh; !ok{
break
}else{
fmt.Println("---消費(fèi)者消費(fèi)了", num)
}
}
fmt.Println("消費(fèi)者停止消費(fèi)")
}
func main() {
// 定義緩沖區(qū)
var myCh = make(chan int, 5)
go producer(myCh)
consumer(myCh)
}
select選擇結(jié)構(gòu)
- select是Go中的一個(gè)控制結(jié)構(gòu),類似于switch語(yǔ)句,用于處理異步IO操作
- 如果有多個(gè)case都可以運(yùn)行,select會(huì)隨機(jī)選出一個(gè)執(zhí)行,其他不會(huì)執(zhí)行。
- 如果沒(méi)有可運(yùn)行的case語(yǔ)句,且有default語(yǔ)句,那么就會(huì)執(zhí)行default的動(dòng)作。
- 如果沒(méi)有可運(yùn)行的case語(yǔ)句,且沒(méi)有default語(yǔ)句,select將阻塞,直到某個(gè)case通信可以運(yùn)行
select {
case IO操作1:
IO操作1讀取或?qū)懭氤晒蛨?zhí)行
case IO操作2:
IO操作2讀取或?qū)懭氤晒蛨?zhí)行
default:
如果上面case都沒(méi)有成功,則進(jìn)入default處理流程
}
- 注意點(diǎn):
- select的case后面必須是一個(gè)IO操作
- 一般情況下使用select結(jié)構(gòu)不用寫default
package main
import (
"fmt"
"time"
)
func main() {
// 創(chuàng)建管道
var myCh = make(chan int)
var exitCh = make(chan bool)
// 生產(chǎn)數(shù)據(jù)
go func() {
for i:=0;i <10;i++{
myCh<-i
time.Sleep(time.Second)
}
//close(myCh)
exitCh<-true
}()
// 讀取數(shù)據(jù)
for{
fmt.Println("讀取代碼被執(zhí)行了")
select {
case num:= <-myCh:
fmt.Println("讀到了", num)
case <-exitCh:
//break // 沒(méi)用, 跳出的是select
return
}
fmt.Println("-----------")
}
}
- select應(yīng)用場(chǎng)景
- 實(shí)現(xiàn)多路監(jiān)聽(tīng)
- 實(shí)現(xiàn)超時(shí)處理
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
// 1.創(chuàng)建管道
myCh := make(chan int, 5)
exitCh := make(chan bool)
// 2.生成數(shù)據(jù)
go func() {
for i:=0; i<10; i++ {
myCh<-i
time.Sleep(time.Second * 3)
}
}()
// 3.獲取數(shù)據(jù)
go func() {
for{
select {
case num:= <-myCh:
fmt.Println(num)
case <-time.After(time.Second * 2):
exitCh<-true
runtime.Goexit()
}
}
}()
<-exitCh
fmt.Println("程序結(jié)束")
}
定時(shí)器補(bǔ)充
- 一次性定時(shí)器
- NewTimer函數(shù)
- func NewTimer(d Duration) *Timer
- NewTimer創(chuàng)建一個(gè)Timer,它會(huì)在到期后向Timer自身的C字段發(fā)送當(dāng)時(shí)的時(shí)間
type Timer struct {
C <-chan Time // 對(duì)于我們來(lái)說(shuō), 這個(gè)屬性是只讀的管道
r runtimeTimer
}
package main
import (
"fmt"
"time"
)
func main() {
start := time.Now()
fmt.Println("開(kāi)始時(shí)間", start)
timer := time.NewTimer(time.Second * 3)
fmt.Println("讀取之前代碼被執(zhí)行")
end := <-timer.C // 系統(tǒng)寫入數(shù)據(jù)之前會(huì)阻塞
fmt.Println("讀取之后代碼被執(zhí)行")
fmt.Println("結(jié)束時(shí)間", end)
}
- After函數(shù)
- func After(d Duration) <-chan Time
- 底層就是對(duì)NewTimer的封裝, 只不過(guò)返回值不同而已
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
package main
import (
"fmt"
"time"
)
func main() {
start := time.Now()
fmt.Println("開(kāi)始時(shí)間", start)
timer := time.After(time.Second * 3)
fmt.Println("讀取之前代碼被執(zhí)行")
end := <-timer // 系統(tǒng)寫入數(shù)據(jù)之前會(huì)阻塞
fmt.Println("讀取之后代碼被執(zhí)行")
fmt.Println("結(jié)束時(shí)間", end)
}
- 周期性定時(shí)器
- NewTicker函數(shù)
- func NewTicker(d Duration) *Ticker
- 和NewTimer差不多, 只不過(guò)NewTimer只會(huì)往管道中寫入一次數(shù)據(jù), 而NewTicker每隔一段時(shí)間就會(huì)寫一次
type Ticker struct {
C <-chan Time // 周期性傳遞時(shí)間信息的通道
// 內(nèi)含隱藏或非導(dǎo)出字段
}
package main
import (
"fmt"
"time"
)
func main() {
// 1.創(chuàng)建一個(gè)周期定時(shí)器
ticker := time.NewTicker(time.Second)
// 2.不斷從重啟定時(shí)器中獲取時(shí)間
for{
t := <-ticker.C // 系統(tǒng)寫入數(shù)據(jù)之前會(huì)阻塞
fmt.Println(t)
}
}

