快捷方式

彈性 Agent

伺服器

彈性 Agent 是 torchelastic 的控制平面。

它是一個啟動和管理底層工作程序的程序。Agent 負責以下職責:

  1. 與分散式 torch 協作:工作程序啟動時會獲得所有必要資訊,以便成功且簡單地呼叫 torch.distributed.init_process_group()

  2. 容錯:監視工作程序,並在檢測到工作程序故障或不健康時,關閉所有工作程序並重啟所有程序。

  3. 彈性:對成員變更做出反應,並用新成員重啟工作程序。

最簡單的 Agent 是按節點部署的,並與本地程序一起工作。更高階的 Agent 可以遠端啟動和管理工作程序。Agent 可以完全去中心化,根據其管理的工作程序做出決策;也可以是協調一致的,與其他管理同一作業中工作程序的 Agent 通訊,以做出集體決策。

下面是一個管理本地工作程序組的 Agent 示意圖。

../_images/agent_diagram.jpg

概念

本節描述了與理解 agent 在 torchelastic 中的作用相關的高階類和概念。

class torch.distributed.elastic.agent.server.ElasticAgent[source][source]

負責管理一個或多個工作程序的 Agent 程序。

這些工作程序被假定為常規的分散式 PyTorch 指令碼。當 Agent 建立工作程序時,Agent 會提供必要的資訊,使工作程序能夠正確初始化 torch 程序組。

Agent 的具體實現以及使用者的作業 placement 偏好決定了 Agent 與工作程序的確切部署拓撲和比例。例如,要在 GPU 上執行一個包含 8 個訓練器(每個 GPU 一個)的分散式訓練作業,可以採用以下方式:

  1. 使用 8 個單 GPU 例項,每個例項放置一個 Agent,每個 Agent 管理 1 個工作程序。

  2. 使用 4 個雙 GPU 例項,每個例項放置一個 Agent,每個 Agent 管理 2 個工作程序。

  3. 使用 2 個四 GPU 例項,每個例項放置一個 Agent,每個 Agent 管理 4 個工作程序。

  4. 使用 1 個八 GPU 例項,每個例項放置一個 Agent,每個 Agent 管理 8 個工作程序。

用法

group_result = agent.run()
 if group_result.is_failed():
   # workers failed
   failure = group_result.failures[0]
   logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # return rank 0's results
abstract get_worker_group(role='default')[source][source]

返回給定 role 對應的 WorkerGroup

請注意,工作組是一個可變物件,因此在多執行緒/多程序環境中,其狀態可能會改變。鼓勵(但不強制要求)實現者返回一個防禦性的只讀副本。

返回型別

WorkerGroup

abstract run(role='default')[source][source]

執行 Agent。

支援在故障時重試工作組,最多 max_restarts 次。

返回值

執行結果,包含按工作程序全域性 rank 對映的每個工作程序的返回值或故障詳細資訊。

丟擲

Exception - 任何與工作程序無關的其他故障

返回型別

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[source][source]

關於特定型別工作程序的藍圖資訊。

對於給定的 role,只能存在一個 worker spec。Worker spec 在所有節點(機器)上應是同質的,即每個節點針對特定的 spec 執行相同數量的工作程序。

引數
  • role (str) – 具有此 spec 的工作程序的使用者定義 role

  • local_world_size (int) – 要執行的本地工作程序數量

  • fn (Optional[Callable]) – (已棄用,請改用 entrypoint)

  • entrypoint (Optional[Union[Callable, str]]) – 工作程序函式或命令

  • args (tuple) – 傳遞給 entrypoint 的引數

  • rdzv_handler (RendezvousHandler) – 處理這組工作程序的 rdzv

  • max_restarts (int) – 工作程序的最大重試次數

  • monitor_interval (float) – 每隔 n 秒監控工作程序狀態

  • master_port (Optional[int]) – 在 rank 0 上執行 c10d store 的固定埠,如果未指定,則選擇一個隨機的空閒埠

  • master_addr (Optional[str]) – 在 rank 0 上執行 c10d store 的固定 master_addr,如果未指定,則選擇 Agent rank 0 上的主機名

  • redirects – 將標準流重定向到檔案,透過傳遞一個 map 有選擇地重定向特定 local rank 的流

  • tee – 將指定的標準流(s) 同時輸出到控制檯和檔案,透過傳遞一個 map 有選擇地對特定 local rank 進行 tee 操作,其優先順序高於 redirects 設定。

