捷徑

彈性代理

伺服器

彈性代理是 torchelastic 的控制平面。

它是一個用於啟動和管理底層工作程序的程序。代理負責

  1. 使用分散式 torch:工作程序在啟動時會取得所有必要資訊,以便成功且輕鬆地呼叫 torch.distributed.init_process_group()

  2. 容錯:監控工作程序,並在偵測到工作程序失敗或不健康時,關閉所有工作程序並重新啟動所有工作程序。

  3. 彈性:對成員變更做出反應,並使用新成員重新啟動工作程序。

最簡單的代理是每個節點部署一個,並與本地程序一起運作。更進階的代理可以遠端啟動和管理工作程序。代理可以完全去中心化,根據其管理的工作程序做出決策。或者可以協調,與其他代理(管理同一個作業中的工作程序)通訊以做出集體決策。

以下是一個管理本地工作程序群組的代理示意圖。

../_images/agent_diagram.jpg

概念

本節說明與瞭解 代理 在 torchelastic 中的角色相關的高階類別和概念。

類別 torch.distributed.elastic.agent.server.ElasticAgent[來源]

一個負責管理一個或多個工作程序的代理程序。

假設工作程序是常規的分散式 PyTorch 腳本。當代理建立工作程序時,代理會提供必要資訊,以便工作程序正確初始化 torch 程序群組。

確切的部署拓撲和代理與工作程序的比率取決於代理的具體實作和使用者對作業放置的偏好。例如,若要在 GPU 上執行具有 8 個訓練器(每個 GPU 一個)的分散式訓練作業,可以

  1. 使用 8 個單 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 1 個工作程序。

  2. 使用 4 個雙 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 2 個工作程序。

  3. 使用 2 個四 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 4 個工作程序。

  4. 使用 1 個 8 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 8 個工作程序。

用法

group_result = agent.run()
 if group_result.is_failed():
   # workers failed
   failure = group_result.failures[0]
   logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # return rank 0's results
抽象 get_worker_group(role='default')[來源]

傳回指定 角色WorkerGroup

請注意,工作程序群組是可變物件,因此在多執行緒/程序環境中,其狀態可能會改變。建議實作者(但非強制)傳回防禦性唯讀副本。

傳回類型

WorkerGroup

抽象 run(role='default')[來源]

執行代理。

支援在發生失敗時重試工作程序群組,最多 max_restarts 次。

傳回

執行的結果,包含每個工作程序的傳回值或失敗詳細資訊,由工作程序的全局排名映射。

引發

例外狀況 - 與工作程序無關的任何其他失敗

傳回類型

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[source]

關於特定類型 worker 的藍圖資訊。

對於給定的角色,必須只存在單一 worker 規範。 worker 規範預計在所有節點(機器)之間都是同質的,也就是說,每個節點都為特定規範運行相同數量的 worker。

參數
  • role (str) – 使用者定義的角色,適用於具有此規範的 worker

  • local_world_size (int) – 要運行的本地 worker 數量

  • fn (Optional[Callable]) – (已棄用,請改用 entrypoint)

  • entrypoint (Optional[Union[Callable, str]]) – worker 函數或命令

  • args (Tuple) – 要傳遞給 entrypoint 的參數

  • rdzv_handler (RendezvousHandler) – 處理這組 worker 的 rdzv

  • max_restarts (int) – worker 的最大重試次數

  • monitor_interval (float) – 每隔 n 秒監控 worker 的狀態

  • master_port (Optional[int]) – 如果未指定,則在 rank 0 上運行 c10d store 的固定埠,否則將選擇一個隨機的空閒埠

  • master_addr (Optional[str]) – 如果未指定,則在 rank 0 上運行 c10d store 的固定 master_addr,否則將選擇 agent rank 0 上的主機名稱

  • redirects – 將標準串流重定向到檔案,通過傳遞映射選擇性地重定向到特定的本地 rank

  • tee – 將指定的標準串流 tee 到控制台 + 檔案,通過傳遞映射選擇性地 tee 到特定的本地 rank,優先於 redirects 設定。

get_entrypoint_name()[source]

取得進入點名稱。

如果進入點是一個函數(例如 Callable),則返回其 __qualname__,否則如果進入點是一個二進制檔案(例如 str),則返回二進制檔案名稱。

class torch.distributed.elastic.agent.server.WorkerState(value)[source]

WorkerGroup 的狀態。

worker group 中的 worker 會作為一個單位改變狀態。如果 worker group 中的一個 worker 失敗,則整個集合都被視為失敗

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

worker group 從初始的 INIT 狀態開始,然後進入 HEALTHYUNHEALTHY 狀態,最後到達終止狀態 SUCCEEDEDFAILED

worker group 可以被 agent 中斷並暫時置於 STOPPED 狀態。 STOPPED 狀態下的 worker 將由 agent 安排在不久的將來重新啟動。 worker 被置於 STOPPED 狀態的一些例子是

  1. worker group 失敗|觀察到不健康

  2. 偵測到成員變更

