Josh Gregory
  • Home
  • About
  • Projects
  • Blog
  • Resume

On this page

  • DDP Tutorial Notes
    • Links
    • High-level overview
    • Migrating single GPU code to DDP
  • Multi-GPU DDP Training with Fault-Tolerance
    • Running with Torchrun
  • Multinode DDP Training with Torchrun
    • Method 1: Run Torchrun on each machine
      • Common Troubleshooting
    • Method 2: Running torchnode on SLURM

PyTorch with Multiple GPUs

Notes
Deep learning
Machine learning
Artificial intelligence
How to implement DistributedDataParallel (DDP) in PyTorch
Author

Josh Gregory

Published

September 6, 2024

DDP Tutorial Notes

Links

PyTorch documentation

YouTube playlist

GitHub repo

High-level overview

When we launch a Distributed Data Parallel (DDP) process, DDP launches one process per GPU, where each GPU has its own local copy of the model. All replicas of the model and optimizers are identical. Everything uses the same random seed.

What we change here is the data. We get our InputBatch from the DataLoader, but this time we use something called DistributedSampler, which ensures that each GPU gets a chunk of the data inputs, all in parallel.

Each device gets a chunk of the data and locally runs the forward and backward pass. Because the devices have different data, running the optimizers wouldn’t make sense, since the gradients would be different. To help with this, DDP then runs a synchronization step, where all of the gradients are synchronized with each other.

Now each model has the same gradients. Then the optimizers are run.

Migrating single GPU code to DDP

Need a few new modules:

import torch.multiprocessing as mp

This is a wrapper around Python’s native multiprocessing

from torch.utils.data.distributed import DistributedSampler

This distributes our data across multiple GPUs

from torch.nn.parallel import DistributedDataParallel as DDP

Main workhorse function

from torch.distributed import init_process_group, destroy_process_group

These two functions initialize and destroy our distributed process group.

First thing we want to do is initialize the distributed process group. Can do this with a small function that takes in two parameters. The first one is world_size, which is the total number of processes in the group, and the rank is a unique number that is assigned to each process:

def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """

    # IP address of local machine that is running the process
    os.environ["MASTER_ADDR"] = "localhost" 
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

The backend argument for init_process_group being nccl is the default one for NVIDIA GPUs to let them use CUDA in a distributed fashion.

Here is our Trainer class as of now:

class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int, 
    ) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every

It remains mostly the same, but the model needs to be wrapped in the DDP class with the model and the device_ids, like so:

self.model = DDP(self.model, device_ids=[self.gpu_id])

To save the model properly, we also need to edit our _save_checkpoint function. As of now it is:

    def _save_checkpoint(self, epoch):
        ckp = self.model.state_dict()
        PATH = "checkpoint.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

We just need to change ckp = self.model.state_dict() to ckp = self.model.module.state_dict().

Another note: When we run our training class, if we save the model, we’re going to save a bunch of copies of the model, since they’re all synched (since DDP is launching the same processes, remember). We don’t want that, so for our train function, we want to save the model only from the rank 0 process. So go from this:

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)

To this:

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_checkpoint(epoch)

We also need to change our DataLoader function, which as of now is

def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=True,
    )

This needs to be changed to

def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )

We need to include the DistributedSampler to ensure that each input batch is chunked across each GPUs with no overlapping samples. Since we’re passing a sampler, we need to set shuffle to False.

Now we need to update our main function. Right now it’s this:

def main(device, total_epochs, save_every):
    dataset_model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size=32)
    trainer = Trainer(model, train_data, optimizer, device, save_every)
    trainer.train(total_epochs)

The first thing we need to do is add our distributed process group, so add

ddp_setup(rank, world_size)

We also need to switch device to rank, and at the end add destroy_process_group():

def main(rank: int, world_size: int, total_epochs: int, save_every: int):
    ddp_setup(rank, world_size)
    dataset_model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size=32)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()

Now we need to update our __main__ function. Right now it is

if __name__ == ""__main__"":
    import sys
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    device = 0 # Shorthand for cuda:0
    main(device, total_epochs, save_every)

Change this to

if __name__ == ""__main__"":
    import sys
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, total_epochs, save_every), nprocs=world_size)

Multi-GPU DDP Training with Fault-Tolerance

When scaling up to multiple devices, performance is increased, but the risk of failure is also increased. A single process failure can throw the entire training process out of sync. PyTorch addresses this using Torchrun, whereby the training script takes snapshots of your training job at regular intervals, so if something goes wrong the code doesn’t shit itself. Torchrun will restart the processes and load the most recent snapshot instead of restarting from scratch. A snapshot includes the model’s state (similar to a checkpoint) but also includes any attributes of the job, like the learning rate scheduler, optimizer state, last epoch run, etc.

Let’s update our code to use torchrun. This is really convenient because it handles all of the environment variables under the hood. As of now our ddp_setup function is the following:

def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """

    # IP address of local machine that is running the process
    os.environ["MASTER_ADDR"] = "localhost" 
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

But we can remove the os.environ parts and let torchrun handle the rest. We just need to include the backend that we’re running on.

def ddp_setup():
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

We also need to modify the Trainer class. As of now, we’re passing the rank of the GPU. We don’t need to do that anymore, since Torchrun will have an environment variable called local_rank that we can use. Here’s the edited version:

