分散式 RPC 架構¶
分散式 RPC 架構透過一組基本元素提供多機器模型訓練的機制,以允許遠端通訊,以及一個更高級別的 API 來自動區分跨多台機器分割的模型。
警告
RPC 套件中的 API 是穩定的。目前有多個正在進行的工作項目,用於改善效能和錯誤處理,這些項目將在未來的版本中提供。
警告
CUDA 支援是在 PyTorch 1.9 中引入的,目前仍為**測試版**功能。並非所有 RPC 套件的功能都與 CUDA 支援相容,因此不建議使用這些功能。這些不支援的功能包括:RRef、JIT 相容性、分散式自動梯度和分散式優化器,以及效能分析。這些缺點將在未來的版本中得到解決。
注意
如需與分散式訓練相關的所有功能的簡要介紹,請參閱PyTorch 分散式概覽。
基本知識¶
分散式 RPC 架構可以輕鬆地遠端執行函數,支援在不複製實際資料的情況下參考遠端物件,並提供自動梯度和優化器 API,以透明地跨 RPC 邊界執行反向傳播和更新參數。這些功能可以分為四組 API。
- 遠端程序呼叫 (RPC) 支援在指定的目標工作器上使用給定的參數執行函數,並取得回傳值或建立對回傳值的參考。有三種主要的 RPC API: - rpc_sync()(同步)、- rpc_async()(非同步)和- remote()(非同步並回傳對遠端回傳值的參考)。如果使用者程式碼在沒有回傳值的情況下無法繼續執行,請使用同步 API。否則,請使用非同步 API 來取得 Future,並在呼叫端需要回傳值時等待 Future。當需要遠端建立某些內容但不需要將其提取到呼叫端時,- remote()API 很有用。想像一下,驅動程序正在設定參數伺服器和訓練器。驅動程序可以在參數伺服器上建立嵌入表,然後與訓練器共用對嵌入表的參考,但它本身永遠不會在本機使用嵌入表。在這種情況下,- rpc_sync()和- rpc_async()不再適用,因為它們總是意味著回傳值將立即或在將來回傳給呼叫端。
- 遠端參考 (RRef) 作為對本地或遠端物件的分散式共用指標。它可以與其他工作器共用,並且將透明地處理參考計數。每個 RRef 只有一個所有者,並且物件只存在於該所有者上。持有 RRef 的非所有者工作器可以透過明確請求從所有者那裡取得物件的副本。當工作器需要存取某些資料物件,但它本身既不是建立者( - remote()的呼叫者),也不是物件的所有者時,這很有用。我們將在下面討論的分散式優化器就是這種用例的一個例子。
- 分散式自動梯度 將參與正向傳播的所有工作器上的本地自動梯度引擎拼接在一起,並在反向傳播期間自動聯繫它們以計算梯度。如果正向傳播需要跨越多台機器(例如,在進行分散式模型平行訓練、參數伺服器訓練等時),這將特別有用。使用此功能,使用者程式碼不再需要擔心如何跨 RPC 邊界傳送梯度,以及應該以何種順序啟動本地自動梯度引擎,如果正向傳播中有嵌套和相互依賴的 RPC 呼叫,這可能會變得相當複雜。 
- 分散式最佳化器 (Distributed Optimizer)的建構函式接受一個- Optimizer()(例如,- SGD()、- Adagrad()等)和一個參數 RRef 列表,並在每個不同的 RRef 所有者上建立一個- Optimizer()實例,並在執行- step()時相應地更新參數。當您具有分散式正向和反向傳遞時,參數和梯度將分散在多個工作節點上,因此每個參與的工作節點上都需要一個最佳化器。分散式最佳化器將所有這些本地最佳化器包裝成一個,並提供簡潔的建構函式和- step()API。
RPC¶
在使用 RPC 和分散式 Autograd 原語之前,必須先進行初始化。要初始化 RPC 架構,我們需要使用 init_rpc(),它將初始化 RPC 架構、RRef 架構和分散式 Autograd。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[原始碼]¶
- 初始化 RPC 原語,例如本地 RPC 代理和分散式 Autograd,這將使當前進程立即準備好發送和接收 RPC。 - 參數
- name (str) – 此節點的全局唯一名稱。(例如, - Trainer3、- ParameterServer2、- Master、- Worker1)名稱只能包含數字、字母、底線、冒號和/或破折號,並且長度必須小於 128 個字元。
- backend (BackendType, 可選) – RPC 後端實現的類型。支援的值為 - BackendType.TENSORPIPE(預設值)。有關更多資訊,請參閱後端。
- rank (int) – 此節點的全局唯一 ID/排名。 
- world_size (int) – 群組中的工作節點數。 
- rpc_backend_options (RpcBackendOptions, 可選) – 傳遞給 RpcAgent 建構函式的選項。它必須是 - RpcBackendOptions的代理特定子類,並且包含代理特定的初始化配置。默認情況下,對於所有代理,它會將默認逾時設置為 60 秒,並使用- init_method = "env://"初始化的底層進程組執行會合,這意味著需要正確設置環境變數- MASTER_ADDR和- MASTER_PORT。有關更多資訊以及可用的選項,請參閱後端。
 
 
以下 API 允許用戶遠端執行函數,以及建立對遠端資料物件的引用(RRef)。在這些 API 中,當傳遞 Tensor 作為參數或回傳值時,目標工作節點將嘗試建立具有相同中繼資料(即形狀、步幅等)的 Tensor。我們有意禁止傳輸 CUDA 張量,因為如果來源和目標工作節點上的設備列表不匹配,則可能會崩潰。在這種情況下,應用程式始終可以在呼叫端明確地將輸入張量移至 CPU,並在需要時將其移至被呼叫端上的所需設備。
警告
RPC 中的 TorchScript 支援是一項原型功能,可能會有所變更。自 v1.5.0 起,torch.distributed.rpc 支援呼叫 TorchScript 函數作為 RPC 目標函數,這將有助於提高被呼叫端上的平行性,因為執行 TorchScript 函數不需要 GIL。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[原始碼]¶
- 對工作節點 - to進行阻塞 RPC 呼叫,以執行函數- func。RPC 訊息的發送和接收與 Python 代碼的執行平行。此方法是線程安全的。- 參數
- to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/排名/ - WorkerInfo。
- func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 - add())和帶註釋的 TorchScript 函數。
- args (tuple) – - func呼叫的參數元組。
- kwargs (dict) – - func呼叫的關鍵字參數字典。
- timeout (float, 可選) – 此 RPC 使用的逾時(以秒為單位)。如果 RPC 在此時間內未完成,則會引發指示其已逾時的異常。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用在初始化期間或使用 - _set_rpc_timeout設置的默認值。
 
