在上一篇中我們介紹了 mpi4py 中的 memory 對象及內(nèi)存操作,下面我們將介紹 mpi4py 中的簡單并行 I/O 操作。
在前面我們已經(jīng)簡要地介紹了 mpi4py 中的并行 I/O 及文件視圖等相關(guān)概念和操作,但是 MPI 中的 I/O 操作應該是 MPI 標準中最復雜和最難理解的部分,因為 MPI 中 I/O 操作的方法非常多,而且極易發(fā)生混淆(可參見這里的表格),很難知道在什么情況下該選用什么合適的并行 I/O 操作方法以達到高的 I/O 性能。在很多時候,并行計算程序的性能瓶頸都在 I/O 操作上,因此理解并選用合適的 I/O 操作對提高程序的整體性能是非常重要的。在下面的若干篇中,我們將逐步由淺入深地介紹 mpi4py 中的并行 I/O 操作方法,講解這些方法的區(qū)別和使用場合,以及獲得高性能 I/O 操作的途徑、注意事項和建議等。
使用獨立文件指針
對最簡單的并行文件 I/O 操作,其實也遵循通常的(非并行)I/O 操作方式,即:打開文件,移動文件指針到指定位置,讀/寫文件,然后關(guān)閉文件。對這種并行 I/O 操作方式,每個進程都擁有自己的文件指針,每個進程都獨立地移動自己的文件指針,并且從自己的文件指針所指的位置讀寫數(shù)據(jù),因此被稱作使用獨立文件指針的并行 I/O。不過需要注意的是,使用獨立文件指針的 I/O 操作不是線程安全的,因為獨立文件指針為每個進程所擁有,如果在進程中使用多線程,則每個線程對獨立文件指針的操作會相互沖突。下面是使用獨立文件指針相關(guān)方法的使用接口:
MPI.File.Open(type cls, Intracomm comm, filename, int amode=MODE_RDONLY, Info info=INFO_NULL)
并行打開文件,返回打開文件的句柄。此為一個集合操作,comm 必須為一個組內(nèi)通信子對象,該通信子內(nèi)的所有進程以訪問模式 amode 同時打開名為 filename 的文件,可通過 info 參數(shù)向 MPI 環(huán)境傳遞一些 hints,這些 hints 通常用來指出文件訪問以及文件系統(tǒng)相關(guān)的一些特殊信息,要求所有進程打開的文件 filename 都在物理上指向同一個文件(即文件路徑可能不同,但一定要是磁盤上同一個物理位置的文件),所有進程打開文件使用的 amode 也必須相同,但每個進程可以分別使用自己的 info 對象。如果某個進程需要獨自打開一個文件訪問,則可設(shè)置其參數(shù) comm 為 MPI.COMM_SELF。訪問模式 amode 有如下幾種:
- MPI.MODE_RDONLY:只讀;
- MPI.MODE_RDWR;讀寫;
- MPI.MODE_WRONLY:只寫;
- MPI.MODE_CREATE:如果不存在,則創(chuàng)建文件;
- MPI.MODE_EXCL:如果要創(chuàng)建的文件已經(jīng)存在則報錯;
- MPI.MODE_DELETE_ON_CLOSE:關(guān)閉時刪除文件;
- MPI.MODE_UNIQUE_OPEN:不允許同時打開文件,包括從 MPI 環(huán)境內(nèi)和環(huán)境外兩種情況;
- MPI.MODE_SEQUENCIAL:順序方式訪問文件,利于針對串行設(shè)備進行優(yōu)化;
- MPI.MODE_APPEND:所有文件指針指向文件末尾。
在 MPI 中,要區(qū)別兩種形式的文件——傳統(tǒng)的隨機訪問文件和串行流式訪問文件(如管道、磁帶等)。對串行文件(只允許以 amode = MPI.MODE_SEQUENCIAL 打開),調(diào)用 MPI.File.Seek_shared 和 MPI.File.Get_position_shared 都可能導致錯誤,也不允許在 filetype 和 etype 中出現(xiàn)空洞,僅允許執(zhí)行共享文件指針的讀寫操作,但這些操作所定義的文件指針更新策略對串行文件不再適用。
MPI.File.Seek(self, Offset offset, int whence=SEEK_SET)
根據(jù) whence 參數(shù)指定的方式更新進程各自的文件指針到指定偏移位置 offset,不是一個集合稱作。whence 可能的取值如下:
- MPI.SEEK_SET:將文件指針設(shè)置為
offset參數(shù)給出的值; - MPI.SEEK_CUR:將文件指針設(shè)置為相對于當前位置的
offset偏移處,即當前位置加上offset參數(shù); - MPI.SEEK_END:將文件指針設(shè)置為指向文件末尾再加上
offset參數(shù)的值。
offset 的值可為負,但要注意在文件視圖中指定相對于視圖起始位置負數(shù)的偏移會導致錯誤。
MPI.File.Read(self, buf, Status status=None)
從進程當前獨立文件指針的位置讀取數(shù)據(jù)到數(shù)據(jù)緩沖區(qū) buf 中,buf 是一個形如 [data, count, datatype] 或 [data, datatype] 的三元或二元序列,其中 data 是實際的數(shù)據(jù)緩沖區(qū),count 指明最大讀取的數(shù)據(jù)計數(shù)(以 datatype 為單位),當 count 省略時會利用 data 的字節(jié)長度和 datatype 計算出 count 值。對 numpy 數(shù)組,其計數(shù)和數(shù)據(jù)類型可以自動推斷出來,因此可以直接以數(shù)組本身作為參數(shù)傳給 buf。實際讀取的數(shù)據(jù)量可以從傳遞給 statuts 參數(shù)的 MPI.Status 對象中通過方法 Get_count 和 Get_elements 獲取。
該方法是一個阻塞的非集合操作。如果在打開文件時使用了 MPI.MODE_SEQUENCIAL,則使用該方法時會出錯。
MPI.File.Write(self, buf, Status status=None)
將數(shù)據(jù)緩沖區(qū) buf 中的數(shù)據(jù)寫入進程當前獨立文件指針的位置,buf 是一個形如 [data, count, datatype] 或 [data, datatype] 的三元或二元序列,其中 data 是實際的數(shù)據(jù)緩沖區(qū),count 指明最大寫入的數(shù)據(jù)計數(shù)(以 datatype 為單位),當 count 省略時會利用 data 的字節(jié)長度和 datatype 計算出 count 值。對 numpy 數(shù)組,其計數(shù)和數(shù)據(jù)類型可以自動推斷出來,因此可以直接以數(shù)組本身作為參數(shù)傳給 buf。實際寫入的數(shù)據(jù)量可以從傳遞給 statuts 參數(shù)的 MPI.Status 對象中通過方法 Get_count 和 Get_elements 獲取。
該方法是一個阻塞的非集合操作。如果在打開文件時使用了 MPI.MODE_SEQUENCIAL,則使用該方法時會出錯。
MPI.File.Close(self)
關(guān)閉當前并行文件。所有進程通過這個方法執(zhí)行一個集合操作關(guān)閉打開的并行文件。該操作會首先執(zhí)行 MPI.File.Sync 然后再關(guān)閉文件句柄。如果打開文件時使用的 amode 為 MPI.MODE_DELETE_ON_CLOSE,則關(guān)閉后還會自動調(diào)用 MPI.File.Delete。最后該函數(shù)把文件句柄設(shè)置成 MPI.FILE_NULL。與串行程序一樣,應用程序應設(shè)法保證關(guān)閉文件時的數(shù)據(jù)安全。
以上介紹的五個并行 I/O 操作方法實際上已經(jīng)足夠完成任何 I/O 操作任務(wù),并且這些方法和 Unix/Linux 系統(tǒng)提供的 I/O 操作和使用方法非常類似。MPI 還提供了數(shù)量眾多的其它 I/O 操作方法,這些方法要么是提供更高的性能,要么具有更好的可移植性,要么更加方便易用,等等。因此要很好地利用 MPI 并行 I/O 操作的這些優(yōu)勢,僅知道和使用這五個基本方法還是不夠的,我們將在后面逐步介紹 mpi4py 中的其它并行 I/O 操作方法。
使用顯式偏移地址
MPI 也提供了另外一組并行 I/O 方法,稱作顯式偏移地址方法,這組方法不使用獨立文件指針,而是直接將文件中的偏移地址作為參數(shù)傳遞給文件讀/寫函數(shù)。使用顯式偏移地址并行 I/O 方法與使用獨立文件指針的方法幾乎一樣,只不過不用再單獨調(diào)用 MPI.File.Seek 來設(shè)置文件的偏移位置。使用顯式偏移地址的并行 I/O 方法是線程安全的。下面是使用顯式偏移地址相關(guān)方法的使用接口,這兩個方法除了額外的 offset 參數(shù)外,其余參數(shù)都與使用獨立文件指針的對應方法同,使用條件也一樣,也是阻塞非集合操作。offset 參數(shù)以當前文件視圖的 etype 為單位指定對文件進行操作的起始偏移位置。
MPI.File.Read_at(self, Offset offset, buf, Status status=None)
MPI.File.Write_at(self, Offset offset, buf, Status status=None)
獲取文件大小及刪除文件
再 I/O 操作中可能需要知道文件的大小以確定各個進程的數(shù)據(jù)分配,在 I/O 操作完后,可能需要刪除文件,下面是相應的方法接口:
MPI.File.Get_size(self)
獲取當前并行文件的大小,以字節(jié)為單位計算。也可以通過屬性 size 獲取。
MPI.File.Delete(type cls, filename, Info info=INFO_NULL)
刪除一個沒有被任何進程打開的文件 filename,否則會出錯。如果文件不存在,則通過 MPI.ERR_NO_SUCH_FILE 錯誤給出提示信息。可用 info 對象傳遞與特定文件系統(tǒng)相關(guān)的信息。注意:該方法不是一個集合操作,一般使用一個單獨的進程執(zhí)行該方法以刪除文件。
例程
下面給出使用例程。
# simple_io.py
"""
Demonstrates the usage of individual file pointer and explicit offsets I/O methods.
Run this with 4 processes like:
$ mpiexec -n 4 python simple_io.py
"""
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank < 3:
num_ints = 10 # number of int types
else:
num_ints = 8 # number of int types
buf1 = np.arange(num_ints, dtype='i')
buf2 = np.zeros(10, dtype='i') # initialize to all zeros
offset = 10 * MPI.INT.Get_size() # in unit of bytes
filename = 'temp.txt'
# use individual file pointer
# ------------------------------------------------------------------------------
# open the file for write only, create it if it does not exist
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_WRONLY)
# set individual file pointer of each process
# here we use the default file view, so offset is in bytes
fh.Seek(rank*offset, whence=MPI.SEEK_SET)
# each process writes buf1 to file
fh.Write(buf1)
# close the file
fh.Close()
# open the existed file for read only
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_RDONLY)
print 'size of file: %d bytes' % fh.Get_size()
# set individual file pointer of each process, prepare for reading
fh.Seek(rank*offset, whence=MPI.SEEK_SET)
# each process reads data to buf2 from file
status = MPI.Status()
fh.Read(buf2, status)
# get the amount of data actually read
print 'rank %d read %d MPI.INTs' % (rank, status.Get_count(datatype=MPI.INT))
print 'process %d has buf2 = %s' % (rank, buf2)
# check position of individual file pointer
print 'process %d has file pointer position %d after read' % (rank, fh.Get_position())
# close the file
fh.Close()
# delete the file
if rank == 0:
MPI.File.Delete(filename)
# use explicit offsets
# ------------------------------------------------------------------------------
# open the file for write only, create it if it does not exist
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_WRONLY)
# each process writes buf1 to file, start from the position of rank*offset
fh.Write_at(rank*offset, buf1)
# close the file
fh.Close()
# open the existed file for read only, and delete the file on close
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_RDONLY | MPI.MODE_DELETE_ON_CLOSE)
# each process reads data to buf2 from file, start from the position of rank*offset
status = MPI.Status()
fh.Read_at(rank*offset, buf2, status)
# close the file
fh.Close()
運行結(jié)果如下:
$ mpiexec -n 4 python simple_io.py
size of file: 152 bytes
rank 0 read 10 MPI.INTs
size of file: 152 bytes
rank 1 read 10 MPI.INTs
size of file: 152 bytes
rank 2 read 10 MPI.INTs
size of file: 152 bytes
rank 3 read 8 MPI.INTs
process 3 has buf2 = [0 1 2 3 4 5 6 7 0 0]
process 3 has file pointer position 152 after read
process 0 has buf2 = [0 1 2 3 4 5 6 7 8 9]
process 0 has file pointer position 40 after read
process 1 has buf2 = [0 1 2 3 4 5 6 7 8 9]
process 1 has file pointer position 80 after read
process 2 has buf2 = [0 1 2 3 4 5 6 7 8 9]
process 2 has file pointer position 120 after read
以上介紹了 mpi4py 中的簡單并行 I/O 操作,在下一篇中我們將介紹 mpi4py 中的不連續(xù)讀/寫和集合 I/O 操作。