【python】協(xié)程之a(chǎn)syncio:調(diào)用步驟、阻塞和await、task任務(wù)、future對象

首先介紹一下 偏函數(shù)

如果需要減少某個函數(shù)的參數(shù)個數(shù),你可以使用

  • functools.partial()

【作用一】:partial() 函數(shù)允許你給一個或多個參數(shù)設(shè)置固定的值,減少接下來被調(diào)用時的參數(shù)個數(shù)。

【作用二】:partial() 用于固定某些參數(shù),并返回一個新的callable對象。


關(guān)于協(xié)程的調(diào)用步驟

下面將簡單介紹asyncio的使用。

  • ??event_loop 事件循環(huán):程序開啟一個無限的循環(huán),程序員會把一些函數(shù)注冊到事件循環(huán)上。當(dāng)滿足事件發(fā)生的時候,調(diào)用相應(yīng)的協(xié)程函數(shù)。

  • ??coroutine 協(xié)程:協(xié)程對象,指一個使用async關(guān)鍵字定義的函數(shù),它的調(diào)用不會立即執(zhí)行函數(shù),而是會返回一個協(xié)程對象。協(xié)程對象需要注冊到事件循環(huán),由事件循環(huán)調(diào)用。

  • ??task 任務(wù):一個協(xié)程對象就是一個原生可以掛起的函數(shù),任務(wù)則是對協(xié)程進一步封裝,其中包含任務(wù)的各種狀態(tài)。

  • ??future對象: 代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果。它和task上沒有本質(zhì)的區(qū)別;

  • ??async/await 關(guān)鍵字:python3.5 用于定義協(xié)程的關(guān)鍵字,async定義一個協(xié)程,await用于掛起阻塞的異步調(diào)用接口。

重點注意:

1.當(dāng)我們給一個函數(shù)添加了async關(guān)鍵字,或者使用asyncio.coroutine裝飾器裝飾,就會把它變成一個異步函數(shù)。
2.每個線程有一個事件循環(huán),主線程調(diào)用asyncio.get_event_loop時會創(chuàng)建事件循環(huán);

3.將任務(wù)封裝為集合asyncio.gather(*args),之后一起傳入事件循環(huán)中;

4.要把異步的任務(wù)丟給這個循環(huán)的run_until_complete方法,事件循環(huán)會安排協(xié)同程序的執(zhí)行。和方法名字一樣,該方法會等待異步的任務(wù)完全執(zhí)行才會結(jié)束。

【注釋】實現(xiàn)協(xié)程的不僅僅是asyncio,tornado和gevent都能實現(xiàn)類似的功能。

來看一個實例??:

async def test1():
    print("1")

async def test2():
    print("2")

a = test1()
b = test2()

try:
    a.send(None) # 可以通過調(diào)用 send 方法,執(zhí)行協(xié)程函數(shù)
except StopIteration as e:
    print(e.value)
    # 協(xié)程函數(shù)執(zhí)行結(jié)束時會拋出一個StopIteration 異常,標(biāo)志著協(xié)程函數(shù)執(zhí)行結(jié)束,返回值在value中
    pass
try:
    b.send(None) # 可以通過調(diào)用 send 方法,執(zhí)行協(xié)程函數(shù)
except StopIteration:
    print(e.value)
    # 協(xié)程函數(shù)執(zhí)行結(jié)束時會拋出一個StopIteration 異常,標(biāo)志著協(xié)程函數(shù)執(zhí)行結(jié)束,返回值在value中
    pass

---
輸出:
1
2

【解釋】程序先執(zhí)行了test1函數(shù),等到test1函數(shù)執(zhí)行完后再執(zhí)行test2函數(shù)。從執(zhí)行過程上來看目前協(xié)程函數(shù)與普通函數(shù)沒有區(qū)別,并沒有實現(xiàn)異步函數(shù)。


關(guān)于阻塞和await

【概念】使用async關(guān)鍵字定義的協(xié)程對象,使用await可以針對耗時的操作進行掛起(是生成器中的yield的替代,但是本地協(xié)程函數(shù)不允許使用),讓出當(dāng)前控制權(quán)。協(xié)程遇到await,事件循環(huán)將會掛起該協(xié)程,執(zhí)行別的協(xié)程,直到其他協(xié)程也掛起,或者執(zhí)行完畢,在進行下一個協(xié)程的執(zhí)行。

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(1) # asyncio.sleep(1)返回的也是一個協(xié)程對象
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

