Go語(yǔ)言學(xué)習(xí)筆記 - 并發(fā)

Goroutine

Go在語(yǔ)言層面對(duì)并發(fā)編程提供支持,采用輕量級(jí)線程(協(xié)程)實(shí)現(xiàn)。只需要在函數(shù)調(diào)用語(yǔ)句前添加go關(guān)鍵字,就可以創(chuàng)建并發(fā)執(zhí)行單元。開(kāi)發(fā)人員無(wú)需了解任何執(zhí)行細(xì)節(jié),調(diào)度器會(huì)自動(dòng)將其安排到合適的系統(tǒng)線程上執(zhí)行。goroutine是一種非常輕量級(jí)的實(shí)現(xiàn),可在單個(gè)進(jìn)程里執(zhí)行成千上萬(wàn)的并發(fā)任務(wù)。事實(shí)上,入口函數(shù)main就以goroutine運(yùn)行。另有與之配套的channel類型,用以實(shí)現(xiàn)“以通訊來(lái)共享內(nèi)存”的CSP模式。

go func() {
    println("Hello, World!")
}

調(diào)度器不能保證多個(gè)goroutine執(zhí)行次序,且進(jìn)程退出時(shí)不會(huì)等待它們結(jié)束。默認(rèn)情況下,進(jìn)程啟動(dòng)后僅允許一個(gè)系統(tǒng)線程服務(wù)于goroutine??墒褂铆h(huán)境變量或標(biāo)準(zhǔn)函數(shù)runtime.GOMAXPROCS修改(Go 1.5默認(rèn)方式)。讓高度器用多個(gè)線程實(shí)現(xiàn)多核并行,而不僅僅是并發(fā)。

func sum(id int) {
    var x int64
    for i := 0; i < math.MaxUint32; i++ {
        x += int64(i)
    }
    println(id, x)
}
func main() {
    wg := new(sync.WaitGroup)
    wg.Add(2)
    for i := 0; i < 2; i++ {
        go func(id int) {
            defer wg.Done()
            sum(id)
        }(i)
    }
    wg.Wait()
}
輸出:
$ go build -o test
$ time -p ./test
0 9223372030412324865
1 9223372030412324865
real 7.70 // 程序開(kāi)始到結(jié)束時(shí)間差 (非非 CPU 時(shí)間)
user 7.66 // 用用戶態(tài)所使用用 CPU 時(shí)間片片 (多核累加)
sys 0.01 // 內(nèi)核態(tài)所使用用 CPU 時(shí)間片片
$ GOMAXPROCS=2 time -p ./test
0 9223372030412324865
1 9223372030412324865
real 4.18
user 7.61// 雖然總時(shí)間差不多,但由 2 個(gè)核并行行,real 時(shí)間自自然少了許多。
sys 0.02

調(diào)用 runtime.Goexit 將立即終止當(dāng)前 goroutine 執(zhí)行,調(diào)度器確保所有已注冊(cè) defer延遲調(diào)用被執(zhí)行。

func main() {
    wg := new(sync.WaitGroup)
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer println("A.defer")
        func() {
            defer println("B.defer")
            runtime.Goexit() // 終止止當(dāng)前 goroutine
            println("B") // 不會(huì)執(zhí)行行
        }()
        println("A")    // 不會(huì)執(zhí)行行
    }()
    wg.Wait()
}
//輸出:
B.defer
A.defer

和協(xié)程 yield 作用類似,Gosched 讓出底層線程,將當(dāng)前 goroutine 暫停,放回隊(duì)列等待下次被調(diào)度執(zhí)行。

func main() {
    wg := new(sync.WaitGroup)
    wg.Add(2)
    go func() {
        defer wg.Done()
        for i := 0; i < 6; i++ {
            println(i)
            if i == 3 { runtime.Gosched() }
        }
    }()
    go func() {
        defer wg.Done()
        println("Hello, World!")
    }()
    wg.Wait()
}
//輸出
$ go run main.go
0
1
2
3
Hello, World!
4
5

