• 文件 >
  • 分佈式資料平行
捷徑

分佈式資料平行

警告

torch.nn.parallel.DistributedDataParallel 的實作會隨著時間而演進。本設計說明是根據 v1.4 的狀態撰寫的。

torch.nn.parallel.DistributedDataParallel (DDP) 會透明地執行分佈式資料平行訓練。本頁面說明其運作方式並揭露實作細節。

範例

讓我們從一個簡單的 torch.nn.parallel.DistributedDataParallel 範例開始。此範例使用 torch.nn.Linear 作為本地模型,使用 DDP 包裝它,然後在 DDP 模型上執行一次正向傳遞、一次反向傳遞和一個優化器步驟。之後,本地模型上的參數將會更新,並且不同行程上的所有模型應該完全相同。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    # Environment variables which need to be
    # set when using c10d's default "env"
    # initialization mode.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

DDP 可與 TorchDynamo 搭配使用。與 TorchDynamo 搭配使用時,請在編譯模型之前套用 DDP 模型包裝器,以便 torchdynamo 可以根據 DDP 儲存區大小套用 DDPOptimizer(圖形中斷最佳化)。(如需詳細資訊,請參閱 TorchDynamo DDPOptimizer。)

ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)

內部設計

本節將深入探討 torch.nn.parallel.DistributedDataParallel 的內部運作方式,詳細說明一次迭代中每個步驟的細節。

  • **先決條件**:DDP 仰賴 c10d ProcessGroup 進行通訊。因此,應用程式必須在建構 DDP 之前建立 ProcessGroup 執行個體。

  • **建構**:DDP 建構函式會取得本地模組的參考,並將排名為 0 的行程中的 state_dict() 廣播到群組中的所有其他行程,以確保所有模型複本都從完全相同的狀態開始。然後,每個 DDP 行程都會建立一個本地的 Reducer,它稍後會在反向傳遞期間處理梯度同步。為了提高通訊效率,Reducer 會將參數梯度組織到儲存區中,並一次減少一個儲存區。儲存區大小可以透過設定 DDP 建構函式中的 bucket_cap_mb 參數來設定。參數梯度到儲存區的映射是在建構時確定的,具體取決於儲存區大小限制和參數大小。模型參數會以給定模型中 Model.parameters() 的(大致)相反順序分配到儲存區中。使用相反順序的原因是,DDP 預計梯度會在反向傳遞期間以大致相同的順序準備就緒。下圖顯示了一個範例。請注意,grad0grad1 位於 bucket1 中,而另外兩個梯度位於 bucket0 中。當然,這個假設可能並不總是正確的,當發生這種情況時,它可能會損害 DDP 反向速度,因為 Reducer 無法在最早的可能時間開始通訊。除了儲存區之外,Reducer 還會在建構期間註冊自動梯度鉤子,每個參數一個鉤子。當梯度準備就緒時,這些鉤子將在反向傳遞期間被觸發。

  • **正向傳遞**:DDP 接收輸入並將其傳遞給本地模型,然後分析本地模型的輸出(如果 find_unused_parameters 設定為 True)。此模式允許在模型的子圖上執行反向傳遞,DDP 透過從模型輸出遍歷自動梯度圖並將所有未使用的參數標記為已準備好進行縮減,來找出哪些參數參與了反向傳遞。在反向傳遞期間,Reducer 只會等待未準備好的參數,但它仍然會縮減所有儲存區。將參數梯度標記為已準備好並不能幫助 DDP 跳過儲存區(目前),但它會阻止 DDP 在反向傳遞期間永遠等待不存在的梯度。請注意,遍歷自動梯度圖會引入額外的開銷,因此應用程式只有在必要時才應將 find_unused_parameters 設定為 True

  • 反向傳遞backward() 函數會直接在 DDP 無法控制的損失 Tensor 上被呼叫,而 DDP 會使用建構時期註冊的 autograd 鉤子來觸發梯度同步。當一個梯度準備就緒時,其在該梯度累加器上對應的 DDP 鉤子將會觸發,然後 DDP 會將該參數梯度標記為可進行縮減。當一個桶中的梯度都準備就緒時,Reducer 會在該桶上啟動非同步的 allreduce,以計算所有處理序中梯度的平均值。當所有桶都準備就緒時,Reducer 會封鎖並等待所有 allreduce 操作完成。完成後,平均梯度會被寫入所有參數的 param.grad 欄位。因此,在反向傳遞之後,不同 DDP 處理序中相同對應參數上的 grad 欄位應該會相同。

  • 優化器步驟:從優化器的角度來看,它正在優化本地模型。所有 DDP 處理序上的模型複本都可以保持同步,因為它們都是從相同的狀態開始,並且在每次迭代中都具有相同的平均梯度。

