在上一篇中我們介紹了 mpi4py 中的 Status 對象,下面我們將介紹 mpi4py 中的通用化請求。
MPI-2 通過通用請求對象為應用程序提供自定義非阻塞操作的機制。自定義的非阻塞操作也會返回 request 對象,可以使用 Test,Wait 及它們的變種來測試和等待其完成。通用請求(generalized requests)的非阻塞操作能夠與程序的正常執(zhí)行流程并發(fā)執(zhí)行。與其它的 MPI 操作不同,通用請求相關(guān)聯(lián)的操作是由應用程序而非 MPI 環(huán)境負責執(zhí)行,因此必須有一種機制使得應用程序在執(zhí)行完相關(guān)操作后能夠通知 MPI 環(huán)境。為此 MPI 提供了 MPI.Grequest.Complete 函數(shù)來實現(xiàn)這種“告知”,在此機制下,MPI 只負責保留通用請求操作的完成狀態(tài),其它狀態(tài)信息則需應用程序自身負責維護。
mpi4py 中,通用化請求操作通過 MPI.Grequest 對象來完成。MPI.Grequest 類是 MPI.Request 類的子類,故 Wait 和 Test 等方法都可使用。
下面是通用化請求創(chuàng)建及完成相關(guān)方法的接口:
MPI.Grequest.Start(type cls, query_fn, free_fn, cancel_fn, args=None, kargs=None)
由查詢回調(diào)函數(shù) query_fn,釋放回調(diào)函數(shù) free_fn,取消回調(diào)函數(shù) cancel_fn 及其它參數(shù) args 和 kargs 創(chuàng)建并注冊一個通用化請求,返回一個 MPI.Grequest 對象。
其中回調(diào)函數(shù)的接口如下:
query_fn(Status status, *args, **kargs)
query_fn 負責設置操作成功/失敗/取消等通用請求的狀態(tài)信息 status。僅在 MPI.Grequest.Complete 返回后才可能會被執(zhí)行到。Wait,Wait_any,Wait_some,Wait_all;Test,Test_any,Test_some,Test_all;以及 MPI.Grequest.Get_status 等都要調(diào)用 query_fn??赏ㄟ^參數(shù) args 可 kargs 傳遞額外信息。
free_fn(*args, **kargs)
該函數(shù)在釋放通用請求對象時調(diào)用,用于清除應用程序申請的內(nèi)存。要在 query_fn 之后執(zhí)行,由 Wait,Wait_any,Wait_some,Wait_all;Test,Test_any,Test_some,Test_all;以及 MPI.Grequest.Free 等調(diào)用觸發(fā)執(zhí)行。正常的程序中 free_fn 僅執(zhí)行一次??赏ㄟ^參數(shù) args 可 kargs 傳遞額外信息。
cancel_fn(bool completed, *args, **kargs)
由 MPI.Grequest.Cancel 觸發(fā)執(zhí)行。如果 cancel 執(zhí)行時已經(jīng)完成了 MPI.Grequest.Complete 則 MPI 會向 cancel_fn 傳遞 completed = True,否則傳遞 completed = False。可通過參數(shù) args 可 kargs 傳遞額外信息。
MPI.Grequest.Complete(self)
用于應用程序通知 MPI 環(huán)境其發(fā)起的通用請求操作已經(jīng)完成,此時 MPI.Grequest.Wait 系列調(diào)用會返回,而 MPI.Grequest.Test 系列調(diào)用會返回 True。
例程
下面給出使用例程。
# greq.py
"""
Demonstrates the usage of generalized request.
Run this with 1 processes like:
$ mpiexec -n 1 python greq.py
or
$ python greq.py
"""
import time
import threading
from mpi4py import MPI
comm = MPI.COMM_WORLD
def query_fn(status, *args, **kargs):
print 'Call query_fn with args = %s, kargs = %s...' % (args, kargs)
status.source = MPI.UNDEFINED
status.tag = MPI.UNDEFINED
status.cancelled = False
status.Set_elements(MPI.BYTE, 0)
return MPI.SUCCESS
def free_fn(*args, **kargs):
print 'Call free_fn with args = %s, kargs = %s...' % (args, kargs)
if 'a' in kargs:
# change the kargs argument (have effect only for changeable type like list, dict, etc)
print "Append 3 to kargs['a']"
kargs['a'].append(3)
return MPI.SUCCESS
def cancel_fn(completed, *args, **kargs):
print 'Call cancel_fn with completed = %s, args = %s, kargs = %s...' % (completed, args, kargs)
return MPI.SUCCESS
# define an user-defined non-blocking operate
def iop(*args, **kargs):
def compute(greq):
# sleep 1 second to simulate a compute-intensive task
time.sleep(1.0)
# call Complete method to inform MPI implementation that
# the operation associated with this greq has completed
greq.Complete()
# create a generalized request
greq = MPI.Grequest.Start(query_fn, free_fn, cancel_fn, args=args, kargs=kargs)
# call compute in a separate thread, so it will not block the return of this
iop_thread = threading.Thread(target=compute, name='iop_thread', args=(greq,))
iop_thread.daemon = True
# start the thread
iop_thread.start()
return greq
a = []
print 'Before the cal of iop, a = %s' % a
# call the user-defined non-blocking operation,
# which will return a MPI.Grequest object immediately
greq = iop(1, 2, a=a)
# test if the non-blocking operate is completed
status = MPI.Status()
print 'Is complete: %s' % greq.Test(status)
print 'source = %d, tag = %d, cancelled = %s, count = %d' % (status.source, status.tag, status.cancelled, status.count)
# call Cancel
greq.Cancel()
print 'Is complete: %s' % greq.Test()
# wait 1 second for the complete of iop
time.sleep(1.0)
# call Cancel
greq.Cancel()
print 'Is complete: %s' % greq.Test(status)
print 'source = %d, tag = %d, cancelled = %s, count = %d' % (status.source, status.tag, status.cancelled, status.count)
try:
# call Cancel after the complete of iop, wich will throw an exception
greq.Cancel()
except MPI.Exception as e:
print e.error_string
print 'After the complete of iop, a = %s' % a
運行結(jié)果如下:
$ python greq.py
Before the cal of iop, a = []
Is complete: False
source = -1, tag = -1, cancelled = False, count = 0
Call cancel_fn with completed = False, args = (1, 2), kargs = {'a': []}...
Is complete: False
Call cancel_fn with completed = True, args = (1, 2), kargs = {'a': []}...
Call query_fn with args = (1, 2), kargs = {'a': []}...
Call free_fn with args = (1, 2), kargs = {'a': []}...
Append 3 to kargs['a']
Is complete: True
source = -32766, tag = -32766, cancelled = False, count = 0
MPI_ERR_REQUEST: invalid request
After the complete of iop, a = [3]
以上介紹了 mpi4py 中的通用化請求,在下一篇中我們將介紹 mpi4py 中的數(shù)據(jù)類型解析。