• 文件 >
  • 分散式 RPC 架構
捷徑

分散式 RPC 架構

分散式 RPC 架構透過一組基本元素提供多機器模型訓練的機制,以允許遠端通訊,以及一個更高級別的 API 來自動區分跨多台機器分割的模型。

警告

RPC 套件中的 API 是穩定的。目前有多個正在進行的工作項目,用於改善效能和錯誤處理,這些項目將在未來的版本中提供。

警告

CUDA 支援是在 PyTorch 1.9 中引入的,目前仍為**測試版**功能。並非所有 RPC 套件的功能都與 CUDA 支援相容,因此不建議使用這些功能。這些不支援的功能包括:RRef、JIT 相容性、分散式自動梯度和分散式優化器,以及效能分析。這些缺點將在未來的版本中得到解決。

注意

如需與分散式訓練相關的所有功能的簡要介紹,請參閱PyTorch 分散式概覽

基本知識

分散式 RPC 架構可以輕鬆地遠端執行函數,支援在不複製實際資料的情況下參考遠端物件,並提供自動梯度和優化器 API,以透明地跨 RPC 邊界執行反向傳播和更新參數。這些功能可以分為四組 API。

  1. 遠端程序呼叫 (RPC) 支援在指定的目標工作器上使用給定的參數執行函數,並取得回傳值或建立對回傳值的參考。有三種主要的 RPC API:rpc_sync()(同步)、rpc_async()(非同步)和remote()(非同步並回傳對遠端回傳值的參考)。如果使用者程式碼在沒有回傳值的情況下無法繼續執行,請使用同步 API。否則,請使用非同步 API 來取得 Future,並在呼叫端需要回傳值時等待 Future。當需要遠端建立某些內容但不需要將其提取到呼叫端時,remote() API 很有用。想像一下,驅動程序正在設定參數伺服器和訓練器。驅動程序可以在參數伺服器上建立嵌入表,然後與訓練器共用對嵌入表的參考,但它本身永遠不會在本機使用嵌入表。在這種情況下,rpc_sync()rpc_async() 不再適用,因為它們總是意味著回傳值將立即或在將來回傳給呼叫端。

  2. 遠端參考 (RRef) 作為對本地或遠端物件的分散式共用指標。它可以與其他工作器共用,並且將透明地處理參考計數。每個 RRef 只有一個所有者,並且物件只存在於該所有者上。持有 RRef 的非所有者工作器可以透過明確請求從所有者那裡取得物件的副本。當工作器需要存取某些資料物件,但它本身既不是建立者(remote() 的呼叫者),也不是物件的所有者時,這很有用。我們將在下面討論的分散式優化器就是這種用例的一個例子。

  3. 分散式自動梯度 將參與正向傳播的所有工作器上的本地自動梯度引擎拼接在一起,並在反向傳播期間自動聯繫它們以計算梯度。如果正向傳播需要跨越多台機器(例如,在進行分散式模型平行訓練、參數伺服器訓練等時),這將特別有用。使用此功能,使用者程式碼不再需要擔心如何跨 RPC 邊界傳送梯度,以及應該以何種順序啟動本地自動梯度引擎,如果正向傳播中有嵌套和相互依賴的 RPC 呼叫,這可能會變得相當複雜。

  4. 分散式最佳化器 (Distributed Optimizer) 的建構函式接受一個 Optimizer()(例如,SGD()Adagrad() 等)和一個參數 RRef 列表,並在每個不同的 RRef 所有者上建立一個 Optimizer() 實例,並在執行 step() 時相應地更新參數。當您具有分散式正向和反向傳遞時,參數和梯度將分散在多個工作節點上,因此每個參與的工作節點上都需要一個最佳化器。分散式最佳化器將所有這些本地最佳化器包裝成一個,並提供簡潔的建構函式和 step() API。

RPC

在使用 RPC 和分散式 Autograd 原語之前,必須先進行初始化。要初始化 RPC 架構,我們需要使用 init_rpc(),它將初始化 RPC 架構、RRef 架構和分散式 Autograd。

torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[原始碼]

初始化 RPC 原語,例如本地 RPC 代理和分散式 Autograd,這將使當前進程立即準備好發送和接收 RPC。