ddp_grad_sync.png

備註

DDP 要求所有處理序上的 Reducer 實例以完全相同的順序呼叫 allreduce,這是透過始終按照桶索引順序(而不是實際的桶準備就緒順序)執行 allreduce 來完成的。處理序之間不匹配的 allreduce 順序可能會導致錯誤結果或 DDP 反向傳遞掛起。

實作

以下是 DDP 實作組件的指標。堆疊圖顯示了程式碼的結構。

處理序群組

  • ProcessGroup.hpp:包含所有處理序群組實作的抽象 API。c10d 函式庫提供了 3 個開箱即用的實作,分別是 ProcessGroupGlooProcessGroupNCCLProcessGroupMPIDistributedDataParallel 使用 ProcessGroup::broadcast() 在初始化期間將模型狀態從等級為 0 的處理序發送到其他處理序,並使用 ProcessGroup::allreduce() 加總梯度。

  • Store.hpp:協助處理序群組實例的會合服務找到彼此。

分散式資料平行

  • distributed.py:是 DDP 的 Python 進入點。它實作了初始化步驟和 nn.parallel.DistributedDataParallel 模組的 forward 函數,這些函數會呼叫 C++ 函式庫。當一個 DDP 處理序在多個裝置上運作時,其 _sync_param 函數會執行處理序內參數同步,並且它還會將模型緩衝區從等級為 0 的處理序廣播到所有其他處理序。處理序間參數同步發生在 Reducer.cpp 中。

  • comm.h:實作了合併廣播輔助函數,該函數會在初始化期間被呼叫以廣播模型狀態,並在正向傳遞之前同步模型緩衝區。

  • reducer.h:提供了反向傳遞中梯度同步的核心實作。它具有三個進入點函數

    • Reducer:建構函數會在 distributed.py 中被呼叫,它會將 Reducer::autograd_hook() 註冊到梯度累加器。

    • 當梯度準備就緒時,autograd 引擎將會呼叫 autograd_hook() 函數。

    • prepare_for_backward() 會在 distributed.py 中的 DDP 正向傳遞結束時被呼叫。當 DDP 建構函數中的 find_unused_parameters 設定為 True 時,它會遍歷 autograd 圖形以查找未使用的參數。

ddp_code.png

TorchDynamo DDPOptimizer

DDP 的效能優勢來自於在反向傳播期間將 allreduce 集體運算與計算重疊。當與 TorchDynamo 一起使用以編譯整個正向和反向圖形時,AotAutograd 會阻止這種重疊,因為 allreduce 操作是由 autograd 鉤子在整個優化的反向計算完成_之後_啟動的。

TorchDynamo 的 DDPOptimizer 透過在反向傳播期間在 DDP 的 allreduce 桶的邏輯邊界處斷開正向圖形來提供幫助。注意:目標是在反向傳播期間斷開圖形,而最簡單的實作是斷開正向圖形,然後在每個區段上呼叫 AotAutograd 和編譯。這允許 DDP 的 allreduce 鉤子在反向傳播的區段之間觸發,並排程通訊以與計算重疊。

如需更深入的說明和實驗結果,請參閱此部落格文章,或閱讀 torch/_dynamo/optimizations/distributed.py 中的文件和程式碼

若要偵錯 DDPOptimizer,請設定 TORCH_LOGS=’ddp_graphs’ 以取得完整的圖形傾印。對於沒有圖形的日誌,請將 ‘dynamo’、’distributed’ 或 ‘dist_ddp’ 中的任何一個新增至 TORCH_LOGS(以取得有關桶邊界的基本資訊)。若要停用 DDPOptimizer,請設定 torch._dynamo.config.optimize_ddp=False。DDP 和 TorchDynamo 應該仍然可以在沒有 DDPOptimizer 的情況下正常運作,但效能會下降。

文件

取得 PyTorch 的完整開發人員文件

查看文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

查看教學課程

資源

尋找開發資源並取得問題解答

查看資源