分散式通訊套件 - torch.distributed¶
注意事項
如需與分散式訓練相關的所有功能的簡要介紹,請參閱 PyTorch 分散式概觀。
後端¶
torch.distributed 支援三種內建後端,每種後端具有不同的功能。下表顯示了哪些函數可用於 CPU / CUDA 張量。如果用於建置 PyTorch 的實作支援 CUDA,則 MPI 僅支援 CUDA。
| 後端 | 
 | 
 | 
 | |||
|---|---|---|---|---|---|---|
| 裝置 | CPU | GPU | CPU | GPU | CPU | GPU | 
| send | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| recv | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| broadcast | ✓ | ✓ | ✓ | ? | ✘ | ✓ | 
| all_reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ | 
| reduce | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| all_gather | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| gather | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| scatter | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| reduce_scatter | ✘ | ✘ | ✘ | ✘ | ✘ | ✓ | 
| all_to_all | ✘ | ✘ | ✓ | ? | ✘ | ✓ | 
| barrier | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
PyTorch 附帶的後端¶
PyTorch 分散式套件支援 Linux(穩定)、MacOS(穩定)和 Windows(原型)。根據預設,對於 Linux,Gloo 和 NCCL 後端會在 PyTorch distributed 中建置並包含在內(僅在使用 CUDA 建置時才會包含 NCCL)。MPI 是一個可選的後端,只有在從原始碼建置 PyTorch 時才能包含在內。(例如,在已安裝 MPI 的主機上建置 PyTorch。)
注意事項
從 PyTorch v1.8 開始,Windows 支援所有集體通訊後端,但 NCCL 除外,如果 init_method 引數 init_process_group() 指向一個檔案,則它必須遵循以下架構
- 本機檔案系統, - init_method="file:///d:/tmp/some_file"
- 共用檔案系統, - init_method="file://////{machine_name}/{share_folder_name}/some_file"
與 Linux 平台相同,您可以透過設定環境變數 MASTER_ADDR 和 MASTER_PORT 來啟用 TcpStore。
要使用哪個後端?¶
過去,我們經常被問到:「我應該使用哪個後端?」
- 經驗法則 - 使用 NCCL 後端進行分散式GPU 訓練 
- 使用 Gloo 後端進行分散式CPU 訓練。 
 
- 具有 InfiniBand 互連的 GPU 主機 - 使用 NCCL,因為它是目前唯一支援 InfiniBand 和 GPUDirect 的後端。 
 
- 具有乙太網路互連的 GPU 主機 - 使用 NCCL,因為它目前提供最佳的分散式 GPU 訓練效能,特別適用於多程序單節點或多節點分散式訓練。如果您在使用 NCCL 時遇到任何問題,請使用 Gloo 作為備用選項。(請注意,對於 GPU 而言,Gloo 目前的執行速度比 NCCL 慢。) 
 
- 具有 InfiniBand 互連的 CPU 主機 - 如果您的 InfiniBand 已啟用 IP over IB,請使用 Gloo,否則請改用 MPI。我們正計畫在即將發行的版本中為 Gloo 新增 InfiniBand 支援。 
 
- 具有乙太網路互連的 CPU 主機 - 請使用 Gloo,除非您有特殊原因需要使用 MPI。 
 
常見環境變數¶
選擇要使用的網路介面¶
根據預設,NCCL 和 Gloo 後端都會嘗試尋找正確的網路介面來使用。如果自動偵測到的介面不正確,您可以使用以下環境變數覆寫它(適用於各自的後端)
- NCCL_SOCKET_IFNAME,例如 - export NCCL_SOCKET_IFNAME=eth0
- GLOO_SOCKET_IFNAME,例如 - export GLOO_SOCKET_IFNAME=eth0
如果您使用的是 Gloo 後端,則可以透過以逗號分隔多個介面來指定它們,如下所示:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。後端會在這些介面上以循環方式分派作業。所有程序都必須在此變數中指定相同數量的介面。
其他 NCCL 環境變數¶
除錯 - 如果 NCCL 失敗,您可以設定 NCCL_DEBUG=INFO 以列印明確的警告訊息以及基本的 NCCL 初始化資訊。
您也可以使用 NCCL_DEBUG_SUBSYS 來獲取有關 NCCL 特定方面的更多詳細信息。例如,NCCL_DEBUG_SUBSYS=COLL 會打印集合調用的日誌,這在調試掛起時可能會有幫助,尤其是在由集合類型或消息大小不匹配引起的掛起時。如果拓撲檢測失敗,設置 NCCL_DEBUG_SUBSYS=GRAPH 來檢查詳細的檢測結果並保存為參考將會很有幫助,以便在需要 NCCL 團隊進一步幫助時使用。
性能調優 - NCCL 根據其拓撲檢測執行自動調優,以節省用戶的調優工作量。在某些基於套接字的系統上,用戶仍然可以嘗試調整 NCCL_SOCKET_NTHREADS 和 NCCL_NSOCKS_PERTHREAD 來增加套接字網絡帶寬。這兩個環境變量已由 NCCL 為某些雲端供應商(例如 AWS 或 GCP)預先調整。
如需 NCCL 環境變量的完整列表,請參閱 NVIDIA NCCL 官方文檔
基礎知識¶
torch.distributed 包為在一台或多台機器上運行的多個計算節點上的多進程並行提供了 PyTorch 支持和通信原語。類別 torch.nn.parallel.DistributedDataParallel() 建立在此功能的基礎上,提供同步分佈式訓練作為任何 PyTorch 模型的包裝器。這與 多進程包 - torch.multiprocessing 和 torch.nn.DataParallel() 提供的並行類型不同,因為它支持多個網絡連接的機器,並且用戶必須為每個進程顯式啟動主訓練腳本的單獨副本。
在單機同步的情況下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel() 包裝器與其他數據並行方法相比可能仍然具有優勢,包括 torch.nn.DataParallel()
- 每個進程都維護自己的優化器,並在每次迭代中執行完整的優化步驟。雖然這看起來是多餘的,因為梯度已經被收集在一起並在進程之間進行了平均,因此每個進程的梯度都是相同的,這意味著不需要參數廣播步驟,從而減少了在節點之間傳輸張量的時間。 
- 每個進程都包含一個獨立的 Python 解釋器,消除了從單個 Python 進程驅動多個執行線程、模型副本或 GPU 所帶來的額外解釋器開銷和「GIL 爭用」。這對於大量使用 Python 運行時的模型尤其重要,包括具有循環層或許多小組件的模型。 
初始化¶
在調用任何其他方法之前,需要使用 torch.distributed.init_process_group() 或 torch.distributed.device_mesh.init_device_mesh() 函數初始化包。兩者都會阻塞,直到所有進程都已加入。
- torch.distributed.is_available()[source]¶
- 如果分佈式包可用,則返回 - True。- 否則, - torch.distributed不會公開任何其他 API。目前,- torch.distributed可在 Linux、MacOS 和 Windows 上使用。從源代碼構建 PyTorch 時,請設置- USE_DISTRIBUTED=1以啟用它。目前,默認值是 Linux 和 Windows 為- USE_DISTRIBUTED=1,MacOS 為- USE_DISTRIBUTED=0。- 返回類型
 
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]¶
- 初始化默認的分佈式進程組。 - 這也會初始化分佈式包。 - 初始化進程組有兩種主要方法
- 明確指定 - store、- rank和- world_size。
- 指定 - init_method(一個 URL 字符串),指示在哪裡/如何發現對等點。可以選擇指定- rank和- world_size,或在 URL 中編碼所有必需的參數並省略它們。
 
 - 如果兩者都沒有指定,則假設 - init_method為「env://」。- 參數