當 worker group 上的操作(啟動、停止、rdzv、重試等)失敗並導致操作部分應用於 worker group 時,狀態將為 UNKNOWN。通常,這發生在 agent 上狀態變更事件期間未捕獲/未處理的例外情況。 agent 不應恢復 UNKNOWN 狀態下的 worker group,最好自行終止並允許作業管理器重試該節點。

static is_running(state)[source]

返回 Worker 的狀態。

傳回

如果 worker 狀態表示 worker 仍在運行(例如,進程存在但不一定是健康的),則為 True。

傳回類型

bool

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]

worker 實例。

將此與表示 worker 規範的 WorkerSpec 進行對比。 Worker 是從 WorkerSpec 創建的。 Worker 之於 WorkerSpec 就如同對象之於類別。

worker 的 idElasticAgent 的具體實現來解釋。對於本地 agent,它可能是 worker 的 pid (int),對於遠端 agent,它可以編碼為 host:port (string)

參數
  • id (Any) – 唯一標識 worker(由 agent 解釋)

  • local_rank (int) – worker 的本地 rank

  • global_rank (int) – worker 的全局 rank

  • role_rank (int) – worker 在所有具有相同角色的 worker 中的 rank

  • world_size (int) – worker 數量(全局)

  • role_world_size (int) – 具有相同角色的 worker 數量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source]

一組 Worker 實例。

該類別為由 ElasticAgent 管理的給定 WorkerSpec 定義了一組 Worker 實例。 worker group 是否包含跨實例 worker 取決於 agent 的實現。

實現

以下是 torchelastic 提供的 agent 實現。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]

處理主機本地端工作者的 torchelastic.agent.server.ElasticAgent 的實作。

此代理程式會針對每個主機進行部署,並設定為產生 n 個工作者。使用 GPU 時,n 會對應到主機上可用的 GPU 數量。

本地端代理程式不會與部署在其他主機上的其他本地端代理程式通訊,即使工作者可能進行主機間通訊。工作者 ID 會被解譯為本地端程序。代理程式會將所有工作者程序作為單一單元啟動和停止。

傳遞給工作者函式的 worker 函式和參數必須與 Python 多程序相容。若要將多程序資料結構傳遞給工作者,您可以在與指定的 start_method 相同的多程序內容中建立資料結構,並將其作為函式參數傳遞。

exit_barrier_timeout 會指定等待其他代理程式完成的時間量(以秒為單位)。這可以作為安全網,處理工作者在不同時間完成的情況,防止代理程式將提早完成的工作者視為縮減事件。強烈建議使用者程式碼處理確保以同步方式終止工作者,而不是依賴 exit_barrier_timeout。

如果在 `LocalElasticAgent` 程序中定義了值為 1 的環境變數 TORCHELASTIC_ENABLE_FILE_TIMER,則可以在 `LocalElasticAgent` 中啟用基於命名管道的監看程式。或者,可以使用命名管道的唯一檔案名稱設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 會在內部建立唯一的檔案名稱並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,並且此環境變數會傳播到工作者程序,讓它們可以連線到 `LocalElasticAgent` 使用的相同命名管道。

日誌會寫入指定的日誌目錄。根據預設,每個日誌行的前綴都會加上 [${role_name}${local_rank}]:(例如 [trainer0]: foobar)。可以透過傳遞 範本字串 作為 log_line_prefix_template 參數來自訂日誌前綴。以下巨集(識別碼)會在執行階段替換:${role_name}, ${local_rank}, ${rank}。例如,若要使用全域等級而不是本地端等級作為每個日誌行的前綴,請設定 log_line_prefix_template = "[${rank}]:

範例啟動函式

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("trainer failed")
    else:
        print(f"rank 0 return value: {results.return_values[0]}")
        # prints -> rank 0 return value: do train

範例啟動二進制檔案

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("binary launches do not have return values")

擴充代理程式

若要擴充代理程式,您可以直接實作 `ElasticAgent`,但是我們建議您改為擴充 SimpleElasticAgent,它提供了大部分的鷹架,並留給您一些特定的抽象方法來實作。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]

管理一種特定工作者角色類型的 ElasticAgent

為單一 WorkerSpec(例如一種特定工作者角色類型)管理工作者(WorkerGroup)的 ElasticAgent

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source]

為工作者程序決定適當的等級。

等級分配是根據以下演算法完成的

  1. 每個代理程式將其設定(group_rank、group_world_size、num_workers)寫入通用儲存體。

  2. 等級 0 的代理程式會從儲存體讀取所有 role_info,並決定每個代理程式的工作者等級。

  3. 決定全域等級:工作者的全域等級是透過累加其前面所有工作者的 local_world_size 來計算。基於效率原因,每個工作者都會被分配一個基本全域等級,使其工作者位於 [base_global_rank, base_global_rank + local_world_size) 的範圍內。

  4. 決定角色等級:角色等級是使用第 3 點中的演算法決定的,但等級是根據角色名稱計算的。

  5. 等級 0 的代理程式會將分配的等級寫入儲存體。

  6. 每個代理程式都會從儲存體讀取分配的等級。

