管道平行化¶
注意事項
torch.distributed.pipelining 目前處於 alpha 狀態且正在開發中。API 可能會變更。它已從 PiPPy 專案遷移過來。
為什麼要使用管道平行化?¶
管道平行化是深度學習的基本平行化方法之一。它允許對模型的執行進行分割,以便多個微批次可以同時執行模型程式碼的不同部分。管道平行化對於以下情況可能是一種有效的技術
- 大規模訓練 
- 頻寬受限的叢集 
- 大型模型推論。 
上述情況都有一個共同點,即每個裝置的計算量無法掩蓋傳統平行化方法的通訊成本,例如 FSDP 的權重全收集。
什麼是 torch.distributed.pipelining?¶
雖然管道化在擴展方面很有希望,但它通常難以實作,因為除了模型權重之外,它還需要分割模型的執行。分割執行通常需要對模型進行侵入式的程式碼變更。另一個複雜性來自於在分散式環境中排程微批次,並考慮資料流依賴性。
pipelining 套件提供了一個工具包,可以自動完成上述工作,這使得在一般模型上輕鬆實作管道平行化成為可能。
它由兩部分組成:分割前端和分散式執行階段。分割前端會按原樣取得您的模型程式碼,將其分割成「模型分割區」,並擷取資料流關係。分散式執行階段會在不同的裝置上平行執行管道階段,處理微批次分割、排程、通訊和梯度傳播等工作。
總體而言,pipelining 套件提供了以下功能
- 根據簡單的規範分割模型程式碼。 
- 豐富的管道排程支援,包括 GPipe、1F1B、交錯式 1F1B 和迴圈式 BFS,並提供編寫自訂排程的基礎架構。 
- 對跨主機管道平行化的第一級支援,因為這是 PP 通常使用的地方(透過較慢的互連)。 
- 與其他 PyTorch 平行化技術(例如資料平行化 (DDP、FSDP) 或張量平行化)的可組合性。TorchTitan 專案展示了 Llama 模型上的「3D 平行化」應用程式。 
步驟 1:建置 PipelineStage 以進行執行¶
在我們可以使用 PipelineSchedule 之前,我們需要建立 PipelineStage 物件,這些物件會包裝在該階段執行的模型部分。PipelineStage 負責配置通訊緩衝區,並建立發送/接收運算以與其對等方通訊。它會管理中繼緩衝區,例如尚未被使用的正向傳播輸出的緩衝區,並且它提供了一個用於執行階段模型的反向傳播的工具。
PipelineStage 需要知道階段模型的輸入和輸出形狀,以便它可以正確配置通訊緩衝區。形狀必須是靜態的,例如在執行階段,形狀不能在每個步驟之間改變。如果執行階段形狀與預期形狀不符,則會引發 PipeliningShapeError 類別。當與其他平行化方法組合或套用混合精度時,必須考慮這些技術,以便 PipelineStage 知道執行階段階段模組輸出的正確形狀(和資料類型)。
使用者可以直接建構 PipelineStage 執行個體,方法是傳入代表應該在該階段執行的模型部分的 nn.Module。這可能需要變更原始模型程式碼。請參閱 選項 1:手動分割模型 中的範例。
或者,分割前端可以使用圖形分割將您的模型自動分割成一系列的 nn.Module。此技術需要模型可以使用 torch.Export 進行追蹤。產生的 nn.Module 與其他平行化技術的可組合性仍在實驗階段,可能需要一些解決方法。如果使用者無法輕鬆變更模型程式碼,則使用此前端可能更具吸引力。如需更多資訊,請參閱 選項 2:自動分割模型。
步驟 2:使用 PipelineSchedule 進行執行¶
我們現在可以將 PipelineStage 附加到管道排程,並使用輸入資料執行排程。以下是 GPipe 的範例
from torch.distributed.pipelining import ScheduleGPipe
# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)
# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)
# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()
請注意,上述程式碼需要為每個工作節點啟動,因此我們使用啟動器服務來啟動多個程序
torchrun --nproc_per_node=2 example.py
模型分割選項¶
選項 1:手動分割模型¶
若要直接建構 PipelineStage,使用者必須提供單一 nn.Module 實例,該實例擁有相關的 nn.Parameters 和 nn.Buffers,並定義執行該階段相關操作的 forward() 方法。例如,Torchtitan 中定義的 Transformer 類別的簡化版本顯示了建構易於分割模型的模式。
class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()
        self.tok_embeddings = nn.Embedding(...)
        # Using a ModuleDict lets us delete layers witout affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)
        self.output = nn.Linear(...)
    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)
        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output