- backend (str 或 Backend, 可選) – 要使用的後端。根據構建時的配置,有效值包括 - mpi、- gloo、- nccl和- ucc。如果未提供後端,則會創建- gloo和- nccl後端,有關如何管理多個後端的說明,請參閱下面的註釋。此字段可以作為小寫字符串提供(例如,- "gloo"),也可以通過- Backend屬性訪問(例如,- Backend.GLOO)。如果在每台機器上使用多個進程和- nccl後端,則每個進程必須對其使用的每個 GPU 具有獨佔訪問權,因為在進程之間共享 GPU 可能會導致死鎖。- ucc後端是實驗性的。
- init_method (str, 可選) – 指定如何初始化進程組的 URL。如果沒有指定 - init_method或- store,則默認為「env://」。與- store互斥。
- world_size (int, 可選) – 參與作業的進程數。如果指定了 - store,則為必需項。
- rank (int, 可選) – 當前進程的等級(它應該是 0 到 - world_size-1 之間的數字)。如果指定了- store,則為必需項。
- store (Store, 可選) – 所有工作進程都可以訪問的鍵/值存儲,用於交換連接/地址信息。與 - init_method互斥。
- timeout (timedelta, 可選) – 對進程組執行的操作的超時。NCCL 的默認值為 10 分鐘,其他後端的默認值為 30 分鐘。這是異步中止集合並使進程崩潰的時間。這樣做的原因是 CUDA 執行是異步的,並且繼續執行用戶代碼不再安全,因為失敗的異步 NCCL 操作可能會導致後續 CUDA 操作在損壞的數據上運行。設置 TORCH_NCCL_BLOCKING_WAIT 時,進程將阻塞並等待此超時。 
- group_name (str, 可選, 已棄用) – 群組名稱。此參數將被忽略 
- pg_options (ProcessGroupOptions, 可選) – 進程組選項,指定在構建特定進程組期間需要傳遞哪些附加選項。截至目前,我們僅支持 - nccl後端的- ProcessGroupNCCL.Options選項,可以指定- is_high_priority_stream,以便 nccl 後端在有計算內核等待時可以選擇高優先級的 cuda 流。
- device_id (torch.device, 可選) – 一個單獨的特定設備,用於將此進程「綁定」到該設備,允許進行特定於後端的優化。目前,這只有在 NCCL 下才會產生兩種效果:立即形成通信器(立即調用 - ncclCommInit*,而不是正常的延遲調用),並且子組將盡可能使用- ncclCommSplit以避免不必要的組創建開銷。如果您想及早了解 NCCL 初始化錯誤,也可以使用此字段。
 
 - 注意事項 - 要啟用 - backend == Backend.MPI,需要在支持 MPI 的系統上從源代碼構建 PyTorch。- 注意事項 - 多後端支援尚處於實驗階段。目前,如果未指定後端,則會建立 - gloo和- nccl後端。- gloo後端將用於 CPU 張量的集合運算,而- nccl後端將用於 CUDA 張量的集合運算。 可以透過傳入格式為「<裝置類型>:<後端名稱>,<裝置類型>:<後端名稱>」的字串來指定自訂後端,例如「cpu:gloo,cuda:custom_backend」。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[原始碼]¶
- 根據 device_type、mesh_shape 和 mesh_dim_names 參數初始化 DeviceMesh。 - 這會建立一個具有 n 維陣列配置的 DeviceMesh,其中 n 是 mesh_shape 的長度。 如果有提供 mesh_dim_names,則每個維度都會標記為 mesh_dim_names[i]。 - 注意事項 - init_device_mesh 遵循 SPMD 程式設計模型,這表示相同的 PyTorch Python 程式會在叢集中的所有處理程序/排名上執行。 確保 mesh_shape(描述裝置配置的 nD 陣列的維度)在所有排名中都相同。 不一致的 mesh_shape 可能導致程式當機。 - 注意事項 - 如果找不到處理程序群組,則 init_device_mesh 會在幕後初始化分散式通訊所需的處理程序群組/群組。 - 參數
- 回傳
- 代表裝置配置的 - DeviceMesh物件。
- 返回類型
 - 範例:
- >>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) 
 
- torch.distributed.is_torchelastic_launched()[原始碼]¶
- 檢查此程序是否使用 - torch.distributed.elastic(又稱 torchelastic)啟動。- 環境變數 - TORCHELASTIC_RUN_ID的存在與否用於判斷目前的程序是否使用 torchelastic 啟動。 這是一個合理的判斷依據,因為- TORCHELASTIC_RUN_ID會對應到會合 ID,該 ID 永遠是一個非 Null 值,表示用於對等點探索的作業 ID。- 返回類型
 
