關(guān)于 多進(jìn)程 & RPC 的王者:MPI簡(jiǎn)單了解

image.png

mpi 是一項(xiàng)非常古老的框架和技術(shù),也是一款非常非常優(yōu)秀簡(jiǎn)潔的工具,只要做c++ 進(jìn)階到一定程度,一般都會(huì)接觸到它。相比較 java中的 dubbo scala的finagle 及 grpc thrift brpc zookeeper 分布式技術(shù) 集群 跨語(yǔ)言通信 等等都是望塵莫及

mpi 優(yōu)秀是有原因的 ,當(dāng)然是快,一個(gè)快 字 可以遮百丑,最關(guān)鍵的是 好像基本沒(méi)有什么缺點(diǎn)。

mpi 還有豐富的 python api :mpi4py

mpi的安裝 可以在 mac 和 centos上
mac 安裝 brew install mpich
或者 brew install openmpi # 有時(shí)候只能安裝成功一個(gè),兩者有時(shí)互斥
pip install mpi4py

python 版的mpi 運(yùn)行方式 在 Terminal 中

# 命令  -n 進(jìn)程數(shù)量   python解釋器   python腳本 路徑
 mpirun -n 5  python3  ./mpidemo.py 

mpi 集群通信
https://blog.csdn.net/secyb/article/details/78697976
https://blog.csdn.net/kongxx/article/details/52227572
https://download.csdn.net/download/xcmax/7029853
https://download.csdn.net/download/youlovechao/5278025
https://www.csdn.net/article/2009-08-26/4254
https://blog.csdn.net/baidu_24281959/article/details/51471017

mpi C++實(shí)踐參考
http://blog.septicmk.com/Concurrent-and-Parallel/MPI-tutorial.html

http://pytlab.org/2016/08/18/MPI%E9%9B%86%E5%90%88%E9%80%9A%E4%BF%A1%EF%BC%9A1-N/

https://plytools.github.io/2017/05/15/MPI%E5%B9%B6%E8%A1%8C%EF%BC%9APthread%E5%85%A5%E9%97%A8%E6%95%99%E7%A8%8B/
http://www.ssc.net.cn/files/MPI%E7%BC%96%E7%A8%8B%E5%88%9D%E6%AD%A5.pdf

mpi 的python教程 實(shí)踐參考這個(gè)
http://pytlab.org/tags/MPI/
http://pytlab.org/2017/02/19/Python%E5%A4%9A%E8%BF%9B%E7%A8%8B%E5%B9%B6%E8%A1%8C%E7%BC%96%E7%A8%8B%E5%AE%9E%E8%B7%B5-mpi4py%E7%9A%84%E4%BD%BF%E7%94%A8/

https://blog.csdn.net/zouxy09/article/details/49031845

一、概述

CPU從三十多年前的8086,到十年前的奔騰,再到當(dāng)下的多核i7。一開始,以單核cpu的主頻為目標(biāo),架構(gòu)的改良和集成電路工藝的進(jìn)步使得cpu的性能高速上升,單核cpu的主頻從老爺車的MHz階段一度接近4GHz高地。然而,也因?yàn)楣に嚭凸牡鹊南拗?,單核cpu遇到了人生的天花板,急需轉(zhuǎn)換思維,以滿足無(wú)止境的性能需求。多核cpu在此登上歷史舞臺(tái)。給你的老爺車多加兩個(gè)引擎,讓你有法拉利的感覺?,F(xiàn)時(shí)代,連手機(jī)都到處叫囂自己有4核8核處理器的時(shí)代,PC就更不用說(shuō)了。

扯遠(yuǎn)了,anyway,對(duì)于俺們程序員來(lái)說(shuō),如何利用如此強(qiáng)大的引擎完成我們的任務(wù)才是我們要考慮的。隨著大規(guī)模數(shù)據(jù)處理、大規(guī)模問(wèn)題和復(fù)雜系統(tǒng)求解需求的增加,以前的單核編程已經(jīng)有心無(wú)力了。如果程序一跑就得幾個(gè)小時(shí),甚至一天,想想都無(wú)法原諒自己。那如何讓自己更快的過(guò)度到高大上的多核并行編程中去呢?哈哈,廣大人民的力量!

