說明
500 lines or less 系列中A Web Crawler With asyncio Coroutines嘗試翻譯,不求信雅達(dá),但求通俗易懂。
如有轉(zhuǎn)載,請(qǐng)標(biāo)明出處,并附原文地址。
如有侵犯原作者版權(quán),請(qǐng)聯(lián)系clark1013@hotmail.com。
原文地址如下,如有興趣,可去此處探索。
http://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html
概述
傳統(tǒng)的計(jì)算機(jī)科學(xué)強(qiáng)調(diào)的是通過高效的算法來快速完成數(shù)據(jù)計(jì)算。但是許多網(wǎng)絡(luò)應(yīng)用程序最耗時(shí)的步驟卻并非計(jì)算,而是接受了許多速度緩慢的連接或不常發(fā)生的時(shí)間。這些網(wǎng)絡(luò)應(yīng)用程序面對(duì)的挑戰(zhàn)是:需要高效等待海量的網(wǎng)絡(luò)事件。這個(gè)挑戰(zhàn)的現(xiàn)代解決方案是異步I/O(asynchronous I/O,簡(jiǎn)寫為“async”)。
本文構(gòu)建了一個(gè)簡(jiǎn)單的網(wǎng)絡(luò)爬蟲。這個(gè)網(wǎng)絡(luò)爬蟲是典型的異步應(yīng)用程序,因?yàn)樗鼪]有進(jìn)行大量的計(jì)算,而是等待許多響應(yīng)。這也意味著如果這個(gè)爬蟲一次可以爬取越多網(wǎng)頁,程序的運(yùn)行時(shí)間也越短。如果采用多線程的方案,即為每一個(gè)正在進(jìn)行的數(shù)據(jù)交換的請(qǐng)求開啟一個(gè)線程,那么隨著并發(fā)的請(qǐng)求數(shù)量的提升,在端口數(shù)量耗盡之前,計(jì)算機(jī)很快就會(huì)就會(huì)耗盡內(nèi)存或耗盡其他與線程有關(guān)的資源。使用異步I/O就避免了使用線程會(huì)帶來的問題。
我們將分三步來展示我們的示例。第一步,我們展示了一個(gè)異步事件循環(huán),在這個(gè)異步事件循環(huán)的基礎(chǔ)上,我們通過回調(diào)來初步構(gòu)建了網(wǎng)絡(luò)爬蟲。這個(gè)爬蟲很高效,但是隨著問題復(fù)雜性的擴(kuò)展,代碼的擴(kuò)展將會(huì)變成難以管理的意大利面式代碼。第二步,我們展示了既高效又易于擴(kuò)展的協(xié)程,這里的協(xié)程是通過python的生成器函數(shù)來實(shí)現(xiàn)的。第三步,我們用到了python的標(biāo)準(zhǔn)庫中的“asyncio”庫,并使用異步隊(duì)列協(xié)調(diào)這些組建。
任務(wù)
網(wǎng)絡(luò)爬蟲的目的是查找并下載網(wǎng)絡(luò)上的所有頁面,并可能存儲(chǔ)或?yàn)樗鼈兗由纤饕?。網(wǎng)絡(luò)爬蟲從一個(gè)根URL開始,它獲取每個(gè)頁面,并解析這個(gè)頁面中未能被爬取的頁面,并將未被爬取的加入隊(duì)列中。當(dāng)從頁面中無法解析到未被爬取的頁面,且隊(duì)列為空的時(shí)候,爬蟲才會(huì)停止工作。
我們可以通過可以通過并行地下載許多頁面來加快處理速度。每當(dāng)爬蟲找到新頁面的時(shí)候,它會(huì)在另外的端口上同時(shí)開啟一個(gè)抓取的操作。當(dāng)?shù)玫巾憫?yīng)的時(shí)候,爬蟲將會(huì)解析頁面,并將新的頁面加入到隊(duì)列中。爬蟲的的返回可能會(huì)包含帶來太高并發(fā)的結(jié)果,所以我們需要限制并發(fā)的請(qǐng)求量,并將多余的鏈接放在隊(duì)列中,直到有正在執(zhí)行的請(qǐng)求完成為止。
傳統(tǒng)的解決方案
我們?nèi)绾问沟门老x并行?傳統(tǒng)的方式是創(chuàng)建一個(gè)線程池,每個(gè)線程在負(fù)責(zé)在某一段事件的某一個(gè)端口上進(jìn)行頁面的下載。例如,我們需要從xkcd.com下載一個(gè)頁面:
def fetch(url):
sock = socket.socket()
socket.connect(("xkcd.com", 80))
request = "GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n".format(url)
sock.send(request.encode("ascii"))
response = b""
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
links = parse_links(response)
q.add(links)
套接字的操作默認(rèn)是阻塞的:每當(dāng)線程調(diào)用了connect或recv這樣的方法,線程會(huì)暫時(shí)停止直到操作完成為止。所以如果想同時(shí)下載多個(gè)頁面,我們也需要多個(gè)線程。更為成熟的應(yīng)用程序會(huì)通過將空閑的線程置于線程池中,以此用來攤銷創(chuàng)建線程所帶來的成本,在隨后的下載任務(wù)中,這些線程可以被復(fù)用。在連接池中,對(duì)于套接字也采取了同樣的處理手段。
然而,線程會(huì)帶來很大的系統(tǒng)開銷,所以操作系統(tǒng)會(huì)強(qiáng)制給用戶或機(jī)器可以的開啟的線程數(shù)加上限制。再Jesse的系統(tǒng)中,一個(gè)Python線程的內(nèi)存開銷大概是50KB,開啟10k個(gè)線程即會(huì)導(dǎo)致系統(tǒng)運(yùn)行受到影響。如果我們?cè)俨⑿械奶捉幼稚贤瑫r(shí)開啟10k量級(jí)的并發(fā)操作,在端口耗盡之前,系統(tǒng)的線程會(huì)先被耗盡。線程耗盡的形式可能是線程數(shù)量達(dá)到上限,或者是操作系統(tǒng)的線程數(shù)量限制成為瓶頸。
在Dan Kegel富有影響力的文章《The C10K problem》中,他指出了在I/O并發(fā)中多線程的限制。原文如下:
It's time for web servers to handle ten thousand clients simultaneously, don't you think? After all, the web is a big place now.
1999年,Kegel創(chuàng)造了"C10K"這個(gè)術(shù)語。從現(xiàn)在的計(jì)算機(jī)發(fā)展水平來看,10k量級(jí)的連接并不是大多數(shù)網(wǎng)絡(luò)應(yīng)用程序的瓶頸,但是這個(gè)問題僅僅是從并發(fā)規(guī)模上得到了改變,而非問題本身的類型。從那時(shí)的角度來看,10k量級(jí)的并發(fā)連接也是不可能發(fā)生的?,F(xiàn)在這個(gè)問題也僅僅是量級(jí)變得更高而已。事實(shí)上,我們的玩具爬蟲僅僅使用多線程也能工作地很好。但是對(duì)于大規(guī)模的應(yīng)用程序來說,當(dāng)并發(fā)數(shù)量達(dá)到100k量級(jí)的時(shí)候,瓶頸仍然是存在的:在大多數(shù)操作系統(tǒng)還能創(chuàng)建端口的時(shí)候,系統(tǒng)可以開啟的線程數(shù)量先達(dá)到上限。那么我們?nèi)绾慰朔@個(gè)問題呢?
異步
異步I/O框架通過使用非阻塞的套接字(non-blocking sockets)來在單線程上進(jìn)行并行的操作。在我們的異步爬蟲中,我們?cè)谔捉幼诌B接到服務(wù)器之前就將其設(shè)置為非阻塞的。
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(("xkcd.com", 80))
except BlockingIOError:
pass
詭異的是,一個(gè)非阻塞的套接字即使在正常工作的時(shí)候,也可能會(huì)突然拋出異常。這個(gè)詭異的行為來源于底層的C函數(shù),這個(gè)函數(shù)會(huì)將errno設(shè)置成EINPROCESS來標(biāo)識(shí)它已經(jīng)開始工作。
現(xiàn)在我們的爬蟲需要一種方式來獲知連接是否已經(jīng)建立,以便于其可以發(fā)送一條HTTP請(qǐng)求。我們可以簡(jiǎn)單地在一個(gè)死循環(huán)中不斷嘗試,就像下面地代碼一樣:
request = "GET {} HTTP/1.0\r\n Host: xkcd.com \r\n\r\n".format(url)
encoded = request.encode("ascii")
while True:
try:
sock.send(encoded)
break
except OSError as e:
pass
print("sent")
這樣的實(shí)現(xiàn)方式不僅會(huì)帶來巨大的資源浪費(fèi),而且不能有效的等待響應(yīng)多個(gè)套接字上的事件。早期,BSD Unix 對(duì)于這個(gè)問題的解決方案是select。select是一個(gè)在一個(gè)或多個(gè)(小規(guī)模)套接字上等待時(shí)間發(fā)生的C函數(shù)。現(xiàn)在隨著網(wǎng)絡(luò)應(yīng)用程序?qū)τ陧憫?yīng)大規(guī)模的連接的需求,select已經(jīng)逐漸被poll代替,更為先進(jìn)的方式是BSD的kquene和Linux的epoll。這些API的功能與select類似,但是在大量連接的情況下比select表現(xiàn)更好。
Python3.4的DefaultSelector使用的是系統(tǒng)上性能最好的類似于select的函數(shù)。為了注冊(cè)網(wǎng)絡(luò)I/O的通知,我們先創(chuàng)建了非阻塞的套接字并使用default selector來進(jìn)行注冊(cè)。
from selectors import DefaultSelector, EVENT_WRITE
selector = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(("xkcd.com", 80))
except BlockingIOError:
pass
def connected():
selector.unregister(sock.fileno())
print("connected")
sock.register(sock.fileno(), EVENT_WRITE, connected)
我們忽略了連接過程中產(chǎn)生的虛假異常,隨后調(diào)用selector.register,并向其傳遞了套接字的文件描述符,另外傳遞了一個(gè)用以表達(dá)我們?cè)诘却畏N類型的時(shí)間的常數(shù)。需要注意的是,當(dāng)連接建立時(shí),我們傳遞了EVENT_WRITE:這個(gè)常數(shù)意味著我們想要知道何時(shí)這個(gè)套接字是“可寫”的。同時(shí),我們也傳遞了一個(gè)Python函數(shù)connected,這個(gè)函數(shù)在事件發(fā)生的時(shí)候運(yùn)行。也就是我們熟知的回調(diào)機(jī)制。
當(dāng)selector接收到I/O通知地時(shí)候,我們?cè)谘h(huán)中處理這個(gè)通知:
def loop():
while True:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()
connected回調(diào)被存儲(chǔ)為event_key.data,一旦非阻塞地套接字連接一次,我們?nèi)』剡@個(gè)方法并執(zhí)行一次。
與上面寫的不斷進(jìn)行檢查的循環(huán)不同,在這個(gè)方法中,我們通過調(diào)用select使循環(huán)暫停,并等待下一個(gè)I/O事件。隨后循環(huán)調(diào)用了在等待這些事件的回調(diào)函數(shù)。未完成的操作會(huì)保持掛起的狀態(tài)直到事件循環(huán)的接收到事件。
到目前為止,我們展示了什么呢?我們演示了當(dāng)操作準(zhǔn)備好的時(shí)候,我們?nèi)绾伍_始一個(gè)操作并執(zhí)行一個(gè)回調(diào)函數(shù)。一個(gè)異步框架建立在我們已經(jīng)演示的兩點(diǎn)特性之上:1.非阻塞的套接字和事件循環(huán);2.在單線程上并行進(jìn)行操作。
我們已經(jīng)已經(jīng)達(dá)成了“并發(fā)(concurrency)”,但卻非傳統(tǒng)意義上的“并行(parallelism)”,我們構(gòu)建了一個(gè)可以進(jìn)行重疊I/O的小系統(tǒng),它可以發(fā)起新的操作即使其他的操作還未完成。但是它卻未能有效利用多核進(jìn)行并行計(jì)算。不過,我們的系統(tǒng)也是為了I/O邊界問題而構(gòu)建,而非CPU邊界的問題。
所以我們的事件循環(huán)在并發(fā)I/O的場(chǎng)景下非常高效,因?yàn)樗恍枰獮槊總€(gè)連接分配線程資源。但是在我們繼續(xù)前進(jìn)之前,需要糾正一個(gè)常見的錯(cuò)誤認(rèn)知,即異步比多線程更加快速。通常來說,情況正好相反,在Python中,像我們這樣的系統(tǒng)在面對(duì)小規(guī)模、非?;钴S的連接的時(shí)候比多線程表現(xiàn)更差。在沒有全局解釋鎖的運(yùn)行時(shí)環(huán)境中,多線程甚至可以表現(xiàn)地更棒。異步I/O真正可以發(fā)揮效用,通常是在面對(duì)哪些有許多緩慢連接或者不常發(fā)生地事件地情況下。
使用回調(diào)編程
僅僅依靠我們已經(jīng)構(gòu)建地粗糙地異步框架,我們就可以構(gòu)建網(wǎng)絡(luò)爬蟲了嗎?事實(shí)上,即使一個(gè)簡(jiǎn)單地URL獲取器就很難編寫。
我們從全局的集合開始我們的爬蟲。集合分為兩個(gè),一個(gè)用來存儲(chǔ)我們已經(jīng)爬取過的URL,一個(gè)用來存儲(chǔ)待爬取的URL。
urls_todo = set(["/"])
urls_seen = set(["/"])
urls_seen集合包含了urls_todo加上已經(jīng)爬取完成的URL。兩個(gè)集合都使用根URL"/"來進(jìn)行初始化。
獲取一個(gè)頁面需要一系列的回調(diào)。之前已經(jīng)寫過的connected回調(diào)負(fù)責(zé)在套接字建立連接后向服務(wù)器發(fā)送一個(gè)GET請(qǐng)求。但是請(qǐng)求發(fā)送后,系統(tǒng)必須要等待并讀取響應(yīng)內(nèi)容,所以connected函數(shù)需要注冊(cè)另外一個(gè)回調(diào)函數(shù)。如果在響應(yīng)讀取的函數(shù)過程中,未能讀取到全部響應(yīng),那么它會(huì)再次被注冊(cè)。
我們可以使用一個(gè)Fetcher對(duì)象。它需要一個(gè)URL,一個(gè)套接字對(duì)象,和一段用來收集響應(yīng)內(nèi)容的空間。
class Fetcher:
def __init__(self, url):
self.url = url
self.response = b""
self.sock = None
我們通過調(diào)用Fetcher.fetch來開始我們的方法。
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.socket.connected(HOST_TUPLE)
except BlockingIOError:
pass
# register next callback.
selector.register(self.sock.fileno(),
EVENT_WRITE,
self.connected)
fetch方法開啟一個(gè)套接字發(fā)起連接。并在連接建立之前注冊(cè)了方法的返回值。這個(gè)方法必須重新控制事件循環(huán)以等待連接。要理解其原因,假想我們的整個(gè)應(yīng)用的結(jié)構(gòu)如下:
fetcher = Fetcher("/353/")
fetcher.fetch()
while True:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
所有的事件通知都在事件循環(huán)調(diào)用select的時(shí)候處理。因此fetch必須將控制權(quán)交給事件循環(huán),這樣程序才能直到什么時(shí)候套接字建立起了連接。只有當(dāng)套接字建立起連接后循環(huán)才會(huì)調(diào)用connected回調(diào),這個(gè)回調(diào)是在fetch的結(jié)尾被注冊(cè)的。
connected的實(shí)現(xiàn)如下:
def connected(self, key, mask):
print("connected")
selector.unregister(key.fd)
request = "GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n".format(self.url)
self.sock.send(request.encode("ascii"))
# register next callback.
selector.register(key.fd,
EVENT_READ,
self.read_response)
這個(gè)方法發(fā)送了一個(gè)GET請(qǐng)求。真實(shí)的應(yīng)用會(huì)檢查send的返回值,以防整個(gè)消息不能一次被發(fā)送。不過我們只是構(gòu)建一個(gè)小應(yīng)用用于演示。所以只是簡(jiǎn)單地調(diào)用了send,然后等待響應(yīng)。當(dāng)然,send方法也必須注冊(cè)另一個(gè)回調(diào)并移交事件循環(huán)地控制權(quán)。下一個(gè),也是最后一個(gè)回調(diào),read_response,處理服務(wù)器的回復(fù)。
每當(dāng)selector發(fā)現(xiàn)套接字是“可讀的”的時(shí)候,回調(diào)會(huì)被執(zhí)行??赡軙?huì)有兩種情況,套接字有數(shù)據(jù)或套接字被關(guān)閉。
回調(diào)函數(shù)從套接字每次請(qǐng)求4KB的數(shù)據(jù)。如果有效的響應(yīng)數(shù)據(jù)少于4KB,chunk包含了所有的的有效數(shù)據(jù)。如果有效響應(yīng)數(shù)據(jù)大于4KB,chunk會(huì)取出正好4KB的數(shù)據(jù),套接字仍然是可讀的,回調(diào)函數(shù)會(huì)再次被調(diào)用。當(dāng)響應(yīng)完成時(shí),服務(wù)器會(huì)將套接字關(guān)閉,chunk也會(huì)被清空。
parse_link方法,并未在這里展示,它的返回值是一個(gè)URL的集合。我們會(huì)為每個(gè)新的開啟一個(gè)新的fetcher,不限制并發(fā)量。值得一提的是,異步編程的一個(gè)良好的特性是:當(dāng)我們改變共享數(shù)據(jù)的時(shí)候,不需要加互斥鎖,例如,當(dāng)我們向urls_seen中添加鏈接的時(shí)候。由于不存在搶占式的多線程,所以在程序執(zhí)行的任意點(diǎn),都不需要被打斷。
我們添加了一個(gè)全局變量stopped,并用來控制整個(gè)循環(huán)。
stopped = False
def loop():
while not stopped:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()
一旦所有頁面下載完成,fetcher會(huì)停止全局事件循環(huán),整個(gè)程序也會(huì)退出。
這個(gè)示例展示了異步程序的一個(gè)痛點(diǎn):意大利面條式代碼。我們需要某種方式來表達(dá)一系列的計(jì)算和I/O操作,并需要規(guī)劃這一系列操作的并行運(yùn)行方式。但是由于沒有多線程,這一系列的操作無法被收集到單個(gè)函數(shù)中,每當(dāng)一個(gè)函數(shù)開啟一項(xiàng)I/O操作,它將直接保存將來的狀態(tài)并返回。而你需要負(fù)責(zé)的是編寫這些狀態(tài)保存的代碼。
如何理解這個(gè)痛點(diǎn)。我們可以考慮一下通過線程的方式加上傳統(tǒng)的阻塞套接字來獲取一個(gè)頁面的方式:
def fetch(url):
""" traditional approch, use multi-threading and blocking socket """
sock = socket.socket()
sock.connect(("xkcd.com", 80))
request = "GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n".format(url)
sock.send(request.encode("ascii"))
response = b""
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
links = parse_links(response)
q.add(links)
考慮一個(gè)問題,這個(gè)函數(shù)在一個(gè)套接字操作和下一個(gè)之間記住了什么?看起來,一個(gè)套接字,一個(gè)URL,以及一個(gè)不斷累加的response結(jié)果都被程序記住了。運(yùn)行在線程上的函數(shù)使用到了編程語言的一個(gè)基本特性,將臨時(shí)狀態(tài)存儲(chǔ)在臨時(shí)變量中,而臨時(shí)變量存儲(chǔ)在棧區(qū)。這個(gè)函數(shù)的內(nèi)部是有“后續(xù)操作(continuation)”的,即在I/O完成后進(jìn)行的一系列操作。運(yùn)行時(shí)環(huán)境通過存儲(chǔ)線程的指令指針來保存后續(xù)的操作。你不需要考慮如何保存這些臨時(shí)變量和后續(xù)操作。這都是由編程需要幫你做好的。
但是在基于回調(diào)的異步框架中,這些語言特性都沒有用了。在等待I/O操作的過程中,一個(gè)函數(shù)必須手動(dòng)保存它的狀態(tài),因?yàn)樵贗/O完成前,一個(gè)函數(shù)就返回并丟失它的棧幀。為了替代臨時(shí)變量,我們的異步示例存儲(chǔ)sock和response作為Fetcher實(shí)例的一個(gè)屬性。為了替代指令指針,我們通過注冊(cè)connected和read_response的回調(diào)來存貯后續(xù)操作。但是當(dāng)應(yīng)用層序的規(guī)模增長(zhǎng)的時(shí)候,這樣通過手工回調(diào)的方式來存儲(chǔ)狀態(tài)的方式其復(fù)雜程度也會(huì)增加。而大規(guī)模的記錄狀態(tài)也會(huì)讓代碼編寫者頭大。
更壞的一點(diǎn)是,如果在注冊(cè)下一個(gè)回調(diào)之前當(dāng)前的回調(diào)拋出了異常會(huì)發(fā)生什么?假設(shè)我們的parse_link方法中有BUG,在解析HTML的時(shí)候拋出了異常。
Traceback (most recent call last):
File ".\crawler.py", line 145, in <module>
loop()
File ".\crawler.py", line 141, in loop
callback(event_key, event_mask)
File ".\crawler.py", line 116, in read_response
links = self.parse_links()
File ".\crawler.py", line 130, in parse_links
raise Exception("parse Exception")
Exception: parse Exception
棧的軌跡跟蹤僅僅顯示了時(shí)間循環(huán)在運(yùn)行回調(diào)。但是我們不知道到底是什么導(dǎo)致了這個(gè)異常。程序鏈在兩個(gè)末端都斷了:我們忘了我們將要去哪里,也不知道我們從哪里來。這種內(nèi)容的丟失被稱為“棧撕裂(stack ripping)”,而這導(dǎo)致了問題分析的困難。棧撕裂也阻止了我們?cè)诨卣{(diào)鏈中的異常處理,通常采用的方法是在一個(gè)函數(shù)及其樹節(jié)點(diǎn)上包裹"try/ except"語句。
所以,盡管一直以來都有對(duì)于多線程和異步的效率的爭(zhēng)論。在在此問題之外,對(duì)于出現(xiàn)問題后的處理也是存在爭(zhēng)論的:多線程,如果在同步數(shù)據(jù)的時(shí)候出現(xiàn)問題,通常對(duì)于數(shù)據(jù)間的競(jìng)態(tài)條件更加敏感;異步,由于棧撕裂的存在,通常來說更加難于調(diào)試。
協(xié)程
不過也可以告訴大家一個(gè)好消息,異步代碼想要結(jié)合回調(diào)的效率和多線程的美觀是可能的。這個(gè)結(jié)合通過一個(gè)叫做“協(xié)程(coroutines)”的模式來實(shí)現(xiàn)。通過使用python3.4中的標(biāo)準(zhǔn)異步I/O庫——aiohttp,在協(xié)程中獲取一個(gè)頁面是非常直觀的。
@asyncio.coroutine
def fetch(self, url):
response = yield from self.session.get(url)
body = yield from response.read()
同樣的,這段代碼也是可伸縮的,就如同前面提到的,一個(gè)線程占用50k內(nèi)存,而一個(gè)協(xié)程的內(nèi)存占用僅僅3k,所以開100k這個(gè)量級(jí)的協(xié)程也就不會(huì)帶來內(nèi)存支持不上的問題。
協(xié)程的概念來自于早期的計(jì)算機(jī)科學(xué),其本質(zhì)是一段可以被暫?;蛘呋謴?fù)。與操作系統(tǒng)控制的搶占式的多任務(wù)不同,協(xié)程協(xié)作式的完成多任務(wù),它們可以一個(gè)協(xié)程何時(shí)暫停,以及接下來執(zhí)行哪個(gè)協(xié)程。
協(xié)程有很多種實(shí)現(xiàn)方式,即使在python中就有好幾種。"asyncio"標(biāo)準(zhǔn)庫中的協(xié)程在python3.4中通過生成器,期物和"yield from"表達(dá)式來實(shí)現(xiàn)。而在python3.5中,協(xié)程已經(jīng)成為了語言本身的特性。但是,理解協(xié)程在python3.4中的實(shí)現(xiàn)方式(使用預(yù)存在的語言工具),是理解python3.5的協(xié)程的基礎(chǔ)。
接下來的部分將會(huì)演示生成器的機(jī)理和它如何被用作協(xié)程的。
Python 生成器的工作原理
在掌握python生成器的原理之前,你必須理解一個(gè)普通的python函數(shù)的工作機(jī)理。通常來說,當(dāng)python函數(shù)調(diào)用子程序的時(shí)候,在子程序執(zhí)行過程中或獲得進(jìn)程的控制權(quán),直到子程序返回或者拋出異常,之后的控制權(quán)會(huì)回到調(diào)用端。
>>> def foo():
... bar()
...
>>> def bar():
... pass
python的標(biāo)準(zhǔn)解釋器是用C語言編寫的。用來執(zhí)行python函數(shù)的C函數(shù)被稱作PyEval_EvalFrameEx。它消耗一個(gè)python的棧幀對(duì)象來在棧幀的內(nèi)容上計(jì)算python的字節(jié)碼。以下是foo函數(shù)的字節(jié)碼。
>>> dis.dis(foo)
2 0 LOAD_GLOBAL 0 (bar)
3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
6 POP_TOP
7 LOAD_CONST 0 (None)
10 RETURN_VALUE
foo函數(shù)將bar函數(shù)加載到其棧上進(jìn)行調(diào)用隨后從出棧其返回值,將None加載到棧上并返回None。
當(dāng)PyEval_EvalFrameEx遇到CALL_FUNCTION字節(jié)碼的時(shí)候,它會(huì)創(chuàng)建一個(gè)新的python棧幀并遞歸:這意味著,他將會(huì)使用這個(gè)新的棧幀來遞歸調(diào)用PyEval_EvalFrameEx,這個(gè)棧幀將會(huì)被用來執(zhí)行bar函數(shù)。
理解python的棧幀是在堆區(qū)(用戶空間)被分配的是非常重要的。python解釋器是一個(gè)普通的C程序,所以它的棧幀是普通的棧幀。但是python的棧幀是在堆區(qū)被分配的。這也就意味著,python的棧幀是可以存活在函數(shù)調(diào)用以外的。為了交互式的看這個(gè)說法,在棧幀調(diào)用的時(shí)候用一個(gè)全局變量指向這個(gè)棧幀。
>>> import inspect
>>> frame = None
>>> def foo():
... bar()
...
>>> def bar():
... global frame
... frame = inspect.currentframe()
...
>>> foo()
>>> frame.f_code.co_name
'bar'
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'

