彈性代理¶
伺服器¶
彈性代理是 torchelastic 的控制平面。
它是一個用於啟動和管理底層工作程序的程序。代理負責
- 使用分散式 torch:工作程序在啟動時會取得所有必要資訊,以便成功且輕鬆地呼叫 - torch.distributed.init_process_group()。
- 容錯:監控工作程序,並在偵測到工作程序失敗或不健康時,關閉所有工作程序並重新啟動所有工作程序。 
- 彈性:對成員變更做出反應,並使用新成員重新啟動工作程序。 
最簡單的代理是每個節點部署一個,並與本地程序一起運作。更進階的代理可以遠端啟動和管理工作程序。代理可以完全去中心化,根據其管理的工作程序做出決策。或者可以協調,與其他代理(管理同一個作業中的工作程序)通訊以做出集體決策。
以下是一個管理本地工作程序群組的代理示意圖。
 
概念¶
本節說明與瞭解 代理 在 torchelastic 中的角色相關的高階類別和概念。
- 類別 torch.distributed.elastic.agent.server.ElasticAgent[來源]¶
- 一個負責管理一個或多個工作程序的代理程序。 - 假設工作程序是常規的分散式 PyTorch 腳本。當代理建立工作程序時,代理會提供必要資訊,以便工作程序正確初始化 torch 程序群組。 - 確切的部署拓撲和代理與工作程序的比率取決於代理的具體實作和使用者對作業放置的偏好。例如,若要在 GPU 上執行具有 8 個訓練器(每個 GPU 一個)的分散式訓練作業,可以 - 使用 8 個單 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 1 個工作程序。 
- 使用 4 個雙 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 2 個工作程序。 
- 使用 2 個四 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 4 個工作程序。 
- 使用 1 個 8 GPU 執行個體,每個執行個體放置一個代理,每個代理管理 8 個工作程序。 
 - 用法 - group_result = agent.run() if group_result.is_failed(): # workers failed failure = group_result.failures[0] logger.exception("worker 0 failed with exit code : %s", failure.exit_code) else: return group_result.return_values[0] # return rank 0's results 
- class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[source]¶
- 關於特定類型 worker 的藍圖資訊。 - 對於給定的角色,必須只存在單一 worker 規範。 worker 規範預計在所有節點(機器)之間都是同質的,也就是說,每個節點都為特定規範運行相同數量的 worker。 - 參數
- role (str) – 使用者定義的角色,適用於具有此規範的 worker 
- local_world_size (int) – 要運行的本地 worker 數量 
- args (Tuple) – 要傳遞給 - entrypoint的參數
- rdzv_handler (RendezvousHandler) – 處理這組 worker 的 rdzv 
- max_restarts (int) – worker 的最大重試次數 
- monitor_interval (float) – 每隔 - n秒監控 worker 的狀態
- master_port (Optional[int]) – 如果未指定,則在 rank 0 上運行 c10d store 的固定埠,否則將選擇一個隨機的空閒埠 
- master_addr (Optional[str]) – 如果未指定,則在 rank 0 上運行 c10d store 的固定 master_addr,否則將選擇 agent rank 0 上的主機名稱 
- redirects – 將標準串流重定向到檔案,通過傳遞映射選擇性地重定向到特定的本地 rank 
- tee – 將指定的標準串流 tee 到控制台 + 檔案,通過傳遞映射選擇性地 tee 到特定的本地 rank,優先於 - redirects設定。
 
 
- class torch.distributed.elastic.agent.server.WorkerState(value)[source]¶
- WorkerGroup的狀態。- worker group 中的 worker 會作為一個單位改變狀態。如果 worker group 中的一個 worker 失敗,則整個集合都被視為失敗 - UNKNOWN - agent lost track of worker group state, unrecoverable INIT - worker group object created not yet started HEALTHY - workers running and healthy UNHEALTHY - workers running and unhealthy STOPPED - workers stopped (interrupted) by the agent SUCCEEDED - workers finished running (exit 0) FAILED - workers failed to successfully finish (exit !0) - worker group 從初始的 - INIT狀態開始,然後進入- HEALTHY或- UNHEALTHY狀態,最後到達終止狀態- SUCCEEDED或- FAILED。- worker group 可以被 agent 中斷並暫時置於 - STOPPED狀態。- STOPPED狀態下的 worker 將由 agent 安排在不久的將來重新啟動。 worker 被置於- STOPPED狀態的一些例子是- worker group 失敗|觀察到不健康 
- 偵測到成員變更 
 - 當 worker group 上的操作(啟動、停止、rdzv、重試等)失敗並導致操作部分應用於 worker group 時,狀態將為 - UNKNOWN。通常,這發生在 agent 上狀態變更事件期間未捕獲/未處理的例外情況。 agent 不應恢復- UNKNOWN狀態下的 worker group,最好自行終止並允許作業管理器重試該節點。