目前支援三種初始化方法
TCP 初始化¶
有兩種使用 TCP 進行初始化的方法,這兩種方法都需要所有處理程序都可以訪問的網路地址和所需的 world_size。 第一種方法需要指定屬於排名 0 程序的地址。 此初始化方法要求所有程序都已手動指定排名。
請注意,最新的分散式套件不再支援多播地址。 group_name 也已棄用。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)
環境變數初始化¶
此方法將從環境變數讀取配置,允許使用者完全自訂如何取得資訊。 要設定的變數為
- MASTER_PORT- 必要;必須是排名 0 機器上的可用連接埠
- MASTER_ADDR- 必要(排名 0 除外);排名 0 節點的地址
- WORLD_SIZE- 必要;可以在此處或在呼叫初始化函數時設定
- RANK- 必要;可以在此處或在呼叫初始化函數時設定
排名 0 的機器將用於設定所有連線。
這是預設方法,這表示不需要指定 init_method(或者可以是 env://)。
初始化後¶
執行 torch.distributed.init_process_group() 之後,可以使用以下函數。 若要檢查處理程序群組是否已初始化,請使用 torch.distributed.is_initialized()。
- class torch.distributed.Backend(name)[原始碼]¶
- 類似列舉的後端類別。 - 可用的後端:GLOO、NCCL、UCC、MPI 和其他已註冊的後端。 - 此類別的值為小寫字串,例如 - "gloo"。 可以作為屬性存取它們,例如- Backend.NCCL。- 可以直接呼叫此類別來解析字串,例如 - Backend(backend_str)將檢查- backend_str是否有效,如果是,則回傳已解析的小寫字串。 它也接受大寫字串,例如- Backend("GLOO")會回傳- "gloo"。- 注意事項 - 項目 - Backend.UNDEFINED存在,但僅用作某些欄位的初始值。 使用者不應直接使用它,也不應假設它存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[source]¶
- 使用給定的名稱和實例化函數註冊新的後端。 - 此類方法由第三方 - ProcessGroup擴展使用,用於註冊新的後端。- 參數
- name (str) – - ProcessGroup擴展的後端名稱。它應該與- init_process_group()中的名稱相符。
- func (function) – 實例化後端的函數處理程序。該函數應該在後端擴展中實現,並接受四個參數,包括 - store、- rank、- world_size和- timeout。
- extended_api (bool, 選用) – 後端是否支援擴展參數結構。默認值: - False。如果設置為- True,則後端將獲得- c10d::DistributedBackendOptions的實例,以及後端實現定義的進程組選項物件。
- device (str 或 list 的 str, 選用) – 此後端支援的設備類型,例如「cpu」、「cuda」等。如果為 None,則假設同時支援「cpu」和「cuda」。 
 
 - 注意事項 - 對第三方後端的支援是實驗性的,可能會有所變更。 
 
- torch.distributed.get_backend(group=None)[source]¶
- 返回給定進程組的後端。 - 參數
- group (ProcessGroup, 選用) – 要處理的進程組。默認值為一般的主進程組。如果指定了另一個特定的組,則調用進程必須是 - group的一部分。
- 回傳
- 給定進程組的後端,以小寫字符串形式返回。 
- 返回類型
 
關閉¶
重要的是在退出時通過調用 destroy_process_group() 來清理資源。
最簡單的模式是在訓練腳本中不再需要通信的地方(通常在 main() 的末尾附近),通過為 group 參數使用默認值 None 來調用 destroy_process_group() 來銷毀每個進程組和後端。該調用應該在每個訓練器進程中執行一次,而不是在外部進程啟動器級別執行。
如果在超時時間內沒有在 pg 中的所有等級都調用 destroy_process_group(),尤其是在應用程序中有多個進程組時(例如,對於 N-D 並行),則可能會在退出時掛起。這是因為 ProcessGroupNCCL 的析構函數會調用 ncclCommAbort,而 ncclCommAbort 必須被集體調用,但是如果由 python 的 GC 調用,則調用 ProcessGroupNCCL 的析構函數的順序是不確定的。調用 destroy_process_group() 可以確保以一致的順序在所有等級中調用 ncclCommAbort,並避免在 ProcessGroupNCCL 的析構函數期間調用 ncclCommAbort。
重新初始化¶
destroy_process_group 也可用於銷毀單個進程組。一種用例可能是容錯訓練,其中一個進程組可能會被銷毀,然後在運行時初始化一個新的進程組。在這種情況下,至關重要的是在調用銷毀之後和隨後初始化之前,使用除 torch.distributed 原語之外的其他方法同步訓練器進程。由於實現這種同步的難度,目前不支持/未測試此行為,並且被認為是一個已知問題。如果這是一個阻礙您前進的用例,請在 GitHub 上提交問題或 RFC。
分佈式鍵值存儲¶
分佈式套件附帶一個分佈式鍵值存儲,可用於在組中的進程之間共享信息,以及在 torch.distributed.init_process_group() 中初始化分佈式套件(通過顯式創建存儲作為指定 init_method 的替代方法)。鍵值存儲有 3 種選擇:TCPStore、FileStore 和 HashStore。
- class torch.distributed.TCPStore¶
- 基於 TCP 的分佈式鍵值存儲實現。服務器存儲保存數據,而客戶端存儲可以通過 TCP 連接到服務器存儲,並執行諸如 - set()之類的操作來插入鍵值對,使用- get()檢索鍵值對等。應該始終初始化一個服務器存儲,因為客戶端存儲將等待服務器建立連接。- 參數
- host_name (str) – 服務器存儲應在其上運行的域名或 IP 地址。 
- port (int) – 服務器存儲應偵聽傳入請求的端口。 
- world_size (int, 選用) – 存儲用戶的總數(客戶端數量 + 1 個服務器)。默認值為 None(None 表示存儲用戶的數量不固定)。 
- is_master (bool, 選用) – 初始化服務器存儲時為 True,客戶端存儲時為 False。默認值為 False。 
- timeout (timedelta, 選用) – 存儲在初始化期間以及 - get()和- wait()等方法中使用的超時時間。默認值為 timedelta(seconds=300)。
- wait_for_workers (bool, 選用) – 是否等待所有工作線程與服務器存儲建立連接。這僅在 world_size 為固定值時適用。默認值為 True。 
- multi_tenant (bool, 選用) – 如果為 True,則當前進程中具有相同主機/端口的所有 - TCPStore實例將使用同一個底層- TCPServer。默認值為 False。
- master_listen_fd (int, 選用) – 如果指定,則底層 - TCPServer將偵聽此文件描述符,該描述符必須是已綁定到- port的套接字。在某些情況下,這對於避免端口分配競爭條件很有用。默認值為 None(表示服務器創建一個新套接字並嘗試將其綁定到- port)。
- use_libuv (bool, 選用) – 如果為 True,則將 libuv 用於 - TCPServer後端。默認值為 True。
 
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key") 
 
- class torch.distributed.HashStore¶
- 一種基於底層雜湊映射的執行緒安全儲存體。此儲存體可以在同一個流程中使用(例如,由其他執行緒使用),但不能跨流程使用。 - 範例:
- >>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value") 
 
- class torch.distributed.FileStore¶
- 一種儲存體實作,使用檔案來儲存底層的鍵值對。 - 範例:
- >>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key") 
 
- class torch.distributed.PrefixStore¶
- 3 個鍵值儲存體( - TCPStore、- FileStore和- HashStore)的包裝器,它會在插入儲存體的每個鍵前面加上一個前綴。- 參數
- prefix (str) – 在插入儲存體之前會預先附加到每個鍵的前綴字串。 
- store (torch.distributed.store) – 作為底層鍵值儲存體的儲存體物件。 
 
 
- torch.distributed.Store.set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None¶
- 根據提供的 - key和- value將鍵值對插入儲存體。如果儲存體中已存在- key,則會使用新提供的- value覆寫舊值。- 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key") 
 
- torch.distributed.Store.get(self: torch._C._distributed_c10d.Store, arg0: str) bytes¶
- 擷取儲存體中與指定 - key相關聯的值。如果儲存體中不存在- key,則函數會等待在初始化儲存體時定義的- timeout,然後再擲回例外狀況。- 參數
- key (str) – 函數將傳回與此鍵相關聯的值。 
- 回傳
- 如果儲存體中有 - key,則為與- key相關聯的值。
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key") 
 
- torch.distributed.Store.add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int¶
- 針對指定 - key第一次呼叫 add 會在儲存體中建立與- key相關聯的計數器,並初始化為- amount。後續使用相同- key呼叫 add 會將計數器增加指定的- amount。使用已透過- set()在儲存體中設定的鍵呼叫- add()將會導致例外狀況。- 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key") 
 
- torch.distributed.Store.compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes¶
- 根據提供的 - key將鍵值對插入儲存體,並在插入之前執行- expected_value和- desired_value之間的比較。只有當- key的- expected_value已存在於儲存體中,或- expected_value為空字串時,才會設定- desired_value。- 參數
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key") 
 
- torch.distributed.Store.wait(*args, **kwargs)¶
- 多載函數。 - wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None 
 - 等待 - keys中的每個鍵都被新增到儲存體中。如果在- timeout(在儲存體初始化期間設定)之前並未設定所有鍵,則- wait將會擲回例外狀況。- 參數
- keys (list) – 要等待其在儲存體中設定的鍵清單。 
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"]) 
 - wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None 
 - 等待 - keys中的每個鍵都被新增到儲存體中,如果在提供的- timeout之前並未設定這些鍵,則會擲回例外狀況。- 參數
- keys (list) – 要等待其在儲存體中設定的鍵清單。 
- timeout (timedelta) – 在擲回例外狀況之前等待鍵被新增的時間。 
 
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10)) 
 
- torch.distributed.Store.num_keys(self: torch._C._distributed_c10d.Store) int¶
- 傳回儲存體中設定的鍵數。請注意,此數字通常會比透過 - set()和- add()新增的鍵數多一個,因為有一個鍵是用來協調所有使用儲存體的工作器。- 警告 - 當與 - TCPStore一起使用時,- num_keys會傳回寫入基礎檔案的鍵值數量。如果儲存體被銷毀,並且使用相同的檔案建立了另一個儲存體,則原始鍵值將會被保留。- 回傳
- 儲存體中存在的鍵值數量。 
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys() 
 
- torch.distributed.Store.delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool¶
- 從儲存體中刪除與 - key相關聯的鍵值對。如果鍵值成功刪除,則傳回 True,否則傳回 False。- 參數
- key (str) – 要從儲存體中刪除的鍵值 
- 回傳
- 如果 - key已刪除,則為 True,否則為 False。
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key") 
 
- torch.distributed.Store.set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None¶
- 設定儲存體的預設逾時。此逾時在初始化期間以及在 - wait()和- get()中使用。- 參數
- timeout (timedelta) – 要在儲存體中設定的逾時。 
 - 範例:
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"]) 
 
群組¶
根據預設,集體運算會在預設群組(也稱為全域)上運作,並且需要所有處理程序都進入分散式函數呼叫。然而,某些工作負載可以從更細粒度的溝通中受益。這就是分散式群組發揮作用的地方。new_group() 函數可以用於建立新的群組,其中包含所有處理程序的任意子集。它會傳回一個不透明的群組控制代碼,可以作為 group 參數提供給所有集體運算(集體運算是以某些眾所周知的程式設計模式交換資訊的分散式函數)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None)[source]¶
- 建立新的分散式群組。 - 此函數要求主群組中的所有處理程序(即屬於分散式作業的所有處理程序)都進入此函數,即使它們不會成為群組的成員。此外,應在所有處理程序中以相同的順序建立群組。 - 警告 - 同時使用多個處理程序群組與 - NCCL後端並不安全,使用者應在其應用程式中執行明確的同步,以確保一次只使用一個處理程序群組。這表示來自一個處理程序群組的集體運算應該在裝置上完成執行(不僅僅是排入佇列,因為 CUDA 執行是非同步的),然後才能將來自另一個處理程序群組的集體運算排入佇列。如需詳細資訊,請參閱 同時使用多個 NCCL 通訊器。- 參數
- timeout (timedelta, 選用) – 如需詳細資訊和預設值,請參閱 init_process_group。 
- backend (str 或 Backend, 選用) – 要使用的後端。根據建置時組態,有效值為 - gloo和- nccl。根據預設,使用與全域群組相同的後端。此欄位應以小寫字串形式提供(例如,- "gloo"),也可以透過- Backend屬性(例如,- Backend.GLOO)存取。如果傳入- None,則將使用對應於預設處理程序群組的後端。預設為- None。
- pg_options (ProcessGroupOptions, 選用) – 處理程序群組選項,指定在建構特定處理程序群組期間需要傳入哪些其他選項。也就是說,對於 - nccl後端,可以指定- is_high_priority_stream,以便處理程序群組可以挑選高優先順序的 cuda 串流。
- use_local_synchronization (bool, 選用) – 在處理程序群組建立結束時執行群組本機屏障。這一點的不同之處在於,非成員等級不需要呼叫 API,也不需要加入屏障。 
- group_desc (str, 選用) – 描述處理程序群組的字串。 
 
