快捷方式

流水線並行

注意

torch.distributed.pipelining 目前處於 alpha 階段,仍在開發中。API 可能會發生變化。它從 PiPPy 專案遷移而來。

為何選擇流水線並行?

流水線並行 (Pipeline Parallelism) 是深度學習的**基本**並行化方法之一。它允許模型**執行**被分割,以便多個**微批次**可以併發執行模型程式碼的不同部分。流水線並行是一種有效的技術,適用於:

  • 大規模訓練

  • 頻寬受限的叢集

  • 大型模型推理

上述場景有一個共同點:每個裝置的計算無法隱藏傳統並行化的通訊開銷,例如 FSDP 的權重 all-gather。

什麼是 torch.distributed.pipelining

儘管流水線化在擴充套件方面很有前景,但實現起來通常很困難,因為它不僅需要分割模型權重,還需要**分割模型的執行**。分割執行通常需要對您的模型進行侵入性修改。複雜性的另一個方面在於**在分散式環境中排程微批次**,同時**考慮資料流依賴性**。

pipelining 包提供了一個工具包,可以**自動**完成上述任務,從而可以在**通用**模型上輕鬆實現流水線並行化。

它由兩部分組成:一個**分割前端**和一個**分散式執行時**。分割前端接收您原封不動的模型程式碼,將其分割成“模型分割槽”,並捕獲資料流關係。分散式執行時在不同裝置上並行執行流水線階段,處理微批次分割、排程、通訊和梯度傳播等任務。

總的來說,pipelining 包提供了以下特性:

  • 基於簡單規範分割模型程式碼。

  • 豐富支援流水線排程,包括 GPipe、1F1B、交錯 1F1B (Interleaved 1F1B) 和迴圈 BFS (Looped BFS),並提供基礎設施用於編寫自定義排程。

  • 一流地支援跨主機流水線並行化,因為 PP 通常在這種場景下使用(透過較慢的互連)。

  • 可與其他 PyTorch 並行化技術(如資料並行 (DDP, FSDP) 或張量並行)組合使用。TorchTitan 專案展示了 Llama 模型上的“3D 並行”應用。

步驟 1: 構建 PipelineStage

在使用 PipelineSchedule 之前,我們需要建立 PipelineStage 物件,這些物件封裝了在該階段執行的模型部分。PipelineStage 負責分配通訊緩衝區並建立傳送/接收操作來與其對等方通訊。它管理中間緩衝區,例如尚未被消耗的前向傳播輸出,並提供一個工具來執行階段模型的反向傳播。

PipelineStage 需要知道階段模型的輸入和輸出形狀,以便正確分配通訊緩衝區。形狀必須是靜態的,例如,在執行時形狀不能一步一步地改變。如果執行時形狀與預期形狀不匹配,將引發 PipeliningShapeError 類錯誤。與其他並行化技術組合或應用混合精度時,必須考慮這些技術,以便 PipelineStage 在執行時知道階段模組輸出的正確形狀(和 dtype)。

使用者可以透過傳入一個 nn.Module 例項直接構建 PipelineStage 例項,該例項代表應在該階段執行的模型部分。這可能需要修改原始模型程式碼。請參見Option 1: 手動分割模型中的示例。

或者,分割前端可以使用圖分割槽自動將模型分割成一系列 nn.Module。這種技術要求模型可以使用 torch.Export 進行跟蹤(traceable)。由此生成的 nn.Module 與其他並行化技術的組合性是實驗性的,可能需要一些變通方法。如果使用者無法輕鬆更改模型程式碼,使用此前端可能更具吸引力。有關更多資訊,請參見Option 2: 自動分割模型

步驟 2: 使用 PipelineSchedule 執行

現在我們可以將 PipelineStage 連線到流水線排程器,並使用輸入資料執行排程器。以下是一個 GPipe 示例:

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

請注意,上述程式碼需要在每個工作程序中啟動,因此我們使用啟動器服務來啟動多個程序。

torchrun --nproc_per_node=2 example.py

分割模型的選項

Option 1: 手動分割模型

要直接構建 PipelineStage,使用者需要負責提供一個單獨的 nn.Module 例項,該例項擁有相關的 nn.Parametersnn.Buffers,並定義一個 forward() 方法來執行與該階段相關的操作。例如,Torchtitan 中定義的 Transformer 類的一個精簡版本展示了一種構建易於分割槽模型的方式。

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers without affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

以這種方式定義的模型可以很容易地按階段配置:首先初始化整個模型(使用 meta-device 避免 OOM 錯誤),刪除該階段不需要的層,然後建立一個封裝該模型的 PipelineStage。例如:

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
    )

