-
現(xiàn)在有一個(gè)需求,兩個(gè)子協(xié)程分別執(zhí)行兩個(gè)一次性長耗時(shí)操作,其中一個(gè)協(xié)程因?yàn)殄e(cuò)誤退出的時(shí)候,另外一個(gè)協(xié)程也需要退出,當(dāng)我閱讀相關(guān)文章的時(shí)候都告訴我,用如下代碼實(shí)現(xiàn):
package main import ( "context" "errors" "sync" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) wg := sync.WaitGroup{} errChan := make(chan error) wg.Add(2) // 子協(xié)程1 go func(ctx context.Context) { defer wg.Done() for { select { case <-ctx.Done(): return default: // 模擬一個(gè)阻塞30秒的長耗時(shí)任務(wù) time.Sleep(30 * time.Second) } } }(ctx) // 子協(xié)程2 go func() { defer wg.Done() // 模擬執(zhí)行3秒以后出現(xiàn)了錯(cuò)誤退出協(xié)程 time.Sleep(3 * time.Second) errChan <- errors.New("something is wrong") }() // cancel本身應(yīng)該在子協(xié)程出現(xiàn)錯(cuò)誤退出的時(shí)候調(diào)用 // 因?yàn)樽訁f(xié)程1和子協(xié)程2都可能會(huì)出現(xiàn)錯(cuò)誤而退出 // 為了避免忘記調(diào)用cancel的情況,專門另起一個(gè)協(xié)程來控制cancel操作 go func() { if err := <-errChan; err != nil { cancel() } }() wg.Wait() close(errChan) }
但是仔細(xì)分析后,發(fā)現(xiàn)這樣的代碼并不能滿足我們的需求。
先我們先明確一下我們需求:
- 子協(xié)程1和子協(xié)程2都是只需要執(zhí)行一次的長耗時(shí)任務(wù)
- 子協(xié)程2因?yàn)榘l(fā)生了錯(cuò)誤退出,此時(shí)子協(xié)程1也需要退出
我們再來分析上面的代碼,是否能滿足我們的需求:
- 當(dāng)子協(xié)程2發(fā)生錯(cuò)誤退出了,將錯(cuò)誤放入errChan中,errChan拿出值發(fā)現(xiàn)err != nil,調(diào)用cancel
- 此時(shí)子協(xié)程1正在被阻塞中,等待30秒阻塞完成以后,進(jìn)入下一次循環(huán),發(fā)現(xiàn)當(dāng)前當(dāng)前協(xié)程應(yīng)該cancel了,于是當(dāng)前子協(xié)程1退出協(xié)程。
顯然執(zhí)行的結(jié)果并不能滿足我們的預(yù)期需求:
假如子協(xié)程1中的任務(wù)執(zhí)行了一次以后,進(jìn)入下一次循環(huán),發(fā)現(xiàn)ctx還沒有接收到cancel的信號,就會(huì)第二次執(zhí)行任務(wù),現(xiàn)在與我們的需求是違背的。
此時(shí)的解決方案可以有兩種:
-
在子協(xié)程1中加入一個(gè)bool類型的變量來判斷任務(wù)是否已經(jīng)執(zhí)行過,代碼如下:
// 子協(xié)程1 go func(ctx context.Context) { defer wg.Done() var isExec bool for { select { case <-ctx.Done(): return default: if !isExec { // 模擬一個(gè)阻塞30秒的長耗時(shí)任務(wù) time.Sleep(30 * time.Second) } } } }(ctx)這樣做其實(shí)也沒有意義,這個(gè)任務(wù)本身就應(yīng)該只執(zhí)行一次,執(zhí)行結(jié)束后,難道一直循環(huán)著等其他地方cancel以后才退出當(dāng)前協(xié)程嗎?
-
任務(wù)執(zhí)行完成以后return直接退出,代碼如下:
// 子協(xié)程1 go func(ctx context.Context) { defer wg.Done() for { select { case <-ctx.Done(): return default: // 模擬一個(gè)阻塞30秒的長耗時(shí)任務(wù) time.Sleep(30 * time.Second) return } } }(ctx)這樣做以后就會(huì)導(dǎo)致ctx的cancel沒有任何意義,不管怎樣,子協(xié)程1中的任務(wù)都是會(huì)執(zhí)行完成以后才會(huì)退出的
仔細(xì)分析下來,這樣的寫法其實(shí)并不能滿足我們的需求。
那么到底應(yīng)該如何書寫才能滿足我們的需求呢。
需要分為三種情況來看:
-
任務(wù)本身是可以通過context.Context控制的,比如http請求
package main import ( "context" "errors" "fmt" "io" "net/http" "sync" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) // 當(dāng)有兩個(gè)協(xié)程往同一個(gè)通道中寫入數(shù)據(jù)的時(shí)候,但是又只有一處讀的情況下,至少需要一個(gè)緩沖區(qū) // 否則會(huì)造成死鎖 errChan := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(2) // 子協(xié)程1 go func(ctx context.Context) { defer wg.Done() request, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1:8081", nil) if err != nil { errChan <- err return } resp, err := http.DefaultClient.Do(request) if err != nil { errChan <- err return } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { errChan <- err return } fmt.Println(string(body)) }(ctx) // 子協(xié)程2 go func() { defer wg.Done() time.Sleep(3 * time.Second) errChan <- errors.New("something is wrong") }() // cancel本身應(yīng)該在子協(xié)程出現(xiàn)錯(cuò)誤退出的時(shí)候調(diào)用 // 因?yàn)樽訁f(xié)程1和子協(xié)程2都可能會(huì)出現(xiàn)錯(cuò)誤而退出 // 為了避免忘記調(diào)用cancel的情況,專門另起一個(gè)協(xié)程來控制cancel操作 go func() { if err := <-errChan; err != nil { fmt.Println(err) cancel() } }() wg.Wait() }上面的代碼中,子協(xié)程1中訪問的是一個(gè)耗時(shí)較長的http接口(我在此接口中sleep了30秒來模擬因?yàn)榫W(wǎng)絡(luò)原因或者其他原因?qū)е陆涌谠L問時(shí)間較長的情況),假如子協(xié)程2運(yùn)行了3秒以后出現(xiàn)了錯(cuò)誤,調(diào)用了cancel,那么子協(xié)程1也會(huì)因?yàn)閏ontext的控制產(chǎn)生錯(cuò)誤直接退出,不需要等待30秒請求結(jié)束以后才會(huì)退出。
-
如果任務(wù)本身不能通過ctx控制,但是任務(wù)本身是可以拆分為多次完成的任務(wù)。比如,子協(xié)程1中的任務(wù)是讀取一個(gè)100M文件。
package main import ( "context" "errors" "fmt" "sync" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) // 當(dāng)有兩個(gè)協(xié)程往同一個(gè)通道中寫入數(shù)據(jù)的時(shí)候,但是又只有一處讀的情況下,至少需要一個(gè)緩沖區(qū) // 否則會(huì)造成死鎖 errChan := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(2) // 子協(xié)程1 go func(ctx context.Context) { for i := 0; i < 100; i++ { select { case <-ctx.Done(): return default: time.Sleep(1 * time.Second) fmt.Println("讀取1M的數(shù)據(jù)") } } }(ctx) // 子協(xié)程2 go func() { defer wg.Done() time.Sleep(3 * time.Second) errChan <- errors.New("something is wrong") }() // cancel本身應(yīng)該在子協(xié)程出現(xiàn)錯(cuò)誤退出的時(shí)候調(diào)用 // 因?yàn)樽訁f(xié)程1和子協(xié)程2都可能會(huì)出現(xiàn)錯(cuò)誤而退出 // 為了避免忘記調(diào)用cancel的情況,專門另起一個(gè)協(xié)程來控制cancel操作 go func() { if err := <-errChan; err != nil { fmt.Println(err) cancel() } }() wg.Wait() }上面的代碼中,讀取100M的文件,分為100次讀取,每次讀取1M數(shù)據(jù),假如子協(xié)程2運(yùn)行了3秒出現(xiàn)錯(cuò)誤退出以后,子協(xié)程1在讀取了最近的1M數(shù)據(jù)以后進(jìn)入下一次循環(huán)也會(huì)發(fā)現(xiàn)被cancel了,就會(huì)退出協(xié)程, 不繼續(xù)執(zhí)行任務(wù)
如果任務(wù)本身是一次性任務(wù),并且不能拆分為多次任務(wù),又不能被context.Context控制的任務(wù),只能等待任務(wù)執(zhí)行結(jié)束,不需要傳入context.Context來進(jìn)行取消控制
除了自己控制context.Context來控制協(xié)程取消操作以外,還可以利用ErrGroup的方式來更簡單控制協(xié)程的取消
package main
import (
"context"
"fmt"
"io"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
request, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1:8081", nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(request)
if err != nil {
fmt.Println(err)
return err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Println(string(body))
return nil
})
eg.Go(func() error {
for i := 0; i < 10; i++ {
fmt.Printf("wait %d second\n", i)
time.Sleep(time.Second)
}
return fmt.Errorf("something is wrong")
})
if err := eg.Wait(); err != nil {
fmt.Println(err)
return
}
fmt.Println("task is success")
}
上面的代碼,可以用非常簡單的方式來處理子協(xié)程 2出現(xiàn)錯(cuò)誤的情況下,子協(xié)程1也同時(shí)需要退出的需求。不需要自己控制sync.Group和errChan導(dǎo)致代碼復(fù)雜化。