asyncio 真的是好復(fù)雜,官方文檔也只是在說(shuō)明一些概念,看了也不懂??。
大致看了一些 asyncio 的源碼,也看了一些文章,決定自己試著實(shí)現(xiàn)一個(gè) asyncio 。
- 事件循環(huán)
從 asyncio 的 api 來(lái)看,它實(shí)現(xiàn)了一個(gè)事件循環(huán)。事件循環(huán)的基本思想可以用下面這段偽代碼來(lái)表示。
class EventLoop:
def register(self, event, callback):
self._callbacks[event] = callback
def start(self):
while True:
# 等待事件發(fā)生
events = self.wait()
for event in events:
self._callbacks[event]()
loop = EventLoop()
loop.register(event, callback)
loop.start()
可以分為這么幾部分:注冊(cè)事件回調(diào)、事件循環(huán);事件循環(huán)包括事件等待、執(zhí)行事件回調(diào)。
我們要記住這個(gè)核心邏輯,事件循環(huán)的實(shí)現(xiàn)便是在這上面添磚加瓦,暴露出不同的api給開(kāi)發(fā)者使用。
- hello world
我們先來(lái)看 asyncio 的 hello world 例子,今天這邊文章便是要實(shí)現(xiàn)這個(gè)例子
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
# Python 3.7+
asyncio.run(main())
我把自己的包命名為 ayqio ,也就是說(shuō)我們要實(shí)現(xiàn) ayqio.sleep 和 ayqio.run 。
這個(gè)例子所等待的事件是個(gè)時(shí)間事件,我們先實(shí)現(xiàn)時(shí)間事件相關(guān)的邏輯。
其中有很多 api 我會(huì)狠狠地復(fù)刻 asyncio 的(不用自己起名字真爽( ̄_, ̄ ))。
先實(shí)現(xiàn)一個(gè)處理時(shí)間事件的事件循環(huán)。
邏輯是這樣的:我們先注冊(cè)執(zhí)行時(shí)間和回調(diào),在事件循環(huán)中遍歷時(shí)間事件,獲得最小等待時(shí)間,然后 time.sleep 到指定時(shí)間即可。這里我們使用最小堆排序時(shí)間事件。
import heapq
import time
import datetime
import typing
def log(msg):
print(f"[{datetime.datetime.now()}]{msg}")
class TimerHandle(object):
"""定時(shí)處理器"""
def __init__(self, when: float, callback, *args):
self.when = when
self._callback = callback
self._args = args
def run(self):
"""執(zhí)行回調(diào)"""
log(f"run callback {self._callback}, args: {self._args}")
if self._callback is not None:
self._callback(*self._args)
class EventLoop(object):
_instance = None
def __init__(self):
# 調(diào)度的定時(shí)處理器
self._scheduled: typing.List[TimerHandle] = []
# 就緒的定時(shí)處理器
self._ready: typing.List[TimerHandle] = []
self._stopping = False
def run_forever(self):
while True:
self._run_once()
if self._stopping:
break
def _run_once(self):
"""進(jìn)行一次事件循環(huán)"""
# 計(jì)算下次事件循環(huán)等待事件
timeout: typing.Optional[float] = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
when = self._scheduled[0].when
# 0 <= timeout
timeout = max(0, when - self.time())
if timeout:
time.sleep(timeout)
# 尋找就緒的定時(shí)處理器
end_time = self.time()
while self._scheduled:
handle = self._scheduled[0]
if handle.when > end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# 執(zhí)行定時(shí)處理器
# 統(tǒng)一一個(gè)地方執(zhí)行定時(shí)處理器,循環(huán)過(guò)程中增加的就緒處理器放到下次循環(huán)進(jìn)行
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.pop(0)
handle.run()
def time(self):
return time.monotonic()
def call_later(self, delay: typing.Union[int, float], callback, *args):
"""在 delay 秒后執(zhí)行回調(diào)"""
return self.call_at(self.time() + delay, callback, *args)
def call_at(self, when: float, callback, *args):
"""在 when 時(shí)間執(zhí)行回調(diào)"""
timer = TimerHandle(when, callback, *args)
# 定時(shí)事件回調(diào)使用最小堆排序
heapq.heappush(self._scheduled, timer)
return timer
def call_soon(self, callback, *args):
"""盡快執(zhí)行回調(diào)"""
return self.call_at(self.time(), callback, *args)
def stop(self):
self._stopping = True
# 全局的事件循環(huán)
_event_loop = EventLoop()
def get_event_loop():
return _event_loop
def run(coroutine):
loop = get_event_loop()
loop._run_once()
接下來(lái),我們?cè)谕粋€(gè)目錄下新建 helloworld.py文件
import ayqio
def world():
print('... World!')
def main():
print('Hello ...')
loop = ayqio.get_event_loop()
loop.call_later(1, world)
ayqio.run(main())
執(zhí)行 helloworld.py 文件后,我們可以看到 print('... World!') 在等待了1秒之后才執(zhí)行。
總之,我們實(shí)現(xiàn)了一個(gè)可以處理時(shí)間事件的事件循環(huán)。后面我們要改造成跟 asyncio 一樣的 api 。
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
# Python 3.7+
asyncio.run(main())
我們要來(lái)拆解這個(gè)例子的邏輯,將它與我們上面說(shuō)的事件循環(huán)基本思想結(jié)合起來(lái)。
事件循環(huán)先是要注冊(cè)事件回調(diào),而在上面的代碼里,便是 await asyncio.sleep(1) 注冊(cè)了一個(gè)時(shí)間事件,時(shí)間事件觸發(fā)后,要執(zhí)行 print('... World!'),也就是要驅(qū)動(dòng)協(xié)程往下走。流程如下
注冊(cè)時(shí)間事件回調(diào) --> 等待 1s(sleep) --> 調(diào)用回調(diào) --> 驅(qū)動(dòng)協(xié)程 (main)
寫成代碼就是
coroutine = main()
obj = coroutine.send(None) # obj 由 sleep 返回
obj.add_coroutine_callback(coroutine)
一方面協(xié)程需要激活,執(zhí)行到 await 處,另一方面協(xié)程執(zhí)行完成后會(huì)拋出異常 StopIteration,因此我們把協(xié)程封裝起來(lái)。上面代碼中的 obj 我們參照 python 原有的 Future。完整代碼如下
# Future 狀態(tài)
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'
class Future(object):
def __init__(self, loop):
self._result = None
self._exception = None
self._state = _PENDING
self._callbacks = []
self._loop = loop
def result(self):
if self._state != _FINISHED:
raise ValueError('Result is not ready.')
if self._exception is not None:
raise self._exception
return self._result
def set_result(self, result):
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
def set_exception(self, exception):
self._exception = exception
self._state = _FINISHED
self.__schedule_callbacks()
def add_done_callback(self, callback):
self._callbacks.append(callback)
def __schedule_callbacks(self):
"""把回調(diào)交給事件循環(huán)執(zhí)行"""
callbacks = self._callbacks[:]
if not callbacks:
return
# 清空回調(diào)
self._callbacks[:] = []
for callback in callbacks:
self._loop.call_soon(callback, self)
def __await__(self):
"""成為 awiatable 對(duì)象,可以使用 await """
# 返回 future ,注冊(cè)回調(diào)驅(qū)動(dòng)協(xié)程
yield self
# 返回結(jié)果
return self._result
class Task(object):
"""驅(qū)動(dòng)協(xié)程執(zhí)行"""
def __init__(self, coroutine, loop):
self._loop = loop
self._coroutine = coroutine
# 激活協(xié)程
self._loop.call_soon(self.__step)
def __step(self, exc=None):
try:
if exc is None:
result = self._coroutine.send(None)
else:
result = self._coroutine.throw(exc)
except StopIteration as e:
log(f"coroutine {self._coroutine} finish, return {e.value}")
except Exception as e:
log(f"coroutine {self._coroutine} step catch e: {e}")
else:
result.add_done_callback(self.__wakeup)
def __wakeup(self, future):
try:
future.result()
except Exception as e:
self.__step(e)
else:
self.__step()
下面我們來(lái)實(shí)現(xiàn) sleep 邏輯,它要注冊(cè)時(shí)間事件回調(diào),返回一個(gè) Future 對(duì)象。同時(shí)我們將創(chuàng)建 Future 和 Task 封裝到 EventLoop 中。
class EventLoop(object):
def create_future(self):
return Future(self)
def create_task(self, coroutine):
return Task(coroutine, self)
# 標(biāo)準(zhǔn)庫(kù) asyncio 里面的 sleep 是個(gè)協(xié)程,這里實(shí)現(xiàn)的只是一個(gè)普通的函數(shù)
# return 的 Future 就是一個(gè)協(xié)程,實(shí)現(xiàn)了 `__await__` 方法
def sleep(seconds):
loop = get_event_loop()
f = loop.create_future()
loop.call_later(seconds, f.set_result, None)
return f
事件循環(huán)還需要提供方法,用來(lái)驅(qū)動(dòng)協(xié)程直至協(xié)程結(jié)束。直接的思路便是注冊(cè)回調(diào),在協(xié)程執(zhí)行完成后執(zhí)行回調(diào)停止事件循環(huán),我們直接用 Task 繼承 Future 來(lái)實(shí)現(xiàn)。
完整的流程如下
- run 函數(shù)根據(jù)傳入的協(xié)程 main() 構(gòu)建 Task ;
- Task 的實(shí)例化過(guò)程
__init__注冊(cè)回調(diào)到事件循環(huán),這個(gè)回調(diào)負(fù)責(zé)驅(qū)動(dòng)協(xié)程,執(zhí)行到 sleep 處的 await ; - 開(kāi)始事件循環(huán),執(zhí)行第 2 步中的回調(diào),即 Task.__step 方法;
- Task.__step 方法驅(qū)動(dòng)協(xié)程,執(zhí)行到 await sleep 處;
- sleep 構(gòu)建 Future ,注冊(cè)時(shí)間事件
Future.set_result到事件循環(huán),返回構(gòu)建好的Future; - 控制流回到
Task.__step方法, result 接收 await sleep 返回的 Future ,同時(shí)將下一次協(xié)程的執(zhí)行 (Task.__wakeup) 注冊(cè)到 Future 的回調(diào); - 控制流回到事件循環(huán),開(kāi)始下一次循環(huán);
- 1s 過(guò)后,事件循環(huán)執(zhí)行時(shí)間事件回調(diào)
Future.set_result,Future開(kāi)始執(zhí)行它的回調(diào),即第 6 步中注冊(cè)的Task.__wakeup; -
Task.__wakeup讀取執(zhí)行結(jié)果,驅(qū)動(dòng)協(xié)程執(zhí)行下一步,Future.__await__方法被執(zhí)行,返回Future的result,回到main繼續(xù)向下執(zhí)行,此時(shí)main協(xié)程執(zhí)行完畢,拋出StopIteration,在Task.__step方法中捕獲,調(diào)用Task.set_result方法,執(zhí)行Task本身的回調(diào),這個(gè)回調(diào)會(huì)停止事件循環(huán)。
完整代碼見(jiàn)
v1.0 · yeqing/ayqio - 碼云 - 開(kāi)源中國(guó) (gitee.com)
asyncio.sleep 還有個(gè)參數(shù)是 result ,這個(gè)我們直接加上即可。
如果我們?cè)诙鄬?shí)現(xiàn)幾個(gè) api ,還可以直接使用 asyncio.sleep 來(lái)運(yùn)行。
import ayqio
import asyncio
async def main():
print('Hello ...')
# 這里的 name 由 Future.__await__ 返回
# name = await ayqio.sleep(1, "World")
name = await asyncio.sleep(1, "World", loop=ayqio.get_event_loop())
print(f'... {name}!')
ayqio.run(main())