asyncio 學(xué)習(xí)(1)

asyncio 真的是好復(fù)雜,官方文檔也只是在說(shuō)明一些概念,看了也不懂??。

大致看了一些 asyncio 的源碼,也看了一些文章,決定自己試著實(shí)現(xiàn)一個(gè) asyncio 。

  • 事件循環(huán)

從 asyncio 的 api 來(lái)看,它實(shí)現(xiàn)了一個(gè)事件循環(huán)。事件循環(huán)的基本思想可以用下面這段偽代碼來(lái)表示。

class EventLoop:
    def register(self, event, callback):
        self._callbacks[event] = callback
    
    def start(self):
        while True:
            # 等待事件發(fā)生
            events = self.wait()
            for event in events:
                self._callbacks[event]()

loop = EventLoop()
loop.register(event, callback)
loop.start()

可以分為這么幾部分:注冊(cè)事件回調(diào)、事件循環(huán);事件循環(huán)包括事件等待、執(zhí)行事件回調(diào)。

我們要記住這個(gè)核心邏輯,事件循環(huán)的實(shí)現(xiàn)便是在這上面添磚加瓦,暴露出不同的api給開(kāi)發(fā)者使用。

  • hello world

我們先來(lái)看 asyncio 的 hello world 例子,今天這邊文章便是要實(shí)現(xiàn)這個(gè)例子

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

我把自己的包命名為 ayqio ,也就是說(shuō)我們要實(shí)現(xiàn) ayqio.sleepayqio.run

這個(gè)例子所等待的事件是個(gè)時(shí)間事件,我們先實(shí)現(xiàn)時(shí)間事件相關(guān)的邏輯。

其中有很多 api 我會(huì)狠狠地復(fù)刻 asyncio 的(不用自己起名字真爽( ̄_, ̄ ))。

先實(shí)現(xiàn)一個(gè)處理時(shí)間事件的事件循環(huán)。

邏輯是這樣的:我們先注冊(cè)執(zhí)行時(shí)間和回調(diào),在事件循環(huán)中遍歷時(shí)間事件,獲得最小等待時(shí)間,然后 time.sleep 到指定時(shí)間即可。這里我們使用最小堆排序時(shí)間事件。

import heapq
import time
import datetime
import typing

def log(msg):
    print(f"[{datetime.datetime.now()}]{msg}")

class TimerHandle(object):
    """定時(shí)處理器"""
    def __init__(self, when: float, callback, *args):
        self.when = when
        self._callback = callback
        self._args = args

    def run(self):
        """執(zhí)行回調(diào)"""
        log(f"run callback {self._callback}, args: {self._args}")
        if self._callback is not None:
            self._callback(*self._args)

class EventLoop(object):
    _instance = None

    def __init__(self):
        # 調(diào)度的定時(shí)處理器
        self._scheduled: typing.List[TimerHandle] = []
        # 就緒的定時(shí)處理器
        self._ready: typing.List[TimerHandle] = []
        self._stopping = False

    def run_forever(self):
        while True:
            self._run_once()
            if self._stopping:
                break

    def _run_once(self):
        """進(jìn)行一次事件循環(huán)"""
        # 計(jì)算下次事件循環(huán)等待事件
        timeout: typing.Optional[float] = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            when = self._scheduled[0].when
            # 0 <= timeout
            timeout = max(0, when - self.time())

        if timeout:
            time.sleep(timeout)

        # 尋找就緒的定時(shí)處理器
        end_time = self.time()
        while self._scheduled:
            handle = self._scheduled[0]
            if handle.when > end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)
        # 執(zhí)行定時(shí)處理器
        # 統(tǒng)一一個(gè)地方執(zhí)行定時(shí)處理器,循環(huán)過(guò)程中增加的就緒處理器放到下次循環(huán)進(jìn)行
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.pop(0)
            handle.run()

    def time(self):
        return time.monotonic()

    def call_later(self, delay: typing.Union[int, float], callback, *args):
        """在 delay 秒后執(zhí)行回調(diào)"""
        return self.call_at(self.time() + delay, callback, *args)

    def call_at(self, when: float, callback, *args):
        """在 when 時(shí)間執(zhí)行回調(diào)"""
        timer = TimerHandle(when, callback, *args)
        # 定時(shí)事件回調(diào)使用最小堆排序
        heapq.heappush(self._scheduled, timer)
        return timer

    def call_soon(self, callback, *args):
        """盡快執(zhí)行回調(diào)"""
        return self.call_at(self.time(), callback, *args)

    def stop(self):
        self._stopping = True

