快捷方式

Rendezvous

在 Torch Distributed Elastic 的上下文中,我們使用術語 rendezvous 指代一種特定的功能,該功能結合了分散式同步原語和對等節點發現

Torch Distributed Elastic 使用它來聚集訓練作業的參與者(即節點),使它們都能就相同的參與者列表和每個參與者的角色達成一致,並在訓練何時可以開始/恢復的問題上做出一致的集體決定。

Torch Distributed Elastic rendezvous 提供以下關鍵功能:

屏障 (Barrier):

執行 rendezvous 的節點都將阻塞,直到 rendezvous 被認為是完成的 - 這發生在至少有總數達到 min 個節點加入 rendezvous 屏障(針對同一個作業)時。這也意味著屏障的大小不一定是固定的。

達到 min 個節點後會有一個額外的短暫等待時間 - 這用於確保 rendezvous 不會“過快”完成(這可能會將同時嘗試加入的其他節點排除在外)。

如果在屏障處聚集了 max 個節點,則 rendezvous 立即完成。

還有一個總體超時,如果 min 個節點始終未達到,則會導致 rendezvous 失敗 - 這旨在作為一個簡單的故障保護,以幫助釋放部分分配的作業資源,以防資源管理器出現問題,並且應解釋為不可重試。

排他性 (Exclusivity):

一個簡單的分散式屏障是不夠的,因為我們還需要確保在任何給定時間(對於給定作業)只存在一個節點組。換句話說,新節點(即延遲加入的節點)不應該能夠為同一個作業形成一個並行獨立的 worker 組。

Torch Distributed Elastic rendezvous 確保如果一個節點組已經完成了 rendezvous(因此可能已經在訓練中),那麼嘗試進行 rendezvous 的其他“延遲”節點只會宣告自己正在等待,並且必須等到(之前完成的)現有 rendezvous 首先被銷燬。

一致性 (Consistency):

當 rendezvous 完成時,其所有成員將就作業成員資格以及每個成員在其中的角色達成一致。此角色由一個整數表示,稱為 rank,其值介於 0 到 world size 之間。

請注意,rank 是不穩定的,這意味著同一個節點在下一次(重新)rendezvous 中可能會被分配不同的 rank。

容錯性 (Fault-tolerance):

Torch Distributed Elastic rendezvous 設計用於在 rendezvous 過程中容忍節點故障。如果在加入 rendezvous 和 rendezvous 完成之間,一個程序崩潰(或丟失網路連線等),則會與剩餘的健康節點自動重新進行 rendezvous。

一個節點也可能在完成 rendezvous 之後(或其他節點觀察到已經完成 rendezvous)失敗 - 這種情況將由 Torch Distributed Elastic 的 train_loop 處理(其中也會觸發重新 rendezvous)。

共享鍵值儲存 (Shared key-value store):

當 rendezvous 完成時,會建立一個共享的鍵值儲存並返回。該儲存實現了 torch.distributed.Store API(請參閱分散式通訊文件)。

此儲存僅由已完成 rendezvous 的成員共享。它旨在由 Torch Distributed Elastic 用於交換初始化作業控制和資料平面所需的資訊。

等待中的 worker 和 rendezvous 關閉:

Torch Distributed Elastic rendezvous handler 物件提供了附加功能,這些功能技術上不是 rendezvous 過程的一部分:

  1. 查詢有多少 worker 延遲到達了屏障,他們可以參與下一次 rendezvous。

  2. 將 rendezvous 設定為closed,以向所有節點發出訊號,表明不參與下一次 rendezvous。

DynamicRendezvousHandler:

Torch Distributed Elastic 提供 DynamicRendezvousHandler 類,該類實現了上述 rendezvous 機制。它是一個與後端無關的型別,期望在構建時指定一個特定的 RendezvousBackend 例項。

Torch 分散式使用者可以實現自己的後端型別,也可以使用 PyTorch 提供的以下實現之一:

下面是描述 rendezvous 工作原理的狀態圖。

../_images/etcd_rdzv_diagram.png