get_entrypoint_name()[source][source]

獲取入口點名稱。

如果入口點是函式(例如 Callable),則返回其 __qualname__;如果入口點是二進位制檔案(例如 str),則返回二進位制檔名稱。

class torch.distributed.elastic.agent.server.WorkerState(value)[source][source]

WorkerGroup 的狀態。

工作組中的工作程序作為一個單元改變狀態。如果工作組中的一個工作程序失敗,則整個集合被視為失敗。

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

工作組從初始的 INIT 狀態開始,然後進展到 HEALTHY(健康)或 UNHEALTHY(不健康)狀態,最終達到終止的 SUCCEEDED(成功)或 FAILED(失敗)狀態。

Agent 可以中斷工作組並暫時將其置於 STOPPED(停止)狀態。處於 STOPPED 狀態的工作程序計劃在不久的將來由 Agent 重啟。將工作程序置於 STOPPED 狀態的一些示例包括:

  1. 觀察到工作組失敗 | 不健康

  2. 檢測到成員變更

當對工作組執行操作(啟動、停止、rdzv、重試等)失敗,並且該操作部分應用於工作組時,狀態將變為 UNKNOWN(未知)。這通常發生在 Agent 上狀態變更事件期間出現未捕獲/未處理的異常時。Agent 不期望恢復處於 UNKNOWN 狀態的工作組,最好自行終止並允許作業管理器重試節點。

static is_running(state)[source][source]

返回 Worker 的狀態。

返回值

如果工作程序狀態表示工作程序仍在執行(例如,程序存在但不一定健康),則為 True。

返回型別

bool

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source][source]

一個工作程序例項。

將其與表示工作程序規範的 WorkerSpec 進行對比。一個 Worker 是從一個 WorkerSpec 建立的。Worker 之於 WorkerSpec 就像物件之於類。

工作程序的 idElasticAgent 的具體實現來解釋。對於本地 Agent,它可以是工作程序的 pid (int);對於遠端 Agent,它可以編碼為 host:port (string)

引數
  • id (Any) – 唯一標識一個工作程序(由 Agent 解釋)

  • local_rank (int) – 工作程序的本地 rank

  • global_rank (int) – 工作程序的全域性 rank

  • role_rank (int) – 在所有具有相同 role 的工作程序中的 rank

  • world_size (int) – 工作程序數量(全域性)

  • role_world_size (int) – 具有相同 role 的工作程序數量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source][source]

一組 Worker 例項。

該類定義了由 ElasticAgent 管理的給定 WorkerSpec 對應的一組 Worker 例項。工作組是否包含跨例項工作程序取決於 Agent 的實現。

實現

以下是 torchelastic 提供的 Agent 實現。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source][source]

這是 torchelastic.agent.server.ElasticAgent 的一個實現,用於處理主機本地的工作程序。

此 Agent 按主機部署,配置為生成 n 個工作程序。使用 GPU 時,n 對應於主機上可用的 GPU 數量。

本地 Agent 不會與部署在其他主機上的其他本地 Agent 通訊,即使工作程序之間可能進行跨主機通訊。工作程序 id 被解釋為本地程序。Agent 將所有工作程序作為一個單元啟動和停止。

傳遞給工作程序函式及其引數必須與 python multiprocessing 相容。要將多程序資料結構傳遞給工作程序,可以在與指定的 start_method 相同的多程序上下文中建立該資料結構,並將其作為函式引數傳遞。

