mpi4py 中的通用化請求

上一篇中我們介紹了 mpi4py 中的 Status 對象,下面我們將介紹 mpi4py 中的通用化請求。

MPI-2 通過通用請求對象為應用程序提供自定義非阻塞操作的機制。自定義的非阻塞操作也會返回 request 對象,可以使用 Test,Wait 及它們的變種來測試和等待其完成。通用請求(generalized requests)的非阻塞操作能夠與程序的正常執(zhí)行流程并發(fā)執(zhí)行。與其它的 MPI 操作不同,通用請求相關(guān)聯(lián)的操作是由應用程序而非 MPI 環(huán)境負責執(zhí)行,因此必須有一種機制使得應用程序在執(zhí)行完相關(guān)操作后能夠通知 MPI 環(huán)境。為此 MPI 提供了 MPI.Grequest.Complete 函數(shù)來實現(xiàn)這種“告知”,在此機制下,MPI 只負責保留通用請求操作的完成狀態(tài),其它狀態(tài)信息則需應用程序自身負責維護。

mpi4py 中,通用化請求操作通過 MPI.Grequest 對象來完成。MPI.Grequest 類是 MPI.Request 類的子類,故 Wait 和 Test 等方法都可使用。

下面是通用化請求創(chuàng)建及完成相關(guān)方法的接口:

MPI.Grequest.Start(type cls, query_fn, free_fn, cancel_fn, args=None, kargs=None)

由查詢回調(diào)函數(shù) query_fn,釋放回調(diào)函數(shù) free_fn,取消回調(diào)函數(shù) cancel_fn 及其它參數(shù) argskargs 創(chuàng)建并注冊一個通用化請求,返回一個 MPI.Grequest 對象。

其中回調(diào)函數(shù)的接口如下:

query_fn(Status status, *args, **kargs)

query_fn 負責設置操作成功/失敗/取消等通用請求的狀態(tài)信息 status。僅在 MPI.Grequest.Complete 返回后才可能會被執(zhí)行到。Wait,Wait_any,Wait_some,Wait_all;Test,Test_any,Test_some,Test_all;以及 MPI.Grequest.Get_status 等都要調(diào)用 query_fn??赏ㄟ^參數(shù) argskargs 傳遞額外信息。

free_fn(*args, **kargs)

該函數(shù)在釋放通用請求對象時調(diào)用,用于清除應用程序申請的內(nèi)存。要在 query_fn 之后執(zhí)行,由 Wait,Wait_any,Wait_some,Wait_all;Test,Test_any,Test_some,Test_all;以及 MPI.Grequest.Free 等調(diào)用觸發(fā)執(zhí)行。正常的程序中 free_fn 僅執(zhí)行一次??赏ㄟ^參數(shù) argskargs 傳遞額外信息。

cancel_fn(bool completed, *args, **kargs)

由 MPI.Grequest.Cancel 觸發(fā)執(zhí)行。如果 cancel 執(zhí)行時已經(jīng)完成了 MPI.Grequest.Complete 則 MPI 會向 cancel_fn 傳遞 completed = True,否則傳遞 completed = False。可通過參數(shù) argskargs 傳遞額外信息。

MPI.Grequest.Complete(self)

用于應用程序通知 MPI 環(huán)境其發(fā)起的通用請求操作已經(jīng)完成,此時 MPI.Grequest.Wait 系列調(diào)用會返回,而 MPI.Grequest.Test 系列調(diào)用會返回 True。

例程

下面給出使用例程。

# greq.py

"""
Demonstrates the usage of generalized request.

Run this with 1 processes like:
$ mpiexec -n 1 python greq.py
or
$ python greq.py
"""

import time
import threading
from mpi4py import MPI


comm = MPI.COMM_WORLD

def query_fn(status, *args, **kargs):
    print 'Call query_fn with args = %s, kargs = %s...' % (args, kargs)
    status.source = MPI.UNDEFINED
    status.tag = MPI.UNDEFINED
    status.cancelled = False
    status.Set_elements(MPI.BYTE, 0)

    return MPI.SUCCESS

def free_fn(*args, **kargs):
    print 'Call free_fn with args = %s, kargs = %s...' % (args, kargs)
    if 'a' in kargs:
        # change the kargs argument (have effect only for changeable type like list, dict, etc)
        print "Append 3 to kargs['a']"
        kargs['a'].append(3)

    return MPI.SUCCESS

def cancel_fn(completed, *args, **kargs):
    print 'Call cancel_fn with completed = %s, args = %s, kargs = %s...' % (completed, args, kargs)

    return MPI.SUCCESS

# define an user-defined non-blocking operate
def iop(*args, **kargs):

    def compute(greq):
        # sleep 1 second to simulate a compute-intensive task
        time.sleep(1.0)

        # call Complete method to inform MPI implementation that
        # the operation associated with this greq has completed
        greq.Complete()

    # create a generalized request
    greq = MPI.Grequest.Start(query_fn, free_fn, cancel_fn, args=args, kargs=kargs)
    # call compute in a separate thread, so it will not block the return of this
    iop_thread = threading.Thread(target=compute, name='iop_thread', args=(greq,))
    iop_thread.daemon = True
    # start the thread
    iop_thread.start()

    return greq


a = []
print 'Before the cal of iop, a = %s' % a

# call the user-defined non-blocking operation,
# which will return a MPI.Grequest object immediately
greq = iop(1, 2, a=a)

# test if the non-blocking operate is completed
status = MPI.Status()
print 'Is complete: %s' % greq.Test(status)
print 'source = %d, tag = %d, cancelled = %s, count = %d' % (status.source, status.tag, status.cancelled, status.count)

# call Cancel
greq.Cancel()
print 'Is complete: %s' % greq.Test()

# wait 1 second for the complete of iop
time.sleep(1.0)

# call Cancel
greq.Cancel()
print 'Is complete: %s' % greq.Test(status)
print 'source = %d, tag = %d, cancelled = %s, count = %d' % (status.source, status.tag, status.cancelled, status.count)

try:
    # call Cancel after the complete of iop, wich will throw an exception
    greq.Cancel()
except MPI.Exception as e:
    print e.error_string

print 'After the complete of iop, a = %s' % a

運行結(jié)果如下:

$ python greq.py
Before the cal of iop, a = []
Is complete: False
source = -1, tag = -1, cancelled = False, count = 0
Call cancel_fn with completed = False, args = (1, 2), kargs = {'a': []}...
Is complete: False
Call cancel_fn with completed = True, args = (1, 2), kargs = {'a': []}...
Call query_fn with args = (1, 2), kargs = {'a': []}...
Call free_fn with args = (1, 2), kargs = {'a': []}...
Append 3 to kargs['a']
Is complete: True
source = -32766, tag = -32766, cancelled = False, count = 0
MPI_ERR_REQUEST: invalid request
After the complete of iop, a = [3]

以上介紹了 mpi4py 中的通用化請求,在下一篇中我們將介紹 mpi4py 中的數(shù)據(jù)類型解析。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容