我正在学习 MPI4Py,我想实现一个简单的程序。
解释
这里,每个等级都有一个
send_array
大小
rank+1
和值分别等于
rank+1
rank0 = [1]
rank1 = [2 2]
rank2 = [3 3 3]
rank3 = [4 4 4 4]
我想收集值
rank=0
到缓冲区
rbuf
它的大小等于所有本地
send_arrays
的总大小,即
1+2+3+4 = 10
程序
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
send_array = np.ones(rank+1).astype(int) * (rank + 1)
print(rank, send_array)
if rank == 0:
gather_size = np.array([sum([i+1 for i in range(size)])])
print(gather_size)
rbuf = np.zeros(gather_size[0]).astype(int)
else:
gather_size = None
rbuf = None
# comm.Gatherv(sendbuf, recvbuf=(recvbuf, recvcounts, displs, datatype), root=0)
comm.Gatherv(sendbuf=send_array, recvbuf=(rbuf, (1,2,3,4),(0,1,3,6), MPI.INT), root=0)
if rank == 0:
print(rbuf, len(rbuf))
观察/错误/怀疑
我手动将这些值硬编码为更明确,因为我有疑问关于
recvcounts
和
displs
的值。我收到以下错误:
comm.Gatherv(sendbuf=send_array, recvbuf=(rbuf, (1,2,3,4),(0,1,3,6), MPI.INT), root=0)
File "mpi4py/MPI/Comm.pyx", line 724, in mpi4py.MPI.Comm.Gatherv
mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated
我期望的结果是:
[1 2 2 3 3 3 4 4 4 4]
玩了一下之后,我还意识到代码
有效
如果我保持 rbuf 的大小等于 [| ||],在本例中可能是
maximum_local_array_size*number_of_ranks
,因为我有 4 个等级。
4*4
我得到的结果不是我想要的,但我没有收到任何错误。请注意,这次我也没有指定
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# send_array = np.ones(10).astype(int) * (rank + 1)
send_array = np.ones(rank+1).astype(int) * (rank + 1)
print(rank, send_array)
if rank == 0:
gather_size = np.array([16])
print(gather_size)
rbuf = np.zeros(gather_size[0]).astype(int)
else:
gather_size = None
rbuf = None
# comm.Gatherv(sendbuf, recvbuf=(recvbuf, recvcounts, displs, datatype), root=0)
comm.Gatherv(sendbuf=send_array, recvbuf=(rbuf, MPI.INT), root=0)
if rank == 0:
print(rbuf, len(rbuf))
和
recvcounts
所以我不确定这些参数默认使用的确切值是什么,因为这些值似乎有效。
displs
. So I am not sure what exact values are used by default for these arguments, since those seem to work.
[1 0 0 0 2 2 0 0 3 3 3 0 4 4 4 4]
你的
recvcounts
和
displs
参数的理解是正确的。但是,你遇到的问题在于
Gatherv
操作如何将数据放入接收缓冲区(
rbuf
)。
在你的代码中,你正确地计算了
recvcounts
为
(1, 2, 3, 4)
,
displs
为
(0, 1, 3, 6)
。这意味着:
-
进程 0 发送 1 个元素,这些元素应该放在
rbuf
的索引 0 处开始。 -
进程 1 发送 2 个元素,这些元素应该放在
rbuf
的索引 1 处开始。 -
进程 2 发送 3 个元素,这些元素应该放在
rbuf
的索引 3 处开始。 -
进程 3 发送 4 个元素,这些元素应该放在
rbuf
的索引 6 处开始。
问题在于,当你使用
recvcounts
和
displs
时,
Gatherv
不会
将接收到的数据视为一个连续的数组。相反,它将接收到的数据视为多个块,每个块都由
recvcounts
和
displs
指定。
因此,当你在进程 0 上打印
rbuf
时,你会看到:
[1 0 0 0 2 2 0 0 3 3 3 0 4 4 4 4]
这是因为
Gatherv
按以下方式填充了
rbuf
:
-
来自进程 0 的
[1]
被放置在索引 0 处。 -
来自进程 1 的
[2 2]
被放置在索引 1 处开始。 -
来自进程 2 的
[3 3 3]
被放置在索引 3 处开始。 -
来自进程 3 的
[4 4 4 4]
被放置在索引 6 处开始。
rbuf
中剩余的零值是未初始化的内存。
为了获得你期望的结果
[1 2 2 3 3 3 4 4 4 4]
,你需要在执行
Gatherv
之后
将接收到的数据块复制到一个新的数组中:
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
send_array = np.ones(rank + 1).astype(int) * (rank + 1)
print(rank, send_array)
if rank == 0:
gather_size = np.array([sum([i + 1 for i in range(size)])])
print(gather_size)
rbuf = np.zeros(gather_size[0]).astype(int)
else:
gather_size = None
rbuf = None
comm.Gatherv(sendbuf=send_array, recvbuf=(rbuf, (1, 2, 3, 4), (0, 1, 3, 6), MPI.INT), root=0)
if rank == 0:
result = np.concatenate([rbuf[displs[i]:displs[i] + recvcounts[i]] for i in range(size)])
print(result, len(result))
这段代码将创建一个新的
result
数组,并将接收到的数据块从
rbuf
复制到
result
中,从而产生你期望的结果。