asyncio是Python中解決異步I/O高并發(fā)的一個(gè)模塊。
asyncio的事件循環(huán)
我們先看下asyncio有哪些功能:
包含各種特定系統(tǒng)實(shí)現(xiàn)的模塊化事件循環(huán)(針對(duì)不同系統(tǒng)都能兼容的事件循環(huán):例如Windows下的
select,linux下的epoll。)傳輸和協(xié)議抽象(對(duì)TCP和UDP協(xié)議的抽象)
對(duì)TCP、UDP、SSL、子進(jìn)程、延時(shí)調(diào)用以及其他的具體支持
模仿
futures模塊但適用于事件循環(huán)使用的Future類(lèi)基于
yield from的協(xié)議和任務(wù),可以讓我們使用順序的方式編寫(xiě)并發(fā)代碼必須使用一個(gè)將產(chǎn)生阻塞IO的調(diào)用時(shí),有接口可以把這個(gè)事件遷移到線程池
模仿
threading模塊中的同步原語(yǔ),可以用在單線程內(nèi)的協(xié)程之間
前面我們學(xué)習(xí)了協(xié)程,但是協(xié)程脫離事件循環(huán)意義就不是很大了。
下面我們開(kāi)始學(xué)習(xí)asyncio的使用吧!??
首先我們明確一點(diǎn),高并發(fā)異步IO編程的編碼模式由三部分組成:
事件循環(huán)+回調(diào)(驅(qū)動(dòng)生成器)+epoll(IO多路復(fù)用)
asyncio是Python用于解決異步io編程的一整套解決方案
有趣的小知識(shí):
tornado也是基于asyncio的異步框架,通過(guò)協(xié)程和事件循環(huán)來(lái)完成高并發(fā)。相對(duì)于Django和Flask這種傳統(tǒng)的阻塞IO框架本身不提供web服務(wù)器,不會(huì)去完成Socket編碼的,因此我們?cè)诓渴鸬臅r(shí)候會(huì)搭配實(shí)現(xiàn)了SOcket編碼的框架(uwsgi, gunicorn+nginx)。Tornado實(shí)現(xiàn)了自己的web服務(wù)器,因此我們部署Tornado的時(shí)候是可以直接部署的(會(huì)使用epoll來(lái)完成socket請(qǐng)求),但是真正部署的時(shí)候,還是會(huì)使用nginx來(lái)完成一些操作(IP限制等)。因此Tornado的數(shù)據(jù)庫(kù)驅(qū)動(dòng)就不能使用阻塞IO驅(qū)動(dòng)框架了。
asyncio的簡(jiǎn)單使用:
協(xié)程要搭配事件循環(huán)才能使用
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
# 我們使用 asyncio 實(shí)現(xiàn)的事件循環(huán) 這個(gè)loop就可完成 之前我們自己實(shí)現(xiàn)的 事件循環(huán) select 的操作
loop = asyncio.get_event_loop()
# 可以使用 run_until_complete 進(jìn)行協(xié)程的調(diào)用 這是一個(gè)阻塞函數(shù) 可以理解為多線程編程中的jion方法 然后把 asyncio理解為協(xié)程池
loop.run_until_complete(get_html("http://www.imooc.com"))
print(time.time()-start_time)
# 輸出
start get url
end get url
2.0019102096557617
我們可以同時(shí)執(zhí)行多個(gè)協(xié)程,傳入一個(gè)可迭代的任務(wù)對(duì)象
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# 這個(gè) tasks 可以是不同的協(xié)程
tasks = [get_html("http://www.imooc.com") for i in range(10)]
# asyncio.wait()函數(shù)會(huì)接收一個(gè)可迭代對(duì)象
loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-start_time)
# 輸出就不打印了 耗時(shí)大概兩秒
注意:在協(xié)程中不能使用同步的時(shí)間睡眠
time.sleep(),否則當(dāng)執(zhí)行的協(xié)程超過(guò)一個(gè)的時(shí)候就會(huì)出現(xiàn)同步阻塞的情況。
要是哪個(gè)小伙伴想測(cè)試下上面那句話,可以將上面的代碼await asyncio.sleep(2)改為time.sleep(2)你會(huì)發(fā)現(xiàn)運(yùn)行的時(shí)間不再是兩秒了,而是20+秒。
為什么不能再協(xié)程使用同步的sleep呢?
這就要說(shuō)到我們的loop小朋友了,協(xié)程要配合事件循環(huán)的,我們?cè)谶\(yùn)行協(xié)程的時(shí)候當(dāng)遇到await關(guān)鍵字就知道這是一個(gè)異步阻塞操作了,會(huì)在此處暫停返回一個(gè)Future對(duì)象,然后由loop小朋友再執(zhí)行已經(jīng)可以運(yùn)行的協(xié)程。這樣保證了能夠異步執(zhí)行操作。當(dāng)我們直接在協(xié)程中使用sleep同步操作時(shí)候,不會(huì)暫停而是一直等待,這就是原因??
如何獲得協(xié)程的返回值呢?
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "紅燒肉"
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# 這里使用 asyncio.ensure_future 來(lái)獲得一個(gè)future對(duì)象 是不是很像多線程編程中的 submit
get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
# 也可以使用 loop 的 create_task 兩者用法一樣
# task = loop.create_task()
# task 是 future 的子類(lèi)
# 可以將future 對(duì)象傳入到 run_until_complete
loop.run_until_complete(get_future)
# 通過(guò) future 對(duì)象的 result函數(shù)獲得結(jié)果
print(get_future.result())
# 輸出
start get url
紅燒肉
上面的代碼還可以這么寫(xiě)
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com"))
loop.run_until_complete(task)
print(task.result())
我們看到使用loop.create_task和asyncio.ensure_future是一樣的效果,具體區(qū)別我們稍后會(huì)學(xué)習(xí)到。??
有沒(méi)有小伙伴懷疑,當(dāng)使用asyncio.ensure_future的時(shí)候是何時(shí)和我們創(chuàng)建的loop建立聯(lián)系的呢,是在loop.run_until_complete(get_future)的時(shí)候嗎?
讓我們看下ensure_future的源碼:
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
# 看這里 當(dāng)沒(méi)有l(wèi)oop傳入的時(shí)候,會(huì)獲得當(dāng)前l(fā)oop 因?yàn)榫€程中只有這個(gè)一個(gè) loop 這里啟動(dòng)loop和外層代碼的loop是同一個(gè)loop
if loop is None:
loop = events.get_event_loop()
# 我們看到 內(nèi)部同樣是使用 create_task
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif compat.PY35 and inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('A Future, a coroutine or an awaitable is required')
除了上面直接調(diào)用協(xié)程,我們還可以在協(xié)程執(zhí)行完成之后進(jìn)行一個(gè)回調(diào)。
import asyncio
import time
from functools import partial
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "紅燒肉"
# 當(dāng)我們想要在 回調(diào)函數(shù)中傳遞參數(shù)的時(shí)候 注意 future 參數(shù)寫(xiě)在最后
def callback(url, future):
print(url)
print("send email to 紅燒肉")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com"))
task.add_done_callback(partial(callback, "http://www.imooc.com"))
loop.run_until_complete(task)
print(task.result())
我們使用partial將傳入的參數(shù),偽造成一個(gè)函數(shù)。
回調(diào)函數(shù)會(huì)默認(rèn)接收一個(gè) future 對(duì)象參數(shù)
wait和gather
我們上面已經(jīng)使用了wait來(lái)進(jìn)行多協(xié)程的運(yùn)行,我們看下它的源碼:
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
The sequence futures must not be empty.
Coroutines will be wrapped in Tasks.
Returns two sets of Future: (done, pending).
Usage:
done, pending = yield from asyncio.wait(fs)
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError('Invalid return_when value: {}'.format(return_when))
if loop is None:
loop = events.get_event_loop()
fs = {ensure_future(f, loop=loop) for f in set(fs)}
return (yield from _wait(fs, timeout, return_when, loop))
這個(gè)wait我們理解為多線程中的wait,同樣存在return_when參數(shù),可以指定何時(shí)返回。
那gather如何使用呢?
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("http://www.imooc.com") for i in range(10)]
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time()-start_time)
我們將wait直接修改為gather然后可迭代對(duì)象加上*即可。
兩者的區(qū)別是什么呢?
-
gather更加hight-level -
gather可以將協(xié)程分組
group1 = [get_html("http://projectsedu.com") for i in range(2)]
group2 = [get_html("http://www.imooc.com") for i in range(2)]
# 我們可以分組傳遞
loop.run_until_complete(asyncio.gather(*group1, *group2))
# 我們可以將先進(jìn)行g(shù)ather操作
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
loop.run_until_complete(asyncio.gather(group1, group2))
# 我們可以批量取消某個(gè)分組
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
group2.cancel()
task取消和子協(xié)程調(diào)用原理
我們先看下run_until_complete和run_forever兩個(gè)函數(shù)的區(qū)別。
run_until_complete在運(yùn)行完指定的協(xié)程之后就會(huì)停止,而run_forever則會(huì)一直運(yùn)行。
看下源碼:

