Chapter 8 Goroutines and Channels
Go enables two styles of concurrent programming. This chapter presents goroutines and channels, which support communicating sequential processes or CSP, a model of concurrency in which values are passed between independent activities (goroutines) but variables are for the most part confined to a single activity. Chapter 9 covers some aspects of the more traditional model of shared memory multithreading, which will be familiar if you've used threads in other mainstream languages. Chapter 9 also points out some important hazards and pitfalls of concurrent programming that we won't delve into in this chapter.
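In short, there are two styles. In the first (CSP), goroutines communicate by passing values, and each variable stays confined to a single activity.
The second resembles multithreading in other mainstream languages: shared memory multithreading.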
8.1 Goroutines
In Go, each concurrently executing activity is called a goroutine.
If you have used operating system threads or threads in other languages, then you can assume for now that a goroutine is similar to a thread, and you'll be able to write correct programs. The differences between threads and goroutines are essentially quantitative, not qualitative, and will be described in Section 9.8.
When a program starts, its only goroutine is the one that calls the main function, so we call it the main goroutine. New goroutines are created by the go statement. Syntactically, a go statement is an ordinary function or method call prefixed by the keyword go. A go statement causes the function to be called in a newly created goroutine. The go statement itself completes immediately:
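When a program starts, its only goroutine is the one that calls main: the main goroutine.
f()    // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait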
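package main

import (
	"fmt"
	"time"
)

func main() {
	go spinner(5 * time.Millisecond) // animate while the main goroutine computes
	const n = 45
	fibN := fib(n) // slow
	fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
	// When main returns, all other goroutines (including spinner) are abruptly terminated.
}

// spinner draws a rotating bar, one frame every delay.
func spinner(delay time.Duration) {
	for {
		for _, r := range `-\|/` {
			fmt.Printf("\r%c", r)
			time.Sleep(delay)
		}
	}
}

// fib computes Fibonacci numbers with deliberately inefficient recursion.
func fib(x int) int {
	if x < 2 {
		return x
	}
	return fib(x-1) + fib(x-2)
}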
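In the program, \r is a carriage return. It moves the cursor back to the start of the line, so each frame overwrites the previous one.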
Other than by returning from main or exiting the program, there is no programmatic way for one goroutine to stop another, but as we will see later, there are ways to communicate with a goroutine to request that it stop itself.
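Here is a minimal sketch of asking a goroutine to stop itself. It assumes a hypothetical worker that counts in a loop. It borrows close and select, which this chapter covers later (Sections 8.4 and 8.7). Closing done is the request; the worker notices it and decides to return.

```go
package main

import (
	"fmt"
	"time"
)

// worker counts until done is closed, then reports its count and returns.
// Nothing outside can kill it; it stops itself when asked.
func worker(done <-chan struct{}, result chan<- int) {
	n := 0
	for {
		select {
		case <-done:
			result <- n // report progress, then exit
			return
		default:
			n++
			time.Sleep(time.Millisecond) // simulated work
		}
	}
}

func main() {
	done := make(chan struct{})
	result := make(chan int)
	go worker(done, result)

	time.Sleep(20 * time.Millisecond)
	close(done) // the request: "please stop"
	fmt.Println("worker stopped after", <-result, "iterations")
}
```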
8.2 Example: Concurrent Clock Server
A clock example.
Server (clock1):
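package main

import (
	"io"
	"log"
	"net"
	"time"
)

func main() {
	listener, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listener.Accept() // blocks until a client connects
		if err != nil {
			log.Print(err) // e.g., connection aborted
			continue
		}
		handleConn(conn) // handle one connection at a time
	}
}

// handleConn writes the current time to the client once per second
// until a write fails, e.g. because the client disconnected.
func handleConn(c net.Conn) {
	defer c.Close()
	for {
		_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
		if err != nil {
			return // e.g., client disconnected
		}
		time.Sleep(1 * time.Second)
	}
}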
The three key steps:
- listener, err := net.Listen("tcp", "localhost:8000")
- conn, err := listener.Accept()
- _, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
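Note: a layout string is written in terms of Go's reference time, 2006/01/02 15:04:05 (Mon Jan 2 15:04:05 MST 2006). The mnemonic is 1 2 3 4 5 6: January 2nd, 3:04:05 PM, 2006.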
https://segmentfault.com/q/1010000010976398/a-1020000010982052
On the client side, connect with the nc command:
nc localhost 8000
Client:
package main

import (
	"io"
	"log"
	"net"
	"os"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	_, err = io.Copy(os.Stdout, conn)
	if err != nil {
		log.Fatal(err)
	}
}
Key code fragments:
- Connect
conn, err := net.Dial("tcp", "localhost:8000")
- Output
_, err = io.Copy(os.Stdout, conn)
Note: to stop the server, run:
killall clock1
8.3. Example: Concurrent Echo Server
The clock server used one goroutine per connection. In this section, we'll build an echo server that uses multiple goroutines per connection.
reverb1
package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"strings"
	"time"
)

func echo(c net.Conn, shout string, delay time.Duration) {
	fmt.Fprintln(c, strings.ToUpper(shout))
	time.Sleep(delay)
	fmt.Fprintln(c, shout)
	time.Sleep(delay)
	fmt.Fprintln(c, strings.ToLower(shout))
}

func handleConn(c net.Conn) {
	input := bufio.NewScanner(c)
	for input.Scan() {
		echo(c, input.Text(), 1*time.Second)
	}
	c.Close()
}

func main() {
	l, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := l.Accept()
		if err != nil {
			log.Print(err)
			continue
		}
		handleConn(conn)
	}
}
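- Listen
l, err := net.Listen("tcp", "localhost:8000")
- Accept a connection
conn, err := l.Accept()
- Wrap the connection in a scanner to read input line by line
input := bufio.NewScanner(c)
- Check whether another line of input is available
input.Scan()
- Close the net.Conn
c.Close()
- Write to the net.Conn
fmt.Fprintln(c, shout)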
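To improve:
The server can respond to only one request at a time. It must finish the previous request before it handles the next, because both handleConn and echo run in the caller's goroutine.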
netcat2
package main

import (
	"io"
	"log"
	"net"
	"os"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	go mustCopy(os.Stdout, conn)
	mustCopy(conn, os.Stdin)
}

func mustCopy(dst io.Writer, src io.Reader) {
	if _, err := io.Copy(dst, src); err != nil {
		log.Fatal(err)
	}
}
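- Connect with Dial
conn, err := net.Dial("tcp", "localhost:8000")
- Copy: mustCopy(dst io.Writer, src io.Reader) wraps io.Copy. conn serves both as an io.Writer and as an io.Reader.
The main goroutine uses mustCopy to copy os.Stdin to conn. Meanwhile, a background goroutine copies the server's replies from conn to os.Stdout.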
reverb2
package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"strings"
	"time"
)

func echo(c net.Conn, shout string, delay time.Duration) {
	fmt.Fprintln(c, strings.ToUpper(shout))
	time.Sleep(delay)
	fmt.Fprintln(c, shout)
	time.Sleep(delay)
	fmt.Fprintln(c, strings.ToLower(shout))
}

func handleConn(c net.Conn) {
	input := bufio.NewScanner(c)
	for input.Scan() {
		go echo(c, input.Text(), 1*time.Second) // echo each shout concurrently
	}
	// NOTE: this may close the connection while echo goroutines are still writing.
	c.Close()
}

func main() {
	l, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := l.Accept()
		if err != nil {
			log.Print(err)
			continue
		}
		go handleConn(conn) // handle connections concurrently
	}
}
8.4 Channels
If goroutines are the activities of a concurrent Go program, channels are the connections between them. A channel is a communication mechanism that lets one goroutine send values to another goroutine. Each channel is a conduit for values of a particular type, called the channel's element type.
ch := make(chan int)
As with maps, a channel is a reference to the data structure created by make. When we copy a channel or pass one as an argument to a function, we are copying a reference, so caller and callee refer to the same data structure. As with other reference types, the zero value of channel is nil.
- Like a map, a channel created by make is a reference. Copying a channel, or passing it to a function, copies the reference, so both sides operate on the same underlying data.
- As with other reference types, the zero value of a channel is nil.
Two channels of the same type may be compared using ==. The comparison is true if both are references to the same channel data structure. A channel may also be compared to nil.
- Channels can be compared with ==. The result is true only when both refer to the same underlying channel.
- A channel can also be compared with nil.
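The sketch below illustrates the reference and comparison rules above. It borrows a buffered channel (Section 8.4.4) only so the sends don't block. The helper fill is hypothetical.

```go
package main

import "fmt"

// fill sends n values on ch. ch is a copy of the caller's reference,
// so the values land in the caller's channel.
func fill(ch chan int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
	}
}

func main() {
	a := make(chan int, 3)
	b := a                 // copies the reference, not the channel
	c := make(chan int, 3) // a distinct channel of the same type
	var z chan int         // zero value

	fill(b, 3)
	fmt.Println(len(a))   // 3: sends through b are visible through a
	fmt.Println(a == b)   // true: same underlying channel
	fmt.Println(a == c)   // false: different channels
	fmt.Println(z == nil) // true
}
```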
A channel has two principal operations, send and receive, collectively known as communications. A send statement transmits a value from one goroutine, through the channel, to another goroutine executing a corresponding receive expression. Both operations are written using the <- operator. In a send statement, the <- separates the channel and value operands. In a receive expression, <- precedes the channel operand. A receive expression whose result is not used is a valid statement.
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
8.4.1. Unbuffered Channels
A send operation on an unbuffered channel blocks the sending goroutine until another goroutine executes a corresponding receive on the same channel, at which point the value is transmitted and both goroutines may continue. Conversely, if the receive operation was attempted first, the receiving goroutine is blocked until another goroutine performs a send on the same channel.
- An unbuffered channel needs a sender and a receiver to meet. Whichever arrives first waits for the other.
Communication over an unbuffered channel causes the sending and receiving goroutines to synchronize. Because of this, unbuffered channels are sometimes called synchronous channels. When a value is sent on an unbuffered channel, the receipt of the value happens before the reawakening of the sending goroutine.
- “happens before”
In discussions of concurrency, when we say x happens before y, we don't mean merely that x occurs earlier in time than y, we mean that it is guaranteed to do so and that all its prior effects, such as updates to variables, are complete and that you may rely on them.
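- "x happens before y" means more than "x occurs earlier than y". It means x is guaranteed to come first, and all of x's effects (such as updates to variables) are complete and visible, so y can rely on them.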
When x neither happens before y nor after y, we say that x is concurrent with y. This doesn't mean that x and y are necessarily simultaneous, merely that we cannot assume anything about their ordering. As we'll see in the next chapter, it's necessary to order certain events during the program's execution to avoid the problems that arise when two goroutines access the same variable concurrently.
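- If x neither happens before nor after y, x and y are concurrent. That doesn't mean they are simultaneous, only that we cannot assume anything about their order. The next chapter explains that certain events must be ordered to avoid problems when two goroutines read or write the same variable concurrently.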
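Here is a minimal sketch of the happens-before guarantee, using a hypothetical run function. The goroutine writes message and then sends on an unbuffered channel. The receive in run completes only after that send, so the read of message that follows is ordered after the write. Without the channel, the write and the read would be concurrent: a data race.

```go
package main

import "fmt"

// run shows an unbuffered channel ordering a write before a read.
func run() string {
	var message string
	done := make(chan struct{}) // unbuffered
	go func() {
		message = "hello"  // (1) write
		done <- struct{}{} // (2) send
	}()
	<-done         // (3) completes only after the send (2), which follows (1)
	return message // (4) therefore sees the write from (1)
}

func main() {
	fmt.Println(run()) // hello
}
```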
Pipelines (chained channels)
pipeline1
package main

import "fmt"

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	// Counter
	go func() {
		for x := 0; ; x++ {
			naturals <- x
		}
	}()

	// Squarer
	go func() {
		for {
			x := <-naturals
			squares <- x * x
		}
	}()

	// Printer (in main goroutine)
	for {
		fmt.Println(<-squares)
	}
}
- If the sender knows that no further values will ever be sent on a channel, it is useful to tell the receivers, so they can stop waiting. This is done by closing the channel with the built-in close function:
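package main

import (
	"fmt"
	"time"
)

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	// Counter: sends 0..9, then closes naturals.
	go func() {
		for x := 0; x < 10; x++ {
			naturals <- x
		}
		close(naturals)
	}()

	// Squarer: does not notice the close, so after 9 it receives an
	// endless stream of zero values and keeps sending 0 to the printer.
	go func() {
		for {
			x := <-naturals
			squares <- x * x
			time.Sleep(time.Second)
		}
	}()

	// Printer (in main goroutine): loops forever.
	for {
		fmt.Println(<-squares)
	}
	// close(naturals) here would be unreachable: the loop above never exits.
}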
- After sending ten natural numbers, the counter closes the naturals channel.
After a channel has been closed, any further send operations on it will panic. After the closed channel has been drained, that is, after the last sent element has been received, all subsequent receive operations will proceed without blocking but will yield a zero value. Closing the naturals channel above would cause the squarer's loop to spin as it receives a never-ending stream of zero values, and to send these zeros to the printer.
- Once a channel has been closed, any further send on it panics.
- Once all values sent on a closed channel have been received, further receives no longer block; they return the zero value immediately.
So how do we stop the pipeline once the naturals channel has been closed?
There is no way to test directly whether a channel has been closed, but there is a variant of the receive operation that produces two results: the received channel element, plus a boolean value, conventionally called ok, which is true for a successful receive and false for a receive on a closed and drained channel. Using this feature, we can modify the squarer's loop to stop when the naturals channel is drained and close the squares channel in turn.
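- There is no way to test directly whether a channel has been closed.
- However, the receive operation has a two-result variant. The second result is a boolean, conventionally called ok: true means a value was received, and false means the channel is closed and drained.
With this mechanism, the receiving side can stop once naturals has delivered all its values.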
Two equivalent ways to write it:
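// Form 1: the two-result receive; ok is false once naturals is closed and drained.
go func() {
	for {
		x, ok := <-naturals
		if !ok {
			break // channel was closed and drained
		}
		squares <- x * x
	}
	close(squares)
}()

// Form 1 is clumsy, so use a range loop over the channel instead; the two are
// equivalent (the time.Sleep below only slows the output down).
go func() {
	for x := range naturals {
		squares <- x * x
		time.Sleep(time.Second)
	}
	close(squares)
}()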
(The book's explanation:)
Because the syntax above is clumsy and this pattern is common, the language lets us use a range loop to iterate over channels too. This is a more convenient syntax for receiving all the values sent on a channel and terminating the loop after the last one.
package main

import (
	"fmt"
	"time"
)

func main() {
	naturals := make(chan int)
	squares := make(chan int)
	go func() {
		for x := 0; x < 3; x++ {
			naturals <- x
		}
		close(naturals)
	}()
	go func() {
		//for x := range naturals {
		//	squares <- x * x
		//}
		for {
			x := <-naturals
			squares <- x * x
			time.Sleep(time.Second)
		}
	}()
	for x := range squares {
		fmt.Println(x)
	}
}
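- The commented-out code is the second (range) form.
- Output: 0 1 4, then 0 once per second, forever.
- Reason: after naturals has been closed, we can still receive from it, but every receive returns the zero value 0 immediately. squares is never closed, so the printer never stops either.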
package main

import (
	"fmt"
	"time"
)

func main() {
	naturals := make(chan int)
	squares := make(chan int)
	go func() {
		for x := 0; x < 3; x++ {
			naturals <- x
		}
		close(naturals)
	}()
	go func() {
		for x := range naturals {
			squares <- x * x
			time.Sleep(time.Second)
		}
		//for {
		//	x := <-naturals
		//	squares <- x * x
		//	time.Sleep(time.Second)
		//}
	}()
	for x := range squares {
		fmt.Println(x)
	}
}
- Output:
0
1
4
fatal error: all goroutines are asleep - deadlock!
- Here the for {} loop was changed to a for range loop. A for range over a channel receives every value until the channel is closed and drained, so the squarer's loop ends once naturals is closed. However, the squarer never closes squares. The for range over squares in main therefore waits forever for a value that no goroutine will send: deadlock.
pipeline2
package main

import (
	"fmt"
	"time"
)

func main() {
	naturals := make(chan int)
	squares := make(chan int)
	go func() {
		for x := 0; x < 3; x++ {
			naturals <- x
		}
		close(naturals)
	}()
	go func() {
		for x := range naturals {
			squares <- x * x
			time.Sleep(time.Second)
		}
		close(squares)
	}()
	for x := range squares {
		fmt.Println(x)
	}
}
- So once the squarer has received everything from naturals, it closes the squares channel, and the whole pipeline terminates in the right order.
- Summary
A for range loop over a channel receives from it repeatedly until the channel is closed and drained.
Attempting to close an already-closed channel causes a panic, as does closing a nil channel. Closing channels has another use as a broadcast mechanism, which we'll cover in Section 8.9.
- Attempting to close a channel that is already closed causes a panic.
- Closing a nil channel also causes a panic.
- Closing a channel can also serve as a broadcast mechanism, covered in Section 8.9.
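The sketch below turns these panics into strings with recover, so they can be observed in one run. tryClose and trySend are hypothetical helpers. The messages shown are the ones the Go runtime uses for these panics.

```go
package main

import "fmt"

// tryClose closes ch and reports the panic message, if any.
func tryClose(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(ch)
	return "ok"
}

// trySend sends on ch and reports the panic message, if any.
func trySend(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch <- 1
	return "ok"
}

func main() {
	ch := make(chan int)
	fmt.Println(tryClose(ch)) // ok
	fmt.Println(tryClose(ch)) // close of closed channel
	fmt.Println(trySend(ch))  // send on closed channel

	var nilCh chan int
	fmt.Println(tryClose(nilCh)) // close of nil channel
}
```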
8.4.3. Unidirectional Channel Types
To document the intent that a function only sends on, or only receives from, a channel parameter, and to prevent misuse, the Go type system provides unidirectional channel types that expose only one or the other of the send and receive operations. The type chan<- int, a send-only channel of int, allows sends but not receives. Conversely, the type <-chan int, a receive-only channel of int, allows receives but not sends. (The position of the <- arrow relative to the chan keyword is a mnemonic.) Violations of this discipline are detected at compile time.
pipeline3
package main

import (
	"fmt"
)

func counter(out chan<- int) {
	for i := 0; i < 10; i++ {
		out <- i
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

func printer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	naturals := make(chan int)
	squares := make(chan int)
	go counter(naturals)
	go squarer(squares, naturals)
	printer(squares)
}
- in is the channel a stage receives from. Its type is the receive-only <-chan int.
- out is the channel a stage sends to. Its type is the send-only chan<- int.
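Here is a minimal sketch of the implicit conversions. A bidirectional chan int converts to either unidirectional type when passed as an argument, but there is no conversion back. generate and sum are hypothetical stand-ins for counter and printer.

```go
package main

import "fmt"

// generate only sends on out, then closes it (closing a send-only
// channel is allowed).
func generate(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out)
}

// sum only receives from in. Calling close(in) here would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional
	go generate(ch, 4)   // ch implicitly converted to chan<- int
	fmt.Println(sum(ch)) // ch implicitly converted to <-chan int; prints 10
	// A chan<- int or <-chan int value cannot be converted back to chan int.
}
```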
8.4.4. Buffered Channels
A buffered channel has a queue of elements. The queue's maximum size is determined when it is created, by the capacity argument to make.
ch = make(chan string, 3)
A send operation on a buffered channel inserts an element at the back of the queue, and a receive operation removes an element from the front. If the channel is full, the send operation blocks its goroutine until space is made available by another goroutine's receive. Conversely, if the channel is empty, a receive operation blocks until a value is sent by another goroutine.
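- A send on a buffered channel appends an element to the back of the queue. If the channel is full, the send blocks the sending goroutine until another goroutine receives.
- cap returns the channel's buffer capacity.
- len returns the number of elements currently buffered.
- In a concurrent program, the value returned by len is likely to be stale as soon as it is retrieved, so its use is limited. It can still help with fault diagnosis and performance optimization.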
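The following sketch shows cap, len, and FIFO order on a capacity-3 buffered channel like the one in the text.

```go
package main

import "fmt"

func main() {
	ch := make(chan string, 3)
	ch <- "A"
	ch <- "B"
	fmt.Println(cap(ch), len(ch)) // 3 2
	fmt.Println(<-ch)             // A: elements leave from the front of the queue
	fmt.Println(len(ch))          // 1
}
```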
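The mirroredQuery function below queries three mirrors in parallel and returns the quickest response, so it may return before the two slower mirrors have responded. (Incidentally, it's quite normal for several goroutines to send values to the same channel concurrently, or to receive from the same channel. These are the so-called fan-in and fan-out patterns.)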
func mirroredQuery() string {
	responses := make(chan string, 3)
	go func() { responses <- request("asia.gopl.io") }()
	go func() { responses <- request("europe.gopl.io") }()
	go func() { responses <- request("americas.gopl.io") }()
	return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ return }
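- With an unbuffered channel, the two slower goroutines would be stuck forever trying to send their responses on a channel that no goroutine will ever receive from. This is called a goroutine leak, and it is a bug. Unlike garbage variables, leaked goroutines are not automatically collected, so make sure every goroutine terminates once it is no longer needed.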
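Below is a runnable variant of mirroredQuery. It assumes a hypothetical request that sleeps for a made-up latency instead of contacting the network; the host names are only labels. The fastest simulated mirror wins. The buffer of 3 lets the slower goroutines finish their sends and exit.

```go
package main

import (
	"fmt"
	"time"
)

// request is a hypothetical stand-in for a network call: it sleeps for a
// made-up latency and returns the host name as its "response".
func request(hostname string, latency time.Duration) string {
	time.Sleep(latency)
	return hostname
}

// mirroredQuery returns the quickest response. Thanks to the buffer of 3,
// the two slower goroutines can still complete their sends and exit.
func mirroredQuery() string {
	responses := make(chan string, 3)
	go func() { responses <- request("asia.gopl.io", 300*time.Millisecond) }()
	go func() { responses <- request("europe.gopl.io", 10*time.Millisecond) }()
	go func() { responses <- request("americas.gopl.io", 200*time.Millisecond) }()
	return <-responses
}

func main() {
	fmt.Println(mirroredQuery())
}
```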
8.5 Looping in Parallel
In this section, we'll explore some common concurrency patterns for executing all the iterations of a loop in parallel.
package thumbnail
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
- The above is the API of the thumbnail package we'll use.
embarrassingly parallel
Obviously the order in which we process the files doesn't matter, since each scaling operation is independent of all the others. Problems like this that consist entirely of subproblems that are completely independent of each other are described as embarrassingly parallel. Embarrassingly parallel problems are the easiest kind to implement concurrently and enjoy performance that scales linearly with the amount of parallelism.
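- Embarrassingly parallel problems are the easiest kind to implement concurrently. They gain the most from concurrency, with performance that scales linearly with the amount of parallelism.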
Let's execute all these operations in parallel, thereby hiding the latency of the file I/O and using multiple CPUs for the image-scaling computations. Our first attempt at a concurrent version just adds a go keyword. We'll ignore errors for now and address them later.
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
	for _, f := range filenames {
		go thumbnail.ImageFile(f) // NOTE: ignoring errors
	}
}
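- This version runs suspiciously fast. makeThumbnails2 starts one goroutine per file but returns without waiting for them to finish. If main then returns, those goroutines are killed.
- There is no direct way to wait for a goroutine to finish. However, we can change the inner goroutine to report its completion to the outer goroutine by sending an event on a shared channel.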
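package main

import (
	"fmt"
	"time"
)

func main() {
	s := []int{6, 3, 9, 7, 10, 2, 5, 11, 35}
	fmt.Println(s)
	for _, e := range s {
		go func() {
			// Before Go 1.22, all iterations share one variable e,
			// so this reads whatever value e holds when the goroutine runs.
			fmt.Println(e)
		}()
	}
	time.Sleep(time.Second * 5) // crude wait for the goroutines
}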
Output of the goroutines in one run (Go versions before 1.22; it varies from run to run):
5
5
5
5
11
35
35
35
35
package main

import (
	"fmt"
	"time"
)

func main() {
	s := []int{6, 3, 9, 7, 10, 2, 5, 11, 35}
	fmt.Println(s)
	for _, e := range s {
		go func(e int) {
			fmt.Printf("%d ", e)
		}(e)
	}
	time.Sleep(time.Second * 5)
}
Output:
[6 3 9 7 10 2 5 11 35]
6 3 5 11 2 9 35 7 10
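- In the first version, a single variable e is shared by all the anonymous functions, so we might expect every goroutine to print 35.
- In practice they don't all print 35. The goroutines run concurrently, so some of them execute fmt.Println(e) before the for range loop reaches its last iteration.
- The second version passes e to the anonymous function as an argument. Each goroutine gets its own copy, so every value in the slice is printed.
- Since Go 1.22, each iteration of a for range loop has its own e, so the first version also prints every value. Passing the argument explicitly works on all versions.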
func makeThumbnails4(filenames []string) error {
	errors := make(chan error)
	for _, f := range filenames {
		go func(f string) {
			_, err := thumbnail.ImageFile(f)
			errors <- err
		}(f)
	}
	for range filenames {
		if err := <-errors; err != nil {
			return err // NOTE: incorrect: goroutine leak!
		}
	}
	return nil
}
- This program has a subtle bug. When it encounters the first non-nil error, it returns the error to the caller, leaving no goroutine to drain the errors channel. Each remaining worker goroutine then blocks forever when it tries to send a value on that channel, and never terminates. This situation, a goroutine leak (§8.4.4), may cause the whole program to get stuck or to run out of memory.
- Solutions:
  - Use a buffered channel.
  - Alternatively, create another goroutine to drain the channel while the main goroutine returns the first error.
// Solve the problem with a buffered channel.
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
	type item struct {
		thumbfile string
		err       error
	}

	ch := make(chan item, len(filenames)) // room for every result, so no sender ever blocks
	for _, f := range filenames {
		go func(f string) {
			var it item
			it.thumbfile, it.err = thumbnail.ImageFile(f)
			ch <- it
		}(f)
	}

	for range filenames {
		it := <-ch
		if it.err != nil {
			return nil, it.err
		}
		thumbfiles = append(thumbfiles, it.thumbfile)
	}
	return thumbfiles, nil
}
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
	sizes := make(chan int64)
	var wg sync.WaitGroup // number of working goroutines
	for f := range filenames {
		wg.Add(1)
		// worker
		go func(f string) {
			defer wg.Done()
			thumb, err := thumbnail.ImageFile(f)
			if err != nil {
				log.Println(err)
				return
			}
			info, _ := os.Stat(thumb) // OK to ignore error
			sizes <- info.Size()
		}(f)
	}

	// closer
	go func() {
		wg.Wait()
		close(sizes)
	}()

	var total int64
	for size := range sizes {
		total += size
	}
	return total
}
- wg.Add(1) increments the WaitGroup counter.
- wg.Done() decrements it; it is equivalent to wg.Add(-1).
- Note how the closer goroutine waits for all the workers to finish before closing the sizes channel. These two operations, wait and close, must run concurrently with the loop over sizes. Consider the alternatives. If the wait were placed in the main goroutine before the loop, it would never end. If it were placed after the loop, it would be unreachable: with nothing closing the channel, the loop would never terminate.
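Below is a runnable sketch of the same closer pattern. A hypothetical size (the length of each name) stands in for os.Stat. Note that wg.Add(1) is called before each goroutine starts. Calling it inside the goroutine could let wg.Wait return too early.

```go
package main

import (
	"fmt"
	"sync"
)

// totalSize sums sizes computed by concurrent workers. The "size" of each
// name is just its length, a hypothetical stand-in for a file size.
func totalSize(names []string) int64 {
	sizes := make(chan int64)
	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1) // before the goroutine starts, not inside it
		go func(name string) {
			defer wg.Done()
			sizes <- int64(len(name))
		}(name)
	}

	// closer: runs concurrently with the range loop below.
	go func() {
		wg.Wait()
		close(sizes)
	}()

	var total int64
	for size := range sizes {
		total += size
	}
	return total
}

func main() {
	fmt.Println(totalSize([]string{"a.jpg", "bb.jpg", "ccc.jpg"})) // 18
}
```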
8.6. Example: Concurrent Web Crawler
func crawl(url string) []string {
	fmt.Println(url)
	list, err := links.Extract(url)
	if err != nil {
		log.Print(err)
	}
	return list
}

func main() {
	worklist := make(chan []string)

	// Start with the command-line arguments.
	go func() { worklist <- os.Args[1:] }()

	// Crawl the web concurrently.
	seen := make(map[string]bool)
	for list := range worklist {
		for _, link := range list {
			if !seen[link] {
				seen[link] = true
				go func(link string) {
					worklist <- crawl(link)
				}(link)
			}
		}
	}
}
The main function resembles breadthFirst. As before, a work list records the queue of items that need processing, each item being a list of URLs to crawl, but this time, instead of representing the queue using a slice, we use a channel. Each call to crawl occurs in its own goroutine and sends the links it discovers back to the worklist.
- The main function is similar to breadthFirst (breadth-first traversal) in Section 5.6, but a channel instead of a slice serves as the work list queue.
- Note that the goroutine calling crawl receives link as an explicit parameter, which avoids the loop-variable capture problem.
The program is too parallel. Unbounded parallelism is rarely a good idea since there is always a limiting factor in the system, such as the number of CPU cores for compute-bound workloads, the number of spindles and heads for local disk I/O operations, the bandwidth of the network for streaming downloads, or the serving capacity of a web service. ... A simple way to do that in our example is to ensure that no more than n calls to links.Extract are active at once, where n is comfortably less than the file descriptor limit (20, say). This is analogous to the way a doorman at a crowded nightclub admits a guest only when some other guest leaves.
- Unbounded parallelism is rarely a good idea, because every system has some limiting factor.
- To fix this, we can limit the resources the program uses so that it fits its environment.
We can limit parallelism using a buffered channel of capacity n to model a concurrency primitive called a counting semaphore. Conceptually, each of the n vacant slots in the channel buffer represents a token entitling the holder to proceed. Sending a value into the channel acquires a token, and receiving a value from the channel releases a token, creating a new vacant slot. This ensures that at most n sends can occur without an intervening receive. (Although it might be more intuitive to treat filled slots in the channel buffer as tokens, using vacant slots avoids the need to fill the channel buffer after creating it.) Since the channel element type is not important, we'll use struct{}, which has size zero.
- A buffered channel used as a counting semaphore solves this. It works much like a semaphore in an operating system.
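Here is a self-contained sketch of a counting semaphore. It uses simulated work (a short sleep) instead of links.Extract. runLimited is hypothetical. It records the highest number of goroutines that held a token at the same time, which should never exceed the buffer capacity.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts tasks goroutines, but a buffered channel of capacity
// limit lets at most limit of them do their work at once. It returns the
// highest concurrency observed.
func runLimited(tasks, limit int) int {
	tokens := make(chan struct{}, limit) // counting semaphore
	var (
		mu        sync.Mutex
		active    int
		maxActive int
		wg        sync.WaitGroup
	)
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tokens <- struct{}{} // acquire a token (fill a vacant slot)

			mu.Lock()
			active++
			if active > maxActive {
				maxActive = active
			}
			mu.Unlock()

			time.Sleep(5 * time.Millisecond) // simulated work, e.g. an HTTP request

			mu.Lock()
			active--
			mu.Unlock()

			<-tokens // release the token
		}()
	}
	wg.Wait()
	return maxActive
}

func main() {
	fmt.Println("max concurrent:", runLimited(100, 20))
}
```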
crawl2
package main

import (
	"fmt"
	"log"
	"os"

	"gopl.io/ch5/links"
)

// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)

func crawl(url string) []string {
	fmt.Println(url)
	tokens <- struct{}{} // acquire a token
	list, err := links.Extract(url)
	<-tokens // release the token
	if err != nil {
		log.Print(err)
	}
	return list
}

func main() {
	worklist := make(chan []string)
	var n int // number of pending sends to worklist

	// Start with the command-line arguments.
	n++
	go func() { worklist <- os.Args[1:] }()

	// Crawl the web concurrently.
	seen := make(map[string]bool)
	for ; n > 0; n-- {
		list := <-worklist
		for _, link := range list {
			if !seen[link] {
				seen[link] = true
				n++
				go func(link string) {
					worklist <- crawl(link)
				}(link)
			}
		}
	}
}
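- The tokens channel in crawl ensures that at most 20 calls to links.Extract are active at once.
- The counter n (the number of pending sends to worklist) fixes the previous version's other problem: its main loop never terminates, even after every reachable link has been crawled.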
func main() {
	worklist := make(chan []string)  // lists of URLs, may have duplicates
	unseenLinks := make(chan string) // de-duplicated URLs

	// Add command-line arguments to worklist.
	go func() { worklist <- os.Args[1:] }()

	// Create 20 crawler goroutines to fetch each unseen link.
	for i := 0; i < 20; i++ {
		go func() {
			for link := range unseenLinks {
				foundLinks := crawl(link)
				go func() { worklist <- foundLinks }()
			}
		}()
	}

	// The main goroutine de-duplicates worklist items
	// and sends the unseen ones to the crawlers.
	seen := make(map[string]bool)
	for list := range worklist {
		for _, link := range list {
			if !seen[link] {
				seen[link] = true
				unseenLinks <- link
			}
		}
	}
}
- This version uses the original crawl function without a counting semaphore. Instead, 20 long-lived crawler goroutines ensure that at most 20 HTTP requests are active concurrently.
- The first for loop creates the 20 crawler goroutines.
- The second for loop sends each URL not yet seen to those 20 goroutines through the unseenLinks channel.
8.7. Multiplexing with select
countdown1
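package main
import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("Commencing countdown.")
	tick := time.Tick(1 * time.Second) // delivers an event once per second
	for countdown := 10; countdown > 0; countdown-- {
		fmt.Println(countdown)
		<-tick
	}
	launch()
}
func launch() {
	fmt.Println("Lift off!")
}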
- time.Tick returns a channel on which it sends events periodically, like a metronome.
countdown2
package main

import (
	"fmt"
	"os"
	"time"
)

func launch() {
	fmt.Println("Lift off!")
}

func main() {
	abort := make(chan struct{})
	go func() {
		os.Stdin.Read(make([]byte, 1))
		abort <- struct{}{}
	}()
	fmt.Println("Commencing countdown. Press return to abort.")
	select {
	case <-time.After(10 * time.Second):
		fmt.Println("10s has passed.")
	case <-abort:
		fmt.Println("Launch aborted!")
		return
	}
	launch()
}
- Each case specifies a communication (a send or receive on some channel) and an associated block of statements.
- A receive case may consist of the receive expression alone, without assigning the received value to a variable.
- A select with no cases, written select{}, waits forever.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
			time.Sleep(1 * time.Second)
		case ch <- i:
		}
	}
}
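- The buffer of ch holds one element, so the channel is alternately empty and full, and only one case can proceed in each iteration: the send when i is even, the receive when i is odd. The program always prints 0 2 4 6 8.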
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// abort receives an event when the user presses return.
	abort := make(chan struct{})
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		abort <- struct{}{}
	}()

	fmt.Println("Commencing countdown. Press return to abort.")
	tick := time.Tick(1 * time.Second)
	for countdown := 10; countdown > 0; countdown-- {
		fmt.Println(countdown)
		select {
		case <-tick:
			fmt.Println("tick")
		case <-abort:
			fmt.Println("Launch aborted!")
			return // NOTE: time.Tick's ticker is never stopped; use time.NewTicker and Stop when that matters
		}
	}
	launch()
}

func launch() {
	fmt.Println("Lift off!")
}
Sometimes we want to try to send or receive on a channel but avoid blocking if the channel is not ready: a non-blocking communication. A select statement can do that too. A select may have a default, which specifies what to do when none of the other communications can proceed immediately.
- The default case keeps the program from blocking at the select.
select {
case <-abort:
	fmt.Printf("Launch aborted!\n")
	return
default:
	// do nothing
}
The select statement above receives a value from the abort channel if there is one to receive; otherwise it does nothing. This is a non-blocking receive operation; doing it repeatedly is called polling a channel.
- Repeating such a non-blocking receive, for example once per loop iteration, is what polling a channel means.
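The sketch below builds non-blocking send and receive helpers from select with a default case; trySend and tryRecv are hypothetical names. Note that tryRecv's boolean means "a value was received without blocking", not "the channel is open". On a closed channel, it returns the zero value and true.

```go
package main

import "fmt"

// trySend sends v on ch unless that would block; it reports whether it sent.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// tryRecv receives from ch unless that would block.
func tryRecv(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch))    // 0 false: nothing to receive yet
	fmt.Println(trySend(ch, 7)) // true: the buffer had room
	fmt.Println(trySend(ch, 8)) // false: buffer full, a send would block
	fmt.Println(tryRecv(ch))    // 7 true
}
```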
The zero value for a channel is nil. Perhaps surprisingly, nil channels are sometimes useful. Because send and receive operations on a nil channel block forever, a case in a select statement whose channel is nil is never selected. This lets us use nil to enable or disable cases that correspond to features like handling timeouts or cancellation, responding to other input events, or emitting output. We'll see an example in the next section.
- nil channels are sometimes useful. Sends and receives on a nil channel block forever, so a select case on a nil channel is never selected.
- This lets us use nil to enable or disable cases, for example to implement timeouts and cancellation while handling other input or output events. The next section shows an example.
8.8. Example: Concurrent Directory Traversal
du1
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

func main() {
	flag.Parse()
	roots := flag.Args()
	if len(roots) == 0 {
		roots = []string{"."}
	}

	fileSizes := make(chan int64)
	go func() {
		for _, root := range roots {
			walkDir(root, fileSizes)
		}
		close(fileSizes)
	}()

	var nfiles, nbytes int64
	for size := range fileSizes {
		nfiles++
		nbytes += size
	}
	printDiskUsage(nfiles, nbytes)
}

func printDiskUsage(nfiles, nbytes int64) {
	fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

func walkDir(dir string, fileSizes chan<- int64) {
	for _, entry := range dirents(dir) {
		if entry.IsDir() {
			subdir := filepath.Join(dir, entry.Name())
			walkDir(subdir, fileSizes)
		} else {
			fileSizes <- entry.Size()
		}
	}
}

func dirents(dir string) []os.FileInfo {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du1: %v\n", err)
		return nil
	}
	return entries
}
- ioutil.ReadDir(dir), called by dirents, returns a slice of os.FileInfo with one entry per file or subdirectory in dir.
- walkDir is recursive. If an entry is a directory, walkDir recurses into it; otherwise it sends the file's size on fileSizes.
The program would be nicer if it kept us informed of its progress. However, simply moving the printDiskUsage call into the loop would cause it to print thousands of lines of output.
The variant of du below prints the totals periodically, but only if the -v flag is specified since not all users will want to see progress messages. The background goroutine that loops over roots remains unchanged. The main goroutine now uses a ticker to generate events every 500ms, and a select statement to wait for either a file size message, in which case it updates the totals, or a tick event, in which case it prints the current totals. If the -v flag is not specified, the tick channel remains nil, and its case in the select is effectively disabled.
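- If the -v flag is not specified, the tick channel stays nil, so its case in the select is disabled. This is the nil-channel trick in action.
- It would be nice if the program reported its progress periodically. However, simply moving printDiskUsage into the loop would print thousands of lines.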
du2
package main
// The du2 variant uses select and a time.Ticker
// to print the totals periodically if -v is set.
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
)
//!+
var verbose = flag.Bool("v", false, "show verbose progress messages")
func main() {
// ...start background goroutine...
//!-
// Determine the initial directories.
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// Traverse the file tree.
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
//!+
// Print the results periodically.
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
Since the program no longer uses a range loop, the first select case must explicitly test whether the fileSizes channel has been closed, using the two-result form of the receive operation. If the channel has been closed, the program breaks out of the loop. The labeled break statement breaks out of both the select and the for loop; an unlabeled break would break out of only the select, causing the loop to begin the next iteration.
- Unlike du1, this version cannot read fileSizes with a range loop. It must use the size, ok := <-fileSizes form to detect when the channel has been closed.
However, it still takes too long to finish. There's no reason why all the calls to walkDir can't be done concurrently, thereby exploiting parallelism in the disk system. The third version of du, below, creates a new goroutine for each call to walkDir. It uses a sync.WaitGroup to count the number of calls to walkDir that are still active, and a closer goroutine to close the fileSizes channel when the counter drops to zero.
sync.WaitGroup in Go
First, what a WaitGroup is for: it lets a goroutine (typically main) block until every goroutine it is tracking has finished.
package main
import (
"time"
"fmt"
"sync"
)
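func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // one more goroutine to wait for
fmt.Println(i)
go func(n int) {
defer wg.Done() // same as wg.Add(-1), but idiomatic
number(n)
}(i)
}
wg.Wait() // blocks until the counter drops back to zero
}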
func number(i int) {
time.Sleep(time.Second)
fmt.Println(i)
}
- sync.WaitGroup is one of Go's synchronization tools. It is a counter, not a queue: Add(n) increases the count, Done decreases it by one, and Wait blocks until the count reaches zero, so the caller cannot proceed until every tracked goroutine has finished.
- A WaitGroup only counts goroutines. It does not limit how many of them run at once. To cap concurrency, combine it with a counting semaphore, as the sema channel in dirents does below.
package main
import (
"flag"
"sync"
"time"
"fmt"
"path/filepath"
"os"
"io/ioutil"
)
var vFlag = flag.Bool("v", false, "show verbose progress messages")
func main() {
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
var tick <-chan time.Time
if *vFlag {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
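// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // acquire token
defer func() { <-sema }() // release token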
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
Since this program creates many thousands of goroutines at its peak, we have to change dirents to use a counting semaphore to prevent it from opening too many files at once, just as we did for the web crawler in Section 8.6.
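- Because this version creates thousands of goroutines, it needs a limit. The limit is implemented in dirents: the sema buffered channel lets at most 20 goroutines read a directory at the same time. It caps concurrent directory reads (and therefore open files), not the total number of goroutines.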
This version runs several times faster than the previous one, though there is a lot of variability from system to system.
Cancellation
Sometimes we need to instruct a goroutine to stop what it is doing, for example, in a web server performing a computation on behalf of a client that has disconnected.
There is no way for one goroutine to terminate another directly, since that would leave all its shared variables in undefined states. In the rocket launch program (Section 8.7) we sent a single value on a channel named abort, which the countdown goroutine interpreted as a request to stop itself. But what if we need to cancel two goroutines, or an arbitrary number?
Recall that after a channel has been closed and drained of all sent values, subsequent receive operations proceed immediately, yielding zero values. We can exploit this to create a broadcast mechanism: don't send a value on the channel, close it.
package main
import (
"os"
"fmt"
"time"
)
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
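func main() {
// Cancel as soon as anything (for example, a newline) is read from standard input.
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done) // broadcast cancellation to every cancelled() caller
}()
for i := 0; ; i++ {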
if cancelled() {
return
}
fmt.Println(i)
time.Sleep(time.Second)
}
}
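- This relies on the fact that once a channel has been closed and drained, a receive from it no longer blocks and immediately yields the zero value, so the case <-done in cancelled becomes ready for every caller.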
- Next, we create a goroutine that will read from the standard input, which is typically connected to the terminal. As soon as any input is read (for instance, the user presses the return key), this goroutine broadcasts the cancellation by closing the done channel.
package main
import (
"os"
"sync"
"time"
"path/filepath"
"fmt"
)
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
func main() {
roots := os.Args[1:]
if len(roots) == 0 {
roots = []string{"."}
}
go func() {
os.Stdin.Read(make([]byte, 1))
close(done)
}()
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
tick := time.Tick(500 * time.Millisecond)
var nfiles, nbytes int64
loop:
for {
select {
case <-done:
// Drain fileSizes to allow existing goroutines to finish.
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
var sema = make(chan struct{}, 20)
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}:
case <-done:
return nil
}
defer func() { <-sema }()
f, err := os.Open(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
defer f.Close()
entries, err := f.Readdir(0)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
}
return entries
}
Now we need to make our goroutines respond to the cancellation. In the main goroutine, we add a third case to the select statement that tries to receive from the done channel. The function returns if this case is ever selected, but before it returns it must first drain the fileSizes channel, discarding all values until the channel is closed. This ensures that any active calls to walkDir are not left blocked trying to send to fileSizes and can run to completion.
It might be profitable to poll the cancellation status again within walkDir's loop, to avoid creating goroutines after the cancellation event. Cancellation involves a trade-off; a quicker response often requires more intrusive changes to program logic. Ensuring that no expensive operations ever occur after the cancellation event may require updating many places in your code, but often most of the benefit can be obtained by checking for cancellation in a few important places.
A little profiling of this program revealed that the bottleneck was the acquisition of a semaphore token in dirents. The select below makes this operation cancellable and reduces the typical cancellation latency of the program from hundreds of milliseconds to tens:
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // acquire token
case <-done:
return nil // cancelled
}
defer func() { <-sema }() // release token
// ...read directory...
}
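The cancellable acquire can be pulled out into a helper and tested on its own. In this sketch, acquire is a hypothetical name. Unlike dirents, it returns a bool instead of returning nil from the enclosing function.

```go
package main

import "fmt"

// acquire mirrors the select in dirents: it takes a token from the
// counting semaphore sema, or gives up as soon as done is closed.
// It reports whether a token was acquired.
func acquire(sema chan<- struct{}, done <-chan struct{}) bool {
	select {
	case sema <- struct{}{}:
		return true
	case <-done:
		return false
	}
}

func main() {
	sema := make(chan struct{}, 1)
	done := make(chan struct{})
	fmt.Println(acquire(sema, done)) // true: a token was free
	close(done)
	fmt.Println(acquire(sema, done)) // false: semaphore full and cancelled
}
```

If both cases are ready at once, select picks one at random, so a goroutine may occasionally still acquire a token after cancellation. Callers that care can check done again afterwards, as walkDir does with cancelled().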
Now, when cancellation occurs, all the background goroutines quickly stop and the main function returns. Of course, when main returns, the program exits, so it can be hard to tell a main function that cleans up after itself from one that does not. There's a handy trick we can use during testing: if, instead of returning from main in the event of cancellation, we execute a call to panic, then the runtime will dump the stack of every goroutine in the program. If the main goroutine is the only one left, then it has cleaned up after itself. But if other goroutines remain, they may not have been properly cancelled, or perhaps they have been cancelled but the cancellation takes time; a little investigation may be worthwhile. The panic dump often contains sufficient information to distinguish these cases.
Chat Server
We'll finish this chapter with a chat server that lets several users broadcast textual messages to each other. There are four kinds of goroutine in this program. There is one instance apiece of the main and broadcaster goroutines, and for each client connection there is one handleConn and one clientWriter goroutine. The broadcaster is a good illustration of how select is used, since it has to respond to three different kinds of messages.
The job of the main goroutine, shown below, is to listen for and accept incoming network connections from clients. For each one, it creates a new handleConn goroutine, just as in the concurrent echo server we saw at the start of this chapter.
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
Next is the broadcaster. Its local variable clients records the current set of connected clients. The only information recorded about each client is the identity of its outgoing message channel, about which more later.
type client chan<- string // an outgoing message channel
var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string) // all incoming client messages
)
func broadcaster() {
clients := make(map[client]bool)
for {
select {
case msg := <-messages:
for cli := range clients {
cli <- msg
}
case cli := <-entering:
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}
The broadcaster listens on the global entering and leaving channels for announcements of arriving and departing clients. When it receives one of these events, it updates the clients set, and if the event was a departure, it closes the client's outgoing message channel. The broadcaster also listens for events on the global messages channel, to which each client sends all its incoming messages. When the broadcaster receives one of these events, it broadcasts the message to every connected client.
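Because broadcaster depends on package-level channels, it is awkward to exercise without real connections. The sketch below is a hypothetical restructuring, not the book's code. The same select loop is moved onto a hub struct, and buffered string channels stand in for the per-client channels that clientWriter would drain.

```go
package main

import "fmt"

// client is an outgoing message channel, as in the chat server.
type client chan<- string

// hub keeps the broadcaster's channels in a struct instead of package-level
// variables so the loop can be driven without network connections.
type hub struct {
	entering chan client
	leaving  chan client
	messages chan string
}

func newHub() *hub {
	return &hub{
		entering: make(chan client),
		leaving:  make(chan client),
		messages: make(chan string),
	}
}

// run is the same select loop as broadcaster.
func (h *hub) run() {
	clients := make(map[client]bool)
	for {
		select {
		case msg := <-h.messages:
			for cli := range clients {
				cli <- msg
			}
		case cli := <-h.entering:
			clients[cli] = true
		case cli := <-h.leaving:
			delete(clients, cli)
			close(cli)
		}
	}
}

func main() {
	h := newHub()
	go h.run()
	alice := make(chan string, 1) // buffered so run never blocks on it here
	bob := make(chan string, 1)
	h.entering <- alice // chan string is assignable to client (chan<- string)
	h.entering <- bob
	h.messages <- "hello"
	fmt.Println(<-alice, <-bob) // hello hello
	h.leaving <- alice
	_, ok := <-alice
	fmt.Println(ok) // false: run closed alice's channel
}
```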
Now let's look at the per-client goroutines. The handleConn function creates a new outgoing message channel for its client and announces the arrival of this client to the broadcaster over the entering channel. Then it reads every line of text from the client, sending each line to the broadcaster over the global incoming message channel, prefixing each message with the identity of its sender. Once there is nothing more to read from the client, handleConn announces the departure of the client over the leaving channel and closes the connection.
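// A channel created with make(chan T) can both send and receive.
// (make(<-chan int) also compiles, but nothing could ever send on it.)
readWrite := make(chan int)
// Receive-only and send-only views are obtained by assignment or conversion,
// as when handleConn passes ch to clientWriter (<-chan string)
// and sends it to the broadcaster as a client (chan<- string).
var readOnly <-chan int = readWrite
var writeOnly chan<- int = readWrite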
func handleConn(conn net.Conn) {
ch := make(chan string) // outgoing client messages
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
ch <- "You are " + who
messages <- who + " has arrived"
entering <- ch
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// NOTE: ignoring potential errors from input.Err()
leaving <- ch
messages <- who + " has left"
conn.Close()
}
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg) // NOTE: ignoring network errors
}
}