- class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]¶
- worker 實例。 - 將此與表示 worker 規範的 - WorkerSpec進行對比。- Worker是從- WorkerSpec創建的。- Worker之於- WorkerSpec就如同對象之於類別。- worker 的 - id由- ElasticAgent的具體實現來解釋。對於本地 agent,它可能是 worker 的- pid (int),對於遠端 agent,它可以編碼為- host:port (string)。
實現¶
以下是 torchelastic 提供的 agent 實現。
- class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]¶
- 處理主機本地端工作者的 - torchelastic.agent.server.ElasticAgent的實作。- 此代理程式會針對每個主機進行部署,並設定為產生 - n個工作者。使用 GPU 時,- n會對應到主機上可用的 GPU 數量。- 本地端代理程式不會與部署在其他主機上的其他本地端代理程式通訊,即使工作者可能進行主機間通訊。工作者 ID 會被解譯為本地端程序。代理程式會將所有工作者程序作為單一單元啟動和停止。 - 傳遞給工作者函式的 worker 函式和參數必須與 Python 多程序相容。若要將多程序資料結構傳遞給工作者,您可以在與指定的 - start_method相同的多程序內容中建立資料結構,並將其作為函式參數傳遞。- exit_barrier_timeout會指定等待其他代理程式完成的時間量(以秒為單位)。這可以作為安全網,處理工作者在不同時間完成的情況,防止代理程式將提早完成的工作者視為縮減事件。強烈建議使用者程式碼處理確保以同步方式終止工作者,而不是依賴 exit_barrier_timeout。- 如果在 - `LocalElasticAgent`程序中定義了值為 1 的環境變數- TORCHELASTIC_ENABLE_FILE_TIMER,則可以在- `LocalElasticAgent`中啟用基於命名管道的監看程式。或者,可以使用命名管道的唯一檔案名稱設定另一個環境變數- `TORCHELASTIC_TIMER_FILE`。如果未設定環境變數- `TORCHELASTIC_TIMER_FILE`,- `LocalElasticAgent`會在內部建立唯一的檔案名稱並將其設定為環境變數- `TORCHELASTIC_TIMER_FILE`,並且此環境變數會傳播到工作者程序,讓它們可以連線到- `LocalElasticAgent`使用的相同命名管道。- 日誌會寫入指定的日誌目錄。根據預設,每個日誌行的前綴都會加上 - [${role_name}${local_rank}]:(例如- [trainer0]: foobar)。可以透過傳遞 範本字串 作為- log_line_prefix_template參數來自訂日誌前綴。以下巨集(識別碼)會在執行階段替換:- ${role_name}, ${local_rank}, ${rank}。例如,若要使用全域等級而不是本地端等級作為每個日誌行的前綴,請設定- log_line_prefix_template = "[${rank}]:。- 範例啟動函式 - def trainer(args) -> str: return "do train" def main(): start_method="spawn" shared_queue= multiprocessing.get_context(start_method).Queue() spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint=trainer, args=("foobar",), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec, start_method) results = agent.run() if results.is_failed(): print("trainer failed") else: print(f"rank 0 return value: {results.return_values[0]}") # prints -> rank 0 return value: do train - 範例啟動二進制檔案 - def main(): spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint="/usr/local/bin/trainer", args=("--trainer-args", "foobar"), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec) results = agent.run() if not results.is_failed(): print("binary launches do not have return values") 
擴充代理程式¶
若要擴充代理程式,您可以直接實作 `ElasticAgent`,但是我們建議您改為擴充 SimpleElasticAgent,它提供了大部分的鷹架,並留給您一些特定的抽象方法來實作。
- class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]¶
- 管理一種特定工作者角色類型的 - ElasticAgent。- 為單一 - WorkerSpec(例如一種特定工作者角色類型)管理工作者(- WorkerGroup)的- ElasticAgent。- _assign_worker_ranks(store, group_rank, group_world_size, spec)[source]¶
- 為工作者程序決定適當的等級。 - 等級分配是根據以下演算法完成的 - 每個代理程式將其設定(group_rank、group_world_size、num_workers)寫入通用儲存體。 
- 等級 0 的代理程式會從儲存體讀取所有 role_info,並決定每個代理程式的工作者等級。 
- 決定全域等級:工作者的全域等級是透過累加其前面所有工作者的 local_world_size 來計算。基於效率原因,每個工作者都會被分配一個基本全域等級,使其工作者位於 [base_global_rank, base_global_rank + local_world_size) 的範圍內。 
- 決定角色等級:角色等級是使用第 3 點中的演算法決定的,但等級是根據角色名稱計算的。 
- 等級 0 的代理程式會將分配的等級寫入儲存體。 
- 每個代理程式都會從儲存體讀取分配的等級。 
 - 時間複雜度:每個工作者 O(1),等級 0 O(n),整體 O(n) 
 - _exit_barrier()[source]¶
