0三、線程和進(jìn)程的關(guān)系
1、一個(gè)進(jìn)程可以有多個(gè)線程,但至少有一個(gè)線程;而一個(gè)線程只能在一個(gè)進(jìn)程的地址空間內(nèi)活動(dòng)。
2、資源分配給進(jìn)程,同一個(gè)進(jìn)程的所有線程共享該進(jìn)程所有資源。
3、CPU分配給線程,即真正在處理器運(yùn)行的是線程。
4、線程在執(zhí)行過程中需要協(xié)作同步,不同進(jìn)程的線程間要利用消息通信的辦法實(shí)現(xiàn)同步。
注:進(jìn)程是最基本的資源擁有單位和調(diào)度單位。
進(jìn)程間通信方式:(1)消息傳遞(2)共享存儲(chǔ)(3)管道通信
搞定python多線程和多進(jìn)程
2017年02月24日 22:30
1 概念梳理:
1.1 線程1.1.1 什么是線程
線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位。它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際運(yùn)作單位。一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,每條線程并行執(zhí)行不同的任務(wù)。一個(gè)線程是一個(gè)execution context(執(zhí)行上下文),即一個(gè)cpu執(zhí)行時(shí)所需要的一串指令。
1.1.2 線程的工作方式
假設(shè)你正在讀一本書,沒有讀完,你想休息一下,但是你想在回來時(shí)恢復(fù)到當(dāng)時(shí)讀的具體進(jìn)度。有一個(gè)方法就是記下頁(yè)數(shù)、行數(shù)與字?jǐn)?shù)這三個(gè)數(shù)值,這些數(shù)值就是execution context。如果你的室友在你休息的時(shí)候,使用相同的方法讀這本書。你和她只需要這三個(gè)數(shù)字記下來就可以在交替的時(shí)間共同閱讀這本書了。
線程的工作方式與此類似。CPU會(huì)給你一個(gè)在同一時(shí)間能夠做多個(gè)運(yùn)算的幻覺,實(shí)際上它在每個(gè)運(yùn)算上只花了極少的時(shí)間,本質(zhì)上CPU同一時(shí)刻只干了一件事。它能這樣做就是因?yàn)樗忻總€(gè)運(yùn)算的execution context。就像你能夠和你朋友共享同一本書一樣,多任務(wù)也能共享同一塊CPU。
1.2 進(jìn)程
一個(gè)程序的執(zhí)行實(shí)例就是一個(gè)進(jìn)程。每一個(gè)進(jìn)程提供執(zhí)行程序所需的所有資源。(進(jìn)程本質(zhì)上是資源的集合)
一個(gè)進(jìn)程有一個(gè)虛擬的地址空間、可執(zhí)行的代碼、操作系統(tǒng)的接口、安全的上下文(記錄啟動(dòng)該進(jìn)程的用戶和權(quán)限等等)、唯一的進(jìn)程ID、環(huán)境變量、優(yōu)先級(jí)類、最小和最大的工作空間(內(nèi)存空間),還要有至少一個(gè)線程。
每一個(gè)進(jìn)程啟動(dòng)時(shí)都會(huì)最先產(chǎn)生一個(gè)線程,即主線程。然后主線程會(huì)再創(chuàng)建其他的子線程。
與進(jìn)程相關(guān)的資源包括:
內(nèi)存頁(yè)(同一個(gè)進(jìn)程中的所有線程共享同一個(gè)內(nèi)存空間)
文件描述符(e.g. open sockets)
安全憑證(e.g.啟動(dòng)該進(jìn)程的用戶ID)
1.3 進(jìn)程與線程區(qū)別
1.同一個(gè)進(jìn)程中的線程共享同一內(nèi)存空間,但是進(jìn)程之間是獨(dú)立的。
2.同一個(gè)進(jìn)程中的所有線程的數(shù)據(jù)是共享的(進(jìn)程通訊),進(jìn)程之間的數(shù)據(jù)是獨(dú)立的。
3.對(duì)主線程的修改可能會(huì)影響其他線程的行為,但是父進(jìn)程的修改(除了刪除以外)不會(huì)影響其他子進(jìn)程。
4.線程是一個(gè)上下文的執(zhí)行指令,而進(jìn)程則是與運(yùn)算相關(guān)的一簇資源。
5.同一個(gè)進(jìn)程的線程之間可以直接通信,但是進(jìn)程之間的交流需要借助中間代理來實(shí)現(xiàn)。
6.創(chuàng)建新的線程很容易,但是創(chuàng)建新的進(jìn)程需要對(duì)父進(jìn)程做一次復(fù)制。
7.一個(gè)線程可以操作同一進(jìn)程的其他線程,但是進(jìn)程只能操作其子進(jìn)程。
8.線程啟動(dòng)速度快,進(jìn)程啟動(dòng)速度慢(但是兩者運(yùn)行速度沒有可比性)。
2 多線程
2.1 線程常用方法
方法 注釋
start() 線程準(zhǔn)備就緒,等待CPU調(diào)度
setName() 為線程設(shè)置名稱
getName() 獲取線程名稱
setDaemon(True) 設(shè)置為守護(hù)線程
join() 逐個(gè)執(zhí)行每個(gè)線程,執(zhí)行完畢后繼續(xù)往下執(zhí)行
run() 線程被cpu調(diào)度后自動(dòng)執(zhí)行線程對(duì)象的run方法,如果想自定義線程類,直接重寫run方法就行了
2.1.1 Thread類
1.普通創(chuàng)建方式
import threading import time defrun(n): print("task", n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) t1 = threading.Thread(target=run, args=("t1",)) t2 = threading.Thread(target=run, args=("t2",)) t1.start() t2.start() """ task t1 task t2 2s 2s 1s 1s 0s 0s """
2.繼承threading.Thread來自定義線程類
其本質(zhì)是重構(gòu)Thread類中的run方法
import threading import time classMyThread(threading.Thread):def__init__(self, n): super(MyThread, self).init() # 重構(gòu)run函數(shù)必須要寫 self.n = n defrun(self): print("task", self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if name == "main": t1 = MyThread("t1") t2 = MyThread("t2") t1.start() t2.start()2.1.2 計(jì)算子線程執(zhí)行的時(shí)間
注:sleep的時(shí)候是不會(huì)占用cpu的,在sleep的時(shí)候操作系統(tǒng)會(huì)把線程暫時(shí)掛起。
join() #等此線程執(zhí)行完后,再執(zhí)行其他線程或主線程 threading.current_thread() #輸出當(dāng)前線程import threading import time defrun(n): print("task", n,threading.current_thread()) #輸出當(dāng)前的線程 time.sleep(1) print('3s') time.sleep(1) print('2s') time.sleep(1) print('1s') strat_time = time.time() t_obj = [] #定義列表用于存放子線程實(shí)例for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) """ 由主線程生成的三個(gè)子線程 task t-0 <Thread(Thread-1, started 44828)> task t-1 <Thread(Thread-2, started 42804)> task t-2 <Thread(Thread-3, started 41384)> """for tmp in t_obj: t.join() #為每個(gè)子線程添加join之后,主線程就會(huì)等這些子線程執(zhí)行完之后再執(zhí)行。 print("cost:", time.time() - strat_time) #主線程 print(threading.current_thread()) #輸出當(dāng)前線程""" <_MainThread(MainThread, started 43740)> """2.1.3 統(tǒng)計(jì)當(dāng)前活躍的線程數(shù)
由于主線程比子線程快很多,當(dāng)主線程執(zhí)行active_count()時(shí),其他子線程都還沒執(zhí)行完畢,因此利用主線程統(tǒng)計(jì)的活躍的線程數(shù)num = sub_num(子線程數(shù)量)+1(主線程本身)
import threading import time defrun(n): print("task", n) time.sleep(1) #此時(shí)子線程停1sfor i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(0.5) #主線程停0.5秒 print(threading.active_count()) #輸出當(dāng)前活躍的線程數(shù)""" task t-0 task t-1 task t-2 4 """
由于主線程比子線程慢很多,當(dāng)主線程執(zhí)行active_count()時(shí),其他子線程都已經(jīng)執(zhí)行完畢,因此利用主線程統(tǒng)計(jì)的活躍的線程數(shù)num = 1(主線程本身)
import threading import time defrun(n): print("task", n) time.sleep(0.5) #此時(shí)子線程停0.5sfor i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(1) #主線程停1秒 print(threading.active_count()) #輸出活躍的線程數(shù)""" task t-0 task t-1 task t-2 1 """
此外我們還能發(fā)現(xiàn)在python內(nèi)部默認(rèn)會(huì)等待最后一個(gè)進(jìn)程執(zhí)行完后再執(zhí)行exit(),或者說python內(nèi)部在此時(shí)有一個(gè)隱藏的join()。
2.2 守護(hù)進(jìn)程
我們看下面這個(gè)例子,這里使用setDaemon(True)把所有的子線程都變成了主線程的守護(hù)線程,因此當(dāng)主進(jìn)程結(jié)束后,子線程也會(huì)隨之結(jié)束。所以當(dāng)主線程結(jié)束后,整個(gè)程序就退出了。
import threading import time defrun(n): print("task", n) time.sleep(1) #此時(shí)子線程停1s print('3') time.sleep(1) print('2') time.sleep(1) print('1') for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.setDaemon(True) #把子進(jìn)程設(shè)置為守護(hù)線程,必須在start()之前設(shè)置 t.start() time.sleep(0.5) #主線程停0.5秒 print(threading.active_count()) #輸出活躍的線程數(shù)""" task t-0 task t-1 task t-2 4 Process finished with exit code 0 """2.3 GIL
在非python環(huán)境中,單核情況下,同時(shí)只能有一個(gè)任務(wù)執(zhí)行。多核時(shí)可以支持多個(gè)線程同時(shí)執(zhí)行。但是在python中,無論有多少核,同時(shí)只能執(zhí)行一個(gè)線程。究其原因,這就是由于GIL的存在導(dǎo)致的。
GIL的全稱是Global Interpreter Lock(全局解釋器鎖),來源是python設(shè)計(jì)之初的考慮,為了數(shù)據(jù)安全所做的決定。某個(gè)線程想要執(zhí)行,必須先拿到GIL,我們可以把GIL看作是“通行證”,并且在一個(gè)python進(jìn)程中,GIL只有一個(gè)。拿不到通行證的線程,就不允許進(jìn)入CPU執(zhí)行。GIL只在cpython中才有,因?yàn)閏python調(diào)用的是c語(yǔ)言的原生線程,所以他不能直接操作cpu,只能利用GIL保證同一時(shí)間只能有一個(gè)線程拿到數(shù)據(jù)。而在pypy和jpython中是沒有GIL的。
Python多線程的工作過程:
python在使用多線程的時(shí)候,調(diào)用的是c語(yǔ)言的原生線程。
拿到公共數(shù)據(jù)
申請(qǐng)gil
python解釋器調(diào)用os原生線程
os操作cpu執(zhí)行運(yùn)算
當(dāng)該線程執(zhí)行時(shí)間到后,無論運(yùn)算是否已經(jīng)執(zhí)行完,gil都被要求釋放
進(jìn)而由其他進(jìn)程重復(fù)上面的過程
等其他進(jìn)程執(zhí)行完后,又會(huì)切換到之前的線程(從他記錄的上下文繼續(xù)執(zhí)行)
整個(gè)過程是每個(gè)線程執(zhí)行自己的運(yùn)算,當(dāng)執(zhí)行時(shí)間到就進(jìn)行切換(context switch)。
python針對(duì)不同類型的代碼執(zhí)行效率也是不同的:
1、CPU密集型代碼(各種循環(huán)處理、計(jì)算等等),在這種情況下,由于計(jì)算工作多,ticks計(jì)數(shù)很快就會(huì)達(dá)到閾值,然后觸發(fā)GIL的釋放與再競(jìng)爭(zhēng)(多個(gè)線程來回切換當(dāng)然是需要消耗資源的),所以python下的多線程對(duì)CPU密集型代碼并不友好。
2、IO密集型代碼(文件處理、網(wǎng)絡(luò)爬蟲等涉及文件讀寫的操作),多線程能夠有效提升效率(單線程下有IO操作會(huì)進(jìn)行IO等待,造成不必要的時(shí)間浪費(fèi),而開啟多線程能在線程A等待時(shí),自動(dòng)切換到線程B,可以不浪費(fèi)CPU的資源,從而能提升程序執(zhí)行效率)。所以python的多線程對(duì)IO密集型代碼比較友好。
使用建議?
python下想要充分利用多核CPU,就用多進(jìn)程。因?yàn)槊總€(gè)進(jìn)程有各自獨(dú)立的GIL,互不干擾,這樣就可以真正意義上的并行執(zhí)行,在python中,多進(jìn)程的執(zhí)行效率優(yōu)于多線程(僅僅針對(duì)多核CPU而言)。
GIL在python中的版本差異:
1、在python2.x里,GIL的釋放邏輯是當(dāng)前線程遇見IO操作或者ticks計(jì)數(shù)達(dá)到100時(shí)進(jìn)行釋放。(ticks可以看作是python自身的一個(gè)計(jì)數(shù)器,專門做用于GIL,每次釋放后歸零,這個(gè)計(jì)數(shù)可以通過sys.setcheckinterval 來調(diào)整)。而每次釋放GIL鎖,線程進(jìn)行鎖競(jìng)爭(zhēng)、切換線程,會(huì)消耗資源。并且由于GIL鎖存在,python里一個(gè)進(jìn)程永遠(yuǎn)只能同時(shí)執(zhí)行一個(gè)線程(拿到GIL的線程才能執(zhí)行),這就是為什么在多核CPU上,python的多線程效率并不高。
2、在python3.x中,GIL不使用ticks計(jì)數(shù),改為使用計(jì)時(shí)器(執(zhí)行時(shí)間達(dá)到閾值后,當(dāng)前線程釋放GIL),這樣對(duì)CPU密集型程序更加友好,但依然沒有解決GIL導(dǎo)致的同一時(shí)間只能執(zhí)行一個(gè)線程的問題,所以效率依然不盡如人意。
2.4 線程鎖
由于線程之間是進(jìn)行隨機(jī)調(diào)度,并且每個(gè)線程可能只執(zhí)行n條執(zhí)行之后,當(dāng)多個(gè)線程同時(shí)修改同一條數(shù)據(jù)時(shí)可能會(huì)出現(xiàn)臟數(shù)據(jù),所以,出現(xiàn)了線程鎖,即同一時(shí)刻允許一個(gè)線程執(zhí)行操作。線程鎖用于鎖定資源,你可以定義多個(gè)鎖, 像下面的代碼, 當(dāng)你需要獨(dú)占某一資源時(shí),任何一個(gè)鎖都可以鎖這個(gè)資源,就好比你用不同的鎖都可以把相同的一個(gè)門鎖住是一個(gè)道理。
由于線程之間是進(jìn)行隨機(jī)調(diào)度,如果有多個(gè)線程同時(shí)操作一個(gè)對(duì)象,如果沒有很好地保護(hù)該對(duì)象,會(huì)造成程序結(jié)果的不可預(yù)期,我們也稱此為“線程不安全”。
實(shí)測(cè):在python2.7、mac os下,運(yùn)行以下代碼可能會(huì)產(chǎn)生臟數(shù)據(jù)。但是在python3中就不一定會(huì)出現(xiàn)下面的問題。import threading import time defrun(n):global num num += 1 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print"num:", num """ 產(chǎn)生臟數(shù)據(jù)后的運(yùn)行結(jié)果: num: 19999 """2.5 互斥鎖(mutex)
為了方式上面情況的發(fā)生,就出現(xiàn)了互斥鎖(Lock)
import threading import time defrun(n): lock.acquire() #獲取鎖global num num += 1 lock.release() #釋放鎖 lock = threading.Lock() #實(shí)例化一個(gè)鎖對(duì)象 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print"num:", num2.6 遞歸鎖
RLcok類的用法和Lock類一模一樣,但它支持嵌套,,在多個(gè)鎖沒有釋放的時(shí)候一般會(huì)使用使用RLcok類。
import threading import time gl_num = 0 lock = threading.RLock() defFunc(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()2.7 信號(hào)量(BoundedSemaphore類)
互斥鎖同時(shí)只允許一個(gè)線程更改數(shù)據(jù),而Semaphore是同時(shí)允許一定數(shù)量的線程更改數(shù)據(jù) ,比如廁所有3個(gè)坑,那最多只允許3個(gè)人上廁所,后面的人只能等里面有人出來了才能再進(jìn)去。
import threading import time defrun(n): semaphore.acquire() #加鎖 time.sleep(1) print("run the thread:%s\n" % n) semaphore.release() #釋放 num = 0 semaphore = threading.BoundedSemaphore(5) # 最多允許5個(gè)線程同時(shí)運(yùn)行for i in range(22): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() while threading.active_count() != 1: pass# print threading.active_count()else: print('-----all threads done-----')
2.8 事件(Event類)
python線程的事件用于主線程控制其他線程的執(zhí)行,事件是一個(gè)簡(jiǎn)單的線程同步對(duì)象,其主要提供以下幾個(gè)方法:
方法 注釋
clear 將flag設(shè)置為“False”
set 將flag設(shè)置為“True”
is_set 判斷是否設(shè)置了flag
wait 會(huì)一直監(jiān)聽flag,如果沒有檢測(cè)到flag就一直處于阻塞狀態(tài)
事件處理的機(jī)制:全局定義了一個(gè)“Flag”,當(dāng)flag值為“False”,那么event.wait()就會(huì)阻塞,當(dāng)flag值為“True”,那么event.wait()便不再阻塞。
利用Event類模擬紅綠燈import threading import time event = threading.Event() deflighter(): count = 0 event.set() #初始值為綠燈whileTrue: if5 < count <=10 : event.clear() # 紅燈,清除標(biāo)志位 print("\33[41;1mred light is on...\033[0m") elif count > 10: event.set() # 綠燈,設(shè)置標(biāo)志位 count = 0else: print("\33[42;1mgreen light is on...\033[0m") time.sleep(1) count += 1defcar(name):whileTrue: if event.is_set(): #判斷是否設(shè)置了標(biāo)志位 print("[%s] running..."%name) time.sleep(1) else: print("[%s] sees red light,waiting..."%name) event.wait() print("[%s] green light is on,start going..."%name) light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=("MINI",)) car.start()2.9 條件(Condition類)
使得線程等待,只有滿足某條件時(shí),才釋放n個(gè)線程
2.10 定時(shí)器(Timer類)
定時(shí)器,指定n秒后執(zhí)行某操作
from threading import Timer defhello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
3 多進(jìn)程
在linux中,每個(gè)進(jìn)程都是由父進(jìn)程提供的。每啟動(dòng)一個(gè)子進(jìn)程就從父進(jìn)程克隆一份數(shù)據(jù),但是進(jìn)程之間的數(shù)據(jù)本身是不能共享的。
from multiprocessing import Process import time deff(name): time.sleep(2) print('hello', name) if name == 'main': p = Process(target=f, args=('bob',)) p.start() p.join() from multiprocessing import Process import os definfo(title): print(title) print('module name:', name) print('parent process:', os.getppid()) #獲取父進(jìn)程id print('process id:', os.getpid()) #獲取自己的進(jìn)程id print("\n\n") deff(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if name == 'main': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join()3.1 進(jìn)程間通信
由于進(jìn)程之間數(shù)據(jù)是不共享的,所以不會(huì)出現(xiàn)多線程GIL帶來的問題。多進(jìn)程之間的通信通過Queue()或Pipe()來實(shí)現(xiàn)
3.1.1 Queue()
使用方法跟threading里的queue差不多
from multiprocessing import Process, Queue deff(q): q.put([42, None, 'hello']) if name == 'main': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()3.1.2 Pipe()
Pipe的本質(zhì)是進(jìn)程之間的數(shù)據(jù)傳遞,而不是數(shù)據(jù)共享,這和socket有點(diǎn)像。pipe()返回兩個(gè)連接對(duì)象分別表示管道的兩端,每端都有send()和recv()方法。如果兩個(gè)進(jìn)程試圖在同一時(shí)間的同一端進(jìn)行讀取和寫入那么,這可能會(huì)損壞管道中的數(shù)據(jù)。
from multiprocessing import Process, Pipe deff(conn): conn.send([42, None, 'hello']) conn.close() if name == 'main': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()3.2 Manager
通過Manager可實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)的共享。Manager()返回的manager對(duì)象會(huì)通過一個(gè)服務(wù)進(jìn)程,來使其他進(jìn)程通過代理的方式操作python對(duì)象。manager對(duì)象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array.
from multiprocessing import Process, Manager deff(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.append(1) print(l) if name == 'main': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)3.3 進(jìn)程鎖(進(jìn)程同步)
數(shù)據(jù)輸出的時(shí)候保證不同進(jìn)程的輸出內(nèi)容在同一塊屏幕正常顯示,防止數(shù)據(jù)亂序的情況。
Without using the lock output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock deff(l, i): l.acquire() try: print('hello world', i) finally: l.release() if name == 'main': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()3.4 進(jìn)程池
由于進(jìn)程啟動(dòng)的開銷比較大,使用多進(jìn)程的時(shí)候會(huì)導(dǎo)致大量?jī)?nèi)存空間被消耗。為了防止這種情況發(fā)生可以使用進(jìn)程池,(由于啟動(dòng)線程的開銷比較小,所以不需要線程池這種概念,多線程只會(huì)頻繁得切換cpu導(dǎo)致系統(tǒng)變慢,并不會(huì)占用過多的內(nèi)存空間)
進(jìn)程池中常用方法:
apply() 同步執(zhí)行(串行)
apply_async() 異步執(zhí)行(并行)
terminate() 立刻關(guān)閉進(jìn)程池
join() 主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢。必須在close或terminate()之后。
close() 等待所有進(jìn)程結(jié)束后,才關(guān)閉進(jìn)程池。
from multiprocessing import Process,Pool import time defFoo(i): time.sleep(2) return i+100defBar(arg): print(,arg) pool = Pool(5) #允許進(jìn)程池同時(shí)放入5個(gè)進(jìn)程for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #func子進(jìn)程執(zhí)行完后,才會(huì)執(zhí)行callback,否則callback不執(zhí)行(而且callback是由父進(jìn)程來執(zhí)行了)#pool.apply(func=Foo, args=(i,)) print('end') pool.close() pool.join() #主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢。必須在close()或terminate()之后。
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),去進(jìn)程池中獲取一個(gè)進(jìn)程,如果進(jìn)程池序列中沒有可供使用的進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。在上面的程序中產(chǎn)生了10個(gè)進(jìn)程,但是只能有5同時(shí)被放入進(jìn)程池,剩下的都被暫時(shí)掛起,并不占用內(nèi)存空間,等前面的五個(gè)進(jìn)程執(zhí)行完后,再執(zhí)行剩下5個(gè)進(jìn)程。
4 補(bǔ)充:協(xié)程
線程和進(jìn)程的操作是由程序觸發(fā)系統(tǒng)接口,最后的執(zhí)行者是系統(tǒng),它本質(zhì)上是操作系統(tǒng)提供的功能。而協(xié)程的操作則是程序員指定的,在python中通過yield,人為的實(shí)現(xiàn)并發(fā)處理。
協(xié)程存在的意義:對(duì)于多線程應(yīng)用,CPU通過切片的方式來切換線程間的執(zhí)行,線程切換時(shí)需要耗時(shí)。協(xié)程,則只使用一個(gè)線程,分解一個(gè)線程成為多個(gè)“微線程”,在一個(gè)線程中規(guī)定某個(gè)代碼塊的執(zhí)行順序。
協(xié)程的適用場(chǎng)景:當(dāng)程序中存在大量不需要CPU的操作時(shí)(IO)。
常用第三方模塊gevent和greenlet。(本質(zhì)上,gevent是對(duì)greenlet的高級(jí)封裝,因此一般用它就行,這是一個(gè)相當(dāng)高效的模塊。)
4.1 greenletfrom greenlet import greenlet deftest1(): print(12) gr2.switch() print(34) gr2.switch() deftest2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
實(shí)際上,greenlet就是通過switch方法在不同的任務(wù)之間進(jìn)行切換。
4.2 geventfrom gevent import monkey; monkey.patch_all() import gevent import requests deff(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
通過joinall將任務(wù)f和它的參數(shù)進(jìn)行統(tǒng)一調(diào)度,實(shí)現(xiàn)單線程中的協(xié)程。代碼封裝層次很高,實(shí)際使用只需要了解它的幾個(gè)主要方法即可。