asyncio并發(fā)編程-上

asyncioPython中解決異步I/O高并發(fā)的一個(gè)模塊。

asyncio的事件循環(huán)

我們先看下asyncio有哪些功能:

  1. 包含各種特定系統(tǒng)實(shí)現(xiàn)的模塊化事件循環(huán)(針對(duì)不同系統(tǒng)都能兼容的事件循環(huán):例如Windows下的select,linux下的epoll。)

  2. 傳輸和協(xié)議抽象(對(duì)TCP和UDP協(xié)議的抽象)

  3. 對(duì)TCP、UDP、SSL、子進(jìn)程、延時(shí)調(diào)用以及其他的具體支持

  4. 模仿futures模塊但適用于事件循環(huán)使用的Future類(lèi)

  5. 基于yield from的協(xié)議和任務(wù),可以讓我們使用順序的方式編寫(xiě)并發(fā)代碼

  6. 必須使用一個(gè)將產(chǎn)生阻塞IO的調(diào)用時(shí),有接口可以把這個(gè)事件遷移到線程池

  7. 模仿threading模塊中的同步原語(yǔ),可以用在單線程內(nèi)的協(xié)程之間

前面我們學(xué)習(xí)了協(xié)程,但是協(xié)程脫離事件循環(huán)意義就不是很大了。

下面我們開(kāi)始學(xué)習(xí)asyncio的使用吧!??

首先我們明確一點(diǎn),高并發(fā)異步IO編程的編碼模式由三部分組成:

事件循環(huán)+回調(diào)(驅(qū)動(dòng)生成器)+epoll(IO多路復(fù)用)

asyncioPython用于解決異步io編程的一整套解決方案

有趣的小知識(shí):

tornado也是基于asyncio的異步框架,通過(guò)協(xié)程和事件循環(huán)來(lái)完成高并發(fā)。相對(duì)于DjangoFlask這種傳統(tǒng)的阻塞IO框架本身不提供web服務(wù)器,不會(huì)去完成Socket編碼的,因此我們?cè)诓渴鸬臅r(shí)候會(huì)搭配實(shí)現(xiàn)了SOcket編碼的框架(uwsgi, gunicorn+nginx)。Tornado實(shí)現(xiàn)了自己的web服務(wù)器,因此我們部署Tornado的時(shí)候是可以直接部署的(會(huì)使用epoll來(lái)完成socket請(qǐng)求),但是真正部署的時(shí)候,還是會(huì)使用nginx來(lái)完成一些操作(IP限制等)。因此Tornado的數(shù)據(jù)庫(kù)驅(qū)動(dòng)就不能使用阻塞IO驅(qū)動(dòng)框架了。

asyncio的簡(jiǎn)單使用:

協(xié)程要搭配事件循環(huán)才能使用

import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    
    # 我們使用 asyncio 實(shí)現(xiàn)的事件循環(huán) 這個(gè)loop就可完成 之前我們自己實(shí)現(xiàn)的 事件循環(huán) select 的操作
    loop = asyncio.get_event_loop()
    
    # 可以使用 run_until_complete 進(jìn)行協(xié)程的調(diào)用 這是一個(gè)阻塞函數(shù) 可以理解為多線程編程中的jion方法 然后把 asyncio理解為協(xié)程池
    loop.run_until_complete(get_html("http://www.imooc.com"))
    print(time.time()-start_time)
    
# 輸出
start get url
end get url
2.0019102096557617

我們可以同時(shí)執(zhí)行多個(gè)協(xié)程,傳入一個(gè)可迭代的任務(wù)對(duì)象

import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    
    # 這個(gè) tasks 可以是不同的協(xié)程
    tasks = [get_html("http://www.imooc.com") for i in range(10)]
    
    # asyncio.wait()函數(shù)會(huì)接收一個(gè)可迭代對(duì)象
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time()-start_time)

# 輸出就不打印了 耗時(shí)大概兩秒

注意:在協(xié)程中不能使用同步的時(shí)間睡眠 time.sleep(),否則當(dāng)執(zhí)行的協(xié)程超過(guò)一個(gè)的時(shí)候就會(huì)出現(xiàn)同步阻塞的情況。