參數
  • name (str) – 此節點的全局唯一名稱。(例如,Trainer3ParameterServer2MasterWorker1)名稱只能包含數字、字母、底線、冒號和/或破折號,並且長度必須小於 128 個字元。

  • backend (BackendType, 可選) – RPC 後端實現的類型。支援的值為 BackendType.TENSORPIPE(預設值)。有關更多資訊,請參閱後端

  • rank (int) – 此節點的全局唯一 ID/排名。

  • world_size (int) – 群組中的工作節點數。

  • rpc_backend_options (RpcBackendOptions, 可選) – 傳遞給 RpcAgent 建構函式的選項。它必須是 RpcBackendOptions 的代理特定子類,並且包含代理特定的初始化配置。默認情況下,對於所有代理,它會將默認逾時設置為 60 秒,並使用 init_method = "env://" 初始化的底層進程組執行會合,這意味著需要正確設置環境變數 MASTER_ADDRMASTER_PORT。有關更多資訊以及可用的選項,請參閱後端

以下 API 允許用戶遠端執行函數,以及建立對遠端資料物件的引用(RRef)。在這些 API 中,當傳遞 Tensor 作為參數或回傳值時,目標工作節點將嘗試建立具有相同中繼資料(即形狀、步幅等)的 Tensor。我們有意禁止傳輸 CUDA 張量,因為如果來源和目標工作節點上的設備列表不匹配,則可能會崩潰。在這種情況下,應用程式始終可以在呼叫端明確地將輸入張量移至 CPU,並在需要時將其移至被呼叫端上的所需設備。

警告

RPC 中的 TorchScript 支援是一項原型功能,可能會有所變更。自 v1.5.0 起,torch.distributed.rpc 支援呼叫 TorchScript 函數作為 RPC 目標函數,這將有助於提高被呼叫端上的平行性,因為執行 TorchScript 函數不需要 GIL。

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[原始碼]

對工作節點 to 進行阻塞 RPC 呼叫,以執行函數 func。RPC 訊息的發送和接收與 Python 代碼的執行平行。此方法是線程安全的。

參數
  • to (strWorkerInfoint) – 目標工作節點的名稱/排名/WorkerInfo

  • func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 add())和帶註釋的 TorchScript 函數。

  • args (tuple) – func 呼叫的參數元組。

  • kwargs (dict) – func 呼叫的關鍵字參數字典。

  • timeout (float, 可選) – 此 RPC 使用的逾時(以秒為單位)。如果 RPC 在此時間內未完成,則會引發指示其已逾時的異常。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用在初始化期間或使用 _set_rpc_timeout 設置的默認值。

回傳值

回傳使用 argskwargs 執行 func 的結果。

範例:

確保在兩個工作節點上都正確設置了 MASTER_ADDRMASTER_PORT。有關更多詳細資訊,請參閱 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然後在兩個不同的進程中運行以下代碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 執行 TorchScript 函數的範例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[原始碼]

對工作節點 to 進行非阻塞 RPC 呼叫,以執行函數 func。RPC 訊息的發送和接收與 Python 代碼的執行平行。此方法是線程安全的。此方法將立即返回一個可以等待的 Future

參數
  • to (strWorkerInfoint) – 目標工作節點的名稱/排名/WorkerInfo

  • func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 add())和帶註釋的 TorchScript 函數。

  • args (tuple) – func 呼叫的參數元組。

  • kwargs (dict) – func 呼叫的關鍵字參數字典。

  • timeout (float, 可選) – 此 RPC 使用的逾時(以秒為單位)。如果 RPC 在此時間內未完成,則會引發指示其已逾時的異常。值 0 表示無限逾時,即永遠不會引發逾時錯誤。如果未提供,則使用在初始化期間或使用 _set_rpc_timeout 設置的默認值。

回傳值

返回一個可以等待的 Future 物件。完成後,可以從 Future 物件中檢索 funcargskwargs 的回傳值。

警告

不支援使用 GPU 張量作為 func 的參數或回傳值,因為我們不支援通過網路發送 GPU 張量。在將 GPU 張量用作 func 的參數或回傳值之前,您需要明確地將其複製到 CPU。

警告

rpc_async API 在通過網路發送參數張量之前不會複製其儲存,這可以由不同的線程完成,具體取決於 RPC 後端類型。呼叫端應確保在返回的 Future 完成之前,這些張量的內容保持完整。