Channel

引用類型Channel是CSP模型的具體實(shí)現(xiàn),用于多個(gè)goruntine之間進(jìn)行通訊。其內(nèi)部實(shí)現(xiàn)了同步,確保了并發(fā)安全。默認(rèn)為同步模式,需要發(fā)送和接收配對(duì)。否則會(huì)被阻塞,直到另一方準(zhǔn)備好后被喚醒。

func main() {
    data := make(chan int) // 數(shù)據(jù)交換隊(duì)列
    exit := make(chan bool) // 退出通知
    go func() {
        for d := range data {// 從隊(duì)列迭代接收數(shù)據(jù),直到 close 。
            fmt.Println(d)
        }
        fmt.Println("recv over.")
        exit <- true// 發(fā)出退出通知。
    }()
    data <- 1// 發(fā)送數(shù)據(jù)。
    data <- 2
    data <- 3
    close(data)// 關(guān)閉隊(duì)列。
    fmt.Println("send over.")
    <-exit// 等待退出通知。
}
//輸出:
1
2
3
send over.
recv over.

異步方式通過(guò)判斷緩沖區(qū)來(lái)決定是否阻塞。如果緩沖區(qū)已滿,發(fā)送被阻塞;緩沖區(qū)為空,接收被阻塞。通常情況下,異步channel可減少排隊(duì)阻塞,具備更高的效率。但應(yīng)該考慮使用指針規(guī)避大對(duì)象拷貝,將多個(gè)元素打包,減少緩沖區(qū)大小等。

func main() {
    data := make(chan int, 3)// 緩沖區(qū)可以存儲(chǔ) 3 個(gè)元素
    exit := make(chan bool)
    data <- 1// 在緩沖區(qū)未滿前,不會(huì)阻塞。
    data <- 2
    data <- 3
    go func() {
        for d := range data {// 在緩沖區(qū)未空前,不會(huì)阻塞。
            fmt.Println(d)
        }
        exit <- true
    }()
    data <- 4// 如果緩沖區(qū)已滿,阻塞。
    data <- 5
    close(data)
    <-exit
}

緩沖區(qū)是內(nèi)部屬性,并非類型構(gòu)成要素。

var a, b chan int = make(chan int), make(chan int, 3)

除用用 range 外,還可用 ok-idiom 模式判斷 channel 是否關(guān)閉。

for {
    if d, ok := <-data; ok {
        fmt.Println(d)
    } else {
        break
    }
}

向 closed channel 發(fā)送數(shù)據(jù)引發(fā) panic 錯(cuò)誤,接收立即返回零值。而 nil channel,無(wú)論收發(fā)都會(huì)被阻塞。內(nèi)置函數(shù) len 返回未被讀取的緩沖元素?cái)?shù)量,cap 返回緩沖區(qū)大小。

單向

可以將channel隱式轉(zhuǎn)換為單身隊(duì)列,只收或只發(fā)。

c := make(chan int, 3)
var send chan <- int = c // send only
var recv <- chan int = c // receiver only

選擇

如果需要同時(shí)處理多個(gè)channel,可以用select語(yǔ)句,它隨機(jī)選擇一個(gè)可用的channel做收發(fā)操作,或執(zhí)行default case。

func main() {
    a, b := make(chan int, 3), make(chan int)
    go func() {
        v, ok, s := 0, false, ""
        for {
            select {// 隨機(jī)選擇可用用 channel,接收數(shù)據(jù)。
            case v, ok = <-a: s = "a"
            case v, ok = <-b: s = "b"
            }
            if ok {
              fmt.Println(s, v)
            } else {
                os.Exit(0)
            }
        }
    }()
    for i := 0; i < 5; i++ {
          select {// 隨機(jī)選擇可用用 channel,發(fā)送數(shù)據(jù)。
          case a <- i:
          case b <- i:
          }
    }
    close(a)
    select {}// 沒(méi)有可用用 channel,阻塞 main goroutine。
}
//輸出:
b 3
a 0
a 1
a 2
b 4

