• 教程 >
  • 使用 DDP 進行多 GPU 訓練
快捷方式

入門 || 什麼是 DDP || 單節點多 GPU 訓練 || 容錯 || 多節點訓練 || minGPT 訓練

使用 DDP 進行多 GPU 訓練

創建於:2022 年 9 月 27 日 | 最後更新:2024 年 11 月 03 日 | 最後驗證:未驗證

作者:Suraj Subramanian

你將學到什麼
  • 如何透過 DDP 將單 GPU 訓練指令碼遷移到多 GPU

  • 設定分散式程序組

  • 在分散式設定中儲存和載入模型

GitHub 上檢視本教程使用的程式碼

前提條件
  • DDP 工作原理的高階概述

  • 一臺具有多個 GPU 的機器(本教程使用 AWS p3.8xlarge 例項)

  • 安裝 CUDA 的 PyTorch

觀看下方影片或在 youtube 上同步學習。

前一篇教程中,我們對 DDP 的工作原理有了一個高階概述;現在我們來看如何在程式碼中使用 DDP。在本教程中,我們將從一個單 GPU 訓練指令碼開始,並將其遷移到單節點上執行在 4 個 GPU 上。在此過程中,我們將討論分散式訓練中的重要概念,並在程式碼中實現它們。

注意

如果你的模型包含任何 BatchNorm 層,則需要將其轉換為 SyncBatchNorm,以便在副本之間同步 BatchNorm 層的執行統計資訊。

使用輔助函式 torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) 將模型中的所有 BatchNorm 層轉換為 SyncBatchNorm

single_gpu.pymultigpu.py 的區別

這些是你在單 GPU 訓練指令碼中通常需要進行的更改,以啟用 DDP。

匯入

  • torch.multiprocessing 是 PyTorch 對 Python 原生多程序的封裝

  • 分散式程序組包含所有可以相互通訊和同步的程序。

import torch
import torch.nn.functional as F
from utils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

構建程序組

  • 首先,在初始化程序組之前,呼叫 set_device,它為每個程序設定預設 GPU。這對於防止 GPU:0 上的掛起或過度記憶體佔用非常重要。

  • 程序組可以透過 TCP(預設)或共享檔案系統進行初始化。請閱讀更多關於程序組初始化的資訊

  • init_process_group 初始化分散式程序組。

  • 閱讀更多關於選擇 DDP 後端的資訊

def ddp_setup(rank: int, world_size: int):
   """
   Args:
       rank: Unique identifier of each process
      world_size: Total number of processes
   """
   os.environ["MASTER_ADDR"] = "localhost"
   os.environ["MASTER_PORT"] = "12355"
   torch.cuda.set_device(rank)
   init_process_group(backend="nccl", rank=rank, world_size=world_size)

構建 DDP 模型

self.model = DDP(model, device_ids=[gpu_id])

分佈輸入資料

  • DistributedSampler 將輸入資料分塊分發到所有分散式程序。

  • The DataLoader 結合了資料集和

    取樣器,併為給定資料集提供一個可迭代物件。

  • 每個程序將接收一個包含 32 個樣本的輸入批次;有效批次大小是 32 * nprocs,在使用 4 個 GPU 時為 128。

train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=False,  # We don't shuffle
    sampler=DistributedSampler(train_dataset), # Use the Distributed Sampler here.
)
  • 在每個 epoch 開始時呼叫 DistributedSampler 上的 set_epoch() 方法是必要的,以確保在多個 epoch 中正確地進行洗牌。否則,每個 epoch 將使用相同的順序。

def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    self.train_data.sampler.set_epoch(epoch)   # call this additional line at every epoch
    for source, targets in self.train_data:
      ...
      self._run_batch(source, targets)

儲存模型檢查點

  • 我們只需要從一個程序儲存模型檢查點。如果沒有這個條件,每個程序都會儲存一份相同的模型副本。閱讀更多關於使用 DDP 儲存和載入模型的資訊此處

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
  self._save_checkpoint(epoch)

警告

Collective calls 是在所有分散式程序上執行的函式,用於將某些狀態或值收集到特定程序。集體呼叫要求所有 rank 都執行集體程式碼。在此示例中,_save_checkpoint 不應包含任何集體呼叫,因為它僅在 rank:0 程序上執行。如果你需要進行任何集體呼叫,應在 if self.gpu_id == 0 檢查之前進行。

執行分散式訓練任務

  • 包含新的引數 rank(替換 device)和 world_size

  • 呼叫 mp.spawn 時,rank 由 DDP 自動分配。

  • world_size 是整個訓練任務中的程序數。對於 GPU 訓練,這對應於使用的 GPU 數量,並且每個程序在一個專用的 GPU 上工作。

- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+  ddp_setup(rank, world_size)
   dataset, model, optimizer = load_train_objs()
   train_data = prepare_dataloader(dataset, batch_size=32)
-  trainer = Trainer(model, train_data, optimizer, device, save_every)
+  trainer = Trainer(model, train_data, optimizer, rank, save_every)
   trainer.train(total_epochs)
+  destroy_process_group()

if __name__ == "__main__":
   import sys
   total_epochs = int(sys.argv[1])
   save_every = int(sys.argv[2])
-  device = 0      # shorthand for cuda:0
-  main(device, total_epochs, save_every)
+  world_size = torch.cuda.device_count()
+  mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)

程式碼示例如下

延伸閱讀

文件

訪問 PyTorch 全面開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深入教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源