? ? 最近在python3.7上用asyncio做項(xiàng)目,實(shí)現(xiàn)web的服務(wù)端,一邊從GitHub和StackOverflow上抄代碼,一邊在看asyncio相關(guān)的源碼,所學(xué)所思,姑且寫在這里。
為什么會出現(xiàn)協(xié)程(coroutine)這種設(shè)計(jì)?
? ? 多線程(thread)也是同時(shí)執(zhí)行多個(gè)任務(wù)的一種設(shè)計(jì),為什么有了多線程,我們還要設(shè)計(jì)協(xié)程,它有什么不同呢?
? ? 首先,多線程的目的是什么?
? ? 一種說法,利用多核的性能,讓代碼占用盡可能的計(jì)算資源,運(yùn)行快一點(diǎn)。這算是個(gè)原因吧,我們簡稱為并行(parallelism)。在這里,我還不想摳字眼討論并發(fā)(cocurrent)和并行。
? ? 另一種說法,我們程序有些任務(wù)是cpu密集型(邏輯計(jì)算比較多),有些任務(wù)是IO密集型(讀寫文件或網(wǎng)絡(luò)比較多),如果遇到IO密集型計(jì)算,比如從網(wǎng)站上下載一個(gè)大文件,這時(shí)候如果是阻塞下載,并且只有一個(gè)線程的話,程序的其他邏輯就無法執(zhí)行,從這個(gè)程序的角度講,他沒能很好的占有cpu的資源,白白地等待下載的結(jié)束。如果開了另外一個(gè)線程的話,這個(gè)下載線程雖然阻塞掉了,但是別的線程依然可以跑別的邏輯,就可以更充分的占有cpu的資源。有人稱同時(shí)執(zhí)行不同的任務(wù)為并發(fā)。
(TODO? 圖1 任務(wù)調(diào)度示意圖)
? ? 同樣針對上面第二種問題,我們換個(gè)角度表述,我們有一個(gè)IO密集型任務(wù),還有一個(gè)cpu密集型任務(wù),我們想有效的運(yùn)行這兩個(gè)任務(wù),不讓IO密集型任務(wù)阻塞了cpu密集型任務(wù),所以我們構(gòu)造了一個(gè)任務(wù)調(diào)度器,當(dāng)IO密集任務(wù)開始從網(wǎng)站上下載大文件的時(shí)候,我們把他從調(diào)度器上暫時(shí)移開,后面再去檢查他,讓調(diào)度器去執(zhí)行cpu密集任務(wù),在此期間我們可能會時(shí)不時(shí)的去檢查IO任務(wù)有沒有下載完畢(也可能是被動通知,如軟件驅(qū)動的中斷),如果發(fā)現(xiàn)完畢了,調(diào)度器可能會暫停當(dāng)前的cpu密集任務(wù),轉(zhuǎn)而繼續(xù)執(zhí)行IO密集任務(wù)的后續(xù)工作。在多線程環(huán)境下,這個(gè)調(diào)度器就是操作系統(tǒng),兩個(gè)任務(wù)是兩個(gè)線程。而協(xié)程,就是這個(gè)思路,只不過設(shè)計(jì)的更加極端一點(diǎn),我們在多線程環(huán)境下,幾乎不可能手動地控制先執(zhí)行什么線程,再執(zhí)行什么線程(只是操作系統(tǒng)的調(diào)度工作),而如果我們專門寫了一個(gè)任務(wù)調(diào)度器,我們自己實(shí)現(xiàn)應(yīng)用層面的調(diào)度算法,就可能實(shí)現(xiàn)先執(zhí)行什么任務(wù),再執(zhí)行什么任務(wù),什么時(shí)候暫停一個(gè)任務(wù),什么時(shí)候恢復(fù)運(yùn)行這個(gè)任務(wù)。這個(gè)可控的調(diào)度機(jī)制,就是協(xié)程。他的初始目的就是實(shí)現(xiàn)任務(wù)的并發(fā),只不過想更加精細(xì)地控制任務(wù)的暫停和繼續(xù)。協(xié)程是種設(shè)計(jì)思想,線程是計(jì)算機(jī)實(shí)現(xiàn)多任務(wù)的一種工程機(jī)制,線程可以用于實(shí)現(xiàn)協(xié)程。
asyncio的模樣
? ? 我們先杜撰一段代碼應(yīng)景。

