捷徑

集合

在 Torch Distributed Elastic 的上下文中,我們使用術語「集合」來指稱結合了「分散式同步」原語和「對等探索」的特定功能。

Torch Distributed Elastic 使用它來收集訓練作業的參與者(即節點),以便他們都能同意相同的參與者列表和每個人的角色,並就何時可以開始/恢復訓練做出一致的集體決定。

Torch Distributed Elastic 集合提供以下關鍵功能

障礙:

執行集合的節點都將阻塞,直到集合被視為完成 - 這發生在至少 min 個節點(針對相同的作業)已加入集合障礙時。這也意味著障礙不一定是固定大小的。

到達 min 個節點後,還有一個額外的短暫等待時間 - 這是為了確保集合不會「過快」完成(這可能會排除在大致相同的時間嘗試加入的其他節點)。

如果在障礙處收集到 max 個節點,則集合會立即完成。

還有一個總體超時,如果從未達到 min 個節點,則會導致集合失敗 - 這意味著是一個簡單的故障安全,以幫助釋放部分分配的作業資源,以防資源管理器出現問題,並且應被解釋為不可重試。

排他性:

一個簡單的分散式障礙是不夠的,因為我們還需要確保在任何給定時間(對於給定的作業)只存在一組節點。換句話說,新節點(即延遲加入)不應能夠為同一作業形成平行的獨立工作節點組。

Torch Distributed Elastic 集合確保,如果一組節點已經完成集合(因此可能已經在訓練),那麼嘗試集合的其他「延遲」節點將只會宣布自己處於等待狀態,並且必須等到(先前完成的)現有集合首先被銷毀。

一致性:

當集合完成時,其所有成員都將同意作業成員資格和每個人的角色。此角色使用一個稱為等級的整數表示,介於 0 和世界大小之間。

請注意,等級「不穩定」,因為在下一個(重新)集合中,同一個節點可能會被分配不同的等級。

容錯:

Torch Distributed Elastic 集合旨在容忍集合過程中的節點故障。如果程序在加入集合和集合完成之間崩潰(或失去網路連接等),則將使用剩餘的健康節點自動進行重新集合。

節點也可能在完成(或「被其他節點觀察到」已完成)集合「後」失敗 - 這種情況將由 Torch Distributed Elastic train_loop 處理(它也會觸發重新集合)。

共用鍵值儲存:

當集合完成時,會建立並返回一個共用鍵值儲存。此儲存實作了 torch.distributed.Store API(請參閱 分散式通訊文件)。

此儲存僅由已完成集合的成員共用。Torch Distributed Elastic 旨在使用它來交換初始化作業控制和資料平面所需的資訊。

等待工作節點和集合關閉:

Torch Distributed Elastic 集合處理常式物件提供額外功能,這些功能在技術上不屬於集合過程的一部分

  1. 查詢有多少工作節點延遲到達障礙,誰可以參與「下一次」集合。

  2. 將集合設定為「已關閉」以通知所有節點不要參與下一次集合。

DynamicRendezvousHandler:

Torch Distributed Elastic 附帶 DynamicRendezvousHandler 類別,該類別實作了上述集合機制。它是一種與後端無關的類型,預計在建構期間指定一個特定的 RendezvousBackend 執行個體。

Torch 分散式使用者可以實作自己的後端類型,也可以使用 PyTorch 附帶的以下實作之一

以下是描述會合工作原理的狀態圖。

../_images/etcd_rdzv_diagram.png

註冊表

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[原始碼]

保存用於建構 RendezvousHandler 的參數。

參數
  • backend (str) – 用於處理會合的後端名稱。

  • endpoint (str) – 會合的端點,通常採用 <主機名稱>[:<端口號>] 的形式。

  • run_id (str) – 會合的 ID。

  • min_nodes (int) – 允許加入會合的最小節點數。

  • max_nodes (int) – 允許加入會合的最大節點數。

  • local_addr (Optional[str]) – 本機節點的地址。

  • **kwargs – 指定後端的其他參數。

get(key, default=None)[原始碼]

如果 key 存在,則返回 key 的值,否則返回 default

返回類型

任何

get_as_bool(key, default=None)[原始碼]

bool 類型返回 key 的值。

返回類型

Optional[bool]

get_as_int(key, default=None)[原始碼]

int 類型返回 key 的值。

返回類型

Optional[int]

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[原始碼]

表示 RendezvousHandler 後端的註冊表。

處理常式

class torch.distributed.elastic.rendezvous.RendezvousHandler[原始碼]