目前工作中我所接觸到的并行處理框架主要有MPI、OpenMP和MapReduce(Hadoop)三個(gè)(CUDA屬于GPU并行編程,這里不提及)。MPI和Hadoop都可以在集群中運(yùn)行,而OpenMP因?yàn)楣蚕泶鎯?chǔ)結(jié)構(gòu)的關(guān)系,不能在集群上運(yùn)行,只能單機(jī)。另外,MPI可以讓數(shù)據(jù)保留在內(nèi)存中,可以為節(jié)點(diǎn)間的通信和數(shù)據(jù)交互保存上下文,所以能執(zhí)行迭代算法,而Hadoop卻不具有這個(gè)特性。因此,需要迭代的機(jī)器學(xué)習(xí)算法大多使用MPI來(lái)實(shí)現(xiàn)。當(dāng)然了,部分機(jī)器學(xué)習(xí)算法也是可以通過(guò)設(shè)計(jì)使用Hadoop來(lái)完成的。(淺見,如果錯(cuò)誤,希望各位不吝指出,謝謝)。

   本文主要介紹Python環(huán)境下MPI編程的實(shí)踐基礎(chǔ)。

二、MPI與mpi4py

MPI是Message Passing Interface的簡(jiǎn)稱,也就是消息傳遞。消息傳遞指的是并行執(zhí)行的各個(gè)進(jìn)程具有自己獨(dú)立的堆棧和代碼段,作為互不相關(guān)的多個(gè)程序獨(dú)立執(zhí)行,進(jìn)程之間的信息交互完全通過(guò)顯示地調(diào)用通信函數(shù)來(lái)完成。

Mpi4py是構(gòu)建在mpi之上的python庫(kù),使得python的數(shù)據(jù)結(jié)構(gòu)可以在進(jìn)程(或者多個(gè)cpu)之間進(jìn)行傳遞。

2.1、MPI的工作方式

很簡(jiǎn)單,就是你啟動(dòng)了一組MPI進(jìn)程,每個(gè)進(jìn)程都是執(zhí)行同樣的代碼!然后每個(gè)進(jìn)程都有一個(gè)ID,也就是rank來(lái)標(biāo)記我是誰(shuí)。什么意思呢?假設(shè)一個(gè)CPU是你請(qǐng)的一個(gè)工人,共有10個(gè)工人。你有100塊磚頭要搬,然后很公平,讓每個(gè)工人搬10塊。這時(shí)候,你把任務(wù)寫到一個(gè)任務(wù)卡里面,讓10個(gè)工人都執(zhí)行這個(gè)任務(wù)卡中的任務(wù),也就是搬磚!這個(gè)任務(wù)卡中的“搬磚”就是你寫的代碼。然后10個(gè)CPU執(zhí)行同一段代碼。需要注意的是,代碼里面的所有變量都是每個(gè)進(jìn)程獨(dú)有的,雖然名字相同。

例如,一個(gè)腳本test.py,里面包含以下代碼:

from mpi4py import MPI  
print("hello world'')  
print("my rank is: %d" %MPI.ROOT) 

然后我們?cè)诿钚型ㄟ^(guò)以下方式運(yùn)行:
#mpirun –n 5 python test.py

-n 5 指定啟動(dòng)5個(gè)mpi進(jìn)程來(lái)執(zhí)行后面的程序。相當(dāng)于對(duì)腳本拷貝了5份,每個(gè)進(jìn)程運(yùn)行一份,互不干擾。在運(yùn)行的時(shí)候代碼里面唯一的不同,就是各自的rank也就是ID不一樣。所以這個(gè)代碼就會(huì)打印5個(gè)hello world和5個(gè)不同的rank值,從0到4.

2.2、點(diǎn)對(duì)點(diǎn)通信

點(diǎn)對(duì)點(diǎn)通信(Point-to-PointCommunication)的能力是信息傳遞系統(tǒng)最基本的要求。意思就是讓兩個(gè)進(jìn)程直接可以傳輸數(shù)據(jù),也就是一個(gè)發(fā)送數(shù)據(jù),另一個(gè)接收數(shù)據(jù)。接口就兩個(gè),send和recv,來(lái)個(gè)例子:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# point to point communication
data_send = [comm_rank] * 5
comm.send(data_send, dest=(comm_rank + 1) % comm_size)
data_recv = comm.recv(source=(comm_rank - 1) % comm_size)
print("my rank is %d, and Ireceived:" % comm_rank)
print(data_recv  )

啟動(dòng)5個(gè)進(jìn)程運(yùn)行以上代碼,結(jié)果如下:

