本文譯自:How To Close Channels in Golang Elegantly。
幾天前,我寫(xiě)了一篇文章來(lái)說(shuō)明golang中channel的使用規(guī)范。在reddit和HN,那篇文章收到了很多贊同,但是我也收到了下面幾個(gè)關(guān)于Go channel設(shè)計(jì)和規(guī)范的批評(píng):
- 在不能更改channel狀態(tài)的情況下,沒(méi)有簡(jiǎn)單普遍的方式來(lái)檢查channel是否已經(jīng)關(guān)閉了
- 關(guān)閉已經(jīng)關(guān)閉的channel會(huì)導(dǎo)致panic,所以在closer(關(guān)閉者)不知道channel是否已經(jīng)關(guān)閉的情況下去關(guān)閉channel是很危險(xiǎn)的
- 發(fā)送值到已經(jīng)關(guān)閉的channel會(huì)導(dǎo)致panic,所以如果sender(發(fā)送者)在不知道channel是否已經(jīng)關(guān)閉的情況下去向channel發(fā)送值是很危險(xiǎn)的
那些批評(píng)看起來(lái)都很有道理(實(shí)際上并沒(méi)有)。是的,沒(méi)有一個(gè)內(nèi)置函數(shù)可以檢查一個(gè)channel是否已經(jīng)關(guān)閉。如果你能確定不會(huì)向channel發(fā)送任何值,那么也確實(shí)需要一個(gè)簡(jiǎn)單的方法來(lái)檢查channel是否已經(jīng)關(guān)閉:
package main
import "fmt"
type T int
func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
c := make(chan T)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}
上面已經(jīng)提到了,沒(méi)有一種適用的方式來(lái)檢查channel是否已經(jīng)關(guān)閉了。但是,就算有一個(gè)簡(jiǎn)單的 closed(chan T) bool函數(shù)來(lái)檢查channel是否已經(jīng)關(guān)閉,它的用處還是很有限的,就像內(nèi)置的len函數(shù)用來(lái)檢查緩沖channel中元素?cái)?shù)量一樣。原因就在于,已經(jīng)檢查過(guò)的channel的狀態(tài)有可能在調(diào)用了類(lèi)似的方法返回之后就修改了,因此返回來(lái)的值已經(jīng)不能夠反映剛才檢查的channel的當(dāng)前狀態(tài)了。
盡管在調(diào)用closed(ch)返回true的情況下停止向channel發(fā)送值是可以的,但是如果調(diào)用closed(ch)返回false,那么關(guān)閉channel或者繼續(xù)向channel發(fā)送值就不安全了(會(huì)panic)。
The Channel Closing Principle
在使用Go channel的時(shí)候,一個(gè)適用的原則是不要從接收端關(guān)閉channel,也不要關(guān)閉有多個(gè)并發(fā)發(fā)送者的channel。換句話(huà)說(shuō),如果sender(發(fā)送者)只是唯一的sender或者是channel最后一個(gè)活躍的sender,那么你應(yīng)該在sender的goroutine關(guān)閉channel,從而通知receiver(s)(接收者們)已經(jīng)沒(méi)有值可以讀了。維持這條原則將保證永遠(yuǎn)不會(huì)發(fā)生向一個(gè)已經(jīng)關(guān)閉的channel發(fā)送值或者關(guān)閉一個(gè)已經(jīng)關(guān)閉的channel。
(下面,我們將會(huì)稱(chēng)上面的原則為channel closing principle
打破channel closing principle的解決方案
如果你因?yàn)槟撤N原因從接收端(receiver side)關(guān)閉channel或者在多個(gè)發(fā)送者中的一個(gè)關(guān)閉channel,那么你應(yīng)該使用列在Golang panic/recover Use Cases的函數(shù)來(lái)安全地發(fā)送值到channel中(假設(shè)channel的元素類(lèi)型是T)
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
// the return result can be altered
// in a defer function call
closed = true
}
}()
ch <- value // panic if ch is closed
return false // <=> closed = false; return
}
如果channel ch沒(méi)有被關(guān)閉的話(huà),那么這個(gè)函數(shù)的性能將和ch <- value接近。對(duì)于channel關(guān)閉的時(shí)候,SafeSend函數(shù)只會(huì)在每個(gè)sender goroutine中調(diào)用一次,因此程序不會(huì)有太大的性能損失。
同樣的想法也可以用在從多個(gè)goroutine關(guān)閉channel中:
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()
// assume ch != nil here.
close(ch) // panic if ch is closed
return true
}
很多人喜歡用sync.Once來(lái)關(guān)閉channel:
type MyChannel struct {
C chan T
once sync.Once
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.once.Do(func(){
close(mc.C)
})
}
當(dāng)然了,我們也可以用sync.Mutex來(lái)避免多次關(guān)閉channel:
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
mc.mutex.Unlock()
}
func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
我們應(yīng)該要理解為什么Go不支持內(nèi)置SafeSend和SafeClose函數(shù),原因就在于并不推薦從接收端或者多個(gè)并發(fā)發(fā)送端關(guān)閉channel。Golang甚至禁止關(guān)閉只接收(receive-only)的channel。
保持channel closing principle的優(yōu)雅方案
上面的SaveSend函數(shù)有一個(gè)缺點(diǎn)是,在select語(yǔ)句的case關(guān)鍵字后不能作為發(fā)送操作被調(diào)用(譯者注:類(lèi)似于 case SafeSend(ch, t):)。另外一個(gè)缺點(diǎn)是,很多人,包括我自己都覺(jué)得上面通過(guò)使用panic/recover和sync包的方案不夠優(yōu)雅。針對(duì)各種場(chǎng)景,下面介紹不用使用panic/recover和sync包,純粹是利用channel的解決方案。
(在下面的例子總,sync.WaitGroup只是用來(lái)讓例子完整的。它的使用在實(shí)踐中不一定一直都有用)
- M個(gè)receivers,一個(gè)sender,sender通過(guò)關(guān)閉data channel說(shuō)“不再發(fā)送”
這是最簡(jiǎn)單的場(chǎng)景了,就只是當(dāng)sender不想再發(fā)送的時(shí)候讓sender關(guān)閉data 來(lái)關(guān)閉channel:
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// the only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()
// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
- 一個(gè)receiver,N個(gè)sender,receiver通過(guò)關(guān)閉一個(gè)額外的signal channel說(shuō)“請(qǐng)停止發(fā)送”
這種場(chǎng)景比上一個(gè)要復(fù)雜一點(diǎn)。我們不能讓receiver關(guān)閉data channel,因?yàn)檫@么做將會(huì)打破channel closing principle。但是我們可以讓receiver關(guān)閉一個(gè)額外的signal channel來(lái)通知sender停止發(fā)送值:
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}()
}
// the receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
正如注釋說(shuō)的,對(duì)于額外的signal channel來(lái)說(shuō),它的sender是data channel的receiver。這個(gè)額外的signal channel被它唯一的sender關(guān)閉,遵守了channel closing principle。
- M個(gè)receiver,N個(gè)sender,它們當(dāng)中任意一個(gè)通過(guò)通知一個(gè)moderator(仲裁者)關(guān)閉額外的signal channel來(lái)說(shuō)“讓我們結(jié)束游戲吧”
這是最復(fù)雜的場(chǎng)景了。我們不能讓任意的receivers和senders關(guān)閉data channel,也不能讓任何一個(gè)receivers通過(guò)關(guān)閉一個(gè)額外的signal channel來(lái)通知所有的senders和receivers退出游戲。這么做的話(huà)會(huì)打破channel closing principle。但是,我們可以引入一個(gè)moderator來(lái)關(guān)閉一個(gè)額外的signal channel。這個(gè)例子的一個(gè)技巧是怎么通知moderator去關(guān)閉額外的signal channel:
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// the channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.
var stoppedBy string
// moderator
go func() {
stoppedBy = <- toStop // part of the trick used to notify the moderator
// to close the additional signal channel.
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// same as senders, the first select here is to
// try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// the same trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
在這個(gè)例子中,仍然遵守著channel closing principle。
請(qǐng)注意channel toStop的緩沖大小是1.這是為了避免當(dāng)mederator goroutine 準(zhǔn)備好之前第一個(gè)通知就已經(jīng)發(fā)送了,導(dǎo)致丟失。
- 更多的場(chǎng)景?
很多的場(chǎng)景變體是基于上面三種的。舉個(gè)例子,一個(gè)基于最復(fù)雜情況的變體可能要求receivers讀取buffer channel中剩下所有的值。這應(yīng)該很容易處理,所有這篇文章也就不提了。
盡管上面三種場(chǎng)景不能覆蓋所有Go channel的使用場(chǎng)景,但它們是最基礎(chǔ)的,實(shí)踐中的大多數(shù)場(chǎng)景都可以分類(lèi)到那三種中。
結(jié)論
這里沒(méi)有一種場(chǎng)景要求你去打破channel closing principle。如果你遇到了這種場(chǎng)景,請(qǐng)思考一下你的設(shè)計(jì)并重寫(xiě)你的代碼。
用Go編程就像在創(chuàng)作藝術(shù)。