AsyncIO for the Working Python Developer (translation)

asyncio is a concurrency module introduced in Python 3.4. It is designed to use coroutines and futures to simplify asynchronous code and make it almost as readable as synchronous code, simply because there are no callbacks.

Threads, event loops, coroutines and futures

Threads are a well-known tool, but asyncio uses quite different constructs: event loops, coroutines and futures.

  • An event loop manages and distributes the execution of registered functions: you register functions on the loop, and when the event they are waiting for occurs, the loop calls the corresponding coroutine.
  • Coroutines are special functions (defined with the async keyword) that behave like Python generators: on an await they release control back to the event loop. A coroutine needs to be scheduled on the event loop; to do so we wrap it in a Task, which is a type of Future.
  • Futures represent the result of a task that may or may not have been executed; that result may well be an exception.

Synchronous and asynchronous

In the article Concurrency is not parallelism, it's better, Rob Pike makes the point:

Breaking down tasks into concurrent subtasks only allows parallelism, it’s the scheduling of these subtasks that creates it.

That is exactly what asyncio does: you can structure your code so that subtasks are defined as coroutines and schedule them as you please, even simultaneously. A context switch to another coroutine may happen when other tasks are pending, but if no other task is pending, no switch takes place.

Concurrency with asyncio therefore takes several coroutines cooperating on the work: whenever a coroutine would block, it awaits, suspending itself so the others can continue. We build a list of such coroutines and register them with the event loop.

Let's look at a basic example:

import asyncio

async def foo():
    print('Running in foo')
    await asyncio.sleep(0)
    print('Explicit context switch to foo again')

async def bar():
    print('Explicit context to bar')
    await asyncio.sleep(0)
    print('Implicit context switch back to bar')

ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(foo()), ioloop.create_task(bar())]
wait_tasks = asyncio.wait(tasks)
ioloop.run_until_complete(wait_tasks)
ioloop.close()
$ python3 1-sync-async-execution-asyncio-await.py
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
  • First we declare a couple of simple coroutines and use the sleep function from the asyncio module to simulate non-blocking work.
  • Coroutines cannot be run directly; they are either awaited from another coroutine or wrapped in a task to be scheduled later. Here we use create_task to build the tasks.
  • Both asyncio.ensure_future(coroutine) and loop.create_task(coroutine) create a task. run_until_complete expects a future; when handed a bare coroutine, it wraps it in a task automatically. Task is a subclass of Future, so isinstance(task, asyncio.Future) prints True.
  • Once we have the two tasks, we combine them into one (wait_tasks) that waits for both of them to complete via wait.
  • Finally, we execute wait_tasks with run_until_complete.
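As a quick sanity check of that Task/Future relationship, here is a minimal, self-contained sketch (it uses asyncio.run, available since Python 3.7, instead of the explicit loop management shown above):

```python
import asyncio

async def noop():
    await asyncio.sleep(0)

async def main():
    # ensure_future wraps the coroutine in a Task and schedules it.
    task = asyncio.ensure_future(noop())
    # Task is a subclass of Future, so this prints True.
    print(isinstance(task, asyncio.Future))
    await task

asyncio.run(main())
```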

The await keyword suspends a long-running operation: just like yield in a generator, the function gives up control. In this example, when foo reaches await asyncio.sleep(0), the coroutine yields and a context switch occurs; the event loop moves on to the next task, bar. Similarly, when bar hits its await sleep, the event loop passes control back to foo, which picks up where it left off. Sounds a lot like Python generators, doesn't it?
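The generator analogy can be made literal. The toy scheduler below is my own sketch, not part of the original article: it resumes two generators in turn, much as the event loop alternates between foo and bar at each await:

```python
def worker(name):
    # A generator "coroutine": each yield hands control back to the scheduler,
    # just as each await hands control back to the event loop.
    yield name + ' step 1'
    yield name + ' step 2'

def round_robin(*generators):
    # A toy event loop: resume each generator in turn until all are exhausted.
    order = []
    pending = list(generators)
    while pending:
        for gen in pending[:]:
            try:
                order.append(next(gen))
            except StopIteration:
                pending.remove(gen)
    return order

print(round_robin(worker('foo'), worker('bar')))
# ['foo step 1', 'bar step 1', 'foo step 2', 'bar step 2']
```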

This time we simulate two blocking tasks, gr1 and gr2; say they are two requests to external servers. While they execute, a third task, gr3, can work concurrently. The code:

import time
import asyncio

start = time.time()


def tic():
    return 'at %1.1f seconds' % (time.time() - start)


async def gr1():
    # Simulate two seconds of blocking I/O, but we don't want to stick around...
    print('gr1 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr1 ended work: {}'.format(tic()))


async def gr2():
    # Simulate two seconds of blocking I/O, but we don't want to stick around...
    print('gr2 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr2 ended work: {}'.format(tic()))


async def gr3():
    print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    await asyncio.sleep(1)
    print("Done!")


ioloop = asyncio.get_event_loop()
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()
$ python3 1b-cooperatively-scheduled-asyncio-await.py
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Let's do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr1 ended work: at 2.0 seconds
gr2 ended work: at 2.0 seconds

Note how the I/O loop manages and schedules the tasks, letting your single-threaded code run concurrently: while the first two tasks are blocked, the third one can take control.

Order of execution

In the synchronous world we're used to thinking linearly: given a series of tasks that take different amounts of time, they will execute in the order in which they were called.

With concurrency, however, the order in which tasks finish has no necessary relation to the order in which they were invoked in the program.

import random
from time import sleep
import asyncio


def task(pid):
    """Synchronous non-deterministic task.
    """
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


async def task_coro(pid):
    """Coroutine non-deterministic task
    """
    await asyncio.sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


def synchronous():
    for i in range(1, 10):
        task(i)


async def asynchronous():
    tasks = [asyncio.ensure_future(task_coro(i)) for i in range(1, 10)]
    await asyncio.wait(tasks)


print('Synchronous:')
synchronous()

ioloop = asyncio.get_event_loop()
print('Asynchronous:')
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 1c-determinism-sync-async-asyncio-await.py
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 2 done
Task 5 done
Task 6 done
Task 8 done
Task 9 done
Task 1 done
Task 4 done
Task 3 done
Task 7 done

Your output will, of course, vary, since each task sleeps for a random amount of time. Notice, though, how the resulting order is completely different from the synchronous run, even though we built the list of tasks in the same order using range.

Also note how we converted our simple synchronous code into its concurrent version. The asyncio module is no magic wand that makes tasks non-blocking. At the time asyncio was written and shipped as a standalone library, most other modules did not support asynchrony; you can use the concurrent.futures module to wrap a blocking task in a thread or a process and get back a Future that asyncio can use. The same example using threads is available in the Github repo.

This is probably the main drawback of using asyncio right now, but a number of libraries already address the problem.
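For example, a blocking call from a synchronous library can be handed off to a thread pool; run_in_executor returns an asyncio-compatible future that can be awaited alongside real coroutines. A sketch under those assumptions (blocking_io is a made-up stand-in for any blocking function):

```python
import asyncio
import concurrent.futures
import time

def blocking_io(n):
    # Pretend this is a blocking call from a library with no async support.
    time.sleep(0.1)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # run_in_executor wraps each blocking call in an asyncio Future,
        # so all three run in threads while the loop stays responsive.
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_io, i) for i in range(3)))
    return results

print(asyncio.run(main()))  # [0, 2, 4]
```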

Fetching data from the network over HTTP is a classic blocking task. I use the aiohttp library for non-blocking HTTP requests, retrieving data from Github's public event API.

import time
import urllib.request
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


def fetch_sync(pid):
    print('Fetch sync process {} started'.format(pid))
    start = time.time()
    response = urllib.request.urlopen(URL)
    datetime = response.getheader('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start))

    return datetime


async def fetch_async(pid):
    print('Fetch async process {} started'.format(pid))
    start = time.time()
    response = await aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start))

    response.close()
    return datetime


def synchronous():
    start = time.time()
    for i in range(1, MAX_CLIENTS + 1):
        fetch_sync(i)
    print("Process took: {:.2f} seconds".format(time.time() - start))


async def asynchronous():
    start = time.time()
    tasks = [asyncio.ensure_future(
        fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    await asyncio.wait(tasks)
    print("Process took: {:.2f} seconds".format(time.time() - start))


print('Synchronous:')
synchronous()

print('Asynchronous:')
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 1d-async-fetch-from-server-asyncio-await.py
Synchronous:
Fetch sync process 1 started
Process 1: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.54 seconds
Fetch sync process 2 started
Process 2: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.50 seconds
Fetch sync process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.48 seconds
Process took: 1.54 seconds
Asynchronous:
Fetch async process 1 started
Fetch async process 2 started
Fetch async process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.50 seconds
Process 2: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.52 seconds
Process 1: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.54 seconds
Process took: 0.54 seconds

First, note the difference in timing: with asynchronous calls we issue all requests to the service simultaneously. As discussed earlier, each request yields control flow to the next one and resumes when it completes. The total time taken to process all requests equals the time of the slowest request! Just 0.54 seconds. Pretty cool, huh?

Second, the asynchronous code is remarkably close to its synchronous counterpart; in essence it's the same! The main differences lie in the library implementation for performing the GET request and in creating the tasks and waiting for them to finish.

Creating coroutines

Up to now we've used a single pattern for running coroutines: create a set of tasks and wait for all of them to complete.

But coroutines can be scheduled, and their results retrieved, in different ways. Imagine a scenario where we need to process the results of HTTP GET requests as soon as each one arrives; the process is actually quite similar to our previous example:

import time
import random
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('Fetch async process {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))

    await asyncio.sleep(sleepy_time)

    response = await aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    response.close()
    return 'Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)


async def asynchronous():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = await future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Process took: {:.2f} seconds".format(time.time() - start))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2a-async-fetch-from-server-as-completed-asyncio-await.py
Fetch async process 1 started, sleeping for 4 seconds
Fetch async process 3 started, sleeping for 5 seconds
Fetch async process 2 started, sleeping for 3 seconds
>> Process 2: Wed, 17 Feb 2016 13:55:19 GMT, took: 3.53 seconds
>>>> Process 1: Wed, 17 Feb 2016 13:55:20 GMT, took: 4.49 seconds
>>>>>> Process 3: Wed, 17 Feb 2016 13:55:21 GMT, took: 5.48 seconds
Process took: 5.48 seconds

The code here is only slightly different: we gather the coroutines into a list, each of them ready to be scheduled and executed. The as_completed function returns an iterator that yields each future as soon as it completes. By the way, as_completed and wait are both originally functions from concurrent.futures.
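Here is a network-free sketch of the same pattern, with the HTTP calls replaced by plain sleeps so that the completion order is predictable (job and its delays are invented for illustration):

```python
import asyncio

async def job(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    coros = [job(0.3), job(0.1), job(0.2)]
    finished = []
    # as_completed yields awaitables in completion order, not submission order.
    for fut in asyncio.as_completed(coros):
        finished.append(await fut)
    return finished

print(asyncio.run(main()))  # [0.1, 0.2, 0.3]
```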

Let's look at another example. Imagine you're trying to find out your public IP address. Several services can tell you, but you're not sure which ones will be reachable at runtime. Rather than checking each one sequentially, you can send concurrent requests to all of them and pick the first one that responds, right? Right!

Well, it turns out our old friend wait has a parameter for exactly that: return_when. Until now we've ignored what wait returns, since we were only parallelising tasks. But now we want to retrieve results from the coroutines, so we use the two sets of futures it returns: done and pending.
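A network-free sketch of return_when with FIRST_COMPLETED (note that recent versions of asyncio require wait to be given tasks rather than bare coroutines; responder and its delays are invented for illustration):

```python
import asyncio

async def responder(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [asyncio.ensure_future(responder('fast', 0.1)),
             asyncio.ensure_future(responder('slow', 1.0))]
    # done holds the finished futures, pending the ones still running.
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    winner = done.pop().result()
    for task in pending:
        task.cancel()   # leaving tasks pending triggers the warning shown below
    return winner

print(asyncio.run(main()))  # fast
```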

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = await aiohttp.request('GET', service.url)
    json_response = await response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)

    print(done.pop().result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2c-fetch-first-ip-address-response-await.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api finished with result: 82.34.76.170, took: 0.09 seconds
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>

Wait a minute, what happened there? The code ran and gave the right result, but what are all those warnings?

We scheduled two tasks, but shut the loop down once the first one completed, leaving the second one pending. asyncio regards that as a bug and prints a warning. We need to tell the event loop not to bother about the pending future. How?

States of a Future

(As in states that a Future can be in, not states that are in the future… you know what I mean)

A future can be in one of the following states:

  • Pending
  • Running
  • Done
  • Cancelled

When a future is created, its task is pending; while the event loop is executing it, it is naturally running; once the call finishes, it is done; and if you need to stop the event loop beforehand, the task must first be cancelled. The tasks on the event loop can be retrieved via asyncio.Task.

When a future is done, its result method returns the future's result. If it is pending or running, result raises InvalidStateError; if it is cancelled, it raises CancelledError; and if the coroutine raised an exception, it will be re-raised, so calling result can itself raise.
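Those result behaviours can be sketched in a few lines (again using asyncio.run from Python 3.7+; the ValueError is just an arbitrary example exception):

```python
import asyncio

async def boom():
    raise ValueError('oops')

async def main():
    task = asyncio.ensure_future(boom())
    try:
        task.result()                   # still pending: no result yet
    except asyncio.InvalidStateError:
        print('pending: InvalidStateError')
    try:
        await task                      # the coroutine's exception propagates
    except ValueError:
        pass
    print(task.done())                  # True: done also covers exceptions
    try:
        task.result()                   # re-raises the coroutine's exception
    except ValueError as e:
        print('done with exception:', e)

    doomed = asyncio.ensure_future(asyncio.sleep(10))
    doomed.cancel()
    try:
        await doomed
    except asyncio.CancelledError:
        pass
    print(doomed.cancelled())           # True
    try:
        doomed.result()                 # cancelled futures raise on result
    except asyncio.CancelledError:
        print('cancelled: CancelledError')

asyncio.run(main())
```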

You can also check whether a future is done, cancelled or running with the corresponding methods, bearing in mind that done means the result is available, whether it's a return value or a raised exception. A future can be explicitly cancelled by calling its cancel method, which is exactly what we need to fix the warning in the previous example:

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = await aiohttp.request('GET', service.url)
    json_response = await response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)

    print(done.pop().result())

    for future in pending:
        future.cancel()


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2c-fetch-first-ip-address-response-no-warning-await.py
Fetching IP from ipify
Fetching IP from ip-api
ip-api finished with result: 82.34.76.170, took: 0.08 seconds

This time the output is perfect.

Futures also allow attaching callbacks for when they reach the done state, in case you want to add extra logic. You can even manually set a future's result or its exception, typically for unit-testing purposes.
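A tiny sketch of both features, manually completing a future and watching its done-callback fire (assuming Python 3.7+ for asyncio.run):

```python
import asyncio

def on_done(future):
    # Invoked by the event loop once the future reaches the done state.
    print('callback saw:', future.result())

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    future.add_done_callback(on_done)
    future.set_result(99)      # manually complete it, as in a unit test
    result = await future      # an already-done future returns immediately
    await asyncio.sleep(0)     # give the loop one turn to run the callback
    return result

print(asyncio.run(main()))
```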

Original article: AsyncIO for the Working Python Developer

Translator's note:

This is my first attempt at a translation; please forgive any inaccuracies and clumsy wording, and I would be grateful if more experienced readers could point out anything I have misunderstood.
