2019-03-07多線程&多進程(下)

? ? ? ?在《多線程&多進程(上)》中,記錄了python中的threading模塊常用的類的使用方法,對比了Lock/RLock和condition版本的生產(chǎn)者與消費者的問題,但是python中并不支持真正的支持多線程,不能充分的利用多核cpu的資源,大部分情況下使用的是多進程。在下半部分中,將記錄多進程的使用。

多進程

? ? ? ?官方文檔
? ? ? ? multiprocessing模塊常用的類和方法(腦圖)
本節(jié)將介紹:

  • Process(用于創(chuàng)建進程模塊)
  • Pool(用于創(chuàng)建管理進程池)
  • Queue(用于進程通信,資源共享)
  • Lock
  • Pipe(用于管道通信)
  • Semaphore

1. Process模塊

  • 基本使用
    ? ? ? ?在multiprocessing中,每一個進程都用一個Process類來表示。其用法和Thread對象的用法很相似,也有start(),run(),join()等方法。Process類適合簡單的進程創(chuàng)建,如需資源共享可以結(jié)合multiprocessing.Queue使用;如果想要控制進程數(shù)量,則建議使用進程池Pool類。
    首先看下它的API
Process([group [, target [, name [, args [, kwargs]]]]])
  • target:調(diào)用對象,你可以傳入方法的名字。
  • args:被調(diào)用對象的位置參數(shù)元組,比如target是函數(shù)a,他有兩個參數(shù)m,n,那么args就傳入(m, n)即可。
  • kwargs:調(diào)用對象的字典。
  • name:別名,相當于給這個進程取一個名字。
  • group:線程組,目前還沒有實現(xiàn),庫引用中提示必須是None。

其包含以下實例方法,和Thead類似:

方法 說明
is_alive() 返回進程是否在運行
join([timeout]) 阻塞當前上下文環(huán)境的進程程,直到調(diào)用此方法的進程終止或到達指定的timeout(可選參數(shù))
start() 進程準備就緒,等待CPU調(diào)度
run() strat()調(diào)用run方法,如果實例進程時未制定傳入target,這star執(zhí)行t默認run()方法。
terminate() 不管任務(wù)是否完成,立即停止工作進程。

Process類中創(chuàng)建多進程有兩種方法
(1)用法與threading類似

from multiprocessing import Process  #導(dǎo)入Process模塊 
import os  
def test(name):
    '''
    函數(shù)輸出當前進程ID,以及其父進程ID。
    此代碼應(yīng)在Linux下運行,因為windows下os模塊不支持getppid()
    '''
    print("Process ID: %s" % os.getpid())
    print("Parent Process ID: %s" % os.getppid())

if __name__ == "__main__": 
    '''
    windows下,創(chuàng)建進程的代碼一下要放在main函數(shù)里面
    ''' 
    proc = Process(target=test, args=('nmask',))  
    proc.start()  
    proc.join()

(2)繼承Process類,修r(nóng)un函數(shù)代碼

from multiprocessing import Process
import time
class MyProcess(Process):
    '''
    繼承Process類,類似threading.Thread
    '''
    def __init__(self, arg):
        super(MyProcess, self).__init__()
        #multiprocessing.Process.__init__(self)
        self.arg = arg

    def run(self):
        '''
        重構(gòu)run函數(shù)
        '''
        print('nMask', self.arg)
        time.sleep(1)
        
if __name__ == '__main__':
    for i in range(10):
        p = MyProcess(i)
        p.start()
    for i in range(10):
        p.join()

Process 的屬性

  • authkey
  • daemon:和線程的setDeamon功能一樣(將父進程設(shè)置為守護進程,當父進程結(jié)束時,子進程也結(jié)束)。
  • exitcode(進程在運行時為None、如果為–N,表示被信號N結(jié)束)。
  • name:進程名字。
  • pid:進程號。
    例如使用daemon屬性:
from multiprocessing import Process
import time
 
 
class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop
 
    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
 
 
if __name__ == '__main__':
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
        #p.join()
 
    print('Main process Ended!')

輸出結(jié)果為:
Main process Ended!
? ? ? ?因為主進程只輸出一句話就結(jié)束了,并且我們設(shè)置了p.daemon=True,所以此時并不會等待子進程結(jié)束,類似多線程里說介紹的,我們同樣可以使用join()方法,就可等待子進程完成再結(jié)束主進程了(將上面代碼中p.join() 取消注釋即可)。

2.Pool模塊