- 定義一個屏障,讓代理程式程序保持活動狀態,直到所有工作者都完成。 - 等待 - exit_barrier_timeout秒,讓所有代理程式完成執行其本地端工作者(無論成功與否)。這可以作為安全防護,防止使用者腳本在不同時間終止。
 - _initialize_workers(worker_group)[source]¶
- 為 worker_group 啟動一組新的工作者。 - 本質上,這是一個會合,然後是 - start_workers。呼叫者應該先呼叫- _stop_workers()來停止執行中的工作者,然後再呼叫此方法。- 樂觀地將剛啟動的工作者群組的狀態設定為 - HEALTHY,並將實際的狀態監控委託給- _monitor_workers()方法
 - abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source]¶
- 清除在代理程式工作期間配置的任何資源。 - 參數
- death_sig (Signals) – 要傳送到子程序的訊號,預設為 SIGTERM 
 
 
- class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]¶
- 傳回工作器執行的結果。 - 執行結果遵循「全有或全無」的原則,當且僅當此代理程式管理的所有本地工作器都成功完成時,執行才會成功。 - 如果結果成功(例如 - is_failed() = False),則- return_values欄位包含由其全域排名映射的此代理程式管理的工作器的輸出(傳回值)。也就是說,- result.return_values[0]是全域排名 0 的傳回值。- 備註 - return_values僅在工作器進入點是函數時才有意義。指定為二進位進入點的工作器通常沒有傳回值,而- return_values欄位沒有意義,而且可能是空的。- 如果 - is_failed()傳回- True,則- failures欄位包含失敗資訊,同樣由失敗工作器的全域排名映射。- return_values和- failures中的鍵是互斥的,也就是說,工作器的最終狀態只能是以下其中一種:成功、失敗。根據代理程式的重新啟動原則,由代理程式有意終止的工作器不會顯示在- return_values或- failures中。
代理程式中的看門狗¶
如果在 `LocalElasticAgent` 程序中定義了值為 1 的環境變數 TORCHELASTIC_ENABLE_FILE_TIMER,則可以在 `LocalElasticAgent` 中啟用基於命名管道的監看程式。或者,可以使用命名管道的唯一檔案名稱設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE`,`LocalElasticAgent` 會在內部建立唯一的檔案名稱並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,並且此環境變數會傳播到工作者程序,讓它們可以連線到 `LocalElasticAgent` 使用的相同命名管道。
健康檢查伺服器¶
如果在 `LocalElasticAgent` 程序中定義了環境變數 TORCHELASTIC_HEALTH_CHECK_PORT,則可以在 `LocalElasticAgent` 中啟用健康檢查監控伺服器。新增健康檢查伺服器的介面,可以透過在指定的埠號上啟動 tcp/http 伺服器來擴展。此外,健康檢查伺服器將具有回呼以檢查看門狗是否處於活動狀態。
- class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]¶
- 健康檢查監控伺服器的介面,可以透過在指定的埠上啟動 tcp/http 伺服器來擴展。 - 參數