- 回傳
- 可以提供給集體呼叫的分散式群組的控制代碼,如果等級不是 - ranks的一部分,則為 GroupMember.NON_GROUP_MEMBER。
 - 備註:use_local_synchronization 不適用於 MPI。 - 備註:雖然 use_local_synchronization=True 在較大的叢集和較小的處理程序群組中可以顯著提高速度,但必須小心使用,因為它會改變叢集行為,因為非成員等級不會加入群組 barrier()。 - 備註:當每個等級建立多個重疊的處理程序群組時,use_local_synchronization=True 可能會導致死結。為避免這種情況,請確保所有等級都遵循相同的全域建立順序。 
- torch.distributed.get_group_rank(group, global_rank)[source]¶
- 將全域等級轉換為群組等級。 - global_rank必須是- group的一部分,否則會引發 RuntimeError。- 參數
- group (ProcessGroup) – 要查詢相對等級的 ProcessGroup。 
- global_rank (int) – 要查詢的全域等級。 
 
- 回傳
- global_rank相對於- group的群組等級
- 返回類型
 - 備註:在預設處理程序群組上呼叫此函數會傳回身分 
DeviceMesh¶
DeviceMesh 是一種較高層級的抽象概念,用於管理處理程序群組(或 NCCL 通訊器)。它允許使用者輕鬆建立節點間和節點內的處理程序群組,而不必擔心如何為不同的子處理程序群組正確設定等級,並且它有助於輕鬆管理這些分散式處理程序群組。 init_device_mesh() 函數可用於建立新的 DeviceMesh,其中網格形狀描述了設備拓撲。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[原始碼]¶
- DeviceMesh 表示一個設備網格,其中設備的佈局可以表示為一個 n 維陣列,而 n 維陣列的每個值都是預設處理程序群組等級的全域 ID。 - DeviceMesh 可用於描述叢集中設備的佈局,並作為叢集中設備清單之間通訊的代理。 - DeviceMesh 可以用作上下文管理器。 - 注意事項 - DeviceMesh 遵循 SPMD 編程模型,這表示相同的 PyTorch Python 程式會在叢集中的所有處理程序/等級上執行。因此,使用者需要確保 mesh 陣列(描述設備佈局)在所有等級上都相同。不一致的 mesh 將導致無訊息的掛起。 - 參數
- device_type (str) – 網格的設備類型。目前支援:「cpu」、「cuda/類 cuda」。 
- mesh (ndarray) – 一個多維陣列或整數張量,描述設備的佈局,其中 ID 是預設處理程序群組的全域 ID。 
 
- 回傳
- 代表裝置配置的 - DeviceMesh物件。
- 返回類型
 - 以下程式以 SPMD 方式在每個處理程序/等級上執行。在此範例中,我們有 2 個主機,每個主機有 4 個 GPU。在網格的第一個維度上進行縮減將跨列(0、4)... 和(3、7)進行縮減,在網格的第二個維度上進行縮減將跨行(0、1、2、3)和(4、5、6、7)進行縮減。 - 範例:
- >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) 
 
點對點通訊¶
isend() 和 irecv() 在使用時會傳回分散式請求物件。通常,此物件的類型未指定,因為它們不應手動建立,但保證它們支援兩種方法
- is_completed()- 如果作業已完成,則傳回 True
- wait()- 將封鎖處理程序,直到作業完成。保證在傳回後- is_completed()會傳回 True。
- torch.distributed.isend(tensor, dst, group=None, tag=0)[原始碼]¶
- 非同步發送張量。 - 警告 - 在請求完成之前修改 - tensor會導致未定義的行為。- 警告 - NCCL 後端不支援 - tag。
- torch.distributed.send_object_list(object_list, dst, group=None, device=None)[原始碼]¶
- 同步發送 - object_list中可醃漬的物件。- 與 - send()類似,但可以傳入 Python 物件。請注意,- object_list中的所有物件都必須是可醃漬的才能發送。- 參數
- object_list (List[Any]) – 要發送的輸入物件清單。每個物件都必須是可醃漬的。接收者必須提供大小相等的清單。 
- dst (int) – 要將 - object_list發送到的目的地等級。目的地等級是根據全域處理程序群組(與- group參數無關)
- group – (ProcessGroup, 選用): 要處理的處理程序群組。如果為 None,則將使用預設的處理程序群組。預設值為 - None。
- device ( - torch.device, 選用) – 如果不是 None,則在發送之前,物件會被序列化並轉換為張量,然後移至- device。預設值為- None。
 
- 回傳
- None.
 - 注意事項 - 對於基於 NCCL 的處理程序群組,在進行通訊之前,必須將物件的內部張量表示形式移至 GPU 設備。在這種情況下,使用的設備由 - torch.cuda.current_device()給出,並且使用者有責任確保透過- torch.cuda.set_device()設定此項,以便每個等級都有一個單獨的 GPU。- 警告 - send_object_list()會隱式地使用- pickle模組,而該模組已知不安全。可以構造惡意 pickle 數據,在反序列化過程中執行任意代碼。請僅使用您信任的數據調用此函數。- 警告 - 使用 GPU 張量調用 - send_object_list()的支援度不高,而且效率低下,因為張量會被序列化,導致 GPU -> CPU 傳輸。請考慮改用- send()。- 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] 
 
- torch.distributed.recv_object_list(object_list, src=None, group=None, device=None)[source]¶
- 同步接收 - object_list中可序列化物件。- 與 - recv()類似,但可以接收 Python 物件。- 參數
- object_list (List[Any]) – 要接收物件的清單。必須提供大小與發送清單大小相等的清單。 
- src (int, 可選) – 要從中接收 - object_list的來源排名。來源排名基於全局進程組(與- group參數無關)。如果設置為 None,則將從任何排名接收。默認為- None。
- group – (ProcessGroup, 選用): 要處理的處理程序群組。如果為 None,則將使用預設的處理程序群組。預設值為 - None。
- device ( - torch.device,可選) – 如果不是 None,則在此設備上接收。默認為- None。
 
- 回傳
- 發送者排名。如果排名不是群組的一部分,則為 -1。如果排名是群組的一部分,則 - object_list將包含從- src排名發送的物件。
 - 注意事項 - 對於基於 NCCL 的處理程序群組,在進行通訊之前,必須將物件的內部張量表示形式移至 GPU 設備。在這種情況下,使用的設備由 - torch.cuda.current_device()給出,並且使用者有責任確保透過- torch.cuda.set_device()設定此項,以便每個等級都有一個單獨的 GPU。- 警告 - recv_object_list()會隱式地使用- pickle模組,而該模組已知不安全。可以構造惡意 pickle 數據,在反序列化過程中執行任意代碼。請僅使用您信任的數據調用此函數。- 警告 - 使用 GPU 張量調用 - recv_object_list()的支援度不高,而且效率低下,因為張量會被序列化,導致 GPU -> CPU 傳輸。請考慮改用- recv()。- 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] 
 
- torch.distributed.batch_isend_irecv(p2p_op_list)[source]¶
- 異步發送或接收一批張量,並返回請求清單。 - 處理 - p2p_op_list中的每個操作,並返回相應的請求。目前支援 NCCL、Gloo 和 UCC 後端。- 參數
- p2p_op_list – 點對點操作清單(每個操作元的類型為 - torch.distributed.P2POp)。清單中 isend/irecv 的順序很重要,它需要與遠端相應的 isend/irecv 匹配。
- 回傳
- 通過調用 op_list 中相應的操作返回的分佈式請求物件清單。 
 - 範例 - >>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size) >>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1 - 注意事項 - 請注意,當此 API 與 NCCL PG 後端一起使用時,用戶必須使用 torch.cuda.set_device 設置當前的 GPU 設備,否則將導致意外的掛起問題。 - 此外,如果此 API 是傳遞給 - dist.P2POp的- group中的第一個集體調用,則該- group的所有排名都必須參與此 API 調用;否則,行為未定義。如果此 API 調用不是該- group中的第一個集體調用,則允許僅涉及該- group的排名子集的批次 P2P 操作。