範例:

確保在兩個工作節點上都正確設置了 MASTER_ADDRMASTER_PORT。有關更多詳細資訊,請參閱 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然後在兩個不同的進程中運行以下代碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 執行 TorchScript 函數的範例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]

發出遠端呼叫,在工作節點 to 上執行 func,並立即傳回結果值的 RRef。工作節點 to 將會是傳回的 RRef 的擁有者,而呼叫 remote 的工作節點則是使用者。擁有者會管理其 RRef 的全域參考計數,並且只有在全域沒有任何對其的有效參考時,才會銷毀擁有者的 RRef

參數
  • to (strWorkerInfoint) – 目標工作節點的名稱/排名/WorkerInfo

  • func (Callable) – 可呼叫的函數,例如 Python 可呼叫物件、內建運算符(例如 add())和帶註釋的 TorchScript 函數。

  • args (tuple) – func 呼叫的參數元組。

  • kwargs (dict) – func 呼叫的關鍵字參數字典。

  • timeout (float, 選用) – 此遠端呼叫的逾時時間(秒)。如果在這個逾時時間內,沒有成功在工作節點 to 上處理完這個 RRef 的建立,那麼下次嘗試使用 RRef 時(例如 to_here()),就會引發一個逾時錯誤,表示建立失敗。值 0 表示無限逾時,也就是說,永遠不會引發逾時錯誤。如果沒有提供,則使用初始化期間或使用 _set_rpc_timeout 設定的預設值。

回傳值

結果值的使用者 RRef 實例。使用阻塞 API torch.distributed.rpc.RRef.to_here() 在本地擷取結果值。

警告

remote API 不會在傳送參數張量的儲存體之前複製它們,這可能會由不同的執行緒完成,具體取決於 RPC 後端的類型。呼叫者應確保這些張量的內容保持完整,直到擁有者確認傳回的 RRef 為止,這可以使用 torch.distributed.rpc.RRef.confirmed_by_owner() API 進行檢查。

警告

會盡力處理 remote API 的錯誤,例如逾時。這表示當 remote 發起的遠端呼叫失敗時,例如出現逾時錯誤,我們會盡力處理錯誤。這表示錯誤會以非同步方式在產生的 RRef 上進行處理和設定。如果在處理之前應用程式尚未使用 RRef(例如 to_here 或 fork 呼叫),則未來使用 RRef 時將會適當地引發錯誤。但是,使用者應用程式有可能在錯誤被處理之前就使用了 RRef。在這種情況下,錯誤可能不會被引發,因為它們尚未被處理。

範例

Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)

>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.get_worker_info(worker_name=None)[source]

取得給定工作節點名稱的 WorkerInfo。使用這個 WorkerInfo 可以避免在每次呼叫時都傳遞一個昂貴的字串。

參數

worker_name (str) – 工作節點的字串名稱。如果是 None,則傳回目前工作節點的 ID。(預設為 None

回傳值

給定 worker_nameWorkerInfo 實例,如果 worker_nameNone,則為目前工作節點的 WorkerInfo

torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]

關閉 RPC 代理程式,然後銷毀它。這會停止本地代理程式接受未完成的請求,並透過終止所有 RPC 執行緒來關閉 RPC 架構。如果 graceful=True,這個方法會阻塞,直到所有本地和遠端 RPC 程序都到達這個方法,並等待所有未完成的工作完成。否則,如果 graceful=False,則這是一個本地關閉,它不會等待其他 RPC 程序到達這個方法。

警告

對於由 rpc_async() 傳回的 Future 物件,不應在 shutdown() 之後呼叫 future.wait()

參數

graceful (bool) – 是否要進行正常關閉。如果是 True,則會 1) 等待 UserRRefs 沒有待處理的系統訊息,並將其刪除;2) 阻塞,直到所有本地和遠端 RPC 程序都到達這個方法,並等待所有未完成的工作完成。

範例:

確保在兩個工作節點上都正確設置了 MASTER_ADDRMASTER_PORT。有關更多詳細資訊,請參閱 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然後在兩個不同的進程中運行以下代碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
class torch.distributed.rpc.WorkerInfo

