協(xié)程的定義
函數(shù)
入口:有且只有一個(gè)入口
出口:有且只有一個(gè)出口協(xié)程
入口:多個(gè)入口
出口:多個(gè)出口
特點(diǎn):暫定,保留執(zhí)行狀態(tài),恢復(fù)執(zhí)行
協(xié)程是函數(shù)基礎(chǔ)上,一種更加寬泛定義的計(jì)算機(jī)程序模塊,它可以有多個(gè)入口點(diǎn),允許從一個(gè)入口點(diǎn)到下個(gè)入口點(diǎn)之前暫停,保留執(zhí)行狀態(tài),等待合適的時(shí)機(jī)恢復(fù)執(zhí)行狀態(tài),從下一個(gè)入口點(diǎn)重新開始性質(zhì)
協(xié)程代碼塊
從一個(gè)入口點(diǎn)到下一個(gè)入口點(diǎn)的代碼協(xié)程模塊
有n個(gè)入口代碼,和n個(gè)協(xié)程代碼塊組成。其組織形式為:函數(shù)入口-》協(xié)程代碼塊-》入口點(diǎn)-》協(xié)程代碼塊...
可迭代對(duì)象,迭代器,生成器
可迭代對(duì)象(Iterable)
python中可迭代類型如字符串,列表,元組,字典等:
str = 'abc'
list = ['a', 'b', 'c']
set = ('a', 'b', 'c')
dict = {'a':1, 'b':2}
他們的特點(diǎn)是dir(object)實(shí)現(xiàn)了__iter__()函數(shù)
迭代器(Iterator)
迭代器的要求是dir(object)實(shí)現(xiàn)了__iter__()和__next__()兩個(gè)方法
生成器(generator)
生成器是特殊的迭代器,在迭代器的基礎(chǔ)上包含yield關(guān)鍵字
總結(jié)
- 迭代器必須實(shí)現(xiàn)迭代器協(xié)議
__iter__和__next__ -
__iter__返回的對(duì)象是可迭代對(duì)象 - 迭代器一定是可迭代對(duì)象,但可迭代對(duì)象不一定是迭代器,有可能迭代細(xì)節(jié)交給另一個(gè)類,這個(gè)類才是迭代器
- 生成器一定是一個(gè)迭代器,同時(shí)也是迭代對(duì)象
- 生成器是一種特殊的迭代器,包含關(guān)鍵字
yield來(lái)實(shí)現(xiàn)懶惰計(jì)算,并是的外部影響生成器的執(zhí)行成為可能
生成器和yield語(yǔ)義
生成器的定義:提供一種函數(shù),能夠返回中間結(jié)果給調(diào)用者,然后維護(hù)函數(shù)的局部狀態(tài),以便當(dāng)函數(shù)離開后,也能恢復(fù)執(zhí)行。
生成器是一個(gè)包含關(guān)鍵字yield表達(dá)式的函數(shù)。一個(gè)生成器是異步的,即生成器模塊中含有阻塞代碼,生成器的類型是types.GeneratorType。
#生成器
def sum(total):
total = 0
while True:
a = yield total
total += a
if __name__ == '__main__':
s = sum(10) //獲得生成器,但不執(zhí)行生成器的代碼
print next(s) //運(yùn)行生成器的代碼,執(zhí)行yield total處,輸出0
print s.send(10) //運(yùn)行生成器的代碼,第二次執(zhí)行到y(tǒng)ield total處,輸出10
print s.send(2) //運(yùn)行生成器的代碼,第三次執(zhí)行到y(tǒng)ield total處,輸出12
對(duì)上訴代碼進(jìn)行說(shuō)明:
- sum是一個(gè)生成器,包含關(guān)鍵字
yield - sum生成器
a=yield從send(value)獲取外部參數(shù) - sum生成器
yield total將total值傳出 - next(s)初始化生成器,并block在第一個(gè)yield中
- s.send(v)喚醒生成器,并將值傳入
Generator已經(jīng)具備協(xié)程的能力,如能夠暫停,保存狀態(tài),傳出值;恢復(fù)執(zhí)行,接受參數(shù),異步執(zhí)行。
但此時(shí)Generator還不是一個(gè)協(xié)程。一個(gè)真正的協(xié)程能夠控制代碼什么時(shí)候繼續(xù)執(zhí)行。而此時(shí)的Generator執(zhí)行遇到一個(gè)yield還是把執(zhí)行控制權(quán)轉(zhuǎn)移給調(diào)用者
Future
Future顧名思義未來(lái),將在未來(lái)被執(zhí)行的代碼。Future的類包含result,done, callback屬性,以及_set_result, _set_done方法,通過(guò)這些屬性和方法來(lái)實(shí)現(xiàn)當(dāng)future被賦值后,能夠執(zhí)行對(duì)應(yīng)的callback函數(shù),具體代碼如下:
class Future(object):
def done(self):
return self._done
def result(self, timeout=None):
self._clear_tb_log()
if self._result is not None:
return self._result
if self._exc_info is not None:
raise_exc_info(self._exc_info)
self._check_done()
return self._result
def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)
def set_result(self, result):
self._result = result
self._set_done()
def _set_done(self):
self._done = True
for cb in self._callbacks:
try:
cb(self)
except Exception:
app_log.exception('exception calling callback %r for %r',
cb, self)
self._callbacks = None
Future說(shuō)明
1.Future對(duì)象通過(guò)add_done_callback將回調(diào)函數(shù)與future對(duì)象進(jìn)行綁定,目的是future被賦值后調(diào)用callback中的回調(diào)函數(shù),從而實(shí)現(xiàn)激活生成器(回調(diào)函數(shù)調(diào)用gen.send(value))
2.def set_result(self, result):對(duì)Future對(duì)象賦值,將異常處理結(jié)果賦值給Future對(duì)象的result屬性,同時(shí)執(zhí)行_set_done遍歷callback的函數(shù)
3.def _set_done(self):遍歷callback的函數(shù)列表,并執(zhí)行
總結(jié)來(lái)說(shuō),future對(duì)象綁定一個(gè)回調(diào)函數(shù)(回到函數(shù)會(huì)激活生成器),當(dāng)future對(duì)象被賦值,會(huì)回調(diào)回調(diào)函數(shù),進(jìn)而回調(diào)函數(shù)會(huì)激活生成器(gen.send())
IOLoop類
IOLoop在協(xié)程運(yùn)行環(huán)境中擔(dān)任著協(xié)程調(diào)度的角色,消息循環(huán)本質(zhì)上是一種事件循環(huán),等待事件,然后運(yùn)行對(duì)應(yīng)事件的處理器。IOLoop主要調(diào)度處理的是IO事件(讀,寫,錯(cuò)誤)以及callback和timeout。
IOLoop中注冊(cè)future:add_future
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(lambda future: self.add_callback(callback, future))
IOLoop將callback和future作為參數(shù)傳入IOLoop的add_callback,封裝成匿名函數(shù),并存儲(chǔ)到future的callback屬性中,當(dāng)future被設(shè)置后匿名函數(shù)則會(huì)在下一個(gè)IOLoop中運(yùn)行
- 1.
lambda future: self.add_callback(callback, future),將callback,future作為參數(shù)傳入IOLoop的add_callback方法,實(shí)則利用偏函數(shù)將callback和future進(jìn)行封包裝,包裝好的偏函數(shù)加入到IOLoop的回調(diào)函數(shù)列表中。當(dāng)IOLoop下一次迭代運(yùn)行,遍歷回調(diào)函數(shù)并執(zhí)行 - 匿名函數(shù)加入到future的callback屬性中,當(dāng)future對(duì)象被設(shè)置后,會(huì)調(diào)用回調(diào)函數(shù)
Coroutine函數(shù)裝飾器
函數(shù)裝飾器本質(zhì)是一個(gè)函數(shù),它將被調(diào)用的函數(shù)func作為參數(shù),返回一個(gè)新的函數(shù)make_coroutine_wrapper。
def coroutine(func, replace_callback=True):
return make_coroutine_wrapper(func, replace_callback=True)