同步和異步集體操作¶
根據傳遞給集體的 async_op 標誌的設置,每個集體操作函數都支援以下兩種操作:
**同步操作** - 默認模式,當 async_op 設置為 False 時。當函數返回時,保證集體操作已執行。在 CUDA 操作的情況下,不能保證 CUDA 操作已完成,因為 CUDA 操作是異步的。對於 CPU 集體操作,任何利用集體調用輸出的進一步函數調用都將按預期行為。對於 CUDA 集體操作,在同一 CUDA 流上利用輸出的函數調用將按預期行為。用戶必須注意在不同流下運行的情況下的同步。有關 CUDA 語義(如流同步)的詳細信息,請參閱 CUDA 語義。請參閱以下腳本,以查看 CPU 和 CUDA 操作在這些語義上的差異示例。
**異步操作** - 當 async_op 設置為 True 時。集體操作函數返回一個分佈式請求物件。一般來說,您不需要手動創建它,並且保證它支援兩種方法:
- is_completed()- 在 CPU 集體操作的情況下,如果已完成,則返回- True。在 CUDA 操作的情況下,如果操作已成功排入 CUDA 流並且可以在默認流上使用輸出而無需進一步同步,則返回- True。
- wait()- 在 CPU 集體操作的情況下,將阻塞進程,直到操作完成。在 CUDA 集體操作的情況下,將阻塞,直到操作已成功排入 CUDA 流並且可以在默認流上使用輸出而無需進一步同步。
- get_future()- 返回- torch._C.Future物件。支援 NCCL,也支援 GLOO 和 MPI 上的大多數操作,但點對點操作除外。注意:隨著我們繼續採用 Futures 並合併 API,- get_future()調用可能會變得冗餘。
範例
以下代碼可用於參考在使用分佈式集體操作時 CUDA 操作的語義。它顯示了在不同 CUDA 流上使用集體輸出時需要顯式同步。
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
    s.wait_stream(torch.cuda.default_stream())
    output.add_(100)
if rank == 0:
    # if the explicit call to wait_stream was omitted, the output below will be
    # non-deterministically 1 or 101, depending on whether the allreduce overwrote
    # the value after the add completed.
    print(output)
集體函數¶
- torch.distributed.broadcast(tensor, src, group=None, async_op=False)[source]¶
- 將張量廣播到整個群組。 - 參與集體操作的所有進程中的 - tensor必須具有相同數量的元素。
- torch.distributed.broadcast_object_list(object_list, src=0, group=None, device=None)[source]¶
- 將 - object_list中可醃製的物件廣播到整個群組。- 類似於 - broadcast(),但可以傳入 Python 物件。請注意,- object_list中的所有物件都必須是可醃製的,才能進行廣播。- 參數
- object_list (List[Any]) – 要廣播的輸入物件清單。每個物件都必須是可醃製的。只有 - src等級上的物件會被廣播,但每個等級都必須提供大小相等的清單。
- src (int) – 要廣播 - object_list的來源等級。來源等級是根據全局進程組(不論- group參數為何)。
- group – (ProcessGroup, 選用): 要處理的處理程序群組。如果為 None,則將使用預設的處理程序群組。預設值為 - None。
- device ( - torch.device, optional) – 如果不是 None,則會將物件序列化並轉換為張量,並在廣播之前移至- device。預設值為- None。
 
- 回傳
- None。如果等級是群組的一部分,則- object_list將包含從- src等級廣播的物件。
 - 注意事項 - 對於基於 NCCL 的處理程序群組,在進行通訊之前,必須將物件的內部張量表示形式移至 GPU 設備。在這種情況下,使用的設備由 - torch.cuda.current_device()給出,並且使用者有責任確保透過- torch.cuda.set_device()設定此項,以便每個等級都有一個單獨的 GPU。- 注意事項 - 請注意,此 API 與 - broadcast()集體通訊略有不同,因為它不提供- async_op控制代碼,因此將會是阻塞呼叫。- 警告 - broadcast_object_list()隱式使用- pickle模組,而該模組已知不安全。可以建構惡意的 pickle 數據,這些數據將在取消醃製過程中執行任意代碼。僅使用您信任的數據呼叫此函數。- 警告 - 使用 GPU 張量呼叫 - broadcast_object_list()的支援度不高且效率低下,因為它會產生 GPU -> CPU 傳輸,因為張量會被醃製。請考慮改用- broadcast()。- 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] 
 
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
- 以所有機器都能獲得最終結果的方式,跨所有機器減少張量數據。 - 呼叫後, - tensor在所有進程中都將按位元相同。- 支援複數張量。 - 參數
- 回傳
- 如果 async_op 設定為 True,則為異步工作控制代碼。如果不是 async_op 或不是群組的一部分,則為 None 
 - 範例 - >>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 - >>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1 
- torch.distributed.reduce(tensor, dst, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
- 跨所有機器減少張量數據。 - 只有等級為 - dst的進程才會收到最終結果。- 參數
- 回傳
- 如果 async_op 設定為 True,則為異步工作控制代碼。如果不是 async_op 或不是群組的一部分,則為 None 
 
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]¶
- 將整個群組的張量收集到一個清單中。 - 支援複數張量。 - 參數
- 回傳
- 如果 async_op 設定為 True,則為異步工作控制代碼。如果不是 async_op 或不是群組的一部分,則為 None 
 - 範例 - >>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1 - >>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1 
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]¶
- 從所有等級收集張量,並將它們放入單個輸出張量中。 - 參數
- output_tensor (Tensor) – 用於容納來自所有等級的張量元素的輸出張量。它的大小必須正確,才能具有以下形式之一:(i) 沿主維度串聯所有輸入張量;有關“串聯”的定義,請參閱 - torch.cat();(ii) 沿主維度堆疊所有輸入張量;有關“堆疊”的定義,請參閱- torch.stack()。以下範例可以更好地說明支援的輸出形式。
- input_tensor (Tensor) – 要從當前等級收集的張量。與 - all_gatherAPI 不同,此 API 中的輸入張量在所有等級上的大小必須相同。
- group (ProcessGroup, 選用) – 要處理的進程組。如果為 None,則使用默認進程組。 
- async_op (bool, optional) – 此操作是否應為異步操作 
 
- 回傳
- 如果 async_op 設定為 True,則為異步工作控制代碼。如果不是 async_op 或不是群組的一部分,則為 None 
 - 範例 - >>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # Output in concatenation form >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # Output in stack form >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1 - 警告 - Gloo 後端不支援此 API。 
- torch.distributed.all_gather_object(object_list, obj, group=None)[source]¶
- 將整個群組的可醃製物件收集到一個清單中。 - 類似於 - all_gather(),但可以傳入 Python 物件。請注意,物件必須是可醃製的,才能進行收集。- 參數
- object_list (list[Any]) – 輸出清單。它的大小應正確,作為此集體通訊的群組大小,並且將包含輸出。 
- obj (Any) – 要從當前進程廣播的可醃製 Python 物件。 
- group (ProcessGroup, optional) – 要處理的進程組。如果為 None,則將使用預設進程組。預設值為 - None。
 
- 回傳
- 無。如果呼叫等級是此群組的一部分,則集體通訊的輸出將填入輸入 - object_list中。如果呼叫等級不是群組的一部分,則傳入的- object_list將不會被修改。
 - 注意事項 - 請注意,此 API 與 - all_gather()集體通訊略有不同,因為它不提供- async_op控制代碼,因此將會是阻塞呼叫。- 注意事項 - 對於基於 NCCL 的處理群組,在進行通訊之前,物件的內部張量表示必須移至 GPU 裝置。在這種情況下,使用的裝置由 - torch.cuda.current_device()給出,並且使用者有責任透過- torch.cuda.set_device()確保設定此裝置,以便每個秩都有一個獨立的 GPU。- 警告 - all_gather_object()隱含使用- pickle模組,而眾所周知該模組並不安全。構造惡意 pickle 數據是可能的,這些數據會在 unpickle 過程中執行任意程式碼。僅使用您信任的數據呼叫此函數。- 警告 - 使用 GPU 張量呼叫 - all_gather_object()並未得到很好的支援,並且效率低下,因為它會導致 GPU -> CPU 傳輸,因為張量會被序列化。請考慮改用- all_gather()。- 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}] 
 
