
mpi 是一項(xiàng)非常古老的框架和技術(shù),也是一款非常非常優(yōu)秀簡(jiǎn)潔的工具,只要做c++ 進(jìn)階到一定程度,一般都會(huì)接觸到它。相比較 java中的 dubbo scala的finagle 及 grpc thrift brpc zookeeper 分布式技術(shù) 集群 跨語(yǔ)言通信 等等都是望塵莫及
mpi 優(yōu)秀是有原因的 ,當(dāng)然是快,一個(gè)快 字 可以遮百丑,最關(guān)鍵的是 好像基本沒(méi)有什么缺點(diǎn)。
mpi 還有豐富的 python api :mpi4py
mpi的安裝 可以在 mac 和 centos上
mac 安裝 brew install mpich
或者 brew install openmpi # 有時(shí)候只能安裝成功一個(gè),兩者有時(shí)互斥
pip install mpi4py
python 版的mpi 運(yùn)行方式 在 Terminal 中
# 命令 -n 進(jìn)程數(shù)量 python解釋器 python腳本 路徑
mpirun -n 5 python3 ./mpidemo.py
mpi 集群通信
https://blog.csdn.net/secyb/article/details/78697976
https://blog.csdn.net/kongxx/article/details/52227572
https://download.csdn.net/download/xcmax/7029853
https://download.csdn.net/download/youlovechao/5278025
https://www.csdn.net/article/2009-08-26/4254
https://blog.csdn.net/baidu_24281959/article/details/51471017
mpi C++實(shí)踐參考
http://blog.septicmk.com/Concurrent-and-Parallel/MPI-tutorial.html
http://pytlab.org/2016/08/18/MPI%E9%9B%86%E5%90%88%E9%80%9A%E4%BF%A1%EF%BC%9A1-N/
https://plytools.github.io/2017/05/15/MPI%E5%B9%B6%E8%A1%8C%EF%BC%9APthread%E5%85%A5%E9%97%A8%E6%95%99%E7%A8%8B/
http://www.ssc.net.cn/files/MPI%E7%BC%96%E7%A8%8B%E5%88%9D%E6%AD%A5.pdf
mpi 的python教程 實(shí)踐參考這個(gè)
http://pytlab.org/tags/MPI/
http://pytlab.org/2017/02/19/Python%E5%A4%9A%E8%BF%9B%E7%A8%8B%E5%B9%B6%E8%A1%8C%E7%BC%96%E7%A8%8B%E5%AE%9E%E8%B7%B5-mpi4py%E7%9A%84%E4%BD%BF%E7%94%A8/
https://blog.csdn.net/zouxy09/article/details/49031845
一、概述
CPU從三十多年前的8086,到十年前的奔騰,再到當(dāng)下的多核i7。一開始,以單核cpu的主頻為目標(biāo),架構(gòu)的改良和集成電路工藝的進(jìn)步使得cpu的性能高速上升,單核cpu的主頻從老爺車的MHz階段一度接近4GHz高地。然而,也因?yàn)楣に嚭凸牡鹊南拗?,單核cpu遇到了人生的天花板,急需轉(zhuǎn)換思維,以滿足無(wú)止境的性能需求。多核cpu在此登上歷史舞臺(tái)。給你的老爺車多加兩個(gè)引擎,讓你有法拉利的感覺?,F(xiàn)時(shí)代,連手機(jī)都到處叫囂自己有4核8核處理器的時(shí)代,PC就更不用說(shuō)了。
扯遠(yuǎn)了,anyway,對(duì)于俺們程序員來(lái)說(shuō),如何利用如此強(qiáng)大的引擎完成我們的任務(wù)才是我們要考慮的。隨著大規(guī)模數(shù)據(jù)處理、大規(guī)模問(wèn)題和復(fù)雜系統(tǒng)求解需求的增加,以前的單核編程已經(jīng)有心無(wú)力了。如果程序一跑就得幾個(gè)小時(shí),甚至一天,想想都無(wú)法原諒自己。那如何讓自己更快的過(guò)度到高大上的多核并行編程中去呢?哈哈,廣大人民的力量!
目前工作中我所接觸到的并行處理框架主要有MPI、OpenMP和MapReduce(Hadoop)三個(gè)(CUDA屬于GPU并行編程,這里不提及)。MPI和Hadoop都可以在集群中運(yùn)行,而OpenMP因?yàn)楣蚕泶鎯?chǔ)結(jié)構(gòu)的關(guān)系,不能在集群上運(yùn)行,只能單機(jī)。另外,MPI可以讓數(shù)據(jù)保留在內(nèi)存中,可以為節(jié)點(diǎn)間的通信和數(shù)據(jù)交互保存上下文,所以能執(zhí)行迭代算法,而Hadoop卻不具有這個(gè)特性。因此,需要迭代的機(jī)器學(xué)習(xí)算法大多使用MPI來(lái)實(shí)現(xiàn)。當(dāng)然了,部分機(jī)器學(xué)習(xí)算法也是可以通過(guò)設(shè)計(jì)使用Hadoop來(lái)完成的。(淺見,如果錯(cuò)誤,希望各位不吝指出,謝謝)。
本文主要介紹Python環(huán)境下MPI編程的實(shí)踐基礎(chǔ)。
二、MPI與mpi4py
MPI是Message Passing Interface的簡(jiǎn)稱,也就是消息傳遞。消息傳遞指的是并行執(zhí)行的各個(gè)進(jìn)程具有自己獨(dú)立的堆棧和代碼段,作為互不相關(guān)的多個(gè)程序獨(dú)立執(zhí)行,進(jìn)程之間的信息交互完全通過(guò)顯示地調(diào)用通信函數(shù)來(lái)完成。
Mpi4py是構(gòu)建在mpi之上的python庫(kù),使得python的數(shù)據(jù)結(jié)構(gòu)可以在進(jìn)程(或者多個(gè)cpu)之間進(jìn)行傳遞。
2.1、MPI的工作方式
很簡(jiǎn)單,就是你啟動(dòng)了一組MPI進(jìn)程,每個(gè)進(jìn)程都是執(zhí)行同樣的代碼!然后每個(gè)進(jìn)程都有一個(gè)ID,也就是rank來(lái)標(biāo)記我是誰(shuí)。什么意思呢?假設(shè)一個(gè)CPU是你請(qǐng)的一個(gè)工人,共有10個(gè)工人。你有100塊磚頭要搬,然后很公平,讓每個(gè)工人搬10塊。這時(shí)候,你把任務(wù)寫到一個(gè)任務(wù)卡里面,讓10個(gè)工人都執(zhí)行這個(gè)任務(wù)卡中的任務(wù),也就是搬磚!這個(gè)任務(wù)卡中的“搬磚”就是你寫的代碼。然后10個(gè)CPU執(zhí)行同一段代碼。需要注意的是,代碼里面的所有變量都是每個(gè)進(jìn)程獨(dú)有的,雖然名字相同。
例如,一個(gè)腳本test.py,里面包含以下代碼:
from mpi4py import MPI
print("hello world'')
print("my rank is: %d" %MPI.ROOT)
然后我們?cè)诿钚型ㄟ^(guò)以下方式運(yùn)行:
#mpirun –n 5 python test.py
-n 5 指定啟動(dòng)5個(gè)mpi進(jìn)程來(lái)執(zhí)行后面的程序。相當(dāng)于對(duì)腳本拷貝了5份,每個(gè)進(jìn)程運(yùn)行一份,互不干擾。在運(yùn)行的時(shí)候代碼里面唯一的不同,就是各自的rank也就是ID不一樣。所以這個(gè)代碼就會(huì)打印5個(gè)hello world和5個(gè)不同的rank值,從0到4.
2.2、點(diǎn)對(duì)點(diǎn)通信
點(diǎn)對(duì)點(diǎn)通信(Point-to-PointCommunication)的能力是信息傳遞系統(tǒng)最基本的要求。意思就是讓兩個(gè)進(jìn)程直接可以傳輸數(shù)據(jù),也就是一個(gè)發(fā)送數(shù)據(jù),另一個(gè)接收數(shù)據(jù)。接口就兩個(gè),send和recv,來(lái)個(gè)例子:
import mpi4py.MPI as MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
# point to point communication
data_send = [comm_rank] * 5
comm.send(data_send, dest=(comm_rank + 1) % comm_size)
data_recv = comm.recv(source=(comm_rank - 1) % comm_size)
print("my rank is %d, and Ireceived:" % comm_rank)
print(data_recv )
啟動(dòng)5個(gè)進(jìn)程運(yùn)行以上代碼,結(jié)果如下:
1. my rank is 0, and I received:
2. [4, 4, 4, 4, 4]
3. my rank is 1, and I received:
4. [0, 0, 0, 0, 0]
5. my rank is 2, and I received:
6. [1, 1, 1, 1, 1]
7. my rank is 3, and I received:
8. [2, 2, 2, 2, 2]
9. my rank is 4, and I received:
10. [3, 3, 3, 3, 3]
可以看到,每個(gè)進(jìn)程都創(chuàng)建了一個(gè)數(shù)組,然后把它傳遞給下一個(gè)進(jìn)程,最后的那個(gè)進(jìn)程傳遞給第一個(gè)進(jìn)程。comm_size就是mpi的進(jìn)程個(gè)數(shù),也就是-np指定的那個(gè)數(shù)。MPI.COMM_WORLD 表示進(jìn)程所在的通信組。
但這里面有個(gè)需要注意的問(wèn)題,如果我們要發(fā)送的數(shù)據(jù)比較小的話,mpi會(huì)緩存我們的數(shù)據(jù),也就是說(shuō)執(zhí)行到send這個(gè)代碼的時(shí)候,會(huì)緩存被send的數(shù)據(jù),然后繼續(xù)執(zhí)行后面的指令,而不會(huì)等待對(duì)方進(jìn)程執(zhí)行recv指令接收完這個(gè)數(shù)據(jù)。但是,如果要發(fā)送的數(shù)據(jù)很大,那么進(jìn)程就是掛起等待,直到接收進(jìn)程執(zhí)行了recv指令接收了這個(gè)數(shù)據(jù),進(jìn)程才繼續(xù)往下執(zhí)行。所以上述的代碼發(fā)送[rank]5沒(méi)啥問(wèn)題,如果發(fā)送[rank]500程序就會(huì)半死不活的樣子了。因?yàn)樗械倪M(jìn)程都會(huì)卡在發(fā)送這條指令,等待下一個(gè)進(jìn)程發(fā)起接收的這個(gè)指令,但是進(jìn)程是執(zhí)行完發(fā)送的指令才能執(zhí)行接收的指令,這就和死鎖差不多了。所以一般,我們將其修改成以下的方式:
import mpi4py.MPI as MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
data_send = [comm_rank] * 5
if comm_rank == 0:
comm.send(data_send, dest=(comm_rank + 1) % comm_size)
if comm_rank > 0:
data_recv = comm.recv(source=(comm_rank - 1) % comm_size)
comm.send(data_send, dest=(comm_rank + 1) % comm_size)
if comm_rank == 0:
data_recv = comm.recv(source=(comm_rank - 1) % comm_size)
print("my rank is %d, and Ireceived:" % comm_rank)
print(data_recv)
第一個(gè)進(jìn)程一開始就發(fā)送數(shù)據(jù),其他進(jìn)程一開始都是在等待接收數(shù)據(jù),這時(shí)候進(jìn)程1接收了進(jìn)程0的數(shù)據(jù),然后發(fā)送進(jìn)程1的數(shù)據(jù),進(jìn)程2接收了,再發(fā)送進(jìn)程2的數(shù)據(jù)……知道最后進(jìn)程0接收最后一個(gè)進(jìn)程的數(shù)據(jù),從而避免了上述問(wèn)題。
一個(gè)比較常用的方法是封一個(gè)組長(zhǎng),也就是一個(gè)主進(jìn)程,一般是進(jìn)程0作為主進(jìn)程leader。主進(jìn)程將數(shù)據(jù)發(fā)送給其他的進(jìn)程,其他的進(jìn)程處理數(shù)據(jù),然后返回結(jié)果給進(jìn)程0。換句話說(shuō),就是進(jìn)程0來(lái)控制整個(gè)數(shù)據(jù)處理流程。
2.3、群體通信
點(diǎn)對(duì)點(diǎn)通信是A發(fā)送給B,一個(gè)人將自己的秘密告訴另一個(gè)人,群體通信(Collective Communications)像是拿個(gè)大喇叭,一次性告訴所有的人。前者是一對(duì)一,后者是一對(duì)多。但是,群體通信是以更有效的方式工作的。它的原則就一個(gè):盡量把所有的進(jìn)程在所有的時(shí)刻都使用上!我們?cè)谙旅娴腷cast小節(jié)講述。
群體通信還是發(fā)送和接收兩類,一個(gè)是一次性把數(shù)據(jù)發(fā)給所有人,另一個(gè)是一次性從所有人那里回收結(jié)果。
1)廣播bcast
將一份數(shù)據(jù)發(fā)送給所有的進(jìn)程。例如我有200份數(shù)據(jù),有10個(gè)進(jìn)程,那么每個(gè)進(jìn)程都會(huì)得到這200份數(shù)據(jù)。
import mpi4py.MPI as MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
data = range(comm_size)
data = comm.bcast(data if comm_rank == 0 else None, root=0)
print('rank %d, got:' % (comm_rank))
print(data)
結(jié)果如下
rank 0, got:
[0, 1, 2, 3, 4]
rank 1, got:
[0, 1, 2, 3, 4]
rank 2, got:
[0, 1, 2, 3, 4]
rank 3, got:
[0, 1, 2, 3, 4]
rank 4, got:
[0, 1, 2, 3, 4]
Root進(jìn)程自己建了一個(gè)列表,然后廣播給所有的進(jìn)程。這樣所有的進(jìn)程都擁有了這個(gè)列表。然后愛干嘛就干嘛了。
對(duì)廣播最直觀的觀點(diǎn)是某個(gè)特定進(jìn)程將數(shù)據(jù)一一發(fā)送給每個(gè)進(jìn)程。假設(shè)有n個(gè)進(jìn)程,那么假設(shè)我們的數(shù)據(jù)在0進(jìn)程,那么0進(jìn)程就需要將數(shù)據(jù)發(fā)送給剩下的n-1個(gè)進(jìn)程,這是非常低效的,復(fù)雜度是O(n)。那有沒(méi)有高效的方式?一個(gè)最常用也是非常高效的手段是規(guī)約樹廣播:收到廣播數(shù)據(jù)的所有進(jìn)程都參與到數(shù)據(jù)廣播的過(guò)程中。首先只有一個(gè)進(jìn)程有數(shù)據(jù),然后它把它發(fā)送給第一個(gè)進(jìn)程,此時(shí)有兩個(gè)進(jìn)程有數(shù)據(jù);然后這兩個(gè)進(jìn)程都參與到下一次的廣播中,這時(shí)就會(huì)有4個(gè)進(jìn)程有數(shù)據(jù),……,以此類推,每次都會(huì)有2的次方個(gè)進(jìn)程有數(shù)據(jù)。通過(guò)這種規(guī)約樹的廣播方法,廣播的復(fù)雜度降為O(log n)。這就是上面說(shuō)的群體通信的高效原則:充分利用所有的進(jìn)程來(lái)實(shí)現(xiàn)數(shù)據(jù)的發(fā)送和接收。
2)散播scatter
將一份數(shù)據(jù)平分給所有的進(jìn)程。例如我有200份數(shù)據(jù),有10個(gè)進(jìn)程,那么每個(gè)進(jìn)程會(huì)分別得到20份數(shù)據(jù)。
import mpi4py.MPI as MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
data = range(comm_size)
print(data)
else:
data = None
local_data = comm.scatter(data, root=0)
print('rank %d, got:' % comm_rank)
print(local_data)
結(jié)果如下
[0, 1, 2, 3, 4]
rank 0, got:
0
rank 1, got:
1
rank 2, got:
2
rank 3, got:
3
rank 4, got:
4
這里root進(jìn)程創(chuàng)建了一個(gè)list,然后將它散播給所有的進(jìn)程,相當(dāng)于對(duì)這個(gè)list做了劃分,每個(gè)進(jìn)程獲得等分的數(shù)據(jù),這里就是list的每一個(gè)數(shù)。(主要根據(jù)list的索引來(lái)劃分,list索引為第i份的數(shù)據(jù)就發(fā)送給第i個(gè)進(jìn)程)。如果是矩陣,那么就等分的劃分行,每個(gè)進(jìn)程獲得相同的行數(shù)進(jìn)行處理。
需要注意的是,MPI的工作方式是每個(gè)進(jìn)程都會(huì)執(zhí)行所有的代碼,所以每個(gè)進(jìn)程都會(huì)執(zhí)行scatter這個(gè)指令,但只有root執(zhí)行它的時(shí)候,它才兼?zhèn)浒l(fā)送者和接收者的身份(root也會(huì)得到屬于自己的數(shù)據(jù)),對(duì)于其他進(jìn)程來(lái)說(shuō),他們都只是接收者而已。
3)收集gather
那有發(fā)送,就有一起回收的函數(shù)。Gather是將所有進(jìn)程的數(shù)據(jù)收集回來(lái),合并成一個(gè)列表。下面聯(lián)合scatter和gather組成一個(gè)完成的分發(fā)和收回過(guò)程:
Root進(jìn)程將數(shù)據(jù)通過(guò)scatter等分發(fā)給所有的進(jìn)程,等待所有的進(jìn)程都處理完后(這里只是簡(jiǎn)單的乘以2),root進(jìn)程再通過(guò)gather回收他們的結(jié)果,和分發(fā)的原則一樣,組成一個(gè)list。Gather還有一個(gè)變體就是allgather,可以理解為它在gather的基礎(chǔ)上將gather的結(jié)果再bcast了一次。啥意思?意思是root進(jìn)程將所有進(jìn)程的結(jié)果都回收統(tǒng)計(jì)完后,再把整個(gè)統(tǒng)計(jì)結(jié)果告訴大家。這樣,不僅root可以訪問(wèn)combine_data,所有的進(jìn)程都可以訪問(wèn)combine_data了。
import mpi4py.MPI as MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
data = range(comm_size)
print(data)
else:
data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print('rank %d, got and do:' % comm_rank)
print(local_data)
combine_data = comm.gather(local_data, root=0)
if comm_rank == 0:
print(combine_data)
[0, 1, 2, 3, 4]
rank 0, got and do:
0
rank 1, got and do:
2
rank 2, got and do:
4
rank 4, got and do:
8
rank 3, got and do:
6
[0, 2, 4, 6, 8]
4)規(guī)約reduce
規(guī)約是指不但將所有的數(shù)據(jù)收集回來(lái),收集回來(lái)的過(guò)程中還進(jìn)行了簡(jiǎn)單的計(jì)算,例如求和,求最大值等等。為什么要有這個(gè)呢?我們不是可以直接用gather全部收集回來(lái)了,再對(duì)列表求個(gè)sum或者max就可以了嗎?這樣不是累死組長(zhǎng)嗎?為什么不充分使用每個(gè)工人呢?規(guī)約實(shí)際上是使用規(guī)約樹來(lái)實(shí)現(xiàn)的。例如求max,完成可以讓工人兩兩pk后,再返回兩兩pk的最大值,然后再對(duì)第二層的最大值兩兩pk,直到返回一個(gè)最終的max給組長(zhǎng)。組長(zhǎng)就非常聰明的將工作分配下工人高效的完成了。這是O(n)的復(fù)雜度,下降到O(log n)(底數(shù)為2)的復(fù)雜度。
import mpi4py.MPI as MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
data = range(comm_size)
print(data)
else:
data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print('rank %d, got and do:' % comm_rank)
print(local_data)
all_sum = comm.reduce(local_data, root=0, op=MPI.SUM)
if comm_rank == 0:
print('sumis:%d' % all_sum)
結(jié)果如下
[0, 1, 2, 3, 4]
rank 0, got and do:
0
rank 1, got and do:
2
rank 2, got and do:
4
rank 3, got and do:
6
rank 4, got and do:
8
sum is:20
可以看到,最后可以得到一個(gè)sum值。
三、常見用法
3.1、對(duì)一個(gè)文件的多個(gè)行并行處理
#!usr/bin/env python
#-*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np
#
# Global variables for MPI
#
# instance for invoking MPI relatedfunctions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e.,the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()
if __name__ == '__main__':
if comm_rank == 0:
sys.stderr.write("processor root starts reading data...\n")
all_lines = sys.stdin.readlines()
all_lines = comm.bcast(all_lines if comm_rank == 0 else None, root = 0)
num_lines = len(all_lines)
local_lines_offset = np.linspace(0, num_lines, comm_size +1).astype('int')
local_lines = all_lines[local_lines_offset[comm_rank] :local_lines_offset[comm_rank + 1]]
sys.stderr.write("%d/%d processor gets %d/%d data \n" %(comm_rank, comm_size, len(local_lines), num_lines))
cnt = 0
for line in local_lines:
fields = line.strip().split('\t')
cnt += 1
if cnt % 100 == 0:
sys.stderr.write("processor %d has processed %d/%d lines \n" %(comm_rank, cnt, len(local_lines)))
output = line.strip() + ' process every line here'
print output
3.2、對(duì)多個(gè)文件并行處理
如果我們的文件太大,例如幾千萬(wàn)行,那么mpi是沒(méi)辦法將這么大的數(shù)據(jù)bcast給所有的進(jìn)程的,所以我們可以先把大的文件split成小的文件,再讓每個(gè)進(jìn)程處理少數(shù)的文件。
#!usr/bin/env python
#-*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np
#
# Global variables for MPI
#
# instance for invoking MPI relatedfunctions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e.,the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.stderr.write("Usage: python *.py directoty_with_files\n")
sys.exit(1)
path = sys.argv[1]
if comm_rank == 0:
file_list = os.listdir(path)
sys.stderr.write("%d files\n" % len(file_list))
file_list = comm.bcast(file_list if comm_rank == 0 else None, root = 0)
num_files = len(file_list)
local_files_offset = np.linspace(0, num_files, comm_size +1).astype('int')
local_files = file_list[local_files_offset[comm_rank] :local_files_offset[comm_rank + 1]]
sys.stderr.write("%d/%d processor gets %d/%d data \n" %(comm_rank, comm_size, len(local_files), num_files))
cnt = 0
for file_name in local_files:
hd = open(os.path.join(path, file_name))
for line in hd:
output = line.strip() + ' process every line here'
print output
cnt += 1
sys.stderr.write("processor %d has processed %d/%d files \n" %(comm_rank, cnt, len(local_files)))
hd.close()
import os, sys, time
import numpy as np
import mpi4py.MPI as MPI
#
# Global variables for MPI
#
# instance for invoking MPI relatedfunctions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e.,the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()
# test MPI
if __name__ == "__main__":
# create a matrix
if comm_rank == 0:
all_data = np.arange(20).reshape(4, 5)
print("************ data ******************")
print(all_data)
# broadcast the data to all processors
all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)
# divide the data to each processor
num_samples = all_data.shape[0]
local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')
# get the local data which will be processed in this processor
local_data = all_data[local_data_offset[comm_rank]:local_data_offset[comm_rank + 1]]
print("****** %d/%d processor gets local data ****" % (comm_rank, comm_size))
print(local_data)
# reduce to get sum of elements
local_sum = local_data.sum()
all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)
# process in local
local_result = local_data ** 2
# gather the result from all processors and broadcast it
result = comm.allgather(local_result)
result = np.vstack(result)
if comm_rank == 0:
print("*** sum: ", all_sum)
print("************ result ******************")
print(result)
3.3、聯(lián)合numpy對(duì)矩陣的多個(gè)行或者多列并行處理
Mpi4py一個(gè)非常優(yōu)秀的特性是完美支持numpy!