python-復(fù)盤-異步IO/協(xié)程/asyncio/async/await

異步IO

CPU的速度遠(yuǎn)遠(yuǎn)快于磁盤、網(wǎng)絡(luò)等IO。在一個線程中,CPU執(zhí)行代碼的速度極快,然而,一旦遇到IO操作,如讀寫文件、發(fā)送網(wǎng)絡(luò)數(shù)據(jù)時,就需要等待IO操作完成,才能繼續(xù)進(jìn)行下一步操作。這種情況稱為同步IO
在IO操作的過程中,當(dāng)前線程被掛起,而其他需要CPU執(zhí)行的代碼就無法被當(dāng)前線程執(zhí)行了。

因?yàn)橐粋€IO操作就阻塞了當(dāng)前線程,導(dǎo)致其他代碼無法執(zhí)行,所以我們必須使用多線程或者多進(jìn)程來并發(fā)執(zhí)行代碼,為多個用戶服務(wù)。每個用戶都會分配一個線程,如果遇到IO導(dǎo)致線程被掛起,其他用戶的線程不受影響。

多線程和多進(jìn)程的模型雖然解決了并發(fā)問題,但是系統(tǒng)不能無上限地增加線程。由于系統(tǒng)切換線程的開銷也很大,所以,一旦線程數(shù)量過多,CPU的時間就花在線程切換上了,真正運(yùn)行代碼的時間就少了,結(jié)果導(dǎo)致性能嚴(yán)重下降。

由于我們要解決的問題是CPU高速執(zhí)行能力和IO設(shè)備的龜速嚴(yán)重不匹配,多線程和多進(jìn)程只是解決這一問題的一種方法。

另一種解決IO問題的方法是異步IO。當(dāng)代碼需要執(zhí)行一個耗時的IO操作時,它只發(fā)出IO指令,并不等待IO結(jié)果,然后就去執(zhí)行其他代碼了。一段時間后,當(dāng)IO返回結(jié)果時,再通知CPU進(jìn)行處理。

可以想象如果按普通順序?qū)懗龅拇a實(shí)際上是沒法完成異步IO的:

do_some_code()
f = open('/path/to/file', 'r')
r = f.read() # <== 線程停在此處等待IO操作結(jié)果
# IO操作完成后線程才能繼續(xù)執(zhí)行:
do_some_code(r)

所以,同步IO模型的代碼是無法實(shí)現(xiàn)異步IO模型的。

異步IO模型需要一個消息循環(huán),在消息循環(huán)中,主線程不斷地重復(fù)“讀取消息-處理消息”這一過程:

loop = get_event_loop()
while True:
    event = loop.get_event()
    process_event(event)

消息模型其實(shí)早在應(yīng)用在桌面應(yīng)用程序中了。一個GUI程序的主線程就負(fù)責(zé)不停地讀取消息并處理消息。所有的鍵盤、鼠標(biāo)等消息都被發(fā)送到GUI程序的消息隊(duì)列中,然后由GUI程序的主線程處理。

由于GUI線程處理鍵盤、鼠標(biāo)等消息的速度非???,所以用戶感覺不到延遲。某些時候,GUI線程在一個消息處理的過程中遇到問題導(dǎo)致一次消息處理時間過長,此時,用戶會感覺到整個GUI程序停止響應(yīng)了,敲鍵盤、點(diǎn)鼠標(biāo)都沒有反應(yīng)。這種情況說明在消息模型中,處理一個消息必須非常迅速,否則,主線程將無法及時處理消息隊(duì)列中的其他消息,導(dǎo)致程序看上去停止響應(yīng)。

消息模型是如何解決同步IO必須等待IO操作這一問題的呢?當(dāng)遇到IO操作時,代碼只負(fù)責(zé)發(fā)出IO請求,不等待IO結(jié)果,然后直接結(jié)束本輪消息處理,進(jìn)入下一輪消息處理過程。當(dāng)IO操作完成后,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結(jié)果。

在“發(fā)出IO請求”到收到“IO完成”的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環(huán)中繼續(xù)處理其他消息。這樣,在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數(shù)IO密集型的應(yīng)用程序,使用異步IO將大大提升系統(tǒng)的多任務(wù)處理能力。


