深入理解Golang之context

為什么需要context

在并發(fā)程序中,由于超時、取消操作或者一些異常情況,往往需要進行搶占操作或者中斷后續(xù)操作。熟悉channel的應該都見過使用done channel來處理此類問題。比如以下這個例子:

func?main()?{????messages?:=?make(chan?int,?10)????

done?:=?make(chan?bool)????

defer?close(messages)???

?//?consumer????go?func()?{????????ticker?:=?time.NewTicker(1?*?time.Second)????????for?_?=?range?ticker.C?{????????????select?{????????????case?<-done:???????????????? ?

?fmt.Println("child?process?interrupt...")????????????????

return????????????

default:????????????????

fmt.Printf("send?message:?%d\n",?<-messages)????????????}????????}????}()??

??//?producer????for?i?:=?0;?i?<?10;?i++?{????????messages?<-?i????}???

?time.Sleep(5?*?time.Second)????

close(done)????time.Sleep(1?*?time.Second)????

fmt.Println("main?process?exit!")}


上述例子中定義了一個buffer為0的channel done, 子協程運行著定時任務。如果主協程需要在某個時刻發(fā)送消息通知子協程中斷任務退出,那么就可以讓子協程監(jiān)聽這個done channel,一旦主協程關閉done channel,那么子協程就可以推出了,這樣就實現了主協程通知子協程的需求。這很好,但是這也是有限的。

如果我們可以在簡單的通知上附加傳遞額外的信息來控制取消:為什么取消,或者有一個它必須要完成的最終期限,更或者有多個取消選項,我們需要根據額外的信息來判斷選擇執(zhí)行哪個取消選項。

考慮下面這種情況:假如主協程中有多個任務1, 2, …m,主協程對這些任務有超時控制;而其中任務1又有多個子任務1, 2, …n,任務1對這些子任務也有自己的超時控制,那么這些子任務既要感知主協程的取消信號,也需要感知任務1的取消信號。

如果還是使用done channel的用法,我們需要定義兩個done channel,子任務們需要同時監(jiān)聽這兩個done channel。嗯,這樣其實好像也還行哈。但是如果層級更深,如果這些子任務還有子任務,那么使用done channel的方式將會變得非常繁瑣且混亂。

我們需要一種優(yōu)雅的方案來實現這樣一種機制:

上層任務取消后,所有的下層任務都會被取消;

中間某一層的任務取消后,只會將當前任務的下層任務取消,而不會影響上層的任務以及同級任務。

這個時候context就派上用場了。我們首先看看context的結構設計和實現原理。

context是什么

context接口

先看Context接口結構,看起來非常簡單。

type?Context?interface?{??

??Deadline()?(deadline?time.Time,?ok?bool)??

??Done()?<-chan?struct{}???

?Err()?error????

Value(key?interface{})?interface{}}


Context接口包含四個方法:

Deadline返回綁定當前context的任務被取消的截止時間;如果沒有設定期限,將返回ok == false。

Done當綁定當前context的任務被取消時,將返回一個關閉的channel;如果當前context不會被取消,將返回nil。

Err如果Done返回的channel沒有關閉,將返回nil;如果Done返回的channel已經關閉,將返回非空的值表示任務結束的原因。如果是context被取消,Err將返回Canceled;如果是context超時,Err將返回DeadlineExceeded。

Value返回context存儲的鍵值對中當前key對應的值,如果沒有對應的key,則返回nil。

可以看到Done方法返回的channel正是用來傳遞結束信號以搶占并中斷當前任務;Deadline方法指示一段時間后當前goroutine是否會被取消;以及一個Err方法,來解釋goroutine被取消的原因;而Value則用于獲取特定于當前任務樹的額外信息。而context所包含的額外信息鍵值對是如何存儲的呢?其實可以想象一顆樹,樹的每個節(jié)點可能攜帶一組鍵值對,如果當前節(jié)點上無法找到key所對應的值,就會向上去父節(jié)點里找,直到根節(jié)點,具體后面會說到。

再來看看context包中的其他關鍵內容。

emptyCtx

emptyCtx是一個int類型的變量,但實現了context的接口。emptyCtx沒有超時時間,不能取消,也不能存儲任何額外信息,所以emptyCtx用來作為context樹的根節(jié)點。

//?An?emptyCtx?is?never?canceled,?has?no?values,?and?has?no?deadline.?It?is?not

//?struct{},?since?vars?of?this?type?must?have?distinct?addresses.

type?emptyCtx?intfunc?(*emptyCtx)?Deadline()?(deadline?time.Time,?ok?bool)?{????return}