一種封裝系統中工作節點資訊的結構。包含工作節點的名稱和 ID。這個類別不打算直接建構,而是可以透過 get_worker_info() 擷取實例,並將結果傳遞給 rpc_sync()rpc_async()remote() 等函式,以避免在每次呼叫時都複製一個字串。

property id

用於識別工作節點的全域唯一 ID。

property name

工作節點的名稱。

RPC 套件還提供了裝飾器,允許應用程式指定如何在被呼叫方處理給定的函式。

torch.distributed.rpc.functions.async_execution(fn)[source]

一個用於函式的裝飾器,表示該函式的傳回值保證是一個 Future 物件,並且該函式可以在 RPC 被呼叫方非同步執行。更具體地說,被呼叫方會提取由包裝函式傳回的 Future,並將後續處理步驟安裝為該 Future 的回調。安裝的回調將在完成時從 Future 中讀取值,並將該值作為 RPC 回應傳回。這也意味著傳回的 Future 只存在於被呼叫方,並且永遠不會透過 RPC 傳送。當包裝函式 (fn) 的執行需要暫停和恢復時,例如,包含 rpc_async() 或等待其他信號時,這個裝飾器非常有用。

注意

為了啟用非同步執行,應用程式必須將此裝飾器返回的函數物件傳遞給 RPC API。如果 RPC 偵測到此裝飾器安裝的屬性,它就知道此函數會返回一個 Future 物件,並會相應地處理它。然而,這並不意味著這個裝飾器在定義函數時必須是最外層的。例如,當與 @staticmethod@classmethod 結合使用時,@rpc.functions.async_execution 必須是內部裝飾器,才能讓目標函數被識別為靜態或類別函數。此目標函數仍然可以非同步執行,因為在存取時,靜態或類別方法會保留由 @rpc.functions.async_execution 安裝的屬性。

範例:

返回的 Future 物件可以來自 rpc_async()then()Future 建構函數。以下範例顯示直接使用 then() 返回的 Future

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # This function runs on "worker1" and returns immediately when
>>>     # the callback is installed through the `then(cb)` API. In the
>>>     # mean time, the `rpc_async` to "worker2" can run concurrently.
>>>     # When the return value of that `rpc_async` arrives at
>>>     # "worker1", "worker1" will run the lambda function accordingly
>>>     # and set the value for the previously returned `Future`, which
>>>     # will then trigger RPC to send the result back to "worker0".
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # prints tensor([3., 3.])

當與 TorchScript 裝飾器結合使用時,此裝飾器必須是最外層的。

>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # prints tensor([2., 2.])

當與靜態或類別方法結合使用時,此裝飾器必須是內部的。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])

此裝飾器也適用於 RRef 輔助函數,即 . torch.distributed.rpc.RRef.rpc_sync()torch.distributed.rpc.RRef.rpc_async()torch.distributed.rpc.RRef.remote()

>>> from torch.distributed import rpc
>>>
>>> # reuse the AsyncExecutionClass class above
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # prints tensor([4., 4.])

後端

RPC 模組可以利用不同的後端在節點之間進行通訊。要使用的後端可以在 init_rpc() 函數中指定,方法是傳遞 BackendType 列舉的特定值。無論使用哪種後端,其餘的 RPC API 都不會改變。每個後端也定義了自己的 RpcBackendOptions 類別子類別,其實例也可以傳遞給 init_rpc() 以設定後端的行為。

class torch.distributed.rpc.BackendType(value)

可用後端的列舉類別。

PyTorch 搭載了一個內建的 BackendType.TENSORPIPE 後端。可以使用 register_backend() 函數註冊其他後端。

class torch.distributed.rpc.RpcBackendOptions

一個抽象結構,封裝傳遞給 RPC 後端的選項。這個類別的實例可以傳遞給 init_rpc(),以便使用特定設定初始化 RPC,例如要使用的 RPC 超時和 init_method

property init_method

指定如何初始化處理程序群組的 URL。預設值為 env://

property rpc_timeout

一個浮點數,表示所有 RPC 要使用的超時時間。如果 RPC 未在此時間範圍內完成,它將以指示已超時的例外狀況完成。

TensorPipe 後端

