channel
package main
import (
"fmt"
"math/rand"
"time"
)
// main starts boring in its own goroutine, prints the first five messages
// that arrive on the shared channel, and then returns (which ends the
// program while boring is still running).
func main() {
	// No buffer: each send in boring pairs up with one receive below.
	messages := make(chan string)
	go boring("boring!", messages)

	for remaining := 5; remaining > 0; remaining-- {
		// The receive blocks until boring has sent the next value.
		reply := <-messages
		fmt.Printf("You say: %q\n", reply)
	}
	fmt.Println("You're boring: I'm leaving.")
}
// boring sends "<msg> <n>" on c forever, with n counting up from 0, and
// sleeps for a random duration below one second after every send.
// It never returns, so it is meant to be started with the go statement.
func boring(msg string, c chan string) {
	n := 0
	for {
		// When c is unbuffered, this send completes only once a receiver
		// has taken the value.
		c <- fmt.Sprintf("%s %d", msg, n)
		n++

		pause := time.Duration(rand.Intn(1e3)) * time.Millisecond
		time.Sleep(pause)
	}
}
main 函數里,首先創建一個 channel 變量。channel 變量必須先用 make 創建;如果只聲明(比如 var c chan string),得到的是 nil channel,讀寫都會永久阻塞,程序最終會報 deadlock 錯誤。
用 go 語法啟動一個 boring function,并將 channel c 傳遞進去。boring function 里面往 channel c 里寫入字符串,然后 sleep 一段時間;main 函數里以阻塞的方式去讀取 channel c 里的內容。
generator
// Generator: Function that returns a channel
package main
import (
"fmt"
"math/rand"
"time"
)
// main obtains a channel from boring, prints five messages received from
// it, and then returns.
func main() {
	messages := boring("boring!") // boring builds the channel and hands it back.
	for received := 0; received < 5; received++ {
		line := <-messages
		fmt.Printf("You say: %q\n", line)
	}
	fmt.Println("You're boring: I'm leaving.")
}
// boring returns a receive-only channel on which a background goroutine
// sends "<msg> <n>" forever, n counting up from 0. The goroutine sleeps for
// a random duration below one second after every send.
func boring(msg string) <-chan string {
	out := make(chan string)
	produce := func() {
		for n := 0; ; n++ {
			out <- fmt.Sprintf("%s %d", msg, n)
			pause := time.Duration(rand.Intn(1e3)) * time.Millisecond
			time.Sleep(pause)
		}
	}
	go produce() // The goroutine is launched here, not by the caller.
	return out
}
這里 main 拿到 channel c 的方式不是自己通過 make 定義,而是通過 boring function 的返回值拿到的。boring function 里面通過 for 循環,不斷地往 channel c 里塞信息。在 main 函數里,有一個 for 循環作為 receiver 來不斷地讀取 channel 里的信息,然后程序就這么運行了。
/*
Generator: Function that returns a channel
The boring function returns a channel that lets us communicate with the
boring service it provides.
We can have more instances of the service.
*/
package main
import (
"fmt"
"math/rand"
"time"
)
// main runs two independent boring generators and prints their messages in
// strict alternation (Joe first, then Ann) for five rounds.
func main() {
	joe, ann := boring("Joe"), boring("Ann")
	for round := 0; round < 5; round++ {
		// Each receive blocks, so whichever speaker is ready early still
		// has to wait for the other one's turn.
		fromJoe := <-joe
		fmt.Println(fromJoe)
		fromAnn := <-ann
		fmt.Println(fromAnn)
	}
	fmt.Println("You're boring: I'm leaving.")
}
// boring starts a goroutine that emits "<msg> <n>" (n = 0, 1, 2, ...) without
// end, sleeping a random sub-second interval between messages, and returns
// the receive-only side of the channel it writes to.
func boring(msg string) <-chan string {
	ch := make(chan string)
	go func(out chan<- string) {
		n := 0
		for {
			out <- fmt.Sprintf("%s %d", msg, n)
			n++
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}(ch)
	return ch
}
有序的 generator:相對于上一個程序,這個程序定義了兩個 channel:joe 和 ann。boring 函數跟之前的沒有任何區別;main 函數里循環調用,按順序(先 joe 后 ann)阻塞等待一個信息去讀取。
multiplexing
/*
Multiplexing: Let whosoever is ready to talk, talk.
The fanIn function fronts the other channels. Goroutines that are ready to talk
can independently talk without Blocking the other Goroutines. The FanIn channel
receives all messages for processing.
Decouples the execution between the different Goroutines.
Joe ---
\
----- FanIn --- Independent Messages Displayed
/
Ann ---
*/
package main
import (
"fmt"
"math/rand"
"time"
)
// main merges the Joe and Ann generators through fanIn and prints the first
// ten messages in whatever order they arrive.
func main() {
	merged := fanIn(boring("Joe"), boring("Ann"))
	for left := 10; left > 0; left-- {
		msg := <-merged // Whoever is ready first gets printed first.
		fmt.Println(msg)
	}
	fmt.Println("You're boring: I'm leaving.")
}
// fanIn merges input1 and input2 into a single channel. Each input is drained
// by its own goroutine, so a slow sender on one input never holds back the
// other.
//
// Forwarding from an input stops when that input is closed; a receive from a
// closed channel would otherwise yield "" immediately and forever, flooding
// the merged channel with empty messages. The returned channel is closed once
// both inputs have been closed. If neither input is ever closed the behavior
// is unchanged: messages are forwarded indefinitely.
func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string) // The FanIn channel.
	done := make(chan struct{})

	// forward copies one input to c and reports when that input is exhausted.
	forward := func(in <-chan string) {
		for s := range in {
			c <- s // Blocking send to the FanIn channel.
		}
		done <- struct{}{}
	}
	go forward(input1)
	go forward(input2)

	// Close c only after both forwarders have finished, so that neither of
	// them can send on a closed channel.
	go func() {
		<-done
		<-done
		close(c)
	}()
	return c
}
// boring is a generator: it creates a channel, starts a goroutine that keeps
// writing "<msg> <seq>" to it (seq starting at 0) with a random pause of less
// than a second after each write, and returns the channel as receive-only.
func boring(msg string) <-chan string {
	stream := make(chan string)
	go func() {
		var seq int
		for {
			text := fmt.Sprintf("%s %d", msg, seq)
			stream <- text
			seq++

			delay := rand.Intn(1e3)
			time.Sleep(time.Duration(delay) * time.Millisecond)
		}
	}()
	return stream
}

