mpi4py 中的 futures 模塊

上一篇中我們介紹了 mpi4py 中的 profiling,下面我們將介紹 mpi4py 中的 futures 模塊。

mpi4py.futures 提供了一個由多個工作進(jìn)程使用 MPI 進(jìn)程間通信來異步執(zhí)行任務(wù)的高級別接口。mpi4py.futures 是建立在 Python 標(biāo)準(zhǔn)庫中的 concurrent.futures 的基礎(chǔ)之上的。這里先簡要介紹一下 concurrent.futures 模塊。

concurrent.futures

concurrent.futures 提供了一個異步并行執(zhí)行任務(wù)的高級別接口。異步執(zhí)行的任務(wù)可以通過線程來完成(使用 ThreadPoolExecutor),也可以通過不同的進(jìn)程來完成(使用 ProcessPoolExecutor),它們都繼承自抽象的 Executor 類。

為了有助于理解和使用 moi4py.futures,下面簡要給出 concurrent.futures 中的主要類,方法和函數(shù),但是不做詳細(xì)的講解,想要了解更多的讀者可以參見其文檔。

Executor 類

class concurrent.futures.Executor

提供異步執(zhí)行任務(wù)的抽象類,不能直接使用,而是使用其具體的子類。下面是其幾個主要的方法接口:

submit(fn, *args, **kwargs)

提交執(zhí)行 fn(*args **kwargs),返回一個 Future 對象(將在下面介紹)表示提交執(zhí)行的結(jié)果。

map(func, *iterables, timeout=None, chunksize=1)

類似于 Python 中的 map(func, *iterables),不同的是 func 是異步并發(fā)執(zhí)行的。

shutdown(wait=True)

通知任務(wù)執(zhí)行器在當(dāng)前掛起的 future 任務(wù)完成后釋放所占用的資源。

ThreadPoolExecutor 類

ThreadPoolExecutor 是 Executor 的一個子類,使用一個線程池來異步地執(zhí)行任務(wù),下面是其類原型:

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

ProcessPoolExecutor 類

ProcessPoolExecutor 是 Executor 的一個子類,使用一個進(jìn)程池來異步地執(zhí)行任務(wù),使用 multiprocessing 模塊,可以避開 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的對象才能被執(zhí)行并返回。下面是其類原型:

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

Future 對象

Future 類包裝異步執(zhí)行的任務(wù)結(jié)果。Future 類對象由 Executor.submit() 創(chuàng)建。

class concurrent.futures.Future()

Future 類原型,下面使其主要方法:

cancel()

嘗試取消調(diào)用。

cancelled()

如果調(diào)用被成功取消則返回 True。

running()

如果調(diào)用正在執(zhí)行而不能被取消則返回 True。

done()

如果調(diào)用被成功取消或已執(zhí)行完成則返回 True。

result(timeout=None)

返回調(diào)用的結(jié)果。

exception(timeout=None)

返回調(diào)用拋出的異常。

模塊函數(shù)

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待 Future 對象 fs 完成。

concurrent.futures.as_completed(fs, timeout=None)

返回一個由一系列 Future 對象中完成(執(zhí)行完或者被取消)的那些組成的迭代器。

mpi4py.futures

mpi4py.futures 是基于 concurrent.futures 的。具體來說,mpi4py.futures 提供了 Executor 類的子類實現(xiàn) MPIPoolExecutor 和 MPICommExecutor。

MPIPoolExecutor 類

MPIPoolExecutor 使用一個 MPI 進(jìn)程池來異步執(zhí)行任務(wù)。同 ProcessPoolExecutor,MPIPoolExecutor 可以避開 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的對象才能被執(zhí)行并返回。因為工作進(jìn)程必須導(dǎo)入 __main__ 模塊,因此 MPIPoolExecutor 不能工作在交互的環(huán)境下(如 Ipython shell 中)。

MPIPoolExecutor 使用 MPI-2 標(biāo)準(zhǔn)中引進(jìn)的動態(tài)進(jìn)程管理特性。主進(jìn)程使用 MPI.COMM_SELF 的 Spawn() 方法啟動新的工作進(jìn)程。主進(jìn)程會分別使用一個單獨的線程同每個工作進(jìn)程進(jìn)行通信。工作進(jìn)程在其唯一的主線程內(nèi)執(zhí)行所分配到的任務(wù)。為了避免工作進(jìn)程再啟動新的工作進(jìn)程(會導(dǎo)致無限遞歸),一種簡單的方式是將 MPIPoolExecutor 執(zhí)行代碼放到主腳本的 if __name__ == '__main__': 語句下。

下面是其類原型:

class mpi4py.futures.MPIPoolExecutor(max_workers=None,**kwargs)