TensorPipe 代理程式(預設)利用 TensorPipe 函式庫,該函式庫提供專為機器學習設計的原生點對點通訊原語,從根本上解決了 Gloo 的一些限制。與 Gloo 相比,它的優點是非同步,這允許多個傳輸同時進行,每個傳輸都有自己的速度,而不會互相阻塞。它只會在需要時才在節點對之間開啟管道,並且當一個節點發生故障時,只有與其相關的管道會被關閉,而所有其他管道將繼續正常工作。此外,它能夠支援多種不同的傳輸方式(當然包括 TCP,但也包括共享記憶體、NVLink、InfiniBand……),並且可以自動偵測其可用性,並協商每個管道要使用的最佳傳輸方式。

TensorPipe 後端已在 PyTorch v1.6 中引入,並且正在積極開發中。目前,它只支援 CPU 張量,GPU 支援即將推出。它附帶了一個基於 TCP 的傳輸方式,就像 Gloo 一樣。它還可以自動將大型張量分塊並多工處理到多個通訊端和執行緒上,以實現非常高的頻寬。代理程式將能夠自行選擇最佳傳輸方式,無需任何干預。

範例

>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20 # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]

TensorPipeAgent 的後端選項,衍生自 RpcBackendOptions

參數
  • num_worker_threads (int, optional) – TensorPipeAgent 用於執行請求的執行緒池中的執行緒數量(預設值:16)。

  • rpc_timeout (float, optional) – RPC 請求的預設超時時間(以秒為單位)(預設值:60 秒)。如果 RPC 未在此時間範圍內完成,則會引發指示已超時的例外狀況。如果需要,呼叫者可以在 rpc_sync()rpc_async() 中覆蓋個別 RPC 的超時時間。

  • init_method (str, optional) – 用於初始化用於會合的分散式存放區的 URL。它接受 init_process_group() 的相同參數的任何值(預設值:env://)。

  • device_maps (Dict[str, Dict], optional) – 從此工作節點到被呼叫節點的裝置放置映射。鍵是被呼叫工作節點名稱,值是字典(Dict of intstrtorch.device),將此工作節點的裝置映射到被呼叫工作節點的裝置。(預設值:None

  • devices (List[int, str, or torch.device], optional) – RPC 代理程式使用的所有本地 CUDA 裝置。預設情況下,它將從其自身的 device_maps 和來自其對等節點的 device_maps 的相應裝置初始化為所有本地裝置。在處理 CUDA RPC 請求時,代理程式將正確同步此 List 中所有裝置的 CUDA 流。

property device_maps

裝置映射位置。

property devices

本地代理程式使用的所有裝置。

property init_method

指定如何初始化處理程序群組的 URL。預設值為 env://

property num_worker_threads

TensorPipeAgent 用於執行請求的執行緒池中的執行緒數量。

property rpc_timeout

一個浮點數,表示所有 RPC 要使用的超時時間。如果 RPC 未在此時間範圍內完成,它將以指示已超時的例外狀況完成。

set_device_map(to, device_map)[原始碼]

設定每個 RPC 呼叫方和被呼叫方配對之間的裝置映射。這個函數可以被呼叫多次,以遞增的方式添加裝置放置配置。

參數
  • to (str) – 被呼叫方名稱。

  • device_map (Dict of int, str, or torch.device) – 從這個工作節點到被呼叫方的裝置放置映射。這個映射必須是可逆的。

範例

>>> # both workers
>>> def add(x, y):
>>>     print(x)  # tensor([1., 1.], device='cuda:1')
>>>     return x + y, (x + y).to(2)
>>>
>>> # on worker 0
>>> options = TensorPipeRpcBackendOptions(
>>>     num_worker_threads=8,
>>>     device_maps={"worker1": {0: 1}}
>>>     # maps worker0's cuda:0 to worker1's cuda:1
>>> )
>>> options.set_device_map("worker1", {1: 2})
>>> # maps worker0's cuda:1 to worker1's cuda:2
>>>
>>> rpc.init_rpc(
>>>     "worker0",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.TENSORPIPE,
>>>     rpc_backend_options=options
>>> )
>>>
>>> x = torch.ones(2)
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
>>> # The first argument will be moved to cuda:1 on worker1. When
>>> # sending the return value back, it will follow the invert of
>>> # the device map, and hence will be moved back to cuda:0 and
>>> # cuda:1 on worker0
>>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
>>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
set_devices(devices)[原始碼]

設定 TensorPipe RPC 代理使用的本地裝置。當處理 CUDA RPC 請求時,TensorPipe RPC 代理將會正確地同步這個 List 中所有裝置的 CUDA 串流。

參數

devices (List of int, str, or torch.device) – TensorPipe RPC 代理使用的本地裝置。

注意

RPC 框架不會自動重試任何 rpc_sync()rpc_async()remote() 呼叫。原因是 RPC 框架無法判斷一個操作是否是等冪的,以及是否可以安全地重試。因此,處理失敗和在必要時進行重試是應用程式的責任。RPC 通訊基於 TCP,因此可能會因為網路故障或間歇性網路連線問題而發生故障。在這種情況下,應用程式需要以合理的退避時間適當地進行重試,以確保網路不會因為過於積極的重試而過載。

RRef

警告

當使用 CUDA 張量時,目前不支援 RRef

「RRef」(遠端參考)是指向遠端工作節點上某個類型為 T(例如 Tensor)的值的參考。這個控制代碼會讓被參考的遠端值在擁有者上保持活動狀態,但並不意味著該值將來會被傳送到本地工作節點。RRef 可以用於多機訓練,方法是在其他工作節點上保存對 nn.Modules 的參考,並在訓練期間呼叫適當的函數來檢索或修改它們的參數。有關更多詳細資訊,請參閱 遠端參考協定

class torch.distributed.rpc.PyRRef(RRef)

一個封裝了指向遠端工作節點上某個類型值的參考的類別。這個控制代碼會讓被參考的遠端值在工作節點上保持活動狀態。當 1) 應用程式程式碼和本地 RRef 上下文中都沒有對它的參考,或 2) 應用程式已呼叫正常關機時,UserRRef 將會被刪除。在已刪除的 RRef 上呼叫方法會導致未定義的行為。RRef 實作只提供盡力而為的錯誤偵測,應用程式不應該在 rpc.shutdown() 之後使用 UserRRefs