????如圖2所示,我們用async def定義了兩個(gè)函數(shù),一個(gè)下載大文件,一個(gè)做一些邏輯運(yùn)算。然后我們實(shí)現(xiàn)了一個(gè)調(diào)度函數(shù),在調(diào)度函數(shù)里,我們分別用兩個(gè)任務(wù)函數(shù)創(chuàng)建了兩個(gè)任務(wù),加入到asyncio的event loop里面,接著,我們運(yùn)行這個(gè)event loop,這樣,兩個(gè)任務(wù)就開始執(zhí)行起來了。注意,async with和await的時(shí)候,都是執(zhí)行一個(gè)異步函數(shù)的過程,這個(gè)時(shí)候,當(dāng)前任務(wù)會主動讓出event loop,去后臺執(zhí)行一些網(wǎng)絡(luò)IO,event loop會選擇自己等待隊(duì)列的任務(wù)繼續(xù)執(zhí)行。等原來網(wǎng)絡(luò)IO的任務(wù)結(jié)束網(wǎng)絡(luò)IO,他會重新加入到event loop的等待隊(duì)列,等待其他任務(wù)主動讓出event loop,被動等待調(diào)度。比如self_play在執(zhí)行await asyncio.sleep(3)的時(shí)候會主動讓出event loop。
????上面我特別強(qiáng)調(diào)了主動讓出event loop,這是協(xié)程的核心思想,如果一個(gè)任務(wù)沒有任何await或async with邏輯,那么它一旦執(zhí)行,別的任務(wù)再也沒有機(jī)會被調(diào)度到。比如,我們?nèi)绻サ魋elf_play的await語句,整個(gè)event loop將永遠(yuǎn)被self_play所占用,其他任務(wù)再也沒有機(jī)會執(zhí)行,整個(gè)輸出只有左右右手慢動作了。總之,爸爸不給,你不能搶。這一點(diǎn)和樸素的多線程很不一樣。
asyncio實(shí)現(xiàn)原理推測
????從上面的介紹,我們可以大概猜出,asyncio主要有一個(gè)任務(wù)調(diào)度器(event loop),然后可以用async def定義異步函數(shù)作為任務(wù)邏輯,通過create_task接口把任務(wù)掛到event loop上。event loop的運(yùn)行過程應(yīng)該是個(gè)不停循環(huán)的過程,不停查看等待類別有沒有可以執(zhí)行的任務(wù),如果有的話執(zhí)行任務(wù),直到碰到await之類的主動讓出event loop的函數(shù),如此反復(fù)。
(TODO? 圖3 event loop調(diào)度示意圖)
asyncio源碼分析
? ? 更進(jìn)一步的問,evnet loop大致是怎么實(shí)現(xiàn)的呢?怎么進(jìn)行調(diào)度的呢?
? ? 我們順藤摸瓜,在asyncio/base_events.py里面我們看到了create_task的源碼實(shí)現(xiàn),代碼的關(guān)鍵是Task的構(gòu)造,傳了一個(gè)event loop(loop參數(shù))進(jìn)去,也就是在這個(gè)時(shí)候,task注冊到了event loop上面。注冊過程是c實(shí)現(xiàn)的(見文末附錄1),但本質(zhì)上都是通過event loop的call_soon()。

? ? 圖5,是run_forever的實(shí)現(xiàn),基本上是不停的在循環(huán),然后每一個(gè)循環(huán)執(zhí)行一幀(_run_once)。

? ? 圖6是每一幀的代碼實(shí)現(xiàn),基本上是在調(diào)度隊(duì)列里找到這一幀應(yīng)該執(zhí)行的任務(wù)(任務(wù)最終注冊在event loop的結(jié)構(gòu)是Handle,通過call_soon()實(shí)現(xiàn)),直接_run()。

? ? event loop的call_soon,是注冊任務(wù)時(shí)使用的,字面意思是下一幀執(zhí)行當(dāng)前注冊的任務(wù)。它的本質(zhì)就是把當(dāng)前任務(wù)封裝成Handle,放到_ready里面,如圖7所示。

