Python實(shí)現(xiàn)協(xié)程(六)

本節(jié)介紹 asyncio 剩余的一些常用操作:事件循環(huán)實(shí)現(xiàn)無(wú)限循環(huán)任務(wù),在事件循環(huán)中執(zhí)行普通函數(shù)以及協(xié)程鎖。

一. 無(wú)限循環(huán)任務(wù)

事件循環(huán)的 run_until_complete 方法運(yùn)行事件循環(huán)時(shí),當(dāng)其中的全部任務(wù)完成后,會(huì)自動(dòng)停止循環(huán)。若想無(wú)限運(yùn)行事件循環(huán),可使用 asyncio 提供的 run_forever 方法:

import asyncio
import time
from datetime import datetime


async def work(loop, t):
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[work] start')
    await asyncio.sleep(t)  # 模擬IO操作
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[work] finished')
    loop.stop()  # 停止事件循環(huán),stop后仍可重新運(yùn)行


if __name__ == '__main__':
    loop = asyncio.get_event_loop()  # 創(chuàng)建任務(wù),該任務(wù)會(huì)自動(dòng)加入事件循環(huán)
    task = asyncio.ensure_future(work(loop, 1))
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[main]', task._state)
    loop.run_forever()  # 無(wú)限運(yùn)行事件循環(huán),直至loop.stop停止
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[main]', task._state)
    loop.close()  # 關(guān)閉事件循環(huán),只有l(wèi)oop處于停止?fàn)顟B(tài)才會(huì)執(zhí)行

運(yùn)行結(jié)果:使用 loop.run_forever() 啟動(dòng)無(wú)限循環(huán)時(shí),task 實(shí)例會(huì)自動(dòng)加入事件循環(huán)。如果注釋掉 loop.stop() 方法,則 loop.run_forever() 之后的代碼永遠(yuǎn)不會(huì)被執(zhí)行,因?yàn)?loop.run_forever() 是個(gè)無(wú)限循環(huán)。

以上是單任務(wù)事件循環(huán),將 loop 作為參數(shù)傳入?yún)f(xié)程函數(shù)創(chuàng)建協(xié)程,在協(xié)程內(nèi)部執(zhí)行 loop.stop 方法停止事件循環(huán)。

下面的例子是多任務(wù)事件循環(huán),使用回調(diào)函數(shù)執(zhí)行 loop.stop() 停止事件循環(huán):

import time
import asyncio
import functools
from datetime import datetime


def loop_stop(loop, future):  # 最后一個(gè)參數(shù)必須為future或task
    print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[callback] stop loop by callback.')
    loop.stop()


async def work(t):
    print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[work] coroutine start.')
    await asyncio.sleep(t)
    print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[work] coroutine end.')


def main():
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(work(3), work(1))
    tasks.add_done_callback(functools.partial(loop_stop, loop))
    loop.run_forever()
    loop.close()


if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(f'耗時(shí):{end - start}')

運(yùn)行結(jié)果asyncio.gather 創(chuàng)建的搜集器,參數(shù)為任意數(shù)量的協(xié)程,任務(wù)搜集器本身也是 task / future 對(duì)象。

任務(wù)搜集器的 add_done_callback 方法用來(lái)添加回調(diào)函數(shù),該函數(shù)只在事件循環(huán)中所有的任務(wù)都完成后運(yùn)行一次。

注意:add_done_callback 的參數(shù)為回調(diào)函數(shù),當(dāng)回調(diào)函數(shù)定義了除 future 參數(shù)之外的任何參數(shù)后,必須使用偏函數(shù)。此處,使用 functools.partial 方法創(chuàng)建偏函數(shù)以便將 loop 作為參數(shù)加入回調(diào)函數(shù)。

loop.run_until_complete 方法本身也是調(diào)用 loop.run_forever 方法,然后通過(guò)回調(diào)函數(shù)調(diào)用 loop.stop 來(lái)終止事件循環(huán)。

二. 在事件循環(huán)中加入普函數(shù)

2.1 加入普通函數(shù),并立即排定執(zhí)行順序

事件循環(huán)的 call_soon 方法可以將普通函數(shù)作為任務(wù)加入到事件循環(huán),并立即排定任務(wù)的執(zhí)行順序:

import asyncio


def func(name):  # 普通函數(shù)
    print(f'[func] hello, {name}')


async def work(t, name):  # 協(xié)程函數(shù)
    print(f'[work] {name} start.')
    await asyncio.sleep(t)
    print(f'[work] {name} finished.')


def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(work(3, 'A'))
    loop.call_soon(func, 'word')
    loop.create_task(work(2, 'B'))
    loop.run_until_complete(work(3, 'C'))


if __name__ == '__main__':
    main()

運(yùn)行結(jié)果loop.call_soon 將普通函數(shù)當(dāng)作 task 加入到事件循環(huán)并排定執(zhí)行順序,該方法的第一個(gè)參數(shù)為普通函數(shù)的名字,普通函數(shù)的參數(shù)寫(xiě)在后面。

loop.run_until_complete(work(3, 'C')) 阻塞啟動(dòng)事件循環(huán),而且又添加了一個(gè)任務(wù)。

2.2 加入普通函數(shù),并在稍后執(zhí)行

loop.call_later 方法同 loop.call_soon 一樣,可將普通函數(shù)作為任務(wù)放到事件循環(huán)里。不同之處在于,call_later 可設(shè)置延遲執(zhí)行,第一個(gè)參數(shù)為延遲時(shí)間:

