
之前兩篇文章討論了Multiprocessing模塊的基本概念以及進程間的數(shù)據(jù)交換。本文將要介紹Multiprocessing模塊進程間的同步以及池化。

進程間同步
進程同步是一種確保兩個或多個并發(fā)進程不同時執(zhí)行某些特定的程序段(關鍵段)的一種機制。所謂關鍵段,是指程序中訪問共享數(shù)據(jù)的部分。
如果不同的進程同時訪問共享數(shù)據(jù)的話,可能會引起資源競爭的問題。所謂資源競爭,就是多個進程同時訪問共享數(shù)據(jù),并且當它們試圖同時更改該數(shù)據(jù)時,就會出現(xiàn)競爭。這種情況下,變量的修改結(jié)果就是不可預測的了。
我們用一個程序來展示資源競爭問題:
# Python program to illustrate
# the concept of race condition
# in multiprocessing
import multiprocessing
# function to withdraw from account
def withdraw(balance):
for _ in range(10000):
balance.value = balance.value - 1
# function to deposit to account
def deposit(balance):
for _ in range(10000):
balance.value = balance.value + 1
def perform_transactions():
# initial balance (in shared memory)
balance = multiprocessing.Value('i', 100)
# creating new processes
p1 = multiprocessing.Process(target=withdraw, args=(balance,))
p2 = multiprocessing.Process(target=deposit, args=(balance,))
# starting processes
p1.start()
p2.start()
# wait until processes are finished
p1.join()
p2.join()
# print final balance
print("Final balance = {}".format(balance.value))
if __name__ == "__main__":
for _ in range(10):
# perform same transaction process 10 times
perform_transactions()
如果你運行上面的程序,就會得到一些意想不到的值:
Final balance = 1311
Final balance = 199
Final balance = 558
Final balance = -2265
Final balance = 1371
Final balance = 1158
Final balance = -577
Final balance = -1300
Final balance = -341
Final balance = 157
在上面的程序中,初始值為100。存10000,取10000,理論最終值也是100,但是在10次不同的運行后,我們得到了10個不同的值。
這就是多進程并發(fā)訪問共享數(shù)據(jù)造成的資源競爭所帶來的結(jié)果。我們用一個表格,來幫助理解為什么會產(chǎn)生這樣的錯誤:
| P1 | P2 | 共享數(shù)據(jù) |
|---|---|---|
| 讀取數(shù)據(jù)(100) | 100 | |
| 讀取數(shù)據(jù)(100) | 100 | |
| 數(shù)據(jù)-1 = 99 | 100 | |
| 寫入數(shù)據(jù)(99) | 99 | |
| 數(shù)據(jù)+1 = 101 | 99 | |
| 寫入數(shù)據(jù)(101) | 101 |
為了防止多進程并行出現(xiàn)這樣的錯誤,Multiprocessing模塊提供了Lock類來處理資源競爭。Lock類是使用一種操作系統(tǒng)提供的叫Semaphore的計數(shù)信號量來實現(xiàn)的。
Semaphore控制并行編程環(huán)境中多個進程對公共資源的訪問。Semaphore只是操作系統(tǒng)存儲中指定位置的一個值,每個進程都可以檢查并更改它。根據(jù)Semaphore的值,進程會選擇使用資源,或者發(fā)現(xiàn)資源已經(jīng)在使用,則必須等待一段時間后再嘗試。Semaphore可以是二進制(0或1),也可以有其他值。一般當進程檢查Semaphore并確認可以使用了之后,進程就會修改這個值,這樣,后續(xù)進程就會知道,需要等待一段時間了。
既然如此,我們就給上面的程序加上一個鎖,看看是否能夠如愿運行:
r_none
edit
play_arrow
brightness_4
# Python program to illustrate
# the concept of locks
# in multiprocessing
import multiprocessing
# function to withdraw from account
def withdraw(balance, lock):
for _ in range(10000):
lock.acquire()
balance.value = balance.value - 1
lock.release()
# function to deposit to account
def deposit(balance, lock):
for _ in range(10000):
lock.acquire()
balance.value = balance.value + 1
lock.release()
def perform_transactions():
# initial balance (in shared memory)
balance = multiprocessing.Value('i', 100)
# creating a lock object
lock = multiprocessing.Lock()
# creating new processes
p1 = multiprocessing.Process(target=withdraw, args=(balance,lock))
p2 = multiprocessing.Process(target=deposit, args=(balance,lock))
# starting processes
p1.start()
p2.start()
# wait until processes are finished
p1.join()
p2.join()
# print final balance
print("Final balance = {}".format(balance.value))
if __name__ == "__main__":
for _ in range(10):
# perform same transaction process 10 times
perform_transactions()
輸出結(jié)果如下:
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
最終,輸出結(jié)果與預期的一致。我們在程序中的改動,只有創(chuàng)建了一個Lock類:
lock = multiprocessing.Lock()
并且將這個鎖,作為參數(shù)傳遞給了函數(shù)。在關鍵區(qū),我們使用lock.acquire()方法檢查并使用了鎖。一旦確認使用,那么其他進程就不能再使用了,直到該進程用lock.release()方法釋放了該鎖。

進程池化
首先我們來看一個例子:
def square(n):
return (n*n)
if __name__ == "__main__":
# input list
mylist = [1,2,3,4,5]
# empty list to store result
result = []
for num in mylist:
result.append(square(num))
print(result)
這是一個計算給定列表元素的平方的非常簡單程序。在多核/多處理器系統(tǒng)中,我們來看一張圖來理解上述程序?qū)⑷绾喂ぷ?

雖然這是一個很簡單的程序,但是這里只使用了一個核心用于程序執(zhí)行,其他的核心可能保持空閑狀態(tài)。
為了能夠充分發(fā)揮多核處理器的威力,Multiprocessing模塊提供了一個Pool類。所謂Pool類是指一個工作進程池,它能夠?qū)⑷蝿辗峙浣o不同的工作進程,我們來看一個圖:

Pool類能夠自動地將計算任務分配到了不同的核心,這樣我們就不需要手動顯式地創(chuàng)建進程了。Pool類調(diào)用起來也非常簡單:
import multiprocessing
import os
def square(n):
print("Worker process id for {0}: {1}".format(n, os.getpid()))
return (n*n)
if __name__ == "__main__":
mylist = [1,2,3,4,5]
# creating a pool object
p = multiprocessing.Pool()
# map list to target function
result = p.map(square, mylist)
print(result)
這樣就可以了,首先我們創(chuàng)建了一個Pool對象:
p = multiprocessing.Pool()
我們還可以向Pool傳遞其他參數(shù),比如:
- processes:用于指定分配的進程數(shù)量
- maxtasksperchild:用于指定每個子進程所分配到的最大任務數(shù)
我們還可以用這些參數(shù)對池中的進程進行一些初始化:
- initializer:為進程指定初始化函數(shù)
- initargs:向初始化函數(shù)傳遞的參數(shù)值
在創(chuàng)建了Pool對象之后,我們只需要使用map()方法,使其執(zhí)行具體的任務即可。
result = p.map(square, mylist)
這里我們使用map()方法向square函數(shù)輸入了mylist。這樣計算任務就會自動分配給各個核心了。當所有的進程都完成了任務后,就會返回最終的結(jié)果。