可以透過先初始化整個模型(使用中繼裝置以避免 OOM 錯誤)、刪除該階段不需要的層級,然後建立包裝模型的 PipelineStage,輕鬆地為每個階段配置以此方式定義的模型。例如
with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"
    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()
    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None
    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]
    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
        input_args=example_input_microbatch,
    )
PipelineStage 需要一個範例引數 input_args 來表示階段的執行階段輸入,這將是一個微批次的輸入資料。此引數會透過階段模組的 forward 方法傳遞,以確定通訊所需的輸入和輸出形狀。
與其他資料或模型平行技術組合時,如果模型區塊的輸出形狀/資料類型會受到影響,則可能也需要 output_args。
選項 2:自動分割模型¶
如果您有一個完整的模型,並且不想花時間將其修改為一系列「模型分割區」,則可以使用 pipeline API。以下是一個簡要範例
class Model(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.emb = torch.nn.Embedding(10, 3)
        self.layers = torch.nn.ModuleList(
            Layer() for _ in range(2)
        )
        self.lm = LMHead()
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x)
        for layer in self.layers:
            x = layer(x)
        x = self.lm(x)
        return x
如果我們列印模型,可以看到多個階層,這使得難以手動分割
Model(
  (emb): Embedding(10, 3)
  (layers): ModuleList(
    (0-1): 2 x Layer(
      (lin): Linear(in_features=3, out_features=3, bias=True)
    )
  )
  (lm): LMHead(
    (proj): Linear(in_features=3, out_features=3, bias=True)
  )
)
讓我們看看 pipeline API 如何運作
from torch.distributed.pipelining import pipeline, SplitPoint
x = torch.LongTensor([1, 2, 4, 5])
pipe = pipeline(
    module=mod,
    num_chunks=1,
    example_args=(x,),
    split_spec={
        "layers.1": SplitPoint.BEGINNING,
    }
)
pipeline API 會根據 split_spec 分割模型,其中 SplitPoint.BEGINNING 表示在 forward 函數中執行某些子模組「之前」新增分割點,同樣地,SplitPoint.END 表示「之後」新增分割點。
如果我們 print(pipe),可以看到
GraphModule(
  (submod_0): GraphModule(
    (emb): InterpreterModule()
    (layers): Module(
      (0): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
  )
  (submod_1): GraphModule(
    (layers): Module(
      (1): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
    (lm): InterpreterModule(
      (proj): InterpreterModule()
    )
  )
)
def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return (submod_1,)
「模型分割區」由子模組(submod_0、submod_1)表示,每個子模組都使用原始模型操作和階層重建。此外,還會重建「根層級」forward 函數,以擷取這些分割區之間的資料流。稍後,管線執行階段會以分散式方式重播此資料流。
Pipe 物件提供了一個用於擷取「模型分割區」的方法
stage_mod : nn.Module = pipe.get_stage_module(stage_idx)
您也可以使用 Pipe 在裝置上建立分散式階段執行階段
stage = pipe.build_stage(stage_idx, device, group)
注意事項
pipeline 前端使用追蹤器(torch.export)將您的模型擷取到單一圖形中。如果您的模型無法完全繪製成圖形,則可以使用以下我們的手動前端。
Hugging Face 範例¶
在最初建立此套件的 PiPPy 儲存庫中,我們保留了基於未修改的 Hugging Face 模型的範例。請參閱 examples/huggingface 目錄。
範例包括
技術深入探討¶
pipeline API 如何分割模型?pipeline-api-如何分割模型?" title="Permalink to this heading">¶
首先,pipeline API 會透過追蹤模型將模型轉換為有向無環圖 (DAG)。它使用 torch.export(一種 PyTorch 2 完整圖形擷取工具)來追蹤模型。
然後,它將階段所需的「操作和參數」組合到一個重建的子模組中:submod_0、submod_1…
與傳統的子模組存取方法(例如 Module.children())不同,pipeline API 不僅會切割模型的模組結構,還會切割模型的「forward」函數。
這是必要的,因為像 Module.children() 這樣的模型結構僅擷取 Module.__init__() 期間的資訊,而不會擷取有關 Module.forward() 的任何資訊。換句話說,Module.children() 缺少有關管線化關鍵的以下方面的資訊
- forward中子模組的執行順序
- 子模組之間的啟動流 
- 子模組之間是否有任何函數運算子(例如, - relu或- add操作不會被- Module.children()擷取)。
相反地,pipeline API 可確保真正保留「forward」行為。它還會擷取分割區之間的啟動流,幫助分散式執行階段在沒有人為干預的情況下進行正確的發送/接收呼叫。
pipeline API 的另一個靈活性是分割點可以位於模型階層中的任意層級。在分割區中,與該分割區相關的原始模型階層將會重建,而不會產生任何成本。因此,指向子模組或參數的完整合格名稱 (FQN) 仍然有效,並且依賴 FQN 的服務(例如 FSDP、TP 或檢查點)仍然可以使用分割的模組執行,而幾乎不需要更改程式碼。
實作您自己的排程¶
您可以透過擴展以下兩個類別之一來實作您自己的管線排程
- PipelineScheduleSingle
- PipelineScheduleMulti
PipelineScheduleSingle 適用於每個等級「僅」指派一個階段的排程。PipelineScheduleMulti 適用於每個等級指派多個階段的排程。
例如,ScheduleGPipe 和 Schedule1F1B 是 PipelineScheduleSingle 的子類別。而 ScheduleInterleaved1F1B 和 ScheduleLoopedBFS 是 PipelineScheduleMulti 的子類別。
API 參考¶
模型分割 API¶
以下 API 集會將您的模型轉換為管線表示法。
- torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[source]¶
- 根據規格分割模組。 - 如需更多詳細資訊,請參閱 Pipe。 - 參數
- module (Module) – 要分割的模組。 
- mb_kwargs (Optional[Dict[str, Any]]) – 範例關鍵字引數,採用微批次形式。(預設值:None) 
- split_spec (Optional[Dict[str, SplitPoint]]) – 使用子模組名稱作為分割標記的字典。(預設值:None) 
- split_policy (Optional[Callable[[GraphModule], GraphModule]]) – 用於分割模組的策略。(預設值:None) 
 
- 回傳型別
- Pipe 類別的 pipeline 表示法。 
 
微批次工具¶
Pipeline 階段¶
- class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args, output_args=None, group=None)[source]¶
- 表示 pipeline parallelism 設定中 pipeline 階段的類別。這個類別是透過提供範例輸入(以及可選的輸出)手動建立的,而不是由 pipeline() 輸出的 PipelineStage 類別。這個類別繼承自 _PipelineStageBase 類別,並且可以類似地在 PipelineScheule 中使用。 - 參數
- submodule (nn.Module) – 這個階段包裝的 PyTorch 模組。 
- stage_index (int) – 這個階段的 ID。 
- num_stages (int) – 階段總數。 
- device (torch.device) – 這個階段所在的裝置。 
- input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸入引數。 
- output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸出引數。 
- group (dist.ProcessGroup, optional) – 分散式訓練的處理程序群組。如果為 None,則為預設群組。 
 
 
- torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[source]¶
- 建立一個 pipeline 階段,給定要由這個階段包裝的 stage_module 和 pipeline 資訊。 - 參數
- stage_module (torch.nn.Module) – 要由這個階段包裝的模組 
- stage_index (int) – 這個階段在 pipeline 中的索引 
- pipe_info (PipeInfo) – 關於 pipeline 的資訊,可以透過 pipe.info() 取得 
- device (torch.device) – 這個階段要使用的裝置 
- group (Optional[dist.ProcessGroup]) – 這個階段要使用的處理程序群組 
 
- 回傳值
- 可以與 PipelineSchedules 一起執行的 pipeline 階段。 
- 回傳型別
- _PipelineStage 
 
Pipeline 排程¶
- class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]¶
- GPipe 排程。將以填滿-排空的方式處理所有微批次。 
- class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]¶
- 1F1B 排程。將在穩定狀態下對微批次執行一次正向傳遞和一次反向傳遞。 
- class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]¶
- 交錯 1F1B 排程。如需詳細資訊,請參閱 https://arxiv.org/pdf/2104.04473。將在穩定狀態下對微批次執行一次正向傳遞和一次反向傳遞,並支援每個秩有多個階段。當微批次準備好進行多個本地階段時,交錯 1F1B 優先考慮較早的微批次(也稱為「深度優先」)。 
- class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None)[source]¶
- 廣度優先管道平行。如需詳細資訊,請參閱 https://arxiv.org/abs/2211.05953。與交錯 1F1B 類似,迴圈 BFS 支援每個秩有多個階段。不同之處在於,當微批次準備好進行多個本地階段時,迴圈 BFS 將優先考慮較早的階段,一次執行所有可用的微批次。 
- class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]¶
- 單階段排程的基類。實作 step 方法。衍生類別應實作 _step_microbatches。