func?(*emptyCtx)?Done()?<-chan?struct{}?{????return?nil}

func?(*emptyCtx)?Err()?error?{????return?nil}

func?(*emptyCtx)?Value(key?interface{})?interface{}?{????return?nil}

func?(e?*emptyCtx)?String()?string?{????switch?e?{????case?background:????????return?"context.Background"????case?todo:????????return?"context.TODO"????}???

?return?"unknown?empty?Context"}

var?(????background?=?new(emptyCtx)????todo???????=?new(emptyCtx))

func?Background()?Context?{????return?background}

func?TODO()?Context?{????return?todo}

一般不會直接使用emptyCtx,而是使用由emptyCtx實例化的兩個變量,分別可以通過調用Background和TODO方法得到,但這兩個context在實現上是一樣的。那么Background和TODO方法得到的context有什么區(qū)別呢?

Background和TODO只是用于不同場景下:

Background通常被用于主函數、初始化以及測試中,作為一個頂層的context,也就是說一般我們創(chuàng)建的context都是基于Background;而TODO是在不確定使用什么context的時候才會使用。

下面將介紹兩種不同功能的基礎context類型:valueCtx和cancelCtx。

valueCtx

valueCtx結構體

type?valueCtx?struct?{????Context????key,?val?interface{}}

func?(c?*valueCtx)?Value(key?interface{})?interface{}?{??

??if?c.key?==?key?{????????return?c.val????}???

?return?c.Context.Value(key)}

valueCtx利用一個Context類型的變量來表示父節(jié)點context,所以當前context繼承了父context的所有信息;valueCtx類型還攜帶一組鍵值對,也就是說這種context可以攜帶額外的信息。valueCtx實現了Value方法,用以在context鏈路上獲取key對應的值,如果當前context上不存在需要的key,會沿著context鏈向上尋找key對應的值,直到根節(jié)點。

WithValue

WithValue用以向context添加鍵值對:

func?WithValue(parent?Context,?key,?val?interface{})?Context?{??

??if?key?==?nil?{????????panic("nil?key")????}???

?if?!reflect.TypeOf(key).Comparable()?{????????panic("key?is?not?comparable")????}???

?return?&valueCtx{parent,?key,?val}}

這里添加鍵值對不是在原context結構體上直接添加,而是以此context作為父節(jié)點,重新創(chuàng)建一個新的valueCtx子節(jié)點,將鍵值對添加在子節(jié)點上,由此形成一條context鏈。獲取value的過程就是在這條context鏈上由尾部上前搜尋:


cancelCtx

cancelCtx結構體

type?cancelCtx?struct?{???

?Context????mu???????sync.Mutex????????????//?protects?following?fields????

?done?????chan?struct{}?????????//?created?lazily,?closed?by?first?cancel?call????

?children?map[canceler]struct{}?//?set?to?nil?by?the?first?cancel?call????

? err??????error?????????????????//?set?to?non-nil?by?the?first?cancel?call}

type?canceler?interface?{????cancel(removeFromParent?bool,?err?error)????

Done()?<-chan?struct{}

}

跟valueCtx類似,cancelCtx中也有一個context變量作為父節(jié)點;變量done表示一個channel,用來表示傳遞關閉信號;children表示一個map,存儲了當前context節(jié)點下的子節(jié)點;err用于存儲錯誤信息表示任務結束的原因。

再來看一下cancelCtx實現的方法:

func?(c?*cancelCtx)?Done()?<-chan?struct{}?{???

?c.mu.Lock()????

if?c.done?==?nil?{????????c.done?=?make(chan?struct{})???

?}????

d?:=?c.done???

?c.mu.Unlock()????

return?d}

func?(c?*cancelCtx)?Err()?error?{???

?c.mu.Lock()????err?:=?c.err????c.mu.Unlock()????return?err}

func?(c?*cancelCtx)?cancel(removeFromParent?bool,?err?error)?{???

?if?err?==?nil?{???????

?panic("context:?internal?error:?missing?cancel?error")????

}????

c.mu.Lock()????if?c.err?!=?nil?{???????

?c.mu.Unlock()????????return?//?already?canceled????

}????//?設置取消原因???

?c.err?=?err????設置一個關閉的channel或者將done?channel關閉,用以發(fā)送關閉信號???

?if?c.done?==?nil?{???????

?c.done?=?closedchan???

?}?else?{???

?????close(c.done)????}????

//?將子節(jié)點context依次取消????for?child?:=?range?c.children?{???

?????//?NOTE:?acquiring?the?child's?lock

?while?holding?parent's?lock.???????

?child.cancel(false,?err)????}????

c.children?=?nil????

c.mu.Unlock()???

?if?removeFromParent?{????????//?將當前context節(jié)點從父節(jié)點上移除???????

?removeChild(c.Context,?c)????}

}