- 回傳值
- 回傳使用 - args和- kwargs執行- func的結果。
 - 範例:
- 確保在兩個工作節點上都正確設置了 - MASTER_ADDR和- MASTER_PORT。有關更多詳細資訊,請參閱- init_process_group()API。例如,- export MASTER_ADDR=localhost export MASTER_PORT=5678 - 然後在兩個不同的進程中運行以下代碼 - >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown() - >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() - 以下是使用 RPC 執行 TorchScript 函數的範例。 - >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) - >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown() - >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() 
 
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[原始碼]¶
- 對工作節點 - to進行非阻塞 RPC 呼叫,以執行函數- func。RPC 訊息的發送和接收與 Python 代碼的執行平行。此方法是線程安全的。此方法將立即返回一個可以等待的- Future。- 參數
- to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/排名/ - WorkerInfo。
- func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 - add())和帶註釋的 TorchScript 函數。
- args (tuple) – - func呼叫的參數元組。
- kwargs (dict) – - func呼叫的關鍵字參數字典。
- timeout (float, 可選) – 此 RPC 使用的逾時(以秒為單位)。如果 RPC 在此時間內未完成,則會引發指示其已逾時的異常。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用在初始化期間或使用 - _set_rpc_timeout設置的默認值。
 
- 回傳值
- 返回一個可以等待的 - Future物件。完成後,可以從- Future物件中檢索- func對- args和- kwargs的回傳值。
 - 警告 - 不支援使用 GPU 張量作為 - func的參數或回傳值,因為我們不支援通過網路發送 GPU 張量。在將 GPU 張量用作- func的參數或回傳值之前,您需要明確地將其複製到 CPU。- 警告 - rpc_asyncAPI 在通過網路發送參數張量之前不會複製其儲存,這可以由不同的線程完成,具體取決於 RPC 後端類型。呼叫端應確保在返回的- Future完成之前,這些張量的內容保持完整。- 範例:
