Python多進程運行——Multiprocessing基礎教程3

之前兩篇文章討論了Multiprocessing模塊的基本概念以及進程間的數(shù)據(jù)交換。本文將要介紹Multiprocessing模塊進程間的同步以及池化。

進程間同步

進程同步是一種確保兩個或多個并發(fā)進程不同時執(zhí)行某些特定的程序段(關鍵段)的一種機制。所謂關鍵段,是指程序中訪問共享數(shù)據(jù)的部分。

如果不同的進程同時訪問共享數(shù)據(jù)的話,可能會引起資源競爭的問題。所謂資源競爭,就是多個進程同時訪問共享數(shù)據(jù),并且當它們試圖同時更改該數(shù)據(jù)時,就會出現(xiàn)競爭。這種情況下,變量的修改結(jié)果就是不可預測的了。

我們用一個程序來展示資源競爭問題:

# Python program to illustrate  
# the concept of race condition 
# in multiprocessing 
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance):     
    for _ in range(10000): 
        balance.value = balance.value - 1
        
# function to deposit to account 
def deposit(balance):     
    for _ in range(10000): 
        balance.value = balance.value + 1
        
def perform_transactions(): 
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,)) 
    
    # starting processes 
    p1.start() 
    p2.start() 
    
    # wait until processes are finished 
    p1.join() 
    p2.join() 
    
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
    
if __name__ == "__main__": 
    for _ in range(10): 
        
        # perform same transaction process 10 times 
        perform_transactions() 

如果你運行上面的程序,就會得到一些意想不到的值:

Final balance = 1311
Final balance = 199
Final balance = 558
Final balance = -2265
Final balance = 1371
Final balance = 1158
Final balance = -577
Final balance = -1300
Final balance = -341
Final balance = 157

在上面的程序中,初始值為100。存10000,取10000,理論最終值也是100,但是在10次不同的運行后,我們得到了10個不同的值。

這就是多進程并發(fā)訪問共享數(shù)據(jù)造成的資源競爭所帶來的結(jié)果。我們用一個表格,來幫助理解為什么會產(chǎn)生這樣的錯誤:

P1 P2 共享數(shù)據(jù)
讀取數(shù)據(jù)(100) 100
讀取數(shù)據(jù)(100) 100
數(shù)據(jù)-1 = 99 100
寫入數(shù)據(jù)(99) 99
數(shù)據(jù)+1 = 101 99
寫入數(shù)據(jù)(101) 101

為了防止多進程并行出現(xiàn)這樣的錯誤,Multiprocessing模塊提供了Lock類來處理資源競爭。Lock類是使用一種操作系統(tǒng)提供的叫Semaphore的計數(shù)信號量來實現(xiàn)的。

Semaphore控制并行編程環(huán)境中多個進程對公共資源的訪問。Semaphore只是操作系統(tǒng)存儲中指定位置的一個值,每個進程都可以檢查并更改它。根據(jù)Semaphore的值,進程會選擇使用資源,或者發(fā)現(xiàn)資源已經(jīng)在使用,則必須等待一段時間后再嘗試。Semaphore可以是二進制(0或1),也可以有其他值。一般當進程檢查Semaphore并確認可以使用了之后,進程就會修改這個值,這樣,后續(xù)進程就會知道,需要等待一段時間了。

既然如此,我們就給上面的程序加上一個鎖,看看是否能夠如愿運行:

r_none
edit
play_arrow

brightness_4
# Python program to illustrate  
# the concept of locks 
# in multiprocessing 
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value - 1
        lock.release() 
        
# function to deposit to account 
def deposit(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value + 1
        lock.release() 
        
def perform_transactions(): 
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
    
    # creating a lock object 
    lock = multiprocessing.Lock() 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,lock)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,lock)) 
    
    # starting processes 
    p1.start() 
    p2.start() 
    
    # wait until processes are finished 
    p1.join() 
    p2.join() 
    
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
    
if __name__ == "__main__": 
    for _ in range(10): 
        
        # perform same transaction process 10 times 
        perform_transactions() 

輸出結(jié)果如下:

Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100

最終,輸出結(jié)果與預期的一致。我們在程序中的改動,只有創(chuàng)建了一個Lock類:

lock = multiprocessing.Lock()

并且將這個鎖,作為參數(shù)傳遞給了函數(shù)。在關鍵區(qū),我們使用lock.acquire()方法檢查并使用了鎖。一旦確認使用,那么其他進程就不能再使用了,直到該進程用lock.release()方法釋放了該鎖。

進程池化

首先我們來看一個例子:

def square(n): 
    return (n*n) 
  
if __name__ == "__main__": 
    
    # input list 
    mylist = [1,2,3,4,5] 
    
    # empty list to store result 
    result = [] 
    
    for num in mylist: 
        result.append(square(num)) 
        
    print(result) 

這是一個計算給定列表元素的平方的非常簡單程序。在多核/多處理器系統(tǒng)中,我們來看一張圖來理解上述程序?qū)⑷绾喂ぷ?

圖1 for循環(huán)計算.png

雖然這是一個很簡單的程序,但是這里只使用了一個核心用于程序執(zhí)行,其他的核心可能保持空閑狀態(tài)。

為了能夠充分發(fā)揮多核處理器的威力,Multiprocessing模塊提供了一個Pool類。所謂Pool類是指一個工作進程池,它能夠?qū)⑷蝿辗峙浣o不同的工作進程,我們來看一個圖:

圖2 池化計算.png

Pool類能夠自動地將計算任務分配到了不同的核心,這樣我們就不需要手動顯式地創(chuàng)建進程了。Pool類調(diào)用起來也非常簡單:

import multiprocessing 
import os 
  
def square(n): 
    print("Worker process id for {0}: {1}".format(n, os.getpid())) 
    return (n*n) 
  
if __name__ == "__main__": 
    mylist = [1,2,3,4,5] 
    
    # creating a pool object 
    p = multiprocessing.Pool() 
    
    # map list to target function 
    result = p.map(square, mylist) 
    
    print(result) 

這樣就可以了,首先我們創(chuàng)建了一個Pool對象:

p = multiprocessing.Pool()

我們還可以向Pool傳遞其他參數(shù),比如:

  • processes:用于指定分配的進程數(shù)量
  • maxtasksperchild:用于指定每個子進程所分配到的最大任務數(shù)

我們還可以用這些參數(shù)對池中的進程進行一些初始化:

  • initializer:為進程指定初始化函數(shù)
  • initargs:向初始化函數(shù)傳遞的參數(shù)值

在創(chuàng)建了Pool對象之后,我們只需要使用map()方法,使其執(zhí)行具體的任務即可。

result = p.map(square, mylist) 

這里我們使用map()方法向square函數(shù)輸入了mylist。這樣計算任務就會自動分配給各個核心了。當所有的進程都完成了任務后,就會返回最終的結(jié)果。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容