Python 學習筆記19 - 異步IO

一種解決IO問題的方法是異步IO。當代碼需要執(zhí)行一個耗時的IO操作時,它只發(fā)出IO指令,并不等待IO結(jié)果,然后就去執(zhí)行其他代碼了。一段時間后,當IO返回結(jié)果時,再通知CPU進行處理

消息模型是如何解決同步IO必須等待IO操作這一問題的呢?當遇到IO操作時,代碼只負責發(fā)出IO請求,不等待IO結(jié)果,然后直接結(jié)束本輪消息處理,進入下一輪消息處理過程。當IO操作完成后,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結(jié)果。

在“發(fā)出IO請求”到收到“IO完成”的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環(huán)中繼續(xù)處理其他消息。這樣,在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數(shù)IO密集型的應(yīng)用程序,使用異步IO將大大提升系統(tǒng)的多任務(wù)處理能力。

異步IO模型需要一個消息循環(huán),在消息循環(huán)中,主線程不斷地重復“讀取消息-處理消息”這一過程:

loop = get_event_loop()
while True:
    event = loop.get_event()
    process_event(event)

協(xié)程

協(xié)程,又稱微線程,纖程。英文名Coroutine

子程序,或者稱為函數(shù),在所有語言中都是層級調(diào)用,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C,C執(zhí)行完畢返回,B執(zhí)行完畢返回,最后是A執(zhí)行完畢。
所以子程序調(diào)用是通過棧實現(xiàn)的,一個線程就是執(zhí)行一個子程序。
子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。

協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當?shù)臅r候再返回來接著執(zhí)行。
在一個子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷。

子程序就是協(xié)程的一種特例

和多線程比,協(xié)程的優(yōu)勢在于:

  • 協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯
  • 不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多

因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。

Python 對協(xié)程的支持是通過 generator 實現(xiàn)的:
generator 中,我們不但可以通過 for 循環(huán)來迭代,還可以不斷調(diào)用 next() 函數(shù)獲取由 yield 語句返回的下一個值。
但是 Python 的 yield 不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)

傳統(tǒng)的生產(chǎn)者-消費者模型改用協(xié)程:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    # 調(diào)用c.send(None)啟動生成器
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

注意到 consumer 函數(shù)是一個 generator ,把一個 consumer 傳入 produce 后:

  1. 首先調(diào)用 c.send(None) 啟動生成器;
  2. 然后,一旦生產(chǎn)了東西,通過 c.send(n) 切換到 consumer 執(zhí)行;
  3. consumer 通過 yield 拿到消息,處理,又通過 yield 把結(jié)果傳回;
  4. produce 拿到 consumer 處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;
  5. produce 決定不生產(chǎn)了,通過 c.close() 關(guān)閉 consumer ,整個過程結(jié)束。

asyncio

asyncio 的編程模型就是一個消息循環(huán)。我們從 asyncio 模塊中直接獲取一個 EventLoop 的引用,然后把需要執(zhí)行的協(xié)程扔到 EventLoop 中執(zhí)行,就實現(xiàn)了異步IO

import threading
import asyncio

# @asyncio.coroutine 把一個 generator 標記為 coroutine 類型
@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    
    # 異步調(diào)用asyncio.sleep(1)
    # yield from 語法可以讓我們方便地調(diào)用另一個 generator
    # 由于 asyncio.sleep() 也是一個 coroutine ,所以線程不會等待 asyncio.sleep() ,而是直接中斷并執(zhí)行下一個消息循環(huán)
    # 當 asyncio.sleep() 返回時,線程就可以從 yield from 拿到返回值(此處是None),然后接著執(zhí)行下一行語句
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

# 獲取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
# 把這個 coroutine 扔到 EventLoop 中執(zhí)行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

asyncio 的異步網(wǎng)絡(luò)連接來獲取sina、sohu和163的網(wǎng)站首頁:

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

async/await

asyncio 提供的 @asyncio.coroutine 可以把一個 generator 標記為 coroutine 類型,然后在 coroutine 內(nèi)部用 yield from 調(diào)用另一個 coroutine 實現(xiàn)異步操作

為了簡化并更好地標識異步IO,從Python 3.5開始引入了新的語法 asyncawait ,可以讓 coroutine 的代碼更簡潔易讀

要使用新的語法,只需要做兩步簡單的替換:

  1. @asyncio.coroutine 替換為 async
  2. yield from 替換為 await

對比一下上一節(jié)的代碼:

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

用新語法重新編寫如下:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

aiohttp

asyncio可以實現(xiàn)單線程并發(fā)IO操作

如果把asyncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實現(xiàn)多用戶的高并發(fā)支持

asyncio實現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實現(xiàn)的HTTP框架

$ pip install aiohttp
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容