Executor 的子類,使用至多 max_workers 個進(jìn)程組成的進(jìn)程池來異步執(zhí)行任務(wù)。如果 max_workers 為 None (默認(rèn)值),則其值由環(huán)境變量 MPI4PY_MAX_WORKERS (如果設(shè)置了)決定,或者由 MPI universe 的大?。ㄈ绻O(shè)置了)決定,否則只會生成單個工作進(jìn)程。如果 max_workers 的值小于或等于 0,則會拋出 ValueError 異常。其它可設(shè)置參數(shù)有:

  • python_exe:Python 執(zhí)行程序路徑。
  • python_args:列表或者可迭代對象,用來向 Python 執(zhí)行程序傳遞額外的命令行參數(shù)。
  • mpi_info:字典或可產(chǎn)生 (key, value) 對的迭代對象。這些 (key, value) 對會通過 MPI.Info 對象傳遞給 MPI.Intracomm.Spawn() 調(diào)用以啟動工作進(jìn)程。可以通過此機制告訴 MPI 在什么地方及怎樣啟動新的進(jìn)程。
  • globals:字典或可產(chǎn)生 (name, value) 對的迭代對象,用來初始化工作進(jìn)程的主模塊命名空間。
  • main:如果為 False,則不會在工作進(jìn)程中導(dǎo)入 __main__ 模塊。
  • path:列表或可迭代對象,向 sys.path 中追加一系列工作進(jìn)程搜尋路徑。
  • wdir:設(shè)置工作進(jìn)程的當(dāng)前工作目錄。
  • env:字典可產(chǎn)生 (name, value) 對的迭代對象,用來更新工作進(jìn)程的 os.environ。
submit(func, *args, **kwargs)

以 func(*args, **kwargs) 提交執(zhí)行任務(wù),返回一個 Future 對象作為提交結(jié)果。簡單的使用例程如下:

executor = MPIPoolExecutor(max_workers=1)
future = executor.submit(pow, 321, 1234)
print(future.result())
map(func, *iterables, timeout=None, chunksize=1, **kwargs)

等價于 map(func, *iterables),不同的是 func 是被異步地執(zhí)行,對 func 的多個調(diào)用可以在多個進(jìn)程中并發(fā)地?zé)o序地執(zhí)行。返回的迭代器會拋出一個 TimeoutError 如果 __next__() 調(diào)用后 timout 秒還沒有得到結(jié)果,timeout 可以為一個整數(shù)或一個浮點數(shù)。如果 timeout 為 None,則等待的時間沒有限制。如果某個調(diào)用拋出了異常,則在獲取返回的迭代器中該值時會重新拋出該異常。該方法會將 iterables 分割成若干塊分別提交到進(jìn)程池中執(zhí)行,塊的近似大小由 chunksize 設(shè)置。對非常長的 iterables,使用一個大的 chunksize 可以顯著地提高執(zhí)行性能。在默認(rèn)情況下,返回的迭代器會產(chǎn)生與原 iterables 相同順序的結(jié)果,等待提交的任務(wù)依次完成,如果傳遞并設(shè)置關(guān)鍵字參數(shù) unordered 為 True,則返回的迭代器會盡快地返回任意已經(jīng)完成的任務(wù)。

starmap(func, iterable, timeout=None, chunksize=1, **kwargs)

等價于 itertools.starmap(func, iterable)。如果 iterable 已經(jīng) "zip" 過了,則使用該方法更方便。map(func, *iterable) 等價于 starmap(func, zip(*iterable))。

shutdown(wait=True)

通知任務(wù)執(zhí)行器在當(dāng)前掛起的 future 任務(wù)完成后釋放所占用的資源。在該方法執(zhí)行后的 submit() 和 map() 調(diào)用會拋出 RuntimeError。如果 wait 為 True (默認(rèn)),則該方法會一直等到所有掛起的 future 任務(wù)都執(zhí)行完并且執(zhí)行器的相關(guān)資源都已釋放后才返回。如果 wait 為 False,則該方法會立即返回,但執(zhí)行器的相關(guān)資源則會等到所有掛起的 future 任務(wù)都完成后才釋放。不管 wait 的值是什么,整個 Python 程序都會在所有掛起的 future 任務(wù)都完成后才會結(jié)束退出。使用 with 語句可以避免顯式地調(diào)用該方法。with 語句相當(dāng)于設(shè)置 wait 為 True 調(diào)用 shutdown(),舉例如下:

import time
with MPIPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 2) assert future.done()
bootup(wait=True)

通知執(zhí)行器盡早分配所需的資源(特別是 MPI 進(jìn)程)。如果 wait 為 True,則該方法會直到資源已經(jīng)分配好才返回。在第一次調(diào)用 submit() 時會自動分配所需的資源,因此很少需要顯式地調(diào)用該方法。