登錄檔 (Registry)

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[原始碼][原始碼]

儲存構建 RendezvousHandler 的引數。

引數
  • backend (str) – 用於處理 rendezvous 的後端名稱。

  • endpoint (str) – rendezvous 的端點,通常格式為 <主機名>[:<埠>]。

  • run_id (str) – rendezvous 的 id。

  • min_nodes (int) – 允許加入 rendezvous 的最小節點數。

  • max_nodes (int) – 允許加入 rendezvous 的最大節點數。

  • local_addr (Optional[str]) – 本地節點的地址。

  • **kwargs – 指定後端的附加引數。

get(key, default=None)[原始碼][原始碼]

如果 key 存在,則返回 key 的值;否則返回 default

返回型別

Any

get_as_bool(key, default=None)[原始碼][原始碼]

key 的值作為 bool 返回。

返回型別

Optional[bool]

get_as_int(key, default=None)[原始碼][原始碼]

key 的值作為 int 返回。

返回型別

Optional[int]

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[原始碼][原始碼]

表示 RendezvousHandler 後端的登錄檔。

處理程式 (Handler)

class torch.distributed.elastic.rendezvous.RendezvousHandler[原始碼][原始碼]

主要 rendezvous 介面。

注意

分散式 Torch 使用者通常不需要實現自己的 RendezvousHandler。基於 C10d Store 的實現已經提供,建議大多數使用者使用。

abstract get_backend()[原始碼][原始碼]

返回 rendezvous 後端的名稱。

返回型別

str

abstract get_run_id()[原始碼][原始碼]

返回 rendezvous 的執行 ID。

執行 ID 是一個使用者定義的 ID,用於唯一標識分散式應用程式的例項。它通常對映到作業 ID,並用於允許節點加入正確的分散式應用程式。

返回型別

str

abstract is_closed()[原始碼][原始碼]

檢查 rendezvous 是否已關閉。

關閉的 rendezvous 意味著所有將來在同一作業中重新進行 rendezvous 的嘗試都將失敗。

is_closed()set_closed() 具有最終傳播的語義,不應用於同步。其意圖是,如果至少有一個節點決定作業完成,它將關閉 rendezvous,其他節點很快就會觀察到這一點並停止執行。

返回型別

bool

abstract next_rendezvous()[原始碼][原始碼]

rendezvous 屏障的主要入口點。

阻塞直到 rendezvous 完成並且當前程序包含在形成的 worker 組中,或者發生超時,或者 rendezvous 被標記為關閉。

返回

RendezvousInfo 的例項。

引發
返回型別

RendezvousInfo

abstract num_nodes_waiting()[原始碼][原始碼]

返回延遲到達 rendezvous 屏障因而未包含在當前 worker 組中的節點數量。

呼叫者應定期呼叫此方法檢查是否有新節點正在等待加入作業,如果是,則透過呼叫 next_rendezvous() (重新 rendezvous) 允許它們加入。

返回型別

int

abstract set_closed()[原始碼][原始碼]

將 rendezvous 標記為已關閉。

abstract shutdown()[原始碼][原始碼]

關閉為 rendezvous 開啟的所有資源。

示例

rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
返回型別

bool

property use_agent_store: bool

指示 next_rendezvous() 返回的 store 引用可以與使用者應用程式共享,並在應用程式生命週期中可用。

Rendezvous handler 實現將以 RendezvousStoreInfo 例項的形式共享 store 詳細資訊。應用程式習慣上使用 MASTER_ADDR/MASTER_PORT 環境變數來查詢 store。

資料類 (Dataclasses)

class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[原始碼][原始碼]

儲存 rendezvous 相關資訊。

class torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)[原始碼][原始碼]

儲存可用於引導訓練器分散式通訊的地址和埠。

static build(rank, store)[原始碼][原始碼]

工廠方法,在 rank0 主機上查詢未使用的空閒新埠,並將地址/埠資訊與所有 rank 共享。

如果 master_addr/master_port 已知(在共享現有 tcp store 伺服器時很有用),請使用建構函式。