警告

RRef 只能由 RPC 模組進行序列化和反序列化。在沒有 RPC 的情況下對 RRef 進行序列化和反序列化(例如,Python pickle、torch save() / load()、JIT save() / load() 等)將會導致錯誤。

參數
  • value (object) – 要由這個 RRef 包裝的值。

  • type_hint (Type, optional) – 應該傳遞給 TorchScript 編譯器的 Python 類型,作為 value 的類型提示。

範例:

為了簡單起見,以下範例省略了 RPC 初始化和關機程式碼。有關這些細節,請參閱 RPC 文件。

  1. 使用 rpc.remote 建立 RRef

>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> # get a copy of value from the RRef
>>> x = rref.to_here()
  1. 從本地物件建立 RRef

>>> import torch
>>> from torch.distributed.rpc import RRef
>>> x = torch.zeros(2, 2)
>>> rref = RRef(x)
  1. 與其他工作節點共用 RRef

>>> # On both worker0 and worker1:
>>> def f(rref):
>>>   return rref.to_here() + 1
>>> # On worker0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>> rref = RRef(torch.zeros(2, 2))
>>> # the following RPC shares the rref with worker1, reference
>>> # count is automatically updated.
>>> rpc.rpc_sync("worker1", f, args=(rref,))
backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None

使用 RRef 作為反向傳遞的根節點來執行反向傳遞。如果提供了 dist_autograd_ctx_id,我們將使用從 RRef 的擁有者開始提供的 ctx_id 執行分佈式反向傳遞。在這種情況下,應該使用 get_gradients() 來檢索梯度。如果 dist_autograd_ctx_idNone,則假設這是一個本地自動梯度圖,我們只會執行本地反向傳遞。在本地情況下,呼叫此 API 的節點必須是 RRef 的擁有者。RRef 的值預計是一個純量張量。

參數
  • dist_autograd_ctx_id (int, optional) – 我們應該為其檢索梯度的分佈式自動梯度上下文 ID(預設值:-1)。

  • retain_graph (bool, optional) – 如果為 False,則用於計算梯度的圖將會被釋放。請注意,幾乎在所有情況下,都不需要將此選項設定為 True,而且通常可以用更有效的方式解決。通常,您需要將其設定為 True 才能多次執行反向傳遞(預設值:False)。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     rref.backward(context_id)
confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool

傳回此 RRef 是否已由擁有者確認。 OwnerRRef 一律傳回 true,而 UserRRef 僅在擁有者知道此 UserRRef 時才傳回 true。

is_owner(self: torch._C._distributed_rpc.PyRRef) bool

傳回目前節點是否是此 RRef 的擁有者。

local_value(self: torch._C._distributed_rpc.PyRRef) object

如果當前節點是所有者,則返回對本地值的參考。否則,拋出異常。

owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo

返回擁有此 RRef 的節點的工作者資訊。

owner_name(self: torch._C._distributed_rpc.PyRRef) str

返回擁有此 RRef 的節點的工作者名稱。

remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動 remote。更具體地說,rref.remote().func_name(*args, **kwargs) 等同於以下內容

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
參數

timeout (float, 選用) – rref.remote() 的逾時。如果此 RRef 的建立未在逾時時間內成功完成,則下次嘗試使用 RRef 時(例如 to_here),將會引發逾時。如果未提供,則將使用預設的 RPC 逾時。有關 RRef 的特定逾時語義,請參閱 rpc.remote()

範例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # returns torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # returns tensor([[1., 1., 1., 1.]])
rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動 rpc_async。更具體地說,rref.rpc_async().func_name(*args, **kwargs) 等同於以下內容

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
參數

timeout (float, 選用) – rref.rpc_async() 的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時。

範例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # returns torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # returns tensor([[1., 1., 1., 1.]])
rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

建立一個輔助代理,以輕鬆地使用 RRef 的所有者作為目標,在這個 RRef 引用的物件上執行函數,來啟動 rpc_sync。更具體地說,rref.rpc_sync().func_name(*args, **kwargs) 等同於以下內容

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
參數

timeout (float, 選用) – rref.rpc_sync() 的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時。

範例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # returns torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # returns tensor([[1., 1., 1., 1.]])
to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

封鎖呼叫,將 RRef 的值從所有者複製到本地節點並返回它。如果當前節點是所有者,則返回對本地值的參考。

參數

timeout (float, 選用) – to_here 的逾時。如果呼叫未在此時間範圍內完成,則會引發指示逾時的異常。如果未提供此參數,則將使用預設的 RPC 逾時(60 秒)。

RemoteModule

警告

使用 CUDA 張量時,目前不支援 RemoteModule

RemoteModule 是一種在不同行程上遠程建立 nn.Module 的簡便方法。實際的模組位於遠程主機上,但本地主機具有此模組的控制代碼,並且可以像常規 nn.Module 一樣呼叫此模組。但是,呼叫會產生到遠程端的 RPC 呼叫,並且如果需要,可以透過 RemoteModule 支援的其他 API 非同步執行。

class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]

只有在 RPC 初始化後才能建立 RemoteModule 实例。

它在指定的遠程節點上建立用戶端指定的模組。它的行為類似於常規的 nn.Module,除了 forward 方法是在遠程節點上執行的。它會處理自動梯度記錄,以確保反向傳播將梯度傳播回相應的遠程模組。

它根據 module_clsforward 方法的簽章生成兩個方法 forward_asyncforwardforward_async 非同步運行並返回一個 Future。 forward_asyncforward 的參數與 module_cls 返回的模組的 forward 方法相同。

例如,如果 module_cls 返回一個 nn.Linear 的实例,其 forward 方法簽章為: def forward(input: Tensor) -> Tensor:,則生成的 RemoteModule 將具有以下簽章的 2 個方法

def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
參數
  • remote_device (str) – 目標工作節點上我們想要放置此模組的裝置。格式應為「<工作節點名稱>/<裝置>」,其中裝置欄位可以解析為 torch.device 類型。例如:「trainer0/cpu」、「trainer0」、「ps0/cuda:0」。此外,裝置欄位可以是選用的,預設值為「cpu」。

  • module_cls (nn.Module) –

    要在遠端建立的模組類別。例如:

    >>> class MyModule(nn.Module):
    >>>     def forward(input):
    >>>         return input + 1
    >>>
    >>> module_cls = MyModule
    

  • args (Sequence, 選用) – 要傳遞給 module_cls 的 args。

  • kwargs (Dict, 選用) – 要傳遞給 module_cls 的 kwargs。

回傳值