協(xié)程

協(xié)程,又稱微線程,纖程。英文名Coroutine。

協(xié)程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua)中得到廣泛應(yīng)用。

子程序,或者稱為函數(shù),在所有語言中都是層級調(diào)用,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C,C執(zhí)行完畢返回,B執(zhí)行完畢返回,最后是A執(zhí)行完畢。

所以子程序調(diào)用是通過棧實(shí)現(xiàn)的,一個線程就是執(zhí)行一個子程序。

子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。而協(xié)程的調(diào)用和子程序不同。

協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r候再返回來接著執(zhí)行。

注意,在一個子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點(diǎn)類似CPU的中斷。比如子程序A、B:

def A():
    print('1')
    print('2')
    print('3')

def B():
    print('x')
    print('y')
    print('z')

假設(shè)由協(xié)程執(zhí)行,在執(zhí)行A的過程中,可以隨時中斷,去執(zhí)行B,B也可能在執(zhí)行過程中中斷再去執(zhí)行A,結(jié)果可能是:

1
2
x
y
3
z

但是在A中是沒有調(diào)用B的,所以協(xié)程的調(diào)用比函數(shù)調(diào)用理解起來要難一些。

看起來A、B的執(zhí)行有點(diǎn)像多線程,但協(xié)程的特點(diǎn)在于是一個線程執(zhí)行,那和多線程比,協(xié)程有何優(yōu)勢?

最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因?yàn)樽映绦蚯袚Q不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。

第二大優(yōu)勢就是不需要多線程的鎖機(jī)制,因?yàn)橹挥幸粋€線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。

因?yàn)閰f(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進(jìn)程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。

Python對協(xié)程的支持是通過generator實(shí)現(xiàn)的。

在generator中,我們不但可以通過for循環(huán)來迭代,還可以不斷調(diào)用next()函數(shù)獲取由yield語句返回的下一個值。

但是Python的yield不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)。

來看例子:

傳統(tǒng)的生產(chǎn)者-消費(fèi)者模型是一個線程寫消息,一個線程取消息,通過鎖機(jī)制控制隊(duì)列和等待,但一不小心就可能死鎖。

如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過yield跳轉(zhuǎn)到消費(fèi)者開始執(zhí)行,待消費(fèi)者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:

def consumer():       # 有yield的函數(shù)就是生成器,沒的跑
    r = 'what the fuck?'
    print(r)   # ??發(fā)送None時,函數(shù)從頭開始執(zhí)行的,到 yield r 停止,此后的send(xxx)都是從 n = yield 開始。記住,n = yield 是啟動點(diǎn), yield r 暫停點(diǎn),并返回yield r結(jié)果給produce函數(shù)
    while True:
        n = yield r        
# 注意,yield r 是代碼終止點(diǎn),n = yield是啟動點(diǎn),一個正常的循環(huán)??過程是從 n = yield開始執(zhí)行,到下面,執(zhí)行到r ='200k'后,再回到 yield r處暫停,此時暫停的yield r 應(yīng)該是經(jīng)過新的循環(huán),這里沒有 for in 函數(shù),但是,r最新的200k就是它的新循環(huán),所以此時yield r為200k時生成器程序也就是consumer停止,但是,新的yield r 要回給send,send發(fā)送消息,也會要求得到消息的結(jié)果
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)    # 
    n = 0     # 沒有上面的c.send,系統(tǒng)報錯can't send non-None value to a just-started generator
    while n < 5:   #  
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

執(zhí)行結(jié)果:

what the fuck?                 # 它只會出現(xiàn)一次
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函數(shù)是一個generator,把一個consumer傳入produce后:

首先調(diào)用c.send(None)啟動生成器;

然后,一旦生產(chǎn)了東西,通過c.send(n)切換到consumer執(zhí)行;

consumer通過yield拿到消息,處理,又通過yield把結(jié)果傳回;

produce拿到consumer處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;

produce決定不生產(chǎn)了,通過c.close()關(guān)閉consumer,整個過程結(jié)束。