生成器的內(nèi)部實(shí)現(xiàn)與函數(shù)實(shí)現(xiàn)的內(nèi)部有相似的地方:一個(gè)Code對(duì)象和一個(gè)棧幀。
生成器如下:
>>> def gen_fn():
... result = yield 1
... print("result of yield: {}".format(result))
... result2 = yield 2
... print("result of 2nd yield: {}".format(result2))
... return "done"
...
當(dāng)python將gen_fn編譯成字節(jié)碼的時(shí)候,它看到了yield表達(dá)式就知道gen_fn是一個(gè)生成器函數(shù),然后會(huì)設(shè)置一個(gè)flag來記住這個(gè)事實(shí)(實(shí)現(xiàn)方式是將gen_fn.__code__.co_flags左移5位)。
到我們調(diào)用生成器函數(shù)的時(shí)候,python看到生成器flag,就不會(huì)實(shí)際執(zhí)行這個(gè)函數(shù),還是創(chuàng)建一個(gè)生成器對(duì)象。
>>> gen = gen_fn()
>>> gen
<generator object gen_fn at 0x000001D9D6936AF0>
python的生成器封裝了一個(gè)棧幀加上對(duì)于一些代碼的引用,這些代碼的主體如下:
>>> gen.gi_code.co_name
'gen_fn'
所有的生成器的調(diào)用都指向了同一段代碼。但是每個(gè)調(diào)用都有自己的棧幀,這個(gè)棧幀不是真實(shí)的棧,而是位于堆區(qū)等待被使用。