模式

用簡(jiǎn)單工廠模式打包并發(fā)任務(wù)和channel。

func NewTest() chan int {
    c := make(chan int)
    rand.Seed(time.Now().UnixNano())
    go func() {
        time.Sleep(time.Second)
        c <- rant.Int()
    }()
    return c
}
func main() {
    t := NewTest()
    println(<-t) //等待gorountime結(jié)束返回。
}

用channel實(shí)現(xiàn)信號(hào)量(semaphore)。

func main() {
    wg := sync.WaitGroup{}
    wg.Add(3)
    sem := make(chan int, 1)
    for i := 0; i < 3; i++ {
        go func(id int) {
            defer wg.Done()
            sem <- 1 // 向 sem 發(fā)送數(shù)據(jù),阻塞或者成功。
            for x := 0; x < 3; x++ {
                fmt.Println(id, x)
            }
            <- sem // 接收數(shù)據(jù),使得其他阻塞 goroutine 可以發(fā)送數(shù)據(jù)。
        }(i)
    }
    wg.Wait()
}

用closed channel發(fā)出退出通知。

func main() {
    var wg sync.WaitGroup
    quit := make(chan bool)
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            task := func() {
                println(id, time.Now().Nanosecond())
                time.Sleep(time.Second)
            }
            for {
              select {
              case <- quit: // closed channel 不會(huì)阻塞,因此可用作退出通知。
                  return
              default://執(zhí)行正常任務(wù)
                  task()
              }
          }
        }(i)
    }
    time.Sleep(time.Second * 5) // 讓測(cè)試 goroutine 運(yùn)行一會(huì)。
    close(quit) // 發(fā)出退出通知。
    wg.Wait()
}

用select 實(shí)現(xiàn)超時(shí) (timeout)。channel 是第一類對(duì)象,可傳參 (內(nèi)部實(shí)現(xiàn)為指針) 或者作為結(jié)構(gòu)成員。

type Request struct {
    data []int
    ret chan int
}
func NewRequest(data ...int) *Request {
    return &Request{ data, make(chan int, 1) }
}
func Process(req *Request) {
    x := 0
    for _, i := range req.data {
        x += i
    }
    req.ret <- x
}
func main() {
    req := NewRequest(10, 20, 30)
    Process(req)
    fmt.Println(<-req.ret)
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • GO并發(fā)使用goroutine運(yùn)行程序,檢測(cè)并修正狀態(tài),利用通道共享數(shù)據(jù)。通常程序會(huì)被編寫(xiě)為一個(gè)順序執(zhí)行并完成一個(gè)...
    小線亮亮閱讀 629評(píng)論 0 1
  • Goroutine是Go里的一種輕量級(jí)線程——協(xié)程。相對(duì)線程,協(xié)程的優(yōu)勢(shì)就在于它非常輕量級(jí),進(jìn)行上下文切換的代價(jià)非...
    witchiman閱讀 5,143評(píng)論 0 9
  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司,掛了不少,但最終還是拿到小米、百度、阿里、京東、新浪、CVTE、樂(lè)視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,813評(píng)論 11 349
  • 今天介紹一下 go語(yǔ)言的并發(fā)機(jī)制以及它所使用的CSP并發(fā)模型 CSP并發(fā)模型 CSP模型是上個(gè)世紀(jì)七十年代提出的,...
    falm閱讀 68,831評(píng)論 10 80
  • *什么時(shí)候可以再次見(jiàn)到輕松氛圍* 什么時(shí)候知道自己的成長(zhǎng) 什么時(shí)候失去了自己 什么時(shí)候,我能找回來(lái)?
    章魚(yú)農(nóng)夫閱讀 198評(píng)論 0 1

友情鏈接更多精彩內(nèi)容