要是哪個(gè)小伙伴想測(cè)試下上面那句話,可以將上面的代碼await asyncio.sleep(2)改為time.sleep(2)你會(huì)發(fā)現(xiàn)運(yùn)行的時(shí)間不再是兩秒了,而是20+秒。

為什么不能再協(xié)程使用同步的sleep呢?

這就要說(shuō)到我們的loop小朋友了,協(xié)程要配合事件循環(huán)的,我們?cè)谶\(yùn)行協(xié)程的時(shí)候當(dāng)遇到await關(guān)鍵字就知道這是一個(gè)異步阻塞操作了,會(huì)在此處暫停返回一個(gè)Future對(duì)象,然后由loop小朋友再執(zhí)行已經(jīng)可以運(yùn)行的協(xié)程。這樣保證了能夠異步執(zhí)行操作。當(dāng)我們直接在協(xié)程中使用sleep同步操作時(shí)候,不會(huì)暫停而是一直等待,這就是原因??

如何獲得協(xié)程的返回值呢?
import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "紅燒肉"


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    
    # 這里使用 asyncio.ensure_future 來(lái)獲得一個(gè)future對(duì)象 是不是很像多線程編程中的 submit
    get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
    
        # 也可以使用 loop 的 create_task 兩者用法一樣
    # task = loop.create_task()
    # task 是 future 的子類(lèi)
    
    # 可以將future 對(duì)象傳入到 run_until_complete
    loop.run_until_complete(get_future)
    # 通過(guò) future 對(duì)象的 result函數(shù)獲得結(jié)果
    print(get_future.result())
    
# 輸出
start get url
紅燒肉

上面的代碼還可以這么寫(xiě)

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
    task = loop.create_task(get_html("http://www.imooc.com"))
    loop.run_until_complete(task)
    print(task.result())

我們看到使用loop.create_taskasyncio.ensure_future是一樣的效果,具體區(qū)別我們稍后會(huì)學(xué)習(xí)到。??

有沒(méi)有小伙伴懷疑,當(dāng)使用asyncio.ensure_future的時(shí)候是何時(shí)和我們創(chuàng)建的loop建立聯(lián)系的呢,是在loop.run_until_complete(get_future)的時(shí)候嗎?

讓我們看下ensure_future的源碼:

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not coro_or_future._loop:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif coroutines.iscoroutine(coro_or_future):
        # 看這里 當(dāng)沒(méi)有l(wèi)oop傳入的時(shí)候,會(huì)獲得當(dāng)前l(fā)oop 因?yàn)榫€程中只有這個(gè)一個(gè) loop 這里啟動(dòng)loop和外層代碼的loop是同一個(gè)loop
        if loop is None:
            loop = events.get_event_loop()
            
        # 我們看到 內(nèi)部同樣是使用 create_task
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('A Future, a coroutine or an awaitable is required')

除了上面直接調(diào)用協(xié)程,我們還可以在協(xié)程執(zhí)行完成之后進(jìn)行一個(gè)回調(diào)。

import asyncio
import time
from functools import partial


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "紅燒肉"


# 當(dāng)我們想要在 回調(diào)函數(shù)中傳遞參數(shù)的時(shí)候  注意 future 參數(shù)寫(xiě)在最后
def callback(url, future):
    print(url)
    print("send email to 紅燒肉")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
    task = loop.create_task(get_html("http://www.imooc.com"))
    
    task.add_done_callback(partial(callback, "http://www.imooc.com"))
    
    loop.run_until_complete(task)
    
    print(task.result())

我們使用partial將傳入的參數(shù),偽造成一個(gè)函數(shù)。

回調(diào)函數(shù)會(huì)默認(rèn)接收一個(gè) future 對(duì)象參數(shù)

wait和gather

我們上面已經(jīng)使用了wait來(lái)進(jìn)行多協(xié)程的運(yùn)行,我們看下它的源碼:


FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return (yield from _wait(fs, timeout, return_when, loop))