主要的會合介面。

注意

分散式 Torch 使用者通常不需要實現自己的 RendezvousHandler。已經提供基於 C10d 存儲的實現,建議大多數使用者使用。

abstract get_backend()[原始碼]

返回會合後端的名稱。

返回類型

str

abstract get_run_id()[原始碼]

返回會合的運行 ID。

運行 ID 是使用者定義的 ID,用於唯一標識分散式應用程式的實例。它通常映射到作業 ID,用於允許節點加入正確的分散式應用程式。

返回類型

str

abstract is_closed()[原始碼]

檢查會合是否已關閉。

關閉的會合表示在同一個作業中所有未來的重新會合嘗試都將失敗。

is_closed()set_closed() 具有最終傳播的語義,不應用於同步。其目的是,如果至少有一個節點決定作業已完成,它將關閉會合,其他節點將很快觀察到這一點並停止運行。

返回類型

bool

abstract next_rendezvous()[原始碼]

會合障礙的主要入口點。

阻塞,直到會合完成並且當前程序包含在形成的工作組中,或者發生超時,或者會合被標記為已關閉。

返回

RendezvousInfo 的實例。

引發
返回類型

RendezvousInfo

abstract num_nodes_waiting()[原始碼]

回傳在集合點障礙處延遲到達的節點數量,因此這些節點不包含在目前的工作節點群組中。

呼叫者應定期呼叫此方法,以檢查是否有新的節點正在等待加入作業,如果有,則透過呼叫 next_rendezvous()(重新集合)來允許它們加入。

返回類型

int

abstract set_closed()[來源]

將集合點標記為已關閉。

abstract shutdown()[來源]

關閉所有已開啟的集合點資源。

範例

rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
返回類型

bool

property use_agent_store: bool

表示 next_rendezvous() 回傳的儲存體參考可以與使用者應用程式共用,並且在應用程式生命週期中都可使用。

集合點處理程式實作將以 RendezvousStoreInfo 的實例共用儲存體詳細資訊。應用程式依慣例使用 MASTER_ADDR/MASTER_PORT 環境變數來查詢儲存體。

資料類別

class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[來源]

儲存集合點的相關資訊。

class torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)[來源]

可用於啟動訓練器分散式通訊的儲存體地址和埠

static build(rank, store)[來源]

工廠方法,在 rank0 主機上尋找未使用的埠,並在所有 rank 中使用地址/埠資訊。

如果已知 master_addr/master_port(在共用現有的 TCP 儲存體伺服器時很有用),請使用建構函數。

返回類型

RendezvousStoreInfo

例外狀況

class torch.distributed.elastic.rendezvous.api.RendezvousError[來源]

表示集合點錯誤的基底類型。

class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[來源]

在集合點關閉時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[來源]

在集合點未在時間內完成時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[來源]

在與集合點後端的連線失敗時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousStateError[來源]

在集合點的狀態損壞時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[來源]

當節點未包含在集合點中並正常退出時引發。

例外狀況是一種退出堆疊的機制,但並不表示失敗。

實作

動態集合點

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[來源]

從指定的參數建立新的 DynamicRendezvousHandler

參數
  • store (Store) – 要作為集合點的一部分回傳的 C10d 儲存體。

  • backend (RendezvousBackend) – 用於儲存集合點狀態的後端。

返回類型

DynamicRendezvousHandler

參數

說明

join_timeout

預期完成集合點的總時間(以秒為單位)。預設值為 600 秒。

last_call_timeout

在達到最小節點數量後,在完成集合點之前的額外等待時間(以秒為單位)。預設值為 30 秒。

close_timeout

在呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 之後,預期關閉集合點的時間(以秒為單位)。預設值為 30 秒。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[來源]

表示在多個節點之間設定集合點的處理程式。

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[來源]

建立新的 DynamicRendezvousHandler

參數
  • run_id (str) – 集合點的執行 ID。

  • store (Store) – 要作為集合點的一部分回傳的 C10d 儲存體。

  • backend (RendezvousBackend) – 用於儲存集合點狀態的後端。

  • min_nodes (int) – 允許加入會合的最小節點數。

  • max_nodes (int) – 允許加入會合的最大節點數。

  • local_addr (Optional[str]) – 本機節點地址。

  • timeout (Optional[RendezvousTimeout]) – 集合點的逾時設定。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]

表示持有會合狀態的後端。

abstract get_state()[source]

取得會合狀態。

返回

編碼後的會合狀態及其隔離記號的元組,如果在後端找不到狀態,則為 None

