Python 多進程并行編程實踐: multiprocessing 模塊

前言

并行計算是使用并行計算機來減少單個計算問題所需要的時間,我們可以通過利用編程語言顯式的說明計算中的不同部分如何再不同的處理器上同時執(zhí)行來設(shè)計我們的并行程序,最終達到大幅度提升程序效率的目的。

眾所周知,Python中的GIL限制了Python多線程并行對多核CPU的利用,但是我們?nèi)匀豢梢酝ㄟ^各種其他的方式來讓Python真正利用多核資源, 例如通過C/C++擴展來實現(xiàn)多線程/多進程, 以及直接利用Python的多進程模塊multiprocessing來進行多進程編程。

本文主要嘗試僅僅通過python內(nèi)置的multiprocessing模塊對自己的動力學(xué)計算程序來進行優(yōu)化和效率提升,其中:

實現(xiàn)了單機利用多核資源來實現(xiàn)并行并進行加速對比

使用manager模塊實現(xiàn)了簡單的多機的分布式計算

本文并不是對Python的multiprocessing模塊的接口進行翻譯介紹,需要熟悉multiprocessing的童鞋可以參考官方文檔https://docs.python.org/2/library/multiprocessing.html。

正文

最近想用自己的微觀動力學(xué)程序進行一系列的求解并將結(jié)果繪制成二維Map圖進行可視化,這樣就需要對二維圖上的多個點進行計算并將結(jié)果收集起來并進行繪制,由于每個點都需要進行一次ODE積分以及牛頓法求解方程組,因此要串行地繪制整張圖可能會遇到極低的效率問題尤其是對參數(shù)進行測試的時候,每畫一張圖都需要等很久的時間。其中繪制的二維圖中每個點都是獨立計算的,于是很自然而然的想到了進行并行化處理。

串行的原始版本

由于腳本比較長,而且實現(xiàn)均為自己的程序,腳本的大致結(jié)構(gòu)如下, 本質(zhì)是一個二重循環(huán),循環(huán)的變量分別為反應(yīng)物氣體(O2 和 CO)的分壓的值:

import time

import numpy as np

# 省略若干...

pCOs = np.linspace(1e-5, 0.5, 10)

pO2s = np.linspace(1e-5, 0.5, 10)

if "__main__" == __name__:

????try:

????????start = time.time()

????????for i, pO2 in enumerate(pO2s):

????????????# ...

????????????for j, pCO in enumerate(pCOs):

????????????????# 針對當(dāng)前的分壓值 pCO, pO2進行動力學(xué)求解

????????????????# 具體代碼略...

????????end = time.time()

????????t = end - start

????finally:

????????# 收集計算的結(jié)果并進行處理繪圖

整體過程就這么簡單,我需要做的就是使用multiprocessing的接口來對這個二重循環(huán)進行并行化。

使用單核串行繪制100個點所需要的時間如下, 總共花了240.76秒:

?

python學(xué)習(xí)交流群:923414804,群內(nèi)每天分享干貨,包括最新的企業(yè)級案例學(xué)習(xí)資料和零基礎(chǔ)入門教程,歡迎小伙伴入群學(xué)習(xí)。

二維map圖繪制的效果如下:

?

進行多進程并行處理

multiprocessing模塊

multiprocessing模塊提供了類似threading模塊的接口,并對進程的各種操作進行了良好的封裝,提供了各種進程間通信的接口例如Pipe,Queue等等,可以幫助我們實現(xiàn)進程間的通信,同步等操作。

使用Process類來動態(tài)創(chuàng)建進程實現(xiàn)并行

multiprocessing模塊提供了Process能讓我們通過創(chuàng)建進程對象并執(zhí)行該進程對象的start方法來創(chuàng)建一個真正的進程來執(zhí)行任務(wù),該接口類似threading模塊中的線程類Thread.