? ? ? ?Pool模塊是用來創(chuàng)建管理進程池的,當子進程非常多且需要控制子進程數(shù)量時可以使用此模塊?!ultiprocessing.Pool可以提供指定數(shù)量的進程供用戶調(diào)用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來執(zhí)行它。在共享資源時,只能使用Multiprocessing.Manager類,而不能使用Queue或者Array。
? ? ? ?我們看看它的API:

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • processes :使用的工作進程的數(shù)量,如果processes是None那么使用 os.cpu_count()返回的數(shù)量。
  • initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調(diào)用initializer(*initargs)。
  • maxtasksperchild:工作進程退出之前可以完成的任務(wù)數(shù),完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。
  • context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創(chuàng)建一個池,兩種方法都適當?shù)脑O(shè)置了context。

其包含如下實例方法

方法 說明
apply_async(func[, args[, kwds[, callback]]]) 非阻塞
apply(func[, args[, kwds]]) 阻塞
close() 關(guān)閉pool,使其不在接受新的任務(wù)。
terminate() 關(guān)閉pool,結(jié)束工作進程,不在處理未完成的任務(wù)。
join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。

Pool使用方法
(1)Pool+map函數(shù)

from multiprocessing import Pool
def test(i):
    print(i)

if __name__=="__main__":
    lists=[1,2,3,4,5]
    pool=Pool(processes=2) #定義最大的進程數(shù)
    pool.map(test,lists)#map的第二個參數(shù)必須是一個可迭代變量--如list。
    pool.close()
    pool.join()

此寫法缺點在于只能通過map向函數(shù)傳遞一個參數(shù)。

(2)異步進程池(非阻塞)

from multiprocessing import Pool

def test(i):
    print(i)

if __name__=="__main__":
    pool = Pool(processes=10)
    for i in range(500):
        pool.apply_async(test, args=(i,)) #維持執(zhí)行的進程總數(shù)為10,當一個進程執(zhí)行完后啟動一個新進程.       
    print("test")
    pool.close()
    pool.join()

輸出:

For循環(huán)中執(zhí)行步驟:

  • 循環(huán)遍歷,將500個子進程添加到進程池(相對父進程會阻塞)
  • 每次執(zhí)行10個子進程,等一個子進程執(zhí)行完后,立馬啟動新的子進程。(相對父進程不阻塞)

? ? ? ?apply_async為異步進程池寫法。
? ? ? ?異步指的是啟動子進程的過程,與父進程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進程池添加子進程的過程,與父進程本身的執(zhí)行卻是同步的。
注意:調(diào)用join之前,先調(diào)用close或者terminate方法,否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束。

(3)同步進程池(阻塞)

from multiprocessing import Pool
import time

def test(i):
    print(i)

if __name__=="__main__":
    pool = Pool(processes=10)
    for i in range(500):
        pool.apply(test, args=(i,)) #維持執(zhí)行的進程總數(shù)為10,當一個進程執(zhí)行完后啟動一個新進程.       
    print("test")
    pool.close()
    pool.join()

輸出:

實際測試發(fā)現(xiàn),for循環(huán)內(nèi)部執(zhí)行步驟:

  • 遍歷500個可迭代對象,往進程池放一個子進程
  • 執(zhí)行這個子進程,等子進程執(zhí)行完畢,再往進程池放一個子進程,再執(zhí)行。(同時只執(zhí)行一個子進程)
  • for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。
    (并未實現(xiàn)多進程并行)

3. queue線程安全隊列

? ? ? ?該用法和線程中的用法一樣。

4. Lock模塊

? ? ? ?當多進程需要訪問共享資源的時候,類似多線程,它同樣有一個Lock類,可以避免訪問的沖突。

from multiprocessing import Process, Lock  

def l(lock, num):  
    with lock: 
        # lock.acquire() 
        print("Hello Num: %s" % (num))
        # lock.release()  

if __name__ == '__main__':  
    lock = Lock()  #這個一定要定義為全局
    for num in range(20):  
        Process(target=l, args=(lock, num)).start()  
        #這個類似多線程中的threading,但是進程太多了,控制不了。

? ? ? ?父進程的全局變量能不能被子進程共享呢?答案是否定的,如果想要共享資源,可以使用manage類,或者queue模塊。但這里我就有個疑問了,不是說多線程之中的內(nèi)存資源是不共享的嗎,那么它的Lock有什么用呢?
其使用場景可以參考這篇文章:Python的多進程鎖的使用

? ? ? ?多進程中一般是不推薦使用資源共享,如果要使用,可以參考:多進程共享資源

5. Pipe 管道

? ? ? ?顧名思義,一端發(fā)一端收。Pipe可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創(chuàng)建單向管道 (默認為雙向)。一個進程從PIPE一端輸入對象,然后被PIPE另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。

6. Semaphore,信號量

? ? ? ?其是在進程同步過程中一個比較重要的角色??梢钥刂婆R界資源的數(shù)量,保證各個進程之間的互斥和同步。

? ? ? ?對于上述內(nèi)容的詳細解釋,可以參考:https://cuiqingcai.com/3335.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容