一個遠端模組實例,它包裝了由使用者提供的 module_cls 建立的 Module,它有一個阻塞的 forward 方法和一個異步的 forward_async 方法,該方法會傳回遠端使用者提供的模組上 forward 呼叫的 future。

範例:

在兩個不同的程序中執行以下程式碼

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>>     "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

此外,您可以在此 教學 中找到與 DistributedDataParallel (DDP) 結合使用的更實際範例。

get_module_rref()

傳回指向遠端模組的 RRef (RRef[nn.Module])。

傳回類型

RRef[Module]

remote_parameters(recurse=True)

傳回指向遠端模組參數的 RRef 清單。

這通常可以與 DistributedOptimizer 結合使用。

參數

recurse (bool) – 如果為 True,則傳回遠端模組和遠端模組所有子模組的參數。否則,只傳回遠端模組的直接成員參數。

回傳值

RRef (List[RRef[nn.Parameter]]) 到遠端模組參數的清單。

傳回類型

List[RRef[Parameter]]

分散式 Autograd 架構

警告

使用 CUDA 張量時,目前不支援分散式 autograd

此模組提供基於 RPC 的分散式 autograd 架構,可用於模型平行訓練等應用程式。簡而言之,應用程式可能會透過 RPC 傳送和接收梯度記錄張量。在正向傳遞中,我們會記錄何時透過 RPC 傳送梯度記錄張量,並在反向傳遞期間使用此資訊透過 RPC 執行分散式反向傳遞。如需更多詳細資訊,請參閱 分散式 Autograd 設計

torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None

使用提供的根啟動分散式反向傳遞。這目前實作了 FAST 模式演算法,該演算法假設在反向傳遞期間,在工作節點之間於相同的分散式 autograd 上下文中傳送的所有 RPC 訊息都將成為 autograd 圖的一部分。

我們使用提供的根來探索 autograd 圖並計算適當的依賴關係。此方法會阻塞,直到完成整個 autograd 計算。

我們在每個節點上的適當 torch.distributed.autograd.context 中累積梯度。在呼叫 torch.distributed.autograd.backward() 時,會根據傳入的 context_id 查詢要使用的 autograd 上下文。如果沒有與給定 ID 相符的有效 autograd 上下文,我們會擲回錯誤。您可以使用 get_gradients() API 擷取累積的梯度。

參數
  • context_id (int) – 我們應該為其擷取梯度的 autograd 上下文 ID。

  • roots (list) – 代表 autograd 計算根的張量。所有張量都應該是純量。

  • retain_graph (bool, 選用) – 如果為 False,則會釋放用於計算梯度的圖。請注意,幾乎在所有情況下,都不需要將此選項設定為 True,而且通常可以用更有效率的方式解決。通常,您需要將其設定為 True 才能多次執行反向傳遞。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
class torch.distributed.autograd.context[原始碼]

上下文物件,用於在使用分散式 autograd 時包裝正向和反向傳遞。在 with 陳述式中產生的 context_id 需要用於在所有工作節點上唯一識別分散式反向傳遞。每個工作節點都會儲存與此 context_id 相關聯的元數據,這是正確執行分散式 autograd 傳遞所必需的。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]

擷取從張量到該張量的適當梯度的映射,該梯度是在作為分散式 autograd 反向傳遞的一部分提供的上下文中累積的,與給定的 context_id 相對應。

參數

context_id (int) – 我們應該為其擷取梯度的 autograd 上下文 ID。

回傳值

一個映射,其中鍵是張量,值是與該張量相關聯的梯度。

範例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])

分散式優化器

請參閱 torch.distributed.optim 頁面,以取得有關分散式優化器的文件。

設計注意事項

分散式 autograd 設計注意事項涵蓋了基於 RPC 的分散式 autograd 架構的設計,該架構對於模型平行訓練等應用程式非常有用。

RRef 設計注意事項涵蓋了由架構用於參考遠端工作節點上的值的 RRef(遠端參考)協定的設計。

教學

RPC 教學向使用者介紹 RPC 架構,提供幾個使用 torch.distributed.rpc API 的範例應用程式,並示範如何使用 分析器 來分析基於 RPC 的工作負載。

文件

取得 PyTorch 的完整開發人員文件

查看文件

教學課程

取得適用於初學者和進階開發人員的深入教學課程

查看教學課程

資源

尋找開發資源並取得問題解答

查看資源