這個(gè)wait我們理解為多線程中的wait,同樣存在return_when參數(shù),可以指定何時(shí)返回。

gather如何使用呢?

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://www.imooc.com") for i in range(10)]
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time()-start_time)

我們將wait直接修改為gather然后可迭代對(duì)象加上*即可。

兩者的區(qū)別是什么呢?

  1. gather更加hight-level
  2. gather可以將協(xié)程分組
group1 = [get_html("http://projectsedu.com") for i in range(2)]
group2 = [get_html("http://www.imooc.com") for i in range(2)]

# 我們可以分組傳遞
loop.run_until_complete(asyncio.gather(*group1, *group2))

# 我們可以將先進(jìn)行g(shù)ather操作
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
loop.run_until_complete(asyncio.gather(group1, group2))

# 我們可以批量取消某個(gè)分組
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)

group2.cancel()
task取消和子協(xié)程調(diào)用原理

我們先看下run_until_completerun_forever兩個(gè)函數(shù)的區(qū)別。

run_until_complete在運(yùn)行完指定的協(xié)程之后就會(huì)停止,而run_forever則會(huì)一直運(yùn)行。

看下源碼:

image.png

在圖片中我們看到run_until_complete里面同樣使用了run_forever。但是,增加了一個(gè)回調(diào)_run_until_complete_cb

def _run_until_complete_cb(fut):
    exc = fut._exception
    if (isinstance(exc, BaseException) and not isinstance(exc, Exception)):
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    fut._loop.stop()

在回調(diào)函數(shù)中當(dāng)沒(méi)有協(xié)程運(yùn)行的時(shí)候會(huì)將loop即事件循環(huán)直接暫停。

asyncio會(huì)將loop放到future中,而future同樣會(huì)被放到loop中。

因此我們可以在任何一個(gè)任務(wù)中停止掉 loop

如何取消協(xié)程中的task(future)
import asyncio


async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))


if __name__ == "__main__":
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)

    tasks = [task1, task2, task3]

    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    # 我們發(fā)送一個(gè) controle + c 異常
    except KeyboardInterrupt as e:
        # 獲得所有的task
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print("cancel task")
            
            # 將task 取消 返回布爾值
            print(task.cancel())
        
                # 先將 loop 暫停
        loop.stop()
        
        # 記得將 loop 再次運(yùn)行 run_forever 否則將報(bào)錯(cuò)
        loop.run_forever()
    finally:
        # 最后 關(guān)閉 loop
        loop.close()

有咩有小伙伴對(duì)all_tasks = asyncio.Task.all_tasks()這句代碼疑惑?

image.png

我們看了源碼就知道了 因?yàn)槿种挥幸粋€(gè)loop,所以能夠在任何位置輕松獲得loop相關(guān)的信息。

如何在協(xié)程中插入子協(xié)程

我們看一段官方文檔的代碼:

官方文檔叫chain coroutines鏈?zhǔn)絽f(xié)程?

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute() is chained to print_sum(): print_sum() coroutine waits until compute() is completed before returning its result.

序列圖:

image.png

圖中展示大致意思:當(dāng)我們運(yùn)行一個(gè)協(xié)程的時(shí)候,立即創(chuàng)建一個(gè)Task,由EventLoop驅(qū)動(dòng)Task,然后Task驅(qū)動(dòng)print_sum。當(dāng)協(xié)程中調(diào)用了另外一個(gè)子協(xié)程的時(shí)候,是直接由Task和子協(xié)程通信的。直至子協(xié)程運(yùn)行完畢拋出StopIteration異常,然后父協(xié)程會(huì)捕捉到異常并提取出結(jié)果,父協(xié)程運(yùn)行完畢,同樣拋出異常,逐層往上拋出然后終止Task。重點(diǎn)在于Task和子協(xié)程compute之間的通道,以及異常拋出攔截。

The “Task” is created by the AbstractEventLoop.run_until_complete() method when it gets a coroutine object instead of a task.

意思是,圖中的Task并不是一個(gè)任務(wù)而是一個(gè)協(xié)程對(duì)象。