1.  my rank is 0, and I received:  
2.  [4, 4, 4, 4, 4]  
3.  my rank is 1, and I received:  
4.  [0, 0, 0, 0, 0]  
5.  my rank is 2, and I received:  
6.  [1, 1, 1, 1, 1]  
7.  my rank is 3, and I received:  
8.  [2, 2, 2, 2, 2]  
9.  my rank is 4, and I received:  
10.  [3, 3, 3, 3, 3]

可以看到,每個(gè)進(jìn)程都創(chuàng)建了一個(gè)數(shù)組,然后把它傳遞給下一個(gè)進(jìn)程,最后的那個(gè)進(jìn)程傳遞給第一個(gè)進(jìn)程。comm_size就是mpi的進(jìn)程個(gè)數(shù),也就是-np指定的那個(gè)數(shù)。MPI.COMM_WORLD 表示進(jìn)程所在的通信組。

但這里面有個(gè)需要注意的問(wèn)題,如果我們要發(fā)送的數(shù)據(jù)比較小的話,mpi會(huì)緩存我們的數(shù)據(jù),也就是說(shuō)執(zhí)行到send這個(gè)代碼的時(shí)候,會(huì)緩存被send的數(shù)據(jù),然后繼續(xù)執(zhí)行后面的指令,而不會(huì)等待對(duì)方進(jìn)程執(zhí)行recv指令接收完這個(gè)數(shù)據(jù)。但是,如果要發(fā)送的數(shù)據(jù)很大,那么進(jìn)程就是掛起等待,直到接收進(jìn)程執(zhí)行了recv指令接收了這個(gè)數(shù)據(jù),進(jìn)程才繼續(xù)往下執(zhí)行。所以上述的代碼發(fā)送[rank]5沒(méi)啥問(wèn)題,如果發(fā)送[rank]500程序就會(huì)半死不活的樣子了。因?yàn)樗械倪M(jìn)程都會(huì)卡在發(fā)送這條指令,等待下一個(gè)進(jìn)程發(fā)起接收的這個(gè)指令,但是進(jìn)程是執(zhí)行完發(fā)送的指令才能執(zhí)行接收的指令,這就和死鎖差不多了。所以一般,我們將其修改成以下的方式:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank] * 5
if comm_rank == 0:
  comm.send(data_send, dest=(comm_rank + 1) % comm_size)
if comm_rank > 0:
  data_recv = comm.recv(source=(comm_rank - 1) % comm_size)
  comm.send(data_send, dest=(comm_rank + 1) % comm_size)
if comm_rank == 0:
  data_recv = comm.recv(source=(comm_rank - 1) % comm_size)
print("my rank is %d, and Ireceived:" % comm_rank)
print(data_recv)

第一個(gè)進(jìn)程一開始就發(fā)送數(shù)據(jù),其他進(jìn)程一開始都是在等待接收數(shù)據(jù),這時(shí)候進(jìn)程1接收了進(jìn)程0的數(shù)據(jù),然后發(fā)送進(jìn)程1的數(shù)據(jù),進(jìn)程2接收了,再發(fā)送進(jìn)程2的數(shù)據(jù)……知道最后進(jìn)程0接收最后一個(gè)進(jìn)程的數(shù)據(jù),從而避免了上述問(wèn)題。

一個(gè)比較常用的方法是封一個(gè)組長(zhǎng),也就是一個(gè)主進(jìn)程,一般是進(jìn)程0作為主進(jìn)程leader。主進(jìn)程將數(shù)據(jù)發(fā)送給其他的進(jìn)程,其他的進(jìn)程處理數(shù)據(jù),然后返回結(jié)果給進(jìn)程0。換句話說(shuō),就是進(jìn)程0來(lái)控制整個(gè)數(shù)據(jù)處理流程。

2.3、群體通信

點(diǎn)對(duì)點(diǎn)通信是A發(fā)送給B,一個(gè)人將自己的秘密告訴另一個(gè)人,群體通信(Collective Communications)像是拿個(gè)大喇叭,一次性告訴所有的人。前者是一對(duì)一,后者是一對(duì)多。但是,群體通信是以更有效的方式工作的。它的原則就一個(gè):盡量把所有的進(jìn)程在所有的時(shí)刻都使用上!我們?cè)谙旅娴腷cast小節(jié)講述。