- torch.distributed.gather(tensor, gather_list=None, dst=0, group=None, async_op=False)[原始碼]¶
- 在單一進程中收集張量列表。 
- torch.distributed.gather_object(obj, object_gather_list=None, dst=0, group=None)[原始碼]¶
- 在單一進程中從整個群組收集可序列化物件。 - 與 - gather()類似,但可以傳入 Python 物件。請注意,物件必須是可序列化的才能被收集。- 參數
- 回傳
- 無。在 - dst秩上,- object_gather_list將包含集體的輸出。
 - 注意事項 - 請注意,此 API 與 gather 集體略有不同,因為它不提供 async_op 句柄,因此將是一個阻塞呼叫。 - 注意事項 - 對於基於 NCCL 的處理群組,在進行通訊之前,物件的內部張量表示必須移至 GPU 裝置。在這種情況下,使用的裝置由 - torch.cuda.current_device()給出,並且使用者有責任透過- torch.cuda.set_device()確保設定此裝置,以便每個秩都有一個獨立的 GPU。- 警告 - gather_object()隱含使用- pickle模組,而眾所周知該模組並不安全。構造惡意 pickle 數據是可能的,這些數據會在 unpickle 過程中執行任意程式碼。僅使用您信任的數據呼叫此函數。- 警告 - 使用 GPU 張量呼叫 - gather_object()並未得到很好的支援,並且效率低下,因為它會導致 GPU -> CPU 傳輸,因為張量會被序列化。請考慮改用- gather()。- 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}] 
 
- torch.distributed.scatter(tensor, scatter_list=None, src=0, group=None, async_op=False)[原始碼]¶
- 將張量列表分散到群組中的所有進程。 - 每個進程將只接收一個張量,並將其數據存儲在 - tensor參數中。- 支援複數張量。 - 參數
- 回傳
- 如果 async_op 設定為 True,則為異步工作控制代碼。如果不是 async_op 或不是群組的一部分,則為 None 
 - 注意事項 - 請注意,scatter_list 中的所有張量必須具有相同的大小。 - 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> tensor_size = 2 >>> t_ones = torch.ones(tensor_size) >>> t_fives = torch.ones(tensor_size) * 5 >>> output_tensor = torch.zeros(tensor_size) >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> # Only tensors, all of which must be the same size. >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i gets scatter_list[i]. For example, on rank 1: >>> output_tensor tensor([5., 5.]) 
 
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list, src=0, group=None)[原始碼]¶
- 將 - scatter_object_input_list中的可序列化物件分散到整個群組。- 與 - scatter()類似,但可以傳入 Python 物件。在每個秩上,分散的物件將作為- scatter_object_output_list的第一個元素存儲。請注意,- scatter_object_input_list中的所有物件必須是可序列化的才能被分散。- 參數
- scatter_object_output_list (List[任何]) – 非空列表,其第一個元素將存儲分散到此秩的物件。 
- scatter_object_input_list (List[任何]) – 要分散的輸入物件列表。每個物件都必須是可序列化的。只有 - src秩上的物件才會被分散,並且對於非 src 秩,該參數可以是- None。
- src (整數) – 要從中分散 - scatter_object_input_list的來源秩。來源秩基於全局進程群組(與- group參數無關)。
- group – (ProcessGroup, 選用): 要處理的處理程序群組。如果為 None,則將使用預設的處理程序群組。預設值為 - None。
 
- 回傳
- 無。如果秩是群組的一部分,則- scatter_object_output_list的第一個元素將設置為此秩的分散物件。
 - 注意事項 - 請注意,此 API 與 scatter 集體略有不同,因為它不提供 - async_op句柄,因此將是一個阻塞呼叫。- 警告 - scatter_object_list()隱含使用- pickle模組,而眾所周知該模組並不安全。構造惡意 pickle 數據是可能的,這些數據會在 unpickle 過程中執行任意程式碼。僅使用您信任的數據呼叫此函數。- 警告 - 使用 GPU 張量呼叫 - scatter_object_list()並未得到很好的支援,並且效率低下,因為它會導致 GPU -> CPU 傳輸,因為張量會被序列化。請考慮改用- scatter()。- 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}] 
 
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[原始碼]¶
- 減少,然後將張量列表分散到群組中的所有進程。 
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
- 對張量進行縮減,然後將其分散到群組中的所有秩。 - 參數
- 回傳
- 異步工作處理常式,如果 async_op 設定為 True。如果非異步操作或不屬於群組的一部分,則為 None。 
 - 範例 - >>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # Input in concatenation form >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # Input in stack form >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 - 警告 - Gloo 後端不支援此 API。 
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source]¶
- 分割輸入張量,然後將分割列表分散到群組中的所有處理程序。 - 之後,從群組中的所有處理程序串聯接收到的張量,並作為單個輸出張量返回。 - 支援複數張量。 - 參數
- output (張量) – 收集的串聯輸出張量。 
- input (張量) – 要分散的輸入張量。 
- output_split_sizes – (list[Int], 選擇性): 如果指定為 None 或為空,則為 dim 0 的輸出分割大小, - output張量的 dim 0 必須可以被- world_size整除。
- input_split_sizes – (list[Int], 選擇性): 如果指定為 None 或為空,則為 dim 0 的輸入分割大小, - input張量的 dim 0 必須可以被- world_size整除。
- group (ProcessGroup, 選用) – 要處理的進程組。如果為 None,則使用默認進程組。 
- async_op (布林值, 可選) – 此操作是否應該是異步操作。 
 
- 回傳
- 異步工作處理常式,如果 async_op 設定為 True。如果非異步操作或不屬於群組的一部分,則為 None。 
 - 警告 - all_to_all_single 仍處於實驗階段,可能會有所變更。 - 範例 - >>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3 - >>> # Essentially, it is similar to following operation: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i) - >>> # Another example with uneven split >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3 - >>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3 
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source]¶
- 將輸入張量列表分散到群組中的所有處理程序,並在輸出列表中返回收集的張量列表。 - 支援複數張量。 - 參數
- 回傳
- 異步工作處理常式,如果 async_op 設定為 True。如果非異步操作或不屬於群組的一部分,則為 None。 
 - 警告 - all_to_all 仍處於實驗階段,可能會有所變更。 - 範例 - >>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3 - >>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i) - >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3 - >>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3 
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source]¶
- 同步所有處理程序。 - 如果 async_op 為 False,或者如果在 wait() 上呼叫了異步工作處理常式,則此全域操作會阻塞處理程序,直到整個群組都進入此函數為止。 - 參數
- 回傳
- 如果 async_op 設定為 True,則為異步工作控制代碼。如果不是 async_op 或不是群組的一部分,則為 None 
 - 注意事項 - ProcessGroupNCCL 現在依賴流同步而不是裝置同步來阻塞 CPU。因此,請勿假設 barrier() 會執行裝置同步。 
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source]¶
- 同步處理程序,類似於 - torch.distributed.barrier,但會考慮可設定的逾時。- 它能夠報告在提供的逾時內未通過此屏障的秩。具體來說,對於非零秩,將會阻塞,直到從秩 0 處理發送/接收為止。秩 0 將會阻塞,直到來自其他秩的所有發送/接收都已處理完畢,並將報告在逾時內未能回應的秩的失敗。請注意,如果有一個秩未到達 monitored_barrier(例如,由於掛起),則所有其他秩在 monitored_barrier 中都會失敗。 - 此全域操作會阻塞群組中的所有處理程序/秩,直到整個群組都成功退出該函數為止,這使其對於偵錯和同步非常有用。但是,它可能會影響效能,因此應僅用於偵錯或需要在主機端進行完整同步點的情況。出於偵錯目的,可以在應用程序的全域呼叫之前插入此屏障,以檢查是否有任何秩不同步。 - 注意事項 - 請注意,此全域操作僅適用於 GLOO 後端。 - 參數
- group (ProcessGroup, 選擇性) – 要處理的處理程序群組。如果為 - None,則將使用預設處理程序群組。
- timeout (datetime.timedelta, 選擇性) – monitored_barrier 的逾時。如果為 - None,則將使用預設處理程序群組逾時。
- wait_all_ranks (bool, 選擇性) – 是否收集所有失敗的秩。預設情況下,此值為 - False,並且秩 0 上的- monitored_barrier將會在其遇到的第一個失敗秩上引發錯誤,以便快速失敗。通過設定- wait_all_ranks=True,- monitored_barrier將會收集所有失敗的秩,並引發一個包含所有失敗秩信息的錯誤。
 
- 回傳
- None.
 - 範例:
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier. 
 
