當(dāng)一個(gè)資源需要在 goroutine 之間共享時(shí),通道在 goroutine 之間架起了一個(gè)管道,并提供了確保同步交換數(shù)據(jù)的機(jī)制。(這是除了 atomic 和 mutex 之外的第三種處理競(jìng)態(tài)資源的方式)
Channel分為兩種:
- 無(wú)緩沖的 Channel:無(wú)緩沖的通道(unbuffered channel)是指在接收前沒(méi)有能力保存任何值的通道。這種類(lèi)型的通道要求發(fā)送 goroutine 和接收 goroutine 同時(shí)準(zhǔn)備好,才能完成發(fā)送和接收操作。
如果兩個(gè) goroutine 沒(méi)有同時(shí)準(zhǔn)備好,通道會(huì)導(dǎo)致先執(zhí)行發(fā)送或接收操作的 goroutine 阻塞等待。這種對(duì)通道進(jìn)行發(fā)送和接收的交互行為本身就是同步的。- 有緩沖的 Channel:有緩沖的通道(buffered channel)是一種在被接收前能存儲(chǔ)一個(gè)或者多個(gè)值的通道。這種類(lèi)型的通道并不強(qiáng)制要求 goroutine 之間必須同時(shí)完成發(fā)送和接收。
只有在通道中沒(méi)有要接收的值時(shí),接收動(dòng)作才會(huì)阻塞。只有在通道沒(méi)有可用緩沖區(qū)容納被發(fā)送的值時(shí),發(fā)送動(dòng)作才會(huì)阻塞。
一、無(wú)緩沖的 Channel 使用示例
import (
"sync"
"fmt"
"math/rand"
)
var wg sync.WaitGroup
// Channel 完整的類(lèi)型是 "chan 數(shù)據(jù)類(lèi)型"
func player(name string, court chan int) {
defer wg.Done()
for {
// 1. 阻塞等待接球,如果通道關(guān)閉,ok返回false
ball, ok := <-court
if !ok {
fmt.Printf("channel already closed! Player %s won\n", name)
return
}
random := rand.Intn(100)
if random%13 == 0 {
fmt.Printf("Player %s Lose\n", name)
// 關(guān)閉通道
close(court)
return
}
fmt.Printf("Player %s Hit %d\n", name, ball)
ball ++
// 2. 發(fā)球,阻塞等待對(duì)方接球
court <- ball
}
}
// 兩個(gè) player 打網(wǎng)球,即生產(chǎn)者和消費(fèi)者模式(互為生產(chǎn)者和消費(fèi)者)
func main() {
wg.Add(2)
// 1. 創(chuàng)建一個(gè)無(wú)緩沖的通道
// Channel 完整的類(lèi)型是 "chan 數(shù)據(jù)類(lèi)型"
court := make(chan int)
// 2. 創(chuàng)建兩個(gè) goroutine
go player("zhangsan", court)
go player("lisi", court)
// 3. 發(fā)球:向通道發(fā)送數(shù)據(jù),阻塞等待通道對(duì)端接收
court <- 1
// 4. 等待輸家出現(xiàn)
wg.Wait()
}
二、有緩沖的 Channel 使用示例
import (
"sync"
"fmt"
"time"
)
// 使用4個(gè)goroutine來(lái)完成10個(gè)任務(wù)
const (
taskNum = 10
goroutineNum = 4
)
var countDownLatch sync.WaitGroup
func worker(name string, taskChannel chan string) {
defer countDownLatch.Done()
for {
// 1. 不斷的阻塞等待分配工作
task, ok := <-taskChannel
if !ok {
fmt.Printf("channel closed and channel is empty\n")
return
}
//fmt.Printf("worker %s start %s\n", name, task)
time.Sleep(100 * time.Millisecond)
fmt.Printf("worker %s complete %s\n", name, task)
}
}
func main() {
countDownLatch.Add(goroutineNum)
// 1. 創(chuàng)建有緩沖區(qū)的string channel
taskChannel := make(chan string, taskNum)
// 2. 創(chuàng)建 4 個(gè)goroutine去干活
for i := 0; i < goroutineNum; i++ {
go worker(fmt.Sprintf("worker %d", i), taskChannel)
}
// 3. 向通道加入task
for i := 0; i < taskNum; i++ {
taskChannel <- fmt.Sprintf("task %d", i)
}
// 4. 關(guān)閉通道:
// 當(dāng)通道關(guān)閉后,goroutine 依舊可以從通道接收數(shù)據(jù),但是不能再向通道里發(fā)送數(shù)據(jù)。
// 能夠從已經(jīng)關(guān)閉的通道接收數(shù)據(jù)這一點(diǎn)非常重要,因?yàn)檫@允許通道關(guān)閉后依舊能取出其中緩沖的全部值,而不會(huì)有數(shù)據(jù)丟失。
// 從一個(gè)已經(jīng)關(guān)閉且沒(méi)有數(shù)據(jù)的通道里獲取數(shù)據(jù),總會(huì)立刻返回,并返回一個(gè)通道類(lèi)型的零值
close(taskChannel)
// 5. 等待
countDownLatch.Wait()
}
當(dāng)通道關(guān)閉后,goroutine 依舊可以從通道接收數(shù)據(jù),但是不能再向通道里發(fā)送數(shù)據(jù)。能夠從已經(jīng)關(guān)閉的通道接收數(shù)據(jù)這一點(diǎn)非常重要,因?yàn)檫@允許通道關(guān)閉后依舊能取出其中緩沖的全部值,而不會(huì)有數(shù)據(jù)丟失。從一個(gè)已經(jīng)關(guān)閉且沒(méi)有數(shù)據(jù)的通道里獲取數(shù)據(jù),總會(huì)立刻返回,并返回一個(gè)通道類(lèi)型的零值