一:啟動(dòng)多進(jìn)程
1.1 multiprocessing是一個(gè)跨平臺(tái)的python包,因此我們使用該工具包進(jìn)行多進(jìn)程的開發(fā).
from multiprocessing import Process
#子進(jìn)程需要執(zhí)行的代碼
def run(str):
while True:
# os.getpid()獲取當(dāng)前進(jìn)程id號(hào)
# os.getppid()獲取當(dāng)前進(jìn)程的父進(jìn)程id號(hào)
print(" %s--%s--%s"%(str, os.getpid(),os.getppid()))
sleep(1)
#創(chuàng)建子進(jìn)程
#target說明進(jìn)程執(zhí)行的任務(wù)
p = Process(target=run, args=("test",))
#啟動(dòng)進(jìn)程
p.start()
#父進(jìn)程的結(jié)束不能影響子進(jìn)程,讓父進(jìn)程等待子進(jìn)程結(jié)束再執(zhí)行父進(jìn)程
process.join()
1.2利用進(jìn)程池生成進(jìn)程.
開多進(jìn)程是為了并發(fā),通常有幾個(gè)cpu核心就開幾個(gè)進(jìn)程,但是進(jìn)程開多了會(huì)影響效率,主要體現(xiàn)在切換的開銷,所以引入進(jìn)程池限制進(jìn)程的數(shù)量。
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),則去進(jìn)程池中獲取一個(gè)進(jìn)程,初始化一個(gè)Pool時(shí),可以指定一個(gè)最大進(jìn)程數(shù),如果超過這個(gè)數(shù),那么請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來執(zhí)行
from multiprocessing import Pool
import os, time, random
def run(name):
print("子進(jìn)程%d啟動(dòng)--%s" % (name, os.getpid()))
if __name__ == "__main__":
#創(chuàng)建多個(gè)進(jìn)程
#進(jìn)程池
#表示可以同時(shí)執(zhí)行的進(jìn)程數(shù)量
#Pool默認(rèn)大小是CPU核心數(shù)
pp = Pool(2)
for i in range(3):
#創(chuàng)建進(jìn)程,放入進(jìn)程池同意管理
pp.apply_async(run,args=(i,))
#在調(diào)用join之前必須先調(diào)用close,調(diào)用close之后就不能再繼續(xù)添加新的進(jìn)程了
pp.close()
#進(jìn)程池對(duì)象調(diào)用join,會(huì)等待進(jìn)程池中所有的子進(jìn)程結(jié)束完畢再去執(zhí)行父進(jìn)程
pp.join()
二,多進(jìn)程之間的通信.
進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊支持兩種形式:隊(duì)列和管道,這兩種方式都是使用消息傳遞的。
2.1進(jìn)程隊(duì)列queue
from multiprocessing import Process, Queue
import os, time
def write(q):
print("啟動(dòng)寫子進(jìn)程%s" % (os.getpid()))
q.put("A")
print("結(jié)束寫子進(jìn)程%s" % (os.getpid()))
def read(q):
print("啟動(dòng)讀子進(jìn)程%s" % (os.getpid()))
while True:
value = q.get(True)
print("value = " + value)
print("結(jié)束讀子進(jìn)程%s" % (os.getpid()))
if __name__ == "__main__":
#父進(jìn)程創(chuàng)建隊(duì)列,并傳遞給子進(jìn)程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
#pr進(jìn)程里是個(gè)死循環(huán),無法等待其結(jié)束,只能強(qiáng)行結(jié)束
pr.terminate()
print("父進(jìn)程結(jié)束")
2.2進(jìn)程池中的Queue
如果使用進(jìn)程池pool創(chuàng)建進(jìn)程的話,就需要使用Manager().Queue()
from multiprocessing import Manager, Process, Pool
import threading
import random, time,os
# queue,實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞,其實(shí)就是個(gè)消息隊(duì)列
def write(q):
print('---write thread is %s' % os.getpid())
for value in ['A', 'B', 'C']:
print("put %s to queue" % value)
q.put(value)
def read(q):
print('---read thread is %s' % os.getpid())
for i in range(q.qsize()):
print("Get value is %s" % q.get(True))
if '__main__' == __name__:
print('---main thread is %s' % os.getpid())
q = Manager().Queue()
po = Pool()
po.apply(write, args=(q,))
po.apply(read, args=(q,))
po.close()
po.join()
print('---end---')