exit_barrier_timeout 指定等待其他 Agent 完成的時間量(以秒為單位)。這作為一個安全網,用於處理工作程序在不同時間完成的情況,以防止 Agent 將提前完成的工作程序視為縮減事件。強烈建議使用者程式碼處理確保工作程序同步終止,而不是依賴於 exit_barrier_timeout。

如果在於 `LocalElasticAgent` 程序中定義了環境變數 TORCHELASTIC_ENABLE_FILE_TIMER 且其值為 1,則可以在 `LocalElasticAgent` 中啟用基於命名管道的看門狗。可選地,可以設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`,為其指定命名管道的唯一檔名。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 將在內部建立一個唯一的檔名,並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,此環境變數將傳播到工作程序,使其能夠連線到 `LocalElasticAgent` 使用的同一命名管道。

日誌會寫入指定的日誌目錄。預設情況下,每行日誌都會以 [${role_name}${local_rank}]: 作為字首(例如 [trainer0]: foobar)。日誌字首可以透過傳遞一個 模板字串 作為 log_line_prefix_template 引數來定製。執行時會替換以下宏(識別符號):${role_name}, ${local_rank}, ${rank}。例如,要用全域性 rank 代替本地 rank 作為每行日誌的字首,請設定 log_line_prefix_template = "[${rank}]:

啟動函式示例

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("trainer failed")
    else:
        print(f"rank 0 return value: {results.return_values[0]}")
        # prints -> rank 0 return value: do train

啟動二進位制檔案示例

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("binary launches do not have return values")

擴充套件 Agent

要擴充套件 Agent,可以直接實現 `ElasticAgent`,但我們建議改為擴充套件 SimpleElasticAgent,它提供了大部分基礎結構,只需您實現一些特定的抽象方法。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source][source]

管理特定型別工作程序 role 的 ElasticAgent

管理單個 WorkerSpec(例如特定型別的工作程序 role)對應的工作程序(WorkerGroup)的 ElasticAgent

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source][source]

確定工作程序的正確 rank。

快速路徑:當所有工作程序具有相同的 role 和 world size 時。我們將全域性 rank 計算為 group_rank * group_world_size + local_rank。role_world_sizeglobal_world_size 相同。在這種情況下不使用 TCP store。此模式僅在使用者將環境變數 TORCH_ELASTIC_WORKER_IDENTICAL 設定為 1 時啟用。

時間複雜度:每個工作程序 O(1),總體 O(1)

慢速路徑:當工作程序具有不同的 role 和 world size 時。我們使用以下演算法:

  1. 每個 Agent 將其配置(group_rank, group_world_size, num_workers)寫入公共 store。

  2. Rank 0 的 Agent 從 store 讀取所有 role_info,並確定每個 Agent 的工作程序 rank。

  3. 確定全域性 rank:worker 的全域性 rank 是透過在其前面所有 worker 的 local_world_size 的累加和計算得出的。出於效率原因,每個 worker 都被分配一個基礎全域性 rank,使得其 worker 位於 [base_global_rank, base_global_rank + local_world_size) 範圍內。

  4. 確定角色 rank:角色 rank 是使用點 3 中的演算法確定的,但 rank 是相對於角色名稱計算的。

  5. rank 0 代理將分配的 rank 寫入 store。

  6. 每個代理從 store 讀取分配的 rank。

時間複雜度:每個 worker O(1),rank0 O(n),總體 O(n)

返回型別

列表[torch.distributed.elastic.agent.server.api.Worker]

_exit_barrier()[source][source]

定義一個屏障,保持代理程序存活直到所有 worker 完成。

等待 exit_barrier_timeout 秒,直到所有代理完成執行其本地 worker(無論成功與否)。這作為一道安全防護,防止使用者指令碼在不同時間終止。

_initialize_workers(worker_group)[source][source]

為 `worker_group` 啟動一組新的 worker。

本質上是一個 rendezvous,然後是 start_workers。呼叫者應在此方法之前先呼叫 _stop_workers() 來停止正在執行的 worker。

樂觀地將剛啟動的 worker 組的狀態設定為 HEALTHY,並將實際的狀態監控委託給 _monitor_workers() 方法

abstract _monitor_workers(worker_group)[source][source]

檢查 worker_group 中的 worker。

此函式也返回 worker 組的新狀態。

返回型別

RunResult

_rendezvous(worker_group)[source][source]

為 worker 規範指定的 worker 執行 rendezvous。

為 worker 分配新的全域性 rank 和 world size。更新 worker 組的 rendezvous store。

_restart_workers(worker_group)[source][source]

重啟(停止、rendezvous、啟動)組中的所有本地 worker。

abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source][source]

清理代理工作期間分配的任何資源。

引數

death_sig (Signals) – 傳送給子程序的訊號,預設為 SIGTERM

abstract _start_workers(worker_group)[source][source]

啟動數量為 worker_group.spec.local_world_size 的 worker。

這是根據 worker 組的 worker 規範進行的。返回一個 local_rank 到 worker id 的對映。

返回型別

字典[整型, 任何型別]

abstract _stop_workers(worker_group, is_restart=False)[source][source]

停止給定 worker 組中的所有 worker。

實現者必須處理 WorkerState 定義的所有狀態下的 worker。也就是說,必須優雅地處理停止不存在的 worker、不健康(卡住)的 worker 等情況。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source][source]

返回 worker 執行的結果。

執行結果遵循“全有或全無”策略,即當且僅當此代理管理的所有本地 worker 都成功完成時,執行才算成功。

如果結果成功(例如 is_failed() = False),則 return_values 欄位包含由此代理管理的 worker 的輸出(返回值),按其全域性 rank 進行對映。即 result.return_values[0] 是全域性 rank 0 的返回值。

注意

return_values 僅在 worker 入口點是函式時才有意義。指定為二進位制入口點的 worker 通常沒有返回值,因此 return_values 欄位沒有意義,可能為空。

如果 is_failed() 返回 True,則 failures 欄位包含失敗資訊,同樣按失敗 worker 的全域性 rank 進行對映。

return_valuesfailures 中的鍵是互斥的,即 worker 的最終狀態只能是以下之一:成功、失敗。由代理根據其重啟策略有意終止的 worker 不會出現在 return_valuesfailures 中。

代理中的看門狗

如果在於 `LocalElasticAgent` 程序中定義了環境變數 TORCHELASTIC_ENABLE_FILE_TIMER 且其值為 1,則可以在 `LocalElasticAgent` 中啟用基於命名管道的看門狗。可選地,可以設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`,為其指定命名管道的唯一檔名。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 將在內部建立一個唯一的檔名,並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,此環境變數將傳播到工作程序,使其能夠連線到 `LocalElasticAgent` 使用的同一命名管道。

健康檢查伺服器

如果在 ``LocalElasticAgent`` 程序中定義了環境變數 ``TORCHELASTIC_HEALTH_CHECK_PORT``,則可以在 ``LocalElasticAgent`` 中啟用健康檢查監控伺服器。新增健康檢查伺服器介面,可以透過在指定埠號上啟動 tcp/http 伺服器來擴充套件。此外,健康檢查伺服器將有一個回撥函式來檢檢視門狗是否存活。

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source][source]

健康檢查監控伺服器介面,可以透過在指定埠上啟動 tcp/http 伺服器來擴充套件。

引數
  • alive_callback (Callable[[], int]) – Callable[[], int],代理最後進度時間的回撥函式。

  • port (int) – int,啟動 tcp/http 伺服器的埠號。

  • timeout (int) – int,判斷代理是否存活/死亡的超時秒數。

start()[source][source]

Pytorch 不支援此功能,不啟動任何健康檢查伺服器。

stop()[source][source]

停止健康檢查伺服器的函式。

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[source][source]

建立健康檢查伺服器物件。

返回型別

HealthCheckServer

文件

訪問 PyTorch 的完整開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深入教程。

檢視教程

資源

查詢開發資源並獲得問題解答。

檢視資源