快捷方式

torch.futures

此包提供了一個 Future 型別,用於封裝非同步執行以及一組用於簡化 Future 物件操作的工具函式。目前,Future 型別主要由分散式 RPC 框架使用。

class torch.futures.Future(*, devices=None)

它是 torch._C.Future 的封裝,用於封裝可呼叫物件(例如 rpc_async())的非同步執行。它還提供了一組 API 來添加回調函式和設定結果。

警告

GPU 支援是測試版功能,可能會有所更改。

add_done_callback(callback)[source][source]

將給定的回撥函式附加到此 Future,該函式將在 Future 完成時執行。可以將多個回撥新增到同一個 Future,但無法保證它們的執行順序。回撥必須接受一個引數,即此 Future 的引用。回撥函式可以使用 value() 方法獲取值。請注意,如果此 Future 已經完成,則給定回撥將立即內聯執行。

我們建議您使用 then() 方法,因為它提供了一種在回撥完成後同步的方式。如果您的回撥不返回任何內容,add_done_callback 可能成本更低。但這兩種方法 (then()add_done_callback) 底層都使用相同的回撥註冊 API。

對於 GPU 張量,此方法的行為方式與 then() 相同。

引數

callback (Future) – 一個 Callable,接受一個引數,該引數是此 Future 的引用。

注意

請注意,如果回撥函式丟擲異常,無論是由於原始 Future 以異常完成並呼叫 fut.wait(),還是由於回撥中的其他程式碼,都必須仔細處理錯誤。例如,如果此回撥稍後完成額外的 futures,這些 futures 不會被標記為以錯誤完成,使用者需要獨立負責處理這些 futures 的完成/等待。

示例:
>>> def callback(fut):
...     print("This will run after the future has finished.")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
This will run after the future has finished.
5
done()[source][source]

如果此 Future 已完成,則返回 True。一個 Future 完成表示它已有結果或異常。

如果該值包含駐留在 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,Future.done() 也會返回 True,因為在此階段結果已經可用,前提是執行了適當的同步(參見 wait())。

返回型別

bool

set_exception(result)[source][source]

為此 Future 設定一個異常,這將把此 Future 標記為因錯誤完成,並觸發所有附加的回撥。請注意,在此 Future 上呼叫 wait()/ value() 時,此處設定的異常將內聯丟擲。

引數

result (BaseException) – 此 Future 的異常。

示例:
>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
set_result(result)[source][source]

為此 Future 設定結果,這將把此 Future 標記為已完成,並觸發所有附加的回撥。請注意,一個 Future 不能被標記完成兩次。

如果結果包含駐留在 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,只要呼叫此方法時,enqueue 這些核心的 stream 被設定為當前 stream,也可以呼叫此方法。簡單來說,只要中間不改變 stream,就可以在啟動這些核心後立即安全地呼叫此方法,無需額外同步。此方法將在所有相關的當前 stream 上記錄事件,並使用它們確保此 Future 的所有 consumer 都能正確排程。

引數

result (object) – 此 Future 的結果物件。

示例:
>>> import threading
>>> import time
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
then(callback)[source][source]

將給定的回撥函式附加到此 Future,該函式將在 Future 完成時執行。可以將多個回撥新增到同一個 Future,但無法保證它們的執行順序(若要強制特定順序,請考慮鏈式呼叫:fut.then(cb1).then(cb2))。回撥必須接受一個引數,即此 Future 的引用。回撥函式可以使用 value() 方法獲取值。請注意,如果此 Future 已經完成,則給定回撥將立即內聯執行。

如果 Future 的值包含駐留在 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,也可能會呼叫回撥。但是,呼叫回撥時會設定一些專用的 stream 作為當前 stream(從全域性池中獲取),這些 stream 將與這些核心同步。因此,回撥在此張量上執行的任何操作都將在核心完成後在裝置上排程。換句話說,只要回撥不切換 stream,就可以安全地操作結果而無需額外的同步。這類似於 wait() 的非阻塞行為。

同樣,如果回撥返回的值包含駐留在 GPU 上的張量,即使產生這些張量的核心仍在裝置上執行,只要回撥在執行期間未更改 stream,也可以這樣做。如果需要更改 stream,則必須注意將其與原始 stream(即呼叫回撥時當前的那些 stream)重新同步。

引數

callback (Callable) – 一個 Callable,接受此 Future 作為唯一引數。

返回

一個新的 Future 物件,它持有回撥的返回值,並在給定回撥完成時被標記為已完成。

返回型別

Future[S]

注意

請注意,如果回撥函式丟擲異常,無論是由於原始 Future 以異常完成並呼叫 fut.wait(),還是由於回撥中的其他程式碼,由 then 返回的 Future 都將 appropriately 標記遇到的錯誤。但是,如果此回撥稍後完成額外的 futures,這些 futures 不會被標記為以錯誤完成,使用者需要獨立負責處理這些 futures 的完成/等待。

示例:
>>> def callback(fut):
...     print(f"RPC return value is {fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # The inserted callback will print the return value when
>>> # receiving the response from "worker1"
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x : print(f"Chained cb done. {x.wait()}")
... )
>>> fut.set_result(5)
RPC return value is 5.
Chained cb done. None
value()[source][source]

獲取已完成 Future 的值。

此方法只能在呼叫 wait() 完成後或在傳遞給 then() 的回撥函式內部呼叫。在其他情況下,此 Future 可能尚未持有值,呼叫 value() 可能會失敗。

如果該值包含駐留在 GPU 上的張量,則此方法不會執行任何額外同步。這應事先透過呼叫 wait() 單獨完成(回撥內部除外,then() 已處理)。

返回

Future 持有的值。如果建立值(回撥或 RPC)的函式丟擲錯誤,此 value() 方法也會丟擲錯誤。

返回型別

T

wait()[source][source]

阻塞直到此 Future 的值準備就緒。

如果該值包含駐留在 GPU 上的張量,則會與可能非同步填充這些張量的核心(在裝置上執行)執行額外同步。這種同步是非阻塞的,這意味著 wait() 將在當前 stream 中插入必要的指令,以確保 enqueue 到這些 stream 的後續操作將在非同步核心之後正確排程,但一旦完成,wait() 將返回,即使這些核心仍在執行。只要不改變 stream,訪問和使用這些值時就不需要進一步同步。

返回

Future 持有的值。如果建立值(回撥或 RPC)的函式丟擲錯誤,此 wait 方法也會丟擲錯誤。

返回型別

T

torch.futures.collect_all(futures)[source][source]

將提供的 Future 物件收集到一個組合的 Future 中,當所有子 Future 完成時,該組合 Future 完成。

引數

futures (list) – Future 物件的列表。

返回

返回一個指向傳入的 Futures 列表的 Future 物件。

返回型別

Future[list[torch.jit.Future]]

示例:
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
torch.futures.wait_all(futures)[source][source]

等待所有提供的 Future 完成,並返回已完成值的列表。如果任何 Future 遇到錯誤,該方法將提前退出並報告錯誤,而不等待其他 Future 完成。

引數

futures (list) – Future 物件的列表。

返回

已完成 Future 結果的列表。如果任何 Future 上的 wait 丟擲異常,此方法也會丟擲異常。

返回型別

list

文件

查閱 PyTorch 的完整開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源