Go嵌套并發(fā)實(shí)現(xiàn)EDM,附坑點(diǎn)分析#1

看著身邊優(yōu)秀的小伙伴們?cè)缇烷_(kāi)始寫(xiě)博客,自己深感落后,還好遲做總比不做好,勉勵(lì)自己見(jiàn)賢思齊。趁著年前最后一個(gè)周末,陽(yáng)光正好,寫(xiě)下第一篇博客,為2019年開(kāi)個(gè)頭,以期完成今年為自己立下的flags。

從PHPer轉(zhuǎn)Gopher,很大一個(gè)原因就是業(yè)務(wù)對(duì)性能和并發(fā)的持續(xù)需求,另一個(gè)主要原因就是Go語(yǔ)言原生的并發(fā)特性,可以在提供同等高可用的能力下,使用更少的機(jī)器資源,節(jié)約可觀的成本。因此本文就結(jié)合自己在學(xué)習(xí)Go并發(fā)的實(shí)戰(zhàn)demo中,把遇到的一些坑點(diǎn)寫(xiě)下來(lái),共享進(jìn)步。

1. 在Go語(yǔ)言中實(shí)現(xiàn)并發(fā)控制,目前主要有三種方式:

a) Channel - 分為無(wú)緩沖、有緩沖通道;

b) WaitGroup - sync包提供的goroutine間的同步機(jī)制;

c) Context - 在調(diào)用鏈不同goroutine間傳遞和共享數(shù)據(jù);

本文demo中主要用到了前兩種,基本使用請(qǐng)查看官方文檔。

2. Demo需求與分析:

需求:實(shí)現(xiàn)一個(gè)EDM的高效郵件發(fā)送:需要支持多個(gè)國(guó)家(可以看成是多個(gè)任務(wù)),需要記錄每條任務(wù)發(fā)送的狀態(tài)(當(dāng)前成功、失敗條數(shù)),需要支持可暫停(stop)、重新發(fā)送(run)操作。

分析:從需求可以看出,在郵件發(fā)送中可以通過(guò)并發(fā)實(shí)現(xiàn)多個(gè)國(guó)家(多個(gè)任務(wù))并發(fā)、單個(gè)任務(wù)分批次并發(fā)實(shí)現(xiàn)快速、高效EDM需求。

3. Demo實(shí)戰(zhàn)源碼:

3.1 main.go

package main

import (
  "bufio"
  "fmt"
  "io"
  "log"
  "os"
  "strconv"
  "sync"
  "time"
)

var (
  batchLength = 10
  wg          sync.WaitGroup
  finish      = make(chan bool)
)

func main() {
  startTime := time.Now().UnixNano()

  for i := 1; i <= 5; i++ {
    filename := "./task/edm" + strconv.Itoa(i) + ".txt"
    start := 60

    go RunTask(filename, start, batchLength)
  }

  // main 阻塞等待goroutine執(zhí)行完成
  fmt.Println(<-finish)

  fmt.Println("finished all tasks.")

  endTime := time.Now().UnixNano()
  fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}

// 單任務(wù)
func RunTask(filename string, start, length int) (retErr error) {
  for {
    readLine, err := ReadLines(filename, start, length)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    fmt.Println("current line:", readLine)

    start += length

    // 等待一批完成才進(jìn)入下一批
    //wg.Wait()
  }

  wg.Wait()
  finish <- true

  return retErr
}

注意上面wg.Wait()的位置(下面有討論),在finish channel之前,目的是為了等待子goroutine運(yùn)行完,再通過(guò)一個(gè)無(wú)緩沖通道finish通知main goroutine,然后main運(yùn)行結(jié)束。

func ReadLines()讀取指定行數(shù)據(jù):

// 讀取指定行數(shù)據(jù)
func ReadLines(filename string, start, length int) (line int, retErr error) {
  fmt.Println("current file:", filename)

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // 跳過(guò)開(kāi)始行之前的行-ReadString方式
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {
      wg.Add(1)
      // go并發(fā)執(zhí)行
      go SendEmail(line)
      if startLine == endLine {
        break
      }
    }

    startLine++
  }

  return startLine, retErr
}

// 模擬郵件發(fā)送
func SendEmail(email string) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

運(yùn)行上面main.go,3個(gè)任務(wù)在1s內(nèi)并發(fā)完成所有郵件(./task/edm1.txt中一行表示一個(gè)郵箱)發(fā)送。

true

finished all tasks.

Total cost(ms): 1001

