go關(guān)鍵字可以用來開啟一個goroutine(協(xié)程))進行任務處理,而多個任務之間如果需要通信,就需要用到channel了。
func testSimple(){
intChan := make(chan int)
go func() {
intChan <- 1
}()
value := <- intChan
fmt.Println("value : ", value)
}
上面這個簡單的例子就是新開啟的goroutine向intChan發(fā)送了一個1的值,那么在主線程的intChan就會收到這個值的信息。
channel類型:無緩沖和緩沖類型
channel有兩種形式的,一種是無緩沖的,一個線程向這個channel發(fā)送了消息后,會阻塞當前的這個線程,知道其他線程去接收這個channel的消息。無緩沖的形式如下:
intChan := make(chan int)
帶緩沖的channel,是可以指定緩沖的消息數(shù)量,當消息數(shù)量小于指定值時,不會出現(xiàn)阻塞,超過之后才會阻塞,需要等待其他線程去接收channel處理,帶緩沖的形式如下:
//3為緩沖數(shù)量
intChan := make(chan int, 3)
傳輸struct結(jié)構(gòu)數(shù)據(jù)
channel可以傳輸基本類型的數(shù)據(jù)如int, string,同時也可以傳輸struct數(shù)據(jù)
type Person struct {
Name string
Age uint8
Address Addr
}
type Addr struct {
city string
district string
}
/*
測試channel傳輸復雜的Struct數(shù)據(jù)
*/
func testTranslateStruct() {
personChan := make(chan Person, 1)
person := Person{"xiaoming", 10, Addr{"shenzhen", "longgang"}}
personChan <- person
person.Address = Addr{"guangzhou", "huadu"}
fmt.Printf("src person : %+v \n", person)
newPerson := <-personChan
fmt.Printf("new person : %+v \n", newPerson)
}
這里可以看到可以通過channel傳輸自定義的Person對象,同時一端修改了數(shù)據(jù),不影響另一端的數(shù)據(jù),也就是說通過channel傳遞后的數(shù)據(jù)是獨立的。
關(guān)閉channel
channel可以進行關(guān)閉,例如寫的一段關(guān)閉了channel,那么讀的一端讀取時就可以檢測讀取失敗
/*
測試關(guān)閉channel
*/
func testClose() {
ch := make(chan int, 5)
sign := make(chan int, 2)
go func() {
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(time.Second)
}
close(ch)
fmt.Println("the channel is closed")
sign <- 0
}()
go func() {
for {
i, ok := <-ch
fmt.Printf("%d, %v \n", i, ok)
if !ok {
break
}
time.Sleep(time.Second * 2)
}
sign <- 1
}()
<-sign
<-sign
}
合并多個channel的輸出
可以將多個channel的數(shù)據(jù)合并到一個channel進行輸出,形成一個消息隊列
/**
將多個輸入的channel進行合并成一個channel
*/
func testMergeInput() {
input1 := make(chan int)
input2 := make(chan int)
output := make(chan int)
go func(in1, in2 <-chan int, out chan<- int) {
for {
select {
case v := <-in1:
out <- v
case v := <-in2:
out <- v
}
}
}(input1, input2, output)
go func() {
for i := 0; i < 10; i++ {
input1 <- i
time.Sleep(time.Millisecond * 100)
}
}()
go func() {
for i := 20; i < 30; i++ {
input2 <- i
time.Sleep(time.Millisecond * 100)
}
}()
go func() {
for {
select {
case value := <-output:
fmt.Println("輸出:", value)
}
}
}()
time.Sleep(time.Second * 5)
fmt.Println("主線程退出")
}
通過channel實現(xiàn)退出的通知
定義一個用于退出的channel比如quit,不斷執(zhí)行任務的線程通過select監(jiān)聽quit的讀取,當讀取到quit中的消息時,退出當前的任務線程,這里是主線程通知任務線程退出。
/*
測試channel用于通知中斷退出的問題
*/
func testQuit() {
g := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case v := <-g:
fmt.Println(v)
case <-quit:
fmt.Println("B退出")
return
}
}
}()
for i := 0; i < 3; i++ {
g <- i
}
quit <- true
fmt.Println("testAB退出")
}
生產(chǎn)者消費者問題
通過channel可以比較方便的實現(xiàn)生產(chǎn)者消費者模型,這里開啟一個生產(chǎn)者線程,一個消費者線程,生產(chǎn)者線程往channel中發(fā)送消息,同時阻塞,消費者線程輪詢獲取channel中的消息,
進行處理,然后阻塞,這時生產(chǎn)者線程喚醒繼續(xù)后面的邏輯,如此便形成了簡單的生產(chǎn)者消費者模型。同時生產(chǎn)者在完成了所有的消息發(fā)送后,可以通過quit這個channel通知消費者線程退出,
而消費者線程退出時,通知主線程退出,整個程序完成退出。
/**
生產(chǎn)者消費者問題
*/
func testPCB() {
fmt.Println("test PCB")
intchan := make(chan int)
quitChan := make(chan bool)
quitChan2 := make(chan bool)
value := 0
go func() {
for i := 0; i < 3; i++ {
value = value + 1
intchan <- value
fmt.Println("write finish, value ", value)
time.Sleep(time.Second)
}
quitChan <- true
}()
go func() {
for {
select {
case v := <-intchan:
fmt.Println("read finish, value ", v)
case <-quitChan:
quitChan2 <- true
return
}
}
}()
<-quitChan2
fmt.Println("task is done ")
}
輸出順序問題
/*
這個結(jié)果輸出是1,2, 也可能是2,1, 也可能是2,順序是不一定的
*/
func testSequnse() {
ch := make(chan int)
go func() {
v := <-ch
fmt.Println(v)
}()
ch <- 1
fmt.Println("2")
}
上面這個輸出結(jié)果是什么呢?運行一下會發(fā)現(xiàn),可能是1,2,也可能是2,1, 也可能是2,順序是不一定的,那么為什么會是這樣的,我覺得因為這是兩個不同的線程,
它們是獨立運行的,當v := <-ch 執(zhí)行之后,主線程和當前線程都是運行狀態(tài)(非阻塞),先執(zhí)行主線程還是新線程的輸出就看cpu運行了,所以結(jié)果是不確定的。
channel的超時處理
通過time可以實現(xiàn)channel的超時處理,當一個channel讀取超過一定時間沒有消息到來時,就可以得到超時通知處理,防止一直阻塞當前線程
/*
檢查channel讀寫超時,并做超時的處理
*/
func testTimeout() {
g := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case v := <-g:
fmt.Println(v)
case <-time.After(time.Second * time.Duration(3)):
quit <- true
fmt.Println("超時,通知主線程退出")
return
}
}
}()
for i := 0; i < 3; i++ {
g <- i
}
<-quit
fmt.Println("收到退出通知,主線程退出")
}
channel的輸入輸出類型指定
channel可以在顯示指定它是輸入型還是輸出型的,指定為輸入型,則不能使用它輸出消息,否則出錯編譯不通過,同理,輸出型不能接受消息輸入,
這樣可以在編寫代碼時防止手誤寫錯誤輸入輸出類型而導致程序錯誤的問題。指定輸入輸出類型可以在方法參數(shù)時設定,那么它只在當前方法中會做輸入輸出限制,
可看下面實現(xiàn)。
/*
指定channel是輸入還是輸出型的,防止編寫時寫錯誤輸入輸出,指定了的話,可以在編譯時期作錯誤的檢查
*/
func testInAndOutChan() {
ch := make(chan int)
quit := make(chan bool)
//輸入型的chan是這種格式的:inChan chan<- int,如果換成輸出型的,則編譯時會報錯
go func(inChan chan<- int) {
for i := 0; i < 10; i++ {
inChan <- i
time.Sleep(time.Millisecond * 500)
}
quit <- true
quit <- true
}(ch)
go func(outChan <-chan int) {
for {
select {
case v := <-outChan:
fmt.Println("print out value : ", v)
case <-quit:
fmt.Println("收到退出通知,退出")
return
}
}
}(ch)
<-quit
fmt.Println("收到退出通知,主線程退出")
}
channel實現(xiàn)并發(fā)數(shù)量控制
通過設置一個帶緩沖數(shù)量的的channel來實現(xiàn)最大并發(fā)數(shù)量,最大并發(fā)數(shù)量即為緩沖數(shù)量,任務開始時想limit這個channel發(fā)送消息,
任務執(zhí)行完成后從這個limit讀取消息,這樣就可以保證當并發(fā)數(shù)量達到limit的緩沖數(shù)量時,limit <- true 這里會發(fā)生阻塞,停止
創(chuàng)建新的線程,知道某個線程執(zhí)行完成任務后,從limit讀取數(shù)據(jù),這樣就能保證最大并發(fā)數(shù)量控制在緩沖數(shù)量。
/*
測試通過channel來控制最大并發(fā)數(shù),來處理事件
*/
func testMaxNumControl() {
maxNum := 3
limit := make(chan bool, maxNum)
quit := make(chan bool)
for i:=0; i<100; i++{
fmt.Println("start worker : ", i)
limit <- true
go func(i int) {
fmt.Println("do worker start: ", i)
time.Sleep(time.Millisecond * 20)
fmt.Println("do worker finish: ", i)
<- limit
if i == 99{
fmt.Println("完成任務")
quit <- true
}
}(i)
}
<-quit
fmt.Println("收到退出通知,主程序退出")
}
監(jiān)聽中斷信號的channel
可以創(chuàng)建一個signal信號的channel,同時通過signal.Notify來監(jiān)聽os.Interrupt這個中斷信號,因此執(zhí)行到<- quit時就會阻塞在這里,
直到收到了os.Interrupt這個中斷信號,比如按Ctrl+C中斷程序的時候,主程序就會退出了。當然還可以監(jiān)聽其他信號,例如os.Kill等。
/*
監(jiān)聽中斷信號的channel
*/
func testSignal() {
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
go func() {
time.Sleep(time.Second * 2)
number := 0;
for{
number++
println("number : ", number)
time.Sleep(time.Second)
}
}()
fmt.Println("按Ctrl+C可退出程序")
<- quit
fmt.Println("主程序退出")
}
channel實現(xiàn)同步控制,生產(chǎn)者消費者模型
開啟多個線程做賺錢和花錢的操作,共享讀寫remainMoney這個剩余金額變量,實現(xiàn)生產(chǎn)者消費者模型
//同步控制模型,生產(chǎn)者模型
var lockChan = make(chan int, 1)
var remainMoney = 1000
func testSynchronize() {
quit := make(chan bool, 2)
go func() {
for i:=0; i<10; i++{
money := (rand.Intn(12) + 1) * 100
go testSynchronize_expense(money)
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
}
quit <- true
}()
go func() {
for i:=0; i<10; i++{
money := (rand.Intn(12) + 1) * 100
go testSynchronize_gain(money)
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
}
quit <- true
}()
<- quit
<- quit
fmt.Println("主程序退出")
}
func testSynchronize_expense(money int) {
lockChan <- 0
if(remainMoney >= money){
srcRemainMoney := remainMoney
remainMoney -= money
fmt.Printf("原來有%d, 花了%d,剩余%d\n", srcRemainMoney, money, remainMoney)
}else{
fmt.Printf("想消費%d錢不夠了, 只剩%d\n", money, remainMoney)
}
<- lockChan
}
func testSynchronize_gain(money int) {
lockChan <- 0
srcRemainMoney := remainMoney
remainMoney += money
fmt.Printf("原來有%d, 賺了%d,剩余%d\n", srcRemainMoney, money, remainMoney)
<- lockChan
}