在響應(yīng)應(yīng)用請(qǐng)求的過(guò)程中, 有時(shí)候會(huì)遇到比較耗時(shí)的任務(wù), 比如給用戶發(fā)送郵件, 耗時(shí)任務(wù)時(shí)間不可能控, 很可能超過(guò) 1s, 為了給用戶比較好的體驗(yàn), 一般會(huì)控制請(qǐng)求響應(yīng)時(shí)間(RT, response time)在300ms內(nèi)(不考慮網(wǎng)絡(luò)波動(dòng)), 甚至在 200ms 內(nèi). 面對(duì)這樣的工作場(chǎng)景, 就需要使用異步任務(wù)進(jìn)行處理.
Go協(xié)程與異步
從一段簡(jiǎn)單的代碼開(kāi)始:
func TestTask(t *testing.T) {
task()
log.Print("req done")
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
- 代碼在 Goland 中編寫(xiě), 同時(shí)也推薦使用 Goland 進(jìn)行 Go 開(kāi)發(fā)
- 這里使用單測(cè)(
test)演示代碼:- 輸入
test就可以快速生成代碼(Goland 中稱之為live templates, 其實(shí)就是預(yù)設(shè)好的代碼片段) - 在單測(cè)點(diǎn)擊可以執(zhí)行: 1. 點(diǎn)擊左側(cè)(gutter icon)的運(yùn)行圖標(biāo); 2. 函數(shù)上右鍵菜單鍵; 3. 快捷鍵
ctl-shift-R
- 輸入
上面使用 task() 模擬耗時(shí) 1s 的任務(wù), 整個(gè)test代表一次請(qǐng)求, 執(zhí)行如下:
=== RUN TestTask
2022/11/17 20:11:15 task done
2022/11/17 20:11:15 req done
--- PASS: TestTask (1.00s)
PASS
Go基礎(chǔ)知識(shí): 天生并發(fā), 使用
go關(guān)鍵字就可以開(kāi)新協(xié)程, 將代碼放到新協(xié)程中執(zhí)行
func TestTask(t *testing.T) {
go task()
log.Print("req done")
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
- 只需要在
task()前添加go關(guān)鍵字, 就可以新開(kāi)一個(gè)協(xié)程, 將task()在新協(xié)程中執(zhí)行
不過(guò)在這里, 并沒(méi)有得到預(yù)期的結(jié)果:
=== RUN TestTask
2022/11/17 20:16:08 req done
--- PASS: TestTask (0.00s)
PASS
- 輸出顯示:
task()中的日志沒(méi)有輸出, 看起來(lái)像沒(méi)有執(zhí)行
Go基礎(chǔ)知識(shí): Go的代碼都在協(xié)程中執(zhí)行, 入口
main()函數(shù)是主協(xié)程, 之后使用go關(guān)鍵詞開(kāi)的協(xié)程都是子協(xié)程, 主協(xié)程退出后, 程序會(huì)終止(exit)
也就是說(shuō)上面的 TestTask()(主協(xié)程) 和 go task()(子協(xié)程)都執(zhí)行了, 但是主協(xié)程執(zhí)行完, 程序退出了, 子協(xié)程沒(méi)執(zhí)行完(或者沒(méi)調(diào)度到), 就被強(qiáng)制退出了
簡(jiǎn)單 Go 并發(fā): 任務(wù)編排
上面的例子, 常見(jiàn)有 3 種解決方案:
- 方案1: 等子協(xié)程執(zhí)行完
func TestTask(t *testing.T) {
go task()
time.Sleep(time.Second) // 等待子協(xié)程執(zhí)行完
log.Print("req done")
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
- 方案2: 使用
WaitGroup
func TestTask(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
task()
wg.Done()
}()
wg.Wait()
log.Print("req done")
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
WaitGroup 其實(shí)很好理解, 就是同時(shí)等待一組任務(wù)完成, 它分為 3 步: 1. Add: 總共有多少任務(wù); 2. Done(): 表示任務(wù)執(zhí)行完; 3. Wait(): 等待所有任務(wù)完成
- 方案3: 使用 Go 的并發(fā)語(yǔ)言
chan
func TestTask(t *testing.T) {
ch := make(chan struct{}) // 初始化 chan
go func() {
task()
ch <- struct{}{} // 發(fā)送到 chan
}()
<-ch // 從 chan 獲取
log.Print("req done")
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
Go基礎(chǔ)知識(shí): 通過(guò)
chan T就可以申明T類型的 chan, 供協(xié)程間進(jìn)行通信;struct{}是 Go 中0 memory use(0內(nèi)存占用)類型, 適合上面使用 chan 進(jìn)行 控制 而不需要 數(shù)據(jù) 進(jìn)行通信的情況
雖然只是3個(gè)簡(jiǎn)單的 demo code, Go 提供的 2 種并發(fā)能力都有展示:
- 傳統(tǒng)并發(fā)原語(yǔ): 大部分集中在
sync包下, 上面案例2中的sync.WaitGroup就是其中之一 - Go 基于 CSP 的并發(fā)編程范式: 包括
go chan select, 上面的案例3中展示了go+chan的基本用法
簡(jiǎn)單 Go 并發(fā)講完了, 那任務(wù)編排又是啥? 其實(shí), 某等程度上, 任務(wù)編排=異步, 任務(wù)需要 分工 完成時(shí), 也就是一個(gè)任務(wù)相對(duì)于另一個(gè)任務(wù)需要 異步處理. 而任務(wù)編排, 恰恰是 Go 語(yǔ)言中基于 chan 進(jìn)行并發(fā)編程的強(qiáng)項(xiàng).
Go 中有一個(gè)大的方向,就是任務(wù)編排用 Channel,共享資源保護(hù)用傳統(tǒng)并發(fā)原語(yǔ)。
回到最初的代碼, 在實(shí)際使用的使用, 到底使用的是哪種方案呢? 答案是 方案1. 看看接近真實(shí)場(chǎng)景的代碼
func TestTrace(t *testing.T) {
for { // 服務(wù)以 daemon 的方式持續(xù)運(yùn)行
// 不斷處理用戶的請(qǐng)求
{
go task()
log.Print("req done")
}
}
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
也就是真實(shí)場(chǎng)景下, 主協(xié)程所在的 server 會(huì)一直常駐, 請(qǐng)求(request)所有的子協(xié)程不用擔(dān)心還沒(méi)執(zhí)行完就被強(qiáng)制退出了
避坑: 野生 Goroutine
在繼續(xù)講解之前, 一定要提一下使用 go 開(kāi)協(xié)程的一個(gè)坑, 或者說(shuō)一個(gè)非常重要的基礎(chǔ)知識(shí):
Go基礎(chǔ)知識(shí): panic只對(duì)當(dāng)前goroutine的defer有效
Go中出現(xiàn) panic(), 程序會(huì)立即終止:
func TestPanic(t *testing.T) {
panic("panic")
log.Print("end")
}
=== RUN TestPanic
--- FAIL: TestPanic (0.00s)
panic: panic [recovered]
panic: panic
goroutine 118 [running]:
testing.tRunner.func1.2({0x103e15940, 0x10405c208})
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1396 +0x1c8
testing.tRunner.func1()
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1399 +0x378
panic({0x103e15940, 0x10405c208})
/opt/homebrew/opt/go/libexec/src/runtime/panic.go:884 +0x204
...
...
testing.tRunner(0x14000603040, 0x104058678)
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1446 +0x10c
created by testing.(*T).Run
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1493 +0x300
Process finished with the exit code 1
- 可以看到,
panic后程序直接退出,panic后的log.Print("end")并沒(méi)有執(zhí)行
當(dāng)然, 想要程序健壯一些, panic 是可以 吃掉 的:
func TestPanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
panic("panic")
log.Print("end")
}
=== RUN TestPanic
2022/11/17 22:25:08 panic
--- PASS: TestPanic (0.00s)
PASS
使用 recover() 對(duì) panic() 進(jìn)行恢復(fù), 程序就不會(huì)崩掉(exit)
但是, 一定要注意
panic只對(duì)當(dāng)前goroutine的defer有效!
panic只對(duì)當(dāng)前goroutine的defer有效!
panic只對(duì)當(dāng)前goroutine的defer有效!
重要的事情說(shuō)三遍.
func TestPanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
go func() {
panic("panic")
}()
log.Print("end")
}
=== RUN TestPanic
panic: panic
goroutine 88 [running]:
...
...
...
Process finished with the exit code 1
而 go 里面開(kāi)協(xié)程又是如此的方便, 簡(jiǎn)單一個(gè) go 關(guān)鍵字即可, 所以大家給這種情況起了個(gè)外號(hào): 野生 Goroutine. 最簡(jiǎn)單的做法就是對(duì)協(xié)程進(jìn)行一次封裝, 比如這樣:
package gox
// Run start with a goroutine
func Run(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
fn()
}()
}
原本的 go task(), 使用 gox.Run(task)進(jìn)行替換, 就可以 task 出現(xiàn) panic 的時(shí)候, 程序還能恢復(fù)
Trace: 異步任務(wù)還能進(jìn)行鏈路追蹤么?
隨著可觀測(cè)技術(shù)的不斷演進(jìn), 基建上的不斷提升, 鏈路追蹤技術(shù)也進(jìn)行了演進(jìn)
- trace1.0: opentracing jaeger
- trace2.0: otel
當(dāng)用戶請(qǐng)求進(jìn)來(lái)時(shí), 可以通過(guò) traceId 串聯(lián)起用戶的完成調(diào)用鏈, 監(jiān)控和排查問(wèn)題能力大大增強(qiáng)!
{
"code": 200,
"status": 200,
"msg": "成功",
"errors": null,
"data": "env-t0",
"timestamp": 1668696256,
"traceId": "..."
}
trace 通過(guò)請(qǐng)求(request)中的 context, 不斷向下傳遞, 從而將當(dāng)前請(qǐng)求的所用調(diào)用通過(guò)同一個(gè) traceId 串聯(lián)起來(lái)
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
Task(ctx)
log.Print("req done")
}
func Task(ctx context.Context) {
// 使用自定義span, 將當(dāng)前操作上報(bào)到trace
_, span := otel.GetTracerProvider().Tracer("task").Start(ctx, "xxxTask")
defer span.End()
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
如同上面演示的 demo code 演示:
- 通過(guò)
ctx, 將當(dāng)前請(qǐng)求(request)的所有操作使用同一個(gè) traceId 串起來(lái) - otel 默認(rèn)了很多操作的 trace 上報(bào), 比如 mysql/redis/kafka 等等, 也可以使用自定義
span的方式進(jìn)行新增
如果要進(jìn)行耗時(shí)任務(wù)異步處理, 直覺(jué)上直接 go 一下:
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
go Task(ctx)
log.Print("req done")
}
這時(shí)候腦海中陡然蹦出一個(gè)聲音: 野生Goroutine
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
gox.RunCtx(ctx, Task) // 在 gox.Run 的基礎(chǔ)上, 添加 ctx 支持
log.Print("req done")
}
可是等測(cè)試一下, 就會(huì)發(fā)現(xiàn), task() 并沒(méi)有執(zhí)行!
細(xì)心的小伙伴就會(huì)發(fā)現(xiàn), 這和開(kāi)始的例子有點(diǎn)像呀, 而且對(duì)比下就會(huì)知道, 此處多了一個(gè) ctx:
func TestTask(t *testing.T) {
go task(ctx)
log.Print("req done")
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
- 沒(méi)有 ctx 的時(shí)候, 因?yàn)橹鲄f(xié)程一直在, 子協(xié)程可以處理完任務(wù)在退出, 也就是子協(xié)程的生命周期都在主協(xié)程內(nèi)
- 有 ctx 的時(shí)候, 由于 ctx 的存在, 請(qǐng)求(request)中主協(xié)程需要接受 ctx 控制, 異步處理后, 請(qǐng)求也就結(jié)束了(上面
log.Print("req done")模擬的部分), 這是 ctx 就會(huì)控制子協(xié)程一起結(jié)束掉, 也就是子協(xié)程的生命周期都在當(dāng)前請(qǐng)求的協(xié)程內(nèi)
于是, 又有了 2 種處理辦法:
- 簡(jiǎn)單做法, 就像上面一樣, 沒(méi)有 ctx, 就沒(méi)有問(wèn)題了嘛. 如果用一句話來(lái)概括這種方法:
面試官: 你可以回家等消息了 - 既然又要執(zhí)行異步任務(wù), 又要有 trace, 那把 trace 繼續(xù)傳下, 用一個(gè)新的 ctx 就好了嘛
上代碼:
- 復(fù)制 ctx, 把 trace 繼續(xù)傳下去
package ctxkit
// Clone 復(fù)制 ctx 中對(duì)應(yīng) key 的值,移除父級(jí) cancel。
func Clone(preCtx context.Context, keys ...interface{}) context.Context {
newCtx := context.Background()
// 從 pctx 開(kāi)啟一個(gè)子 span,來(lái)傳遞 traceId
_, ospan := otel.GetTracerProvider().
Tracer(trace_in.InstrumentationPrefix+"/ctxkit").
Start(preCtx, "ctxkit.Clone", otel_trace.WithAttributes(
trace_attr.AttrAsyncFlag.Int(1), // 標(biāo)記為異步
))
defer ospan.End()
newCtx = trace.ContextWithSpan(newCtx, ospan)
return ctxClone(newCtx, preCtx, keys...)
}
// CloneWithoutSpan 功能同 Clone,但不會(huì)創(chuàng)建 trace span,建議在大批數(shù)據(jù) for 循環(huán)之前使用,避免 span 鏈路過(guò)長(zhǎng)。
func CloneWithoutSpan(preCtx context.Context, keys ...interface{}) context.Context {
tid := trace_in.GetOtelTraceId(preCtx)
if tid == "" {
tid = trace_in.FakeTraceId()
}
newCtx := context.WithValue(context.Background(), ictx.CtxKeyFakeTraceId, tid)
return ctxClone(newCtx, preCtx, keys...)
}
func ctxClone(baseCtx, preCtx context.Context, keys ...interface{}) context.Context {
for _, key := range _ctxKeys {
if v := preCtx.Value(key); v != nil {
baseCtx = context.WithValue(baseCtx, key, v)
}
}
keys = append(keys, _strKeys...)
for _, key := range keys {
if v := preCtx.Value(key); v != nil {
baseCtx = context.WithValue(baseCtx, key, v) //nolint
}
}
return baseCtx
}
- 實(shí)際使用
func TestTask(t *testing.T) {
nexCtx := ctxkit.Clone(ctx)
go task(newCtx)
log.Print("req done")
}
func task() {
// 模擬耗時(shí)任務(wù)
time.Sleep(time.Second)
log.Print("task done")
}
異步任務(wù): 能否更優(yōu)雅點(diǎn)
如果是從請(qǐng)求過(guò)來(lái)的, 請(qǐng)求中自帶 trace, 并會(huì)在請(qǐng)求(request)的初始化的時(shí)候建 trace 寫(xiě)入到請(qǐng)求的 ctx 中, 那如果直接執(zhí)行一個(gè)異步任務(wù)呢?
那就需要手動(dòng)初始化 trace 了.
上代碼:
- 封裝異步任務(wù)(job): 封裝trace -> clone ctx -> 指標(biāo)收集(jobMetricsWrap) -> 野生Goroutine捕獲
package job
// AsyncJob 異步任務(wù)。
// name: 任務(wù)名。
// return: waitFunc,調(diào)用可以等待任務(wù)完成。
func AsyncJob(ctx context.Context, name string, fn func(ctx context.Context) error, opts ...Option) func() {
ctx = tel_in.CtxAdjuster(ctx) // 初始化 trace
newCtx := ctxkit.Clone(ctx)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// 指標(biāo)收集
jobMetricsWrap(newCtx, fn, applyOption(name, true, opts...))
}()
return wg.Wait
}
- 實(shí)際使用:
func TestJob(t *testing.T) {
ctx := context.Background()
// 異步任務(wù)
// 邏輯在協(xié)程中執(zhí)行,已包裝 recover 邏輯
wait := job.AsyncJob(ctx, "your_task_name", func(ctx context.Context) error {
// 內(nèi)部處理使用傳入的 ctx,已經(jīng)執(zhí)行過(guò) citkit.Clone
return doAsyncTask(ctx)
})
wait() // 如果需要等待任務(wù)結(jié)束則調(diào)用 wait,不需要?jiǎng)t忽略返回值
}
func doAsyncTask(ctx context.Context) error {
logs.InfoCtx(ctx, "async task done")
return nil
}
=== RUN TestJob
2022-11-18T10:18:39.014+0800 INFO tests/async_job_test.go:250 async task done {"traceId": "..."}
--- PASS: TestJob (0.00s)
PASS
PS: 這里需要查看效果, 所以調(diào)用了
wait()等待異步任務(wù)結(jié)束, 實(shí)際使用可以直接使用job.AsyncJob()或者_ = job.AsyncJob()
最后一起來(lái)看看 trace 使用的效果:
todo: img
Asynq: 專業(yè)異步任務(wù)框架
如果只是 異步一下, 上面講解的內(nèi)容也基本夠用了; 如果有重度異步任務(wù)使用, 就得考慮專業(yè)的異步任務(wù)隊(duì)列框架了, Go 中可以選擇 Async
Features
- Guaranteed at least one execution of a task
- Scheduling of tasks
- Retries of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- Weighted priority queues
- Strict priority queues
- Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option
- Allow timeout and deadline per task
- Allow aggregating group of tasks to batch multiple successive operations
- Flexible handler interface with support for middlewares
-
Ability to pause queueto stop processing tasks from the queue - Periodic Tasks
- Support Redis Cluster for automatic sharding and high availability
- Support Redis Sentinels for high availability
- Integration with Prometheus to collect and visualize queue metrics
-
Web UIto inspect and remote-control queues and tasks -
CLIto inspect and remote-control queues and tasks
整體架構(gòu)圖
todo
實(shí)際使用
使用的 demo 就不貼了, asynq 的文檔很詳細(xì), 說(shuō)一下具體實(shí)踐中遇到的 2個(gè) case:
- 使用
web UI: 處于安全考慮, 設(shè)置了ReadOnly
h := asynqmon.New(asynqmon.Options{
RootPath: "/monitoring", // RootPath specifies the root for asynqmon app
RedisConnOpt: tasks.GetRedis(),
ReadOnly: true, // admin web can't operation
})
r := mux.NewRouter()
r.PathPrefix(h.RootPath()).Handler(h)
srv := &http.Server{
Handler: r,
Addr: ":8080",
}
PS: 使用
web UI由于涉及到使用新的端口, 而應(yīng)用部署已經(jīng)上 k8s 了, 如何順利訪問(wèn)就需要一系列運(yùn)維操作, 留個(gè)坑, 以后有機(jī)會(huì)再填
- 測(cè)試環(huán)境OK, 線上報(bào)錯(cuò):
recoverer: could not move task to archive: INTERNAL_ERROR: redis eval error: ERR 'asynq:{}:t:' and 'asynq:{}:active' not in the same slot
對(duì)比發(fā)現(xiàn), 是測(cè)試和線上使用的不同類型的 redis 實(shí)例導(dǎo)致的, 搜索云服務(wù)的幫助文檔:
todo
集群架構(gòu)實(shí)例的命令限制: 如需在集群架構(gòu)實(shí)例中執(zhí)行下述受限制的命令,請(qǐng)使用hash tag確保命令所要操作的key都分布在1個(gè)hash slot中
但是查看 asqnq 源碼: 以 enqueue 操作為例, lua 操作中的部分 key 無(wú)法通過(guò)外部添加 hash tag
// github.com/hibiken/asynq/internal/rdb/rdb.go
// enqueueCmd enqueues a given task message.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
return 0
end
redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"state", "pending",
"pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
最終, 通過(guò)使用線上另一臺(tái)主從版redis解決問(wèn)題
寫(xiě)在最后
到這里, 工作用Go: 異步任務(wù)怎么寫(xiě) 就暫時(shí)告一段落了, 這個(gè)過(guò)程中:
- 一些計(jì)算機(jī)基礎(chǔ)概念的理解: 同步與異步, 異步與任務(wù)編排, 協(xié)程與異步, 協(xié)程與生命周期
- 一些 Go 語(yǔ)言的基礎(chǔ)知識(shí)以及基礎(chǔ)不牢地動(dòng)山搖的坑: 野生Goroutine, panic&recover
- 可觀測(cè)的實(shí)踐之一: trace
- 專業(yè)的異步任務(wù)框架 Asynq 以及踩坑記
一起擁抱變化, 直面問(wèn)題和挑戰(zhàn), 不斷精進(jìn), 我們下個(gè)話題再見(jiàn)????.