本節(jié)介紹 asyncio 剩余的一些常用操作:事件循環(huán)實(shí)現(xiàn)無(wú)限循環(huán)任務(wù),在事件循環(huán)中執(zhí)行普通函數(shù)以及協(xié)程鎖。
一. 無(wú)限循環(huán)任務(wù)
事件循環(huán)的 run_until_complete 方法運(yùn)行事件循環(huán)時(shí),當(dāng)其中的全部任務(wù)完成后,會(huì)自動(dòng)停止循環(huán)。若想無(wú)限運(yùn)行事件循環(huán),可使用 asyncio 提供的 run_forever 方法:
import asyncio
import time
from datetime import datetime
async def work(loop, t):
print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[work] start')
await asyncio.sleep(t) # 模擬IO操作
print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[work] finished')
loop.stop() # 停止事件循環(huán),stop后仍可重新運(yùn)行
if __name__ == '__main__':
loop = asyncio.get_event_loop() # 創(chuàng)建任務(wù),該任務(wù)會(huì)自動(dòng)加入事件循環(huán)
task = asyncio.ensure_future(work(loop, 1))
print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[main]', task._state)
loop.run_forever() # 無(wú)限運(yùn)行事件循環(huán),直至loop.stop停止
print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[main]', task._state)
loop.close() # 關(guān)閉事件循環(huán),只有l(wèi)oop處于停止?fàn)顟B(tài)才會(huì)執(zhí)行
運(yùn)行結(jié)果:使用 loop.run_forever() 啟動(dòng)無(wú)限循環(huán)時(shí),task 實(shí)例會(huì)自動(dòng)加入事件循環(huán)。如果注釋掉 loop.stop() 方法,則 loop.run_forever() 之后的代碼永遠(yuǎn)不會(huì)被執(zhí)行,因?yàn)?loop.run_forever() 是個(gè)無(wú)限循環(huán)。

以上是單任務(wù)事件循環(huán),將 loop 作為參數(shù)傳入?yún)f(xié)程函數(shù)創(chuàng)建協(xié)程,在協(xié)程內(nèi)部執(zhí)行 loop.stop 方法停止事件循環(huán)。
下面的例子是多任務(wù)事件循環(huán),使用回調(diào)函數(shù)執(zhí)行 loop.stop() 停止事件循環(huán):
import time
import asyncio
import functools
from datetime import datetime
def loop_stop(loop, future): # 最后一個(gè)參數(shù)必須為future或task
print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[callback] stop loop by callback.')
loop.stop()
async def work(t):
print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[work] coroutine start.')
await asyncio.sleep(t)
print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[work] coroutine end.')
def main():
loop = asyncio.get_event_loop()
tasks = asyncio.gather(work(3), work(1))
tasks.add_done_callback(functools.partial(loop_stop, loop))
loop.run_forever()
loop.close()
if __name__ == '__main__':
start = time.time()
main()
end = time.time()
print(f'耗時(shí):{end - start}')
運(yùn)行結(jié)果:asyncio.gather 創(chuàng)建的搜集器,參數(shù)為任意數(shù)量的協(xié)程,任務(wù)搜集器本身也是 task / future 對(duì)象。
任務(wù)搜集器的 add_done_callback 方法用來(lái)添加回調(diào)函數(shù),該函數(shù)只在事件循環(huán)中所有的任務(wù)都完成后運(yùn)行一次。
注意:
add_done_callback的參數(shù)為回調(diào)函數(shù),當(dāng)回調(diào)函數(shù)定義了除future參數(shù)之外的任何參數(shù)后,必須使用偏函數(shù)。此處,使用functools.partial方法創(chuàng)建偏函數(shù)以便將loop作為參數(shù)加入回調(diào)函數(shù)。

loop.run_until_complete 方法本身也是調(diào)用 loop.run_forever 方法,然后通過(guò)回調(diào)函數(shù)調(diào)用 loop.stop 來(lái)終止事件循環(huán)。
二. 在事件循環(huán)中加入普函數(shù)
2.1 加入普通函數(shù),并立即排定執(zhí)行順序
事件循環(huán)的 call_soon 方法可以將普通函數(shù)作為任務(wù)加入到事件循環(huán),并立即排定任務(wù)的執(zhí)行順序:
import asyncio
def func(name): # 普通函數(shù)
print(f'[func] hello, {name}')
async def work(t, name): # 協(xié)程函數(shù)
print(f'[work] {name} start.')
await asyncio.sleep(t)
print(f'[work] {name} finished.')
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(work(3, 'A'))
loop.call_soon(func, 'word')
loop.create_task(work(2, 'B'))
loop.run_until_complete(work(3, 'C'))
if __name__ == '__main__':
main()
運(yùn)行結(jié)果:loop.call_soon 將普通函數(shù)當(dāng)作 task 加入到事件循環(huán)并排定執(zhí)行順序,該方法的第一個(gè)參數(shù)為普通函數(shù)的名字,普通函數(shù)的參數(shù)寫(xiě)在后面。
loop.run_until_complete(work(3, 'C')) 阻塞啟動(dòng)事件循環(huán),而且又添加了一個(gè)任務(wù)。