可以發(fā)現cancelCtx類型變量其實也是canceler類型,因為cancelCtx實現了canceler接口。

Done方法和Err方法沒必要說了,cancelCtx類型的context在調用cancel方法時會設置取消原因,將done channel設置為一個關閉channel或者關閉channel,然后將子節(jié)點context依次取消,如果有需要還會將當前節(jié)點從父節(jié)點上移除。

WithCancel

WithCancel函數用來創(chuàng)建一個可取消的context,即cancelCtx類型的context。WithCancel返回一個context和一個CancelFunc,調用CancelFunc即可觸發(fā)cancel操作。直接看源碼:

type?CancelFunc?func()

func?WithCancel(parent?Context)?(ctx?Context,?cancel?CancelFunc)?{??

??c?:=?newCancelCtx(parent)????propagateCancel(parent,?&c)????

return?&c,?func()?{?c.cancel(true,?Canceled)?}

}

//?newCancelCtx?returns?an?initialized?cancelCtx.func?newCancelCtx(parent?Context)?cancelCtx?{???

?//?將parent作為父節(jié)點context生成一個新的子節(jié)點????

return?cancelCtx{Context:?parent}}

func?propagateCancel(parent?Context,?child?canceler)?{????if?parent.Done()?==?nil?{????????

//?parent.Done()返回nil表明父節(jié)點以上的路徑上沒有可取消的context????????return?//?parent?is?never?canceled????}????//?獲取最近的類型為cancelCtx的祖先節(jié)點???

?if?p,?ok?:=?parentCancelCtx(parent);?ok?{????????p.mu.Lock()????????if?p.err?!=?nil?{???????????

?//?parent?has?already?been?canceled????????????child.cancel(false,?p.err)???????

?}?else?{???????????

?if?p.children?==?nil?{????????????????p.children?=?make(map[canceler]struct{})????????????}???????????

?//?將當前子節(jié)點加入最近cancelCtx祖先節(jié)點的children中???????????

?p.children[child]?=?struct{}{}???????

?}????????

p.mu.Unlock()???

?}?else?{????????go?func()?{???????????

?select?{????????????

case?<-parent.Done():???????????????

? ? ?????child.cancel(false,?parent.Err())???????????

?case?<-child.Done():????????????}????????}()????}

}

func?parentCancelCtx(parent?Context)?(*cancelCtx,?bool)?

{????for?{????????switch?c?:=?parent:(type)?{

????????case?*cancelCtx:????????????

????????return?c,?true????????

????????case?*timerCtx:????????????

????????return?&c.cancelCtx,?true? ? ? ? ? ? ? ? ????????case?*valueCtx:????????????

? ? ? ? ?parent?=?c.Context????????default:????????????

????????return?nil,?false????????}???

?}

}

之前說到cancelCtx取消時,會將后代節(jié)點中所有的cancelCtx都取消,propagateCancel即用來建立當前節(jié)點與祖先節(jié)點這個取消關聯邏輯。

如果parent.Done()返回nil,表明父節(jié)點以上的路徑上沒有可取消的context,不需要處理;

如果在context鏈上找到到cancelCtx類型的祖先節(jié)點,則判斷這個祖先節(jié)點是否已經取消,如果已經取消就取消當前節(jié)點;否則將當前節(jié)點加入到祖先節(jié)點的children列表。

否則開啟一個協程,監(jiān)聽parent.Done()和child.Done(),一旦parent.Done()返回的channel關閉,即context鏈中某個祖先節(jié)點context被取消,則將當前context也取消。

這里或許有個疑問,為什么是祖先節(jié)點而不是父節(jié)點?這是因為當前context鏈可能是這樣的:


當前cancelCtx的父節(jié)點context并不是一個可取消的context,也就沒法記錄children。

timerCtx

timerCtx是一種基于cancelCtx的context類型,從字面上就能看出,這是一種可以定時取消的context。

type?timerCtx?struct?{????

????cancelCtx????timer?*time.Timer?//?Under?cancelCtx.mu.? ? ? ? ?????deadline?time.Time}

????func?(c?*timerCtx)?Deadline()?(deadline?time.Time,?ok?bool)?{?

???return?c.deadline,?true}

func?(c?*timerCtx)?cancel(removeFromParent?bool,?err?error)?{????將內部的cancelCtx取消????

c.cancelCtx.cancel(false,?err)????

if?removeFromParent?{????????????

//?Remove?this?timerCtx?from?its?parent?cancelCtx's?children.????????removeChild(c.cancelCtx.Context,?c)????}????

c.mu.Lock()????if?c.timer?!=?nil?{???????

// 取消計時器????????

c.timer.Stop()???????

?c.timer?=?nil????

}???

?c.mu.Unlock()

}

timerCtx內部使用cancelCtx實現取消,另外使用定時器timer和過期時間deadline實現定時取消的功能。timerCtx在調用cancel方法,會先將內部的cancelCtx取消,如果需要則將自己從cancelCtx祖先節(jié)點上移除,最后取消計時器。

WithDeadline

WithDeadline返回一個基于parent的可取消的context,并且其過期時間deadline不晚于所設置時間d。

func?WithDeadline(parent?Context,?d?time.Time)?(Context,?CancelFunc)?{???

?if?cur,?ok?:=?parent.Deadline();?ok?&&?cur.Before(d)?{???????

?//?The?current?deadline?is?already?sooner?than?the?new?one.????????return?WithCancel(parent)????}????

c?:=?&timerCtx{????????cancelCtx:?newCancelCtx(parent),????????deadline:??d,????}????

//?建立新建context與可取消context祖先節(jié)點的取消關聯關系????propagateCancel(parent,?c)???

?????????dur?:=?time.Until(d)????

? ? ? ? ? ?if?dur?<=?0?{????????c.cancel(true,?DeadlineExceeded)?//?deadline?has?already?passed????????

return?c,?

func()?{?c.cancel(false,?Canceled)?}???

?}???

?c.mu.Lock()????

defer?c.mu.Unlock()????if?c.err?==?nil?{????????

c.timer?=?time.AfterFunc(dur,?func()?{????????????c.cancel(true,?DeadlineExceeded)????????})????}????

return?c,?func()?{?c.cancel(true,?Canceled)?}

}

如果父節(jié)點parent有過期時間并且過期時間早于給定時間d,那么新建的子節(jié)點context無需設置過期時間,使用WithCancel創(chuàng)建一個可取消的context即可;

否則,就要利用parent和過期時間d創(chuàng)建一個定時取消的timerCtx,并建立新建context與可取消context祖先節(jié)點的取消關聯關系,接下來判斷當前時間距離過期時間d的時長dur:

如果dur小于0,即當前已經過了過期時間,則直接取消新建的timerCtx,原因為DeadlineExceeded;

否則,為新建的timerCtx設置定時器,一旦到達過期時間即取消當前timerCtx。

WithTimeout

與WithDeadline類似,WithTimeout也是創(chuàng)建一個定時取消的context,只不過WithDeadline是接收一個過期時間點,而WithTimeout接收一個相對當前時間的過期時長timeout:

func?WithTimeout(parent?Context,?timeout?time.Duration)?(Context,?CancelFunc)?{???

?return?WithDeadline(parent,?time.Now().Add(timeout))

}

context的使用

首先使用context實現文章開頭done channel的例子來示范一下如何更優(yōu)雅實現協程間取消信號的同步:

func?main()?{????

messages?:=?make(chan?int,?10)????//?producer????

for?i?:=?0;?i?<?10;?i++?{????????messages?<-?i????}???

?ctx,?cancel?:=?context.WithTimeout(context.Background(),?5*time.Second)????

//?consumer????

go?func(ctx?context.Context)?{????????ticker?:=?time.NewTicker(1?*?time.Second)????????

for?_?=?range?ticker.C?{????????????select?{????????????case?<-ctx.Done():????????????????fmt.Println("child?process?interrupt...")????????????????return????????????default:???????????????

?fmt.Printf("send?message:?%d\n",?<-messages)????????????}???????

?}???

?}(ctx)????

defer?close(messages)????

defer?cancel()????select?{????

case?<-ctx.Done():????????

time.Sleep(1?*?time.Second)????????fmt.Println("main?process?exit!")????}

}

這個例子中,只要讓子線程監(jiān)聽主線程傳入的ctx,一旦ctx.Done()返回空channel,子線程即可取消執(zhí)行任務。但這個例子還無法展現context的傳遞取消信息的強大優(yōu)勢。

閱讀過net/http包源碼的朋友可能注意到在實現http server時就用到了context, 下面簡單分析一下。

1、首先Server在開啟服務時會創(chuàng)建一個valueCtx,存儲了server的相關信息,之后每建立一條連接就會開啟一個協程,并攜帶此valueCtx。

func?(srv?*Server)?Serve(l?net.Listener)?error?{?

???...????

var?tempDelay?time.Duration?????//?how?long?to?sleep?on?accept?failure????

baseCtx?:=?context.Background()?//?base?is?always?background,?per?Issue?16220????

ctx?:=?context.WithValue(baseCtx,?ServerContextKey,?srv)????for?{???????

?rw,?e?:=?l.Accept()????????...???????

?tempDelay?=?0????????

c?:=?srv.newConn(rw)???????

?c.setState(c.rwc,?StateNew)?//?before?Serve?can?return????????

go?c.serve(ctx)????}

}

2、建立連接之后會基于傳入的context創(chuàng)建一個valueCtx用于存儲本地地址信息,之后在此基礎上又創(chuàng)建了一個cancelCtx,然后開始從當前連接中讀取網絡請求,每當讀取到一個請求則會將該cancelCtx傳入,用以傳遞取消信號。一旦連接斷開,即可發(fā)送取消信號,取消所有進行中的網絡請求。

func?(c?*conn)?serve(ctx?context.Context)?{???

?c.remoteAddr?=?c.rwc.RemoteAddr().String()????

ctx?=?context.WithValue(ctx,?LocalAddrContextKey,?c.rwc.LocalAddr())???

?...???

?ctx,cancelCtx?:=?context.WithCancel(ctx)????

c.cancelCtx?=?cancelCtx????defer?cancelCtx()???

?...????

for?{????????w,?err?:=?c.readRequest(ctx)???????

?...???????

?serverHandler{c.server}.ServeHTTP(w,?w.req)???????

?...????}

}

3、讀取到請求之后,會再次基于傳入的context創(chuàng)建新的cancelCtx,并設置到當前請求對象req上,同時生成的response對象中cancelCtx保存了當前context取消方法。

func?(c?*conn)?readRequest(ctx?context.Context)?(w?*response,?err?error)?{??

??...???

?req,?err?:=?readRequest(c.bufr,?keepHostHeader)????

...????ctx,?cancelCtx?:=?context.WithCancel(ctx)????

req.ctx?=?ctx????

...????

w?=?&response{????????

conn:??????????c,????????

cancelCtx:?????cancelCtx,???????

?req:???????????req,???????

?reqBody:???????req.Body,???????

?handlerHeader:?make(Header),????????

contentLength:?-1,????????

closeNotifyCh:?make(chan?bool,?1),????????

//?We?populate?these?ahead?of?time?so?we're?not????????

//?reading?from?req.Header?after?their?Handler?starts????????

//?and?maybe?mutates?it?(Issue?14940)????????

wants10KeepAlive:?req.wantsHttp10KeepAlive(),???????

?wantsClose:???????req.wantsClose(),????}???

?...???

?return?w,?nil}

這樣處理的目的主要有以下幾點:

一旦請求超時,即可中斷當前請求;

在處理構建response過程中如果發(fā)生錯誤,可直接調用response對象的cancelCtx方法結束當前請求;

在處理構建response完成之后,調用response對象的cancelCtx方法結束當前請求。

在整個server處理流程中,使用了一條context鏈貫穿Server、Connection、Request,不僅將上游的信息共享給下游任務,同時實現了上游可發(fā)送取消信號取消所有下游任務,而下游任務自行取消不會影響上游任務。

總結

context主要用于父子任務之間的同步取消信號,本質上是一種協程調度的方式。另外在使用context時有兩點值得注意:上游任務僅僅使用context通知下游任務不再需要,但不會直接干涉和中斷下游任務的執(zhí)行,由下游任務自行決定后續(xù)的處理操作,也就是說context的取消操作是無侵入的;context是線程安全的,因為context本身是不可變的(immutable),因此可以放心地在多個協程中傳遞使用。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 前言 首先解答上一篇文章一文帶你快速入門context中留下的疑惑,為什么要defer cancelFunc()?...
    滅BUG閱讀 663評論 0 0
  • 起因 最近學習golang框架的時候發(fā)現許多地方都用到了context的概念,比如grpc請求 etcd訪問等許...
    Kathent閱讀 1,731評論 0 3
  • context包專門用來簡化處理單個請求的多個goroutine之間與請求域的數據、取消信號、截止時間等相關操作。...
    wz998閱讀 3,949評論 0 3
  • Context 通常被譯作上下文,一般理解為程序單元的一個運行狀態(tài)、現場、快照,而翻譯中上下文又很好地詮釋了它的本...
    Asphalt7閱讀 658評論 0 0
  • 引言 context 是 Go 中廣泛使用的程序包,由 Google 官方開發(fā),在 1.7 版本引入。它用來簡化在...
    51reboot閱讀 3,628評論 0 10

友情鏈接更多精彩內容