Message Passing Interface (MPI)

Author

Lars Pastewka

Overview

The Message Passing Interface (MPI) is the standard approach for distributed-memory parallel programming on HPC systems. While Kokkos handles parallelization within a single computing node (through threads, OpenMP, or GPU execution), MPI enables communication between multiple nodes, where each node can run Kokkos-accelerated code independently.

This lecture covers: - MPI fundamentals and basic communication patterns - How to integrate MPI with Kokkos for distributed GPU computing - GPU-aware MPI (CUDA-aware and HIP-aware), which enables direct GPU-to-GPU communication without transferring data through host (CPU) memory

References and Documentation

MPI Fundamentals

What is MPI?

MPI is a message-passing library specification that allows: - Multiple processes to run independently on different nodes or cores - Explicit point-to-point communication between processes - Collective operations (broadcast, reduce, all-gather, etc.) across all processes - Synchronization barriers

Unlike shared-memory threading models (OpenMP, Kokkos::Threads), MPI processes have completely separate address spaces. Data must be explicitly sent and received using MPI functions.

Basic MPI Program Structure

A minimal MPI program has the following structure:

#include <iostream>
#include <mpi.h>

int main(int argc, char* argv[]) {
  // Initialize MPI
  MPI_Init(&argc, &argv);

  // Get process rank and total number of processes
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  std::cout << "Rank " << rank << " out of " << size << std::endl;

  // Finalize MPI
  MPI_Finalize();
  return 0;
}

Key MPI Concepts

Communicator: Defines a group of processes that can communicate with each other. MPI_COMM_WORLD includes all processes started by the MPI program.

Rank: A unique identifier assigned to each process, numbered from 0 to (size - 1). Think of it as the process ID within your communicator.

Size: The total number of processes running in the communicator.

Point-to-Point Communication

Blocking Send/Receive

The most basic MPI communication uses blocking send (MPI_Send) and receive (MPI_Recv):