引數
  • rank (int) – 當前節點的 rank

  • store (Store) – 用於 rendezvous 的 store

  • local_addr (Optional[str]) – 當前節點的地址,如果未提供,將從主機名解析

  • server_port (Optional[int]) – 當 TCPStore 被共享時,TCPStore 伺服器的埠。

返回型別

RendezvousStoreInfo

異常 (Exceptions)

class torch.distributed.elastic.rendezvous.api.RendezvousError[原始碼][原始碼]

表示 rendezvous 錯誤的基類。

class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[原始碼][原始碼]

在 rendezvous 已關閉時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source][source]

當 rendezvous 未按時完成時引發此錯誤。

class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[source][source]

當連線到 rendezvous 後端失敗時引發此錯誤。

class torch.distributed.elastic.rendezvous.api.RendezvousStateError[source][source]

當 rendezvous 的狀態損壞時引發此錯誤。

class torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[source][source]

當節點未包含在 rendezvous 中並優雅退出時引發此錯誤。

異常是一種退出棧的機制,但並不意味著失敗。

實現

動態 Rendezvous

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source][source]

根據指定引數建立一個新的 DynamicRendezvousHandler 例項。

引數
  • store (Store) – 作為 rendezvous 的一部分返回的 C10d 儲存例項。

  • backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端例項。

返回型別

DynamicRendezvousHandler

引數

描述

join_timeout

rendezvous 預期完成的總時間(以秒為單位)。預設為 600 秒。

last_call_timeout

達到最小節點數後,在完成 rendezvous 之前額外等待的時間(以秒為單位)。預設為 30 秒。

close_timeout

在呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 後,rendezvous 預期在其中關閉的時間(以秒為單位)。預設為 30 秒。

heartbeat

預期完成保活心跳的時間(以秒為單位)

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source][source]

表示一個負責在節點集合之間建立 rendezvous 的處理程式。

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None, keep_alive_interval=5, keep_alive_max_attempt=3)[source][source]

建立一個新的 DynamicRendezvousHandler 例項。

引數
  • run_id (str) – rendezvous 的執行 ID。

  • store (Store) – 作為 rendezvous 的一部分返回的 C10d 儲存例項。

  • backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端例項。

  • min_nodes (int) – 允許加入 rendezvous 的最小節點數。

  • max_nodes (int) – 允許加入 rendezvous 的最大節點數。

  • local_addr (Optional[str]) – 本地節點地址。

  • timeout (Optional[RendezvousTimeout]) – rendezvous 的超時配置。

  • keep_alive_interval (int) – 節點在傳送心跳以在 rendezvous 中保持活動狀態之前等待的時間量。

  • keep_alive_max_attempt (int) – 節點在經歷最大失敗心跳嘗試次數後被視為死亡。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source][source]

表示一個儲存 rendezvous 狀態的後端例項。

abstract get_state()[source][source]

獲取 rendezvous 狀態。

返回

一個元組,包含編碼的 rendezvous 狀態及其 fence token;如果後端中未找到狀態,則為 None

引發
返回型別

Optional[tuple[bytes, Any]]

abstract property name: str

獲取後端的名稱。

abstract set_state(state, token=None)[source][source]

設定 rendezvous 狀態。

新 rendezvous 狀態是條件性設定的

  • 如果指定的 token 與後端儲存的 fence token 匹配,則狀態將被更新。新狀態將連同其 fence token 一起返回給呼叫者。

  • 如果指定的 token 與後端儲存的 fence token 不匹配,則狀態將不會更新;取而代之的是,現有狀態將連同其 fence token 一起返回給呼叫者。

  • 如果指定的 tokenNone,則僅在後端沒有現有狀態的情況下才會設定新狀態。新狀態或現有狀態(連同其 fence token)將被返回給呼叫者。

引數
  • state (bytes) – 編碼後的 rendezvous 狀態。

  • token (Optional[Any]) – 透過之前呼叫 get_state()set_state() 獲取的可選 fence token。

返回