與其他資料或模型並行化技術組合時,如果模型塊的輸出形狀/dtype 將受到影響,可能還需要 output_args

Option 2: 自動分割模型

如果您有一個完整的模型,並且不想花費時間將其修改成一系列“模型分割槽”,那麼 pipeline API 可以提供幫助。以下是一個簡要示例:

class Model(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.emb = torch.nn.Embedding(10, 3)
        self.layers = torch.nn.ModuleList(
            Layer() for _ in range(2)
        )
        self.lm = LMHead()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x)
        for layer in self.layers:
            x = layer(x)
        x = self.lm(x)
        return x

如果我們列印模型,可以看到多層層次結構,這使得手動分割變得困難

Model(
  (emb): Embedding(10, 3)
  (layers): ModuleList(
    (0-1): 2 x Layer(
      (lin): Linear(in_features=3, out_features=3, bias=True)
    )
  )
  (lm): LMHead(
    (proj): Linear(in_features=3, out_features=3, bias=True)
  )
)

讓我們看看 pipeline API 是如何工作的

from torch.distributed.pipelining import pipeline, SplitPoint

# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])

pipe = pipeline(
    module=mod,
    mb_args=(x,),
    split_spec={
        "layers.1": SplitPoint.BEGINNING,
    }
)

pipeline API 根據 split_spec 分割您的模型,其中 SplitPoint.BEGINNING 表示在 forward 函式中某個子模組執行*之前*新增分割點,類似地,SplitPoint.END 表示在*之後*新增分割點。

如果我們 print(pipe),可以看到

