多進(jìn)程
from multiprocessing import Process
import os
# 子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

image.png
join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。
Pool
如果要啟動(dòng)大量的子進(jìn)程,可以用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程:
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close() # pool.join前必須有它,沒(méi)有會(huì)報(bào)錯(cuò)
p.join() # pool.join開(kāi)啟池中的子進(jìn)程并等待所有子進(jìn)程執(zhí)行完畢
print('All subprocesses done.')

image.png
代碼解讀:
對(duì)Pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。
請(qǐng)注意輸出的結(jié)果,task 0,1,2,3是立刻執(zhí)行的,而task 4要等待前面某個(gè)task完成后才執(zhí)行,這是因?yàn)镻ool的默認(rèn)大小在我的電腦上是4,因此,最多同時(shí)執(zhí)行4個(gè)進(jìn)程。這是Pool有意設(shè)計(jì)的限制,并不是操作系統(tǒng)的限制。如果改成:
p = Pool(5)
就可以同時(shí)跑5個(gè)進(jìn)程。
進(jìn)程間通信
Python的multiprocessing模塊包裝了底層的機(jī)制,提供了Queue、Pipes等多種方式來(lái)交換數(shù)據(jù)。我們以Queue為例,在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里寫(xiě)數(shù)據(jù),一個(gè)從Queue里讀數(shù)據(jù):
from multiprocessing import Process, Queue
import os, time, random
# 寫(xiě)數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動(dòng)子進(jìn)程pw,寫(xiě)入:
pw.start()
# 啟動(dòng)子進(jìn)程pr,讀取:
pr.start()
# 等待pw結(jié)束:
pw.join()
# pr進(jìn)程里是死循環(huán),無(wú)法等待其結(jié)束,只能強(qiáng)行終止:
pr.terminate()

image.png