一個元組,包含序列化的 rendezvous 狀態、其 fence token 以及一個布林值,指示我們的設定嘗試是否成功。

引發
返回型別

Optional[tuple[bytes, Any, bool]]

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source][source]

儲存 rendezvous 的超時配置。

引數
  • join (Optional[timedelta]) – rendezvous 預期在其中完成的時間。

  • last_call (Optional[timedelta]) – 在 rendezvous 達到所需最小參與者數量後,完成 rendezvous 之前的額外等待時間。

  • close (Optional[timedelta]) – 在呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 後,rendezvous 預期在其中關閉的時間。

  • heartbeat (Optional[timedelta]) – 預期完成保活心跳的時間。

property close: timedelta

獲取關閉超時時間。

property heartbeat: timedelta

獲取保活心跳超時時間。

property join: timedelta

獲取加入超時時間。

property last_call: timedelta

獲取 last call 超時時間。

C10d 後端

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source][source]

根據指定引數建立一個新的 C10dRendezvousBackend 例項。

引數

描述

store_type

C10d 儲存的型別。目前支援的型別有 “tcp” 和 “file”,分別對應於 torch.distributed.TCPStoretorch.distributed.FileStore。預設為 “tcp”。

read_timeout

儲存操作的讀取超時時間(以秒為單位)。預設為 60 秒。

注意:這僅適用於 torch.distributed.TCPStore。對於不接受 timeout 作為引數的 torch.distributed.FileStore,它不相關。

is_host

一個布林值,指示此後端例項是否將託管 C10d 儲存。如果未指定,將透過將本機的 hostname 或 IP 地址與指定的 rendezvous endpoint 進行匹配來啟發式推斷。預設為 None

請注意,此配置選項僅適用於 torch.distributed.TCPStore。在正常情況下可以安全地跳過它;僅當其值無法正確確定時才需要它(例如,rendezvous endpoint 的 hostname 是 CNAME 或與機器的 FQDN 不匹配)。

返回型別

tuple[torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend, torch.distributed.distributed_c10d.Store]

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[source][source]

表示一個由 C10d 支援的 rendezvous 後端。

引數
get_state()[source][source]

參見基類。

返回型別

Optional[tuple[bytes, Any]]

property name: str

參見基類。

set_state(state, token=None)[source][source]

參見基類。

返回型別

Optional[tuple[bytes, Any, bool]]

Etcd 後端

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source][source]

根據指定引數建立一個新的 EtcdRendezvousBackend 例項。

引數

描述

read_timeout

etcd 操作的讀取超時時間(以秒為單位)。預設為 60 秒。

protocol

用於與 etcd 通訊的協議。有效值為 “http” 和 “https”。預設為 “http”。

ssl_cert

與 HTTPS 一起使用的 SSL 客戶端證書路徑。預設為 None

ssl_cert_key

與 HTTPS 一起使用的 SSL 客戶端證書私鑰路徑。預設為 None

ca_cert

根 SSL 頒發機構證書路徑。預設為 None

返回型別

tuple[torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend, torch.distributed.distributed_c10d.Store]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[source][source]

表示一個基於 etcd 的 rendezvous 後端。

引數
  • client (Client) – 用於與 etcd 通訊的 etcd.Client 例項。

  • run_id (str) – rendezvous 的執行 ID。

  • key_prefix (Optional[str]) – 在 etcd 中儲存 rendezvous 狀態的路徑字首。

  • ttl (Optional[int]) – rendezvous 狀態的 TTL(生存時間)。如果未指定,預設為兩小時。

get_state()[source][source]

參見基類。

返回型別

Optional[tuple[bytes, Any]]

property name: str

參見基類。

set_state(state, token=None)[source][source]

參見基類。

返回型別

Optional[tuple[bytes, Any, bool]]

Etcd 會合(舊版)

警告

DynamicRendezvousHandler 類已取代 EtcdRendezvousHandler 類,推薦大多數使用者使用前者。EtcdRendezvousHandler 已處於維護模式,將來會被棄用。

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source][source]

