快捷方式

遠端引用協議

本文介紹了遠端引用協議(Remote Reference protocol)的設計細節,並詳細講解了不同場景下的訊息流。在繼續之前,請確保您熟悉分散式 RPC 框架

背景

RRef 是 Remote REFerence 的縮寫。它是位於本地或遠端工作器上的物件的引用,並在底層透明地處理引用計數。概念上,它可以被視為一個分散式共享指標。應用程式可以透過呼叫 remote() 建立 RRef。每個 RRef 都由 remote() 呼叫的被呼叫方工作器(即所有者,owner)擁有,並可供多個使用者使用。所有者儲存實際資料並跟蹤全域性引用計數。每個 RRef 都可以透過一個全域性唯一的 RRefId 來唯一標識,該 ID 在 remote() 呼叫的呼叫方建立時分配。

在所有者工作器上,只有一個 OwnerRRef 例項,其中包含實際資料,而在使用者工作器上,可以根據需要有任意數量的 UserRRef 例項,且 UserRRef 不持有資料。所有在所有者上的使用都將透過全域性唯一的 RRefId 檢索唯一的 OwnerRRef 例項。當 UserRRefrpc_sync()rpc_async()remote() 呼叫中用作引數或返回值時,就會建立一個 UserRRef,並且所有者會收到通知以更新引用計數。當全域性沒有 UserRRef 例項且所有者上也沒有對 OwnerRRef 的引用時,OwnerRRef 及其資料將被刪除。

假設

RRef 協議基於以下假設設計。

  • 瞬態網路故障:RRef 設計透過重試訊息來處理瞬態網路故障。它無法處理節點崩潰或永久性網路分割槽。當這些事件發生時,應用程式應關閉所有工作器,回滾到先前的檢查點,然後恢復訓練。

  • 非冪等使用者自定義函式 (UDF):我們假設提供給 rpc_sync()rpc_async()remote() 的使用者自定義函式 (UDF) 是非冪等的,因此不能重試。然而,內部 RRef 控制訊息是冪等的,並在訊息失敗時重試。

  • 訊息亂序交付:我們不假定任意一對節點之間的訊息交付順序,因為傳送方和接收方都使用多執行緒。無法保證哪條訊息將首先被處理。

RRef 生命週期

該協議的目標是在適當的時間刪除 OwnerRRef。刪除 OwnerRRef 的正確時機是當沒有存活的 UserRRef 例項,並且使用者程式碼也沒有持有對 OwnerRRef 的引用時。棘手之處在於確定是否存在任何存活的 UserRRef 例項。

設計原理

使用者可以透過三種情況獲得 UserRRef

  1. 從所有者那裡接收 UserRRef

  2. 從另一個使用者那裡接收 UserRRef

  3. 建立一個由另一個工作器擁有的新的 UserRRef

情況 1 最簡單,所有者將其 RRef 傳遞給使用者,所有者呼叫 rpc_sync()rpc_async()remote() 並將其 RRef 用作引數。在這種情況下,將在使用者工作器上建立一個新的 UserRRef。由於所有者是呼叫方,它可以輕鬆更新其在 OwnerRRef 上的本地引用計數。

唯一的要求是任何 UserRRef 在銷燬時必須通知所有者。因此,我們需要第一個保證

G1. 當任何 UserRRef 被刪除時,所有者將收到通知。

由於訊息可能延遲或亂序到達,我們需要另一個保證來確保刪除訊息不會處理得太早。如果 A 傳送一條涉及 RRef 的訊息給 B,我們將 A 上的 RRef 稱為父 RRef,將 B 上的 RRef 稱為子 RRef。

G2. 父 RRef 在子 RRef 被所有者確認之前不會被刪除。

在情況 2 和 3 中,所有者可能對 RRef 的分叉圖(fork graph)僅有部分了解或完全不瞭解。例如,可以在使用者工作器上構建一個 RRef,並且在所有者收到任何 RPC 呼叫之前,建立該 RRef 的使用者可能已經將 RRef 分享給其他使用者,而這些使用者可能進一步分享 RRef。一個不變的屬性是,任何 RRef 的分叉圖始終是一棵樹,因為分叉一個 RRef 總是會在被呼叫方上建立一個新的 UserRRef 例項(除非被呼叫方是所有者),因此每個 RRef 只有一個父節點。

所有者對樹中任何 UserRRef 的檢視有三個階段

1) unknown -> 2) known -> 3) deleted.

所有者對整個樹的檢視不斷變化。當所有者認為沒有存活的 UserRRef 例項時,它會刪除其 OwnerRRef 例項,即當 OwnerRRef 被刪除時,所有 UserRRef 例項要麼確實已被刪除,要麼是未知的。危險的情況是當某些分叉是未知而其他分叉已被刪除時。

G2 簡單保證,在所有者知道其所有子 UserRRef 例項之前,任何父 UserRRef 都不會被刪除。然而,子 UserRRef 可能在所有者知道其父 UserRRef 之前被刪除。

考慮以下示例,其中 OwnerRRef 分叉到 A,然後 A 分叉到 Y,Y 分叉到 Z

OwnerRRef -> A -> Y -> Z