需要注意的是,因為主進(jìn)程要使用單獨的線程同每一個工作進(jìn)程進(jìn)行 MPI 通信,因此所使用的 MPI 環(huán)境需要提供 MPI.THREAD_MULTIPLE 級別的多線程支持。如果 MPI 環(huán)境所支持的線程級別比 MPI.THREAD_MULTIPLE 低,則 mpi4py.futures 會使用一個全局鎖來系列化 MPI 調(diào)用。如果支持的線程級別比 MPI.THREAD_SERIALIZED 低, 則 mpi4py.futures 會發(fā)出 RuntimeWarning 警告。

MPICommExecutor 類

對只支持 MPI-1 標(biāo)準(zhǔn)的 MPI 實現(xiàn),無法使用 MPI-2 標(biāo)準(zhǔn)中才引入的動態(tài)進(jìn)程管理特性,另外在一些超算平臺上對調(diào)用 MPI_Comm_spawn() 方法可能有額外的限制或引起額外的復(fù)雜度。針對這些情況,mpi4py.futures 支持另一種更加傳統(tǒng)的類似于 SPMD 的使用方式,這種使用方式只用到 MPI-1 的相關(guān)特性。用戶使用 mpiexec 命令來啟動 Python 應(yīng)用程序,在程序里面集合調(diào)用 MPICommExecutor 上下文管理器將啟動起來的若干 MPI 進(jìn)程分割成一個主進(jìn)程和多個工作進(jìn)程。主進(jìn)程訪問 MPICommExecutor 實例以提交任務(wù),與此同時,工作進(jìn)程沿著另一個不同的執(zhí)行路徑執(zhí)行主進(jìn)程提交的任務(wù)。

下面是 MPICommExecutor 的原型:

class mpi4py.futures.MPICommExecutor(comm=None,root=0)

MPICommExecutor 的上下文管理器,將一個 MPI 組內(nèi)通信子 comm (默認(rèn)值 None 表示 MPI.COMM_WORLD) 分割成兩個無交集的集合:單個主進(jìn)程(rank 為 root 的進(jìn)程)和其它的所有進(jìn)程作為工作進(jìn)程。這兩個集合通過組間通信子連接在一起。with 語句的目標(biāo)要么是一個 MPICommExecutor 實例(對主進(jìn)程),要么是 None (對工作進(jìn)程)。簡短的使用例程如下:

from mpi4py import MPI
from mpi4py.futures import MPICommExecutor

with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
    if executor is not None:
        future = executor.submit(abs, -42)
        assert future.result() == 42
        answer = set(executor.map(abs, [-42, 42]))
        assert answer == {42}

需要注意的是,如果向 MPICommExecutor 傳遞了一個 size 為 1 的通信子(如 MPI.COMM_SELF),with 語句的目標(biāo)將會將所有提交的任務(wù)在一個單獨的工作線程上完成,用于保證任務(wù)會被異步地執(zhí)行。但是,Python 的 Global Interpreter Lock (GIL) 會阻止主線程和工作線程并發(fā)地執(zhí)行,即使是運行在多核處理器上。線程的頻繁切換可能會降低程序的性能,因此一般不建議使用一個 size 為 1 的通信子來執(zhí)行 MPICommExecutor,如果確實要使用的話,可以考慮使用 concurrent.futures 的 ThreadPoolExecutor。

命令行執(zhí)行方法

當(dāng)所使用的 MPI 實現(xiàn)不支持動態(tài)進(jìn)程管理特性時,可以用另一種方式來使用 mpi4py.futures:在命令行方式下傳遞 -m mpi4py.futures 給 python 執(zhí)行程序,此外 mpi4py.futures 接受 -m mod 以執(zhí)行一個模塊,-c cmd 以執(zhí)行一條 Python 語句,或者 - 從標(biāo)準(zhǔn)輸入(sys.stdin)讀取 Python 命令語句??偟膩碚f,可以使用下面 4 種命令行方式來運行 mpi4py.futures:

  • $ mpiexec -n numprocs python -m mpi4py.futures pyfile [arg] ...
  • $ mpiexec -n numprocs python -m mpi4py.futures -m mod [arg] ...
  • $ mpiexec -n numprocs python -m mpi4py.futures -c cmd [arg] ...
  • $ mpiexec -n numprocs python -m mpi4py.futures - [arg] ...