在圖片中我們看到run_until_complete里面同樣使用了run_forever。但是,增加了一個(gè)回調(diào)_run_until_complete_cb:
def _run_until_complete_cb(fut):
exc = fut._exception
if (isinstance(exc, BaseException) and not isinstance(exc, Exception)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
fut._loop.stop()
在回調(diào)函數(shù)中當(dāng)沒(méi)有協(xié)程運(yùn)行的時(shí)候會(huì)將loop即事件循環(huán)直接暫停。
asyncio會(huì)將loop放到future中,而future同樣會(huì)被放到loop中。因此我們可以在任何一個(gè)任務(wù)中停止掉 loop
如何取消協(xié)程中的task(future)
import asyncio
async def get_html(sleep_times):
print("waiting")
await asyncio.sleep(sleep_times)
print("done after {}s".format(sleep_times))
if __name__ == "__main__":
task1 = get_html(2)
task2 = get_html(3)
task3 = get_html(3)
tasks = [task1, task2, task3]
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
# 我們發(fā)送一個(gè) controle + c 異常
except KeyboardInterrupt as e:
# 獲得所有的task
all_tasks = asyncio.Task.all_tasks()
for task in all_tasks:
print("cancel task")
# 將task 取消 返回布爾值
print(task.cancel())
# 先將 loop 暫停
loop.stop()
# 記得將 loop 再次運(yùn)行 run_forever 否則將報(bào)錯(cuò)
loop.run_forever()
finally:
# 最后 關(guān)閉 loop
loop.close()
有咩有小伙伴對(duì)all_tasks = asyncio.Task.all_tasks()這句代碼疑惑?

我們看了源碼就知道了 因?yàn)槿种挥幸粋€(gè)loop,所以能夠在任何位置輕松獲得loop相關(guān)的信息。
如何在協(xié)程中插入子協(xié)程
我們看一段官方文檔的代碼:
官方文檔叫chain coroutines鏈?zhǔn)絽f(xié)程?
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
compute() is chained to print_sum(): print_sum() coroutine waits until compute() is completed before returning its result.
序列圖:

