異步異常與上下文
在Python黑魔法---上下文管理器最后關于上下文的使用,提到了tornado的處理方式。本篇就來一探究竟?;仡檰栴},異步函數(shù)執(zhí)行的時候,拋出的異常已經(jīng)和主函數(shù)的上下文不一致,為了解決這個問題,可以使用Python的上下文管理器進行wrapper。下面的代碼,就存在異步異常在主函數(shù)中無法捕獲的問題:
import tornado.ioloop
import tornado.stack_context
ioloop = tornado.ioloop.IOLoop.instance()
times = 0
def callback():
print 'run callback'
raise ValueError('except in callback')
def async_task():
global times
times += 1
print 'run async task {}'.format(times)
ioloop.add_callback(callback=callback)
def main():
try:
async_task()
except Exception as e:
print 'main exception {}'.format(e)
print 'end'
運行上述代碼將會返回:
run async task 1
end
run callback
ERROR:root:Exception in callback <tornado.stack_context._StackContextWrapper object at 0x10306f890>
Traceback (most recent call last):
...
raise ValueError('except in callback')
ValueError: except in callback
async_task函數(shù)執(zhí)行的時候,在注冊了一個異步回調函數(shù)callback??墒窃赼sync_task的異常try邏輯中,callback拋出的異常無法正確的catch。也就是終端并沒有輸出main exception except in callback,而是僅僅輸出了except in callback的異常。
初次解決
因為主函數(shù)無法捕獲回調的異常,同時為了防止回調的異常蔓延到主函數(shù),一個簡單的思路就是在callback中進行try捕獲。修改代碼如下:
def callback():
print 'run callback'
try:
raise ValueError('except in callback')
except Exception as e:
print 'main exception {}'.format(e)
運行結果如下:
run async task 1
end
run callback
main exception except in callback
看起來不錯,在callback中寫入了main函數(shù)的捕獲邏輯。問題算是解決了。可是,這樣的做法相當丑陋。如果主函數(shù)里針對callback異常還有別的業(yè)務邏輯,那么這樣的寫法就很死,甚至無法完成接下來的邏輯。
包裹上下文
針對主函數(shù)無法catch,初次嘗試把catch移步到callback中。這樣的問題是涉及主函數(shù)邏輯會寫死。如果異步的try作為一個包裹,而不是語法修改,會不會更好呢?寫個 callback代碼如下:
def callback():
print 'run callback'
raise ValueError('except in callback')
def wrapper(func):
try:
func()
except Exception as e:
print 'main exception {}'.format(e)
def async_task():
global times
times += 1
print 'run async task {}'.format(times)
ioloop.add_callback(callback=functools.partial(wrapper, callback))
def main():
wrapper(async_task)
運行之后,發(fā)現(xiàn)主函數(shù)可以catch callback中的異常了。這樣做的思路其實很簡單,因為callback會產生異常,并且這個異常需要蔓延傳播到主函數(shù),那么我們就挖一個坑,這個坑分別包裹callback和主函數(shù),因為坑都是一樣的,所有raise的異??梢远x在坑中。