但是當(dāng)被操作對象數(shù)目不大的時候可以使用Process動態(tài)生成多個進程,但是如果需要的進程數(shù)一旦很多的時候,手動限制進程的數(shù)量以及處理不同進程返回值會變得異常的繁瑣,因此這個時候我們需要使用進程池來簡化操作。

使用進程池來管理進程

multiprocessing模塊提供了一個進程池Pool類,負責(zé)創(chuàng)建進程池對象,并提供了一些方法來講運算任務(wù)offload到不同的子進程中執(zhí)行,并很方便的獲取返回值。例如我們現(xiàn)在要進行的循環(huán)并行便很容易的將其實現(xiàn)。

對于這里的單指令多數(shù)據(jù)流的并行,我們可以直接使用Pool.map()來將函數(shù)映射到參數(shù)列表中。Pool.map其實是map函數(shù)的并行版本,此函數(shù)將會阻塞直到所有進程全部結(jié)束,而且此函數(shù)返回的結(jié)果順序仍然不變。

首先,我先把針對每對分壓數(shù)據(jù)的處理過程封裝成一個函數(shù),這樣可以將函數(shù)對象傳遞給子進程執(zhí)行。

import time

from multiprocessing import Pool

import numpy as np

# 省略若干...

pCOs = np.linspace(1e-5, 0.5, 10)

pO2s = np.linspace(1e-5, 0.5, 10)

def task(pO2):

????'''接受一個O2分壓,根據(jù)當(dāng)前的CO分壓進行動力學(xué)求解'''

????# 代碼細節(jié)省略...

if "__main__" == __name__:

????try:

????????start = time.time()

????????pool = Pool()????????????????# 創(chuàng)建進程池對象,進程數(shù)與multiprocessing.cpu_count()相同

????????tofs = pool.map(task, pCOs)??# 并行執(zhí)行函數(shù)

????????end = time.time()

????????t = end - start

????finally:

????????# 收集計算的結(jié)果并進行處理繪圖

使用兩個核心進行計算,計算時間從240.76s降到了148.61秒, 加速比為1.62

?

對不同核心的加速效果進行測試

為了查看使用不同核心數(shù)對程序效率的改善,我對不同的核心數(shù)和加速比進行了測試繪圖,效果如下:

運行核心數(shù)與程序運行時間:

?

運行核心數(shù)與加速比:

?

可見,由于我外層循環(huán)只循環(huán)了10次因此使用的核心數(shù)超過10以后核心數(shù)的增加并不能對程序進行加速,也就是多余的核心都浪費掉了。

使用manager實現(xiàn)簡單的分布式計算

前面使用了multiprocessing包提供的接口我們使用了再一臺機器上進行多核心計算的并行處理,但是multiprocessing的用處還有更多,通過multiprocessing.managers模塊,我們可以實現(xiàn)簡單的多機分布式并行計算,將計算任務(wù)分布到不同的計算機中運行。

Managers提供了另外的多進程通信工具,他提供了在多臺計算機之間共享數(shù)據(jù)的接口和數(shù)據(jù)對象,這些數(shù)據(jù)對象全部都是通過代理類實現(xiàn)的,比如ListProxy和DictProxy等等,他們都實現(xiàn)了與原生list和dict相同的接口,但是他們可以通過網(wǎng)絡(luò)在不同計算機中的進程中進行共享。

關(guān)于managers模塊的接口的詳細使用可以參考官方文檔:https://docs.python.org/2/library/multiprocessing.html#managers

好了現(xiàn)在我們開始嘗試將繪圖程序改造成可以在多臺計算機中分布式并行的程序。改造的主要思想是:

使用一臺計算機作為服務(wù)端(server),此臺計算機通過一個Manager對象來管理共享對象,任務(wù)分配以及結(jié)果的接收,并再收集結(jié)果以后進行后處理(繪制二維map圖)。

其他多臺計算機可以作為客戶端來接收server的數(shù)據(jù)進行計算,并將結(jié)果傳到共享數(shù)據(jù)中,讓server可以收集。同時再client端可以同時進行上文所實現(xiàn)的多進程并行來充分利用計算機的多核優(yōu)勢。

