• 教程 >
  • 使用 C++ 擴充套件定製程序組後端
快捷方式

使用 C++ 擴充套件定製程序組後端

建立日期:2022 年 2 月 1 日 | 最後更新:2024 年 11 月 14 日 | 最後驗證:2024 年 11 月 5 日

作者: Howard Huang, Feng Tian, Shen Li, Min Si

注意

editgithub 上檢視和編輯此教程。

前提條件

本教程演示瞭如何實現一個自定義的 Backend,並使用 C++ 擴充套件將其插入到 PyTorch 分散式包中。當你需要為特定硬體提供專門的軟體棧,或者想要嘗試新的集體通訊演算法時,這會很有幫助。

基礎知識

PyTorch 集體通訊為幾個廣泛採用的分散式訓練功能提供了支援,包括 DistributedDataParallelZeroRedundancyOptimizer。為了使相同的集體通訊 API 適用於不同的通訊後端,分散式包將集體通訊操作抽象為一個 Backend 類。然後可以使用首選的第三方庫將不同的後端實現為 Backend 的子類。PyTorch 分散式帶有三個預設後端:ProcessGroupNCCLProcessGroupGlooProcessGroupMPI。然而,除了這三個後端之外,還有其他的通訊庫(例如 UCCOneCCL)、不同型別的硬體(例如 TPUTrainum)以及新興的通訊演算法(例如 HerringReduction Server)。因此,分散式包暴露了擴充套件 API,以允許定製集體通訊後端。

下面的 4 個步驟展示瞭如何實現一個 dummy Backend 後端,並在 Python 應用程式程式碼中使用它。請注意,本教程側重於演示擴充套件 API,而不是開發一個功能齊全的通訊後端。因此,這個 dummy 後端只涵蓋了 API 的一部分(all_reduceall_gather),並且簡單地將 tensor 的值設定為 0。

步驟 1:實現 Backend 的子類

第一步是實現一個 Backend 子類,該子類覆蓋目標集體通訊 API 並執行自定義通訊演算法。該擴充套件還需要實現一個 Work 子類,該子類充當通訊結果的未來,並允許在應用程式程式碼中進行非同步執行。如果該擴充套件使用第三方庫,它可以包含標頭檔案並從 BackendDummy 子類呼叫庫 API。下面的兩個程式碼片段展示了 dummy.hdummy.cpp 的實現。有關完整實現,請參閱 dummy collectives 倉庫。

// file name: dummy.hpp
#include <torch/python.h>

#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>

#include <pybind11/chrono.h>

namespace c10d {

class BackendDummy : public Backend {
  public:
    BackendDummy(int rank, int size);

    c10::intrusive_ptr<Work> allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& opts = AllgatherOptions()) override;

    c10::intrusive_ptr<Work> allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts = AllreduceOptions()) override;

    // The collective communication APIs without a custom implementation
    // will error out if invoked by application code.
};

class WorkDummy : public Work {
  public:
    WorkDummy(
      OpType opType,
      c10::intrusive_ptr<c10::ivalue::Future> future) // future of the output
      : Work(
          -1, // rank, only used by recvAnySource, irrelevant in this demo
          opType),
      future_(std::move(future)) {}
    bool isCompleted() override;
    bool isSuccess() const override;
    bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
    virtual c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;

  private:
    c10::intrusive_ptr<c10::ivalue::Future> future_;
};
} // namespace c10d
// file name: dummy.cpp
#include "dummy.hpp"

namespace c10d {

// This is a dummy allgather that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& /* unused */) {
    for (auto& outputTensorVec : outputTensors) {
        for (auto& outputTensor : outputTensorVec) {
            outputTensor.zero_();
        }
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
    future->markCompleted(c10::IValue(outputTensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}

// This is a dummy allreduce that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts) {
    for (auto& tensor : tensors) {
        tensor.zero_();
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::TensorType::get()));
    future->markCompleted(c10::IValue(tensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}
} // namespace c10d

步驟 2:暴露擴充套件的 Python API

後端建構函式是從 Python 端呼叫的,因此該擴充套件還需要向 Python 暴露建構函式 API。可以透過新增以下方法來完成。在此示例中,storetimeoutBackendDummy 例項化方法忽略,因為它們在此 dummy 實現中未使用。然而,實際的擴充套件應該考慮使用 store 進行 rendezvous 並支援 timeout 引數。

// file name: dummy.hpp
class BackendDummy : public Backend {
    ...
    <Step 1 code>
    ...

    static c10::intrusive_ptr<Backend> createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& store,
        int rank,
        int size,
        const std::chrono::duration<float>& timeout);

    static void BackendDummyConstructor() __attribute__((constructor)) {
        py::object module = py::module::import("torch.distributed");
        py::object register_backend =
            module.attr("Backend").attr("register_backend");
        // torch.distributed.Backend.register_backend will add `dummy` as a
        // new valid backend.
        register_backend("dummy", py::cpp_function(createBackendDummy));
    }
}
// file name: dummy.cpp
c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& /* unused */,
        int rank,
        int size,
        const std::chrono::duration<float>& /* unused */) {
    return c10::make_intrusive<BackendDummy>(rank, size);
}

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
    m.def("createBackendDummy", &BackendDummy::createBackendDummy);
}

步驟 3:構建自定義擴充套件

現在,擴充套件的原始碼檔案已準備就緒。然後我們可以使用 C++ 擴充套件來構建它。為此,建立一個 setup.py 檔案,該檔案準備路徑和命令。然後呼叫 python setup.py develop 來安裝擴充套件。

如果該擴充套件依賴於第三方庫,你還可以向 C++ 擴充套件 API 指定 libraries_dirslibraries。有關實際示例,請參閱 torch ucc 專案。

# file name: setup.py
import os
import sys
import torch
from setuptools import setup
from torch.utils import cpp_extension

sources = ["src/dummy.cpp"]
include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"]

if torch.cuda.is_available():
    module = cpp_extension.CUDAExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )
else:
    module = cpp_extension.CppExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )

setup(
    name = "Dummy-Collectives",
    version = "0.0.1",
    ext_modules = [module],
    cmdclass={'build_ext': cpp_extension.BuildExtension}
)

步驟 4:在應用程式中使用擴充套件

安裝後,你可以在呼叫 init_process_group 時方便地使用 dummy 後端,就好像它是內建後端一樣。

我們可以透過更改 init_process_groupbackend 引數來指定基於後端的排程。我們可以透過將 cpu:gloo,cuda:dummy 指定為 backend 引數,將帶有 CPU tensor 的集體通訊排程到 gloo 後端,將帶有 CUDA tensor 的集體通訊排程到 dummy 後端。

要將所有 tensor 傳送到 dummy 後端,我們可以簡單地將 dummy 指定為 backend 引數。

import os

import torch
# importing dummy_collectives makes torch.distributed recognize `dummy`
# as a valid backend.
import dummy_collectives

import torch.distributed as dist

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

# Alternatively:
# dist.init_process_group("dummy", rank=0, world_size=1)
dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1)

# this goes through gloo
x = torch.ones(6)
dist.all_reduce(x)
print(f"cpu allreduce: {x}")

# this goes through dummy
if torch.cuda.is_available():
    y = x.cuda()
    dist.all_reduce(y)
    print(f"cuda allreduce: {y}")

    try:
        dist.broadcast(y, 0)
    except RuntimeError:
        print("got RuntimeError when calling broadcast")

文件

訪問 PyTorch 的全面開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源