context.Context取消其他協(xié)程的操作

  • 現(xiàn)在有一個(gè)需求,兩個(gè)子協(xié)程分別執(zhí)行兩個(gè)一次性長耗時(shí)操作,其中一個(gè)協(xié)程因?yàn)殄e(cuò)誤退出的時(shí)候,另外一個(gè)協(xié)程也需要退出,當(dāng)我閱讀相關(guān)文章的時(shí)候都告訴我,用如下代碼實(shí)現(xiàn):

    package main
    
    import (
        "context"
        "errors"
        "sync"
        "time"
    )
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        wg := sync.WaitGroup{}
        errChan := make(chan error)
        wg.Add(2)
        // 子協(xié)程1
        go func(ctx context.Context) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    // 模擬一個(gè)阻塞30秒的長耗時(shí)任務(wù)
                    time.Sleep(30 * time.Second)
                }
            }
        }(ctx)
    
        // 子協(xié)程2
        go func() {
            defer wg.Done()
            // 模擬執(zhí)行3秒以后出現(xiàn)了錯(cuò)誤退出協(xié)程
            time.Sleep(3 * time.Second)
            errChan <- errors.New("something is wrong")
        }()
    
        // cancel本身應(yīng)該在子協(xié)程出現(xiàn)錯(cuò)誤退出的時(shí)候調(diào)用
        // 因?yàn)樽訁f(xié)程1和子協(xié)程2都可能會(huì)出現(xiàn)錯(cuò)誤而退出
        // 為了避免忘記調(diào)用cancel的情況,專門另起一個(gè)協(xié)程來控制cancel操作
        go func() {
            if err := <-errChan; err != nil {
                cancel()
            }
        }()
        wg.Wait()
        close(errChan)
    }
    

但是仔細(xì)分析后,發(fā)現(xiàn)這樣的代碼并不能滿足我們的需求。

先我們先明確一下我們需求:

  1. 子協(xié)程1和子協(xié)程2都是只需要執(zhí)行一次的長耗時(shí)任務(wù)
  2. 子協(xié)程2因?yàn)榘l(fā)生了錯(cuò)誤退出,此時(shí)子協(xié)程1也需要退出

我們再來分析上面的代碼,是否能滿足我們的需求:

  1. 當(dāng)子協(xié)程2發(fā)生錯(cuò)誤退出了,將錯(cuò)誤放入errChan中,errChan拿出值發(fā)現(xiàn)err != nil,調(diào)用cancel
  2. 此時(shí)子協(xié)程1正在被阻塞中,等待30秒阻塞完成以后,進(jìn)入下一次循環(huán),發(fā)現(xiàn)當(dāng)前當(dāng)前協(xié)程應(yīng)該cancel了,于是當(dāng)前子協(xié)程1退出協(xié)程。

顯然執(zhí)行的結(jié)果并不能滿足我們的預(yù)期需求:

假如子協(xié)程1中的任務(wù)執(zhí)行了一次以后,進(jìn)入下一次循環(huán),發(fā)現(xiàn)ctx還沒有接收到cancel的信號,就會(huì)第二次執(zhí)行任務(wù),現(xiàn)在與我們的需求是違背的。

此時(shí)的解決方案可以有兩種:

  1. 在子協(xié)程1中加入一個(gè)bool類型的變量來判斷任務(wù)是否已經(jīng)執(zhí)行過,代碼如下:

    // 子協(xié)程1
    go func(ctx context.Context) {
        defer wg.Done()
        var isExec bool
        for {
            select {
                case <-ctx.Done():
                 return
                default:
                    if !isExec {
                        // 模擬一個(gè)阻塞30秒的長耗時(shí)任務(wù)
                        time.Sleep(30 * time.Second) 
                    }
            }
        }
    }(ctx)
    

    這樣做其實(shí)也沒有意義,這個(gè)任務(wù)本身就應(yīng)該只執(zhí)行一次,執(zhí)行結(jié)束后,難道一直循環(huán)著等其他地方cancel以后才退出當(dāng)前協(xié)程嗎?

  2. 任務(wù)執(zhí)行完成以后return直接退出,代碼如下:

    // 子協(xié)程1
    go func(ctx context.Context) {
        defer wg.Done()
        for {
            select {
                case <-ctx.Done():
                 return
                default:
                 // 模擬一個(gè)阻塞30秒的長耗時(shí)任務(wù)
                 time.Sleep(30 * time.Second) 
                 return
            }
        }
    }(ctx)
    

    這樣做以后就會(huì)導(dǎo)致ctx的cancel沒有任何意義,不管怎樣,子協(xié)程1中的任務(wù)都是會(huì)執(zhí)行完成以后才會(huì)退出的