send(vlaue)發(fā)送value,也會要求接收value的yield把最新的值再返回給它,通俗說,給 你一個消息,你得給我一個結(jié)果,這樣才公平

整個流程無鎖,由一個線程執(zhí)行,produceconsumer協(xié)作完成任務(wù),所以稱為“協(xié)程”,而非線程的搶占式多任務(wù)。

最后套用Donald Knuth的一句話總結(jié)協(xié)程的特點(diǎn):

“子程序就是協(xié)程的一種特例。”


asyncio

asyncio是Python 3.4版本引入的標(biāo)準(zhǔn)庫,直接內(nèi)置了對異步IO的支持。

asyncio的編程模型就是一個消息循環(huán)。我們從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執(zhí)行的協(xié)程扔到EventLoop中執(zhí)行,就實(shí)現(xiàn)了異步IO。

asyncio實(shí)現(xiàn)Hello world代碼如下:

import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 異步調(diào)用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執(zhí)行coroutine
loop.run_until_complete(hello())
loop.close()

@asyncio.coroutine把一個generator標(biāo)記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執(zhí)行

hello()會首先打印出Hello world!,然后,yield from語法可以讓我們方便地調(diào)用另一個generator。由于asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷并執(zhí)行下一個消息循環(huán)。當(dāng)asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接著執(zhí)行下一行語句。

asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執(zhí)行EventLoop中其他可以執(zhí)行的coroutine了,因此可以實(shí)現(xiàn)并發(fā)執(zhí)行。

我們用Task封裝兩個coroutine試試:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

觀察執(zhí)行過程:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

由打印的當(dāng)前線程名稱可以看出,兩個coroutine是由同一個線程并發(fā)執(zhí)行的。

如果把asyncio.sleep()換成真正的IO操作,則多個coroutine就可以由一個線程并發(fā)執(zhí)行。

我們用asyncio的異步網(wǎng)絡(luò)連接來獲取sina、sohu和163的網(wǎng)站首頁:

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

執(zhí)行結(jié)果如下:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...

可見3個連接由一個線程通過coroutine并發(fā)完成。

小結(jié)

asyncio提供了完善的異步IO支持;

異步操作需要在coroutine中通過yield from完成;

多個coroutine可以封裝成一組Task然后并發(fā)執(zhí)行。



async/await

asyncio提供的@asyncio.coroutine可以把一個generator標(biāo)記為coroutine類型,然后在coroutine內(nèi)部用yield from調(diào)用另一個coroutine實(shí)現(xiàn)異步操作。

為了簡化并更好地標(biāo)識異步IO,從Python 3.5開始引入了新的語法asyncawait,可以讓coroutine的代碼更簡潔易讀。

請注意,asyncawait是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:

@asyncio.coroutine替換為async
yield from替換為await。

讓我們對比一下上一節(jié)的代碼:

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

用新語法重新編寫如下:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

剩下的代碼保持不變。


aiohttp

asyncio可以實(shí)現(xiàn)單線程并發(fā)IO操作。如果僅用在客戶端,發(fā)揮的威力不大。如果把asyncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實(shí)現(xiàn)多用戶的高并發(fā)支持。

asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實(shí)現(xiàn)的HTTP框架。
我們先安裝aiohttp:

pip install aiohttp

然后編寫一個HTTP服務(wù)器,分別處理以下URL:

/ - 首頁返回b'<h1>Index</h1>'

/hello/{name} - 根據(jù)URL參數(shù)返回文本hello, %s!。

代碼如下:

import asyncio

from aiohttp import web
#  既然是異步,所有的函數(shù)就必須要變成異步非阻塞單線程方式,所以下列所有函數(shù)全部 async await
async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index~</h1>',content_type='text/html')    # AAA 參考最下方注釋

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text.encode('utf-8'))

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

注意aiohttp的初始化函數(shù)init()也是一個coroutine,loop.create_server()則利用asyncio創(chuàng)建TCP服務(wù)。

AAA處,原理代碼是 return web.Response(body=b'<h1>Index</h1>'),程序中的AAA處代碼為新代碼,沒有后面的一串代碼,打開網(wǎng)頁就會進(jìn)入到下載界面,而無法顯示網(wǎng)頁

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容