要讓python實(shí)現(xiàn)多進(jìn)程「multiprocessing」。我們先來(lái)了解操作系統(tǒng)相關(guān)知識(shí)。
Unix 和 Linux 操作系統(tǒng)提供了一個(gè) fork() 函數(shù)系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù),調(diào)用一它次,執(zhí)行一次,但是 fork() 函數(shù)調(diào)用一次執(zhí)行兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程「稱(chēng)為父進(jìn)程」復(fù)制了一份「稱(chēng)為子進(jìn)程」,然后,分別在子進(jìn)程和父進(jìn)程中執(zhí)行。
子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的 ID。這樣做的理由是,一個(gè)父進(jìn)程可以 fork() 多個(gè)子進(jìn)程,所以父進(jìn)程要記下所有子進(jìn)程的 ID,而子進(jìn)程只要調(diào)用 getppid() 就可以拿到父進(jìn)程的 ID。
python中 os 模塊封裝了常見(jiàn)的系統(tǒng)調(diào)用,其中就包括 fork(),可以在python程序中輕松創(chuàng)建子程序:
import os
print('Process (%s) start ...' % os.getpid())
#Only work on Unix/linux/Mac
#不能在Windows平臺(tái)上運(yùn)行
pid = os.fork()
if pid == 0:
print('I am child process (%) and my parent is %s.' % (os.getpid(),os.getppid()))
else:
print('I (%) just created a child process (%).' % (os.getpid(),pid))
運(yùn)行結(jié)果:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由于 Windows 平臺(tái)下沒(méi)有 fork() 函數(shù)調(diào)用,所以代碼沒(méi)有辦法在 Windows平臺(tái)下運(yùn)行。
有了 fork 調(diào)用,一個(gè)進(jìn)程在接到任務(wù)的時(shí)候就可以復(fù)雜出來(lái)一個(gè)子進(jìn)程來(lái)處理新任務(wù),常見(jiàn)的 Apache 服務(wù)器就是由父進(jìn)程監(jiān)聽(tīng)端口,每當(dāng)有新的 http 請(qǐng)求時(shí),就 fork 出新的子進(jìn)程來(lái)處理新的 http 請(qǐng)求。
multiprocessing「多進(jìn)程」
如果你想寫(xiě)多進(jìn)程的服務(wù)程序,Unix/Linux 平臺(tái)最好了,當(dāng)然也可以在 Windows 平臺(tái)下來(lái)編寫(xiě),因?yàn)?python 跨平臺(tái)。multiprocessing 模塊就是跨平臺(tái)版本的多進(jìn)程模塊。
multiprocessing 模塊提供了一個(gè) Process 類(lèi)來(lái)代表一個(gè)進(jìn)程對(duì)象,下面一個(gè)例子用來(lái)演示啟動(dòng)一個(gè)子進(jìn)程并等待結(jié)束的例子:
import os
from multiprocessing import Process
#子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
print('Run child process %s (%s)' % (name,os.getpid()))
if __name__ == '__main__':
print('parent process %s' % os.getpid())
p = Process(target=run_proc,args=('test',))#創(chuàng)建子程序
print('Child process will start')
p.start()#子程序開(kāi)始執(zhí)行
p.join()
print('Child process end.')
- 創(chuàng)建子程序時(shí),只需要傳入一個(gè)執(zhí)行的函數(shù)和函數(shù)的參數(shù)。
- 創(chuàng)建一個(gè) Procsess 實(shí)例,用 start() 方式開(kāi)啟,這樣創(chuàng)建的進(jìn)程比 fork 還簡(jiǎn)單。
- join() 方法可以等jinc子進(jìn)程執(zhí)行完后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程之間的同步。
Pool
如果想要啟動(dòng)大量的子進(jìn)程,可以用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程。
import os,time,random
from multiprocessing import Pool
def long_time_task(name):
print('Run task %s (%s)...' % (name,os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s run %0.2f seconds.' % (name,(end-start)))
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task,args=(i,))
print('Waiting for all subprocess done...')
p.close()
p.join()
print('All subprocess done')
執(zhí)行結(jié)果:
Parent process 7600.
Waiting for all subprocess done...
Run task 0 (11392)...
Run task 1 (6432)...
Run task 2 (10768)...
Run task 3 (5116)...
Task 0 run 0.03 seconds.
Run task 4 (11392)...
Task 3 run 1.42 seconds.
Task 1 run 1.77 seconds.
Task 4 run 2.59 seconds.
Task 2 run 2.93 seconds.
All subprocess done
Process finished with exit code 0
- 對(duì)Pool調(diào)用 join() 方法會(huì)等所用子進(jìn)程執(zhí)行完畢,調(diào)用 join() 之前一定要調(diào)用 close() 調(diào)用 close() 之后不能在有新的process
- 程序的輸出結(jié)果顯示 task 0、1、2、3 是同時(shí)執(zhí)行的,而 task 4 是等前四個(gè)執(zhí)行完畢才執(zhí)行,這是因?yàn)?,進(jìn)程池在我的電腦上是4。,因此最多執(zhí)行四個(gè)進(jìn)程,這是 Pool 有意設(shè)計(jì)的限制,并不是操作系統(tǒng)的限制,如果你改成
p = Pool(5)
就可以同時(shí)跑 5 個(gè)進(jìn)程。
子進(jìn)程
很多時(shí)候,子進(jìn)程并不是本身,熱是一個(gè)外部的進(jìn)程。我們創(chuàng)建了子進(jìn)程之后,還要控制進(jìn)程的輸入和輸出。
subprocess 模塊可以讓我們非常方便的啟動(dòng)一個(gè)子進(jìn)程,然后控制輸入和輸出。
這一部分未完待續(xù)
進(jìn)程間的通信
Process 間肯定是要通信的,操作系統(tǒng)提供了很多機(jī)制來(lái)實(shí)現(xiàn)進(jìn)程間的通信,python 中的 multiprocessing 模塊包裝了底層的機(jī)制,提供了 Queue、Pipes 等多種方法來(lái)交換數(shù)據(jù)。
我們以 Queue 為例,在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往 Queue 里寫(xiě)數(shù)據(jù),一個(gè)從 Queue 中讀數(shù)據(jù)。
from multiprocessing import Queue,Process
import os,time,random
#寫(xiě)數(shù)據(jù)進(jìn)程執(zhí)行的代碼
def write(q):
print('Process to write : %s' % os.getpid())
for value in ['A','B','C']:
print('Put %s queue...' % value)
q.put(value)
time.sleep(random.random())
#讀數(shù)據(jù)執(zhí)行的代碼
def read(q):
print('Process to read : %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__ == '__main__':
#父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程
q = Queue()
pw = Process(target=write,args=(q,))
pr = Process(target=read,args=(q,))
#啟動(dòng)子程序pw,寫(xiě)入:
pw.start()
#啟動(dòng)子程序pr,讀?。? pr.start()
#等待pw結(jié)束:
pw.join()
#pr進(jìn)程是死循環(huán),無(wú)法等待它結(jié)束,只能強(qiáng)行終止。
pr.terminate()
運(yùn)行結(jié)果
Process to read : 8416
Process to write : 12840
Put A queue...
Get A from queue.
Put B queue...
Get B from queue.
Put C queue...
Get C from queue.
Process finished with exit code 0
- 在Windows 平臺(tái)下實(shí)現(xiàn)多進(jìn)程,用multiprocessing 模塊
- 進(jìn)程間的通信是用 Queue 、Pipes 來(lái)進(jìn)行的。