最近用python要寫一些大量爬取api數(shù)據(jù),然后分析、存儲的程序,api提供的數(shù)據(jù)格式完全一致,只是url不同而已,這是一個明顯受限于網(wǎng)絡和數(shù)據(jù)庫的問題,因此想利用用一些python對并發(fā)編程方面的支持,以提高cpu的利用率。我是用的是python 2.7版本。目前標準庫已經(jīng)支持多線程、多進程模型,協(xié)程方面也有相當有名的第三方庫gevent。由于我的程序執(zhí)行的任務基本完全一致,每個任務需要做的事情也不多,但是任務量相當大,如果每個任務均發(fā)起起一個進程/線程,那么僅進程/線程的創(chuàng)建、銷毀、上下文切換都需要消耗相當大的系統(tǒng)資源。我的想法是使用進城池/線程池,這樣系統(tǒng)中存在的總進程數(shù)/線程數(shù)不變,每個進程/線程可以根據(jù)自己的實際執(zhí)行情況從任務隊列中獲取任務。python作為著名的以庫多、庫封裝良好著稱的語言,對這方面的支持也是相當優(yōu)秀,至少解決了我的需求。在此將python中多進程多線程以及協(xié)程池的使用做一個大概的總結。
multiprocessing
進城池
python 對多進程的支持都通過multiprocessing這個庫提供。先看python中對進程池的支持,我僅僅使用如下幾行代碼就完成了將自己原來單進程的程序改造為多進程的程序。
import multiprocessing
p = multiprocessing.Pool(multiprocessing.cpu_count() * 2 + 1)
p.map_async(func=do_crawler, iterable=get_tasks(), chunksize=5)
p.close()
p.join()
其中multiprocessing.cpu_count可以獲得系統(tǒng)的cpu core num,所以我的進城池中共有multiprocessing.cpu_count() * 2 + 1的進程數(shù)。 類似python中的map函數(shù),可以將一個函數(shù)依次作用于一個序列,我們的p.map_async就是依次將func 函數(shù)依次作用于iterable參數(shù)提供的任務序列,不過是在多進程中進行的而已。另外說明一下這個chuncksize參數(shù),默認情況下進城池中的每個進程從iterable中拿到一個任務,然后傳給func函數(shù),執(zhí)行完畢后再到iterable中取下一個任務。這個chunck_size就是進程一次可以從iterable中獲取chunck_size個任務,然后他把這chunck_size個任務執(zhí)行完畢之后再去取任務,在任務量相當大的情況下適當調大chunck_size可以發(fā)揮更好的性能。
多進程通信
多進程編程自然是好,但是由于多個進程被分配到不同的虛擬內存中,不能相互訪問,因此多進程編程就必須解決多進程通信的問題??梢允褂玫姆椒ㄒ灿泻芏?,管道(multiprocessing.Pipe)、共享內存(multiprocessing.sharedtypes)、隊列(multiprocessing.Queue)、套接字甚至自己搞一個文件來解決這個問題也未嘗不可。python中對這些都有良好的支持。我對其中的Queue比較感興趣,看了文檔,說Queue實際上底層還是用的Pipe,隊列會有個后臺線程,每次放入一個元素,這個元素就會被pickle序列化,然后隊列的后臺線程就會把他沖到底層管道中去,當然這個隊列是可以有多個消費者進程多個生產者進程安全的共同操作。除了同一個機器上多進程之間的通信,python還通過multiprocessing.manager模塊提供了分布式的多進程通信。
multiprocessing.dummy
突然出來multiprocessing.dummy這么一個模塊,搞得我云里霧里的,python標準庫說這個模塊為多線程復制的multiprocessing,我才發(fā)現(xiàn)多線程的待遇實在一般。而且多線程執(zhí)行時,python 中的全局鎖還會導致每一時刻同時只有一個線程可以執(zhí)行,現(xiàn)如今手機都有四核的年代,這個全局鎖來的有些太不識時務了。那為啥還要用這個多線程呢?無他,因為進程的創(chuàng)建、銷毀、切換都比線程大,而且進程之間共享數(shù)據(jù)也沒有線程方便。所以也要看看這個multiprocessing.dummy,線程池是這樣使用的
from multiprocessing import dummy
p = dummy.Pool(multiprocessing.cpu_count() * 2 + 1)
p.map_async(func=do_crawler, iterable=get_tasks(), chunksize=5)
p.close()
p.join()
除了import的模塊不同之外,其他完全和多進程一致。
gevent
gevent是一個第三方庫,他提供了python對協(xié)程的支持,通過monkey在底層也對IO多路復用提供了支撐,可以說是專門為IO bound的程序量身訂做的。他的使用也很簡單,還是用一個gevent.Pool的例子來看一下,方便和之前對比。
from gevent import pool
p = pool.Pool(multiprocessing.cpu_count() * 2 + 1)
p.map_async(func=do_crawler, iterable=get_tasks(), chunksize=5)
p.join()
和之前的長得實在太像了,這樣方便我們使用,python庫的作者真是用心良苦。