一種解決IO問題的方法是異步IO。當代碼需要執(zhí)行一個耗時的IO操作時,它只發(fā)出IO指令,并不等待IO結(jié)果,然后就去執(zhí)行其他代碼了。一段時間后,當IO返回結(jié)果時,再通知CPU進行處理
消息模型是如何解決同步IO必須等待IO操作這一問題的呢?當遇到IO操作時,代碼只負責發(fā)出IO請求,不等待IO結(jié)果,然后直接結(jié)束本輪消息處理,進入下一輪消息處理過程。當IO操作完成后,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結(jié)果。
在“發(fā)出IO請求”到收到“IO完成”的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環(huán)中繼續(xù)處理其他消息。這樣,在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數(shù)IO密集型的應(yīng)用程序,使用異步IO將大大提升系統(tǒng)的多任務(wù)處理能力。
異步IO模型需要一個消息循環(huán),在消息循環(huán)中,主線程不斷地重復“讀取消息-處理消息”這一過程:
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)
協(xié)程
協(xié)程,又稱微線程,纖程。英文名Coroutine
子程序,或者稱為函數(shù),在所有語言中都是層級調(diào)用,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C,C執(zhí)行完畢返回,B執(zhí)行完畢返回,最后是A執(zhí)行完畢。
所以子程序調(diào)用是通過棧實現(xiàn)的,一個線程就是執(zhí)行一個子程序。
子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。
協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當?shù)臅r候再返回來接著執(zhí)行。
在一個子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷。
子程序就是協(xié)程的一種特例
和多線程比,協(xié)程的優(yōu)勢在于:
- 協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯
- 不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多
因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。
Python 對協(xié)程的支持是通過 generator 實現(xiàn)的:
在 generator 中,我們不但可以通過 for 循環(huán)來迭代,還可以不斷調(diào)用 next() 函數(shù)獲取由 yield 語句返回的下一個值。
但是 Python 的 yield 不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)
傳統(tǒng)的生產(chǎn)者-消費者模型改用協(xié)程:
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
# 調(diào)用c.send(None)啟動生成器
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
注意到 consumer 函數(shù)是一個 generator ,把一個 consumer 傳入 produce 后:
- 首先調(diào)用
c.send(None)啟動生成器; - 然后,一旦生產(chǎn)了東西,通過
c.send(n)切換到consumer執(zhí)行; -
consumer通過yield拿到消息,處理,又通過yield把結(jié)果傳回; -
produce拿到consumer處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息; -
produce決定不生產(chǎn)了,通過c.close()關(guān)閉consumer,整個過程結(jié)束。
asyncio
asyncio 的編程模型就是一個消息循環(huán)。我們從 asyncio 模塊中直接獲取一個 EventLoop 的引用,然后把需要執(zhí)行的協(xié)程扔到 EventLoop 中執(zhí)行,就實現(xiàn)了異步IO
import threading
import asyncio
# @asyncio.coroutine 把一個 generator 標記為 coroutine 類型
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
# 異步調(diào)用asyncio.sleep(1)
# yield from 語法可以讓我們方便地調(diào)用另一個 generator
# 由于 asyncio.sleep() 也是一個 coroutine ,所以線程不會等待 asyncio.sleep() ,而是直接中斷并執(zhí)行下一個消息循環(huán)
# 當 asyncio.sleep() 返回時,線程就可以從 yield from 拿到返回值(此處是None),然后接著執(zhí)行下一行語句
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
# 獲取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
# 把這個 coroutine 扔到 EventLoop 中執(zhí)行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
用 asyncio 的異步網(wǎng)絡(luò)連接來獲取sina、sohu和163的網(wǎng)站首頁:
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
async/await
用 asyncio 提供的 @asyncio.coroutine 可以把一個 generator 標記為 coroutine 類型,然后在 coroutine 內(nèi)部用 yield from 調(diào)用另一個 coroutine 實現(xiàn)異步操作
為了簡化并更好地標識異步IO,從Python 3.5開始引入了新的語法 async 和 await ,可以讓 coroutine 的代碼更簡潔易讀
要使用新的語法,只需要做兩步簡單的替換:
- 把
@asyncio.coroutine替換為async - 把
yield from替換為await
對比一下上一節(jié)的代碼:
@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")
用新語法重新編寫如下:
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
aiohttp
asyncio可以實現(xiàn)單線程并發(fā)IO操作
如果把asyncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實現(xiàn)多用戶的高并發(fā)支持
asyncio實現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實現(xiàn)的HTTP框架
$ pip install aiohttp