引發
返回類型

Optional[Tuple[bytes, Any]]

abstract property name: str

取得後端的名稱。

abstract set_state(state, token=None)[source]

設定會合狀態。

新的會合狀態會根據條件設定

  • 如果指定的 token 與儲存在後端的隔離記號相符,則會更新狀態。新的狀態將與其隔離記號一起返回給呼叫者。

  • 如果指定的 token 與儲存在後端的隔離記號不符,則不會更新狀態;相反的,現有的狀態將與其隔離記號一起返回給呼叫者。

  • 如果指定的 tokenNone,則只有在後端中沒有現有狀態時才會設定新的狀態。新的狀態或現有的狀態將與其隔離記號一起返回給呼叫者。

參數
  • state (bytes) – 編碼後的會合狀態。

  • token (Optional[Any]) – 由先前呼叫 get_state()set_state() 擷取的選用隔離記號。

返回

序列化後的會合狀態、其隔離記號和一個布林值,表示我們的設定嘗試是否成功的元組。

引發
返回類型

Optional[Tuple[bytes, Any, bool]]

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source]

持有會合的逾時設定。

參數
  • join (Optional[timedelta]) – 預期會合完成的時間。

  • last_call (Optional[timedelta]) – 一旦會合具有最少數量的必要參與者後,在完成會合之前的額外等待時間。

  • close (Optional[timedelta]) – 在呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 之後,預期會合關閉的時間。

  • keep_alive – 預期完成保持活動訊號的時間。

property close: timedelta

取得關閉逾時。

property heartbeat: timedelta

取得保持活動訊號逾時。

property join: timedelta

取得加入逾時。

property last_call: timedelta

取得最後呼叫逾時。

C10d 後端

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source]

從指定的參數建立新的 C10dRendezvousBackend

參數

說明

store_type

C10d 儲存庫的類型。目前支援的類型為「tcp」和「file」,分別對應於 torch.distributed.TCPStoretorch.distributed.FileStore。預設值為「tcp」。

read_timeout

儲存庫操作的讀取逾時(以秒為單位)。預設值為 60 秒。

請注意,這僅適用於 torch.distributed.TCPStore。這與 torch.distributed.FileStore 無關,因為它不將逾時作為參數。

is_host

一個布林值,表示此後端執行個體是否將託管 C10d 儲存庫。如果未指定,它將透過將此機器的網域名稱或 IP 位址與指定的會合端點進行比對來啟發式地推斷。預設值為 None

請注意,此設定選項僅適用於 torch.distributed.TCPStore。在正常情況下,您可以安全地跳過它;只有當無法正確確定其值時才需要它(例如,會合端點的網域名稱為 CNAME 或與機器的 FQDN 不符)。

返回類型

Tuple[C10dRendezvousBackend, Store]

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[來源]

表示以 C10d 為基礎的會合後端。

參數
get_state()[來源]

請參閱基底類別。

返回類型

Optional[Tuple[bytes, Any]]

property name: str

請參閱基底類別。

set_state(state, token=None)[來源]

請參閱基底類別。

返回類型

Optional[Tuple[bytes, Any, bool]]

Etcd 後端

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[來源]

從指定的參數建立新的 EtcdRendezvousBackend

參數

說明

read_timeout

etcd 操作的讀取逾時(以秒為單位)。預設值為 60 秒。

協定

用於與 etcd 通訊的協定。有效值為「http」和「https」。預設值為「http」。

ssl_cert

要與 HTTPS 搭配使用的 SSL 用戶端憑證的路徑。預設值為 None

ssl_cert_key

要與 HTTPS 搭配使用的 SSL 用戶端憑證私密金鑰的路徑。預設值為 None

ca_cert

根 SSL 機構憑證的路徑。預設值為 None

返回類型

Tuple[EtcdRendezvousBackend, Store]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[來源]

表示以 etcd 為基礎的會合後端。

參數
  • client (Client) – 用於與 etcd 通訊的 etcd.Client 執行個體。

  • run_id (str) – 集合點的執行 ID。

  • key_prefix (Optional[str]) – 要在 etcd 中儲存會合狀態的路徑。

  • ttl (Optional[int]) – 會合狀態的 TTL。如果未指定,則預設為兩小時。

get_state()[來源]

請參閱基底類別。

返回類型

Optional[Tuple[bytes, Any]]

property name: str

請參閱基底類別。

set_state(state, token=None)[來源]

請參閱基底類別。

返回類型

Optional[Tuple[bytes, Any, bool]]