大致可總結(jié)為下圖:

?

服務(wù)進程

首先服務(wù)端需要一個manager對象來管理共享對象

def get_manager():

????'''創(chuàng)建服務(wù)端manager對象.

????'''

????# 自定義manager類

????class JobManager(BaseManager):

????????pass

????# 創(chuàng)建任務(wù)隊列,并將此數(shù)據(jù)對象共享在網(wǎng)絡(luò)中

????jobid_queue = Queue()

????JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

????# 創(chuàng)建列表代理類,并將其共享再網(wǎng)絡(luò)中

????tofs = [None]*N

????JobManager.register('get_tofs_list', callable=lambda: tofs, proxytype=ListProxy)

????# 將分壓參數(shù)共享到網(wǎng)絡(luò)中

????JobManager.register('get_pCOs', callable=lambda: pCOs, proxytype=ListProxy)

????JobManager.register('get_pO2s', callable=lambda: pCOs, proxytype=ListProxy)

????# 創(chuàng)建manager對象并返回

????manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)

????return manager

BaseManager.register是一個類方法,它可以將某種類型或者可調(diào)用的對象綁定到manager對象并共享到網(wǎng)絡(luò)中,使得其他在網(wǎng)絡(luò)中的計算機能夠獲取相應(yīng)的對象。

例如,

1JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

我就將一個返回任務(wù)隊列的函數(shù)對象同manager對象綁定并共享到網(wǎng)絡(luò)中,這樣在網(wǎng)絡(luò)中的進程就可以通過自己的manager對象的get_jobid_queue方法得到相同的隊列,這樣便實現(xiàn)了數(shù)據(jù)的共享.

2. 創(chuàng)建manager對象的時候需要兩個參數(shù),

address, 便是manager所在的ip以及用于監(jiān)聽與服務(wù)端連接的端口號,例如我如果是在內(nèi)網(wǎng)中的192.168.0.1地址的5000端口進行監(jiān)聽,那么此參數(shù)可以是('192.169.0.1, 5000)`

authkey, 顧名思義,就是一個認證碼,用于驗證客戶端時候可以連接到服務(wù)端,此參數(shù)必須是一個字符串對象.

進行任務(wù)分配

上面我們將一個任務(wù)隊列綁定到了manager對象中,現(xiàn)在我需要將隊列進行填充,這樣才能將任務(wù)發(fā)放到不同的客戶端來進行并行執(zhí)行。

def fill_jobid_queue(manager, nclient):

????indices = range(N)

????interval = N/nclient

????jobid_queue = manager.get_jobid_queue()

????start = 0

????for i in range(nclient):

????????jobid_queue.put(indices[start: start+interval])

????????start += interval

????if N % nclient > 0:

????????jobid_queue.put(indices[start:])

這里所謂的任務(wù)其實就是相應(yīng)參數(shù)在list中的index值,這樣不同計算機中得到的結(jié)果可以按照相應(yīng)的index將結(jié)果填入到結(jié)果列表中,這樣服務(wù)端就能在共享的網(wǎng)絡(luò)中收集各個計算機計算的結(jié)果。

啟動服務(wù)端進行監(jiān)聽

def run_server():

????# 獲取manager

????manager = get_manager()

????print "Start manager at {}:{}...".format(ADDR, PORT)

????# 創(chuàng)建一個子進程來啟動manager

????manager.start()

????# 填充任務(wù)隊列

????fill_jobid_queue(manager, NNODE)

????shared_job_queue = manager.get_jobid_queue()

????shared_tofs_list = manager.get_tofs_list()

????queue_size = shared_job_queue.qsize()

????# 循環(huán)進行監(jiān)聽,直到結(jié)果列表被填滿

????while None in shared_tofs_list:

????????if shared_job_queue.qsize() < queue_size:

????????????queue_size = shared_job_queue.qsize()

????????????print "Job picked..."

????return manager

任務(wù)進程

服務(wù)進程負責(zé)進行簡單的任務(wù)分配和調(diào)度,任務(wù)進程則只負責(zé)獲取任務(wù)并進行計算處理。

在任務(wù)進程(客戶端)中基本代碼與我們上面單機中的多核運行的腳本基本相同(因為都是同一個函數(shù)處理不同的數(shù)據(jù)),但是我們也需要為客戶端創(chuàng)建一個manager來進行任務(wù)的獲取和返回。

def get_manager():

????class WorkManager(BaseManager):

????????pass

????# 由于只是從共享網(wǎng)絡(luò)中獲取,因此只需要注冊名字即可

????WorkManager.register('get_jobid_queue')

????WorkManager.register('get_tofs_list')

????WorkManager.register('get_pCOs')

????WorkManager.register('get_pO2s')

????# 這里的地址和驗證碼要與服務(wù)端相同才可以進行數(shù)據(jù)共享

????manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)