在開始執(zhí)行主腳本之前,mpi4py.futures 會將 MPI.COMM_WORLD 分割成一個主進(jìn)程(MPI.COMM_WORLD 中 rank 為 0 的進(jìn)程)和 numprocs - 1 個工作進(jìn)程,這些進(jìn)程會通過一個 MPI 組間通信子連接起來。然后,主進(jìn)程執(zhí)行用戶腳本代碼,最終會創(chuàng)建 MPIPoolExecutor 實例以提交計算任務(wù),與此同時,工作進(jìn)程沿著一個不同的執(zhí)行路徑以服務(wù)于主進(jìn)程。當(dāng)主進(jìn)程順利地執(zhí)行完主腳本結(jié)束時,整個 MPI 執(zhí)行環(huán)境會合適地退出,但是在遇到?jīng)]有處理的異常情況時,主進(jìn)程會調(diào)用 MPI.COMM_WORLD.Abort(1) 以避免死鎖并強制整個 MPI 執(zhí)行環(huán)境退出。

例程

下面給出相應(yīng)的使用例程。

# julia.py

"""
Demonstrates the usage of mpi4py.futures.MPIPoolExecutor.

Run this with 1 processes like:
$ mpiexec -n 1 -usize 17 python julia.py
or 17 processes like:
$ mpiexec -n 17 python -m mpi4py.futures julia.py
"""


from mpi4py.futures import MPIPoolExecutor

x0, x1, w = -2.0, +2.0, 640*2
y0, y1, h = -1.5, +1.5, 480*2
dx = (x1 - x0) / w
dy = (y1 - y0) / h

c = complex(0, 0.65)

def julia(x, y):
    z = complex(x, y)
    n = 255
    while abs(z) < 3 and n > 1:
        z = z**2 + c
        n -= 1
    return n

def julia_line(k):
    line = bytearray(w)
    y = y1 - k * dy
    for j in range(w):
        x = x0 + j * dx
        line[j] = julia(x, y)
    return line


if __name__ == '__main__':

    with MPIPoolExecutor() as executor:
        image = executor.map(julia_line, range(h))
        with open('julia.pgm', 'wb') as f:
            f.write(b'P5 %d %d %d\n' % (w, h, 255))
            for line in image:
                    f.write(line)

推薦以 1 個 MPI 進(jìn)程并設(shè)置所需的 universe size 的方式使用 mpiexec 命令以啟動并執(zhí)行以上腳本:

$ mpiexec -n 1 -usize 17 python julia.py

注意以上 -usize 標(biāo)志(或者等價的設(shè)置 MPIEXEC_UNIVERSE_SIZE 環(huán)境變量)只適用于 MPICH。對 OPenMPI,則需要設(shè)置 OMPI_UNIVERSE_SIZE 環(huán)境變量來給定 universe size。

在以上執(zhí)行方式中,mpiexec 命令啟動單個 MPI 進(jìn)程(主進(jìn)程)以執(zhí)行主腳本,當(dāng)需要時,mpi4py.futures 生成另外的 16 個 MPI 進(jìn)程以組成一個工作進(jìn)程池。主進(jìn)程提交任務(wù)到工作進(jìn)程池并等待其返回結(jié)果。工作進(jìn)程接收來自主進(jìn)程提交的任務(wù),執(zhí)行并返回結(jié)果給主進(jìn)程。

另外,用戶還可以以一種更加傳統(tǒng)的方式來執(zhí)行以上腳本,即一次啟動所有需要的 MPI 進(jìn)程。這種執(zhí)行方式類似下面的命令:

$ mpiexec -n 17 python -m mpi4py.futures julia.py

此時,啟動的 17 個進(jìn)程會被分割成一個主進(jìn)程和 16 個工作進(jìn)程。主進(jìn)程執(zhí)行主腳本并提交任務(wù),工作進(jìn)程執(zhí)行提交的任務(wù)并返回結(jié)果給主進(jìn)程。

程序執(zhí)行后的結(jié)果如下:

julia 集

以上介紹了 mpi4py 中的 futures 模塊,在下一篇中我們將介紹 mpi4py 中的 run 模塊。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 178,733評論 25 709
  • 用兩張圖告訴你,為什么你的 App 會卡頓? - Android - 掘金 Cover 有什么料? 從這篇文章中你...
    hw1212閱讀 13,900評論 2 59
  • 在上一篇中我們介紹了 mpi4py 中初始化和運行時設(shè)置,下面我們將介紹 mpi4py 中的 profiling。...
    自可樂閱讀 1,761評論 1 0
  • 關(guān)于當(dāng)年的離開,我聽到了幾個版本的傳言。那些版本我都仔細(xì)聽過,心里難免的還是會不舒服。 7年過去了,我發(fā)現(xiàn)自己也不...
    未央素年閱讀 537評論 1 2
  • 一開始接觸護(hù)膚知識是因為我實在忍受不了自己在這樣懶下去了,我不想做丑女人。硬著頭皮去看,去學(xué),才發(fā)現(xiàn)護(hù)膚還有那么多...
    芹Danae閱讀 136評論 0 0

友情鏈接更多精彩內(nèi)容