在上一篇中我們介紹了 mpi4py 中的 profiling,下面我們將介紹 mpi4py 中的 futures 模塊。
mpi4py.futures 提供了一個由多個工作進(jìn)程使用 MPI 進(jìn)程間通信來異步執(zhí)行任務(wù)的高級別接口。mpi4py.futures 是建立在 Python 標(biāo)準(zhǔn)庫中的 concurrent.futures 的基礎(chǔ)之上的。這里先簡要介紹一下 concurrent.futures 模塊。
concurrent.futures
concurrent.futures 提供了一個異步并行執(zhí)行任務(wù)的高級別接口。異步執(zhí)行的任務(wù)可以通過線程來完成(使用 ThreadPoolExecutor),也可以通過不同的進(jìn)程來完成(使用 ProcessPoolExecutor),它們都繼承自抽象的 Executor 類。
為了有助于理解和使用 moi4py.futures,下面簡要給出 concurrent.futures 中的主要類,方法和函數(shù),但是不做詳細(xì)的講解,想要了解更多的讀者可以參見其文檔。
Executor 類
class concurrent.futures.Executor
提供異步執(zhí)行任務(wù)的抽象類,不能直接使用,而是使用其具體的子類。下面是其幾個主要的方法接口:
submit(fn, *args, **kwargs)
提交執(zhí)行 fn(*args **kwargs),返回一個 Future 對象(將在下面介紹)表示提交執(zhí)行的結(jié)果。
map(func, *iterables, timeout=None, chunksize=1)
類似于 Python 中的 map(func, *iterables),不同的是 func 是異步并發(fā)執(zhí)行的。
shutdown(wait=True)
通知任務(wù)執(zhí)行器在當(dāng)前掛起的 future 任務(wù)完成后釋放所占用的資源。
ThreadPoolExecutor 類
ThreadPoolExecutor 是 Executor 的一個子類,使用一個線程池來異步地執(zhí)行任務(wù),下面是其類原型:
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
ProcessPoolExecutor 類
ProcessPoolExecutor 是 Executor 的一個子類,使用一個進(jìn)程池來異步地執(zhí)行任務(wù),使用 multiprocessing 模塊,可以避開 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的對象才能被執(zhí)行并返回。下面是其類原型:
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
Future 對象
Future 類包裝異步執(zhí)行的任務(wù)結(jié)果。Future 類對象由 Executor.submit() 創(chuàng)建。
class concurrent.futures.Future()
Future 類原型,下面使其主要方法:
cancel()
嘗試取消調(diào)用。
cancelled()
如果調(diào)用被成功取消則返回 True。
running()
如果調(diào)用正在執(zhí)行而不能被取消則返回 True。
done()
如果調(diào)用被成功取消或已執(zhí)行完成則返回 True。
result(timeout=None)
返回調(diào)用的結(jié)果。
exception(timeout=None)
返回調(diào)用拋出的異常。
模塊函數(shù)
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待 Future 對象 fs 完成。
concurrent.futures.as_completed(fs, timeout=None)
返回一個由一系列 Future 對象中完成(執(zhí)行完或者被取消)的那些組成的迭代器。
mpi4py.futures
mpi4py.futures 是基于 concurrent.futures 的。具體來說,mpi4py.futures 提供了 Executor 類的子類實現(xiàn) MPIPoolExecutor 和 MPICommExecutor。
MPIPoolExecutor 類
MPIPoolExecutor 使用一個 MPI 進(jìn)程池來異步執(zhí)行任務(wù)。同 ProcessPoolExecutor,MPIPoolExecutor 可以避開 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的對象才能被執(zhí)行并返回。因為工作進(jìn)程必須導(dǎo)入 __main__ 模塊,因此 MPIPoolExecutor 不能工作在交互的環(huán)境下(如 Ipython shell 中)。
MPIPoolExecutor 使用 MPI-2 標(biāo)準(zhǔn)中引進(jìn)的動態(tài)進(jìn)程管理特性。主進(jìn)程使用 MPI.COMM_SELF 的 Spawn() 方法啟動新的工作進(jìn)程。主進(jìn)程會分別使用一個單獨的線程同每個工作進(jìn)程進(jìn)行通信。工作進(jìn)程在其唯一的主線程內(nèi)執(zhí)行所分配到的任務(wù)。為了避免工作進(jìn)程再啟動新的工作進(jìn)程(會導(dǎo)致無限遞歸),一種簡單的方式是將 MPIPoolExecutor 執(zhí)行代碼放到主腳本的 if __name__ == '__main__': 語句下。
下面是其類原型:
class mpi4py.futures.MPIPoolExecutor(max_workers=None,**kwargs)
Executor 的子類,使用至多 max_workers 個進(jìn)程組成的進(jìn)程池來異步執(zhí)行任務(wù)。如果 max_workers 為 None (默認(rèn)值),則其值由環(huán)境變量 MPI4PY_MAX_WORKERS (如果設(shè)置了)決定,或者由 MPI universe 的大?。ㄈ绻O(shè)置了)決定,否則只會生成單個工作進(jìn)程。如果 max_workers 的值小于或等于 0,則會拋出 ValueError 異常。其它可設(shè)置參數(shù)有:
-
python_exe:Python 執(zhí)行程序路徑。 -
python_args:列表或者可迭代對象,用來向 Python 執(zhí)行程序傳遞額外的命令行參數(shù)。 -
mpi_info:字典或可產(chǎn)生 (key, value) 對的迭代對象。這些 (key, value) 對會通過 MPI.Info 對象傳遞給 MPI.Intracomm.Spawn() 調(diào)用以啟動工作進(jìn)程。可以通過此機制告訴 MPI 在什么地方及怎樣啟動新的進(jìn)程。 -
globals:字典或可產(chǎn)生 (name, value) 對的迭代對象,用來初始化工作進(jìn)程的主模塊命名空間。 -
main:如果為 False,則不會在工作進(jìn)程中導(dǎo)入 __main__ 模塊。 -
path:列表或可迭代對象,向 sys.path 中追加一系列工作進(jìn)程搜尋路徑。 -
wdir:設(shè)置工作進(jìn)程的當(dāng)前工作目錄。 -
env:字典可產(chǎn)生 (name, value) 對的迭代對象,用來更新工作進(jìn)程的 os.environ。
submit(func, *args, **kwargs)
以 func(*args, **kwargs) 提交執(zhí)行任務(wù),返回一個 Future 對象作為提交結(jié)果。簡單的使用例程如下:
executor = MPIPoolExecutor(max_workers=1)
future = executor.submit(pow, 321, 1234)
print(future.result())
map(func, *iterables, timeout=None, chunksize=1, **kwargs)
等價于 map(func, *iterables),不同的是 func 是被異步地執(zhí)行,對 func 的多個調(diào)用可以在多個進(jìn)程中并發(fā)地?zé)o序地執(zhí)行。返回的迭代器會拋出一個 TimeoutError 如果 __next__() 調(diào)用后 timout 秒還沒有得到結(jié)果,timeout 可以為一個整數(shù)或一個浮點數(shù)。如果 timeout 為 None,則等待的時間沒有限制。如果某個調(diào)用拋出了異常,則在獲取返回的迭代器中該值時會重新拋出該異常。該方法會將 iterables 分割成若干塊分別提交到進(jìn)程池中執(zhí)行,塊的近似大小由 chunksize 設(shè)置。對非常長的 iterables,使用一個大的 chunksize 可以顯著地提高執(zhí)行性能。在默認(rèn)情況下,返回的迭代器會產(chǎn)生與原 iterables 相同順序的結(jié)果,等待提交的任務(wù)依次完成,如果傳遞并設(shè)置關(guān)鍵字參數(shù) unordered 為 True,則返回的迭代器會盡快地返回任意已經(jīng)完成的任務(wù)。
starmap(func, iterable, timeout=None, chunksize=1, **kwargs)
等價于 itertools.starmap(func, iterable)。如果 iterable 已經(jīng) "zip" 過了,則使用該方法更方便。map(func, *iterable) 等價于 starmap(func, zip(*iterable))。
shutdown(wait=True)
通知任務(wù)執(zhí)行器在當(dāng)前掛起的 future 任務(wù)完成后釋放所占用的資源。在該方法執(zhí)行后的 submit() 和 map() 調(diào)用會拋出 RuntimeError。如果 wait 為 True (默認(rèn)),則該方法會一直等到所有掛起的 future 任務(wù)都執(zhí)行完并且執(zhí)行器的相關(guān)資源都已釋放后才返回。如果 wait 為 False,則該方法會立即返回,但執(zhí)行器的相關(guān)資源則會等到所有掛起的 future 任務(wù)都完成后才釋放。不管 wait 的值是什么,整個 Python 程序都會在所有掛起的 future 任務(wù)都完成后才會結(jié)束退出。使用 with 語句可以避免顯式地調(diào)用該方法。with 語句相當(dāng)于設(shè)置 wait 為 True 調(diào)用 shutdown(),舉例如下:
import time
with MPIPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 2) assert future.done()
bootup(wait=True)
通知執(zhí)行器盡早分配所需的資源(特別是 MPI 進(jìn)程)。如果 wait 為 True,則該方法會直到資源已經(jīng)分配好才返回。在第一次調(diào)用 submit() 時會自動分配所需的資源,因此很少需要顯式地調(diào)用該方法。
需要注意的是,因為主進(jìn)程要使用單獨的線程同每一個工作進(jìn)程進(jìn)行 MPI 通信,因此所使用的 MPI 環(huán)境需要提供 MPI.THREAD_MULTIPLE 級別的多線程支持。如果 MPI 環(huán)境所支持的線程級別比 MPI.THREAD_MULTIPLE 低,則 mpi4py.futures 會使用一個全局鎖來系列化 MPI 調(diào)用。如果支持的線程級別比 MPI.THREAD_SERIALIZED 低, 則 mpi4py.futures 會發(fā)出 RuntimeWarning 警告。
MPICommExecutor 類
對只支持 MPI-1 標(biāo)準(zhǔn)的 MPI 實現(xiàn),無法使用 MPI-2 標(biāo)準(zhǔn)中才引入的動態(tài)進(jìn)程管理特性,另外在一些超算平臺上對調(diào)用 MPI_Comm_spawn() 方法可能有額外的限制或引起額外的復(fù)雜度。針對這些情況,mpi4py.futures 支持另一種更加傳統(tǒng)的類似于 SPMD 的使用方式,這種使用方式只用到 MPI-1 的相關(guān)特性。用戶使用 mpiexec 命令來啟動 Python 應(yīng)用程序,在程序里面集合調(diào)用 MPICommExecutor 上下文管理器將啟動起來的若干 MPI 進(jìn)程分割成一個主進(jìn)程和多個工作進(jìn)程。主進(jìn)程訪問 MPICommExecutor 實例以提交任務(wù),與此同時,工作進(jìn)程沿著另一個不同的執(zhí)行路徑執(zhí)行主進(jìn)程提交的任務(wù)。
下面是 MPICommExecutor 的原型:
class mpi4py.futures.MPICommExecutor(comm=None,root=0)
MPICommExecutor 的上下文管理器,將一個 MPI 組內(nèi)通信子 comm (默認(rèn)值 None 表示 MPI.COMM_WORLD) 分割成兩個無交集的集合:單個主進(jìn)程(rank 為 root 的進(jìn)程)和其它的所有進(jìn)程作為工作進(jìn)程。這兩個集合通過組間通信子連接在一起。with 語句的目標(biāo)要么是一個 MPICommExecutor 實例(對主進(jìn)程),要么是 None (對工作進(jìn)程)。簡短的使用例程如下:
from mpi4py import MPI
from mpi4py.futures import MPICommExecutor
with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
if executor is not None:
future = executor.submit(abs, -42)
assert future.result() == 42
answer = set(executor.map(abs, [-42, 42]))
assert answer == {42}
需要注意的是,如果向 MPICommExecutor 傳遞了一個 size 為 1 的通信子(如 MPI.COMM_SELF),with 語句的目標(biāo)將會將所有提交的任務(wù)在一個單獨的工作線程上完成,用于保證任務(wù)會被異步地執(zhí)行。但是,Python 的 Global Interpreter Lock (GIL) 會阻止主線程和工作線程并發(fā)地執(zhí)行,即使是運行在多核處理器上。線程的頻繁切換可能會降低程序的性能,因此一般不建議使用一個 size 為 1 的通信子來執(zhí)行 MPICommExecutor,如果確實要使用的話,可以考慮使用 concurrent.futures 的 ThreadPoolExecutor。
命令行執(zhí)行方法
當(dāng)所使用的 MPI 實現(xiàn)不支持動態(tài)進(jìn)程管理特性時,可以用另一種方式來使用 mpi4py.futures:在命令行方式下傳遞 -m mpi4py.futures 給 python 執(zhí)行程序,此外 mpi4py.futures 接受 -m mod 以執(zhí)行一個模塊,-c cmd 以執(zhí)行一條 Python 語句,或者 - 從標(biāo)準(zhǔn)輸入(sys.stdin)讀取 Python 命令語句??偟膩碚f,可以使用下面 4 種命令行方式來運行 mpi4py.futures:
- $ mpiexec -n numprocs python -m mpi4py.futures pyfile [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures -m mod [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures -c cmd [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures - [arg] ...
在開始執(zhí)行主腳本之前,mpi4py.futures 會將 MPI.COMM_WORLD 分割成一個主進(jìn)程(MPI.COMM_WORLD 中 rank 為 0 的進(jìn)程)和 numprocs - 1 個工作進(jìn)程,這些進(jìn)程會通過一個 MPI 組間通信子連接起來。然后,主進(jìn)程執(zhí)行用戶腳本代碼,最終會創(chuàng)建 MPIPoolExecutor 實例以提交計算任務(wù),與此同時,工作進(jìn)程沿著一個不同的執(zhí)行路徑以服務(wù)于主進(jìn)程。當(dāng)主進(jìn)程順利地執(zhí)行完主腳本結(jié)束時,整個 MPI 執(zhí)行環(huán)境會合適地退出,但是在遇到?jīng)]有處理的異常情況時,主進(jìn)程會調(diào)用 MPI.COMM_WORLD.Abort(1) 以避免死鎖并強制整個 MPI 執(zhí)行環(huán)境退出。
例程
下面給出相應(yīng)的使用例程。
# julia.py
"""
Demonstrates the usage of mpi4py.futures.MPIPoolExecutor.
Run this with 1 processes like:
$ mpiexec -n 1 -usize 17 python julia.py
or 17 processes like:
$ mpiexec -n 17 python -m mpi4py.futures julia.py
"""
from mpi4py.futures import MPIPoolExecutor
x0, x1, w = -2.0, +2.0, 640*2
y0, y1, h = -1.5, +1.5, 480*2
dx = (x1 - x0) / w
dy = (y1 - y0) / h
c = complex(0, 0.65)
def julia(x, y):
z = complex(x, y)
n = 255
while abs(z) < 3 and n > 1:
z = z**2 + c
n -= 1
return n
def julia_line(k):
line = bytearray(w)
y = y1 - k * dy
for j in range(w):
x = x0 + j * dx
line[j] = julia(x, y)
return line
if __name__ == '__main__':
with MPIPoolExecutor() as executor:
image = executor.map(julia_line, range(h))
with open('julia.pgm', 'wb') as f:
f.write(b'P5 %d %d %d\n' % (w, h, 255))
for line in image:
f.write(line)
推薦以 1 個 MPI 進(jìn)程并設(shè)置所需的 universe size 的方式使用 mpiexec 命令以啟動并執(zhí)行以上腳本:
$ mpiexec -n 1 -usize 17 python julia.py
注意以上 -usize 標(biāo)志(或者等價的設(shè)置 MPIEXEC_UNIVERSE_SIZE 環(huán)境變量)只適用于 MPICH。對 OPenMPI,則需要設(shè)置 OMPI_UNIVERSE_SIZE 環(huán)境變量來給定 universe size。
在以上執(zhí)行方式中,mpiexec 命令啟動單個 MPI 進(jìn)程(主進(jìn)程)以執(zhí)行主腳本,當(dāng)需要時,mpi4py.futures 生成另外的 16 個 MPI 進(jìn)程以組成一個工作進(jìn)程池。主進(jìn)程提交任務(wù)到工作進(jìn)程池并等待其返回結(jié)果。工作進(jìn)程接收來自主進(jìn)程提交的任務(wù),執(zhí)行并返回結(jié)果給主進(jìn)程。
另外,用戶還可以以一種更加傳統(tǒng)的方式來執(zhí)行以上腳本,即一次啟動所有需要的 MPI 進(jìn)程。這種執(zhí)行方式類似下面的命令:
$ mpiexec -n 17 python -m mpi4py.futures julia.py
此時,啟動的 17 個進(jìn)程會被分割成一個主進(jìn)程和 16 個工作進(jìn)程。主進(jìn)程執(zhí)行主腳本并提交任務(wù),工作進(jìn)程執(zhí)行提交的任務(wù)并返回結(jié)果給主進(jìn)程。
程序執(zhí)行后的結(jié)果如下:

以上介紹了 mpi4py 中的 futures 模塊,在下一篇中我們將介紹 mpi4py 中的 run 模塊。