PyTorch with Multiple GPUs
DDP Tutorial Notes
Links
High-level overview
When we launch a Distributed Data Parallel (DDP) job, DDP launches one process per GPU, and each GPU gets its own local copy of the model. All replicas of the model and optimizers are identical, and everything uses the same random seed.
What we change here is the data. We get our input batch from the `DataLoader`, but this time we use something called `DistributedSampler`, which ensures that each GPU gets its own chunk of the input data, all in parallel.
Each device gets a chunk of the data and locally runs the forward and backward pass. Because the devices see different data, their gradients differ, so stepping the optimizers independently wouldn't make sense. To handle this, DDP runs a synchronization step in which the gradients are averaged across all of the replicas. Now each model has the same gradients, and then the optimizers are run.
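To make the data chunking concrete, here is a tiny standalone sketch (not part of the tutorial code; the toy dataset and the hard-coded world size of 2 are just for illustration) of how `DistributedSampler` hands each rank a disjoint slice of the dataset indices:

# Illustration only: each rank gets a disjoint subset of the indices.
# In real DDP code the sampler is created inside each process with its own rank.
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(8))
for rank in range(2):  # pretend world_size = 2
    sampler = DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=False)
    print(f"rank {rank} sees indices {list(sampler)}")
# rank 0 sees indices [0, 2, 4, 6]
# rank 1 sees indices [1, 3, 5, 7]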
Migrating single GPU code to DDP
Need a few new modules:

- `import torch.multiprocessing as mp`: a wrapper around Python's native `multiprocessing`
- `from torch.utils.data.distributed import DistributedSampler`: distributes our data across multiple GPUs
- `from torch.nn.parallel import DistributedDataParallel as DDP`: the main workhorse, the wrapper class that handles gradient synchronization
- `from torch.distributed import init_process_group, destroy_process_group`: these two functions initialize and destroy our distributed process group

(We also use Python's built-in `os` module below to set environment variables.)
First thing we want to do is initialize the distributed process group. We can do this with a small function that takes in two parameters. The first one is `world_size`, which is the total number of processes in the group, and the `rank` is a unique number that is assigned to each process:
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    # IP address of local machine that is running the process
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
The `backend` argument for `init_process_group` is set to `nccl` (the NVIDIA Collective Communications Library), the recommended backend for NVIDIA GPUs, letting them communicate over CUDA in a distributed fashion.
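(An aside, not from the tutorial: `nccl` requires CUDA GPUs. If you want the same setup code to also run on a CPU-only machine, a common variant is to fall back to the `gloo` backend:)

# Hypothetical variant of ddp_setup, not the tutorial's version: pick the
# backend dynamically so the same script also works without CUDA.
import os
import torch
from torch.distributed import init_process_group

def ddp_setup(rank: int, world_size: int):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    init_process_group(backend=backend, rank=rank, world_size=world_size)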
Here is our `Trainer` class as of now:
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
    ) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
It remains mostly the same, but the model needs to be wrapped in the `DDP` class, passing the model and the `device_ids`, like so:
self.model = DDP(self.model, device_ids=[self.gpu_id])
To save the model properly, we also need to edit our `_save_checkpoint` function. As of now it is:
def _save_checkpoint(self, epoch):
    ckp = self.model.state_dict()
    PATH = "checkpoint.pt"
    torch.save(ckp, PATH)
    print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
We just need to change `ckp = self.model.state_dict()` to `ckp = self.model.module.state_dict()`.
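Putting that together, the updated method is just the code above with that one line swapped:

def _save_checkpoint(self, epoch):
    # self.model is now a DDP wrapper; .module gives back the underlying model
    ckp = self.model.module.state_dict()
    PATH = "checkpoint.pt"
    torch.save(ckp, PATH)
    print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")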
Another note: when we run our training class, if we save the model from every process, we're going to save a bunch of identical copies, since the replicas are all synchronized (DDP is launching the same process on every GPU, remember). We don't want that, so in our `train` function we save the model only from the rank 0 process. So go from this:
def train(self, max_epochs: int):
    for epoch in range(max_epochs):
        self._run_epoch(epoch)
        if epoch % self.save_every == 0:
            self._save_checkpoint(epoch)
To this:
def train(self, max_epochs: int):
    for epoch in range(max_epochs):
        self._run_epoch(epoch)
        if self.gpu_id == 0 and epoch % self.save_every == 0:
            self._save_checkpoint(epoch)
We also need to change our `prepare_dataloader` function, which as of now is
def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=True,
    )
This needs to be changed to
def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset),
    )
We need to include the `DistributedSampler` to ensure that each input batch is chunked across the GPUs with no overlapping samples. Since we're passing a `sampler`, we need to set `shuffle` to `False`.
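One general `DistributedSampler` detail worth noting here (it shows up in the `_run_epoch` code later on): the sampler's shuffling is seeded by the epoch number, so call `set_epoch(epoch)` at the start of every epoch, otherwise each epoch iterates the data in the same order. A quick standalone sketch:

# Illustration only (toy dataset, pretend rank 0 of world size 2):
# without set_epoch, the shuffled order repeats identically every epoch.
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(8))
sampler = DistributedSampler(dataset, num_replicas=2, rank=0)  # shuffle=True by default
for epoch in range(3):
    sampler.set_epoch(epoch)  # re-seeds the shuffling for this epoch
    print(epoch, list(sampler))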
Now we need to update our `main` function. Right now it's this:
def main(device, total_epochs, save_every):
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size=32)
    trainer = Trainer(model, train_data, optimizer, device, save_every)
    trainer.train(total_epochs)
The first thing we need to do is add our distributed process group, so add `ddp_setup(rank, world_size)`. We also need to switch `device` to `rank`, and at the end add `destroy_process_group()`:
def main(rank: int, world_size: int, total_epochs: int, save_every: int):
    ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size=32)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()
Now we need to update our `__main__` block. Right now it is

if __name__ == "__main__":
    import sys
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    device = 0  # Shorthand for cuda:0
    main(device, total_epochs, save_every)
Change this to

if __name__ == "__main__":
    import sys
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    world_size = torch.cuda.device_count()
    # mp.spawn launches world_size processes and passes each one its rank
    # as the first argument to main, so we don't pass rank ourselves
    mp.spawn(main, args=(world_size, total_epochs, save_every), nprocs=world_size)
Multi-GPU DDP Training with Fault-Tolerance
When scaling up to multiple devices, performance increases, but so does the risk of failure: a single process failing can throw the entire training job out of sync. PyTorch addresses this with torchrun. The training script takes snapshots of the training job at regular intervals, so if something goes wrong the whole run isn't lost: torchrun restarts the processes, and the script loads the most recent snapshot instead of starting from scratch. A snapshot includes the model's state (similar to a checkpoint) but can also include any attributes of the job, like the learning rate scheduler, optimizer state, last epoch run, etc.
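To make that concrete, a fuller snapshot than the one saved later in these notes might look something like this (the helper name and the optimizer entry are my own illustration, not the tutorial's code):

# Illustrative only: the tutorial's snapshot below keeps just the model state
# and the last epoch; adding optimizer state lets a restarted job resume exactly.
import torch

def save_full_snapshot(model, optimizer, epoch, path="snapshot.pt"):
    snapshot = {
        "MODEL_STATE": model.module.state_dict(),  # model assumed DDP-wrapped
        "OPTIMIZER_STATE": optimizer.state_dict(),
        "EPOCHS_RUN": epoch,
    }
    torch.save(snapshot, path)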
Let’s update our code to use torchrun. This is really convenient because it handles all of the environment variables under the hood. As of now our `ddp_setup` function is the following:
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    # IP address of local machine that is running the process
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
But we can remove the `os.environ` parts and let torchrun handle the rest. We just need to include the backend that we're running on.
def ddp_setup():
    # rank and world_size now come from the environment variables that torchrun sets
    init_process_group(backend="nccl")
We also need to modify the `Trainer` class. As of now, we're passing in the rank of the GPU. We don't need to do that anymore, since torchrun sets an environment variable called `LOCAL_RANK` that we can use. Here's the edited version:
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int,
    ) -> None:
        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(self.model, device_ids=[self.gpu_id])
Now we need to add the fault-tolerance part:
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int,
        snapshot_path: str,
    ) -> None:
        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        if os.path.exists(snapshot_path):
            print('Loading snapshot')
            self._load_snapshot(snapshot_path)
        self.model = DDP(self.model, device_ids=[self.gpu_id])

    def _load_snapshot(self, snapshot_path):
        snapshot = torch.load(snapshot_path)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f'Resuming training from snapshot at epoch {self.epochs_run}')

    def _save_snapshot(self, epoch):
        snapshot = {}
        snapshot["MODEL_STATE"] = self.model.module.state_dict()
        snapshot["EPOCHS_RUN"] = epoch
        torch.save(snapshot, "snapshot.pt")
        print(f'Epoch {epoch} | Training snapshot saved as snapshot.pt')
Now update the `train` def to start from `epochs_run`:
def train(self, max_epochs: int):
    for epoch in range(self.epochs_run, max_epochs):
        self._run_epoch(epoch)
        if self.gpu_id == 0 and epoch...
Everything else remains the same. We can remove the rank and world_size though:
def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
    ddp_setup()
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
    trainer.train(total_epochs)
    destroy_process_group()
We can also remove the `mp.spawn` call in the `__main__` block:
if __name__ == "__main__":
    import sys
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    main(save_every, total_epochs, batch_size=32)  # batch_size has no default above, so pass one (32, as before)
Running with Torchrun
Run it as
torchrun --standalone --nproc_per_node=gpu name_of_file.py
Flags:

- `--standalone`: this is a single-machine (single-node) setup
- `--nproc_per_node`: the number of GPUs per node. Pass the number of GPUs available, or just pass `gpu` and let PyTorch figure it out and use all available GPUs.
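For example, on a machine with 4 GPUs, training for 50 epochs and saving every 10 (the two trailing numbers are just illustrative values for the `sys.argv` arguments parsed above):

torchrun --standalone --nproc_per_node=4 name_of_file.py 50 10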
Multinode DDP Training with Torchrun
Will be focusing on using multiple nodes with multiple GPUs to train.
Can deploy two ways:
- Multiple jobs by submitting one job per node
- Using a workload scheduler like SLURM
Torchrun makes things pretty convenient to move from multi-GPU to multiple nodes. Training code pretty much remains the same.
Will need to add a few things for clarity. In the `Trainer` class, we'll add a `global_rank` variable that holds a unique identifier for each process across all of our nodes:
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int,
        snapshot_path: str,
    ) -> None:
        self.local_rank = int(os.environ["LOCAL_RANK"])
        self.global_rank = int(os.environ["RANK"])
        self.model = model.to(self.local_rank)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        if os.path.exists(snapshot_path):
            print('Loading snapshot')
            self._load_snapshot(snapshot_path)
        self.model = DDP(self.model, device_ids=[self.local_rank])

    def _load_snapshot(self, snapshot_path):
        snapshot = torch.load(snapshot_path)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f'Resuming training from snapshot at epoch {self.epochs_run}')

    def _save_snapshot(self, epoch):
        snapshot = {}
        snapshot["MODEL_STATE"] = self.model.module.state_dict()
        snapshot["EPOCHS_RUN"] = epoch
        torch.save(snapshot, "snapshot.pt")
        print(f'Epoch {epoch} | Training snapshot saved as snapshot.pt')
If I have two machines with 4 GPUs each, the local rank will range from 0 to 3 on both machine 1 and machine 2, but the global ranks will be 0-3 for machine 1 (first node) and 4-7 on machine 2 (second node).
Will also update the `_run_epoch` function to use the global rank, so we know which process is printing:
def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    print(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
    self.train_data.sampler.set_epoch(epoch)
    for source, targets in self.train_data:
        source = source.to(self.local_rank)
        targets = targets.to(self.local_rank)
        self._run_batch(source, targets)
Method 1: Run Torchrun on each machine
Use the command:
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --rdzv_id=456 --rdzv_backend=c10d --rdzv_endpoint=172.31.43.139:29603 name_of_file.py
where

- `--nproc_per_node` is the number of GPUs we want to use on that specific node
- `--nnodes` is the number of machines (or nodes)
- `--node_rank` is the rank of the machine/node. So if using two machines (`--nnodes=2`), the first machine would be set with `--node_rank=0` and the second one with `--node_rank=1`.

We also need to specify rendezvous arguments so that all of the nodes can sync with each other; these should be identical on all of the nodes:

- `--rdzv_id` is the rendezvous ID, which can be any random number
- `--rdzv_backend` is the rendezvous backend, recommended to be `c10d`
- `--rdzv_endpoint` is the IP address (and port) of any one of the participating nodes. Recommended to choose a machine that has high network bandwidth.
The command on the other machine is the same, but adapt `--nproc_per_node` if necessary, and make sure to change the `--node_rank`. Torchrun supports heterogeneous distributed training, so two machines can have a different number of GPUs.
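For concreteness (keeping the placeholder script name from above and assuming the second machine also has 4 GPUs), the second node would run:

torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --rdzv_id=456 --rdzv_backend=c10d --rdzv_endpoint=172.31.43.139:29603 name_of_file.py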
Common Troubleshooting
- Make sure that the nodes are able to communicate with each other over TCP.
- Can explicitly tell NCCL which network interface to use via `export NCCL_SOCKET_IFNAME=eth0` (I think this should work, not entirely sure though)
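- Another knob worth knowing about (my addition, not from the tutorial): `export NCCL_DEBUG=INFO` makes NCCL log which network interfaces and transports it picked, which helps when the nodes can't reach each other.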
Method 2: Running torchrun on SLURM
See here
Citation
@online{gregory2024,
author = {Gregory, Josh},
title = {PyTorch with {Multiple} {GPUs}},
date = {2024-09-06},
url = {https://joshgregory42.github.io/posts/2024-09-06-ddp/},
langid = {en}
}