????return manager

在客戶端我們?nèi)匀豢梢远噙M程利用多核資源來加速計算。

if "__main__" == __name__:

????manager = get_manager()

????print "work manager connect to {}:{}...".format(ADDR, PORT)

????# 將客戶端本地的manager連接到相應(yīng)的服務(wù)端manager

????manager.connect()

????# 獲取共享的結(jié)果收集列表

????shared_tofs_list = manager.get_tofs_list()

????# 獲取共享的任務(wù)隊列

????shared_jobid_queue = manager.get_jobid_queue()

????# 從服務(wù)端獲取計算參數(shù)

????pCOs = manager.get_pCOs()

????shared_pO2s = manager.get_pO2s()

????# 創(chuàng)建進程池在本地計算機進行多核并行

????pool = Pool()

????while 1:

????????try:

????????????indices = shared_jobid_queue.get_nowait()

????????????pO2s = [shared_pO2s[i] for i in indices]

????????????print "Run {}".format(str(pO2s))

????????????tofs_2d = pool.map(task, pO2s)

????????????# Update shared tofs list.

????????????for idx, tofs_1d in zip(indices, tofs_2d):

????????????????shared_tofs_list[idx] = tofs_1d

????????# 直到將任務(wù)隊列中的任務(wù)全部取完,結(jié)束任務(wù)進程

????????except Queue.Empty:

????????????break

下面我將在3臺在同一局域網(wǎng)中的電腦來進行簡單的分布式計算測試,

其中一臺是實驗室器群中的管理節(jié)點, 內(nèi)網(wǎng)ip為10.10.10.245

另一臺為集群中的一個節(jié)點, 共有12個核心

最后一臺為自己的本本,4個核心

?先在服務(wù)端運行服務(wù)腳本進行任務(wù)分配和監(jiān)聽:

1python server.py

2. 在兩個客戶端運行任務(wù)腳本來獲取任務(wù)隊列中的任務(wù)并執(zhí)行

1python worker.py

當(dāng)任務(wù)隊列為空且任務(wù)完成時,任務(wù)進程終止; 當(dāng)結(jié)果列表中的結(jié)果收集完畢時,服務(wù)進程也會終止。

執(zhí)行過程如圖:

?

執(zhí)行結(jié)果如下圖:

?

上面的panel為服務(wù)端監(jiān)聽,左下為自己的筆記本運行結(jié)果,右下panel為集群中的其中一個節(jié)點。

可見運行時間為56.86s,無奈,是我的本子脫了后腿(-_-!)

總結(jié)

本文通過python內(nèi)置模塊multiprocessing實現(xiàn)了單機內(nèi)多核并行以及簡單的多臺計算機的分布式并行計算,multiprocessing為我們提供了封裝良好并且友好的接口來使我們的Python程序更方面利用多核資源加速自己的計算程序,希望能對使用python實現(xiàn)并行話的童鞋有所幫助。

參考

https://docs.python.org/2/library/multiprocessing.html

分布式進程-廖雪峰的官方網(wǎng)站

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容