群體通信還是發(fā)送和接收兩類,一個(gè)是一次性把數(shù)據(jù)發(fā)給所有人,另一個(gè)是一次性從所有人那里回收結(jié)果。

1)廣播bcast

將一份數(shù)據(jù)發(fā)送給所有的進(jìn)程。例如我有200份數(shù)據(jù),有10個(gè)進(jìn)程,那么每個(gè)進(jìn)程都會(huì)得到這200份數(shù)據(jù)。


import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
  data = range(comm_size)
data = comm.bcast(data if comm_rank == 0 else None, root=0)
print('rank %d, got:' % (comm_rank))
print(data)

結(jié)果如下

rank 0, got:
[0, 1, 2, 3, 4]
rank 1, got:
[0, 1, 2, 3, 4]
rank 2, got:
[0, 1, 2, 3, 4]
rank 3, got:
[0, 1, 2, 3, 4]
rank 4, got:
[0, 1, 2, 3, 4]

Root進(jìn)程自己建了一個(gè)列表,然后廣播給所有的進(jìn)程。這樣所有的進(jìn)程都擁有了這個(gè)列表。然后愛干嘛就干嘛了。

對(duì)廣播最直觀的觀點(diǎn)是某個(gè)特定進(jìn)程將數(shù)據(jù)一一發(fā)送給每個(gè)進(jìn)程。假設(shè)有n個(gè)進(jìn)程,那么假設(shè)我們的數(shù)據(jù)在0進(jìn)程,那么0進(jìn)程就需要將數(shù)據(jù)發(fā)送給剩下的n-1個(gè)進(jìn)程,這是非常低效的,復(fù)雜度是O(n)。那有沒(méi)有高效的方式?一個(gè)最常用也是非常高效的手段是規(guī)約樹廣播:收到廣播數(shù)據(jù)的所有進(jìn)程都參與到數(shù)據(jù)廣播的過(guò)程中。首先只有一個(gè)進(jìn)程有數(shù)據(jù),然后它把它發(fā)送給第一個(gè)進(jìn)程,此時(shí)有兩個(gè)進(jìn)程有數(shù)據(jù);然后這兩個(gè)進(jìn)程都參與到下一次的廣播中,這時(shí)就會(huì)有4個(gè)進(jìn)程有數(shù)據(jù),……,以此類推,每次都會(huì)有2的次方個(gè)進(jìn)程有數(shù)據(jù)。通過(guò)這種規(guī)約樹的廣播方法,廣播的復(fù)雜度降為O(log n)。這就是上面說(shuō)的群體通信的高效原則:充分利用所有的進(jìn)程來(lái)實(shí)現(xiàn)數(shù)據(jù)的發(fā)送和接收。

2)散播scatter

將一份數(shù)據(jù)平分給所有的進(jìn)程。例如我有200份數(shù)據(jù),有10個(gè)進(jìn)程,那么每個(gè)進(jìn)程會(huì)分別得到20份數(shù)據(jù)。

  import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
  data = range(comm_size)
  print(data)
else:
  data = None
local_data = comm.scatter(data, root=0)
print('rank %d, got:' % comm_rank)
print(local_data)

結(jié)果如下

[0, 1, 2, 3, 4]
rank 0, got:
0
rank 1, got:
1
rank 2, got:
2
rank 3, got:
3
rank 4, got:
4

這里root進(jìn)程創(chuàng)建了一個(gè)list,然后將它散播給所有的進(jìn)程,相當(dāng)于對(duì)這個(gè)list做了劃分,每個(gè)進(jìn)程獲得等分的數(shù)據(jù),這里就是list的每一個(gè)數(shù)。(主要根據(jù)list的索引來(lái)劃分,list索引為第i份的數(shù)據(jù)就發(fā)送給第i個(gè)進(jìn)程)。如果是矩陣,那么就等分的劃分行,每個(gè)進(jìn)程獲得相同的行數(shù)進(jìn)行處理。

需要注意的是,MPI的工作方式是每個(gè)進(jìn)程都會(huì)執(zhí)行所有的代碼,所以每個(gè)進(jìn)程都會(huì)執(zhí)行scatter這個(gè)指令,但只有root執(zhí)行它的時(shí)候,它才兼?zhèn)浒l(fā)送者和接收者的身份(root也會(huì)得到屬于自己的數(shù)據(jù)),對(duì)于其他進(jìn)程來(lái)說(shuō),他們都只是接收者而已。