那么問(wèn)題來(lái)了:沒(méi)有實(shí)現(xiàn)分批每次并發(fā)batchLength = 10,因?yàn)槿绻环峙l(fā)送,只要其中某個(gè)任務(wù)或某一封郵件出錯(cuò)了,那下次重新run的時(shí)候,會(huì)不知道哪些用戶已經(jīng)發(fā)送過(guò)了,出現(xiàn)重復(fù)發(fā)送。而分批發(fā)送即使中途出錯(cuò)了,下一次重新run可從上次出錯(cuò)的end行開(kāi)始,最多是[start - end]一個(gè)batchLength 發(fā)送失敗,可以接受。

于是,將倒數(shù)第5行wg.Wait()注釋掉,倒數(shù)第8行注釋打開(kāi),如下:

// 單任務(wù)
func RunTask(filename string, start, length int) (retErr error) {
  for {
    readLine, err := ReadLines(filename, start, length)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    fmt.Println("current line:", readLine)

    start += length

    // 等待一批完成才進(jìn)入下一批
    wg.Wait()
  }

  //wg.Wait()
  finish <- true

  return retErr
}

運(yùn)行就報(bào)錯(cuò):

panic: sync: WaitGroup is reused before previous Wait has returned

提示WaitGroupgoroutine之間重用了,雖然是全局變量,看起來(lái)是使用不當(dāng)。怎么調(diào)整呢?

3.2 main.go

package main

import (
  "bufio"
  "fmt"
  "io"
  "log"
  "os"
  "strconv"
  "sync"
  "time"
)

var (
  batchLength = 10
  outerWg     sync.WaitGroup
)

func main() {
  startTime := time.Now().UnixNano()

  for i := 1; i <= 3; i++ {
    filename := "./task/edm" + strconv.Itoa(i) + ".txt"
    start := 60

    outerWg.Add(1)
    go RunTask(filename, start, batchLength)
  }

  // main 阻塞等待goroutine執(zhí)行完成
  outerWg.Wait()

  fmt.Println("finished all tasks.")

  endTime := time.Now().UnixNano()
  fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}

// 單任務(wù)
func RunTask(filename string, start, length int) (retErr error) {
  for {
    isFinish := make(chan bool)
    readLine, err := ReadLines(filename, start, length, isFinish)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    // 等待一批完成才進(jìn)入下一批
    fmt.Println("current line:", readLine)
    start += length
    <-isFinish

    // 關(guān)閉channel,釋放資源
    close(isFinish)
  }

  outerWg.Done()

  return retErr
}

從上面可以看出:調(diào)整的思路是外層用WaitGroup控制,里層用channel 控制,執(zhí)行又報(bào)錯(cuò) : (

fatal error: all goroutines are asleep - deadlock!



goroutine 1 [semacquire]:

sync.runtime_Semacquire(0x55fe7c)

    /usr/local/go/src/runtime/sema.go:56 +0x39

sync.(*WaitGroup).Wait(0x55fe70)

    /usr/local/go/src/sync/waitgroup.go:131 +0x72

main.main()

    /home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1ab



goroutine 5 [chan send]:

main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0)

仔細(xì)檢查,發(fā)現(xiàn)上面代碼中定義的isFinish 是一個(gè)無(wú)緩沖channel,在發(fā)郵件SendMail() 子協(xié)程沒(méi)有完成時(shí),寫(xiě)入到了沒(méi)有及時(shí)讀取的無(wú)緩沖通道將阻塞當(dāng)前goroutine,其他goroutine也是一樣的都被阻塞,這樣就出現(xiàn)了all goroutines are asleep - deadlock!

channel阻塞口訣:讀不出來(lái)、寫(xiě)不進(jìn)去的時(shí)候,都會(huì)阻塞!

于是將上面代碼改為有緩沖繼續(xù)嘗試:

isFinish := make(chan bool, 1)
// 讀取指定行數(shù)據(jù)
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
  fmt.Println("current file:", filename)

  // 控制每一批發(fā)完再下一批
  var wg sync.WaitGroup

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // 跳過(guò)開(kāi)始行之前的行-ReadString方式
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {

      wg.Add(1)
      // go并發(fā)執(zhí)行
      go SendEmail(line, wg)
      if startLine == endLine {
        isFinish <- true
        break
      }
    }

    startLine++
  }

  wg.Wait()

  return startLine, retErr
}

// 模擬郵件發(fā)送
func SendEmail(email string, wg sync.WaitGroup) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