- 確保在兩個工作節點上都正確設置了 - MASTER_ADDR和- MASTER_PORT。有關更多詳細資訊,請參閱- init_process_group()API。例如,- export MASTER_ADDR=localhost export MASTER_PORT=5678 - 然後在兩個不同的進程中運行以下代碼 - >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown() - >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() - 以下是使用 RPC 執行 TorchScript 函數的範例。 - >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) - >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown() - >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() 
 
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
- 發出遠端呼叫,在工作節點 - to上執行- func,並立即傳回結果值的- RRef。工作節點- to將會是傳回的- RRef的擁有者,而呼叫- remote的工作節點則是使用者。擁有者會管理其- RRef的全域參考計數,並且只有在全域沒有任何對其的有效參考時,才會銷毀擁有者的- RRef。- 參數
- to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/排名/ - WorkerInfo。
- func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 - add())和帶註釋的 TorchScript 函數。
- args (tuple) – - func呼叫的參數元組。
- kwargs (dict) – - func呼叫的關鍵字參數字典。
- timeout (float, 選用) – 此遠端呼叫的逾時時間(秒)。如果在這個逾時時間內,沒有成功在工作節點 - to上處理完這個- RRef的建立,那麼下次嘗試使用 RRef 時(例如- to_here()),就會引發一個逾時錯誤,表示建立失敗。值 0 表示無限逾時,也就是說,永遠不會引發逾時錯誤。如果沒有提供,則使用初始化期間或使用- _set_rpc_timeout設定的預設值。
 
- 回傳值
- 結果值的使用者 - RRef實例。使用阻塞 API- torch.distributed.rpc.RRef.to_here()在本地擷取結果值。
 - 警告 - remoteAPI 不會在傳送參數張量的儲存體之前複製它們,這可能會由不同的執行緒完成,具體取決於 RPC 後端的類型。呼叫者應確保這些張量的內容保持完整,直到擁有者確認傳回的 RRef 為止,這可以使用- torch.distributed.rpc.RRef.confirmed_by_owner()API 進行檢查。- 警告 - 會盡力處理 - remoteAPI 的錯誤,例如逾時。這表示當- remote發起的遠端呼叫失敗時,例如出現逾時錯誤,我們會盡力處理錯誤。這表示錯誤會以非同步方式在產生的 RRef 上進行處理和設定。如果在處理之前應用程式尚未使用 RRef(例如- to_here或 fork 呼叫),則未來使用- RRef時將會適當地引發錯誤。但是,使用者應用程式有可能在錯誤被處理之前就使用了- RRef。在這種情況下,錯誤可能不會被引發,因為它們尚未被處理。- 範例 - Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source]¶
