這篇文章總結(jié)了channel的10種常用操作,以一個(gè)更高的視角看待channel,會(huì)給大家?guī)韺?duì)channel更全面的認(rèn)識(shí)。
在介紹10種操作前,先簡(jiǎn)要介紹下channel的使用場(chǎng)景、基本操作和注意事項(xiàng)。
channel的使用場(chǎng)景
把channel用在數(shù)據(jù)流動(dòng)的地方:
- 消息傳遞、消息過濾
- 信號(hào)廣播
- 事件訂閱與廣播
- 請(qǐng)求、響應(yīng)轉(zhuǎn)發(fā)
- 任務(wù)分發(fā)
- 結(jié)果匯總
- 并發(fā)控制
- 同步與異步
- ...
channel的基本操作和注意事項(xiàng)
channel存在3種狀態(tài):
- nil,未初始化的狀態(tài),只進(jìn)行了聲明,或者手動(dòng)賦值為
nil - active,正常的channel,可讀或者可寫
- closed,已關(guān)閉,千萬不要誤認(rèn)為關(guān)閉channel后,channel的值是nil
channel可進(jìn)行3種操作:
- 讀
- 寫
- 關(guān)閉
把這3種操作和3種channel狀態(tài)可以組合出9種情況:
| 操作 | nil的channel | 正常channel | 已關(guān)閉channel |
|---|---|---|---|
| <- ch | 阻塞 | 成功或阻塞 | 讀到零值 |
| ch <- | 阻塞 | 成功或阻塞 | panic |
| close(ch) | panic | 成功 | panic |
對(duì)于nil通道的情況,也并非完全遵循上表,有1個(gè)特殊場(chǎng)景:當(dāng)nil的通道在select的某個(gè)case中時(shí),這個(gè)case會(huì)阻塞,但不會(huì)造成死鎖。
參考代碼請(qǐng)看:https://dave.cheney.net/2014/03/19/channel-axioms
下面介紹使用channel的10種常用操作。
1. 使用for range讀channel
- 場(chǎng)景:當(dāng)需要不斷從channel讀取數(shù)據(jù)時(shí)
- 原理:使用
for-range讀取channel,這樣既安全又便利,當(dāng)channel關(guān)閉時(shí),for循環(huán)會(huì)自動(dòng)退出,無需主動(dòng)監(jiān)測(cè)channel是否關(guān)閉,可以防止讀取已經(jīng)關(guān)閉的channel,造成讀到數(shù)據(jù)為通道所存儲(chǔ)的數(shù)據(jù)類型的零值。 - 用法:
for x := range ch{
fmt.Println(x)
}
2. 使用_,ok判斷channel是否關(guān)閉
- 場(chǎng)景:讀channel,但不確定channel是否關(guān)閉時(shí)
- 原理:讀已關(guān)閉的channel會(huì)得到零值,如果不確定channel,需要使用
ok進(jìn)行檢測(cè)。ok的結(jié)果和含義:-
true:讀到數(shù)據(jù),并且通道沒有關(guān)閉。 -
false:通道關(guān)閉,無數(shù)據(jù)讀到。
-
- 用法:
if v, ok := <- ch; ok {
fmt.Println(v)
}
3. 使用select處理多個(gè)channel
- 場(chǎng)景:需要對(duì)多個(gè)通道進(jìn)行同時(shí)處理,但只處理最先發(fā)生的channel時(shí)
- 原理:
select可以同時(shí)監(jiān)控多個(gè)通道的情況,只處理未阻塞的case。當(dāng)通道為nil時(shí),對(duì)應(yīng)的case永遠(yuǎn)為阻塞,無論讀寫。特殊關(guān)注:普通情況下,對(duì)nil的通道寫操作是要panic的。 - 用法:
// 分配job時(shí),如果收到關(guān)閉的通知?jiǎng)t退出,不分配job
func (h *Handler) handle(job *Job) {
select {
case h.jobCh<-job:
return
case <-h.stopCh:
return
}
}
4. 使用channel的聲明控制讀寫權(quán)限
- 場(chǎng)景:協(xié)程對(duì)某個(gè)通道只讀或只寫時(shí)
- 目的:A. 使代碼更易讀、更易維護(hù),B. 防止只讀協(xié)程對(duì)通道進(jìn)行寫數(shù)據(jù),但通道已關(guān)閉,造成panic。
- 用法:
- 如果協(xié)程對(duì)某個(gè)channel只有寫操作,則這個(gè)channel聲明為只寫。
- 如果協(xié)程對(duì)某個(gè)channel只有讀操作,則這個(gè)channe聲明為只讀。
// 只有g(shù)enerator進(jìn)行對(duì)outCh進(jìn)行寫操作,返回聲明
// <-chan int,可以防止其他協(xié)程亂用此通道,造成隱藏bug
func generator(int n) <-chan int {
outCh := make(chan int)
go func(){
for i:=0;i<n;i++{
outCh<-i
}
}()
return outCh
}
// consumer只讀inCh的數(shù)據(jù),聲明為<-chan int
// 可以防止它向inCh寫數(shù)據(jù)
func consumer(inCh <-chan int) {
for x := range inCh {
fmt.Println(x)
}
}
5. 使用緩沖channel增強(qiáng)并發(fā)
- 場(chǎng)景:并發(fā)
- 原理:有緩沖通道可供多個(gè)協(xié)程同時(shí)處理,在一定程度可提高并發(fā)性。
- 用法:
// 無緩沖
ch1 := make(chan int)
ch2 := make(chan int, 0)
// 有緩沖
ch3 := make(chan int, 1)
func test() {
inCh := generator(100)
outCh := make(chan int, 10)
// 使用5個(gè)`do`協(xié)程同時(shí)處理輸入數(shù)據(jù)
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go do(inCh, outCh, &wg)
}
go func() {
wg.Wait()
close(outCh)
}()
for r := range outCh {
fmt.Println(r)
}
}
func generator(n int) <-chan int {
outCh := make(chan int)
go func() {
for i := 0; i < n; i++ {
outCh <- i
}
close(outCh)
}()
return outCh
}
func do(inCh <-chan int, outCh chan<- int, wg *sync.WaitGroup) {
for v := range inCh {
outCh <- v * v
}
wg.Done()
}
6. 為操作加上超時(shí)
- 場(chǎng)景:需要超時(shí)控制的操作
- 原理:使用
select和time.After,看操作和定時(shí)器哪個(gè)先返回,處理先完成的,就達(dá)到了超時(shí)控制的效果 - 用法:
func doWithTimeOut(timeout time.Duration) (int, error) {
select {
case ret := <-do():
return ret, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}
func do() <-chan int {
outCh := make(chan int)
go func() {
// do work
}()
return outCh
}
7. 使用time實(shí)現(xiàn)channel無阻塞讀寫
- 場(chǎng)景:并不希望在channel的讀寫上浪費(fèi)時(shí)間
- 原理:是為操作加上超時(shí)的擴(kuò)展,這里的操作是channel的讀或?qū)?/li>
- 用法:
func unBlockRead(ch chan int) (x int, err error) {
select {
case x = <-ch:
return x, nil
case <-time.After(time.Microsecond):
return 0, errors.New("read time out")
}
}
func unBlockWrite(ch chan int, x int) (err error) {
select {
case ch <- x:
return nil
case <-time.After(time.Microsecond):
return errors.New("read time out")
}
}
注:time.After等待可以替換為default,則是channel阻塞時(shí),立即返回的效果
8. 使用close(ch)關(guān)閉所有下游協(xié)程
- 場(chǎng)景:退出時(shí),顯示通知所有協(xié)程退出
- 原理:所有讀
ch的協(xié)程都會(huì)收到close(ch)的信號(hào) - 用法:
func (h *Handler) Stop() {
close(h.stopCh)
// 可以使用WaitGroup等待所有協(xié)程退出
}
// 收到停止后,不再處理請(qǐng)求
func (h *Handler) loop() error {
for {
select {
case req := <-h.reqCh:
go handle(req)
case <-h.stopCh:
return
}
}
}
9. 使用chan struct{}作為信號(hào)channel
- 場(chǎng)景:使用channel傳遞信號(hào),而不是傳遞數(shù)據(jù)時(shí)
- 原理:沒數(shù)據(jù)需要傳遞時(shí),傳遞空struct
- 用法:
// 上例中的Handler.stopCh就是一個(gè)例子,stopCh并不需要傳遞任何數(shù)據(jù)
// 只是要給所有協(xié)程發(fā)送退出的信號(hào)
type Handler struct {
stopCh chan struct{}
reqCh chan *Request
}
10. 使用channel傳遞結(jié)構(gòu)體的指針而非結(jié)構(gòu)體
- 場(chǎng)景:使用channel傳遞結(jié)構(gòu)體數(shù)據(jù)時(shí)
- 原理:channel本質(zhì)上傳遞的是數(shù)據(jù)的拷貝,拷貝的數(shù)據(jù)越小傳輸效率越高,傳遞結(jié)構(gòu)體指針,比傳遞結(jié)構(gòu)體更高效
- 用法:
reqCh chan *Request
// 好過
reqCh chan Request
11. 使用channel傳遞channel
- 場(chǎng)景:使用場(chǎng)景有點(diǎn)多,通常是用來獲取結(jié)果。
- 原理:channel可以用來傳遞變量,channel自身也是變量,可以傳遞自己。
- 用法:下面示例展示了有序展示請(qǐng)求的結(jié)果,另一個(gè)示例可以見另外文章的版本3。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
reqs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
// 存放結(jié)果的channel的channel
outs := make(chan chan int, len(reqs))
var wg sync.WaitGroup
wg.Add(len(reqs))
for _, x := range reqs {
o := handle(&wg, x)
outs <- o
}
go func() {
wg.Wait()
close(outs)
}()
// 讀取結(jié)果,結(jié)果有序
for o := range outs {
fmt.Println(<-o)
}
}
// handle 處理請(qǐng)求,耗時(shí)隨機(jī)模擬
func handle(wg *sync.WaitGroup, a int) chan int {
out := make(chan int)
go func() {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
out <- a
wg.Done()
}()
return out
}
你有哪些channel的奇淫巧技,說來看看?
- 如果這篇文章對(duì)你有幫助,請(qǐng)點(diǎn)個(gè)贊/喜歡,感謝。
- 本文作者:大彬
- 如果喜歡本文,隨意轉(zhuǎn)載,但請(qǐng)保留此原文鏈接:http://lessisbetter.site/2019/01/20/golang-channel-all-usage/

image