捷徑

多程序處理

用於啟動和管理由函式或二進制檔案指定的 n 個工作程序副本的函式庫。

對於函式,它使用 torch.multiprocessing(因此也使用 Python multiprocessing)來產生/分岔工作程序。對於二進制檔案,它使用 Python subprocessing.Popen 來建立工作程序。

用法 1:將兩個訓練器作為函式啟動

from torch.distributed.elastic.multiprocessing import Std, start_processes

def trainer(a, b, c):
    pass # train


# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
        name="trainer",
        entrypoint=trainer,
        args={0: (1,2,3), 1: (4,5,6)},
        envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
        log_dir="/tmp/foobar",
        redirects=Std.ALL, # write all worker stdout/stderr to a log file
        tee={0: Std.ERR}, # tee only local rank 0's stderr to console
      )

# waits for all copies of trainer to finish
ctx.wait()

用法 2:將 2 個回應工作程序作為二進制檔案啟動

# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
        name="echo"
        entrypoint="echo",
        log_dir="/tmp/foobar",
        args={0: "hello", 1: "world"},
        redirects={1: Std.OUT},
       )

就像 torch.multiprocessing 一樣,函式 start_processes() 的回傳值是一個程序上下文 (api.PContext)。如果啟動的是函式,則會回傳 api.MultiprocessContext,如果啟動的是二進制檔案,則會回傳 api.SubprocessContext。兩者都是父類別 api.PContext 的特定實作。

啟動多個工作程序

torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[來源]

使用提供的選項啟動 nentrypoint 程序副本。

entrypoint 可以是 Callable(函式)或 str(二進制檔案)。副本的數量由 argsenvs 參數的項目數量決定,這些參數需要具有相同的鍵集。

argsenv 參數是要傳遞給由副本索引(本地排名)映射的進入點的參數和環境變數。所有本地排名都必須被考慮在內。也就是說,鍵集應該是 {0,1,...,(nprocs-1)}

備註

entrypoint 是二進制檔案 (str) 時,args 只能是字串。如果給定任何其他類型,則會將其轉換為字串表示形式(例如 str(arg1))。此外,只有在主要函式使用 torch.distributed.elastic.multiprocessing.errors.record 標記時,二進制檔案失敗才會寫入 error.json 錯誤檔案。對於函式啟動,這是預設行為,不需要使用 @record 標記手動標記。

redirectstee 是位元遮罩,用於指定要將哪些標準串流重定向到 log_dir 中的日誌檔案。有效的遮罩值在 Std 中定義。若要僅重定向/複製特定本地排名的輸出,請將 redirects 作為一個映射傳遞,並將鍵設為本地排名,以指定其重定向行為。任何遺漏的本地排名都將預設為 Std.NONE

tee 的作用類似於 unix 的「tee」命令,它會將輸出重定向並列印到主控台。若要避免工作程序的標準輸出/標準錯誤列印到主控台,請使用 redirects 參數。

對於每個程序,log_dir 將包含

  1. {local_rank}/error.json:如果程序失敗,則為包含錯誤資訊的檔案

  2. {local_rank}/stdout.json:如果 redirect & STDOUT == STDOUT

  3. {local_rank}/stderr.json:如果 redirect & STDERR == STDERR

備註

預期 log_dir 存在、為空目錄。

範例

log_dir = "/tmp/test"

# ok; two copies of foo: foo("bar0"), foo("bar1")
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
)

# invalid; envs missing for local rank 1
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}},
   log_dir=log_dir
)

# ok; two copies of /usr/bin/touch: touch file1, touch file2
start_processes(
   name="trainer",
   entrypoint="/usr/bin/touch",
   args:{0:("file1",), 1:("file2",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )

# caution; arguments casted to string, runs:
# echo "1" "2" "3" and echo "[1, 2, 3]"
start_processes(
   name="trainer",
   entrypoint="/usr/bin/echo",
   args:{0:(1,2,3), 1:([1,2,3],),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )
參數
  • name (str) – 描述處理程序的人類可讀簡稱(在標準輸出/錯誤輸出時用作標題)

  • entrypoint (Union[Callable, str]) – Callable(函數)或 cmd(二進制文件)

  • args (Dict[int, Tuple]) – 每個副本的參數

  • envs (Dict[int, Dict[str, str]]) – 每個副本的環境變數

  • log_dir – 用於寫入日誌文件的目錄

  • start_method (str) – 多進程啟動方法(spawn、fork、forkserver),二進制文件會忽略

  • redirects – 要重定向到日誌文件的標準流

  • tee – 要重定向並列印到控制台的標準流

  • local_ranks_filter – 要列印到控制台的日誌等級

返回類型

PContext

處理程序上下文

class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[原始碼]

標準化透過不同機制啟動的一組處理程序操作的基類。

名稱 PContext 是為了與 torch.multiprocessing.ProcessContext 區分開來。

警告

標準輸出和標準錯誤輸出應該始終分別是 tee_stdouts 和 tee_stderrs 的超集,這是因為 tee 是透過重定向 + tail -f <stdout/stderr.log> 實作的

class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[原始碼]

以函數形式調用的工作進程的 PContext

class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[原始碼]

以二進制文件形式調用的工作進程的 PContext

class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[原始碼]

start_processes() 啟動的處理程序完成執行的結果。由 PContext 返回。

請注意以下幾點

  1. 所有字段都按本地等級映射

  2. return_values - 僅針對函數(而非二進制文件)填充。

  3. stdouts - stdout.log 的路徑(如果沒有重定向,則為空字符串)

  4. stderrs - stderr.log 的路徑(如果沒有重定向,則為空字符串)

class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[原始碼]

預設 LogsSpecs 實作

  • 如果 log_dir 不存在,將會建立

  • 為每次嘗試和等級生成嵌套文件夾。

reify(envs)[原始碼]

使用以下方案構建日誌目標路徑

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stdout.log

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stderr.log

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/error.json

返回類型

LogsDest

class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[原始碼]

對於每種類型的日誌,保存本地等級 ID 到文件路徑的映射。

class torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[原始碼]

定義每個工作進程的日誌處理和重定向。

參數
  • log_dir (Optional[str]) – 將寫入日誌文件的基目錄。

  • redirects (Union[Std, Dict[int, Std]]) – 要重定向到文件的流。傳遞單個 Std 枚舉以重定向所有工作進程,或傳遞由 local_rank 鍵控的映射以選擇性地重定向。

  • tee (Union[Std, Dict[int, Std]]) – 要複製到標準輸出/錯誤輸出的流。傳遞單個 Std 枚舉以複製所有工作進程的流,或傳遞由 local_rank 鍵控的映射以選擇性地複製。

abstract reify(envs)[原始碼]

給定環境變數,構建每個本地等級的日誌文件目標。

Envs 參數包含每個本地等級的環境變數字典,其中條目定義在:_start_workers()

返回類型

LogsDest

文件

取得 PyTorch 的完整開發人員文件

查看文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

查看教學課程

資源

尋找開發資源並獲得問題解答

查看資源