運(yùn)行,又報(bào)錯(cuò)了 : (

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:

sync.runtime_Semacquire(0x55fe7c)

    /usr/local/go/src/runtime/sema.go:56 +0x39

sync.(*WaitGroup).Wait(0x55fe70)

這次提示有點(diǎn)不一樣,看起來(lái)是里層的WaitGroup 導(dǎo)致了死鎖,繼續(xù)檢查發(fā)現(xiàn)里層wg 是值傳遞,應(yīng)該使用指針傳引用。

// go并發(fā)執(zhí)行
go SendEmail(line, wg)

最后修改代碼如下:

// 讀取指定行數(shù)據(jù)
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
  fmt.Println("current file:", filename)

  // 控制每一批發(fā)完再下一批
  var wg sync.WaitGroup

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // 跳過(guò)開(kāi)始行之前的行-ReadString方式
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {

      wg.Add(1)
      // go并發(fā)執(zhí)行
      go SendEmail(line, &wg)
      if startLine == endLine {
        isFinish <- true
        break
      }
    }

    startLine++
  }

  wg.Wait()

  return startLine, retErr
}

// 模擬郵件發(fā)送
func SendEmail(email string, wg *sync.WaitGroup) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

趕緊運(yùn)行一下,這次終于成功啦 : )

current line: 100

current file: ./task/edm2.txt

Read EOF: ./task/edm2.txt

Read EOF: ./task/edm2.txt

finished all tasks.

Total cost(ms): 4003

每個(gè)任務(wù)模擬的是100行,從第60行開(kāi)始運(yùn)行,四個(gè)任務(wù)并發(fā)執(zhí)行,每個(gè)任務(wù)分批內(nèi)再次并發(fā),并且控制了每一批次完成后再進(jìn)行下一批,所以總運(yùn)行時(shí)間約4s,符合期望值。完整源碼請(qǐng)移步GitHub。

4. 小結(jié):

本文通過(guò)兩層嵌套Go 并發(fā),模擬實(shí)現(xiàn)了高性能并發(fā)EDM,具體的一些出錯(cuò)行控制、任務(wù)中斷與再次執(zhí)行將在下次繼續(xù)討論,主要邏輯已跑通,幾個(gè)坑點(diǎn)小結(jié)如下:

a) WaitGroup 一般用于main 主協(xié)程等待全部子協(xié)程退出后,再優(yōu)雅退出主協(xié)程;嵌套使用時(shí)注意wg.Wait()放的位置;

b) 合理使用channel,無(wú)緩沖chan將阻塞當(dāng)前goroutine,有緩沖chan在cap未滿的情況下不會(huì)阻塞當(dāng)前goroutine,使用完記得釋放chan資源;

c) 注意函數(shù)間傳值或傳引用(本質(zhì)上還是傳值,傳的指針的指針內(nèi)存值)的合理使用;

后記:第一篇博客寫(xiě)到這里差不多算完成了,一不小心一個(gè)下午就過(guò)去了,寫(xiě)的邏輯、可讀性可能不太好請(qǐng)見(jiàn)諒,歡迎留言批評(píng)指正。感謝您的閱讀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Go語(yǔ)言中的并發(fā)編程 并發(fā)是編程里面一個(gè)非常重要的概念,Go語(yǔ)言在語(yǔ)言層面天生支持并發(fā),這也是Go語(yǔ)言流行的一個(gè)很...
    吳佳浩閱讀 405評(píng)論 0 1
  • 1 并發(fā)與并行 Erlang之父Joe Armstrong曾經(jīng)以下圖解釋并發(fā)與并行。 并發(fā)在圖中的解釋是兩隊(duì)人排隊(duì)...
    泥人冷風(fēng)閱讀 290評(píng)論 0 0
  • 本文翻譯自Sameer Ajmani的文章《Go Concurrency Patterns: Pipelines ...
    大蟒傳奇閱讀 3,977評(píng)論 0 15
  • 并發(fā)是編程里面一個(gè)非常重要的概念,Go語(yǔ)言在語(yǔ)言層面天生支持并發(fā),這也是Go語(yǔ)言流行的一個(gè)很重要的原因。 Go語(yǔ)言...
    雪上霜閱讀 287評(píng)論 0 0
  • 控制并發(fā)有三種種經(jīng)典的方式,一種是通過(guò)channel通知實(shí)現(xiàn)并發(fā)控制 一種是WaitGroup,另外一種就是Con...
    wiseAaron閱讀 10,826評(píng)論 4 34

友情鏈接更多精彩內(nèi)容