如果 Z 的所有訊息(包括刪除訊息)在所有者處理 Y 的訊息之前被所有者處理。所有者會在知道 Y 存在之前得知 Z 已被刪除。然而,這並不會導致任何問題。因為 Y 的至少一個祖先(A)將會存活,並且它會阻止所有者刪除 OwnerRRef。更具體地說,如果所有者不知道 Y,A 由於 **G2** 不會被刪除,而且所有者知道 A,因為 A 是它的父節點。

如果 RRef 是在使用者工作器上建立的,情況會稍微複雜一些

OwnerRRef
    ^
    |
    A -> Y -> Z

如果 Z 對 UserRRef 呼叫 to_here(),那麼在 Z 被刪除時,所有者至少知道 A 的存在,因為否則 to_here() 將不會完成。如果 Z 沒有呼叫 to_here(),則所有者可能在收到 A 和 Y 的任何訊息之前收到 Z 的所有訊息。在這種情況下,由於 OwnerRRef 的實際資料尚未建立,因此也沒有需要刪除的東西。這與 Z 完全不存在的情況相同。因此,仍然可以接受。

實現

G1 透過在 UserRRef 解構函式中傳送刪除訊息來實現。為了提供 **G2**,父 UserRRef 在每次分叉時都被放入一個上下文中,並由新的 ForkId 索引。父 UserRRef 僅在收到來自子節點的確認訊息 (ACK) 時才會從上下文中移除,而子節點僅在獲得所有者確認後才會傳送 ACK。

協議場景

現在我們討論上述設計在四種場景下如何轉化為協議。

使用者將 RRef 作為返回值分享給所有者

import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()

在此場景下,UserRRef 在使用者工作器 A 上建立,然後連同遠端訊息一起傳遞給所有者工作器 B,之後 B 建立 OwnerRRef。方法 remote() 會立即返回,這意味著 UserRRef 可以在所有者知曉之前被分叉/使用。

在所有者工作器上,收到 remote() 呼叫後,會建立 OwnerRRef,並返回一個 ACK 來確認 {100, 1}RRefId, ForkId)。只有收到此 ACK 後,A 才能刪除其 UserRRef。這同時涉及 **G1** 和 **G2**。**G1** 顯而易見。對於 **G2**,OwnerRRefUserRRef 的子節點,而 UserRRef 在收到來自所有者的 ACK 之前不會被刪除。

user_to_owner_ret.png

上圖顯示了訊息流,其中實線箭頭包含使用者函式,虛線箭頭是內建訊息。請注意,從 A 到 B 的前兩條訊息(remote()to_here())可能會以任意順序到達 B,但最終的刪除訊息僅在以下條件滿足時才會傳送:

  • B 確認 UserRRef {100, 1} (G2),並且

  • Python GC (垃圾回收器) 同意刪除本地 UserRRef 例項。這發生在 RRef 不再處於作用域內並符合垃圾回收條件時。

使用者將 RRef 作為引數分享給所有者

import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref, ))

在此場景下,在 A 上建立 UserRRef 後,A 在後續對 B 的 RPC 呼叫中將其用作引數。A 會保持 UserRRef {100, 1} 存活,直到收到 B 的確認 (**G2**,而非 RPC 呼叫的返回值)。這是必要的,因為 A 不應在收到所有先前的訊息之前傳送刪除訊息,否則,由於我們不保證訊息交付順序,OwnerRRef 可能會在使用前被刪除。這是透過建立 RRef 的子 ForkId,並將它們儲存在一個對映中直到收到所有者確認該子 ForkId 來實現的。下圖展示了訊息流。

user_to_owner_arg.png

請注意,UserRRef 可能在 func 函式完成或甚至開始之前在 B 上被刪除。但這沒問題,因為在 B 為子 ForkId 傳送 ACK 時,它已經獲取了 OwnerRRef 例項,這將防止其被過早刪除。

所有者將 RRef 分享給使用者

所有者到使用者是最簡單的情況,所有者可以在本地更新引用計數,並且不需要額外的控制訊息來通知其他方。關於 **G2**,這等同於父節點立即收到了所有者的 ACK,因為父節點就是所有者。

import torch
import torch.distributed.rpc as RRef, rpc

# on worker B and worker C
def func(rref):
  pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
dist.rpc_async('C', func, args=(rref, ))
owner_to_user.png

上圖顯示了訊息流。請注意,當 OwnerRRef 在 rpc_async 呼叫後退出作用域時,它不會被刪除,因為內部有一個對映會在存在任何已知分叉時保持其存活,在這種情況下就是 UserRRef {100, 1}。(**G2**)

使用者將 RRef 分享給使用者

這是最複雜的情況,需要呼叫方使用者(父 UserRRef)、被呼叫方使用者(子 UserRRef)以及所有者都參與進來。

import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref, ))
user_to_user.png

當 C 從 A 收到子 UserRRef 時,它會向所有者 B 傳送一個分叉請求(fork request)。稍後,當 B 確認 C 上的 UserRRef 後,C 將並行執行兩個操作:1) 向 A 傳送子 ACK,以及 2) 執行使用者提供的函式。在此期間,父節點(A)將保持其 UserRRef {100, 1} 存活以實現 **G2**。

文件

訪問全面的 PyTorch 開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲得解答

檢視資源