- 取得給定工作節點名稱的 - WorkerInfo。使用這個- WorkerInfo可以避免在每次呼叫時都傳遞一個昂貴的字串。- 參數
- worker_name (str) – 工作節點的字串名稱。如果是 - None,則傳回目前工作節點的 ID。(預設為- None)
- 回傳值
- 給定 - worker_name的- WorkerInfo實例,如果- worker_name為- None,則為目前工作節點的- WorkerInfo。
 
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]¶
- 關閉 RPC 代理程式,然後銷毀它。這會停止本地代理程式接受未完成的請求,並透過終止所有 RPC 執行緒來關閉 RPC 架構。如果 - graceful=True,這個方法會阻塞,直到所有本地和遠端 RPC 程序都到達這個方法,並等待所有未完成的工作完成。否則,如果- graceful=False,則這是一個本地關閉,它不會等待其他 RPC 程序到達這個方法。- 警告 - 對於由 - rpc_async()傳回的- Future物件,不應在- shutdown()之後呼叫- future.wait()。- 參數
- graceful (bool) – 是否要進行正常關閉。如果是 True,則會 1) 等待 - UserRRefs沒有待處理的系統訊息,並將其刪除;2) 阻塞,直到所有本地和遠端 RPC 程序都到達這個方法,並等待所有未完成的工作完成。
 - 範例:
- 確保在兩個工作節點上都正確設置了 - MASTER_ADDR和- MASTER_PORT。有關更多詳細資訊,請參閱- init_process_group()API。例如,- export MASTER_ADDR=localhost export MASTER_PORT=5678 - 然後在兩個不同的進程中運行以下代碼 - >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown() - >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown() 
 
- class torch.distributed.rpc.WorkerInfo¶
- 一種封裝系統中工作節點資訊的結構。包含工作節點的名稱和 ID。這個類別不打算直接建構,而是可以透過 - get_worker_info()擷取實例,並將結果傳遞給- rpc_sync()、- rpc_async()、- remote()等函式,以避免在每次呼叫時都複製一個字串。- property id¶
- 用於識別工作節點的全域唯一 ID。 
 - property name¶
- 工作節點的名稱。 
 
RPC 套件還提供了裝飾器,允許應用程式指定如何在被呼叫方處理給定的函式。
- torch.distributed.rpc.functions.async_execution(fn)[source]¶
- 一個用於函式的裝飾器,表示該函式的傳回值保證是一個 - Future物件,並且該函式可以在 RPC 被呼叫方非同步執行。更具體地說,被呼叫方會提取由包裝函式傳回的- Future,並將後續處理步驟安裝為該- Future的回調。安裝的回調將在完成時從- Future中讀取值,並將該值作為 RPC 回應傳回。這也意味著傳回的- Future只存在於被呼叫方,並且永遠不會透過 RPC 傳送。當包裝函式 (- fn) 的執行需要暫停和恢復時,例如,包含- rpc_async()或等待其他信號時,這個裝飾器非常有用。- 注意 - 為了啟用非同步執行,應用程式必須將此裝飾器返回的函數物件傳遞給 RPC API。如果 RPC 偵測到此裝飾器安裝的屬性,它就知道此函數會返回一個 - Future物件,並會相應地處理它。然而,這並不意味著這個裝飾器在定義函數時必須是最外層的。例如,當與- @staticmethod或- @classmethod結合使用時,- @rpc.functions.async_execution必須是內部裝飾器,才能讓目標函數被識別為靜態或類別函數。此目標函數仍然可以非同步執行,因為在存取時,靜態或類別方法會保留由- @rpc.functions.async_execution安裝的屬性。- 範例:
- 返回的 - Future物件可以來自- rpc_async()、- then()或- Future建構函數。以下範例顯示直接使用- then()返回的- Future。- >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.]) - 當與 TorchScript 裝飾器結合使用時,此裝飾器必須是最外層的。 - >>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.]) - 當與靜態或類別方法結合使用時,此裝飾器必須是內部的。 - >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) - 此裝飾器也適用於 RRef 輔助函數,即 . - torch.distributed.rpc.RRef.rpc_sync()、- torch.distributed.rpc.RRef.rpc_async()和- torch.distributed.rpc.RRef.remote()。- >>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.]) 
 