- class torch.distributed.Work¶
- Work 物件表示 PyTorch 分佈式套件中待處理的異步操作的控制代碼。它由非阻塞的全域操作返回,例如 dist.all_reduce(tensor, async_op=True)。 
- class torch.distributed.ReduceOp¶
- 一個類似於列舉的類別,用於可用的縮減操作: - SUM、- PRODUCT、- MIN、- MAX、- BAND、- BOR、- BXOR和- PREMUL_SUM。- 使用 - NCCL後端時,- BAND、- BOR和- BXOR縮減操作不可用。- AVG會在跨秩求和之前將值除以世界大小。- AVG僅適用於- NCCL後端,並且僅適用於 2.10 或更高版本的 NCCL。- PREMUL_SUM會在縮減之前在本機將輸入乘以給定的純量。- PREMUL_SUM僅適用於- NCCL後端,並且僅適用於 2.11 或更高版本的 NCCL。用戶應該使用- torch.distributed._make_nccl_premul_sum。- 此外,複數張量不支持 - MAX、- MIN和- PRODUCT。- 可以將此類別的值作為屬性訪問,例如 - ReduceOp.SUM。它們用於指定縮減全域操作的策略,例如- reduce()。- 這個類別不支援 - __members__屬性。
效能分析集體通訊¶
請注意,您可以使用 torch.profiler(建議使用,僅在 1.8.1 版後可用)或 torch.autograd.profiler 來分析集體通訊和此處提到的點對點通訊 API 的效能。所有開箱即用的後端(gloo、nccl、mpi)皆受支援,且集體通訊的使用將會如預期般呈現在效能分析輸出/追蹤中。分析程式碼的方式與任何一般的 Torch 運算子相同
import torch
import torch.distributed as dist
with torch.profiler():
    tensor = torch.randn(20, 10)
    dist.all_reduce(tensor)
如需效能分析器功能的完整概觀,請參閱 效能分析器文件。
多 GPU 集體函數¶
警告
多 GPU 函數(代表每個 CPU 執行緒有多個 GPU)已遭棄用。截至目前為止,PyTorch Distributed 的首選程式設計模型是每個執行緒一個裝置,如本文檔中的 API 所示範。如果您是後端開發人員,並且想要支援每個執行緒多個裝置,請聯繫 PyTorch Distributed 的維護人員。
第三方後端¶
除了內建的 GLOO/MPI/NCCL 後端之外,PyTorch Distributed 還透過執行階段註冊機制支援第三方後端。如需透過 C++ 擴充功能開發第三方後端的參考,請參閱 教學課程 - 自訂 C++ 和 CUDA 擴充功能 和 test/cpp_extensions/cpp_c10d_extension.cpp。第三方後端的功能由其實作決定。
新的後端衍生自 c10d::ProcessGroup,並在匯入時透過 torch.distributed.Backend.register_backend() 註冊後端名稱和實例化介面。
當手動匯入此後端並使用對應的後端名稱叫用 torch.distributed.init_process_group() 時,torch.distributed 套件會在新後端上執行。
警告
第三方後端的支援為實驗性功能,可能會有所變動。
啟動公用程式¶
torch.distributed 套件還在 torch.distributed.launch 中提供啟動公用程式。此輔助公用程式可用於為分散式訓練啟動每個節點的多個行程。
模組 torch.distributed.launch。
torch.distributed.launch 是一個模組,可在每個訓練節點上產生多個分散式訓練行程。
警告
此模組將被棄用,改用 torchrun。
該公用程式可用於單節點分散式訓練,其中每個節點將產生一個或多個行程。該公用程式可用於 CPU 訓練或 GPU 訓練。如果該公用程式用於 GPU 訓練,則每個分散式行程將在單個 GPU 上運行。這可以顯著提高單節點訓練效能。它也可以用於多節點分散式訓練,方法是在每個節點上產生多個行程,以顯著提高多節點分散式訓練效能。這對於具有多個 Infiniband 介面的系統(這些介面具有直接 GPU 支援)尤其有利,因為它們都可以用於聚合通訊頻寬。
在單節點分散式訓練或多節點分散式訓練的情況下,此公用程式都將啟動每個節點上給定數量的行程(--nproc-per-node)。如果用於 GPU 訓練,則此數字必須小於或等於當前系統上的 GPU 數量(nproc_per_node),並且每個行程將在 *從 GPU 0 到 GPU(nproc_per_node - 1)* 的單個 GPU 上運行。
如何使用此模組
- 單節點多行程分散式訓練 
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
           arguments of your training script)
- 多節點多行程分散式訓練:(例如兩個節點) 
節點 1:*(IP:192.168.1.1,並且有一個空閒埠:1234)*
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
節點 2
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
- 若要查詢此模組提供的選用參數 
python -m torch.distributed.launch --help
重要注意事項
1. 此公用程式和多行程分散式(單節點或多節點)GPU 訓練目前僅使用 NCCL 分散式後端才能獲得最佳效能。因此,NCCL 後端是 GPU 訓練的推薦後端。
2. 在您的訓練程式中,您必須解析命令列參數:--local-rank=LOCAL_PROCESS_RANK,該參數將由本模組提供。如果您的訓練程式使用 GPU,則應確保您的程式碼僅在 LOCAL_PROCESS_RANK 的 GPU 裝置上運行。這可以透過以下方式完成
解析 local_rank 參數
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
使用以下任一方法將您的裝置設定為本地排名
>>> torch.cuda.set_device(args.local_rank)  # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>>    # your code to run
>>>    ...
在 2.0.0 版中已變更: 啟動器會將 --local-rank=<rank> 參數傳遞給您的腳本。從 PyTorch 2.0.0 開始,建議使用帶破折號的 --local-rank,而不是以前使用的帶底線的 --local_rank。
為了向後相容,使用者可能需要在其參數解析程式碼中處理這兩種情況。這表示在參數解析器中同時包含 "--local-rank" 和 "--local_rank"。如果僅提供 "--local_rank",啟動器將會觸發錯誤:「錯誤:無法辨識的參數:–local-rank=<rank>」。對於僅支援 PyTorch 2.0.0+ 的訓練程式碼,包含 "--local-rank" 應該就足夠了。
3. 在您的訓練程式中,您應該在開始時呼叫以下函數來啟動分散式後端。強烈建議使用 init_method=env://。其他 init 方法(例如 tcp://)可能可以使用,但 env:// 是此模組官方支援的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>>                                      init_method='env://')
4. 在您的訓練程式中,您可以使用常規的分散式函數,也可以使用 torch.nn.parallel.DistributedDataParallel() 模組。如果您的訓練程式使用 GPU 進行訓練,並且您想使用 torch.nn.parallel.DistributedDataParallel() 模組,以下是配置方法。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>>                                                   device_ids=[args.local_rank],
>>>                                                   output_device=args.local_rank)
請確保將 device_ids 參數設定為您的程式碼將在其上運行的唯一 GPU 裝置 ID。這通常是行程的本地排名。換句話說,device_ids 必須是 [args.local_rank],並且 output_device 必須是 args.local_rank 才能使用此公用程式
5. 另一種將 local_rank 傳遞給子行程的方法是透過環境變數 LOCAL_RANK。當您使用 --use-env=True 啟動腳本時,就會啟用此行為。您必須調整上面的子行程範例,將 args.local_rank 替換為 os.environ['LOCAL_RANK'];當您指定此標記時,啟動器將不會傳遞 --local-rank。
警告
local_rank 不是全域唯一的:它僅在機器上的每個行程中是唯一的。因此,不要使用它來決定是否應該(例如)寫入網路檔案系統。如需錯誤執行此操作可能導致問題的範例,請參閱 https://github.com/pytorch/pytorch/issues/12042。
產生公用程式¶
多行程套件 - torch.multiprocessing 套件還在 torch.multiprocessing.spawn() 中提供 spawn 函數。此輔助函數可用於產生多個行程。它的工作原理是傳入您要執行的函數,並產生 N 個行程來執行它。這也可以用於多行程分散式訓練。
如需如何使用它的參考,請參閱 PyTorch 範例 - ImageNet 實作
請注意,此函數需要 Python 3.4 或更高版本。
偵錯 torch.distributed 應用程式¶
偵錯分散式應用程式可能具有挑戰性,因為難以理解的當機、崩潰或各排名之間的行為不一致。torch.distributed 提供了一套工具,可幫助以自助服務方式偵錯訓練應用程式
Python 中斷點¶
在分散式環境中使用 Python 的偵錯器非常方便,但由於它無法立即使用,因此許多人根本不使用它。PyTorch 提供了一個圍繞 pdb 的自訂包裝器,可簡化流程。
torch.distributed.breakpoint 使此過程變得容易。在內部,它以兩種方式自訂了 pdb 的中斷點行為,但在其他方面則與普通的 pdb 相同。1. 僅在一個排名(由使用者指定)上附加偵錯器。2. 透過使用 torch.distributed.barrier() 來確保所有其他排名都停止,該屏障將在偵錯的排名發出 continue 後釋放 3. 從子行程重新路由 stdin,使其連接到您的終端機。
若要使用它,只需在所有排名上發出 torch.distributed.breakpoint(rank),在每種情況下對 rank 使用相同的值。
監控屏障¶
從 v1.10 版開始,torch.distributed.monitored_barrier() 作為 torch.distributed.barrier() 的替代方案,可在崩潰時提供有關哪個進程可能發生錯誤的訊息,例如,並非所有進程都在提供的逾時時間內呼叫 torch.distributed.monitored_barrier()。 torch.distributed.monitored_barrier() 使用 send/recv 通訊原語,透過類似於確認的過程實作主機端屏障,允許進程 0 回報哪些進程未能在逾時時間內確認屏障。例如,請參考以下函數,其中進程 1 未能呼叫 torch.distributed.monitored_barrier()(在實務上,這可能是由於應用程式錯誤或先前集體作業中的掛起)
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    # monitored barrier requires gloo process group to perform host-side sync.
    group_gloo = dist.new_group(backend="gloo")
    if rank not in [1]:
        dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    mp.spawn(worker, nprocs=2, args=())