GraphModule(
  (submod_0): GraphModule(
    (emb): InterpreterModule()
    (layers): Module(
      (0): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
  )
  (submod_1): GraphModule(
    (layers): Module(
      (1): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
    (lm): InterpreterModule(
      (proj): InterpreterModule()
    )
  )
)

def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return (submod_1,)

“模型分割槽”由子模組(submod_0, submod_1)表示,每個子模組都使用原始模型的運算、權重和層次結構重建。此外,還重建了一個“根級別”的 forward 函式,以捕獲這些分割槽之間的資料流。這種資料流稍後將由流水線執行時以分散式方式重放。

Pipe 物件提供了一個方法來檢索“模型分割槽”:

stage_mod : nn.Module = pipe.get_stage_module(stage_idx)

返回的 stage_mod 是一個 nn.Module,您可以使用它來建立最佳化器、儲存或載入檢查點,或應用其他並行化技術。

Pipe 還允許您在給定 ProcessGroup 的裝置上建立一個分散式階段執行時:

stage = pipe.build_stage(stage_idx, device, group)

或者,如果您想在修改 stage_mod 後再構建階段執行時,可以使用 build_stage API 的函式版本。例如:

from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel

dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)

注意

pipeline 前端使用一個跟蹤器(torch.export)將您的模型捕獲到一個單一圖中。如果您的模型不能進行完整圖捕獲(full-graph’able),您可以使用下面的手動前端。

Hugging Face 示例

該包最初創建於 PiPPy 倉庫,我們在其中保留了基於未經修改的 Hugging Face 模型的示例。請參見 examples/huggingface 目錄。

示例包括:

技術深入

pipeline API 如何分割模型?

首先,pipeline API 透過跟蹤模型將其轉化為有向無環圖 (DAG)。它使用 torch.export(一個 PyTorch 2 完整圖捕獲工具)來跟蹤模型。

然後,它將一個階段所需的操作和引數分組到一個重建的子模組中:submod_0, submod_1, …

Module.children() 等傳統的子模組訪問方法不同,pipeline API 不僅會剪下您模型的模組結構,還會剪下模型的 forward 函式。

這是必要的,因為 Module.children() 等模型結構僅捕獲 Module.__init__() 期間的資訊,而不捕獲有關 Module.forward() 的任何資訊。換句話說,Module.children() 缺乏對流水線化至關重要的以下方面資訊:

  • 子模組在 forward 中的執行順序

  • 子模組之間的啟用流

  • 子模組之間是否存在函式運算子(例如,Module.children() 不會捕獲 reluadd 運算)。

相反,pipeline API 確保 forward 行為被真正保留。它還捕獲分割槽之間的啟用流,幫助分散式執行時無需人工干預即可進行正確的傳送/接收呼叫。

pipeline API 的另一個靈活性在於,分割點可以在您的模型層次結構的任意級別上。在分割的分割槽中,與該分割槽相關的原始模型層次結構將被重建,對您而言沒有額外開銷。因此,指向子模組或引數的完全限定名稱 (FQN) 仍然有效,並且依賴於 FQN 的服務(例如 FSDP、TP 或檢查點)仍然可以與您分割槽後的模組一起執行,幾乎無需修改程式碼。

實現您自己的排程器

您可以透過擴充套件以下兩個類中的一個來實現您自己的流水線排程器:

  • PipelineScheduleSingle

  • PipelineScheduleMulti

PipelineScheduleSingle 用於為每個 rank 僅分配一個階段的排程器。PipelineScheduleMulti 用於為每個 rank 分配多個階段的排程器。

例如,ScheduleGPipeSchedule1F1BPipelineScheduleSingle 的子類。而 ScheduleInterleaved1F1BScheduleLoopedBFSScheduleInterleavedZeroBubbleScheduleZBVZeroBubblePipelineScheduleMulti 的子類。

日誌記錄

您可以使用 torch._logging 中的 TORCH_LOGS 環境變數來開啟額外的日誌記錄:

  • TORCH_LOGS=+pp 將顯示 logging.DEBUG 級別及以上的所有訊息。

  • TORCH_LOGS=pp 將顯示 logging.INFO 級別及以上的所有訊息。

  • TORCH_LOGS=-pp 將顯示 logging.WARNING 級別及以上的所有訊息。

API 參考

模型分割 API

以下一系列 API 將您的模型轉換為流水線表示形式。

class torch.distributed.pipelining.SplitPoint(value)[原始碼][原始碼]

列舉,表示子模組執行中可以發生分割的點。 :ivar BEGINNING: 表示在 forward 函式中某個子模組執行*之前*新增分割點。 :ivar END: 表示在 forward 函式中某個子模組執行*之後*新增分割點。

torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[原始碼][原始碼]

根據規範分割模組。

有關更多詳細資訊,請參見 Pipe

引數
返回型別

Pipe 的流水線表示。

class torch.distributed.pipelining.Pipe(split_gm, num_stages, has_loss_and_backward, loss_spec)[原始碼][原始碼]
torch.distributed.pipelining.pipe_split()[原始碼][原始碼]

pipe_split 是一個特殊運算子,用於標記模組中階段之間的邊界。它用於將模組分割成階段。如果您的標註模組在 eagerly 模式下執行,它是一個無操作 (no-op)。

示例

>>> def forward(self, x):
>>>     x = torch.mm(x, self.mm_param)
>>>     x = torch.relu(x)
>>>     pipe_split()
>>>     x = self.lin(x)
>>>     return x

以上示例將被分割成兩個階段。

微批次實用工具

class torch.distributed.pipelining.microbatch.TensorChunkSpec(split_dim)[原始碼][原始碼]

用於指定輸入分塊的類

torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[原始碼][原始碼]

給定一系列 args 和 kwargs,根據它們各自的分塊規範將它們分割成多個塊。

引數
返回

分片 args 列表 kwargs_split:分片 kwargs 列表

返回型別

args_split

torch.distributed.pipelining.microbatch.merge_chunks(chunks, chunk_spec)[source][source]

給定一個塊列表,根據塊規範將它們合併為單個值。

引數
  • chunks (list[Any]) – 塊列表

  • chunk_spec – 這些塊的分塊規範

返回

合併後的值

返回型別

流水線階段

class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args=None, output_args=None, group=None, dw_builder=None)[source][source]

一個類,表示流水線並行設定中的一個流水線階段。

PipelineStage 假設模型是順序劃分的,即模型被分割成塊,其中一個塊的輸出作為下一個塊的輸入,沒有跳躍連線。

PipelineStage 透過按線性順序將 stage0 的輸出傳播到 stage1 等,自動執行執行時 shape/dtype 推斷。要繞過 shape 推斷,請將 input_argsoutput_args 傳遞給每個 PipelineStage 例項。