後端¶
RPC 模組可以利用不同的後端在節點之間進行通訊。要使用的後端可以在 init_rpc() 函數中指定,方法是傳遞 BackendType 列舉的特定值。無論使用哪種後端,其餘的 RPC API 都不會改變。每個後端也定義了自己的 RpcBackendOptions 類別子類別,其實例也可以傳遞給 init_rpc() 以設定後端的行為。
- class torch.distributed.rpc.BackendType(value)¶
- 可用後端的列舉類別。 - PyTorch 搭載了一個內建的 - BackendType.TENSORPIPE後端。可以使用- register_backend()函數註冊其他後端。
- class torch.distributed.rpc.RpcBackendOptions¶
- 一個抽象結構,封裝傳遞給 RPC 後端的選項。這個類別的實例可以傳遞給 - init_rpc(),以便使用特定設定初始化 RPC,例如要使用的 RPC 超時和- init_method。- property init_method¶
- 指定如何初始化處理程序群組的 URL。預設值為 - env://
 - property rpc_timeout¶
- 一個浮點數,表示所有 RPC 要使用的超時時間。如果 RPC 未在此時間範圍內完成,它將以指示已超時的例外狀況完成。 
 
TensorPipe 後端¶
TensorPipe 代理程式(預設)利用 TensorPipe 函式庫,該函式庫提供專為機器學習設計的原生點對點通訊原語,從根本上解決了 Gloo 的一些限制。與 Gloo 相比,它的優點是非同步,這允許多個傳輸同時進行,每個傳輸都有自己的速度,而不會互相阻塞。它只會在需要時才在節點對之間開啟管道,並且當一個節點發生故障時,只有與其相關的管道會被關閉,而所有其他管道將繼續正常工作。此外,它能夠支援多種不同的傳輸方式(當然包括 TCP,但也包括共享記憶體、NVLink、InfiniBand……),並且可以自動偵測其可用性,並協商每個管道要使用的最佳傳輸方式。
TensorPipe 後端已在 PyTorch v1.6 中引入,並且正在積極開發中。目前,它只支援 CPU 張量,GPU 支援即將推出。它附帶了一個基於 TCP 的傳輸方式,就像 Gloo 一樣。它還可以自動將大型張量分塊並多工處理到多個通訊端和執行緒上,以實現非常高的頻寬。代理程式將能夠自行選擇最佳傳輸方式,無需任何干預。
範例
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20 # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]¶
- TensorPipeAgent的後端選項,衍生自- RpcBackendOptions。- 參數
- num_worker_threads (int, optional) – - TensorPipeAgent用於執行請求的執行緒池中的執行緒數量(預設值:16)。
- rpc_timeout (float, optional) – RPC 請求的預設超時時間(以秒為單位)(預設值:60 秒)。如果 RPC 未在此時間範圍內完成,則會引發指示已超時的例外狀況。如果需要,呼叫者可以在 - rpc_sync()和- rpc_async()中覆蓋個別 RPC 的超時時間。
- init_method (str, optional) – 用於初始化用於會合的分散式存放區的 URL。它接受 - init_process_group()的相同參數的任何值(預設值:- env://)。
- device_maps (Dict[str, Dict], optional) – 從此工作節點到被呼叫節點的裝置放置映射。鍵是被呼叫工作節點名稱,值是字典( - Dictof- int、- str或- torch.device),將此工作節點的裝置映射到被呼叫工作節點的裝置。(預設值:- None)
- devices (List[int, str, or - torch.device], optional) – RPC 代理程式使用的所有本地 CUDA 裝置。預設情況下,它將從其自身的- device_maps和來自其對等節點的- device_maps的相應裝置初始化為所有本地裝置。在處理 CUDA RPC 請求時,代理程式將正確同步此- List中所有裝置的 CUDA 流。
 
 - property device_maps¶
- 裝置映射位置。 
 - property devices¶
- 本地代理程式使用的所有裝置。 
 - property init_method¶
- 指定如何初始化處理程序群組的 URL。預設值為 - env://
 - property num_worker_threads¶
- TensorPipeAgent用於執行請求的執行緒池中的執行緒數量。
 - property rpc_timeout¶
- 一個浮點數,表示所有 RPC 要使用的超時時間。如果 RPC 未在此時間範圍內完成,它將以指示已超時的例外狀況完成。 
 - set_device_map(to, device_map)[原始碼]¶
- 設定每個 RPC 呼叫方和被呼叫方配對之間的裝置映射。這個函數可以被呼叫多次,以遞增的方式添加裝置放置配置。 - 參數
- to (str) – 被呼叫方名稱。 
- device_map (Dict of int, str, or torch.device) – 從這個工作節點到被呼叫方的裝置放置映射。這個映射必須是可逆的。 
 
 - 範例 - >>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1') 
 - set_devices(devices)[原始碼]¶
- 設定 TensorPipe RPC 代理使用的本地裝置。當處理 CUDA RPC 請求時,TensorPipe RPC 代理將會正確地同步這個 - List中所有裝置的 CUDA 串流。- 參數
- devices (List of int, str, or torch.device) – TensorPipe RPC 代理使用的本地裝置。 
 
 
注意
RPC 框架不會自動重試任何 rpc_sync()、rpc_async() 和 remote() 呼叫。原因是 RPC 框架無法判斷一個操作是否是等冪的,以及是否可以安全地重試。因此,處理失敗和在必要時進行重試是應用程式的責任。RPC 通訊基於 TCP,因此可能會因為網路故障或間歇性網路連線問題而發生故障。在這種情況下,應用程式需要以合理的退避時間適當地進行重試,以確保網路不會因為過於積極的重試而過載。
RRef¶
警告
當使用 CUDA 張量時,目前不支援 RRef
「RRef」(遠端參考)是指向遠端工作節點上某個類型為 T(例如 Tensor)的值的參考。這個控制代碼會讓被參考的遠端值在擁有者上保持活動狀態,但並不意味著該值將來會被傳送到本地工作節點。RRef 可以用於多機訓練,方法是在其他工作節點上保存對 nn.Modules 的參考,並在訓練期間呼叫適當的函數來檢索或修改它們的參數。有關更多詳細資訊,請參閱 遠端參考協定。
- class torch.distributed.rpc.PyRRef(RRef)¶
- 一個封裝了指向遠端工作節點上某個類型值的參考的類別。這個控制代碼會讓被參考的遠端值在工作節點上保持活動狀態。當 1) 應用程式程式碼和本地 RRef 上下文中都沒有對它的參考,或 2) 應用程式已呼叫正常關機時, - UserRRef將會被刪除。在已刪除的 RRef 上呼叫方法會導致未定義的行為。RRef 實作只提供盡力而為的錯誤偵測,應用程式不應該在- rpc.shutdown()之後使用- UserRRefs。- 警告 - RRef 只能由 RPC 模組進行序列化和反序列化。在沒有 RPC 的情況下對 RRef 進行序列化和反序列化(例如,Python pickle、torch - save()/- load()、JIT- save()/- load()等)將會導致錯誤。- 參數
- value (object) – 要由這個 RRef 包裝的值。 
- type_hint (Type, optional) – 應該傳遞給 - TorchScript編譯器的 Python 類型,作為- value的類型提示。
 
 - 範例:
