如何優(yōu)雅地關(guān)閉Go channel

本文譯自:How To Close Channels in Golang Elegantly。
幾天前,我寫(xiě)了一篇文章來(lái)說(shuō)明golang中channel的使用規(guī)范。在redditHN,那篇文章收到了很多贊同,但是我也收到了下面幾個(gè)關(guān)于Go channel設(shè)計(jì)和規(guī)范的批評(píng):

  1. 在不能更改channel狀態(tài)的情況下,沒(méi)有簡(jiǎn)單普遍的方式來(lái)檢查channel是否已經(jīng)關(guān)閉了
  2. 關(guān)閉已經(jīng)關(guān)閉的channel會(huì)導(dǎo)致panic,所以在closer(關(guān)閉者)不知道channel是否已經(jīng)關(guān)閉的情況下去關(guān)閉channel是很危險(xiǎn)的
  3. 發(fā)送值到已經(jīng)關(guān)閉的channel會(huì)導(dǎo)致panic,所以如果sender(發(fā)送者)在不知道channel是否已經(jīng)關(guān)閉的情況下去向channel發(fā)送值是很危險(xiǎn)的

那些批評(píng)看起來(lái)都很有道理(實(shí)際上并沒(méi)有)。是的,沒(méi)有一個(gè)內(nèi)置函數(shù)可以檢查一個(gè)channel是否已經(jīng)關(guān)閉。如果你能確定不會(huì)向channel發(fā)送任何值,那么也確實(shí)需要一個(gè)簡(jiǎn)單的方法來(lái)檢查channel是否已經(jīng)關(guān)閉:

package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
    select {
    case <-ch:
        return true
    default:
    }
    
    return false
}

func main() {
    c := make(chan T)
    fmt.Println(IsClosed(c)) // false
    close(c)
    fmt.Println(IsClosed(c)) // true
}

上面已經(jīng)提到了,沒(méi)有一種適用的方式來(lái)檢查channel是否已經(jīng)關(guān)閉了。但是,就算有一個(gè)簡(jiǎn)單的 closed(chan T) bool函數(shù)來(lái)檢查channel是否已經(jīng)關(guān)閉,它的用處還是很有限的,就像內(nèi)置的len函數(shù)用來(lái)檢查緩沖channel中元素?cái)?shù)量一樣。原因就在于,已經(jīng)檢查過(guò)的channel的狀態(tài)有可能在調(diào)用了類(lèi)似的方法返回之后就修改了,因此返回來(lái)的值已經(jīng)不能夠反映剛才檢查的channel的當(dāng)前狀態(tài)了。
盡管在調(diào)用closed(ch)返回true的情況下停止向channel發(fā)送值是可以的,但是如果調(diào)用closed(ch)返回false,那么關(guān)閉channel或者繼續(xù)向channel發(fā)送值就不安全了(會(huì)panic)。

The Channel Closing Principle

在使用Go channel的時(shí)候,一個(gè)適用的原則是不要從接收端關(guān)閉channel,也不要關(guān)閉有多個(gè)并發(fā)發(fā)送者的channel。換句話(huà)說(shuō),如果sender(發(fā)送者)只是唯一的sender或者是channel最后一個(gè)活躍的sender,那么你應(yīng)該在sender的goroutine關(guān)閉channel,從而通知receiver(s)(接收者們)已經(jīng)沒(méi)有值可以讀了。維持這條原則將保證永遠(yuǎn)不會(huì)發(fā)生向一個(gè)已經(jīng)關(guān)閉的channel發(fā)送值或者關(guān)閉一個(gè)已經(jīng)關(guān)閉的channel。
(下面,我們將會(huì)稱(chēng)上面的原則為channel closing principle

打破channel closing principle的解決方案

如果你因?yàn)槟撤N原因從接收端(receiver side)關(guān)閉channel或者在多個(gè)發(fā)送者中的一個(gè)關(guān)閉channel,那么你應(yīng)該使用列在Golang panic/recover Use Cases的函數(shù)來(lái)安全地發(fā)送值到channel中(假設(shè)channel的元素類(lèi)型是T)

func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
        if recover() != nil {
            // the return result can be altered 
            // in a defer function call
            closed = true
        }
    }()
    
    ch <- value // panic if ch is closed
    return false // <=> closed = false; return
}

如果channel ch沒(méi)有被關(guān)閉的話(huà),那么這個(gè)函數(shù)的性能將和ch <- value接近。對(duì)于channel關(guān)閉的時(shí)候,SafeSend函數(shù)只會(huì)在每個(gè)sender goroutine中調(diào)用一次,因此程序不會(huì)有太大的性能損失。
同樣的想法也可以用在從多個(gè)goroutine關(guān)閉channel中:

func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            justClosed = false
        }
    }()
    
    // assume ch != nil here.
    close(ch) // panic if ch is closed
    return true
}

很多人喜歡用sync.Once來(lái)關(guān)閉channel:

type MyChannel struct {
    C    chan T
    once sync.Once
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.once.Do(func(){
        close(mc.C)
    })
}

當(dāng)然了,我們也可以用sync.Mutex來(lái)避免多次關(guān)閉channel:

type MyChannel struct {
    C      chan T
    closed bool
    mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.mutex.Lock()
    if !mc.closed {
        close(mc.C)
        mc.closed = true
    }
    mc.mutex.Unlock()
}

func (mc *MyChannel) IsClosed() bool {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    return mc.closed
}

我們應(yīng)該要理解為什么Go不支持內(nèi)置SafeSendSafeClose函數(shù),原因就在于并不推薦從接收端或者多個(gè)并發(fā)發(fā)送端關(guān)閉channel。Golang甚至禁止關(guān)閉只接收(receive-only)的channel。