如上圖:
-
result = func(*arg, **kwargs)獲取函數(shù)對(duì)象,
-
-
if isinstance(result, types.GeneratorType)判斷result是否是生成器
-
-
yielded = next(result)因?yàn)閞esult是生成器,當(dāng)調(diào)用next會(huì)執(zhí)行到生成器第一個(gè)yield代碼,并返回一個(gè)future對(duì)象(異步操作)賦值給yield
-
-
Runner(result, future, yielded),將生成器對(duì)象, yielded(future對(duì)象),future(監(jiān)控future)傳入函數(shù)Runner
-
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
self.stack_context_deactivate = None
if self.handle_yield(first_yielded):
self.run()
def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
try:
value = future.result()
except Exception:
self.had_exception = True
yielded = self.gen.throw(*sys.exc_info())
else:
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
self.result_future.set_result(getattr(e, 'value', None))
self.result_future = None
return
except Exception:
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
return
if not self.handle_yield(yielded):
return
finally:
self.running = False
def handle_yield(self, yielded):
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())
if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True
Runner函數(shù)內(nèi)部處理就是將給yeilded(future對(duì)象)綁定一個(gè)回調(diào)函數(shù),當(dāng)future對(duì)象被set_result后,調(diào)用run;而run函數(shù)內(nèi)部會(huì)調(diào)用生成器的send方法,并將vaule傳入到生成器。
def handle_yield(self, yielded):
- 1.將yielded轉(zhuǎn)化為future對(duì)象
- 2.調(diào)用IOLoop的
add_future函數(shù),將run函數(shù)添加到future的callback屬性中 - 3.當(dāng)future對(duì)象在某處代碼中被set_result,IOLoop下一個(gè)循環(huán)中便執(zhí)行run函數(shù)
- 4.run函數(shù)會(huì)取出future的result,并調(diào)用gen.send(value)啟動(dòng)生成器,并將生成器的輸出在賦值給yielded
- 5.重新賦值的yielded在作為參數(shù)傳入
handle_yield(self, yielded),循環(huán)步驟1,指導(dǎo)生成器結(jié)束
參考:
https://juejin.im/post/5ccafbf5e51d453a3a0acb42
https://blog.csdn.net/wyx819/article/details/45420017