- 為了簡單起見,以下範例省略了 RPC 初始化和關機程式碼。有關這些細節,請參閱 RPC 文件。 - 使用 rpc.remote 建立 RRef 
 - >>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here() - 從本地物件建立 RRef 
 - >>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x) - 與其他工作節點共用 RRef 
 - >>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1 - >>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,)) 
 - backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None¶
- 使用 RRef 作為反向傳遞的根節點來執行反向傳遞。如果提供了 - dist_autograd_ctx_id,我們將使用從 RRef 的擁有者開始提供的 ctx_id 執行分佈式反向傳遞。在這種情況下,應該使用- get_gradients()來檢索梯度。如果- dist_autograd_ctx_id為- None,則假設這是一個本地自動梯度圖,我們只會執行本地反向傳遞。在本地情況下,呼叫此 API 的節點必須是 RRef 的擁有者。RRef 的值預計是一個純量張量。- 參數
 - 範例:
- >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id) 
 
 - confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool¶
- 傳回此 - RRef是否已由擁有者確認。- OwnerRRef一律傳回 true,而- UserRRef僅在擁有者知道此- UserRRef時才傳回 true。
 - is_owner(self: torch._C._distributed_rpc.PyRRef) bool¶
- 傳回目前節點是否是此 - RRef的擁有者。
 - local_value(self: torch._C._distributed_rpc.PyRRef) object¶