保持channel closing principle的優(yōu)雅方案

上面的SaveSend函數(shù)有一個(gè)缺點(diǎn)是,在select語(yǔ)句的case關(guān)鍵字后不能作為發(fā)送操作被調(diào)用(譯者注:類(lèi)似于 case SafeSend(ch, t):)。另外一個(gè)缺點(diǎn)是,很多人,包括我自己都覺(jué)得上面通過(guò)使用panic/recoversync包的方案不夠優(yōu)雅。針對(duì)各種場(chǎng)景,下面介紹不用使用panic/recoversync包,純粹是利用channel的解決方案。
(在下面的例子總,sync.WaitGroup只是用來(lái)讓例子完整的。它的使用在實(shí)踐中不一定一直都有用)

  • M個(gè)receivers,一個(gè)sender,sender通過(guò)關(guān)閉data channel說(shuō)“不再發(fā)送”
    這是最簡(jiǎn)單的場(chǎng)景了,就只是當(dāng)sender不想再發(fā)送的時(shí)候讓sender關(guān)閉data 來(lái)關(guān)閉channel:
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 100
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    
    // the sender
    go func() {
        for {
            if value := rand.Intn(MaxRandomNumber); value == 0 {
                // the only sender can close the channel safely.
                close(dataCh)
                return
            } else {            
                dataCh <- value
            }
        }
    }()
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()
            
            // receive values until dataCh is closed and
            // the value buffer queue of dataCh is empty.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }
    
    wgReceivers.Wait()
}
  • 一個(gè)receiver,N個(gè)sender,receiver通過(guò)關(guān)閉一個(gè)額外的signal channel說(shuō)“請(qǐng)停止發(fā)送”
    這種場(chǎng)景比上一個(gè)要復(fù)雜一點(diǎn)。我們不能讓receiver關(guān)閉data channel,因?yàn)檫@么做將會(huì)打破channel closing principle。但是我們可以讓receiver關(guān)閉一個(gè)額外的signal channel來(lái)通知sender停止發(fā)送值:
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel dataCh.
        // Its reveivers are the senders of channel dataCh.
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                value := rand.Intn(MaxRandomNumber)
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }()
    }
    
    // the receiver
    go func() {
        defer wgReceivers.Done()
        
        for value := range dataCh {
            if value == MaxRandomNumber-1 {
                // the receiver of the dataCh channel is
                // also the sender of the stopCh cahnnel.
                // It is safe to close the stop channel here.
                close(stopCh)
                return
            }
            
            log.Println(value)
        }
    }()
    
    // ...
    wgReceivers.Wait()
}

正如注釋說(shuō)的,對(duì)于額外的signal channel來(lái)說(shuō),它的sender是data channel的receiver。這個(gè)額外的signal channel被它唯一的sender關(guān)閉,遵守了channel closing principle。

  • M個(gè)receiver,N個(gè)sender,它們當(dāng)中任意一個(gè)通過(guò)通知一個(gè)moderator(仲裁者)關(guān)閉額外的signal channel來(lái)說(shuō)“讓我們結(jié)束游戲吧”
    這是最復(fù)雜的場(chǎng)景了。我們不能讓任意的receivers和senders關(guān)閉data channel,也不能讓任何一個(gè)receivers通過(guò)關(guān)閉一個(gè)額外的signal channel來(lái)通知所有的senders和receivers退出游戲。這么做的話(huà)會(huì)打破channel closing principle。但是,我們可以引入一個(gè)moderator來(lái)關(guān)閉一個(gè)額外的signal channel。這個(gè)例子的一個(gè)技巧是怎么通知moderator去關(guān)閉額外的signal channel:
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown below.
        // Its reveivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
        // the channel toStop is used to notify the moderator
        // to close the additional signal channel (stopCh).
        // Its senders are any senders and receivers of dataCh.
        // Its reveiver is the moderator goroutine shown below.
    
    var stoppedBy string
    
    // moderator
    go func() {
        stoppedBy = <- toStop // part of the trick used to notify the moderator
                              // to close the additional signal channel.
        close(stopCh)
    }()
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(MaxRandomNumber)
                if value == 0 {
                    // here, a trick is used to notify the moderator
                    // to close the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }
                
                // the first select here is to try to exit the
                // goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()
            
            for {
                // same as senders, the first select here is to 
                // try to exit the goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == MaxRandomNumber-1 {
                        // the same trick is used to notify the moderator 
                        // to close the additional signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }
                    
                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }
    
    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

在這個(gè)例子中,仍然遵守著channel closing principle
請(qǐng)注意channel toStop的緩沖大小是1.這是為了避免當(dāng)mederator goroutine 準(zhǔn)備好之前第一個(gè)通知就已經(jīng)發(fā)送了,導(dǎo)致丟失。

  • 更多的場(chǎng)景?
    很多的場(chǎng)景變體是基于上面三種的。舉個(gè)例子,一個(gè)基于最復(fù)雜情況的變體可能要求receivers讀取buffer channel中剩下所有的值。這應(yīng)該很容易處理,所有這篇文章也就不提了。
    盡管上面三種場(chǎng)景不能覆蓋所有Go channel的使用場(chǎng)景,但它們是最基礎(chǔ)的,實(shí)踐中的大多數(shù)場(chǎng)景都可以分類(lèi)到那三種中。

結(jié)論

這里沒(méi)有一種場(chǎng)景要求你去打破channel closing principle。如果你遇到了這種場(chǎng)景,請(qǐng)思考一下你的設(shè)計(jì)并重寫(xiě)你的代碼。
用Go編程就像在創(chuàng)作藝術(shù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容