import asyncio
 
def func(name):  # 普通函數(shù)
    print(f'[func] hello, {name}')
 
async def work(t, name):  # 協(xié)程函數(shù)
    print(f'[work] {name} start.')
    await asyncio.sleep(t)
    print(f'[work] {name} finished.')
 
def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(work(4, 'A'))
    loop.call_later(1, func, 'word1')
    loop.call_soon(func, 'word2')
    loop.create_task(work(2, 'B'))
    loop.call_later(3, func, 'word3')
    loop.run_until_complete(work(2, 'C'))
 
if __name__ == '__main__':
    main()

運(yùn)行結(jié)果:事件循環(huán)運(yùn)行到 work(2, 'C') 完成時(shí)終止循環(huán),此時(shí)輸出 hello, word3 的普通函數(shù)尚未執(zhí)行,協(xié)程任務(wù) A 仍處于暫停狀態(tài)。

2.3 其它常用方法

  • call_soon 立即執(zhí)行
  • call_later 延遲執(zhí)行
  • call_at 在某時(shí)刻執(zhí)行
  • loop.time 是事件循環(huán)內(nèi)部的一個(gè)即時(shí)方法,返回值是時(shí)刻,數(shù)據(jù)類(lèi)型為 float

將上例中的 call_later 使用 loop.time + call_at 實(shí)現(xiàn):

def main():
    loop = asyncio.get_event_loop()
    start = loop.time()  # 時(shí)間循環(huán)內(nèi)部時(shí)刻
    asyncio.ensure_future(work(4, 'A'))
    # loop.call_later(1, func, 'word1')
    # 上面注釋這行等同于下面這行
    loop.call_at(start+1, func, 'word1')
    loop.call_soon(func, 'word2')
    loop.create_task(work(2, 'B'))
    # loop.call_later(3, func, 'word3')
    loop.call_at(start+3, func, 'word3')
    loop.run_until_complete(work(2, 'C'))

運(yùn)行結(jié)果與 2.2 中的示例一致,不再贅述。這三個(gè) call_xxx 方法的作用都是將函數(shù)作為任務(wù)排定到事件循環(huán)中,返回值都是 asyncio.events.TimerHandle 實(shí)例,注意它們不是協(xié)程任務(wù),不能作為 loop.run_until_complete 的參數(shù)。

三. 協(xié)程鎖

asyncio.lock 從字面意思來(lái)講,應(yīng)該被稱(chēng)為異步 IO 鎖,之所以叫協(xié)程鎖,是因?yàn)樗ǔ?xiě)在子協(xié)程中,用來(lái)將協(xié)程內(nèi)部的一段代碼鎖住,直到這段代碼運(yùn)行完畢解鎖。

協(xié)程鎖的固定用法是使用 async with 創(chuàng)建協(xié)程上下文環(huán)境,把需要加鎖的代碼寫(xiě)入其中。

注:with 是普通上下文管理器關(guān)鍵字,async with 是異步上下文管理器關(guān)鍵字。能夠使用 with 關(guān)鍵字的對(duì)象須有 __enter____exit__ 方法,而能夠使用 async with 關(guān)鍵字的對(duì)象須有
__aenter____aexit__ 方法。

async with 會(huì)自動(dòng)運(yùn)行 lock__aenter__ 方法,該方法會(huì)調(diào)用
acquire 方法上鎖;在語(yǔ)句塊結(jié)束時(shí)自動(dòng)運(yùn)行 __aexit__ 方法,該方法會(huì)調(diào)用 release 方法解鎖。這和 with 一樣,都是簡(jiǎn)化 try ... finally 語(yǔ)句。

import asyncio

l = []
lock = asyncio.Lock()  # 協(xié)程鎖


async def coro_work(name):
    print(f'coroutine {name} start.')
    async with lock:
        print(f'{name} run with lock start.')
        if 'hi' in l:
            return name
        await asyncio.sleep(2)
        l.append('hi')
        print(f'{name} release lock.')
        return name


async def one():
    name = await coro_work('ONE')
    print(f'{name} finished.')


async def two():
    name = await coro_work('TWO')
    print(f'{name} finished')


def main():
    loop = asyncio.get_event_loop()
    tasks = asyncio.wait([one(), two()])
    loop.run_until_complete(tasks)
    print(l)


if __name__ == '__main__':
    main()

運(yùn)行結(jié)果: 當(dāng)協(xié)程 ONE 運(yùn)行到 await asyncio.sleep(2) 處時(shí),將讓步 CPU 的使用權(quán),協(xié)程 TWO 開(kāi)始執(zhí)行,但執(zhí)行到 async with lock 時(shí),會(huì)阻塞,因?yàn)?ONE 還沒(méi)有釋放協(xié)程鎖,此刻線程進(jìn)入阻塞狀態(tài),開(kāi)始等待 ONE 釋放協(xié)程鎖。

鎖被釋放后,協(xié)程 ONE 結(jié)束運(yùn)行,返回值作為 await coro_work('ONE') 表達(dá)式的值,賦值給 one 函數(shù)中的局部變量 name。

至此協(xié)程 TWO 開(kāi)始上鎖執(zhí)行,由于此時(shí) if 條件判斷返回 True ,將直接 return ,因此終端不會(huì)輸出鎖的釋放提示。

至此關(guān)于 yield、yield from(await)、@asyncio.coroutine(async)asyncio 的介紹已經(jīng)完畢!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容