- 如果當前節點是所有者,則返回對本地值的參考。否則,拋出異常。 
 - owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo¶
- 返回擁有此 - RRef的節點的工作者資訊。
 - owner_name(self: torch._C._distributed_rpc.PyRRef) str¶
- 返回擁有此 - RRef的節點的工作者名稱。
 - remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
- 建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動 - remote。更具體地說,- rref.remote().func_name(*args, **kwargs)等同於以下內容- >>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs)) - 參數
- timeout (float, 選用) – - rref.remote()的逾時。如果此- RRef的建立未在逾時時間內成功完成,則下次嘗試使用 RRef 時(例如- to_here),將會引發逾時。如果未提供,則將使用預設的 RPC 逾時。有關- RRef的特定逾時語義,請參閱- rpc.remote()。
 - 範例:
- >>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]]) 
 
 - rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
- 建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動 - rpc_async。更具體地說,- rref.rpc_async().func_name(*args, **kwargs)等同於以下內容- >>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs)) - 參數
- timeout (float, 選用) – - rref.rpc_async()的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時。
 - 範例:
- >>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]]) 
 
 - rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
- 建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動 - rpc_sync。更具體地說,- rref.rpc_sync().func_name(*args, **kwargs)等同於以下內容- >>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs)) - 參數
- timeout (float, 選用) – - rref.rpc_sync()的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時。
 - 範例:
- >>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]]) 
 
 - to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
- 封鎖呼叫,將 RRef 的值從所有者複製到本地節點並返回它。如果當前節點是所有者,則返回對本地值的參考。 - 參數
- timeout (float, 選用) – - to_here的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時(60 秒)。
 
 
RemoteModule¶
警告
使用 CUDA 張量時,目前不支援 RemoteModule
RemoteModule 是一種在不同行程上遠程建立 nn.Module 的簡便方法。實際的模組位於遠程主機上,但本地主機具有此模組的控制代碼,並且可以像常規 nn.Module 一樣呼叫此模組。但是,呼叫會產生到遠程端的 RPC 呼叫,並且如果需要,可以透過 RemoteModule 支援的其他 API 非同步執行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]¶
- 只有在 RPC 初始化後才能建立 RemoteModule 实例。 - 它在指定的遠程節點上建立用戶端指定的模組。它的行為類似於常規的 - nn.Module,除了- forward方法是在遠程節點上執行的。它會處理自動梯度記錄,以確保反向傳播將梯度傳播回相應的遠程模組。- 它根據 - module_cls的- forward方法的簽章生成兩個方法- forward_async和- forward。- forward_async非同步運行並返回一個 Future。- forward_async和- forward的參數與- module_cls返回的模組的- forward方法相同。- 例如,如果 - module_cls返回一個- nn.Linear的实例,其- forward方法簽章為:- def forward(input: Tensor) -> Tensor:,則生成的- RemoteModule將具有以下簽章的 2 個方法- def forward(input: Tensor) -> Tensor:- def forward_async(input: Tensor) -> Future[Tensor]:- 參數
- remote_device (str) – 目標工作節點上我們想要放置此模組的裝置。格式應為「<工作節點名稱>/<裝置>」,其中裝置欄位可以解析為 torch.device 類型。例如:「trainer0/cpu」、「trainer0」、「ps0/cuda:0」。此外,裝置欄位可以是選用的,預設值為「cpu」。 
- module_cls (nn.Module) – - 要在遠端建立的模組類別。例如: - >>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule 
- args (Sequence, 選用) – 要傳遞給 - module_cls的 args。
- kwargs (Dict, 選用) – 要傳遞給 - module_cls的 kwargs。
 