The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses AbstractEventLoop.call_later() to wake up the task in 1 second.

意思是,圖大致講了如何在協(xié)程中調(diào)用子協(xié)程,但是內(nèi)部實(shí)現(xiàn)沒(méi)有體現(xiàn)出來(lái)。

例如:當(dāng)調(diào)用asyncio.sleep(1.0)的時(shí)候會(huì)創(chuàng)建一個(gè)內(nèi)部的future對(duì)象然后使用 AbstractEventLoop.call_later() 在一秒后喚醒任務(wù)。

asyncio中的其他函數(shù)
call_soon 函數(shù)
import asyncio


def callback(sleep_times):
    print(f"success time {sleep_times}")


def stoploop(loop):
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # 這里傳入的是函數(shù)名稱(chēng) 不是協(xié)程 因?yàn)楹芏鄷r(shí)候 我們希望在循環(huán)體系中插入一個(gè)函數(shù)
    # call_soon 是即刻執(zhí)行 比不是下一行代碼執(zhí)行 而是等到下一個(gè)循環(huán)的時(shí)候執(zhí)行
    loop.call_soon(callback, 2)

    # 停止時(shí)間循環(huán)
    loop.call_soon(stoploop, loop)

    # 因?yàn)槲覀儌魅氲牟皇菂f(xié)程 而是函數(shù) 因此啟動(dòng)要使用  run_forever
    loop.run_forever()
call_later 函數(shù)
import asyncio


def callback(sleep_times):
    print(f"success time {sleep_times}")


def stoploop(loop):
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # call_later 是延遲調(diào)用
    loop.call_later(2, callback, 2)
    loop.call_later(1, callback, 1)
    loop.call_later(3, callback, 3)

    loop.run_forever()
    
# 輸出
success time 1
success time 2
success time 3

從輸出看出 call_later并不是根據(jù)添加的順序執(zhí)行的 而是根據(jù)延遲的時(shí)間。

為了進(jìn)一步比較call_latercall_soon的區(qū)別我們看下下面代碼的輸出

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # call_later 是延遲調(diào)用
    loop.call_later(2, callback, 2)
    loop.call_later(1, callback, 1)
    loop.call_later(3, callback, 3)

    loop.call_soon(callback, 4)

    loop.run_forever()
# 輸出

success time 4
success time 1
success time 2
success time 3

我們看到call_soon執(zhí)行是比call_later要早的 是下個(gè)循環(huán)立即執(zhí)行

call_at函數(shù)

call_at函數(shù)可以讓我們指定時(shí)間運(yùn)行回調(diào)函數(shù),這里的時(shí)間是 loop里面的時(shí)間 不是傳統(tǒng)的時(shí)間

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
        
        # 獲得loop的當(dāng)前時(shí)間
    loop_time = loop.time()

    # 使用 call_at 在 當(dāng)前時(shí)間的基礎(chǔ)上 延遲幾秒執(zhí)行回調(diào)
    loop.call_at(loop_time + 2, callback, 2)
    loop.call_at(loop_time + 1, callback, 1)
    loop.call_at(loop_time + 3, callback, 3)

    loop.call_soon(callback, 4)

    loop.run_forever()
    
# 輸出
success time 4
success time 1
success time 2
success time 3
call_soon_threadsafe 函數(shù)

這是一個(gè)線程安全的函數(shù) 作用和 call_soon一樣

asyncio是可以在多線程環(huán)境下運(yùn)行的,asyncio是一整套的異步IO解決方案,不僅可以解決協(xié)程調(diào)度問(wèn)題,還可以解決線程、進(jìn)程問(wèn)題。

def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe."""
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    handle = self._call_soon(callback, args)
    if handle._source_traceback:
        del handle._source_tracebac
    # 又這個(gè)函數(shù)實(shí)現(xiàn)線程安全的
    self._write_to_self()
    return handle

當(dāng)我們?cè)诙嗑€程中 多個(gè)回調(diào)函數(shù)使用了一個(gè)變量 可以使用這個(gè)來(lái)保證線程安全

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容