2.2 加入普通函數(shù),并在稍后執(zhí)行
loop.call_later 方法同 loop.call_soon 一樣,可將普通函數(shù)作為任務(wù)放到事件循環(huán)里。不同之處在于,call_later 可設(shè)置延遲執(zhí)行,第一個(gè)參數(shù)為延遲時(shí)間:
import asyncio
def func(name): # 普通函數(shù)
print(f'[func] hello, {name}')
async def work(t, name): # 協(xié)程函數(shù)
print(f'[work] {name} start.')
await asyncio.sleep(t)
print(f'[work] {name} finished.')
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(work(4, 'A'))
loop.call_later(1, func, 'word1')
loop.call_soon(func, 'word2')
loop.create_task(work(2, 'B'))
loop.call_later(3, func, 'word3')
loop.run_until_complete(work(2, 'C'))
if __name__ == '__main__':
main()
運(yùn)行結(jié)果:事件循環(huán)運(yùn)行到 work(2, 'C') 完成時(shí)終止循環(huán),此時(shí)輸出 hello, word3 的普通函數(shù)尚未執(zhí)行,協(xié)程任務(wù) A 仍處于暫停狀態(tài)。

2.3 其它常用方法
-
call_soon立即執(zhí)行 -
call_later延遲執(zhí)行 -
call_at在某時(shí)刻執(zhí)行 -
loop.time是事件循環(huán)內(nèi)部的一個(gè)即時(shí)方法,返回值是時(shí)刻,數(shù)據(jù)類(lèi)型為float
將上例中的 call_later 使用 loop.time + call_at 實(shí)現(xiàn):
def main():
loop = asyncio.get_event_loop()
start = loop.time() # 時(shí)間循環(huán)內(nèi)部時(shí)刻
asyncio.ensure_future(work(4, 'A'))
# loop.call_later(1, func, 'word1')
# 上面注釋這行等同于下面這行
loop.call_at(start+1, func, 'word1')
loop.call_soon(func, 'word2')
loop.create_task(work(2, 'B'))
# loop.call_later(3, func, 'word3')
loop.call_at(start+3, func, 'word3')
loop.run_until_complete(work(2, 'C'))
運(yùn)行結(jié)果與 2.2 中的示例一致,不再贅述。這三個(gè) call_xxx 方法的作用都是將函數(shù)作為任務(wù)排定到事件循環(huán)中,返回值都是 asyncio.events.TimerHandle 實(shí)例,注意它們不是協(xié)程任務(wù),不能作為 loop.run_until_complete 的參數(shù)。
三. 協(xié)程鎖
asyncio.lock 從字面意思來(lái)講,應(yīng)該被稱(chēng)為異步 IO 鎖,之所以叫協(xié)程鎖,是因?yàn)樗ǔ?xiě)在子協(xié)程中,用來(lái)將協(xié)程內(nèi)部的一段代碼鎖住,直到這段代碼運(yùn)行完畢解鎖。
協(xié)程鎖的固定用法是使用 async with 創(chuàng)建協(xié)程上下文環(huán)境,把需要加鎖的代碼寫(xiě)入其中。
注:
with是普通上下文管理器關(guān)鍵字,async with是異步上下文管理器關(guān)鍵字。能夠使用with關(guān)鍵字的對(duì)象須有__enter__和__exit__方法,而能夠使用async with關(guān)鍵字的對(duì)象須有
__aenter__和__aexit__方法。
async with 會(huì)自動(dòng)運(yùn)行 lock 的 __aenter__ 方法,該方法會(huì)調(diào)用
acquire 方法上鎖;在語(yǔ)句塊結(jié)束時(shí)自動(dòng)運(yùn)行 __aexit__ 方法,該方法會(huì)調(diào)用 release 方法解鎖。這和 with 一樣,都是簡(jiǎn)化 try ... finally 語(yǔ)句。
import asyncio
l = []
lock = asyncio.Lock() # 協(xié)程鎖
async def coro_work(name):
print(f'coroutine {name} start.')
async with lock:
print(f'{name} run with lock start.')
if 'hi' in l:
return name
await asyncio.sleep(2)
l.append('hi')
print(f'{name} release lock.')
return name
async def one():
name = await coro_work('ONE')
print(f'{name} finished.')
async def two():
name = await coro_work('TWO')
print(f'{name} finished')
def main():
loop = asyncio.get_event_loop()
tasks = asyncio.wait([one(), two()])
loop.run_until_complete(tasks)
print(l)
if __name__ == '__main__':
main()
運(yùn)行結(jié)果: 當(dāng)協(xié)程 ONE 運(yùn)行到 await asyncio.sleep(2) 處時(shí),將讓步 CPU 的使用權(quán),協(xié)程 TWO 開(kāi)始執(zhí)行,但執(zhí)行到 async with lock 時(shí),會(huì)阻塞,因?yàn)?ONE 還沒(méi)有釋放協(xié)程鎖,此刻線程進(jìn)入阻塞狀態(tài),開(kāi)始等待 ONE 釋放協(xié)程鎖。
鎖被釋放后,協(xié)程 ONE 結(jié)束運(yùn)行,返回值作為 await coro_work('ONE') 表達(dá)式的值,賦值給 one 函數(shù)中的局部變量 name。
至此協(xié)程 TWO 開(kāi)始上鎖執(zhí)行,由于此時(shí) if 條件判斷返回 True ,將直接 return ,因此終端不會(huì)輸出鎖的釋放提示。

至此關(guān)于 yield、yield from(await)、@asyncio.coroutine(async)、asyncio 的介紹已經(jīng)完畢!