// Process 0 sends to process 1
if (rank == 0) {
  double value = 42.0;
  // Send: destination is rank 1, message tag is 0
  MPI_Send(&value, 1, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);
} else if (rank == 1) {
  double value;
  // Receive: source is rank 0, message tag is 0
  MPI_Recv(&value, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  std::cout << "Received: " << value << std::endl;
}

Important: Blocking operations do not return until the operation is complete. This can cause deadlocks if processes wait for each other. For example, if all processes try to send before receiving, they will all wait forever. Later in this lecture, we’ll see how non-blocking operations can help avoid this.

Non-blocking Communication

Non-blocking operations (with names starting with I for “immediate”) return immediately without waiting for the operation to complete:

MPI_Request request;

if (rank == 0) {
  double value = 42.0;
  // Start the send - returns immediately
  MPI_Isend(&value, 1, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request);
  // Can do other work here while message is being sent
  std::cout << "Send started" << std::endl;
  // Wait for send to complete before accessing value again
  MPI_Wait(&request, MPI_STATUS_IGNORE);
} else if (rank == 1) {
  double value;
  // Start the receive - returns immediately
  MPI_Irecv(&value, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request);
  // Can do other work here while receiving
  std::cout << "Receive started" << std::endl;
  // Wait for receive to complete
  MPI_Wait(&request, MPI_STATUS_IGNORE);
  std::cout << "Received: " << value << std::endl;
}

Non-blocking operations are useful when you want to interleave computation and communication. However, you must call MPI_Wait() before accessing the data to ensure the operation is complete.

Collective Operations

Collective operations involve all processes in a communicator.

Broadcast

Broadcast sends data from one process to all others. All processes must call this function together:

double value;
if (rank == 0) value = 3.14159;
else value = 0.0;  // Other processes need to have variable allocated

// All processes participate: broadcast from rank 0
MPI_Bcast(&value, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

// Now all processes have value = 3.14159
std::cout << "Rank " << rank << " has value: " << value << std::endl;

Notice that all processes call the same broadcast function. This is different from point-to-point communication where only sender and receiver participate.

Reduction

Reduction combines data from all processes using an operation like sum, maximum, minimum, etc. The result is gathered on a single process (the root):

double local_value = rank * 2.0;  // Each process has a local value
double global_sum = 0.0;

// Combine all local values using sum operation
// Result appears only on rank 0
MPI_Reduce(&local_value, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

// Only rank 0 has the result
if (rank == 0) {
  // With 4 processes: 0*2.0 + 1*2.0 + 2*2.0 + 3*2.0 = 12.0
  std::cout << "Global sum: " << global_sum << std::endl;
}

Common reduction operations include MPI_SUM, MPI_MAX, MPI_MIN, MPI_PROD (product).

All-Gather

All-gather collects data from every process and distributes the result to all processes:

int local_value = rank * 10;
std::vector<int> all_values(size);  // Need space for data from all processes

// Each process sends its local_value, receives from all others
MPI_Allgather(&local_value, 1, MPI_INT, all_values.data(), 1, MPI_INT, MPI_COMM_WORLD);

// Now all processes have the same array: [0, 10, 20, 30, ...]
// (assuming 4 processes with ranks 0, 1, 2, 3)
for (int i = 0; i < size; i++) {
  std::cout << "Rank " << rank << " sees: all_values[" << i << "] = " << all_values[i] << std::endl;
}

The key difference from MPI_Reduce: all-gather distributes the result to all processes, while reduce only gives the result to one (the root) process.

MPI with Kokkos

Basic Integration

When using MPI and Kokkos together, keep these principles in mind: 1. Initialize and finalize both Kokkos and MPI properly 2. Each MPI process runs its own independent Kokkos kernels 3. Use MPI functions to communicate between processes 4. Call Kokkos::fence() before MPI operations to ensure GPU work is complete

#include <mpi.h>
#include <Kokkos_Core.hpp>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  Kokkos::initialize(argc, argv);

  {
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Each process runs Kokkos kernels independently
    int N = 1000;
    Kokkos::View<double*> data("data", N);

    // Fill with rank-dependent data
    Kokkos::parallel_for(N, KOKKOS_LAMBDA(int i) {
      data(i) = rank + i * 0.1;
    });
    Kokkos::fence();  // Important: ensure kernel completion before MPI

    // Communicate data between processes
    double local_sum = 0.0;
    Kokkos::parallel_reduce(N, KOKKOS_LAMBDA(int i, double& sum) {
      sum += data(i);
    }, local_sum);

    double global_sum;
    MPI_Reduce(&local_sum, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

    if (rank == 0) {
      std::cout << "Global sum: " << global_sum << std::endl;
    }
  }

  Kokkos::finalize();
  MPI_Finalize();
  return 0;
}

Important Synchronization Pattern

This section is critical for GPU computing. Always call Kokkos::fence() before MPI operations to ensure all GPU kernels have completed:

// Run Kokkos kernels on GPU
Kokkos::parallel_for(N, KOKKOS_LAMBDA(int i) {
  // GPU work happens asynchronously
  data(i) = i * 2.0;
});

// CRITICAL: wait for GPU to finish all work
Kokkos::fence();

// Now safe to use MPI with GPU data
// The data is guaranteed to be ready
MPI_Send(data.data(), N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);

Why is this necessary? GPU kernels execute asynchronously—they launch quickly and return control to your CPU code immediately, while the GPU continues working in the background. Without Kokkos::fence(), your MPI_Send might try to send data that hasn’t finished being computed yet, resulting in incorrect communication or corrupted data.

GPU-Aware MPI (CUDA-Aware and HIP-Aware)

The Problem: GPU Data Transfer

Without GPU-aware MPI, GPU-to-GPU communication requires staging data through CPU memory:

GPU 0 Memory → (cudaMemcpy to CPU) → CPU RAM → MPI Network → CPU RAM → (cudaMemcpy to GPU) → GPU 1 Memory

This approach has several problems: 1. Extra copy overhead: Data must be copied from GPU to CPU, then sent, then copied back to GPU 2. Limited CPU memory bandwidth: The CPU memory system may become a bottleneck 3. Network interface limitation: Traditional network interfaces cannot directly access GPU memory

The Solution: GPU-Aware MPI

GPU-Aware MPI allows direct GPU-to-GPU communication without CPU staging:

GPU 0 Memory → MPI Network → GPU 1 Memory (Direct!)

For GPU-aware MPI to work, you need: 1. MPI library compiled with GPU support: Either CUDA-aware (for NVIDIA) or HIP-aware (for AMD) 2. Network interface that supports GPU memory access: Modern high-performance networks like InfiniBand support this 3. GPU pointers passed directly to MPI functions: Your Kokkos views contain GPU pointers that MPI can use directly

Checking GPU-Aware MPI Support

For CUDA-Aware MPI:

// Check if Open MPI is CUDA-aware
#ifdef MPIX_CUDA_AWARE_SUPPORT
  if (MPIX_Query_cuda_support() == 1) {
    std::cout << "CUDA-aware MPI is available" << std::endl;
  }
#endif

For HIP-Aware MPI:

// Similar support check for HIP
#ifdef MPIX_HIP_AWARE_SUPPORT
  if (MPIX_Query_hip_support() == 1) {
    std::cout << "HIP-aware MPI is available" << std::endl;
  }
#endif

Example: CUDA-Aware MPI with Kokkos

#include <mpi.h>
#include <Kokkos_Core.hpp>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  Kokkos::initialize(argc, argv);

  {
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (size < 2) {
      std::cerr << "Need at least 2 processes" << std::endl;
      return 1;
    }

    // Create GPU-resident data using Kokkos
    int N = 10000;
    Kokkos::View<double*, Kokkos::CudaSpace> gpu_data("gpu_data", N);

    // Initialize with rank-dependent data
    Kokkos::parallel_for(N, KOKKOS_LAMBDA(int i) {
      gpu_data(i) = rank + i * 0.01;
    });

    // Wait for GPU kernel to complete
    Kokkos::fence();

    // Now we can directly communicate GPU-resident data!
    // With CUDA-aware MPI, this pointer is GPU memory
    MPI_Request request;

    if (rank == 0) {
      // Send GPU data directly - no CPU staging needed!
      MPI_Isend(gpu_data.data(), N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request);
    } else if (rank == 1) {
      // Receive directly into GPU memory
      MPI_Irecv(gpu_data.data(), N, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request);
    }

    MPI_Wait(&request, MPI_STATUS_IGNORE);
    Kokkos::fence();  // Wait for MPI to complete

    std::cout << "Rank " << rank << " received: " << gpu_data(0) << std::endl;
  }

  Kokkos::finalize();
  MPI_Finalize();
  return 0;
}

Performance Comparison

GPU-aware MPI provides significant performance improvements, especially for large data transfers. This table shows typical measured times on modern HPC systems with InfiniBand interconnect:

Data Size Staging through CPU (μs) GPU-Aware MPI (μs) Speedup
1 MB 150 50 3x
100 MB 5,000 200 25x
1 GB 50,000 1,000 50x

For a 1 GB transfer, GPU-aware MPI saves 49 milliseconds—which may not sound like much, but in large-scale simulations with thousands of communication steps, this accumulates to significant time savings. Additionally, the CPU is freed to do other work while the GPU and network handle the transfer.

HIP-Aware MPI (AMD GPUs)

Installation

For AMD GPUs with HIP, you need HIP-aware MPI. Install with:

# On systems with ROCm
module load rocm mpi/openmpi/5.0-rocm

# Or manually build Open MPI with HIP support
cd openmpi
./configure --with-pmix --with-hip=/opt/rocm
make -j $(nproc)
sudo make install

HIP-Aware MPI Example

The code is nearly identical to CUDA-aware, just with HIP memory spaces:

#include <mpi.h>
#include <Kokkos_Core.hpp>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  Kokkos::initialize(argc, argv);

  {
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Create HIP-resident data
    int N = 10000;
    Kokkos::View<double*, Kokkos::HIPSpace> gpu_data("gpu_data", N);

    // Initialize on GPU
    Kokkos::parallel_for(N, KOKKOS_LAMBDA(int i) {
      gpu_data(i) = rank + i * 0.01;
    });
    Kokkos::fence();

    // Use HIP-aware MPI to communicate
    if (rank == 0) {
      MPI_Send(gpu_data.data(), N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
      MPI_Recv(gpu_data.data(), N, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }

    Kokkos::fence();
  }

  Kokkos::finalize();
  MPI_Finalize();
  return 0;
}

Building with MPI and Kokkos

CMakeLists.txt Example

cmake_minimum_required(VERSION 3.21)
project(MPIKokkos LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Find MPI
find_package(MPI REQUIRED)

# Find Kokkos (compiled with CUDA support)
find_package(Kokkos REQUIRED)

add_executable(mpi_kokkos mpi_kokkos.cpp)
target_link_libraries(mpi_kokkos PRIVATE MPI::MPI_CXX Kokkos::kokkos)

Compiling on HPC Cluster

# Load modules
module load compiler/gnu mpi/openmpi

# Configure Kokkos with CUDA support
cd kokkos
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Release \
      -DKokkos_ENABLE_CUDA=ON \
      -DKokkos_ARCH_AMPERE80=ON \
      ..
cmake --build . -j $(nproc)
cmake --install . --prefix ~/kokkos-install

# Build your project
cd ../..
mkdir build && cd build
cmake -DKokkos_DIR=~/kokkos-install/lib/cmake/Kokkos ..
cmake --build . -j $(nproc)

Practical Example: Distributed Wave Solver

Here’s a more realistic example combining MPI and Kokkos for a distributed wave equation solver:

#include <mpi.h>
#include <Kokkos_Core.hpp>
#include <iostream>
#include <vector>

using Field = Kokkos::View<double*>;

// Compute local part of wave equation
void compute_wave_step(Field& u, Field& v, Field& a, int n, double dx, double dt) {
  // Compute acceleration
  Kokkos::parallel_for(n - 2, KOKKOS_LAMBDA(int i) {
    a(i + 1) = (u(i) - 2*u(i+1) + u(i+2)) / (dx * dx);
  });
  Kokkos::fence();

  // Update velocity and displacement
  Kokkos::parallel_for(n, KOKKOS_LAMBDA(int i) {
    v(i) += a(i) * dt;
    u(i) += v(i) * dt;
  });
  Kokkos::fence();
}

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  Kokkos::initialize(argc, argv);

  {
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Local domain size (ghost cells for MPI communication)
    int n_local = 1000;
    int n_total = n_local * size;

    Field u("u", n_local + 2);   // +2 for ghost cells
    Field v("v", n_local + 2);
    Field a("a", n_local + 2);

    // Simulate multiple time steps
    for (int step = 0; step < 100; ++step) {
      // Compute locally
      compute_wave_step(u, v, a, n_local, 0.01, 0.001);

      // Exchange ghost cells with neighbors
      if (rank > 0) {
        MPI_Send(u.data() + 1, 1, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD);
        MPI_Recv(u.data() + 0, 1, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      }
      if (rank < size - 1) {
        MPI_Send(u.data() + n_local, 1, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
        MPI_Recv(u.data() + n_local + 1, 1, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      }
    }

    if (rank == 0) {
      std::cout << "Simulation complete" << std::endl;
    }
  }

  Kokkos::finalize();
  MPI_Finalize();
  return 0;
}

Common Pitfalls

1. Forgetting Kokkos::fence() Before MPI

This is one of the most subtle bugs in GPU computing. GPU kernels run asynchronously, so your code continues immediately after launching them.

❌ Wrong (data not ready):

Kokkos::parallel_for(N, KOKKOS_LAMBDA(int i) { data(i) = i; });
// GPU kernel is still running...
MPI_Send(data.data(), N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);  // Sends incomplete data!

✓ Correct (wait for GPU):

Kokkos::parallel_for(N, KOKKOS_LAMBDA(int i) { data(i) = i; });
Kokkos::fence();  // Block until GPU computation is complete
MPI_Send(data.data(), N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);  // Now data is ready

The receiving process will get garbage data if you forget Kokkos::fence(). This is particularly difficult to debug because the program may run fine with small data or on slower GPUs!

2. Deadlock from Blocking Communication

Blocking MPI operations (MPI_Send, MPI_Recv) can cause deadlocks if processes wait for each other.

❌ Wrong (can deadlock with 2+ processes):

if (rank == 0) {
  MPI_Send(data, N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);
  MPI_Recv(other_data, N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
} else {
  MPI_Send(data, N, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
  MPI_Recv(other_data, N, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
// With 2 processes: rank 0 sends to 1, rank 1 sends to 0
// If the network buffer is small, both processes block on their MPI_Send!

✓ Better (use non-blocking to avoid deadlock):

MPI_Request send_req, recv_req;
int dest = (rank + 1) % size;  // Ring topology: send to next rank
int src = (rank - 1 + size) % size;  // receive from previous rank

// Both start immediately without blocking
MPI_Isend(data, N, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD, &send_req);
MPI_Irecv(other_data, N, MPI_DOUBLE, src, 0, MPI_COMM_WORLD, &recv_req);

// Wait for operations to complete
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
MPI_Wait(&recv_req, MPI_STATUS_IGNORE);

This “ring” communication pattern allows each process to send and receive simultaneously without blocking.

3. Not Checking GPU-Aware Support

Not all HPC systems have GPU-aware MPI. Always check if it’s available before assuming your GPU pointers work with MPI:

bool has_gpu_aware_mpi = false;
#ifdef MPIX_CUDA_AWARE_SUPPORT
  has_gpu_aware_mpi = (MPIX_Query_cuda_support() == 1);
#endif

if (has_gpu_aware_mpi) {
  // Use GPU pointers directly with MPI
  MPI_Send(gpu_data.data(), N, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
} else {
  // Fall back: copy to CPU, send, copy back to GPU
  std::vector<double> host_buffer(N);
  Kokkos::deep_copy(host_buffer, gpu_data);
  MPI_Send(host_buffer.data(), N, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
}

This fallback approach ensures your code works on systems without GPU-aware MPI, though it will be slower.

Further Resources