# 全局的事件循環(huán)
_event_loop = EventLoop()

def get_event_loop():
    return _event_loop

def run(coroutine):
    loop = get_event_loop()
    loop._run_once()

接下來(lái),我們?cè)谕粋€(gè)目錄下新建 helloworld.py文件

import ayqio

def world():
    print('... World!')

def main():
    print('Hello ...')
    loop = ayqio.get_event_loop()
    loop.call_later(1, world)

ayqio.run(main())

執(zhí)行 helloworld.py 文件后,我們可以看到 print('... World!') 在等待了1秒之后才執(zhí)行。

總之,我們實(shí)現(xiàn)了一個(gè)可以處理時(shí)間事件的事件循環(huán)。后面我們要改造成跟 asyncio 一樣的 api 。

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

我們要來(lái)拆解這個(gè)例子的邏輯,將它與我們上面說(shuō)的事件循環(huán)基本思想結(jié)合起來(lái)。

事件循環(huán)先是要注冊(cè)事件回調(diào),而在上面的代碼里,便是 await asyncio.sleep(1) 注冊(cè)了一個(gè)時(shí)間事件,時(shí)間事件觸發(fā)后,要執(zhí)行 print('... World!'),也就是要驅(qū)動(dòng)協(xié)程往下走。流程如下

注冊(cè)時(shí)間事件回調(diào) --> 等待 1s(sleep) --> 調(diào)用回調(diào) --> 驅(qū)動(dòng)協(xié)程 (main)

寫成代碼就是

coroutine = main()
obj = coroutine.send(None)  # obj 由 sleep 返回
obj.add_coroutine_callback(coroutine)

一方面協(xié)程需要激活,執(zhí)行到 await 處,另一方面協(xié)程執(zhí)行完成后會(huì)拋出異常 StopIteration,因此我們把協(xié)程封裝起來(lái)。上面代碼中的 obj 我們參照 python 原有的 Future。完整代碼如下

# Future 狀態(tài)
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'


class Future(object):
    def __init__(self, loop):
        self._result = None
        self._exception = None
        self._state = _PENDING
        self._callbacks = []
        self._loop = loop

    def result(self):
        if self._state != _FINISHED:
            raise ValueError('Result is not ready.')
        if self._exception is not None:
            raise self._exception

        return self._result

    def set_result(self, result):
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        self._exception = exception
        self._state = _FINISHED
        self.__schedule_callbacks()

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def __schedule_callbacks(self):
        """把回調(diào)交給事件循環(huán)執(zhí)行"""
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        # 清空回調(diào)
        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def __await__(self):
        """成為 awiatable 對(duì)象,可以使用 await """
        # 返回 future ,注冊(cè)回調(diào)驅(qū)動(dòng)協(xié)程
        yield self
        # 返回結(jié)果
        return self._result


class Task(object):
    """驅(qū)動(dòng)協(xié)程執(zhí)行"""
    def __init__(self, coroutine, loop):
        self._loop = loop
        self._coroutine = coroutine
        # 激活協(xié)程
        self._loop.call_soon(self.__step)

    def __step(self, exc=None):
        try:
            if exc is None:
                result = self._coroutine.send(None)
            else:
                result = self._coroutine.throw(exc)
        except StopIteration as e:
            log(f"coroutine {self._coroutine} finish, return {e.value}")
        except Exception as e:
            log(f"coroutine {self._coroutine} step catch e: {e}")
        else:
            result.add_done_callback(self.__wakeup)

    def __wakeup(self, future):
        try:
            future.result()
        except Exception as e:
            self.__step(e)
        else:
            self.__step()