Etcd 會合(舊版)

警告

DynamicRendezvousHandler 類別取代了 EtcdRendezvousHandler 類別,建議大多數使用者使用。 EtcdRendezvousHandler 處於維護模式,將來會被取代。

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl)[來源]

實作由 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous 支援的 torch.distributed.elastic.rendezvous.RendezvousHandler 介面。 EtcdRendezvousHandler 使用 URL 來設定要使用的會合類型,並將實作特定的組態傳遞給會合模組。基本的 etcd 會合組態 URL 如下所示

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

-- example --

etcd://:2379/1234?min_workers=1&max_workers=3

以上 URL 的解讀如下

  1. 使用向 etcd 配置註冊的會合處理常式

  2. 要使用的 etcd 端點為 localhost:2379

  3. job_id == 1234 在 etcd 中用作前置字元(這允許在多個作業共用一個 etcd 伺服器,只要 job_ids 保證是唯一的)。請注意,作業 ID 可以是任何字串(例如,不需要是數字),只要它是唯一的即可。

  4. min_workers=1max_workers=3 指定成員大小的範圍 - 只要叢集大小大於或等於 min_workers,Torch Distributed Elastic 就會開始執行作業,並允許最多 max_workers 個成員加入叢集。

以下是可傳遞給 etcd 會合的完整參數清單

參數

說明

min_workers

會合有效的最小工作者數量

max_workers

要允許的最大工作者數量

timeout

預期 next_rendezvous 在其中成功的總逾時時間(預設為 600 秒)

last_call_timeout

達到最小工作者數量後的額外等待時間(「最後呼叫」)(預設為 30 秒)

etcd_prefix

路徑前置字元(從 etcd 根目錄開始),所有 etcd 節點都將在其內建立(預設為 /torchelastic/p2p

Etcd 儲存區

EtcdStore 是在使用 etcd 作為會合後端時,由 next_rendezvous() 傳回的 C10d Store 執行個體類型。

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[來源]

藉由搭載會合 etcd 執行個體來實作 c10 Store 介面。

這是 EtcdRendezvous 傳回的儲存區物件。

add(key, num)[來源]

以原子方式將值遞增一個整數量。

整數使用以 10 為底的字串表示。如果金鑰不存在,則會假設預設值為 0

返回

新的(遞增後的)值

返回類型

int

check(keys)[來源]

檢查是否所有金鑰都立即存在(不等待)。

返回類型

bool

get(key)[原始碼]

透過金鑰取得值,可能會進行阻斷等待。

如果金鑰沒有立即出現,將會進行阻斷等待,最多等待 timeout 時間,或直到金鑰被發布。

返回

(bytes)

引發

LookupError - 如果金鑰在逾時後仍未發布

返回類型

位元組

set(key, value)[原始碼]

將一個鍵值對寫入 EtcdStore

金鑰和值都可以是 Python 的 strbytes

wait(keys, override_timeout=None)[原始碼]

等待所有金鑰都被發布,或直到逾時。

引發

LookupError - 如果發生逾時

Etcd 伺服器

EtcdServer 是一個方便的類別,可以讓您輕鬆地在子進程上啟動和停止 etcd 伺服器。這對於測試或單一節點(多個工作進程)的部署非常有用,因為在這些情況下手動設置一個 etcd 伺服器會很麻煩。

警告

對於生產環境和多節點部署,請考慮正確部署一個高可用的 etcd 伺服器,因為這是分散式作業的單點故障點。

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[原始碼]

注意

在 etcd 伺服器 v3.4.3 上測試過。

在隨機的可用端口上啟動和停止一個本地的獨立 etcd 伺服器。這對於單一節點、多個工作進程的啟動或測試非常有用,因為在這些情況下,使用一個附屬的 etcd 伺服器比單獨設置一個 etcd 伺服器更方便。

這個類別會註冊一個終止處理程序,以便在退出時關閉 etcd 子進程。這個終止處理程序不能取代呼叫 stop() 方法。

使用以下後備機制來尋找 etcd 二進制文件

  1. 使用環境變數 TORCHELASTIC_ETCD_BINARY_PATH

  2. 如果存在,則使用 <此 檔案 根目錄>/bin/etcd

  3. PATH 中使用 etcd

用法

server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
參數

etcd_binary_path – etcd 伺服器二進制文件的路徑(請參閱上面的後備路徑)

文件

取得 PyTorch 的完整開發人員文件

查看文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

查看教學課程

資源

尋找開發資源並取得問題解答

查看資源