受益于在此的討論:http://stackoverflow.com/questions/2846653/how-to-use-threading-in-python
另外一篇了解Python內(nèi)線程機(jī)制的交好blog: https://jeffknupp.com/blog/2012/03/31/pythons-hardest-problem/
1、使用 threading module:
適用范圍:
適用于建立那些與I/O操作相關(guān)的操作線程。
因?yàn)镃Python并不真正在多個(gè)core上并發(fā)執(zhí)行多個(gè)CPU密集型(CPU-bound)的線程。使用threading.Thread class來(lái)建立的線程在真正執(zhí)行時(shí),受限于Python 解釋器的全局鎖,只能串行地在CPU上進(jìn)行執(zhí)行即使我們的CPU上有多個(gè)core充分支持多線程并發(fā)執(zhí)行。但是對(duì)于某些包含有重I/O操作的進(jìn)程中,我們可以使用threading來(lái)建立線程,然后由它來(lái)單獨(dú)等待i/o操作(網(wǎng)絡(luò)數(shù)據(jù)或disk數(shù)據(jù))完成,此時(shí)將cpu資源釋放出來(lái)交由主程序線程繼續(xù)執(zhí)行,因此整體起到了讓主程序線程不被i/o阻塞的作用。
應(yīng)用實(shí)例:
實(shí)例一:
import Queue
import threading
import urllib2
# 每個(gè)新建線程所調(diào)用的執(zhí)行函數(shù)
def get_url(q, url):
q.put(urllib2.urlopen(url).read())
theurls = ["http://google.com", "http://yahoo.com"]
q = Queue.Queue() #Queue在內(nèi)部實(shí)現(xiàn)的時(shí)候就是線程安全的,所以在使用它的時(shí)候不需考慮鎖、條件變量、事件及信息量等多線程間執(zhí)行協(xié)調(diào)因子
for u in theurls:
t = threading.Thread(target=get_url, args = (q,u))
# 將每個(gè)新建的線程設(shè)為daemon模式,這樣當(dāng)主線程結(jié)束后,新建的子線程可繼續(xù)執(zhí)行
t.daemon = True #
t.start()
s = q.get()
print s
2、真正的并發(fā)執(zhí)行-使用multiprocessing:
適用范圍:
適用于需要fork出多個(gè)進(jìn)程,然后執(zhí)行真正的多core CPU之上并發(fā)執(zhí)行的程序。
multiprocessing module可以fork出多個(gè)真正并發(fā)執(zhí)行的進(jìn)程。由于全局解釋器鎖的原因,使用threading模塊只能交替執(zhí)行其建立的多個(gè)線程,而這只有在有i/o操作阻塞需要繞過(guò)的時(shí)候才能發(fā)揮較好的作用。。
應(yīng)用實(shí)例:
實(shí)例一:
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
這個(gè)小程序創(chuàng)建了一個(gè)進(jìn)程,讓它運(yùn)行起來(lái)。主程序在將它啟動(dòng)起來(lái)后,又使用join方法等待它執(zhí)行結(jié)束退出,然后回收它。此后主進(jìn)程才會(huì)退出,程序執(zhí)行完成。如果我們不在程序的最后使用join來(lái)等待子進(jìn)程執(zhí)行結(jié)束,那么主進(jìn)程退出后,子進(jìn)程有可能尚未退出,這會(huì)導(dǎo)致它成為一個(gè)僵尸進(jìn)程,資源無(wú)法得到有效回收。
3、使用multiprocessing.dummy中的Pool或者直接使用multiprocessing.Pool(關(guān)于multiprocessing.dummy的區(qū)別,可由官方文檔上的說(shuō)明而知?!甿ultiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.’)
通過(guò)使用map與Pool來(lái)生成多個(gè)具有相同目標(biāo)函數(shù)的并發(fā)線程。其中map是函數(shù)式編程的一個(gè)常用函數(shù),另一個(gè)是reduce。
實(shí)例一:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
它是由如下單線程程序改進(jìn)而得的多線程程序。
results = []
for item in my_array:
results.append(my_function(item))
實(shí)例二:
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
]
# 在各個(gè)線程內(nèi)部調(diào)用同一個(gè)目標(biāo)運(yùn)行函數(shù)'urllib2.urlopen',但是傳入不同的參數(shù),進(jìn)而在這些線程中返回不同的值
with Pool(4) as pool:
results = pool.map(urllib2.urlopen, urls)
#當(dāng)然也可使用多個(gè)數(shù)組作為參數(shù),如下所示
results = pool.starmap(function, zip(list_a, list_b))
#或者也可以向下面這樣使用一個(gè)常量與數(shù)組作為參數(shù),如下所示
results = pool.starmap(function, zip(itertools.repeat(constant), list_a))
4、結(jié)合threading, multiprocessing, subprocess進(jìn)行的多進(jìn)程并發(fā)
利用threading來(lái)建立多個(gè)交叉執(zhí)行的線程,然后在每個(gè)線程中執(zhí)行由subprocess直接啟動(dòng)的可在shell下執(zhí)行的進(jìn)程。
import Queue
import threading
import multiprocessing
import subprocess
q = Queue.Queue()
for i in range(30): #將我們要執(zhí)行的30個(gè)任務(wù)放入隊(duì)列當(dāng)中
q.put(i)
def worker():
while True:
item = q.get()
#執(zhí)行一個(gè)任務(wù),調(diào)用shell程序,在下面執(zhí)行我們的task,然后等待它結(jié)束返回
subprocess.call("echo "+str(item), shell=True)
q.task_done()
cpus=multiprocessing.cpu_count() #獲得我們系統(tǒng)中所具有的可執(zhí)行cpu核的數(shù)目
print("Creating %d threads" % cpus)
for i in range(cpus):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
q.join() #隊(duì)列阻塞直到所有task被執(zhí)行完畢