時間複雜度:每個工作者 O(1),等級 0 O(n),整體 O(n)

傳回類型

List[Worker]

_exit_barrier()[source]

定義一個屏障,讓代理程式程序保持活動狀態,直到所有工作者都完成。

等待 exit_barrier_timeout 秒,讓所有代理程式完成執行其本地端工作者(無論成功與否)。這可以作為安全防護,防止使用者腳本在不同時間終止。

_initialize_workers(worker_group)[source]

為 worker_group 啟動一組新的工作者。

本質上,這是一個會合,然後是 start_workers。呼叫者應該先呼叫 _stop_workers() 來停止執行中的工作者,然後再呼叫此方法。

樂觀地將剛啟動的工作者群組的狀態設定為 HEALTHY,並將實際的狀態監控委託給 _monitor_workers() 方法

abstract _monitor_workers(worker_group)[source]

檢查 worker_group 的工作者。

此函式也會傳回工作者群組的新狀態。

傳回類型

RunResult

_rendezvous(worker_group)[source]

為工作者規格指定的工作者執行會合。

為工作者分配新的全域等級和世界大小。更新工作者群組的會合儲存體。

_restart_workers(worker_group)[source]

重新啟動(停止、會合、啟動)群組中的所有本地端工作者。

abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source]

清除在代理程式工作期間配置的任何資源。

參數

death_sig (Signals) – 要傳送到子程序的訊號,預設為 SIGTERM

abstract _start_workers(worker_group)[source]

啟動 worker_group.spec.local_world_size 個工作器。

這是根據工作器群組的工作器規格。傳回 local_rank 到工作器 id 的映射。

傳回類型

Dict[int, Any]

abstract _stop_workers(worker_group, is_restart=False)[source]

停止指定工作器群組中的所有工作器。

實作者必須處理 WorkerState 定義的所有狀態中的工作器。也就是說,它必須妥善處理停止不存在的工作器、不正常(卡住)的工作器等。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]

傳回工作器執行的結果。

執行結果遵循「全有或全無」的原則,當且僅當此代理程式管理的所有本地工作器都成功完成時,執行才會成功。

如果結果成功(例如 is_failed() = False),則 return_values 欄位包含由其全域排名映射的此代理程式管理的工作器的輸出(傳回值)。也就是說,result.return_values[0] 是全域排名 0 的傳回值。

備註

return_values 僅在工作器進入點是函數時才有意義。指定為二進位進入點的工作器通常沒有傳回值,而 return_values 欄位沒有意義,而且可能是空的。

如果 is_failed() 傳回 True,則 failures 欄位包含失敗資訊,同樣由失敗工作器的全域排名映射。

return_valuesfailures 中的鍵是互斥的,也就是說,工作器的最終狀態只能是以下其中一種:成功、失敗。根據代理程式的重新啟動原則,由代理程式有意終止的工作器不會顯示在 return_valuesfailures 中。

代理程式中的看門狗

如果在 `LocalElasticAgent` 程序中定義了值為 1 的環境變數 TORCHELASTIC_ENABLE_FILE_TIMER,則可以在 `LocalElasticAgent` 中啟用基於命名管道的監看程式。或者,可以使用命名管道的唯一檔案名稱設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 會在內部建立唯一的檔案名稱並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,並且此環境變數會傳播到工作者程序,讓它們可以連線到 `LocalElasticAgent` 使用的相同命名管道。

健康檢查伺服器

如果在 `LocalElasticAgent` 程序中定義了環境變數 TORCHELASTIC_HEALTH_CHECK_PORT,則可以在 `LocalElasticAgent` 中啟用健康檢查監控伺服器。新增健康檢查伺服器的介面,可以透過在指定的埠號上啟動 tcp/http 伺服器來擴展。此外,健康檢查伺服器將具有回呼以檢查看門狗是否處於活動狀態。

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]

健康檢查監控伺服器的介面,可以透過在指定的埠上啟動 tcp/http 伺服器來擴展。

參數
  • alive_callback (Callable[[], int]) – Callable[[], int],回呼到代理程式的最後進度時間

  • port (int) – int,啟動 tcp/http 伺服器的埠號

  • timeout (int) – int,決定代理程式是活動/死亡的逾時秒數

start()[source]

Pytorch 不支援的功能,不會啟動任何健康檢查伺服器

stop()[source]

停止健康檢查伺服器的函數

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[source]

建立健康檢查伺服器物件

傳回類型

HealthCheckServer

文件

取得 PyTorch 的完整開發人員文件

檢視文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

檢視教學課程

資源

尋找開發資源並取得問題解答

檢視資源