3)收集gather

那有發(fā)送,就有一起回收的函數(shù)。Gather是將所有進(jìn)程的數(shù)據(jù)收集回來(lái),合并成一個(gè)列表。下面聯(lián)合scatter和gather組成一個(gè)完成的分發(fā)和收回過(guò)程:

Root進(jìn)程將數(shù)據(jù)通過(guò)scatter等分發(fā)給所有的進(jìn)程,等待所有的進(jìn)程都處理完后(這里只是簡(jiǎn)單的乘以2),root進(jìn)程再通過(guò)gather回收他們的結(jié)果,和分發(fā)的原則一樣,組成一個(gè)list。Gather還有一個(gè)變體就是allgather,可以理解為它在gather的基礎(chǔ)上將gather的結(jié)果再bcast了一次。啥意思?意思是root進(jìn)程將所有進(jìn)程的結(jié)果都回收統(tǒng)計(jì)完后,再把整個(gè)統(tǒng)計(jì)結(jié)果告訴大家。這樣,不僅root可以訪問(wèn)combine_data,所有的進(jìn)程都可以訪問(wèn)combine_data了。


import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
  data = range(comm_size)
  print(data)
else:
  data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print('rank %d, got and do:' % comm_rank)
print(local_data)
combine_data = comm.gather(local_data, root=0)
if comm_rank == 0:
  print(combine_data)


[0, 1, 2, 3, 4]
rank 0, got and do:
0
rank 1, got and do:
2
rank 2, got and do:
4
rank 4, got and do:
8
rank 3, got and do:
6
[0, 2, 4, 6, 8]

4)規(guī)約reduce

規(guī)約是指不但將所有的數(shù)據(jù)收集回來(lái),收集回來(lái)的過(guò)程中還進(jìn)行了簡(jiǎn)單的計(jì)算,例如求和,求最大值等等。為什么要有這個(gè)呢?我們不是可以直接用gather全部收集回來(lái)了,再對(duì)列表求個(gè)sum或者max就可以了嗎?這樣不是累死組長(zhǎng)嗎?為什么不充分使用每個(gè)工人呢?規(guī)約實(shí)際上是使用規(guī)約樹來(lái)實(shí)現(xiàn)的。例如求max,完成可以讓工人兩兩pk后,再返回兩兩pk的最大值,然后再對(duì)第二層的最大值兩兩pk,直到返回一個(gè)最終的max給組長(zhǎng)。組長(zhǎng)就非常聰明的將工作分配下工人高效的完成了。這是O(n)的復(fù)雜度,下降到O(log n)(底數(shù)為2)的復(fù)雜度。

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
  data = range(comm_size)
  print(data)
else:
  data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print('rank %d, got and do:' % comm_rank)
print(local_data)
all_sum = comm.reduce(local_data, root=0, op=MPI.SUM)
if comm_rank == 0:
  print('sumis:%d' % all_sum)


結(jié)果如下

[0, 1, 2, 3, 4]
rank 0, got and do:
0
rank 1, got and do:
2
rank 2, got and do:
4
rank 3, got and do:
6
rank 4, got and do:
8
sum is:20

可以看到,最后可以得到一個(gè)sum值。

三、常見用法

3.1、對(duì)一個(gè)文件的多個(gè)行并行處理

#!usr/bin/env python
#-*- coding: utf-8 -*-
 
import sys
import os
import mpi4py.MPI as MPI
import numpy as np
 
 
#
#  Global variables for MPI
#
 
# instance for invoking MPI relatedfunctions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e.,the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()
 
 
if __name__ == '__main__':
   if comm_rank == 0:
       sys.stderr.write("processor root starts reading data...\n")
       all_lines = sys.stdin.readlines()
   all_lines = comm.bcast(all_lines if comm_rank == 0 else None, root = 0)
   num_lines = len(all_lines)
   local_lines_offset = np.linspace(0, num_lines, comm_size +1).astype('int')
   local_lines = all_lines[local_lines_offset[comm_rank] :local_lines_offset[comm_rank + 1]]
   sys.stderr.write("%d/%d processor gets %d/%d data \n" %(comm_rank, comm_size, len(local_lines), num_lines))
   cnt = 0
   for line in local_lines:
       fields = line.strip().split('\t')
       cnt += 1
       if cnt % 100 == 0:
           sys.stderr.write("processor %d has processed %d/%d lines \n" %(comm_rank, cnt, len(local_lines)))
       output = line.strip() + ' process every line here'
       print output

