golang任務(wù)拆分errgroup

ErrGroup是 Go 官方提供的一個(gè)同步擴(kuò)展庫??梢詫⒁粋€(gè)大任務(wù)拆分成幾個(gè)小任務(wù)并發(fā)執(zhí)行,提高程序效率。

主要有三個(gè)方法,WithContext、Go、Wait。

func WithContext(ctx context.Context) (*Group, context.Context)

WithContext,返回一個(gè)Group實(shí)例以及一個(gè)Context。如果有一個(gè)子任務(wù)返回錯(cuò)誤,或者Wait調(diào)用返回,這個(gè)Context就會cancel。

func (g *Group) Go(f func() error)

Go,用于傳入子任務(wù),如果成功返回nil,如果失敗返回error,同時(shí)cancel那個(gè)Context

func (g *Group) Wait() error

Wait,類似waitgroup,等所有的子任務(wù)完成后返回,如果有多個(gè)子任務(wù)返回error,則會返回第一個(gè)error,所有子任務(wù)執(zhí)行成功則返回nil。

比較常規(guī)的用法

package main

import (
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group

    // 啟動第一個(gè)子任務(wù),它執(zhí)行成功
    g.Go(func() error {
        time.Sleep(5 * time.Second)
        fmt.Println("exec #1")
        // return errors.New("failed to exec #1")
        return nil
    })
    // 啟動第二個(gè)子任務(wù),它執(zhí)行失敗
    g.Go(func() error {
        time.Sleep(10 * time.Second)
        fmt.Println("exec #2")
        return errors.New("failed to exec #2")
    })

    // 啟動第三個(gè)子任務(wù),它執(zhí)行成功
    g.Go(func() error {
        time.Sleep(15 * time.Second)
        fmt.Println("exec #3")
        return nil
    })
    // 等待三個(gè)任務(wù)都完成
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully exec all")
    } else {
        fmt.Println("failed:", err)
    }
}

運(yùn)行結(jié)果:


image-20201202163104917.png

在貼一個(gè)例子,是官方文檔提供的一個(gè)pipeline的例子,原文地址:https://godoc.org/golang.org/x/sync/errgroup#example-Group--Pipeline。

package main

import (
    "context"
    "crypto/md5"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"

    "golang.org/x/sync/errgroup"
)

func main() {
    m, err := MD5All(context.Background(), ".")
    if err != nil {
        log.Fatal(err)
    }

    for k, sum := range m {
        fmt.Printf("%s:\t%x\n", k, sum)
    }
}

type result struct {
    path string
    sum  [md5.Size]byte
}

// 遍歷根目錄下的所有文件,計(jì)算md5值
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
    g, ctx := errgroup.WithContext(ctx)
    //文件路徑的channel
    paths := make(chan string)
    //遍歷文件,將文件路徑放到paths
    g.Go(func() error {
        defer close(paths)
        return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-ctx.Done():
                return ctx.Err()
            }
            return nil
        })
    })

    // 20個(gè)goroutine計(jì)算md5,從paths獲取文件路徑
    c := make(chan result) //存儲結(jié)果
    const numDigesters = 20
    for i := 0; i < numDigesters; i++ {
        g.Go(func() error {
            for path := range paths {
                data, err := ioutil.ReadFile(path)
                if err != nil {
                    return err
                }
                select {
                case c <- result{path, md5.Sum(data)}:
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            return nil
        })
    }
    go func() {
        g.Wait() //等待執(zhí)行完
        close(c)
    }()
    //將結(jié)果輸出到map
    m := make(map[string][md5.Size]byte)
    for r := range c {
        m[r.path] = r.sum
    }
    // 再次調(diào)用wait看有沒有error
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return m, nil
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容