仔細(xì)分析下來,這樣的寫法其實(shí)并不能滿足我們的需求。

那么到底應(yīng)該如何書寫才能滿足我們的需求呢。

需要分為三種情況來看:

  1. 任務(wù)本身是可以通過context.Context控制的,比如http請求

    package main
    
    import (
        "context"
        "errors"
        "fmt"
        "io"
        "net/http"
        "sync"
        "time"
    )
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        // 當(dāng)有兩個(gè)協(xié)程往同一個(gè)通道中寫入數(shù)據(jù)的時(shí)候,但是又只有一處讀的情況下,至少需要一個(gè)緩沖區(qū)
        // 否則會(huì)造成死鎖
        errChan := make(chan error, 1)
        wg := sync.WaitGroup{}
        wg.Add(2)
        // 子協(xié)程1
        go func(ctx context.Context) {
            defer wg.Done()
            request, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1:8081", nil)
            if err != nil {
                errChan <- err
                return
            }
            resp, err := http.DefaultClient.Do(request)
            if err != nil {
                errChan <- err
                return
            }
            defer resp.Body.Close()
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                errChan <- err
                return
            }
            fmt.Println(string(body))
        }(ctx)
        // 子協(xié)程2
        go func() {
            defer wg.Done()
            time.Sleep(3 * time.Second)
            errChan <- errors.New("something is wrong")
        }()
        
        // cancel本身應(yīng)該在子協(xié)程出現(xiàn)錯(cuò)誤退出的時(shí)候調(diào)用
        // 因?yàn)樽訁f(xié)程1和子協(xié)程2都可能會(huì)出現(xiàn)錯(cuò)誤而退出
        // 為了避免忘記調(diào)用cancel的情況,專門另起一個(gè)協(xié)程來控制cancel操作
        go func() {
            if err := <-errChan; err != nil {
                fmt.Println(err)
                cancel()
            }
        }()
        wg.Wait()
    }
    

    上面的代碼中,子協(xié)程1中訪問的是一個(gè)耗時(shí)較長的http接口(我在此接口中sleep了30秒來模擬因?yàn)榫W(wǎng)絡(luò)原因或者其他原因?qū)е陆涌谠L問時(shí)間較長的情況),假如子協(xié)程2運(yùn)行了3秒以后出現(xiàn)了錯(cuò)誤,調(diào)用了cancel,那么子協(xié)程1也會(huì)因?yàn)閏ontext的控制產(chǎn)生錯(cuò)誤直接退出,不需要等待30秒請求結(jié)束以后才會(huì)退出。

  2. 如果任務(wù)本身不能通過ctx控制,但是任務(wù)本身是可以拆分為多次完成的任務(wù)。比如,子協(xié)程1中的任務(wù)是讀取一個(gè)100M文件。

    package main
    
    import (
        "context"
        "errors"
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        // 當(dāng)有兩個(gè)協(xié)程往同一個(gè)通道中寫入數(shù)據(jù)的時(shí)候,但是又只有一處讀的情況下,至少需要一個(gè)緩沖區(qū)
        // 否則會(huì)造成死鎖
        errChan := make(chan error, 1)
        wg := sync.WaitGroup{}
        wg.Add(2)
        // 子協(xié)程1
        go func(ctx context.Context) {
            for i := 0; i < 100; i++ {
                select {
                case <-ctx.Done():
                    return
                default:
                    time.Sleep(1 * time.Second)
                    fmt.Println("讀取1M的數(shù)據(jù)")
                }
            }
        }(ctx)
        // 子協(xié)程2
        go func() {
            defer wg.Done()
            time.Sleep(3 * time.Second)
            errChan <- errors.New("something is wrong")
        }()
    
        // cancel本身應(yīng)該在子協(xié)程出現(xiàn)錯(cuò)誤退出的時(shí)候調(diào)用
        // 因?yàn)樽訁f(xié)程1和子協(xié)程2都可能會(huì)出現(xiàn)錯(cuò)誤而退出
        // 為了避免忘記調(diào)用cancel的情況,專門另起一個(gè)協(xié)程來控制cancel操作
        go func() {
            if err := <-errChan; err != nil {
                fmt.Println(err)
                cancel()
            }
        }()
        wg.Wait()
    }
    

    上面的代碼中,讀取100M的文件,分為100次讀取,每次讀取1M數(shù)據(jù),假如子協(xié)程2運(yùn)行了3秒出現(xiàn)錯(cuò)誤退出以后,子協(xié)程1在讀取了最近的1M數(shù)據(jù)以后進(jìn)入下一次循環(huán)也會(huì)發(fā)現(xiàn)被cancel了,就會(huì)退出協(xié)程, 不繼續(xù)執(zhí)行任務(wù)

  3. 如果任務(wù)本身是一次性任務(wù),并且不能拆分為多次任務(wù),又不能被context.Context控制的任務(wù),只能等待任務(wù)執(zhí)行結(jié)束,不需要傳入context.Context來進(jìn)行取消控制

