
DDP (Distributed Data Parallel) in PyTorch

This manual is based on the PyTorch DDP tutorial and its example code (multigpu.py and multigpu_torchrun.py). It summarizes the changes needed when moving from a single GPU to multiple GPUs on a single node, both with and without torchrun, and then to multiple GPUs across multiple nodes. You can compare the scripts yourself with the diff command:

diff --color -U 0 multigpu.py multigpu_torchrun.py

Single node multiple GPU

Imports

import os

import torch
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

Script function

def main(rank, world_size, other_args):
    ddp_setup(rank, world_size)
    # define dataset, model, optimizer, trainer here
    destroy_process_group()


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    # mp.spawn passes the process rank (0 .. world_size-1) as the first
    # argument of main, followed by everything in args
    mp.spawn(main, args=(world_size, other_args), nprocs=world_size)
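
To make the placeholder comment concrete, here is a minimal self-contained sketch of how the pieces introduced in the following sections fit together. The linear model and the random tensors are illustrations only, not part of the tutorial code:

import os
import torch
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from torch.distributed import init_process_group, destroy_process_group


def ddp_setup(rank, world_size):
    # same as the DDP setup shown in the next subsection
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)


def main(rank, world_size, total_epochs, save_every):
    ddp_setup(rank, world_size)
    # toy dataset and model, just to show how the pieces fit together
    dataset = TensorDataset(torch.randn(2048, 20), torch.randn(2048, 1))
    train_data = DataLoader(dataset, batch_size=32, shuffle=False,
                            sampler=DistributedSampler(dataset))
    model = DDP(torch.nn.Linear(20, 1).to(rank), device_ids=[rank])
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    loss_fn = torch.nn.MSELoss()
    for epoch in range(total_epochs):
        train_data.sampler.set_epoch(epoch)          # see the shuffling section
        for source, targets in train_data:
            source, targets = source.to(rank), targets.to(rank)
            optimizer.zero_grad()
            loss_fn(model(source), targets).backward()
            optimizer.step()
        if rank == 0 and epoch % save_every == 0:    # see the checkpoint section
            torch.save(model.module.state_dict(), "checkpoint.pt")
    destroy_process_group()


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, 10, 5), nprocs=world_size)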

DDP setup

os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
init_process_group(backend="nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)

Model

- self.model = model.to(gpu_id)
+ self.model = DDP(model, device_ids=[gpu_id])

Data

train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
-   shuffle=True,
+   shuffle=False,
+   sampler=DistributedSampler(train_dataset),
)
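
The same change can be wrapped in a small helper (a sketch; the pin_memory flag is an optional extra, not part of the diff above):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler


def prepare_dataloader(dataset, batch_size):
    # DistributedSampler hands each rank its own shard of the dataset,
    # so the DataLoader itself must not shuffle.
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,      # optional: faster host-to-GPU copies
        shuffle=False,
        sampler=DistributedSampler(dataset),
    )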

Shuffling across multiple epochs

Calling the set_epoch() method on the DistributedSampler at the beginning of each epoch is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will be used in each epoch.

for epoch in range(max_epochs):
    train_data.sampler.set_epoch(epoch)   # reshuffle with a different seed each epoch
    for source, targets in train_data:
        ...  # forward pass, loss, backward pass, optimizer step

Save checkpoints

Since DDP keeps the model replicas identical across processes, we only need to save checkpoints from one process. Because the model is wrapped in DDP, the underlying weights are accessed through model.module:

- ckp = model.state_dict()
+ ckp = model.module.state_dict()
- if epoch % save_every == 0:
+ if gpu_id == 0 and epoch % save_every == 0:
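
Inside a Trainer-style class this typically ends up looking like the sketch below; the method and attribute names (_save_checkpoint, _run_epoch, self.gpu_id, self.save_every) are illustrative, only the .module access and the rank-0 guard come from the diff above.

def _save_checkpoint(self, epoch):
    # self.model is the DDP wrapper; the underlying weights live under .module
    ckp = self.model.module.state_dict()
    torch.save(ckp, "checkpoint.pt")
    print(f"Epoch {epoch} | checkpoint saved to checkpoint.pt")

def train(self, max_epochs):
    for epoch in range(max_epochs):
        self._run_epoch(epoch)
        # only the process driving GPU 0 writes to disk
        if self.gpu_id == 0 and epoch % self.save_every == 0:
            self._save_checkpoint(epoch)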

Slurm job

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks=2
#SBATCH --gpus=2
#SBATCH --cpus-per-task=18
#SBATCH --partition=gpu
#SBATCH -o test_multigpu_%j.out
python python_script.py arguments
# python multigpu.py 50 10

Single node multiple GPU with torchrun

Script function

-  world_size = torch.cuda.device_count()
-  mp.spawn(main, args=(world_size, other_args,), nprocs=world_size)
+  main(other_args)

DDP setup

torchrun provides the environment variable os.environ["LOCAL_RANK"] with the GPU id, and it also sets the rank and world size, so these no longer need to be passed to init_process_group:

gpu_id = int(os.environ["LOCAL_RANK"])
- os.environ["MASTER_ADDR"] = "localhost"
- os.environ["MASTER_PORT"] = "12355"
- init_process_group(backend="nccl", rank=rank, world_size=world_size)
+ init_process_group(backend="nccl")
+ torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
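
Putting the torchrun changes together, the script skeleton looks roughly like this (a sketch; total_epochs and save_every stand in for the script's own command-line arguments):

import os
import sys
import torch
from torch.distributed import init_process_group, destroy_process_group


def ddp_setup():
    # torchrun sets MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE and LOCAL_RANK
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))


def main(total_epochs, save_every):
    ddp_setup()
    gpu_id = int(os.environ["LOCAL_RANK"])
    # define dataset, model, optimizer, trainer exactly as in the
    # single-node section, using gpu_id as the device index
    destroy_process_group()


if __name__ == "__main__":
    main(int(sys.argv[1]), int(sys.argv[2]))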

Slurm job

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks=2
#SBATCH --gpus=2
#SBATCH --cpus-per-task=18
#SBATCH --partition=gpu
#SBATCH --time=00:05:00
#SBATCH -o test_multigpu_torchrun_%j.out    
torchrun --nnodes=1 --nproc_per_node=2 python_script.py arguments
# torchrun --nnodes=1 --nproc_per_node=2 multigpu_torchrun.py 50 10

Multi node multi GPU

The only difference from the single-node torchrun version is that each process now needs two ranks: LOCAL_RANK, its GPU id within the node, and RANK, its global id across all nodes. The model is moved to the local rank, while anything that must happen exactly once in the whole job (such as saving checkpoints) is guarded by the global rank:

- gpu_id = int(os.environ["LOCAL_RANK"])  # on one node, LOCAL_RANK was all we needed
- model = model.to(gpu_id)

+ local_rank = int(os.environ["LOCAL_RANK"])   # GPU id within the node
+ global_rank = int(os.environ["RANK"])        # rank across all nodes
+ model = model.to(local_rank)
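
In a Trainer-style class, the two ranks end up being used roughly like this (a sketch; the attribute and method names are illustrative): the model and data go to local_rank, while the checkpoint guard switches from gpu_id == 0 to global_rank == 0 so that only one process in the whole job writes to disk.

self.local_rank = int(os.environ["LOCAL_RANK"])
self.global_rank = int(os.environ["RANK"])
self.model = DDP(model.to(self.local_rank), device_ids=[self.local_rank])
...
# exactly one process across all nodes saves the checkpoint
if self.global_rank == 0 and epoch % self.save_every == 0:
    self._save_checkpoint(epoch)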

Slurm job

#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=4
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=18
#SBATCH --partition=gpu
#SBATCH --time=00:05:00
#SBATCH -o test_multinode_%j.out  
torchrun --nnodes=2 --nproc_per_node=4 python_script.py arguments
# torchrun --nnodes=2 --nproc_per_node=4 multigpu_torchrun.py 50 10
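
As written, the torchrun command only starts on the batch node and has no rendezvous endpoint for the second node to find. A common fix, sketched below under the assumption of one launcher task per node, is to start torchrun on every node with srun and point all nodes at a shared c10d rendezvous endpoint on the first node of the allocation:

#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1          # one torchrun launcher per node
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=72           # 4 x 18 CPUs now belong to the single launcher task
#SBATCH --partition=gpu
#SBATCH --time=00:05:00
#SBATCH -o test_multinode_%j.out

# the first node in the allocation acts as the rendezvous host
head_node=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)

srun torchrun \
    --nnodes=2 \
    --nproc_per_node=4 \
    --rdzv_id="$SLURM_JOB_ID" \
    --rdzv_backend=c10d \
    --rdzv_endpoint="${head_node}:29500" \
    python_script.py arguments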