這個(gè)棧幀對(duì)象有一個(gè)"last instruction"指針,存儲(chǔ)了最近被執(zhí)行的操作。一開始,最后的指針是-1,意味著生成器還沒有開始。
>>> gen.gi_frame.f_lasti
-1
當(dāng)調(diào)用send方法的時(shí)候,生成器執(zhí)行到第一個(gè)yield表達(dá)式然后暫停。send方法的返回值是-1,這個(gè)值是生成器gen傳遞給yield表達(dá)式的。
>>> gen.send(None)
1
生成器的操作指針現(xiàn)在已經(jīng)從開始位置0到達(dá)了3個(gè)字節(jié)碼。而編譯的python的字節(jié)碼長(zhǎng)度到了56。
>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56
生成器在任意時(shí)刻都可以從任意函數(shù)中被恢復(fù)。因?yàn)槠錀⒉辉贄^(qū),而是在堆區(qū)。它在調(diào)用層級(jí)中的為止還未恢復(fù),它不需要像大部分的函數(shù)調(diào)用一樣遵循先進(jìn)后出的執(zhí)行順序。
我們可以向生成器發(fā)送"hello",它會(huì)變成yield表達(dá)式的結(jié)果,生成器會(huì)繼續(xù)繼續(xù)執(zhí)行到生成2這一句。
>>> gen.send("hello")
result of yield: hello
2
現(xiàn)在生成器的棧幀會(huì)包含本地變量result:
>>> gen.gi_frame.f_locals
{'result': 'hello'}
其他從gen_fn創(chuàng)建的生成器會(huì)有它們自己的棧幀和本地變量。
當(dāng)我們?cè)俅握{(diào)用send方法的時(shí)候,生成器從它的第二個(gè)yield表達(dá)式出發(fā)繼續(xù)執(zhí)行,并通過拋出一個(gè)特殊的StopIteration異常來結(jié)束。
>>> gen.send("good bye")
result of 2nd yield: good bye
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: done
這個(gè)異常有一個(gè)值,這也是生成器的返回值:字符串"done"。
通過生成器構(gòu)建協(xié)程
所以一個(gè)生成器可以暫停,可以通過一個(gè)值繼續(xù)執(zhí)行,而且它有返回值。這些讓生成器成為構(gòu)建一個(gè)異步編程模型的絕佳選擇,再也不需要意大利面式的回調(diào)。我們想要構(gòu)建協(xié)程(coroutine)——一段可以在項(xiàng)目中與其他程序在執(zhí)行計(jì)劃上協(xié)同的程序。我們的協(xié)程是python標(biāo)準(zhǔn)庫中asyncio的簡(jiǎn)化版本。在asyncio中,我們會(huì)使用生成器,期物和"yield from"表達(dá)式。
首先我們需要一種方式來代表將來將會(huì)獲得結(jié)果(期物"future"),這個(gè)結(jié)果也是協(xié)程正在等待的。精簡(jiǎn)版如下:
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self = result
for fn in self._callbacks:
fn(self)
讓我們通過期物和協(xié)程來更新我們的Fetcher類。
期物在剛被創(chuàng)建的時(shí)候處于被掛起(pending)的狀態(tài),到調(diào)用set_result方法后,其狀態(tài)轉(zhuǎn)變成已確定("resolved")。
在之前我們通過回調(diào)的方式實(shí)現(xiàn)異步,fetch方法開始于連接一個(gè)套接字,然后注冊(cè)回調(diào)connected,這個(gè)回調(diào)會(huì)在套接字準(zhǔn)備好了以后執(zhí)行?,F(xiàn)在我們可以將這兩步合并到一個(gè)協(xié)程之中。
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(HOST_TUPLE)
except BlockingIOError:
pass
f = Future()
def on_connected():
f.set_result(None)
selector.register(sock.fileno(),
EVENT_WRITE,
on_connected)
yield f
selector.unregister(sock.fileno())
print(connected)
現(xiàn)在fetch是一個(gè)生成器函數(shù),而非一個(gè)常規(guī)函數(shù),因?yàn)楹瘮?shù)體中包含了yield表達(dá)式。我們創(chuàng)建一個(gè)掛起的期物,然后通過yield表達(dá)式使fetch方法暫停直到套接字準(zhǔn)備好。內(nèi)部函數(shù)on_connected用來確定期物。
但是當(dāng)期物確定以后,依靠什么來重新激活生成器呢?我們需要一個(gè)協(xié)程驅(qū)動(dòng)——以"task"命名。
class Task:
""" 驅(qū)動(dòng)協(xié)程 """
def __init__(self, coro):
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
def step(self, future):
try:
next_future = self.coro.send(future.result)
except StopIteration:
return
next_future.add_done_callback(self.step)
任務(wù)調(diào)度對(duì)象Task通過發(fā)送None給fetch生成器來激活它。然后fetch開始運(yùn)行直到它產(chǎn)出期物,Task接收這個(gè)期物作為next_future。當(dāng)套接字連接以后,事件循環(huán)調(diào)用回調(diào)函數(shù)on_connected,這個(gè)回調(diào)函數(shù)負(fù)責(zé)解析期物,調(diào)用step,恢復(fù)fetch。