try:
    a.send(None) # 可以通過調(diào)用 send 方法,執(zhí)行協(xié)程函數(shù)
except StopIteration:
    # 協(xié)程函數(shù)執(zhí)行結(jié)束時會拋出一個StopIteration 異常,標(biāo)志著協(xié)程函數(shù)執(zhí)行結(jié)束
    pass

try:
    b.send(None) # 可以通過調(diào)用 send 方法,執(zhí)行協(xié)程函數(shù)
except StopIteration:
    pass

---
輸出:
1
3
4

【解釋】程序先執(zhí)行test1協(xié)程函數(shù),在執(zhí)行到await時,test1函數(shù)停止了執(zhí)行(阻塞);接著開始執(zhí)行test2協(xié)程函數(shù),直到test2執(zhí)行完畢。從結(jié)果中,我們可以看到,直到程序運行完畢,test1函數(shù)也沒有執(zhí)行完(沒有執(zhí)行print(“2”))


【對上面的代碼塊進行修改】,使得test1函數(shù)完全執(zhí)行

import asyncio
async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
loop.run_until_complete(test1())

---
輸出:
1
3
4
2

【事件循環(huán)方法】asyncio.get_event_loop方法可以創(chuàng)建一個事件循環(huán),然后使用 run_until_complete 將協(xié)程注冊到事件循環(huán),并啟動事件循環(huán)。

【技能升級】使用async可以定義協(xié)程對象,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數(shù)讓出控制權(quán)。協(xié)程遇到await,事件循環(huán)將會掛起該協(xié)程,執(zhí)行別的協(xié)程,直到其他的協(xié)程也掛起或者執(zhí)行完畢,再進行下一個協(xié)程的執(zhí)行,協(xié)程的目的也是讓一些耗時的操作異步化


關(guān)于task任務(wù)

由于協(xié)程對象不能直接運行,在注冊事件循環(huán)的時候,其實是run_until_complete方法將協(xié)程包裝成為了一個任務(wù)(task)對象。所謂task對象是Future類的子類,保存了協(xié)程運行后的狀態(tài),用于未來獲取協(xié)程的結(jié)果。我們也可以手動將協(xié)程對象定義成task,改進代碼如下:

【對上面的代碼塊再一次修改】

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)

【重點回顧】前面說到task對象保存了協(xié)程運行的狀態(tài),并且可以獲取協(xié)程函數(shù)運行的返回值;

關(guān)于task對象那么具體該如何獲取呢?
  • 需要綁定回調(diào)函數(shù)

  • 直接在運行完task任務(wù)后輸出

【提升】如果使用send方法執(zhí)行函數(shù),則返回值可以通過捕捉StopIteration異常,利用StopIteration.value獲取。

【舉個例子】:??

import asyncio

async def test_one():
  print("Zurich")
  await test_two()
  print("Alzacar")
  return "stop"

async def test_two():
  print("--")
  print("$$$")

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test_one())
loop.run_until_complete(task)
print(task.result())

---
【Output】:
Zurich
Alzacar
--
$$$

使用future對象

【解釋】:future對象有幾個狀態(tài):Pending、Running、Done、Cancelled。創(chuàng)建future的時候,task為pending,事件循環(huán)調(diào)用執(zhí)行的時候當(dāng)然就是running,調(diào)用完畢自然就是done,如果需要停止事件循環(huán),就需要先把task取消,可以使用asyncio.Task獲取事件循環(huán)的task。

【程序:關(guān)于future對象的使用】

import asyncio
import functools

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

def callback(param1,param2,future):
    print(param1,param2)
    print('Callback:',future.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
task.add_done_callback(functools.partial(callback,"param1","param2"))# 綁定回調(diào)函數(shù)
loop.run_until_complete(task)

【說明】通過future對象的result方法可以獲取協(xié)程函數(shù)的返回值,創(chuàng)建task。test1()是一個協(xié)程對象。
回調(diào)函數(shù)中的future對象就是創(chuàng)建的task對象。
【多插播一句】:回調(diào)函數(shù)如果需要接受多個參數(shù),可以通過偏函數(shù)導(dǎo)入。


【關(guān)于協(xié)程的停止】

怎么停止執(zhí)行協(xié)程呢?
【第一步】:需要先取消task
【第二步】:停止loop事件循環(huán)。

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(3)
    print("2")
    return "stop"

tasks = [
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
]

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

【參考資料】https://thief.one/2018/06/21/1/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容