MPI point-to-point communication
Sending and receiving of messages by processes is the basic MPI communication mechanism.
Sending messages
The MPI_Send function is used to send data to another process. This function has the following signature:
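int MPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest,
             int tag, MPI_Comm comm)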
Parameter | Description |
---|---|
buf | a pointer to the data to send |
count | the number of elements to send from the buffer |
datatype | the data type of the elements in the buffer |
dest | the rank of the recipient process |
tag | a tag (user choice) to assign to the communication |
comm | the communicator in which the communication takes place |
In plain English: a buffer which consists of count successive entries of the type indicated by datatype, starting with the entry at address buf, should be sent to the process with rank dest in the communicator comm. The message is identified by tag.
The basic datatypes that can be specified correspond to the basic datatypes of the C programming language. The MPI basic datatypes and their corresponding C types are listed in the table below.
MPI Datatype | C equivalent | MPI Datatype | C equivalent |
---|---|---|---|
MPI_CHAR | char | MPI_UNSIGNED_CHAR | unsigned char |
MPI_INT | int | MPI_UNSIGNED | unsigned int |
MPI_LONG | long int | MPI_UNSIGNED_LONG | unsigned long int |
MPI_LONG_LONG | long long int | MPI_UNSIGNED_LONG_LONG | unsigned long long int |
MPI_FLOAT | float | MPI_DOUBLE | double |
As an example, consider sending an array of 16 doubles to rank 1, using tag 99 and the MPI_COMM_WORLD communicator. Assuming the doubles are stored in an array named data (the name is purely illustrative), the call to MPI_Send would be
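/* "data" is an illustrative name for a buffer holding 16 doubles */
MPI_Send(data, 16, MPI_DOUBLE, 1, 99, MPI_COMM_WORLD);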
Receiving messages
The MPI_Recv function is used to receive data from another process. This function has the following signature:
int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
int tag, MPI_Comm comm, MPI_Status *status)
Parameter | Description |
---|---|
buf | a pointer to where the data should be received |
count | the maximum number of elements the receive buffer can hold |
datatype | the data type of the elements of the receive buffer |
source | the rank of the sender process |
tag | a tag (user choice) to assign to the communication |
comm | the communicator in which the communication takes place |
status | a pointer to an object representing the status of the receive operation |
In plain English: count successive entries of the type indicated by datatype can be received in the buffer starting at address buf. The message should come from the process with rank source in the communicator comm and should have the tag tag.
As an example, consider receiving an array of 16 doubles from rank 0, using tag 99 and the MPI_COMM_WORLD communicator. Assuming the data should land in an array named data and that an MPI_Status variable named status is available (both names are purely illustrative), the call to MPI_Recv would be
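/* "data" is an illustrative buffer with room for 16 doubles */
MPI_Status status;
MPI_Recv(data, 16, MPI_DOUBLE, 0, 99, MPI_COMM_WORLD, &status);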
The MPI_Recv function will block until a message is received. This means that as long as a message has not been received, the program cannot progress, and you have to make sure that for every call to MPI_Recv there is a matching MPI_Send, or your program will be in a deadlock.
Send and receive example
To illustrate the process of communication with MPI_Send and MPI_Recv, we will consider a simple example where two processes are launched. For both of these processes, a buffer that can hold up to 20 elements of type char is allocated. Then,

- the first process, of rank 0, fills the buffer and sends it to the process with rank 1 using MPI_Send
- the second process calls MPI_Recv to receive the buffer from the first process (rank 0)
- the second process prints the buffer

The code described above is implemented in the following program:
#include <string.h>
#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv) {
    char message[20];
    MPI_Status status;

    MPI_Init(&argc, &argv);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    if (world_rank == 0) {
        /* rank 0 fills the buffer and sends it to rank 1 */
        strcpy(message, "Hello, there");
        MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);
    } else if (world_rank == 1) {
        /* rank 1 receives the buffer from rank 0 and prints it */
        MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
        printf("Rank %d received: %s\n", world_rank, message);
    }

    MPI_Finalize();
    return 0;
}
This code can be compiled and run by using the following commands.
$ module load OpenMPI
$ mpicc -o mpi_hello_msg mpi_hello_msg.c
$ mpirun -np 2 ./mpi_hello_msg
Rank 1 received: Hello, there
Message matching
An MPI message consists of two parts:

- the data part
- information about the message

The last part is called the envelope and contains the source, the destination, the tag, and the communicator. The source is automatically determined by identifying the sender, while the destination, tag, and communicator are provided as parameters of the MPI_Send call.
In order for a message to be received by a process, all the elements of the envelope need to match the MPI_Recv call:

- the source parameter of MPI_Recv should match the source in the envelope
- the process that called MPI_Recv should match the destination in the envelope
- the tag parameter of MPI_Recv should match the tag in the envelope
For example, consider this change in the previous example code, where the value of the tag on the sender side is changed from 99 to 89.
if (world_rank == 0) {
strcpy(message,"Hello, there");
- MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);
+ MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);
} else if (world_rank == 1) {
MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
printf("Rank %d received: %s\n", world_rank, message);
}
If we try to run the code with the change of tag on the sender side, no output will be produced and the processes keep running until we cancel the execution: we are in a deadlock. The reason for this deadlock is that the process of rank 1 is waiting for a message with tag = 99, but on the sender side (rank 0) the message was sent with tag = 89. As a result, rank 1 blocks, waiting for a message that was never sent.
Another example to illustrate how messages are matched is the following:
if (world_rank == 0) {
+ strcpy(message,"I'm rank 0");
+ MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);
+
strcpy(message,"Hello, there");
MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);
} else if (world_rank == 1) {
MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
printf("Rank %d received: %s\n", world_rank, message);
+
+ MPI_Recv(message, 20, MPI_CHAR, 0, 89, MPI_COMM_WORLD, &status);
+ printf("Rank %d received: %s\n", world_rank, message);
}
The order of the tags on the receiver side (99, then 89) is inverted with respect to the order on the sender side (89, then 99). As a result, when we run this code, we obtain the following output:
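Rank 1 received: Hello, there
Rank 1 received: I'm rank 0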
The messages have been matched by the tag. Even if, on the sender side, the message with tag 89 has been issued first, it's not the first to be received. Instead, it's the second message that is received first, because its tag (99) matches the tag in the first call to MPI_Recv on the receiver side.
However, in some situations, the last example will cause a deadlock due to the fact that MPI uses different communication protocols depending on the size of the message.
MPI communication protocols
The standard specifies that MPI_Send will return when it is safe to modify the send buffer. However, this does not mean that the message has reached its destination (the standard does not require it). In fact, depending on the message size, most MPI implementations will use different communication protocols. With one of these protocols, the eager protocol, MPI_Send will return before the message arrives at its destination.
With the eager protocol, when MPI_Send
is called, the MPI implementation
will store a copy of the data for the message in a temporary internal buffer and
return. Subsequently, when a matching receive is issued, MPI will transfer the
data from this temporary buffer to the receiver.
Another protocol is the rendezvous protocol. In this protocol, the sender first asks the receiver whether it is ready to receive the data, i.e., whether it has a buffer to store the data. The data transfer takes place only when the receiver has confirmed that it is ready to receive the data.
For small to medium message sizes, most MPI implementations will use the eager protocol. However, for large messages, where using a temporary buffer would consume too much memory, MPI will use the rendezvous protocol.
MPI_Recv is always blocking
MPI_Recv returns when the receive buffer has been populated with the data. As a consequence, it is always blocking, as there is no way to receive the data before it has been sent.
A way to force the use of the rendezvous protocol is to use the MPI_Ssend function in place of MPI_Send. MPI_Ssend stands for synchronous send and has the same signature as the standard send:
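int MPI_Ssend(const void *buf, int count, MPI_Datatype datatype, int dest,
              int tag, MPI_Comm comm)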
If we convert the MPI_Send calls of the last example to MPI_Ssend:
if (world_rank == 0) {
strcpy(message,"I'm rank 0");
- MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);
+ MPI_Ssend(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);
strcpy(message,"Hello, there");
- MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);
+ MPI_Ssend(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);
} else if (world_rank == 1) {
MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
printf("Rank %d received: %s\n", world_rank, message);
MPI_Recv(message, 20, MPI_CHAR, 0, 89, MPI_COMM_WORLD, &status);
printf("Rank %d received: %s\n", world_rank, message);
}
the result is a deadlock. The reason for this deadlock is that

- Sender: issues a send with tag = 89 and blocks, waiting for the message to be transmitted
- Receiver: issues a receive for a message with tag = 99 and waits for this message

Because the sender is waiting for a matching receive before returning, the second send, with tag = 99, is never issued and the receiving side cannot progress. The same is true for the sender side, as the second receive is never reached.
The reason the code was able to progress when we used MPI_Send is that MPI was using the eager protocol, so the second send was reached, allowing the application to progress.
This illustrates the importance of issuing the send and receive calls in the right order as well as the importance of the tag. This is a common problem faced by MPI beginners. Initially, when testing their code with smaller problem sizes, it often functions correctly due to the use of the eager protocol. However, as they apply the same code to larger problem sizes, MPI switches to the rendezvous protocol, resulting in a deadlock situation.
Work-sharing with MPI
Up to this point, we've focused on communication aspects without delving into the methods for achieving parallelization, or work-sharing. The fundamental approach to attaining parallelism with MPI involves breaking down the global problem into smaller sub-problems and then distributing these subproblems among the individual ranks.
To illustrate how it can be done, we will consider a simple application that sums integers from 1 to N:
$$ sum = \sum_{i = 1}^N i $$
intsum_serial.c:
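A minimal sketch of this serial program, with N fixed as a constant for illustration, could look like this:

#include <stdio.h>

int main(void) {
    const long N = 1000;  /* illustrative problem size */
    long sum = 0;

    /* accumulate the integers from 1 to N */
    for (long i = 1; i <= N; ++i)
        sum += i;

    printf("sum = %ld\n", sum);
    return 0;
}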
We can subdivide the problem by assigning a range of values to each rank. For that, we define the starting value ($start_i$) for a given process $i$ with rank $rank_i$ as
$$ start_i = \frac{N\cdot rank_i}{worldsize} + 1 $$
Note that we added $+ 1$ to the start because we want to sum all the integers from 1 to N.
Similarly, we can define the end value ($end_i$) for a given process with rank $rank_i$ as
$$ end_i = \frac{N\cdot (rank_i + 1)}{worldsize} $$
After obtaining the range of values, each rank can calculate its "local" sum. Upon completing this local computation, we transmit the local sum to the process with rank 0, which will handle the final summation.
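As a sketch of this pattern, assuming N, world_rank, and world_size have already been obtained (via MPI_Comm_rank and MPI_Comm_size) and using variable names chosen for illustration, the local computation and final summation could look like this:

/* range of values handled by this rank */
long start = (long)N * world_rank / world_size + 1;
long end   = (long)N * (world_rank + 1) / world_size;

/* each rank computes its local sum over [start, end] */
long local_sum = 0;
for (long i = start; i <= end; ++i)
    local_sum += i;

if (world_rank == 0) {
    /* rank 0 collects the local sums of the other ranks, in rank order */
    long total = local_sum;
    for (int r = 1; r < world_size; ++r) {
        long remote;
        MPI_Recv(&remote, 1, MPI_LONG, r, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        total += remote;
    }
    printf("sum = %ld\n", total);
} else {
    /* every other rank sends its local sum to rank 0 */
    MPI_Send(&local_sum, 1, MPI_LONG, 0, 0, MPI_COMM_WORLD);
}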
Note that we used MPI_STATUS_IGNORE in the MPI_Recv call. This informs MPI not to fill an MPI_Status, which, in our case, saves some time as we have no use for it.
There is a better way to perform the final sum
Later on, we'll discuss a more efficient approach for performing the final sum than using multiple receives (through collective communication). The purpose of presenting this pattern here is to demonstrate how work-sharing can be accomplished with MPI. It's worth noting that this approach is not necessarily the optimal implementation.
Alternative implementation for the communication
Since the order of message reception doesn't affect correctness, we can consider an alternative communication approach by employing the MPI_ANY_SOURCE wildcard for the source parameter in the MPI_Recv call. This instructs MPI to receive the message without specifying the sender's rank, allowing for a relaxed order in message reception.
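Using the same illustrative variable names as in the sketch above, the receive loop on rank 0 could then become:

/* rank 0 accepts the local sums in whatever order they arrive */
long total = local_sum;
for (int r = 1; r < world_size; ++r) {
    long remote;
    MPI_Recv(&remote, 1, MPI_LONG, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD,
             MPI_STATUS_IGNORE);
    total += remote;
}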
Diffusion in one dimension
For our next example, we will consider the diffusion equation in one dimension which reads
$$ \frac{\partial u}{\partial t} = \alpha \frac{\partial^2 u}{\partial x^2} $$
where $u(x,t)$ is the unknown function to be solved for, $x$ is a coordinate in space, and $t$ is time. The coefficient $\alpha$ is the diffusion coefficient which determines how fast $u$ changes in time. This problem can be discretized in time and space to give
$$ \frac{\partial}{\partial t}u(x_i, t_n) = \alpha \frac{\partial^2}{\partial x^2}u(x_i, t_n) $$
After applying a forward difference in time and a central difference in space to the equation above we obtain
$$ \frac{u_i^{n+1} - u_i^n}{\Delta t} = \alpha\frac{u_{i+1}^n - 2 u_i^n + u_{i-1}^{n}}{\Delta x^2} $$
so that the values on the one-dimensional grid ($u_i^{n+1}$) for the next time step ($t_{n+1}$) can be computed from the values obtained at the previous time step ($t_n$) using
$$ u_i^{n+1} = u_i^n + \alpha\frac{\Delta t}{\Delta x^2}(u_{i+1}^n - 2 u_i^n + u_{i-1}^{n}) $$
From the equation above, we see that to compute an updated value on the grid, we need the value from the previous step at the same location on the grid ($u_i^n$) but also the values of the neighbors ($u_{i+1}^n$ and $u_{i-1}^{n}$).
The domain can be divided into chunks that are computed on different processes. However, since computing the value of each point requires the values of the neighbors, the computation is not embarrassingly parallel: the points at the borders of a chunk require the values of points from the neighboring chunks. This can be implemented by adding extra cells, called "ghost cells", to each chunk; these cells are updated with values coming from the neighbors using MPI communication.
To compute the chunk start and end values, we can use a similar approach to the one used for the sum of integers example. Once we have computed these values, we can determine the size of the chunk and allocate memory to hold the values for the previous (uold) and current (unew) time steps.
my_start = NPOINTS * my_rank / world_size;
my_end = NPOINTS * (my_rank + 1) / world_size;
my_size = my_end - my_start;
uold = malloc(sizeof(double)*(my_size+2));
unew = malloc(sizeof(double)*(my_size+2));
Note that we allocated two extra cells in our call to malloc. These extra cells are the ghost cells used to store the values we require from the neighbors.
The neighbors' ranks are defined as our own rank minus one for the left neighbor and plus one for the right neighbor.
left_rank = my_rank > 0 ? my_rank - 1 : MPI_PROC_NULL;
right_rank = my_rank < world_size-1 ? my_rank + 1 : MPI_PROC_NULL;
where we used MPI_PROC_NULL to represent the non-existent neighbors at the start and end of the domain. It is a constant that represents a dummy MPI process rank. When it is passed to a send or receive operation, the operation is guaranteed to succeed and return as soon as possible.
Now that we have defined the ranks of the neighbors, we can implement the update of the ghost cells at every time step as
MPI_Send(&uold[my_size], 1, MPI_DOUBLE, right_rank, 0, MPI_COMM_WORLD);
MPI_Recv(&uold[0], 1, MPI_DOUBLE, left_rank, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
MPI_Send(&uold[1], 1, MPI_DOUBLE, left_rank, 1, MPI_COMM_WORLD);
MPI_Recv(&uold[my_size+1], 1, MPI_DOUBLE, right_rank, 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
where we first send the value at the end of the process subdomain to the right neighbor and receive the value from the left neighbor to update the ghost cell at the beginning of the subdomain. In a subsequent communication step, we do the opposite: we send to the left neighbor and update the ghost cell at the end of the subdomain by receiving the value from the right neighbor.
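Once the ghost cells have been updated, each process can apply the update formula to the points of its own chunk. A minimal sketch of this update loop, assuming alpha, dt, and dx hold the diffusion coefficient and the time and space steps, could be:

/* update the interior points; uold[0] and uold[my_size+1] are the ghost
   cells filled by the communication above */
for (int i = 1; i <= my_size; i++)
    unew[i] = uold[i] + alpha * dt / (dx * dx)
                      * (uold[i+1] - 2.0 * uold[i] + uold[i-1]);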
The code where everything is put together is presented below.
Source code for this example and gnuplot input for visualization