分散式 RPC 架構¶
分散式 RPC 架構透過一組基本元素提供多機器模型訓練的機制,以允許遠端通訊,以及一個更高級別的 API 來自動區分跨多台機器分割的模型。
警告
RPC 套件中的 API 是穩定的。目前有多個正在進行的工作項目,用於改善效能和錯誤處理,這些項目將在未來的版本中提供。
警告
CUDA 支援是在 PyTorch 1.9 中引入的,目前仍為**測試版**功能。並非所有 RPC 套件的功能都與 CUDA 支援相容,因此不建議使用這些功能。這些不支援的功能包括:RRef、JIT 相容性、分散式自動梯度和分散式優化器,以及效能分析。這些缺點將在未來的版本中得到解決。
注意
如需與分散式訓練相關的所有功能的簡要介紹,請參閱PyTorch 分散式概覽。
基本知識¶
分散式 RPC 架構可以輕鬆地遠端執行函數,支援在不複製實際資料的情況下參考遠端物件,並提供自動梯度和優化器 API,以透明地跨 RPC 邊界執行反向傳播和更新參數。這些功能可以分為四組 API。
遠端程序呼叫 (RPC) 支援在指定的目標工作器上使用給定的參數執行函數,並取得回傳值或建立對回傳值的參考。有三種主要的 RPC API:
rpc_sync()(同步)、rpc_async()(非同步)和remote()(非同步並回傳對遠端回傳值的參考)。如果使用者程式碼在沒有回傳值的情況下無法繼續執行,請使用同步 API。否則,請使用非同步 API 來取得 Future,並在呼叫端需要回傳值時等待 Future。當需要遠端建立某些內容但不需要將其提取到呼叫端時,remote()API 很有用。想像一下,驅動程序正在設定參數伺服器和訓練器。驅動程序可以在參數伺服器上建立嵌入表,然後與訓練器共用對嵌入表的參考,但它本身永遠不會在本機使用嵌入表。在這種情況下,rpc_sync()和rpc_async()不再適用,因為它們總是意味著回傳值將立即或在將來回傳給呼叫端。遠端參考 (RRef) 作為對本地或遠端物件的分散式共用指標。它可以與其他工作器共用,並且將透明地處理參考計數。每個 RRef 只有一個所有者,並且物件只存在於該所有者上。持有 RRef 的非所有者工作器可以透過明確請求從所有者那裡取得物件的副本。當工作器需要存取某些資料物件,但它本身既不是建立者(
remote()的呼叫者),也不是物件的所有者時,這很有用。我們將在下面討論的分散式優化器就是這種用例的一個例子。分散式自動梯度 將參與正向傳播的所有工作器上的本地自動梯度引擎拼接在一起,並在反向傳播期間自動聯繫它們以計算梯度。如果正向傳播需要跨越多台機器(例如,在進行分散式模型平行訓練、參數伺服器訓練等時),這將特別有用。使用此功能,使用者程式碼不再需要擔心如何跨 RPC 邊界傳送梯度,以及應該以何種順序啟動本地自動梯度引擎,如果正向傳播中有嵌套和相互依賴的 RPC 呼叫,這可能會變得相當複雜。
分散式最佳化器 (Distributed Optimizer)的建構函式接受一個Optimizer()(例如,SGD()、Adagrad()等)和一個參數 RRef 列表,並在每個不同的 RRef 所有者上建立一個Optimizer()實例,並在執行step()時相應地更新參數。當您具有分散式正向和反向傳遞時,參數和梯度將分散在多個工作節點上,因此每個參與的工作節點上都需要一個最佳化器。分散式最佳化器將所有這些本地最佳化器包裝成一個,並提供簡潔的建構函式和step()API。
RPC¶
在使用 RPC 和分散式 Autograd 原語之前,必須先進行初始化。要初始化 RPC 架構,我們需要使用 init_rpc(),它將初始化 RPC 架構、RRef 架構和分散式 Autograd。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[原始碼]¶
初始化 RPC 原語,例如本地 RPC 代理和分散式 Autograd,這將使當前進程立即準備好發送和接收 RPC。
- 參數
name (str) – 此節點的全局唯一名稱。(例如,
Trainer3、ParameterServer2、Master、Worker1)名稱只能包含數字、字母、底線、冒號和/或破折號,並且長度必須小於 128 個字元。backend (BackendType, 可選) – RPC 後端實現的類型。支援的值為
BackendType.TENSORPIPE(預設值)。有關更多資訊,請參閱後端。rank (int) – 此節點的全局唯一 ID/排名。
world_size (int) – 群組中的工作節點數。
rpc_backend_options (RpcBackendOptions, 可選) – 傳遞給 RpcAgent 建構函式的選項。它必須是
RpcBackendOptions的代理特定子類,並且包含代理特定的初始化配置。默認情況下,對於所有代理,它會將默認逾時設置為 60 秒,並使用init_method = "env://"初始化的底層進程組執行會合,這意味著需要正確設置環境變數MASTER_ADDR和MASTER_PORT。有關更多資訊以及可用的選項,請參閱後端。
以下 API 允許用戶遠端執行函數,以及建立對遠端資料物件的引用(RRef)。在這些 API 中,當傳遞 Tensor 作為參數或回傳值時,目標工作節點將嘗試建立具有相同中繼資料(即形狀、步幅等)的 Tensor。我們有意禁止傳輸 CUDA 張量,因為如果來源和目標工作節點上的設備列表不匹配,則可能會崩潰。在這種情況下,應用程式始終可以在呼叫端明確地將輸入張量移至 CPU,並在需要時將其移至被呼叫端上的所需設備。
警告
RPC 中的 TorchScript 支援是一項原型功能,可能會有所變更。自 v1.5.0 起,torch.distributed.rpc 支援呼叫 TorchScript 函數作為 RPC 目標函數,這將有助於提高被呼叫端上的平行性,因為執行 TorchScript 函數不需要 GIL。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[原始碼]¶
對工作節點
to進行阻塞 RPC 呼叫,以執行函數func。RPC 訊息的發送和接收與 Python 代碼的執行平行。此方法是線程安全的。- 參數
to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/排名/
WorkerInfo。func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如
add())和帶註釋的 TorchScript 函數。args (tuple) –
func呼叫的參數元組。kwargs (dict) –
func呼叫的關鍵字參數字典。timeout (float, 可選) – 此 RPC 使用的逾時(以秒為單位)。如果 RPC 在此時間內未完成,則會引發指示其已逾時的異常。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用在初始化期間或使用
_set_rpc_timeout設置的默認值。
- 回傳值
回傳使用
args和kwargs執行func的結果。
- 範例:
確保在兩個工作節點上都正確設置了
MASTER_ADDR和MASTER_PORT。有關更多詳細資訊,請參閱init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然後在兩個不同的進程中運行以下代碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
以下是使用 RPC 執行 TorchScript 函數的範例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[原始碼]¶
對工作節點
to進行非阻塞 RPC 呼叫,以執行函數func。RPC 訊息的發送和接收與 Python 代碼的執行平行。此方法是線程安全的。此方法將立即返回一個可以等待的Future。- 參數
to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/排名/
WorkerInfo。func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如
add())和帶註釋的 TorchScript 函數。args (tuple) –
func呼叫的參數元組。kwargs (dict) –
func呼叫的關鍵字參數字典。timeout (float, 可選) – 此 RPC 使用的逾時(以秒為單位)。如果 RPC 在此時間內未完成,則會引發指示其已逾時的異常。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用在初始化期間或使用
_set_rpc_timeout設置的默認值。
- 回傳值
返回一個可以等待的
Future物件。完成後,可以從Future物件中檢索func對args和kwargs的回傳值。
警告
不支援使用 GPU 張量作為
func的參數或回傳值,因為我們不支援通過網路發送 GPU 張量。在將 GPU 張量用作func的參數或回傳值之前,您需要明確地將其複製到 CPU。警告
rpc_asyncAPI 在通過網路發送參數張量之前不會複製其儲存,這可以由不同的線程完成,具體取決於 RPC 後端類型。呼叫端應確保在返回的Future完成之前,這些張量的內容保持完整。- 範例:
確保在兩個工作節點上都正確設置了
MASTER_ADDR和MASTER_PORT。有關更多詳細資訊,請參閱init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然後在兩個不同的進程中運行以下代碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
以下是使用 RPC 執行 TorchScript 函數的範例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
發出遠端呼叫,在工作節點
to上執行func,並立即傳回結果值的RRef。工作節點to將會是傳回的RRef的擁有者,而呼叫remote的工作節點則是使用者。擁有者會管理其RRef的全域參考計數,並且只有在全域沒有任何對其的有效參考時,才會銷毀擁有者的RRef。- 參數
to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/排名/
WorkerInfo。func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如
add())和帶註釋的 TorchScript 函數。args (tuple) –
func呼叫的參數元組。kwargs (dict) –
func呼叫的關鍵字參數字典。timeout (float, 選用) – 此遠端呼叫的逾時時間(秒)。如果在這個逾時時間內,沒有成功在工作節點
to上處理完這個RRef的建立,那麼下次嘗試使用 RRef 時(例如to_here()),就會引發一個逾時錯誤,表示建立失敗。值 0 表示無限逾時,也就是說,永遠不會引發逾時錯誤。如果沒有提供,則使用初始化期間或使用_set_rpc_timeout設定的預設值。
- 回傳值
結果值的使用者
RRef實例。使用阻塞 APItorch.distributed.rpc.RRef.to_here()在本地擷取結果值。
警告
remoteAPI 不會在傳送參數張量的儲存體之前複製它們,這可能會由不同的執行緒完成,具體取決於 RPC 後端的類型。呼叫者應確保這些張量的內容保持完整,直到擁有者確認傳回的 RRef 為止,這可以使用torch.distributed.rpc.RRef.confirmed_by_owner()API 進行檢查。警告
會盡力處理
remoteAPI 的錯誤,例如逾時。這表示當remote發起的遠端呼叫失敗時,例如出現逾時錯誤,我們會盡力處理錯誤。這表示錯誤會以非同步方式在產生的 RRef 上進行處理和設定。如果在處理之前應用程式尚未使用 RRef(例如to_here或 fork 呼叫),則未來使用RRef時將會適當地引發錯誤。但是,使用者應用程式有可能在錯誤被處理之前就使用了RRef。在這種情況下,錯誤可能不會被引發,因為它們尚未被處理。範例
Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source]¶
取得給定工作節點名稱的
WorkerInfo。使用這個WorkerInfo可以避免在每次呼叫時都傳遞一個昂貴的字串。- 參數
worker_name (str) – 工作節點的字串名稱。如果是
None,則傳回目前工作節點的 ID。(預設為None)- 回傳值
給定
worker_name的WorkerInfo實例,如果worker_name為None,則為目前工作節點的WorkerInfo。
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]¶
關閉 RPC 代理程式,然後銷毀它。這會停止本地代理程式接受未完成的請求,並透過終止所有 RPC 執行緒來關閉 RPC 架構。如果
graceful=True,這個方法會阻塞,直到所有本地和遠端 RPC 程序都到達這個方法,並等待所有未完成的工作完成。否則,如果graceful=False,則這是一個本地關閉,它不會等待其他 RPC 程序到達這個方法。警告
對於由
rpc_async()傳回的Future物件,不應在shutdown()之後呼叫future.wait()。- 參數
graceful (bool) – 是否要進行正常關閉。如果是 True,則會 1) 等待
UserRRefs沒有待處理的系統訊息,並將其刪除;2) 阻塞,直到所有本地和遠端 RPC 程序都到達這個方法,並等待所有未完成的工作完成。
- 範例:
確保在兩個工作節點上都正確設置了
MASTER_ADDR和MASTER_PORT。有關更多詳細資訊,請參閱init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然後在兩個不同的進程中運行以下代碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown()
- class torch.distributed.rpc.WorkerInfo¶
一種封裝系統中工作節點資訊的結構。包含工作節點的名稱和 ID。這個類別不打算直接建構,而是可以透過
get_worker_info()擷取實例,並將結果傳遞給rpc_sync()、rpc_async()、remote()等函式,以避免在每次呼叫時都複製一個字串。- property id¶
用於識別工作節點的全域唯一 ID。
- property name¶
工作節點的名稱。
RPC 套件還提供了裝飾器,允許應用程式指定如何在被呼叫方處理給定的函式。
- torch.distributed.rpc.functions.async_execution(fn)[source]¶
一個用於函式的裝飾器,表示該函式的傳回值保證是一個
Future物件,並且該函式可以在 RPC 被呼叫方非同步執行。更具體地說,被呼叫方會提取由包裝函式傳回的Future,並將後續處理步驟安裝為該Future的回調。安裝的回調將在完成時從Future中讀取值,並將該值作為 RPC 回應傳回。這也意味著傳回的Future只存在於被呼叫方,並且永遠不會透過 RPC 傳送。當包裝函式 (fn) 的執行需要暫停和恢復時,例如,包含rpc_async()或等待其他信號時,這個裝飾器非常有用。注意
為了啟用非同步執行,應用程式必須將此裝飾器返回的函數物件傳遞給 RPC API。如果 RPC 偵測到此裝飾器安裝的屬性,它就知道此函數會返回一個
Future物件,並會相應地處理它。然而,這並不意味著這個裝飾器在定義函數時必須是最外層的。例如,當與@staticmethod或@classmethod結合使用時,@rpc.functions.async_execution必須是內部裝飾器,才能讓目標函數被識別為靜態或類別函數。此目標函數仍然可以非同步執行,因為在存取時,靜態或類別方法會保留由@rpc.functions.async_execution安裝的屬性。- 範例:
返回的
Future物件可以來自rpc_async()、then()或Future建構函數。以下範例顯示直接使用then()返回的Future。>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.])
當與 TorchScript 裝飾器結合使用時,此裝飾器必須是最外層的。
>>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.])
當與靜態或類別方法結合使用時,此裝飾器必須是內部的。
>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.])
此裝飾器也適用於 RRef 輔助函數,即 .
torch.distributed.rpc.RRef.rpc_sync()、torch.distributed.rpc.RRef.rpc_async()和torch.distributed.rpc.RRef.remote()。>>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.])
後端¶
RPC 模組可以利用不同的後端在節點之間進行通訊。要使用的後端可以在 init_rpc() 函數中指定,方法是傳遞 BackendType 列舉的特定值。無論使用哪種後端,其餘的 RPC API 都不會改變。每個後端也定義了自己的 RpcBackendOptions 類別子類別,其實例也可以傳遞給 init_rpc() 以設定後端的行為。
- class torch.distributed.rpc.BackendType(value)¶
可用後端的列舉類別。
PyTorch 搭載了一個內建的
BackendType.TENSORPIPE後端。可以使用register_backend()函數註冊其他後端。
- class torch.distributed.rpc.RpcBackendOptions¶
一個抽象結構,封裝傳遞給 RPC 後端的選項。這個類別的實例可以傳遞給
init_rpc(),以便使用特定設定初始化 RPC,例如要使用的 RPC 超時和init_method。- property init_method¶
指定如何初始化處理程序群組的 URL。預設值為
env://
- property rpc_timeout¶
一個浮點數,表示所有 RPC 要使用的超時時間。如果 RPC 未在此時間範圍內完成,它將以指示已超時的例外狀況完成。
TensorPipe 後端¶
TensorPipe 代理程式(預設)利用 TensorPipe 函式庫,該函式庫提供專為機器學習設計的原生點對點通訊原語,從根本上解決了 Gloo 的一些限制。與 Gloo 相比,它的優點是非同步,這允許多個傳輸同時進行,每個傳輸都有自己的速度,而不會互相阻塞。它只會在需要時才在節點對之間開啟管道,並且當一個節點發生故障時,只有與其相關的管道會被關閉,而所有其他管道將繼續正常工作。此外,它能夠支援多種不同的傳輸方式(當然包括 TCP,但也包括共享記憶體、NVLink、InfiniBand……),並且可以自動偵測其可用性,並協商每個管道要使用的最佳傳輸方式。
TensorPipe 後端已在 PyTorch v1.6 中引入,並且正在積極開發中。目前,它只支援 CPU 張量,GPU 支援即將推出。它附帶了一個基於 TCP 的傳輸方式,就像 Gloo 一樣。它還可以自動將大型張量分塊並多工處理到多個通訊端和執行緒上,以實現非常高的頻寬。代理程式將能夠自行選擇最佳傳輸方式,無需任何干預。
範例
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>> "worker1",
>>> rank=0,
>>> world_size=2,
>>> rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>> num_worker_threads=8,
>>> rpc_timeout=20 # 20 second timeout
>>> )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]¶
TensorPipeAgent的後端選項,衍生自RpcBackendOptions。- 參數
num_worker_threads (int, optional) –
TensorPipeAgent用於執行請求的執行緒池中的執行緒數量(預設值:16)。rpc_timeout (float, optional) – RPC 請求的預設超時時間(以秒為單位)(預設值:60 秒)。如果 RPC 未在此時間範圍內完成,則會引發指示已超時的例外狀況。如果需要,呼叫者可以在
rpc_sync()和rpc_async()中覆蓋個別 RPC 的超時時間。init_method (str, optional) – 用於初始化用於會合的分散式存放區的 URL。它接受
init_process_group()的相同參數的任何值(預設值:env://)。device_maps (Dict[str, Dict], optional) – 從此工作節點到被呼叫節點的裝置放置映射。鍵是被呼叫工作節點名稱,值是字典(
Dictofint、str或torch.device),將此工作節點的裝置映射到被呼叫工作節點的裝置。(預設值:None)devices (List[int, str, or
torch.device], optional) – RPC 代理程式使用的所有本地 CUDA 裝置。預設情況下,它將從其自身的device_maps和來自其對等節點的device_maps的相應裝置初始化為所有本地裝置。在處理 CUDA RPC 請求時,代理程式將正確同步此List中所有裝置的 CUDA 流。
- property device_maps¶
裝置映射位置。
- property devices¶
本地代理程式使用的所有裝置。
- property init_method¶
指定如何初始化處理程序群組的 URL。預設值為
env://
- property num_worker_threads¶
TensorPipeAgent用於執行請求的執行緒池中的執行緒數量。
- property rpc_timeout¶
一個浮點數,表示所有 RPC 要使用的超時時間。如果 RPC 未在此時間範圍內完成,它將以指示已超時的例外狀況完成。
- set_device_map(to, device_map)[原始碼]¶
設定每個 RPC 呼叫方和被呼叫方配對之間的裝置映射。這個函數可以被呼叫多次,以遞增的方式添加裝置放置配置。
- 參數
to (str) – 被呼叫方名稱。
device_map (Dict of int, str, or torch.device) – 從這個工作節點到被呼叫方的裝置放置映射。這個映射必須是可逆的。
範例
>>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1')
- set_devices(devices)[原始碼]¶
設定 TensorPipe RPC 代理使用的本地裝置。當處理 CUDA RPC 請求時,TensorPipe RPC 代理將會正確地同步這個
List中所有裝置的 CUDA 串流。- 參數
devices (List of int, str, or torch.device) – TensorPipe RPC 代理使用的本地裝置。
注意
RPC 框架不會自動重試任何 rpc_sync()、rpc_async() 和 remote() 呼叫。原因是 RPC 框架無法判斷一個操作是否是等冪的,以及是否可以安全地重試。因此,處理失敗和在必要時進行重試是應用程式的責任。RPC 通訊基於 TCP,因此可能會因為網路故障或間歇性網路連線問題而發生故障。在這種情況下,應用程式需要以合理的退避時間適當地進行重試,以確保網路不會因為過於積極的重試而過載。
RRef¶
警告
當使用 CUDA 張量時,目前不支援 RRef
「RRef」(遠端參考)是指向遠端工作節點上某個類型為 T(例如 Tensor)的值的參考。這個控制代碼會讓被參考的遠端值在擁有者上保持活動狀態,但並不意味著該值將來會被傳送到本地工作節點。RRef 可以用於多機訓練,方法是在其他工作節點上保存對 nn.Modules 的參考,並在訓練期間呼叫適當的函數來檢索或修改它們的參數。有關更多詳細資訊,請參閱 遠端參考協定。
- class torch.distributed.rpc.PyRRef(RRef)¶
一個封裝了指向遠端工作節點上某個類型值的參考的類別。這個控制代碼會讓被參考的遠端值在工作節點上保持活動狀態。當 1) 應用程式程式碼和本地 RRef 上下文中都沒有對它的參考,或 2) 應用程式已呼叫正常關機時,
UserRRef將會被刪除。在已刪除的 RRef 上呼叫方法會導致未定義的行為。RRef 實作只提供盡力而為的錯誤偵測,應用程式不應該在rpc.shutdown()之後使用UserRRefs。警告
RRef 只能由 RPC 模組進行序列化和反序列化。在沒有 RPC 的情況下對 RRef 進行序列化和反序列化(例如,Python pickle、torch
save()/load()、JITsave()/load()等)將會導致錯誤。- 參數
value (object) – 要由這個 RRef 包裝的值。
type_hint (Type, optional) – 應該傳遞給
TorchScript編譯器的 Python 類型,作為value的類型提示。
- 範例:
為了簡單起見,以下範例省略了 RPC 初始化和關機程式碼。有關這些細節,請參閱 RPC 文件。
使用 rpc.remote 建立 RRef
>>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here()
從本地物件建立 RRef
>>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x)
與其他工作節點共用 RRef
>>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1
>>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,))
- backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None¶
使用 RRef 作為反向傳遞的根節點來執行反向傳遞。如果提供了
dist_autograd_ctx_id,我們將使用從 RRef 的擁有者開始提供的 ctx_id 執行分佈式反向傳遞。在這種情況下,應該使用get_gradients()來檢索梯度。如果dist_autograd_ctx_id為None,則假設這是一個本地自動梯度圖,我們只會執行本地反向傳遞。在本地情況下,呼叫此 API 的節點必須是 RRef 的擁有者。RRef 的值預計是一個純量張量。- 參數
- 範例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id)
- confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool¶
傳回此
RRef是否已由擁有者確認。OwnerRRef一律傳回 true,而UserRRef僅在擁有者知道此UserRRef時才傳回 true。
- is_owner(self: torch._C._distributed_rpc.PyRRef) bool¶
傳回目前節點是否是此
RRef的擁有者。
- local_value(self: torch._C._distributed_rpc.PyRRef) object¶
如果當前節點是所有者,則返回對本地值的參考。否則,拋出異常。
- owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo¶
返回擁有此
RRef的節點的工作者資訊。
- owner_name(self: torch._C._distributed_rpc.PyRRef) str¶
返回擁有此
RRef的節點的工作者名稱。
- remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動
remote。更具體地說,rref.remote().func_name(*args, **kwargs)等同於以下內容>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 參數
timeout (float, 選用) –
rref.remote()的逾時。如果此RRef的建立未在逾時時間內成功完成,則下次嘗試使用 RRef 時(例如to_here),將會引發逾時。如果未提供,則將使用預設的 RPC 逾時。有關RRef的特定逾時語義,請參閱rpc.remote()。
- 範例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]])
- rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動
rpc_async。更具體地說,rref.rpc_async().func_name(*args, **kwargs)等同於以下內容>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 參數
timeout (float, 選用) –
rref.rpc_async()的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時。
- 範例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]])
- rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動
rpc_sync。更具體地說,rref.rpc_sync().func_name(*args, **kwargs)等同於以下內容>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 參數
timeout (float, 選用) –
rref.rpc_sync()的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時。
- 範例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]])
- to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
封鎖呼叫,將 RRef 的值從所有者複製到本地節點並返回它。如果當前節點是所有者,則返回對本地值的參考。
- 參數
timeout (float, 選用) –
to_here的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時(60 秒)。
RemoteModule¶
警告
使用 CUDA 張量時,目前不支援 RemoteModule
RemoteModule 是一種在不同行程上遠程建立 nn.Module 的簡便方法。實際的模組位於遠程主機上,但本地主機具有此模組的控制代碼,並且可以像常規 nn.Module 一樣呼叫此模組。但是,呼叫會產生到遠程端的 RPC 呼叫,並且如果需要,可以透過 RemoteModule 支援的其他 API 非同步執行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]¶
只有在 RPC 初始化後才能建立 RemoteModule 实例。
它在指定的遠程節點上建立用戶端指定的模組。它的行為類似於常規的
nn.Module,除了forward方法是在遠程節點上執行的。它會處理自動梯度記錄,以確保反向傳播將梯度傳播回相應的遠程模組。它根據
module_cls的forward方法的簽章生成兩個方法forward_async和forward。forward_async非同步運行並返回一個 Future。forward_async和forward的參數與module_cls返回的模組的forward方法相同。例如,如果
module_cls返回一個nn.Linear的实例,其forward方法簽章為:def forward(input: Tensor) -> Tensor:,則生成的RemoteModule將具有以下簽章的 2 個方法def forward(input: Tensor) -> Tensor:def forward_async(input: Tensor) -> Future[Tensor]:- 參數
remote_device (str) – 目標工作節點上我們想要放置此模組的裝置。格式應為「<工作節點名稱>/<裝置>」,其中裝置欄位可以解析為 torch.device 類型。例如:「trainer0/cpu」、「trainer0」、「ps0/cuda:0」。此外,裝置欄位可以是選用的,預設值為「cpu」。
module_cls (nn.Module) –
要在遠端建立的模組類別。例如:
>>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule
args (Sequence, 選用) – 要傳遞給
module_cls的 args。kwargs (Dict, 選用) – 要傳遞給
module_cls的 kwargs。
- 回傳值
一個遠端模組實例,它包裝了由使用者提供的
module_cls建立的Module,它有一個阻塞的forward方法和一個異步的forward_async方法,該方法會傳回遠端使用者提供的模組上forward呼叫的 future。
- 範例:
在兩個不同的程序中執行以下程式碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
此外,您可以在此 教學 中找到與 DistributedDataParallel (DDP) 結合使用的更實際範例。
- remote_parameters(recurse=True)¶
傳回指向遠端模組參數的
RRef清單。這通常可以與
DistributedOptimizer結合使用。
分散式 Autograd 架構¶
警告
使用 CUDA 張量時,目前不支援分散式 autograd
此模組提供基於 RPC 的分散式 autograd 架構,可用於模型平行訓練等應用程式。簡而言之,應用程式可能會透過 RPC 傳送和接收梯度記錄張量。在正向傳遞中,我們會記錄何時透過 RPC 傳送梯度記錄張量,並在反向傳遞期間使用此資訊透過 RPC 執行分散式反向傳遞。如需更多詳細資訊,請參閱 分散式 Autograd 設計。
- torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None¶
使用提供的根啟動分散式反向傳遞。這目前實作了 FAST 模式演算法,該演算法假設在反向傳遞期間,在工作節點之間於相同的分散式 autograd 上下文中傳送的所有 RPC 訊息都將成為 autograd 圖的一部分。
我們使用提供的根來探索 autograd 圖並計算適當的依賴關係。此方法會阻塞,直到完成整個 autograd 計算。
我們在每個節點上的適當
torch.distributed.autograd.context中累積梯度。在呼叫torch.distributed.autograd.backward()時,會根據傳入的context_id查詢要使用的 autograd 上下文。如果沒有與給定 ID 相符的有效 autograd 上下文,我們會擲回錯誤。您可以使用get_gradients()API 擷取累積的梯度。- 參數
- 範例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss)
- class torch.distributed.autograd.context[原始碼]¶
上下文物件,用於在使用分散式 autograd 時包裝正向和反向傳遞。在
with陳述式中產生的context_id需要用於在所有工作節點上唯一識別分散式反向傳遞。每個工作節點都會儲存與此context_id相關聯的元數據,這是正確執行分散式 autograd 傳遞所必需的。- 範例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss])
- torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]¶
擷取從張量到該張量的適當梯度的映射,該梯度是在作為分散式 autograd 反向傳遞的一部分提供的上下文中累積的,與給定的
context_id相對應。- 參數
context_id (int) – 我們應該為其擷取梯度的 autograd 上下文 ID。
- 回傳值
一個映射,其中鍵是張量,值是與該張量相關聯的梯度。
- 範例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2])
關於 RPC Autograd 的更多資訊
分散式優化器¶
請參閱 torch.distributed.optim 頁面,以取得有關分散式優化器的文件。
設計注意事項¶
分散式 autograd 設計注意事項涵蓋了基於 RPC 的分散式 autograd 架構的設計,該架構對於模型平行訓練等應用程式非常有用。
RRef 設計注意事項涵蓋了由架構用於參考遠端工作節點上的值的 RRef(遠端參考)協定的設計。
教學¶
RPC 教學向使用者介紹 RPC 架構,提供幾個使用 torch.distributed.rpc API 的範例應用程式,並示範如何使用 分析器 來分析基於 RPC 的工作負載。
結合分散式資料平行與分散式 RPC 架構(也涵蓋 RemoteModule)