class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int, 
    ) -> None:
        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(model, device_ids=[gpu_id])

Now we need to add the fault-tolerance part:

class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int, 
    ) -> None:
        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        if os,path.exists(snapshot_path):
            print('Loading snapshot')
            self._load_snapshot(snapshot_path)
        self.model = DDP(self.model, device_ids=[self.gpu_id])

    def _load_snalshot(self, snapshot_path):
        snapshot = torch.load(snapshot_path)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f'Resuming training from snapshot at epoch {self.epochs_run}')

    def _save_snapshot(self, epoch):
        snapshot = {}
        snapshot["MODEL_STATE"] = self.model.module.state_dict()
        snapshot["EPOCHS_RUN"] = epoch
        torch.save(snapshot, "snapshot.pt")
        print(f'Epoch {epoch} | Training snapshot saved as snapshot.pt')

Now update the train def to start from epochs_run:

def train(self, max_epochs: int):
    for epoch in range(self.epochs_run, max_epochs):
        self._run_epoch(epoch)
        if self.gpu_id == 0 and epoch...

Everything else remains the same. We can remove the rank and world_size though:

def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
    ddp_setup()
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
    trainer.train(total_epochs)
    destroy_process_group()

We can also remove the mp.spawn call in the __main__:

if __name__ == "__main__":
    import sys
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    main(save_every, total_epochs)

Running with Torchrun

Run it as

torchrun --standalone --nproc_per_node=gpu name_of_file.py

Flags:

  • --standalone: This is a single-machine setup (single node)
  • --nproc_per_node: Number of GPUs per node. Would pass the number of GPUs available. Can also just pass gpu and let PyTorch figure it out and use all available GPUs.

Multinode DDP Training with Torchrun

Will be focusing on using multiple nodes with multiple GPUs to train.

Can deploy two ways:

  • Multiple jobs by submitting one job per node
  • Using a workload scheduler like SLURM

Torchrun makes things pretty convenient to move from multi-GPU to multiple nodes. Training code pretty much remains the same.

Will need to add a few things for clarity. In the Trainer class, will add a global_rank variable that has a unique identifier for each process across all of our nodes:

class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int, 
    ) -> None:
        self.local_rank = int(os.environ["LOCAL_RANK"])
        self.global_rank = int(os.environ["RANK"])
        self.model = model.to(self.gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        if os,path.exists(snapshot_path):
            print('Loading snapshot')
            self._load_snapshot(snapshot_path)
        self.model = DDP(self.model, device_ids=[self.gpu_id])

    def _load_snalshot(self, snapshot_path):
        snapshot = torch.load(snapshot_path)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f'Resuming training from snapshot at epoch {self.epochs_run}')

    def _save_snapshot(self, epoch):
        snapshot = {}
        snapshot["MODEL_STATE"] = self.model.module.state_dict()
        snapshot["EPOCHS_RUN"] = epoch
        torch.save(snapshot, "snapshot.pt")
        print(f'Epoch {epoch} | Training snapshot saved as snapshot.pt')

If I have two machines with 4 GPUs each, the local rank will range from 0 to 3 on both machine 1 and machine 2, but the global ranks will be 0-3 for machine 1 (first node) and 4-7 on machine 2 (second node).

Will also update the _run_epoch function to use the global rank to know which process is printing:

    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.local_rank)
            targets = targets.to(self.local_rank)
            self._run_batch(source, targets)

Method 1: Run Torchrun on each machine

Use the command:

torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --rdzv_id=456  --rdzv_backend=c10d --rdzv_endpoint=172.31.43.139:29603

where

  • --nproc_per_node is the number of GPUs we want to use on one specific node
  • --nnodes is the number of machines (or nodes)
  • node_rank is the rank of the machine/node. So if using two machines (--nnodes=2), the first machine would be set with node_rank=0 and the second one would be set with node_rank=1

We also need to specify rondezvous arguments so that all of the nodes can sync with each other, which should be identical for all of the nodes: * --rdzv_id is the rondezvous ID, which can be any random number * --rdzv_backend is the backend, recommended to be c10d. * --rdzv_endpoint is the IP address of any of the participating nodes. Recommended to choose a machine that has a high network bandwidth.

The command on the other machine is the same, but adapt nproc_per_node if necessary, and make sure to change the node_rank. Torchrun supports heterogeneous distributed training, so two machines can have a different number of GPUs.

Common Troubleshooting

  • Make sure that the nodes are able to communicate with each other over TCP.
  • Can explicitly pass the network to nccl socket via
export NCCL_SOCKET_IFNAME=eth0

(I think this should work, not entirely sure though)

Method 2: Running torchnode on SLURM

See here

Citation

BibTeX citation:
@online{gregory2024,
  author = {Gregory, Josh},
  title = {PyTorch with {Multiple} {GPUs}},
  date = {2024-09-06},
  url = {https://joshgregory42.github.io/posts/2024-09-06-ddp/},
  langid = {en}
}
For attribution, please cite this work as:
Gregory, Josh. 2024. “PyTorch with Multiple GPUs.” September 6, 2024. https://joshgregory42.github.io/posts/2024-09-06-ddp/.
  • © 2025 Josh Gregory