圖中展示大致意思:當(dāng)我們運(yùn)行一個(gè)協(xié)程的時(shí)候,立即創(chuàng)建一個(gè)Task,由EventLoop驅(qū)動(dòng)Task,然后Task驅(qū)動(dòng)print_sum。當(dāng)協(xié)程中調(diào)用了另外一個(gè)子協(xié)程的時(shí)候,是直接由Task和子協(xié)程通信的。直至子協(xié)程運(yùn)行完畢拋出StopIteration異常,然后父協(xié)程會(huì)捕捉到異常并提取出結(jié)果,父協(xié)程運(yùn)行完畢,同樣拋出異常,逐層往上拋出然后終止Task。重點(diǎn)在于Task和子協(xié)程compute之間的通道,以及異常拋出攔截。
The “Task” is created by the
AbstractEventLoop.run_until_complete()method when it gets a coroutine object instead of a task.
意思是,圖中的Task并不是一個(gè)任務(wù)而是一個(gè)協(xié)程對(duì)象。
The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses
AbstractEventLoop.call_later()to wake up the task in 1 second.
意思是,圖大致講了如何在協(xié)程中調(diào)用子協(xié)程,但是內(nèi)部實(shí)現(xiàn)沒(méi)有體現(xiàn)出來(lái)。
例如:當(dāng)調(diào)用asyncio.sleep(1.0)的時(shí)候會(huì)創(chuàng)建一個(gè)內(nèi)部的future對(duì)象然后使用 AbstractEventLoop.call_later() 在一秒后喚醒任務(wù)。
asyncio中的其他函數(shù)
call_soon 函數(shù)
import asyncio
def callback(sleep_times):
print(f"success time {sleep_times}")
def stoploop(loop):
loop.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 這里傳入的是函數(shù)名稱(chēng) 不是協(xié)程 因?yàn)楹芏鄷r(shí)候 我們希望在循環(huán)體系中插入一個(gè)函數(shù)
# call_soon 是即刻執(zhí)行 比不是下一行代碼執(zhí)行 而是等到下一個(gè)循環(huán)的時(shí)候執(zhí)行
loop.call_soon(callback, 2)
# 停止時(shí)間循環(huán)
loop.call_soon(stoploop, loop)
# 因?yàn)槲覀儌魅氲牟皇菂f(xié)程 而是函數(shù) 因此啟動(dòng)要使用 run_forever
loop.run_forever()
call_later 函數(shù)
import asyncio
def callback(sleep_times):
print(f"success time {sleep_times}")
def stoploop(loop):
loop.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# call_later 是延遲調(diào)用
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)
loop.run_forever()
# 輸出
success time 1
success time 2
success time 3
從輸出看出 call_later并不是根據(jù)添加的順序執(zhí)行的 而是根據(jù)延遲的時(shí)間。
為了進(jìn)一步比較call_later和call_soon的區(qū)別我們看下下面代碼的輸出
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# call_later 是延遲調(diào)用
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)
loop.call_soon(callback, 4)
loop.run_forever()
# 輸出
success time 4
success time 1
success time 2
success time 3
我們看到call_soon執(zhí)行是比call_later要早的 是下個(gè)循環(huán)立即執(zhí)行
call_at函數(shù)
call_at函數(shù)可以讓我們指定時(shí)間運(yùn)行回調(diào)函數(shù),這里的時(shí)間是 loop里面的時(shí)間 不是傳統(tǒng)的時(shí)間
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 獲得loop的當(dāng)前時(shí)間
loop_time = loop.time()
# 使用 call_at 在 當(dāng)前時(shí)間的基礎(chǔ)上 延遲幾秒執(zhí)行回調(diào)
loop.call_at(loop_time + 2, callback, 2)
loop.call_at(loop_time + 1, callback, 1)
loop.call_at(loop_time + 3, callback, 3)
loop.call_soon(callback, 4)
loop.run_forever()
# 輸出
success time 4
success time 1
success time 2
success time 3
call_soon_threadsafe 函數(shù)
這是一個(gè)線程安全的函數(shù) 作用和 call_soon一樣
asyncio是可以在多線程環(huán)境下運(yùn)行的,asyncio是一整套的異步IO解決方案,不僅可以解決協(xié)程調(diào)度問(wèn)題,還可以解決線程、進(jìn)程問(wèn)題。
def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread-safe."""
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args)
if handle._source_traceback:
del handle._source_tracebac
# 又這個(gè)函數(shù)實(shí)現(xiàn)線程安全的
self._write_to_self()
return handle
當(dāng)我們?cè)诙嗑€程中 多個(gè)回調(diào)函數(shù)使用了一個(gè)變量 可以使用這個(gè)來(lái)保證線程安全