進程 0 上會產生以下錯誤訊息,允許使用者判斷哪些進程可能發生錯誤並進一步調查
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
 Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG¶
使用 TORCH_CPP_LOG_LEVEL=INFO 時,環境變數 TORCH_DISTRIBUTED_DEBUG 可用於觸發其他有用的記錄和集體同步檢查,以確保所有進程都能適當同步。 TORCH_DISTRIBUTED_DEBUG 可以設定為 OFF(預設)、INFO 或 DETAIL,具體取決於所需的除錯級別。請注意,最詳細的選項 DETAIL可能會影響應用程式的效能,因此僅應在除錯問題時使用。
設定 TORCH_DISTRIBUTED_DEBUG=INFO 會在使用 torch.nn.parallel.DistributedDataParallel() 訓練的模型初始化時產生額外的除錯記錄,而 TORCH_DISTRIBUTED_DEBUG=DETAIL 還會記錄選定迭代次數的執行階段效能統計數據。這些執行階段統計數據包括正向時間、反向時間、梯度通訊時間等數據。例如,假設以下應用程式
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.a = torch.nn.Linear(10, 10, bias=False)
        self.b = torch.nn.Linear(10, 1, bias=False)
    def forward(self, x):
        a = self.a(x)
        b = self.b(x)
        return (a, b)
def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    print("init model")
    model = TwoLinLayerNet().cuda()
    print("init ddp")
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
    inp = torch.randn(10, 10).cuda()
    print("train")
    for _ in range(20):
        output = ddp_model(inp)
        loss = output[0] + output[1]
        loss.sum().backward()
if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ[
        "TORCH_DISTRIBUTED_DEBUG"
    ] = "DETAIL"  # set to DETAIL for runtime logging.
    mp.spawn(worker, nprocs=2, args=())
初始化時會產生以下記錄
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
執行階段期間會產生以下記錄(設定 TORCH_DISTRIBUTED_DEBUG=DETAIL 時)
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 40838608
 Avg backward compute time: 5983335
Avg backward comm. time: 4326421
 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 42850427
 Avg backward compute time: 3885553
Avg backward comm. time: 2357981
 Avg backward comm/comp overlap time: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO 可增強 torch.nn.parallel.DistributedDataParallel() 中因模型中未使用參數而導致的崩潰記錄。目前,如果在正向傳遞中可能未使用某些參數,則必須將 find_unused_parameters=True 傳遞至 torch.nn.parallel.DistributedDataParallel() 初始化中,並且從 v1.10 版開始,所有模型輸出都必須用於損失計算中,因為 torch.nn.parallel.DistributedDataParallel() 不支援在反向傳遞中使用未使用的參數。這些限制對於較大的模型來說尤其具有挑戰性,因此在因錯誤而崩潰時,torch.nn.parallel.DistributedDataParallel() 會記錄所有未使用參數的完整名稱。例如,在上述應用程式中,如果我們將 loss 修改為改為計算為 loss = output[1],則 TwoLinLayerNet.a 在反向傳遞中不會收到梯度,因此導致 DDP 失敗。崩潰時,系統會將有關未使用參數的資訊傳遞給使用者,而對於大型模型來說,這些參數可能難以手動找到
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
 the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
設定 TORCH_DISTRIBUTED_DEBUG=DETAIL 會在使用者直接或間接發出的每個集體呼叫上觸發額外的連貫性和同步檢查(例如 DDP allreduce)。這是透過建立一個包裝器進程群組來完成的,該群組包裝由 torch.distributed.init_process_group() 和 torch.distributed.new_group() API 傳回的所有進程群組。因此,這些 API 將傳回一個包裝器進程群組,該群組的使用方式與一般進程群組完全相同,但在將集體呼叫分派到基礎進程群組之前會執行連貫性檢查。目前,這些檢查包括 torch.distributed.monitored_barrier(),它確保所有進程都完成其未完成的集體呼叫,並報告卡住的進程。接下來,透過確保所有集體函數都匹配並使用一致的張量形狀來檢查集體呼叫本身的連貫性。如果不是這樣,則在應用程式崩潰時會包含詳細的錯誤報告,而不是掛起或提供沒有資訊的錯誤訊息。例如,請參考以下函數,該函數在 torch.distributed.all_reduce() 中具有不匹配的輸入形狀
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    tensor = torch.randn(10 if rank == 0 else 20).cuda()
    dist.all_reduce(tensor)
    torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
    mp.spawn(worker, nprocs=2, args=())
使用 NCCL 後端時,這樣的應用程式可能會導致掛起,而在非平凡的情況下,可能難以找出根本原因。如果使用者啟用 TORCH_DISTRIBUTED_DEBUG=DETAIL 並重新執行應用程式,則以下錯誤訊息會顯示根本原因
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes:  10
[ torch.LongTensor{1} ]
注意事項
如需在執行階段期間精細控制除錯級別,也可以使用函數 torch.distributed.set_debug_level()、torch.distributed.set_debug_level_from_env() 和 torch.distributed.get_debug_level()。
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以與 TORCH_SHOW_CPP_STACKTRACES=1 結合使用,以便在偵測到集體去同步時記錄整個呼叫堆疊。這些集體去同步檢查適用於使用由 torch.distributed.init_process_group() 和 torch.distributed.new_group() API 建立的進程群組支援的 c10d 集體呼叫的所有應用程式。
記錄¶
除了透過 torch.distributed.monitored_barrier() 和 TORCH_DISTRIBUTED_DEBUG 進行顯式除錯支援之外,torch.distributed 的基礎 C++ 程式庫還會輸出各種級別的記錄訊息。這些訊息有助於瞭解分散式訓練作業的執行狀態,並排除網路連線失敗等問題。下表顯示如何透過組合 TORCH_CPP_LOG_LEVEL 和 TORCH_DISTRIBUTED_DEBUG 環境變數來調整記錄級別。
| 
 | 
 | 實際記錄級別 | 
|---|---|---|
| 
 | 已忽略 | 錯誤 | 
| 
 | 已忽略 | 警告 | 
| 
 | 已忽略 | 資訊 | 
| 
 | 
 | 除錯 | 
| 
 | 
 | 追蹤(又稱全部) | 
分散式元件會引發從 RuntimeError 衍生的自訂例外類型
- torch.distributed.DistError:這是所有分散式例外類型的基底類型。 
- torch.distributed.DistBackendError:當發生後端特定錯誤時,就會引發此例外。例如,如果使用 NCCL 後端,而使用者嘗試使用 NCCL 程式庫無法使用的 GPU。 
- torch.distributed.DistNetworkError:當網路程式庫遇到錯誤時,就會引發此例外(例如:對等節點重設連線) 
- torch.distributed.DistStoreError:當存放區遇到錯誤時,就會引發此例外(例如:TCPStore 逾時) 
- class torch.distributed.DistError¶
- 在分散式程式庫中發生錯誤時引發的例外 
- class torch.distributed.DistBackendError¶
- 在分散式中發生後端錯誤時引發的例外 
- class torch.distributed.DistNetworkError¶
- 在分散式中發生網路錯誤時引發的例外 
- class torch.distributed.DistStoreError¶
- 在分散式存放區中發生錯誤時引發的例外 
如果您正在執行單節點訓練,則以互動方式中斷腳本可能會很方便。我們提供了一種方便地在單一進程中設定中斷點的方法