除了自己控制context.Context來控制協(xié)程取消操作以外,還可以利用ErrGroup的方式來更簡單控制協(xié)程的取消

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    eg, ctx := errgroup.WithContext(context.Background())

    eg.Go(func() error {
        request, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1:8081", nil)
        if err != nil {
            return err
        }
        resp, err := http.DefaultClient.Do(request)
        if err != nil {
            fmt.Println(err)
            return err
        }
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            return err
        }
        fmt.Println(string(body))
        return nil
    })

    eg.Go(func() error {
        for i := 0; i < 10; i++ {
            fmt.Printf("wait %d second\n", i)
            time.Sleep(time.Second)
        }
        return fmt.Errorf("something is wrong")
    })
    if err := eg.Wait(); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("task is success")
}

上面的代碼,可以用非常簡單的方式來處理子協(xié)程 2出現(xiàn)錯(cuò)誤的情況下,子協(xié)程1也同時(shí)需要退出的需求。不需要自己控制sync.Group和errChan導(dǎo)致代碼復(fù)雜化。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • [TOC] Golang Context分析 Context背景 和 適用場景 golang在1.6.2的時(shí)候還沒...
    AllenWu閱讀 11,628評論 0 30
  • 在工程化的Go語言開發(fā)項(xiàng)目中,Go語言的源碼復(fù)用是建立在包(package)基礎(chǔ)之上的。本文介紹了Go語言中如何定...
    雪上霜閱讀 288評論 0 0
  • golang go和php的區(qū)別類型:go為編譯性語言;php解釋性語言錯(cuò)誤:go的錯(cuò)誤處理機(jī)制;php本身或者框...
    Impossible安徒生閱讀 470評論 0 0
  • go語言協(xié)程使用[vscode-webview://45b6830c-5e27-4be5-8359-1ea2a28...
    xcrossed閱讀 1,491評論 0 0
  • 輸入與輸出-fmt包 時(shí)間與日期-time包 命令行參數(shù)解析-flag包 日志-log包 IO操作-os包 IO操...
    思考的山羊閱讀 6,739評論 0 5

友情鏈接更多精彩內(nèi)容