下面我們來(lái)實(shí)現(xiàn) sleep 邏輯,它要注冊(cè)時(shí)間事件回調(diào),返回一個(gè) Future 對(duì)象。同時(shí)我們將創(chuàng)建 FutureTask 封裝到 EventLoop 中。

class EventLoop(object):

    def create_future(self):
        return Future(self)
    
    def create_task(self, coroutine):
        return Task(coroutine, self)
    
# 標(biāo)準(zhǔn)庫(kù) asyncio 里面的 sleep 是個(gè)協(xié)程,這里實(shí)現(xiàn)的只是一個(gè)普通的函數(shù)
# return 的 Future 就是一個(gè)協(xié)程,實(shí)現(xiàn)了 `__await__` 方法
def sleep(seconds):
    loop = get_event_loop()
    f = loop.create_future()
    loop.call_later(seconds, f.set_result, None)
    return f

事件循環(huán)還需要提供方法,用來(lái)驅(qū)動(dòng)協(xié)程直至協(xié)程結(jié)束。直接的思路便是注冊(cè)回調(diào),在協(xié)程執(zhí)行完成后執(zhí)行回調(diào)停止事件循環(huán),我們直接用 Task 繼承 Future 來(lái)實(shí)現(xiàn)。
完整的流程如下

  1. run 函數(shù)根據(jù)傳入的協(xié)程 main() 構(gòu)建 Task ;
  2. Task 的實(shí)例化過(guò)程 __init__ 注冊(cè)回調(diào)到事件循環(huán),這個(gè)回調(diào)負(fù)責(zé)驅(qū)動(dòng)協(xié)程,執(zhí)行到 sleep 處的 await ;
  3. 開(kāi)始事件循環(huán),執(zhí)行第 2 步中的回調(diào),即 Task.__step 方法;
  4. Task.__step 方法驅(qū)動(dòng)協(xié)程,執(zhí)行到 await sleep 處;
  5. sleep 構(gòu)建 Future ,注冊(cè)時(shí)間事件 Future.set_result 到事件循環(huán),返回構(gòu)建好的 Future
  6. 控制流回到 Task.__step 方法, result 接收 await sleep 返回的 Future ,同時(shí)將下一次協(xié)程的執(zhí)行 (Task.__wakeup) 注冊(cè)到 Future 的回調(diào);
  7. 控制流回到事件循環(huán),開(kāi)始下一次循環(huán);
  8. 1s 過(guò)后,事件循環(huán)執(zhí)行時(shí)間事件回調(diào) Future.set_result ,Future 開(kāi)始執(zhí)行它的回調(diào),即第 6 步中注冊(cè)的 Task.__wakeup ;
  9. Task.__wakeup 讀取執(zhí)行結(jié)果,驅(qū)動(dòng)協(xié)程執(zhí)行下一步,Future.__await__ 方法被執(zhí)行,返回 Futureresult ,回到 main 繼續(xù)向下執(zhí)行,此時(shí) main 協(xié)程執(zhí)行完畢,拋出 StopIteration ,在 Task.__step 方法中捕獲,調(diào)用 Task.set_result 方法,執(zhí)行 Task 本身的回調(diào),這個(gè)回調(diào)會(huì)停止事件循環(huán)。

完整代碼見(jiàn)

v1.0 · yeqing/ayqio - 碼云 - 開(kāi)源中國(guó) (gitee.com)

asyncio.sleep 還有個(gè)參數(shù)是 result ,這個(gè)我們直接加上即可。

如果我們?cè)诙鄬?shí)現(xiàn)幾個(gè) api ,還可以直接使用 asyncio.sleep 來(lái)運(yùn)行。

import ayqio
import asyncio


async def main():
    print('Hello ...')
    # 這里的 name 由 Future.__await__ 返回
    # name = await ayqio.sleep(1, "World")
    name = await asyncio.sleep(1, "World",  loop=ayqio.get_event_loop())
    print(f'... {name}!')

ayqio.run(main())
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容