Rendezvous¶
在 Torch Distributed Elastic 的上下文中,我們使用術語 rendezvous 指代一種特定的功能,該功能結合了分散式同步原語和對等節點發現。
Torch Distributed Elastic 使用它來聚集訓練作業的參與者(即節點),使它們都能就相同的參與者列表和每個參與者的角色達成一致,並在訓練何時可以開始/恢復的問題上做出一致的集體決定。
Torch Distributed Elastic rendezvous 提供以下關鍵功能:
屏障 (Barrier):
執行 rendezvous 的節點都將阻塞,直到 rendezvous 被認為是完成的 - 這發生在至少有總數達到 min 個節點加入 rendezvous 屏障(針對同一個作業)時。這也意味著屏障的大小不一定是固定的。
達到 min 個節點後會有一個額外的短暫等待時間 - 這用於確保 rendezvous 不會“過快”完成(這可能會將同時嘗試加入的其他節點排除在外)。
如果在屏障處聚集了 max 個節點,則 rendezvous 立即完成。
還有一個總體超時,如果 min 個節點始終未達到,則會導致 rendezvous 失敗 - 這旨在作為一個簡單的故障保護,以幫助釋放部分分配的作業資源,以防資源管理器出現問題,並且應解釋為不可重試。
排他性 (Exclusivity):
一個簡單的分散式屏障是不夠的,因為我們還需要確保在任何給定時間(對於給定作業)只存在一個節點組。換句話說,新節點(即延遲加入的節點)不應該能夠為同一個作業形成一個並行獨立的 worker 組。
Torch Distributed Elastic rendezvous 確保如果一個節點組已經完成了 rendezvous(因此可能已經在訓練中),那麼嘗試進行 rendezvous 的其他“延遲”節點只會宣告自己正在等待,並且必須等到(之前完成的)現有 rendezvous 首先被銷燬。
一致性 (Consistency):
當 rendezvous 完成時,其所有成員將就作業成員資格以及每個成員在其中的角色達成一致。此角色由一個整數表示,稱為 rank,其值介於 0 到 world size 之間。
請注意,rank 是不穩定的,這意味著同一個節點在下一次(重新)rendezvous 中可能會被分配不同的 rank。
容錯性 (Fault-tolerance):
Torch Distributed Elastic rendezvous 設計用於在 rendezvous 過程中容忍節點故障。如果在加入 rendezvous 和 rendezvous 完成之間,一個程序崩潰(或丟失網路連線等),則會與剩餘的健康節點自動重新進行 rendezvous。
一個節點也可能在完成 rendezvous 之後(或被其他節點觀察到已經完成 rendezvous)失敗 - 這種情況將由 Torch Distributed Elastic 的 train_loop 處理(其中也會觸發重新 rendezvous)。
共享鍵值儲存 (Shared key-value store):
當 rendezvous 完成時,會建立一個共享的鍵值儲存並返回。該儲存實現了 torch.distributed.Store API(請參閱分散式通訊文件)。
此儲存僅由已完成 rendezvous 的成員共享。它旨在由 Torch Distributed Elastic 用於交換初始化作業控制和資料平面所需的資訊。
等待中的 worker 和 rendezvous 關閉:
Torch Distributed Elastic rendezvous handler 物件提供了附加功能,這些功能技術上不是 rendezvous 過程的一部分:
查詢有多少 worker 延遲到達了屏障,他們可以參與下一次 rendezvous。
將 rendezvous 設定為closed,以向所有節點發出訊號,表明不參與下一次 rendezvous。
DynamicRendezvousHandler:
Torch Distributed Elastic 提供 DynamicRendezvousHandler 類,該類實現了上述 rendezvous 機制。它是一個與後端無關的型別,期望在構建時指定一個特定的 RendezvousBackend 例項。
Torch 分散式使用者可以實現自己的後端型別,也可以使用 PyTorch 提供的以下實現之一:
C10dRendezvousBackend: 使用 C10d store(預設為TCPStore)作為 rendezvous 後端。使用 C10d store 的主要優勢是它無需第三方依賴(如 etcd)即可建立 rendezvous。EtcdRendezvousBackend: 取代了舊的EtcdRendezvousHandler類。將EtcdRendezvousBackend例項傳遞給DynamicRendezvousHandler功能上等同於例項化一個EtcdRendezvousHandler。store = TCPStore("localhost") backend = C10dRendezvousBackend(store, "my_run_id") rdzv_handler = DynamicRendezvousHandler.from_backend( run_id="my_run_id", store=store, backend=backend, min_nodes=2, max_nodes=4 )
下面是描述 rendezvous 工作原理的狀態圖。
登錄檔 (Registry)¶
- class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[原始碼][原始碼]¶
儲存構建
RendezvousHandler的引數。- 引數
- class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[原始碼][原始碼]¶
表示
RendezvousHandler後端的登錄檔。
處理程式 (Handler)¶
- class torch.distributed.elastic.rendezvous.RendezvousHandler[原始碼][原始碼]¶
主要 rendezvous 介面。
注意
分散式 Torch 使用者通常不需要實現自己的
RendezvousHandler。基於 C10d Store 的實現已經提供,建議大多數使用者使用。- abstract get_run_id()[原始碼][原始碼]¶
返回 rendezvous 的執行 ID。
執行 ID 是一個使用者定義的 ID,用於唯一標識分散式應用程式的例項。它通常對映到作業 ID,並用於允許節點加入正確的分散式應用程式。
- 返回型別
- abstract is_closed()[原始碼][原始碼]¶
檢查 rendezvous 是否已關閉。
關閉的 rendezvous 意味著所有將來在同一作業中重新進行 rendezvous 的嘗試都將失敗。
is_closed()和set_closed()具有最終傳播的語義,不應用於同步。其意圖是,如果至少有一個節點決定作業完成,它將關閉 rendezvous,其他節點很快就會觀察到這一點並停止執行。- 返回型別
- abstract next_rendezvous()[原始碼][原始碼]¶
rendezvous 屏障的主要入口點。
阻塞直到 rendezvous 完成並且當前程序包含在形成的 worker 組中,或者發生超時,或者 rendezvous 被標記為關閉。
- 返回
RendezvousInfo的例項。- 引發
RendezvousClosedError – Rendezvous 已關閉。
RendezvousConnectionError – 連線到 rendezvous 後端失敗。
RendezvousStateError – Rendezvous 狀態損壞。
RendezvousTimeoutError – Rendezvous 未按時完成。
- 返回型別
- abstract num_nodes_waiting()[原始碼][原始碼]¶
返回延遲到達 rendezvous 屏障因而未包含在當前 worker 組中的節點數量。
呼叫者應定期呼叫此方法檢查是否有新節點正在等待加入作業,如果是,則透過呼叫
next_rendezvous()(重新 rendezvous) 允許它們加入。- 返回型別
- abstract shutdown()[原始碼][原始碼]¶
關閉為 rendezvous 開啟的所有資源。
示例
rdzv_handler = ... try: store, rank, world_size = rdzv_handler.next_rendezvous() finally: rdzv_handler.shutdown()
- 返回型別
- property use_agent_store: bool¶
指示
next_rendezvous()返回的 store 引用可以與使用者應用程式共享,並在應用程式生命週期中可用。Rendezvous handler 實現將以
RendezvousStoreInfo例項的形式共享 store 詳細資訊。應用程式習慣上使用 MASTER_ADDR/MASTER_PORT 環境變數來查詢 store。
資料類 (Dataclasses)¶
- class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[原始碼][原始碼]¶
儲存 rendezvous 相關資訊。
異常 (Exceptions)¶
- class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[原始碼][原始碼]¶
在 rendezvous 已關閉時引發。
- class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source][source]¶
當 rendezvous 未按時完成時引發此錯誤。
- class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[source][source]¶
當連線到 rendezvous 後端失敗時引發此錯誤。
實現¶
動態 Rendezvous¶
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source][source]¶
根據指定引數建立一個新的
DynamicRendezvousHandler例項。- 引數
store (Store) – 作為 rendezvous 的一部分返回的 C10d 儲存例項。
backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端例項。
- 返回型別
引數
描述
join_timeout
rendezvous 預期完成的總時間(以秒為單位)。預設為 600 秒。
last_call_timeout
達到最小節點數後,在完成 rendezvous 之前額外等待的時間(以秒為單位)。預設為 30 秒。
close_timeout
在呼叫
RendezvousHandler.set_closed()或RendezvousHandler.shutdown()後,rendezvous 預期在其中關閉的時間(以秒為單位)。預設為 30 秒。heartbeat
預期完成保活心跳的時間(以秒為單位)
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source][source]¶
表示一個負責在節點集合之間建立 rendezvous 的處理程式。
- classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None, keep_alive_interval=5, keep_alive_max_attempt=3)[source][source]¶
建立一個新的
DynamicRendezvousHandler例項。- 引數
run_id (str) – rendezvous 的執行 ID。
store (Store) – 作為 rendezvous 的一部分返回的 C10d 儲存例項。
backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端例項。
min_nodes (int) – 允許加入 rendezvous 的最小節點數。
max_nodes (int) – 允許加入 rendezvous 的最大節點數。
timeout (Optional[RendezvousTimeout]) – rendezvous 的超時配置。
keep_alive_interval (int) – 節點在傳送心跳以在 rendezvous 中保持活動狀態之前等待的時間量。
keep_alive_max_attempt (int) – 節點在經歷最大失敗心跳嘗試次數後被視為死亡。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source][source]¶
表示一個儲存 rendezvous 狀態的後端例項。
- abstract get_state()[source][source]¶
獲取 rendezvous 狀態。
- 返回
一個元組,包含編碼的 rendezvous 狀態及其 fence token;如果後端中未找到狀態,則為
None。- 引發
RendezvousConnectionError – 連線到後端失敗。
RendezvousStateError – Rendezvous 狀態損壞。
- 返回型別
- abstract set_state(state, token=None)[source][source]¶
設定 rendezvous 狀態。
新 rendezvous 狀態是條件性設定的
如果指定的
token與後端儲存的 fence token 匹配,則狀態將被更新。新狀態將連同其 fence token 一起返回給呼叫者。如果指定的
token與後端儲存的 fence token 不匹配,則狀態將不會更新;取而代之的是,現有狀態將連同其 fence token 一起返回給呼叫者。如果指定的
token是None,則僅在後端沒有現有狀態的情況下才會設定新狀態。新狀態或現有狀態(連同其 fence token)將被返回給呼叫者。
- 引數
state (bytes) – 編碼後的 rendezvous 狀態。
token (Optional[Any]) – 透過之前呼叫
get_state()或set_state()獲取的可選 fence token。
- 返回
一個元組,包含序列化的 rendezvous 狀態、其 fence token 以及一個布林值,指示我們的設定嘗試是否成功。
- 引發
RendezvousConnectionError – 連線到後端失敗。
RendezvousStateError – Rendezvous 狀態損壞。
- 返回型別
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source][source]¶
儲存 rendezvous 的超時配置。
C10d 後端¶
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source][source]¶
根據指定引數建立一個新的
C10dRendezvousBackend例項。引數
描述
store_type
C10d 儲存的型別。目前支援的型別有 “tcp” 和 “file”,分別對應於
torch.distributed.TCPStore和torch.distributed.FileStore。預設為 “tcp”。read_timeout
儲存操作的讀取超時時間(以秒為單位)。預設為 60 秒。
注意:這僅適用於
torch.distributed.TCPStore。對於不接受 timeout 作為引數的torch.distributed.FileStore,它不相關。is_host
一個布林值,指示此後端例項是否將託管 C10d 儲存。如果未指定,將透過將本機的 hostname 或 IP 地址與指定的 rendezvous endpoint 進行匹配來啟發式推斷。預設為
None。請注意,此配置選項僅適用於
torch.distributed.TCPStore。在正常情況下可以安全地跳過它;僅當其值無法正確確定時才需要它(例如,rendezvous endpoint 的 hostname 是 CNAME 或與機器的 FQDN 不匹配)。
Etcd 後端¶
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source][source]¶
根據指定引數建立一個新的
EtcdRendezvousBackend例項。引數
描述
read_timeout
etcd 操作的讀取超時時間(以秒為單位)。預設為 60 秒。
protocol
用於與 etcd 通訊的協議。有效值為 “http” 和 “https”。預設為 “http”。
ssl_cert
與 HTTPS 一起使用的 SSL 客戶端證書路徑。預設為
None。ssl_cert_key
與 HTTPS 一起使用的 SSL 客戶端證書私鑰路徑。預設為
None。ca_cert
根 SSL 頒發機構證書路徑。預設為
None。
Etcd 會合(舊版)¶
警告
DynamicRendezvousHandler 類已取代 EtcdRendezvousHandler 類,推薦大多數使用者使用前者。EtcdRendezvousHandler 已處於維護模式,將來會被棄用。
- class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source][source]¶
實現了由
torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous支援的torch.distributed.elastic.rendezvous.RendezvousHandler介面。EtcdRendezvousHandler使用 URL 來配置要使用的會合型別,並將特定於實現的配置傳遞給會合模組。基本的 etcd 會合配置 URL 如下所示:etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa: W605 -- example -- etcd://:2379/1234?min_workers=1&max_workers=3
上述 URL 的解釋如下:
使用註冊了
etcd方案的會合處理器要使用的
etcd端點是localhost:2379job_id == 1234在 etcd 中用作字首(這允許多個作業共享一個共同的 etcd 伺服器,只要job_ids保證唯一)。請注意,作業 ID 可以是任何字串(例如,不一定是數字),只要它是唯一的。min_workers=1和max_workers=3指定了成員規模的範圍 - Torch Distributed Elastic 只要叢集規模大於或等於min_workers就會開始執行作業,並允許最多max_workers加入叢集。
下面是可傳遞給 etcd 會合的完整引數列表:
引數
描述
min_workers
會合有效的最小工作程序數
max_workers
允許加入的最大工作程序數
timeout
next_rendezvous 預計成功的總超時時間(預設為 600 秒)
last_call_timeout
達到最小工作程序數後的額外等待時間(“最後召集”)(預設為 30 秒)
etcd_prefix
路徑字首(從 etcd 根目錄開始),所有 etcd 節點都將在此路徑下建立(預設為
/torchelastic/p2p)
Etcd 儲存¶
EtcdStore 是當 etcd 用作會合後端時,由 next_rendezvous() 返回的 C10d Store 例項型別。
- class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source][source]¶
透過借用會合 etcd 例項來實現 c10 Store 介面。
這是由
EtcdRendezvous返回的儲存物件。
Etcd 伺服器¶
EtcdServer 是一個便捷類,使您能夠輕鬆地在子程序中啟動和停止 etcd 伺服器。這對於測試或單節點(多工作程序)部署非常有用,在這種情況下,手動設定 etcd 伺服器會很麻煩。
警告
對於生產環境和多節點部署,請考慮正確部署高可用的 etcd 伺服器,因為這是您的分散式作業的單點故障。
- class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source][source]¶
注意
在 etcd server v3.4.3 上測試透過。
在隨機可用埠上啟動和停止本地獨立 etcd 伺服器。對於單節點、多工作程序啟動或測試很有用,在這種情況下,sidecar etcd 伺服器比單獨設定 etcd 伺服器更方便。
此類註冊了一個終止處理程式,用於在退出時關閉 etcd 子程序。此終止處理程式不能替代呼叫
stop()方法。使用以下回退機制來查詢 etcd 二進位制檔案:
使用環境變數 TORCHELASTIC_ETCD_BINARY_PATH
如果存在,使用
<此檔案根目錄>/bin/etcd使用
PATH中的etcd
用法
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd") server.start() client = server.get_client() # use client server.stop()
- 引數
etcd_binary_path – etcd 伺服器二進位制檔案路徑(有關回退路徑,請參見上文)