- 回傳值
- 一個遠端模組實例,它包裝了由使用者提供的 - module_cls建立的- Module,它有一個阻塞的- forward方法和一個異步的- forward_async方法,該方法會傳回遠端使用者提供的模組上- forward呼叫的 future。
 - 範例:
- 在兩個不同的程序中執行以下程式碼 - >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown() - >>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() - 此外,您可以在此 教學 中找到與 DistributedDataParallel (DDP) 結合使用的更實際範例。 
 - remote_parameters(recurse=True)¶
- 傳回指向遠端模組參數的 - RRef清單。- 這通常可以與 - DistributedOptimizer結合使用。
 
分散式 Autograd 架構¶
警告
使用 CUDA 張量時,目前不支援分散式 autograd
此模組提供基於 RPC 的分散式 autograd 架構,可用於模型平行訓練等應用程式。簡而言之,應用程式可能會透過 RPC 傳送和接收梯度記錄張量。在正向傳遞中,我們會記錄何時透過 RPC 傳送梯度記錄張量,並在反向傳遞期間使用此資訊透過 RPC 執行分散式反向傳遞。如需更多詳細資訊,請參閱 分散式 Autograd 設計。
- torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None¶
- 使用提供的根啟動分散式反向傳遞。這目前實作了 FAST 模式演算法,該演算法假設在反向傳遞期間,在工作節點之間於相同的分散式 autograd 上下文中傳送的所有 RPC 訊息都將成為 autograd 圖的一部分。 - 我們使用提供的根來探索 autograd 圖並計算適當的依賴關係。此方法會阻塞,直到完成整個 autograd 計算。 - 我們在每個節點上的適當 - torch.distributed.autograd.context中累積梯度。在呼叫- torch.distributed.autograd.backward()時,會根據傳入的- context_id查詢要使用的 autograd 上下文。如果沒有與給定 ID 相符的有效 autograd 上下文,我們會擲回錯誤。您可以使用- get_gradients()API 擷取累積的梯度。- 參數
 - 範例:
- >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss) 
 
- class torch.distributed.autograd.context[原始碼]¶
- 上下文物件,用於在使用分散式 autograd 時包裝正向和反向傳遞。在 - with陳述式中產生的- context_id需要用於在所有工作節點上唯一識別分散式反向傳遞。每個工作節點都會儲存與此- context_id相關聯的元數據,這是正確執行分散式 autograd 傳遞所必需的。- 範例:
- >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss]) 
 
- torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]¶
- 擷取從張量到該張量的適當梯度的映射,該梯度是在作為分散式 autograd 反向傳遞的一部分提供的上下文中累積的,與給定的 - context_id相對應。- 參數
- context_id (int) – 我們應該為其擷取梯度的 autograd 上下文 ID。 
- 回傳值
- 一個映射,其中鍵是張量,值是與該張量相關聯的梯度。 
 - 範例:
- >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2]) 
 
關於 RPC Autograd 的更多資訊
分散式優化器¶
請參閱 torch.distributed.optim 頁面,以取得有關分散式優化器的文件。
設計注意事項¶
分散式 autograd 設計注意事項涵蓋了基於 RPC 的分散式 autograd 架構的設計,該架構對於模型平行訓練等應用程式非常有用。
RRef 設計注意事項涵蓋了由架構用於參考遠端工作節點上的值的 RRef(遠端參考)協定的設計。
教學¶
RPC 教學向使用者介紹 RPC 架構,提供幾個使用 torch.distributed.rpc API 的範例應用程式,並示範如何使用 分析器 來分析基於 RPC 的工作負載。
- 結合分散式資料平行與分散式 RPC 架構(也涵蓋 RemoteModule)