引數
  • submodule (nn.Module) – 此階段封裝的 PyTorch 模組。

  • stage_index (int) – 此階段的 ID。

  • num_stages (int) – 階段總數。

  • device (torch.device) – 此階段所在的裝置。

  • input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸入引數。

  • output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸出引數。

  • group (dist.ProcessGroup, optional) – 分散式訓練的程序組。如果為 None,則使用預設組。

  • dw_builder (Optional[Callable[[], Callable[..., None]]) – 如果提供,dw_builder 將構建一個新的 dw_runner 函式,該函式用於 F, I, W (前向, 輸入, 權重) 零氣泡排程中的 W 操作(輸入權重)。

torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[source][source]

根據要由此階段封裝的 stage_module 和流水線資訊建立一個流水線階段。

引數
  • stage_module (torch.nn.Module) – 要由此階段封裝的模組

  • stage_index (int) – 此階段在流水線中的索引

  • pipe_info (PipeInfo) – 關於流水線的資訊,可以透過 pipe.info() 獲取

  • device (torch.device) – 此階段要使用的裝置

  • group (Optional[dist.ProcessGroup]) – 此階段要使用的程序組

返回

一個可以與 PipelineSchedules 一起執行的流水線階段。

返回型別

_PipelineStage

流水線排程器

class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

GPipe 排程器。將以填充-排空方式處理所有微批次。

class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

1F1B 排程器。將在穩態下對微批次執行一次前向傳播和一次反向傳播。

class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

交錯式 1F1B 排程器。詳情請參閱 https://arxiv.org/pdf/2104.04473。將在穩態下對微批次執行一次前向傳播和一次反向傳播,並支援每個 rank 多個階段。當多個本地階段的微批次準備就緒時,交錯式 1F1B 優先處理較早的微批次(也稱為“深度優先”)。

這個排程器與原始論文基本相似。不同之處在於它放寬了 num_microbatch % pp_size == 0 的要求。使用 flex_pp 排程器,我們將得到 num_rounds = max(1, n_microbatches // pp_group_size),並且只要 n_microbatches % num_rounds 等於 0 即可工作。舉例說明,支援

  1. pp_group_size = 4, n_microbatches = 10。我們將得到 num_rounds = 2,並且 n_microbatches % 2 等於 0。

  2. pp_group_size = 4, n_microbatches = 3。我們將得到 num_rounds = 1,並且 n_microbatches % 1 等於 0。

class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None, scale_grads=True)[source][source]

廣度優先流水線並行。詳情請參閱 https://arxiv.org/abs/2211.05953。與交錯式 1F1B 類似,Looped BFS 支援每個 rank 多個階段。不同之處在於,當多個本地階段的微批次準備就緒時,Looped BFS 將優先處理較早的階段,一次性執行所有可用的微批次。

class torch.distributed.pipelining.schedules.ScheduleInterleavedZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

交錯式零氣泡排程器。詳情請參閱 https://arxiv.org/pdf/2401.10241。將在穩態下對微批次的輸入執行一次前向傳播和一次反向傳播,並支援每個 rank 多個階段。利用對權重的反向傳播來填充流水線氣泡。

特別地,這實現了論文中的 ZB1P 排程器。

class torch.distributed.pipelining.schedules.ScheduleZBVZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

零氣泡排程器(ZBV 變體)。詳情請參閱 https://arxiv.org/pdf/2401.10241 的第 6 節。

這個排程器要求每個 rank 恰好有兩個階段。

這個排程器將在穩態下對微批次的輸入執行一次前向傳播和一次反向傳播,並支援每個 rank 多個階段。利用相對於權重的反向傳播來填充流水線氣泡。

只有當前向時間 == 輸入反向時間 == 權重反向時間時,這種 ZB-V 排程器才具有“零氣泡”特性。在實踐中,對於真實模型這不太可能成立,因此對於不相等/不平衡的時間可以另外實現一個貪心排程器。

class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

單階段排程器的基類。實現了 step 方法。派生類應該實現 _step_microbatches 方法。

梯度根據 scale_grads 引數按微批次數量進行縮放,預設為 True。此設定應與您的 loss_fn 配置匹配,loss_fn 可能對損失進行平均(scale_grads=True)或求和(scale_grads=False)。

step(*args, target=None, losses=None, **kwargs)[source][source]

使用完整批次輸入執行流水線排程的一個迭代。將自動把輸入分塊成微批次,並根據排程實現遍歷微批次。

args: 模型的 位置引數(與非流水線情況相同)。 kwargs: 模型的 關鍵詞引數(與非流水線情況相同)。 target: 損失函式的目標。 losses: 一個列表,用於儲存每個微批次的損失。

class torch.distributed.pipelining.schedules.PipelineScheduleMulti(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, use_full_backward=None, scale_grads=True)[source][source]

多階段排程器的基類。實現了 step 方法。

梯度根據 scale_grads 引數按微批次數量進行縮放,預設為 True。此設定應與您的 loss_fn 配置匹配,loss_fn 可能對損失進行平均(scale_grads=True)或求和(scale_grads=False)。

step(*args, target=None, losses=None, **kwargs)[source][source]

使用完整批次輸入執行流水線排程的一個迭代。將自動把輸入分塊成微批次,並根據排程實現遍歷微批次。

args: 模型的 位置引數(與非流水線情況相同)。 kwargs: 模型的 關鍵詞引數(與非流水線情況相同)。 target: 損失函式的目標。 losses: 一個列表,用於儲存每個微批次的損失。

文件

訪問 PyTorch 的完整開發者文件

檢視文件

教程

獲取針對初學者和高階開發者的深入教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源