扇入(fanIn)表示一個模塊被多個模塊調用。
扇出(fanOut)表示一個模塊調用多個模塊。
這里將兩個 channel 并入一個 channel 進行輸出。
sequencing
package main
import (
"fmt"
"math/rand"
"time"
)
// Message contains a channel for the reply.
// Message is the value exchanged on the channels of this program: a line of
// text plus a channel through which the receiver can signal the sender.
type Message struct {
str string // Text of the message.
wait chan bool // Acts as a signaler: the sender blocks on it until the receiver sends a value.
}
// main reads the merged stream two messages at a time, prints both, and only
// then signals each sender to continue. Because a sender blocks on its wait
// channel after every message, the two messages of a round always come from
// different speakers, although either speaker may arrive first.
func main() {
	merged := fanIn(boring("Joe"), boring("Ann"))
	for round := 0; round < 10; round++ {
		first := <-merged // Whoever talks first in this round.
		fmt.Println(first.str)
		second := <-merged // The other speaker.
		fmt.Println(second.str)

		// Release both speakers for the next round.
		first.wait <- true
		second.wait <- true
	}
	fmt.Println("You're boring: I'm leaving.")
}
// fanIn merges input1 and input2 into a single channel of Message. Each input
// is drained by its own goroutine, so the two senders do not block each other.
//
// Forwarding from an input stops when that input is closed; a receive from a
// closed channel would otherwise yield zero Messages (with a nil wait channel)
// immediately and forever. The returned channel is closed once both inputs
// have been closed. If neither input is ever closed the behavior is
// unchanged: messages are forwarded indefinitely.
func fanIn(input1, input2 <-chan Message) <-chan Message {
	c := make(chan Message) // The FanIn channel.
	done := make(chan struct{})

	// forward copies one input to c and reports when that input is exhausted.
	forward := func(in <-chan Message) {
		for m := range in {
			c <- m // Blocking send to the FanIn channel.
		}
		done <- struct{}{}
	}
	go forward(input1)
	go forward(input2)

	// Close c only after both forwarders have finished, so that neither of
	// them can send on a closed channel.
	go func() {
		<-done
		<-done
		close(c)
	}()
	return c
}
// boring returns a receive-only channel of Message. A background goroutine
// sends "<msg> <n>" (n counting up from 0) wrapped in a Message, sleeps for a
// random duration below one second, and then blocks until a value arrives on
// the Message's wait channel before producing the next one.
func boring(msg string) <-chan Message {
	out := make(chan Message)
	resume := make(chan bool) // Shared by every Message this generator sends.
	go func() {
		n := 0
		for {
			out <- Message{str: fmt.Sprintf("%s %d", msg, n), wait: resume}
			n++

			pause := time.Duration(rand.Intn(1e3)) * time.Millisecond
			time.Sleep(pause)

			<-resume // Hold here until the receiver says to go again.
		}
	}()
	return out
}
fanIn 是針對 Message 消息結構體的 channel 來進行扇入的,而不是針對單獨的 string channel 來扇入的。
- channel 的阻塞
<-waitForIt
select
- select
/*
Select is a control structure that is unique to concurrency.
The reason channels and Goroutines are built into the language.
Like a switch but each case is a communication:
-- All channels are evaluated
-- Selection blocks until one communication can proceed, which then does.
-- If multiple can proceed, select chooses pseudo-randomly.
-- Default clause, if present, executes immediately if no channel is ready.
Multiplexing: Let whosoever is ready to talk, talk.
The fanIn function fronts the other channels. Goroutines that are ready to talk
can independently talk without Blocking the other Goroutines. The FanIn channel
receives all messages for processing.
Decouples the execution between the different Goroutines.
Joe ---
\
----- FanIn --- Independent Messages Displayed
/
Ann ---
*/
package main
import (
"fmt"
"math/rand"
"time"
)
// main prints the first ten messages delivered by the select-based fanIn of
// the Joe and Ann generators.
func main() {
	merged := fanIn(boring("Joe"), boring("Ann"))
	printed := 0
	for printed < 10 {
		fmt.Println(<-merged) // Any message that reaches the FanIn channel.
		printed++
	}
	fmt.Println("You're boring: I'm leaving.")
}
// fanIn merges input1 and input2 into a single channel using one goroutine
// and a select, forwarding whichever input has a message ready.
//
// A closed input is always ready to receive and would make the select spin,
// forwarding "" forever. To avoid that, an input that reports closed is
// replaced by nil, which select never chooses again. The returned channel is
// closed once both inputs are exhausted. If neither input is ever closed the
// behavior is unchanged: messages are forwarded indefinitely.
func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string) // The FanIn channel.
	go func() {
		defer close(c)
		in1, in2 := input1, input2
		for in1 != nil || in2 != nil {
			select {
			case s, ok := <-in1:
				if !ok {
					in1 = nil // Disable this case from now on.
					continue
				}
				c <- s
			case s, ok := <-in2:
				if !ok {
					in2 = nil // Disable this case from now on.
					continue
				}
				c <- s
			}
		}
	}()
	return c
}
// boring returns a receive-only channel fed by a goroutine that sends
// "<msg> <n>" forever (n starting at 0), pausing a random amount of time
// below one second after each send.
func boring(msg string) <-chan string {
	out := make(chan string)
	emit := func(n int) {
		out <- fmt.Sprintf("%s %d", msg, n)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
	go func() {
		for n := 0; ; n++ {
			emit(n)
		}
	}()
	return out
}
這里把 fanIn 和 select 結合起來了:在 fanIn 里用一個 goroutine 進行 select。
- timeout using select
/*
Timeout Using Select
The time.After function returns a channel that blocks for the specified duration.
After the interval, the channel delivers the current time, once.
The select is giving the boring routine 800ms to respond. This will be an endless
loop if boring can perform its work under 800ms every time.
*/
package main
import (
"fmt"
"math/rand"
"time"
)
// main prints Joe's messages until one of them takes longer than 800ms to
// arrive, then reports the timeout and returns.
func main() {
	joe := boring("Joe")
	for {
		// A new timer is created on every pass, so the 800ms budget applies
		// to each message separately.
		deadline := time.After(800 * time.Millisecond)
		select {
		case line := <-joe:
			fmt.Println(line)
		case <-deadline:
			fmt.Println("You're too slow.")
			return
		}
	}
}
// boring launches a goroutine that sends "<msg> <n>" on a fresh channel
// forever (n = 0, 1, 2, ...), sleeping for a random duration below one second
// after every send, and returns that channel as receive-only.
func boring(msg string) <-chan string {
	ch := make(chan string)
	go func(out chan<- string) {
		n := 0
		for {
			out <- fmt.Sprintf("%s %d", msg, n)
			n++
			wait := time.Duration(rand.Intn(1e3)) * time.Millisecond
			time.Sleep(wait)
		}
	}(ch)
	return ch
}
- timeout using select for whole conversation
/*
Timeout Using Select
Create the timer once, outside the loop, to time out the entire conversation.
(In the previous program, we had a timeout for each message)
This time the program will terminate after 5 seconds
*/
package main
import (
"fmt"
"math/rand"
"time"
)
// main prints Joe's messages until five seconds have passed in total, then
// reports the timeout and returns.
func main() {
	joe := boring("Joe")
	// Created once, before the loop: the limit covers the whole conversation
	// rather than each individual message.
	deadline := time.After(5 * time.Second)
	for {
		select {
		case <-deadline:
			fmt.Println("You're too slow.")
			return
		case line := <-joe:
			fmt.Println(line)
		}
	}
}
// boring is a generator: the returned receive-only channel delivers
// "<msg> <seq>" with seq counting up from 0, produced by a goroutine that
// rests for a random time below one second after each message.
func boring(msg string) <-chan string {
	stream := make(chan string)
	go func() {
		var seq int
		for {
			stream <- fmt.Sprintf("%s %d", msg, seq)
			seq++
			millis := rand.Intn(1e3)
			time.Sleep(time.Duration(millis) * time.Millisecond)
		}
	}()
	return stream
}
這一版本與 Timeout Using Select 的區別是,在循環外定義了一個 timeout。
timeout := time.After(5 * time.Second)
這個 timeout 對整個循環(整段對話)是共用的,而不是每收一條消息就重新計時。
- Quit channel
/*
Quit Channel
You can turn this around and tell Joe to stop when we're tired of listening to him.
*/
package main
import (
"fmt"
"math/rand"
)
// main prints between one and ten of Joe's messages (the count is random),
// then tells Joe to stop through the quit channel and prints "EXIT".
// main does not wait for the goroutine after signaling it, so the program
// may end before the goroutine prints its own farewell.
func main() {
	quit := make(chan bool)
	joe := boring("Joe", quit)

	remaining := rand.Intn(10)
	for remaining >= 0 {
		fmt.Println(<-joe)
		remaining--
	}

	quit <- true // Returns once the goroutine has received the signal.
	fmt.Println("EXIT")
}
// boring returns a receive-only channel on which a background goroutine
// offers "<msg> <n>" (n counting up from 0) as fast as the receiver takes
// them. The goroutine prints "Quiting" and exits as soon as it receives a
// value from quit.
func boring(msg string, quit chan bool) <-chan string {
	out := make(chan string)
	go func() {
		n := 0
		for {
			next := fmt.Sprintf("%s %d", msg, n)
			select {
			case <-quit:
				fmt.Println("Quiting")
				return
			case out <- next:
				n++ // Delivered; move on to the next message.
			}
		}
	}()
	return out
}