實現了由 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous 支援的 torch.distributed.elastic.rendezvous.RendezvousHandler 介面。EtcdRendezvousHandler 使用 URL 來配置要使用的會合型別,並將特定於實現的配置傳遞給會合模組。基本的 etcd 會合配置 URL 如下所示:

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

-- example --

etcd://:2379/1234?min_workers=1&max_workers=3

上述 URL 的解釋如下:

  1. 使用註冊了 etcd 方案的會合處理器

  2. 要使用的 etcd 端點是 localhost:2379

  3. job_id == 1234 在 etcd 中用作字首(這允許多個作業共享一個共同的 etcd 伺服器,只要 job_ids 保證唯一)。請注意,作業 ID 可以是任何字串(例如,不一定是數字),只要它是唯一的。

  4. min_workers=1max_workers=3 指定了成員規模的範圍 - Torch Distributed Elastic 只要叢集規模大於或等於 min_workers 就會開始執行作業,並允許最多 max_workers 加入叢集。

下面是可傳遞給 etcd 會合的完整引數列表:

引數

描述

min_workers

會合有效的最小工作程序數

max_workers

允許加入的最大工作程序數

timeout

next_rendezvous 預計成功的總超時時間(預設為 600 秒)

last_call_timeout

達到最小工作程序數後的額外等待時間(“最後召集”)(預設為 30 秒)

etcd_prefix

路徑字首(從 etcd 根目錄開始),所有 etcd 節點都將在此路徑下建立(預設為 /torchelastic/p2p

Etcd 儲存

EtcdStore 是當 etcd 用作會合後端時,由 next_rendezvous() 返回的 C10d Store 例項型別。

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source][source]

透過借用會合 etcd 例項來實現 c10 Store 介面。

這是由 EtcdRendezvous 返回的儲存物件。

add(key, num)[source][source]

以原子方式將值按整數量遞增。

整數以十進位制字串表示。如果鍵不存在,則假定預設值為 0

返回

新的(遞增後的)值

返回型別

int

check(keys)[source][source]

檢查所有鍵是否立即存在(無需等待)。

返回型別

bool

get(key)[source][source]

按鍵獲取值,可能進行阻塞等待。

如果鍵不立即存在,將進行最長 timeout 持續時間的阻塞等待,直到鍵被髮布。

返回

(bytes)

引發

LookupError - 如果超時後鍵仍未釋出

返回型別

位元組

set(key, value)[source][source]

將鍵值對寫入 EtcdStore

鍵和值都可以是 Python strbytes

wait(keys, override_timeout=None)[source][source]

等待直到所有鍵都被髮布,或直到超時。

引發

LookupError - 如果發生超時

Etcd 伺服器

EtcdServer 是一個便捷類,使您能夠輕鬆地在子程序中啟動和停止 etcd 伺服器。這對於測試或單節點(多工作程序)部署非常有用,在這種情況下,手動設定 etcd 伺服器會很麻煩。

警告

對於生產環境和多節點部署,請考慮正確部署高可用的 etcd 伺服器,因為這是您的分散式作業的單點故障。

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source][source]

注意

在 etcd server v3.4.3 上測試透過。

在隨機可用埠上啟動和停止本地獨立 etcd 伺服器。對於單節點、多工作程序啟動或測試很有用,在這種情況下,sidecar etcd 伺服器比單獨設定 etcd 伺服器更方便。

此類註冊了一個終止處理程式,用於在退出時關閉 etcd 子程序。此終止處理程式不能替代呼叫 stop() 方法。

使用以下回退機制來查詢 etcd 二進位制檔案:

  1. 使用環境變數 TORCHELASTIC_ETCD_BINARY_PATH

  2. 如果存在,使用 <此檔案根目錄>/bin/etcd

  3. 使用 PATH 中的 etcd

用法

server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
引數

etcd_binary_path – etcd 伺服器二進位制檔案路徑(有關回退路徑,請參見上文)

文件

訪問全面的開發者文件

檢視文件

教程

獲取適合初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源