3.2、對(duì)多個(gè)文件并行處理

如果我們的文件太大,例如幾千萬(wàn)行,那么mpi是沒(méi)辦法將這么大的數(shù)據(jù)bcast給所有的進(jìn)程的,所以我們可以先把大的文件split成小的文件,再讓每個(gè)進(jìn)程處理少數(shù)的文件。

#!usr/bin/env python
#-*- coding: utf-8 -*-
 
import sys
import os
import mpi4py.MPI as MPI
import numpy as np
 
#
#  Global variables for MPI
#
 
# instance for invoking MPI relatedfunctions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e.,the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()
 
 
if __name__ == '__main__':
   if len(sys.argv) != 2:
       sys.stderr.write("Usage: python *.py directoty_with_files\n")
       sys.exit(1)
   path = sys.argv[1]
   if comm_rank == 0:
       file_list = os.listdir(path)
       sys.stderr.write("%d files\n" % len(file_list))
   file_list = comm.bcast(file_list if comm_rank == 0 else None, root = 0)
   num_files = len(file_list)
   local_files_offset = np.linspace(0, num_files, comm_size +1).astype('int')
   local_files = file_list[local_files_offset[comm_rank] :local_files_offset[comm_rank + 1]]
   sys.stderr.write("%d/%d processor gets %d/%d data \n" %(comm_rank, comm_size, len(local_files), num_files))
    cnt = 0
   for file_name in local_files:
       hd = open(os.path.join(path, file_name))
       for line in hd:
           output = line.strip() + ' process every line here'
           print output
       cnt += 1
       sys.stderr.write("processor %d has processed %d/%d files \n" %(comm_rank, cnt, len(local_files)))
       hd.close()
import os, sys, time
import numpy as np
import mpi4py.MPI as MPI


#
#  Global variables for MPI
#


# instance for invoking MPI relatedfunctions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e.,the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

# test MPI
if __name__ == "__main__":
  # create a matrix
  if comm_rank == 0:
    all_data = np.arange(20).reshape(4, 5)
    print("************ data ******************")
    print(all_data)

    # broadcast the data to all processors
  all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)

  # divide the data to each processor
  num_samples = all_data.shape[0]
  local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')

  # get the local data which will be processed in this processor
  local_data = all_data[local_data_offset[comm_rank]:local_data_offset[comm_rank + 1]]
  print("****** %d/%d processor gets local data ****" % (comm_rank, comm_size))
  print(local_data)

  # reduce to get sum of elements
  local_sum = local_data.sum()
  all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)

  # process in local
  local_result = local_data ** 2

  # gather the result from all processors and broadcast it
  result = comm.allgather(local_result)
  result = np.vstack(result)

  if comm_rank == 0:
    print("*** sum: ", all_sum)
    print("************ result ******************")
    print(result)

3.3、聯(lián)合numpy對(duì)矩陣的多個(gè)行或者多列并行處理

   Mpi4py一個(gè)非常優(yōu)秀的特性是完美支持numpy!
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 計(jì)算機(jī)編程語(yǔ)言很多,但是適合高性能數(shù)值計(jì)算的語(yǔ)言卻并不多,在高性能計(jì)算的項(xiàng)目中通常會(huì)使用到的語(yǔ)言有 Fort...
    自可樂(lè)閱讀 20,310評(píng)論 3 21
  • 作者:邵正將PytLab,Python 中文社區(qū)專欄作者。主要從事科學(xué)計(jì)算與高性能計(jì)算領(lǐng)域的應(yīng)用,主要語(yǔ)言為Pyt...
    Python中文社區(qū)閱讀 4,887評(píng)論 1 31
  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說(shuō)閱讀 12,389評(píng)論 6 13
  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口。 管理調(diào)度進(jìn)程,并將多個(gè)進(jìn)程對(duì)硬件...
    drfung閱讀 3,756評(píng)論 0 5
  • 這節(jié)主要學(xué)習(xí)POST請(qǐng)求的接受方法。對(duì)于POST請(qǐng)求的處理,Koa2沒(méi)有封裝方便的獲取參數(shù)的方法,需要通過(guò)解析上下...
    qqqc閱讀 505評(píng)論 0 0

友情鏈接更多精彩內(nèi)容