asyncio is a concurrency module introduced in Python 3.4. It is designed to simplify asynchronous code by using coroutines and futures, making it almost as concise as synchronous code, since there are no callbacks.
Threads, event loops, coroutines and futures
Threads are a well-known tool, but asyncio is built on entirely different constructs: event loops, coroutines and futures.
- Event loop: the programmer registers functions with the event loop, and the loop manages them. When the event a function is waiting for occurs, the loop calls the corresponding coroutine.
- Coroutines: a coroutine is a special function (defined with the async keyword) that behaves much like a Python generator: on await it releases control back to the event loop. A coroutine needs to be scheduled by an event loop; to do that, we create a Task, which is a type of Future.
- Futures: objects that represent the result of a task that may or may not have executed; that result may also be an exception.
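To make the relationship between these pieces concrete, here is a minimal sketch (my own illustration, not code from the article): calling a coroutine function only produces a coroutine object, and the Task that wraps it for the event loop really is a Future.

```python
import asyncio


# Calling a coroutine function does not execute its body; it merely
# returns a coroutine object that the event loop has to drive.
async def greet():
    await asyncio.sleep(0)   # cooperatively yield control to the event loop
    return 'hello'


coro = greet()
coro_type = type(coro).__name__   # 'coroutine': nothing has executed yet

loop = asyncio.new_event_loop()
task = loop.create_task(coro)                  # wrap the coroutine in a Task
is_future = isinstance(task, asyncio.Future)   # Task is a subclass of Future
result = loop.run_until_complete(task)         # now the body actually runs
loop.close()

print(coro_type, is_future, result)
```

new_event_loop is used here rather than get_event_loop only so the sketch also runs cleanly on recent Python versions.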
Synchronous and asynchronous
In the article Concurrency is not parallelism, it's better, Rob Pike points out:
Breaking down tasks into concurrent subtasks only allows parallelism, it’s the scheduling of these subtasks that creates it.
That is exactly what asyncio lets you do: you structure your code so that subtasks are defined as coroutines, and then schedule them as you see fit, including simultaneously. A coroutine may be context-switched away from while other tasks are waiting to run; if no other task is pending, no switch happens.
For asyncio to achieve concurrency, multiple coroutines must share the work: whenever one of them would block, it suspends itself with await so that the others can keep running. We build a list of coroutines and register them all with the event loop.
Let's look at a basic example:
import asyncio


async def foo():
    print('Running in foo')
    await asyncio.sleep(0)
    print('Explicit context switch to foo again')


async def bar():
    print('Explicit context to bar')
    await asyncio.sleep(0)
    print('Implicit context switch back to bar')


ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(foo()), ioloop.create_task(bar())]
wait_tasks = asyncio.wait(tasks)
ioloop.run_until_complete(wait_tasks)
ioloop.close()
$ python3 1-sync-async-execution-asyncio-await.py
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
- First we declare a couple of simple coroutines that pretend to do non-blocking work, using the sleep function from the asyncio module to simulate it.
- Coroutines cannot be run directly; they can only be awaited from other coroutines or wrapped in a task and scheduled later. Here we use create_task to create the tasks.
- Both asyncio.ensure_future(coroutine) and loop.create_task(coroutine) create a task. run_until_complete takes a future object; when it is given a coroutine, it wraps it in a task automatically. Task is a subclass of Future, so isinstance(task, asyncio.Future) prints True.
- Once we have the two tasks, we combine them into one (wait_tasks) that waits for both of them to complete, using the wait function.
- Finally, we execute wait_tasks with run_until_complete.
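The third point is easy to verify. The sketch below is illustrative, not from the article (double is a made-up coroutine); it shows run_until_complete wrapping a bare coroutine on its own, and wait returning its (done, pending) sets:

```python
import asyncio


async def double(x):
    await asyncio.sleep(0)
    return 2 * x


loop = asyncio.new_event_loop()

# run_until_complete accepts a bare coroutine and wraps it in a Task itself
r1 = loop.run_until_complete(double(10))

# explicitly created tasks are Futures; asyncio.wait drives them all to
# completion and returns the (done, pending) sets
tasks = [loop.create_task(double(i)) for i in range(3)]
all_futures = all(isinstance(t, asyncio.Future) for t in tasks)
done, pending = loop.run_until_complete(asyncio.wait(tasks))
results = sorted(t.result() for t in done)
loop.close()

print(r1, all_futures, results, len(pending))
```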
The await keyword suspends a time-consuming operation and, just like yield in a generator, makes the function give up control. In this example, when foo reaches await asyncio.sleep(0), the coroutine yields, a context switch occurs, and the event loop moves on to the next task: bar. Likewise, when bar reaches its await sleep, the event loop hands control back to foo, which resumes where it left off. Doesn't this look a lot like Python generators?
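The generator analogy can be made literal. Below is a toy round-robin scheduler of my own (not how asyncio is implemented, just an illustration) in which a plain yield plays the role of await, handing control back to the scheduling loop:

```python
# Each worker gives control back to the scheduler at every yield,
# just as a coroutine gives control back to the event loop at await.
def worker(name, steps):
    for i in range(steps):
        yield '{} step {}'.format(name, i)


def run_round_robin(gens):
    log = []
    while gens:
        gen = gens.pop(0)
        try:
            log.append(next(gen))   # resume the generator until its next yield
            gens.append(gen)        # re-schedule it at the back of the queue
        except StopIteration:
            pass                    # this worker is finished; drop it
    return log


log = run_round_robin([worker('foo', 2), worker('bar', 2)])
print(log)
```

The two workers interleave exactly like foo and bar in the example above.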
This time we simulate two blocking tasks, gr1 and gr2, as if they were two requests to external servers. While they are executing, a third task (gr3) can run concurrently. The example code is as follows:
import time
import asyncio

start = time.time()


def tic():
    return 'at %1.1f seconds' % (time.time() - start)


async def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('gr1 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr1 ended work: {}'.format(tic()))


async def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('gr2 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr2 ended work: {}'.format(tic()))


async def gr3():
    print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    await asyncio.sleep(1)
    print("Done!")


ioloop = asyncio.get_event_loop()
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()
$ python3 1b-cooperatively-scheduled-asyncio-await.py
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Let's do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr1 ended work: at 2.0 seconds
gr2 ended work: at 2.0 seconds
Note how the I/O loop manages and schedules the tasks, allowing your single-threaded code to run concurrently: while the first two tasks are blocked, the third one can take control.
Order of execution
In the synchronous world we think linearly: given a series of tasks that take different amounts of time, they execute in the order they were called.
With concurrency, however, the order in which tasks finish bears no necessary relation to the order in which they were invoked:
import random
from time import sleep
import asyncio


def task(pid):
    """Synchronous non-deterministic task."""
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


async def task_coro(pid):
    """Coroutine non-deterministic task."""
    await asyncio.sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


def synchronous():
    for i in range(1, 10):
        task(i)


async def asynchronous():
    tasks = [asyncio.ensure_future(task_coro(i)) for i in range(1, 10)]
    await asyncio.wait(tasks)


print('Synchronous:')
synchronous()

ioloop = asyncio.get_event_loop()
print('Asynchronous:')
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 1c-determinism-sync-async-asyncio-await.py
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 2 done
Task 5 done
Task 6 done
Task 8 done
Task 9 done
Task 1 done
Task 4 done
Task 3 done
Task 7 done
Your output will, of course, vary, since each task sleeps for a random amount of time; but notice that the ordering is completely different from the synchronous case, even though we built the list of tasks in the same order using range.
Note how we converted our simple synchronous code into a concurrent version; the asyncio module does no magic to make the tasks non-blocking. At the time asyncio was written and shipped as a standalone standard-library module, most other modules did not support it; you could use the concurrent.futures module to wrap a blocking task in a thread or a process, which returns a Future that asyncio can use. An example using threads can be found in the Github repo.
This may be the main drawback of using asyncio right now, but there are already libraries that work around the problem.
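For instance, a blocking function can be handed off to a thread pool with loop.run_in_executor, which returns a future that asyncio can await. A minimal sketch follows (blocking_work is a made-up stand-in for any blocking call, such as a synchronous HTTP request):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


# A blocking function from the synchronous world.
def blocking_work(x):
    time.sleep(0.1)   # blocks its thread, but not the event loop
    return x * x


async def main(loop):
    executor = ThreadPoolExecutor(max_workers=3)
    # run_in_executor runs the blocking calls in threads and returns
    # asyncio-compatible futures, so the event loop stays responsive
    futures = [loop.run_in_executor(executor, blocking_work, i)
               for i in range(3)]
    return await asyncio.gather(*futures)   # results keep submission order


loop = asyncio.new_event_loop()
results = loop.run_until_complete(main(loop))
loop.close()
print(results)
```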
Fetching data over HTTP is the classic example of a blocking task. I'll use the aiohttp library to make non-blocking HTTP requests, retrieving data from Github's public event API.
import time
import urllib.request
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


def fetch_sync(pid):
    print('Fetch sync process {} started'.format(pid))
    start = time.time()
    response = urllib.request.urlopen(URL)
    datetime = response.getheader('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start))

    return datetime


async def fetch_async(pid):
    print('Fetch async process {} started'.format(pid))
    start = time.time()
    # aiohttp.request('GET', URL) is the old aiohttp 0.x API this article
    # was written against; current aiohttp versions use ClientSession
    response = await aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start))

    response.close()
    return datetime


def synchronous():
    start = time.time()
    for i in range(1, MAX_CLIENTS + 1):
        fetch_sync(i)
    print("Process took: {:.2f} seconds".format(time.time() - start))


async def asynchronous():
    start = time.time()
    tasks = [asyncio.ensure_future(
        fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    await asyncio.wait(tasks)
    print("Process took: {:.2f} seconds".format(time.time() - start))


print('Synchronous:')
synchronous()

print('Asynchronous:')
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 1d-async-fetch-from-server-asyncio-await.py
Synchronous:
Fetch sync process 1 started
Process 1: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.54 seconds
Fetch sync process 2 started
Process 2: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.50 seconds
Fetch sync process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.48 seconds
Process took: 1.54 seconds
Asynchronous:
Fetch async process 1 started
Fetch async process 2 started
Fetch async process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.50 seconds
Process 2: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.52 seconds
Process 1: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.54 seconds
Process took: 0.54 seconds
First, note the difference in timing: using asynchronous calls, we fire all the requests at once. As discussed earlier, each request yields control to the next and returns when completed. The total time taken equals the time of the slowest request! Only 0.54 seconds. Pretty cool, right?
Second, notice how similar the asynchronous code is to its synchronous version: it is essentially the same! The main differences come from the library implementing the asynchronous GET request, and from creating the tasks and waiting for them to finish.
Creating coroutines
So far we've used a single pattern for creating coroutines and retrieving results: create a set of tasks and wait for all of them to complete.
But coroutines can be scheduled, and their results retrieved, in different ways. Imagine a scenario where we need to process the results of HTTP GET requests as soon as each one arrives; the process is actually quite similar to our previous example:
import time
import random
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('Fetch async process {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))

    await asyncio.sleep(sleepy_time)

    response = await aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')
    response.close()

    return 'Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)


async def asynchronous():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = await future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Process took: {:.2f} seconds".format(time.time() - start))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2a-async-fetch-from-server-as-completed-asyncio-await.py
Fetch async process 1 started, sleeping for 4 seconds
Fetch async process 3 started, sleeping for 5 seconds
Fetch async process 2 started, sleeping for 3 seconds
>> Process 2: Wed, 17 Feb 2016 13:55:19 GMT, took: 3.53 seconds
>>>> Process 1: Wed, 17 Feb 2016 13:55:20 GMT, took: 4.49 seconds
>>>>>> Process 3: Wed, 17 Feb 2016 13:55:21 GMT, took: 5.48 seconds
Process took: 5.48 seconds
The code in this case is only slightly different: we gather the coroutines into a list, each of them ready to be scheduled and executed. The as_completed function returns an iterator that yields each future as it completes. Incidentally, as_completed and wait are both functions that originally come from concurrent.futures.
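A stripped-down sketch of as_completed (using sleeps of my choosing instead of HTTP requests) makes the completion-order iteration easy to see; the slowest coroutine is deliberately submitted first:

```python
import asyncio


async def sleepy(delay, name):
    await asyncio.sleep(delay)
    return name


async def main():
    # submission order: slow, fast, medium
    coros = [sleepy(0.3, 'slow'), sleepy(0.1, 'fast'), sleepy(0.2, 'medium')]
    order = []
    # as_completed yields the futures in the order they finish,
    # not the order they were submitted
    for future in asyncio.as_completed(coros):
        order.append(await future)
    return order


loop = asyncio.new_event_loop()
order = loop.run_until_complete(main())
loop.close()
print(order)
```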
Let's look at another example. Imagine you're trying to find out your own IP address. There are several services that can retrieve it for you, but you can't be sure they will all be reachable at runtime, and you don't want to check each one in turn. You'd send concurrent requests to all of them and pick the first one to respond, right? Exactly!
Well, it turns out our old friend wait has a parameter for just that: return_when. Until now we've ignored wait's return value, because we were only parallelizing tasks. But now we want to retrieve the results from the coroutines, so we can use the two sets of futures it returns: done and pending.
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = await aiohttp.request('GET', service.url)
    json_response = await response.json()
    ip = json_response[service.ip_attr]
    response.close()

    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2c-fetch-first-ip-address-response-await.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api finished with result: 82.34.76.170, took: 0.09 seconds
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>
Wait, what happened there? The code ran and produced the right result, but what are all those warnings?
We scheduled two tasks, but once the first one completed we closed the loop, leaving the second one pending. asyncio considers that a bug and prints a warning. We should really let the event loop know not to bother with the pending future. So how do we do that?
Future states
(As in states that a Future can be in, not states that are in the future… you know what I mean)
A future can be in the following states:
- Pending
- Running
- Done
- Cancelled
When a future is created, it is pending; while the event loop is executing it, it is naturally running; once the call finishes it is done. If you need to stop the event loop, you must cancel the tasks first; you can retrieve the event loop's tasks with asyncio.Task.all_tasks().
Once a future is done, calling its result method returns the result. If the future is pending or running, result raises InvalidStateError; if it is cancelled, it raises CancelledError; and if the wrapped coroutine raised an exception, that exception is re-raised, exactly as if the coroutine itself had been called.
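These transitions can be observed directly on a bare future. The sketch below is for illustration only; loop.create_future gives us a plain Future with no coroutine attached:

```python
import asyncio

loop = asyncio.new_event_loop()
fut = loop.create_future()      # a bare Future: starts out pending

pending_state = fut.done()      # False: still pending

raised_invalid_state = False
try:
    fut.result()                # result() on a pending future raises
except asyncio.InvalidStateError:
    raised_invalid_state = True

fut.set_result(42)              # moves the future into the "done" state
done_state = fut.done()
result = fut.result()

cancelled_fut = loop.create_future()
cancelled_fut.cancel()
raised_cancelled = False
try:
    cancelled_fut.result()      # result() on a cancelled future raises
except asyncio.CancelledError:
    raised_cancelled = True

loop.close()
print(pending_state, raised_invalid_state, done_state, result, raised_cancelled)
```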
You can also call done, cancelled or running on a future to check whether it is in that state; note that done means result will either return a result or raise an exception. A future can be cancelled explicitly by calling its cancel method, which is exactly what we need to fix the warning from the previous example:
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = await aiohttp.request('GET', service.url)
    json_response = await response.json()
    ip = json_response[service.ip_attr]
    response.close()

    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())

    # cancel the futures that are still pending so the event loop
    # doesn't complain about them when it is closed
    for future in pending:
        future.cancel()


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2c-fetch-first-ip-address-response-no-warning-await.py
Fetching IP from ipify
Fetching IP from ip-api
ip-api finished with result: 82.34.76.170, took: 0.08 seconds
This time the output is nice and clean.
If you want to attach extra logic, futures also allow registering callbacks to be invoked when they reach the done state. You can even set a future's result or exception manually, which is typically done for unit-testing purposes.
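A short sketch of both features (illustrative, not from the article): a callback attached with add_done_callback fires once the future is done, and set_result completes the future by hand, the way a unit test might:

```python
import asyncio

events = []


def on_done(future):
    # done-callbacks receive the finished future as their only argument
    events.append('callback saw: {}'.format(future.result()))


loop = asyncio.new_event_loop()
fut = loop.create_future()
fut.add_done_callback(on_done)
fut.set_result('manual result')   # complete the future by hand, no real I/O

# callbacks are delivered through the event loop, so run it briefly
loop.run_until_complete(asyncio.sleep(0))
loop.close()
print(events)
```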
Original article: AsyncIO for the Working Python Developer
Translator's note:
This is my first attempt at a translation; please forgive any inaccuracies and clumsy wording, and I would be most grateful if more experienced readers could point out anything I have misunderstood.