? ? 調(diào)度隊(duì)列是event驅(qū)動形成的,這也是為什么asyncio的核心叫做event loop。這部分代碼同樣也在_run_once()里面,見圖7,這個(gè)select就是某種多路復(fù)用機(jī)制,比如select,epoll和iocp。

? ? 圖8給出了select機(jī)制下的selector.select實(shí)現(xiàn),看起來是不是有點(diǎn)熟悉啊。消息處理相關(guān)我們后續(xù)在常用接口里會再次提到。

總結(jié)一下asyncio的實(shí)現(xiàn)思路
????有一個(gè)任務(wù)調(diào)度器event loop,我們可以把需要執(zhí)行的coroutine打包成task加入到event loop的調(diào)度列表里面(以Handle形式)。
????在event loop的每個(gè)幀里面,它會檢查需要執(zhí)行那些task,然后運(yùn)行這些task,可能拿到最終結(jié)果,也可能執(zhí)行一半繼續(xù)await別的任務(wù),任務(wù)之間互相wait,通過回調(diào)來把任務(wù)串聯(lián)起來(后面常用接口會繼續(xù)深入介紹,實(shí)現(xiàn)細(xì)節(jié)見附錄2)。
????任務(wù)可能會依賴別的IO消息,在每一幀,event loop都會用selector處理相應(yīng)的消息,執(zhí)行相應(yīng)的callback函數(shù)。
? ? 我們當(dāng)前的介紹里,只有一個(gè)event loop,這個(gè)event loop跑在主線程里面。當(dāng)然,event loop還可以開線程池處理別的任務(wù),或者,多個(gè)線程里執(zhí)行多個(gè)event loop,他們之間還有交互,我們這里不在介紹。? ?
????單個(gè)event loop跑在單個(gè)線程有個(gè)好處,只要自己不主動await,就會一直占有主線程,換句話說,同步函數(shù)一定沒有數(shù)據(jù)沖突(data racing)。對比多線程方案,如果需要處理數(shù)據(jù)沖突,就需要加鎖了,這在很多情況下會降低程序的性能。所以協(xié)程這種設(shè)計(jì)思路,非常適合有多個(gè)用戶、但是每個(gè)用戶之間沒有共享數(shù)據(jù)的場景。如果需要實(shí)現(xiàn)并行,多開幾個(gè)進(jìn)程就行了。
? ? 但是實(shí)際上在工程里面,我們很難單用一個(gè)線程處理問題,asyncio也不例外,特別在集成別的同步庫的時(shí)候,可能需要用到別的線程,我們后續(xù)介紹。
后續(xù)筆記
asyncio常用接口及其意義(實(shí)用主義)
如何集成asyncio和同步庫(介紹executor線程池對event loop的影響)
為什么異步編程容易犯錯(cuò)(數(shù)據(jù)沖突)
不適宜騷年的附錄
1. Task的構(gòu)造的c實(shí)現(xiàn)
????我們打開男性社交網(wǎng)站,這是event loop實(shí)現(xiàn)的核心代碼。在這里我們找到了task的c實(shí)現(xiàn)_asyncio_Task___init___impl(L1933),它的核心代碼執(zhí)行ask_all_step_soon,間接調(diào)用腳本的event loop的call_soon,并且把自己加入到all_task這個(gè)全局list(通過register_task,主要是后面索引使用)。

2. task執(zhí)行細(xì)節(jié)
????task的執(zhí)行,實(shí)現(xiàn)在task_step(L2878)和task_step_impl(L2540)? 。其中task_step是asyncio任務(wù)執(zhí)行的核心,對于一個(gè)coroutine,每次task_step得到一個(gè)結(jié)果,然后根據(jù)結(jié)果判斷是否拿到了最終結(jié)果,或者需要繼續(xù)計(jì)算等待別的結(jié)果,或者把結(jié)果扔給自己的waiter。
????python的await都是通過generator實(shí)現(xiàn)的,具體的計(jì)算在genobject,主要是通過PyEval_EvalFrameEx拿計(jì)算結(jié)果。