靈活性變大了,當然,這樣做還是有限制,比如主函數(shù)需要另外一種坑,如果定義多個坑,那么還得修改 async_task中的wrapper,比較好的方式是在主函數(shù)可以動態(tài)的傳遞wrapper函數(shù)。這就涉及到全局變量??梢允褂萌值淖侄未鎯Χ鄠€不同的wrapper函數(shù)坑。
times = 0
GLOBAL_WRAPPERS = {}
def callback():
print 'run callback'
raise ValueError('except in callback')
def wrapper(func):
try:
func()
except Exception as e:
print 'wrapper exception {}'.format(e)
def other_wrapper(func):
try:
func()
except Exception as e:
print 'other_wrapper exception {}'.format(e)
def async_task():
global times
times += 1
print 'run async task {}'.format(times)
ioloop.add_callback(callback=functools.partial(GLOBAL_WRAPPERS['context'], callback))
def main():
GLOBAL_WRAPPERS['context'] = wrapper
wrapper(async_task)
GLOBAL_WRAPPERS['context'] = other_wrapper
other_wrapper(async_task)
定義了一個全局變量,用于保存不同的函數(shù)坑,其實這個坑可以理解為函數(shù)執(zhí)行的上下文。變換不同的上下文,異步callback也會跟著進入對應的上下文。這種技巧,tornado的stack_context用到了極致,相當巧妙。
tornado stack_context 源碼
對于stack_context的分析,主要采用tornado2.0的代碼例子。tornado的源碼附帶的測試樣例非常棒,不過我們還是寫一個簡單的使用stack_context的代碼,然后再一步步看程序的執(zhí)行。
times = 0
def callback():
print 'Run callback'
raise ValueError('except in callback')
def async_task():
global times
times += 1
print 'run async task {}'.format(times)
ioloop.add_callback(callback=callback)
@contextlib.contextmanager
def contextor():
print 'Enter contextor'
try:
yield
except Exception as e:
print 'Handler except'
print 'exception {}'.format(e)
finally:
print 'Release'
def main():
with tornado.stack_context.StackContext(contextor):
async_task()
print 'End'
運行結果如下:
Enter contextor
run async task 1
Release
End
Enter contextor
Run callback
Handler except
exception except in callback
Release
從輸出來看:
- 首先進入contextor上下文管理器上下文
- 執(zhí)行 async 函數(shù)
- 退出contextor上下文管理器上下文
- 再次進入contextor上下文管理器上下文
- 執(zhí)行異步的callback
- callback產生異常,執(zhí)行 contextor上下文管理器的異常處理代碼
- 再次退出contextor上下文管理器上下文
所有上述的步驟,正如前面的分析,無論是主函數(shù)還是異步回調函數(shù),都經(jīng)過了stack_context的包裹(挖的坑),實現(xiàn)了上下文切換執(zhí)行代碼。具體而言,在我們的代碼的with語句進行了一次包裹,ioloop.add_callback則進行了對回調的包裹。
創(chuàng)建Stack_context 上下文管理器
在main函數(shù)中,首先創(chuàng)建了Stack_context上下文管理器,然后通過with語句進入contextor上下文
def main():
stack_context = tornado.stack_context.StackContext(contextor)
with stack_context:
async_task()
print 'End'
在 stack_context.py 文件中,實例StackContext的時候,將上下文管理contextor注入其中,然后調用 with語句的時候,執(zhí)行StackContext的 __enter__方法:
class StackContext(object):
def __init__(self, context_factory):
# 將上下文管理函數(shù)傳到StackContext
self.context_factory = context_factory
def __enter__(self):
# 存儲舊的狀態(tài)上下文
self.old_contexts = _state.contexts
# _state.contexts 是一個元組的結果,為StackContext和上下文管理函數(shù) (class, arg) 這樣的結構,下面就是更新 _state.contexts
_state.contexts = (self.old_contexts +
((StackContext, self.context_factory),))
try:
# self.context_factory 是傳遞進來的上下文管理函數(shù)(contextor),通過調用self.context_factory創(chuàng)建上下文管理器。
self.context = self.context_factory()
# 調用上下文管理器的__enter__ 方法,進入contextor上下文環(huán)境
self.context.__enter__()
except Exception:
_state.contexts = self.old_contexts
raise
上述代碼注釋解釋了大部分邏輯,需要額外注意是這個 _state.context。它是一個python線程的全局變量(theading.local),其職能類似GLOBAL_WRAPPER用于保存不同的上下文。他的特點就是每個線程都能把自己的私有數(shù)據(jù)寫入,同時對于別的線程又是隔離不可見。一旦執(zhí)行了self.context.__enter__()代碼,函數(shù)控制上下文將會轉移到上下文管理器(contextor)的__enter__方法中:
def contextor():
# StackContext 中執(zhí)行 self.context = self.context_factory()將會轉移到此
print 'Enter contextor'
try:
yield
except Exception as e:
print 'Handler except'
print 'exception {}'.format(e)
finally:
print 'Release'
此時可以看到控制臺輸出 'Enter contextor'的輸出,同時被yield,函數(shù)控制權回到StackContext中的enter。
注冊回調函數(shù)
接下來,進入到with語句后,__enter__方法返回后,執(zhí)行async_task函數(shù),而async_task調用了ioloop.add_callback(callback=callback)。下面來看里面的代碼:
def add_callback(self, callback):
if not self._callbacks:
self._wake()
# 將callback傳遞給stack_context,返回一個_StackContextWrapper對象,該其中保存了callback和aysnc_task的上下文對象元組(StackContext, contextor)
self._callbacks.append(stack_context.wrap(callback))
add_callback 會針對管道進行一下處理,具體放到ioloop再討論,這里只需要了解callback又被stack_context包裹了,并且注冊到ioloop實例的_callbacks列表里。
下面在看看這個wrap干了什么事情:
def wrap(fn):
if fn is None or fn.__class__ is _StackContextWrapper:
return fn
def wrapped(callback, contexts, *args, **kwargs):
...
return _StackContextWrapper(wrapped, fn, _state.contexts)
首先判斷包裹的函數(shù)(callback)是否為None,并且他是否已經(jīng)被_StackContextWrapper包裹了,如果滿足上面的條件,就直接返回。否則則進行_StackContextWrapper包裹。_StackContextWrapper其實就是一個偏函數(shù)functools.partial。這里需要注意的是 wreapped函數(shù)(稍后會用到),fn(被包裹的callback),狀態(tài)上下文 _state.contexts。 _state.contexts就是之前 Stack_context.enter方法中創(chuàng)建的那個 (class,args) 元組。這樣的做法,就是為了后面包裹回調函數(shù)的上下文環(huán)境保存起來。此時的_state.contexts是一個 StackContext和contextor的元組對,將會在wrapper函數(shù)中進行再一次包裹:即StackContext(contextor)。
管理回調函數(shù)上下文
stack_context.wrap函數(shù)執(zhí)行返回后,將會退出包裹contextor的上下文,即調用StackContext的 __exit__方法:
def __exit__(self, type, value, traceback):
try:
return self.context.__exit__(type, value, traceback)
finally:
# 將全contextor的上下文出棧
_state.contexts = self.old_contexts
該__exit__中會執(zhí)行self.context的__exit__方法,即contextor中的finnaly,此時會打印出 Release。
@contextlib.contextmanager
def contextor():
print 'Enter contextor'
try:
yield
except Exception as e:
print 'Handler except'
print 'exception {}'.format(e)
finally:
print 'Release'
StackContext的finally還會把剛執(zhí)行完畢的全局上下文出棧, 即恢復到StackContext.wrapper(contextor)之前的上下文。
執(zhí)行callback
出現(xiàn)異常的邏輯在callback,到目前為止,還沒有執(zhí)行callback函數(shù)。從上面的經(jīng)驗可以看出,想要執(zhí)行callback,首先需要上下文管理器包裹一下callback,然后進入callback上下文,執(zhí)行callback,觸發(fā)異常,進入callback的exit上下文。當然,無論是之前的對contextor的wrapper還是接下來對callback的wrapper,都是用的同一個上下文管理器 contextor。
繼續(xù)代碼的執(zhí)行,將會運行到 ioloop.start方法
callbacks = self._callbacks
self._callbacks = []
for callback in callbacks:
self._run_callback(callback)
然后是在_run_callback中執(zhí)行 callback()函數(shù)。
def _run_callback(self, callback):
try:
callback() # 此時成callback是一個被StackContext.wrap包裹的_StackContextWrappe對象。即可以通過contextor創(chuàng)建上下文環(huán)境,該上下文環(huán)境與async_task的一致
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handle_callback_exception(callback)
注意此時的callback,并不是定義的callback,而是經(jīng)過StackContext包裹的callback,具體在StackContext.wrap(callback)調用的時候,返回了偏函數(shù)的_StackContextWrapper 對象。因此調用_StackContextWrappe(),進入下面的StackContext.wrap函數(shù)的邏輯
def wrap(fn):
'''
if fn is None or fn.__class__ is _StackContextWrapper:
return fn
def wrapped(callback, contexts, *args, **kwargs):
# 判斷當前上下文(cls, args(contextor))是否在全局中保存。對于沒有嵌套的StackContext.wrap,此時的條件不成立。如果是嵌套包裹,此時就直接調用callback。
if contexts is _state.contexts or not contexts:
callback(*args, **kwargs)
return
# 將 StackContext和contextor進行包裹
if not _state.contexts:
new_contexts = [cls(arg) for (cls, arg) in contexts]
elif (len(_state.contexts) > len(contexts) or
any(a[1] is not b[1]
for a, b in itertools.izip(_state.contexts, contexts))):
# contexts have been removed or changed, so start over
new_contexts = ([NullContext()] +
[cls(arg) for (cls,arg) in contexts])
else:
new_contexts = [cls(arg)
for (cls, arg) in contexts[len(_state.contexts):]]
if len(new_contexts) > 1:
with _nested(*new_contexts):
callback(*args, **kwargs)
elif new_contexts:
# 再一次使用 StackContext包裹一個上下文處理器 contextor
with new_contexts[0]:
# 將callback在被StackContext包裹contextor執(zhí)行callback
callback(*args, **kwargs)
else:
callback(*args, **kwargs)
return _StackContextWrapper(wrapped, fn, _state.contexts)
上述代碼很多,其實目前只需要關注new_contexts = [cls(arg) for (cls, arg) in contexts]和with new_contexts[0]:callback(*args, **kwargs)兩個邏輯。
cls(arg)的做法,與main函數(shù)中的stack_context = tornado.stack_context.StackContext(contextor)。 一模一樣。創(chuàng)建一個創(chuàng)建Stack_context 上下文管理器。至于with new_contexts則與StackContext.wrapper(connextor)的效果一致。進入contextor上下文環(huán)境,然后執(zhí)行callback,此時進入上下文管理器的時候,也會打印 Enter contextor。然后就是真正的執(zhí)行callback回調函數(shù)。因為發(fā)生異常,就觸發(fā)了contextor的__exit__方法,然后執(zhí)行了print 'exception {}'.format(e)代碼,最后退出contextor上下文環(huán)境。完成callback的調用。
回顧
如果一步步debug,還是很容易弄清楚StackContext的原理,寫成文字,反而說不清?,F(xiàn)在我們再分析代碼輸出結果
1. Enter contextor
2. run async task 1
3. Release
4. End
5. Enter contextor
6. Run callback
7. Handler except
8. exception except in callback
9. Release
1 StackContext(contextor)實例化創(chuàng)建上下文管理器,然后通過with語句調用,進入了contextor的 __enter__方法所打印輸出
2 進入with上下文環(huán)境,調用 async_task輸出,同時ioloop注冊回調函數(shù)。通過stack_context.wrap(callback)注冊并保存與async_task上下文一樣的管理器,并使用_StackContext偏函數(shù)返回
3 退出with代碼塊,執(zhí)行contextor.exit方法輸出
4 主函數(shù)main繼續(xù)執(zhí)行打印
5 ioloop繼續(xù)執(zhí)行,調用callback回調,此時的callback是_StackContextWrapper對象,_StackContextWrapper調用 wrapper函數(shù)內邏輯,通過cls(args)創(chuàng)建一個新的上下文管理器,并通過with new_contexts[0]進入上下文管理器。
6 進入 callback函數(shù)執(zhí)行
7 產生異常,觸發(fā)新創(chuàng)建的上下文管理器的exit中的異常處理
8 輸出異常
9 執(zhí)行上下文管理器的finnaly分支,退出上下文管理器。
其中 2 步驟是處理上下文管理器的基礎,5則是aync_task和callback上下文管理器包裹同步的關鍵。
大概流程圖如下:

總而言之,async_task和callback的執(zhí)行上下文本來不一樣。為了解決問題,定義一個上下文管理器contextor。無論再調用async_task還是callback之前,先用StackContext管理contextor。初始執(zhí)行async_task和callback函數(shù)邏輯的時候,都在contextor上下文環(huán)境中,并且異常拋出也一樣。簡化為一下步驟為:
1 使用StackContext(contextor) 創(chuàng)建一個上下文管理器,并將上下文管理函數(shù)推入_state.contexts 棧中
2 執(zhí)行 async_task函數(shù),注冊callback回調:將_state.contexts棧中的上下文管理函數(shù)出棧,創(chuàng)建一個_StackContextWrapper 對象,該對象存儲了出棧的async_task上下文函數(shù)。此時ioloop注冊的callback為_StackContextWrapper對象。
3 ioloop調用callback,_StackContextWrapper中,將存儲的上下文函數(shù)創(chuàng)建一個與syanc_task 一樣的上下文管理器。在這個上下文環(huán)境中執(zhí)行callback函數(shù)
4 3步驟中也涉及了創(chuàng)建上下文管理器的_state.contexts入棧出棧操作,多嵌套的with則會操作對應的上下文函數(shù)。執(zhí)行完callback(或產生異常),執(zhí)行上下文管理器的exit方法。
4個步驟的關鍵在于通過_state.contexts棧的處理,將主函數(shù)上下文管理函數(shù)綁定給了callback。因此無論callback還是async_task的上下文,通過contextor管理器都變得一樣了。
contextor就像一個橋梁,連接著async_task和callback。而StackContext就像一個工程師,如何把函數(shù)和異步回調之間架設橋梁。
總結
本篇使用了大量的文字描述stack_contextor 的原理,其實還比不過打斷點執(zhí)行一遍。當然,對于多個嵌套的with,stack_context模塊同樣使用,其關鍵就在于_state.context是一個上下文管理器的棧,通過他的入棧和出棧可以輕松應對嵌套環(huán)境下的上下文環(huán)境。
下面是一段多嵌套的代碼和輸出結果,具體原理就不再分析了:
ioloop = tornado.ioloop.IOLoop.instance()
times = 0
def callback():
print 'Run callback'
raise ValueError('except in callback')
def async_task():
global times
times += 1
print 'run async task {}'.format(times)
ioloop.add_callback(callback=callback)
@contextlib.contextmanager
def A():
print("Enter A context")
try:
yield
except Exception as e:
print("A catch the exception: %s" % e)
finally:
print("Exit A context")
@contextlib.contextmanager
def B():
print("Enter B context")
try:
yield
except Exception as e:
print("B catch the exception: %s" % e)
finally:
print("Exit B context")
def main():
with tornado.stack_context.StackContext(A):
with tornado.stack_context.StackContext(B):
async_task()
main()
ioloop.start()
輸入結果很明了:
Enter A context
Enter B context
run async task 1
Exit B context
Exit A context
Enter A context
Enter B context
Run callback
B catch the exception: except in callback
Exit B context
Exit A context
先進入A的上下文,再進入B中,然后運行函數(shù)注冊異步回調,退出B,再退出A。ioloop執(zhí)行異步函數(shù),再進入A,再進入B,運行回調,B發(fā)生異常,catch 捕獲,退出B,再退出A 。