torch.futures¶
此包提供了一個 Future 型別,用於封裝非同步執行以及一組用於簡化 Future 物件操作的工具函式。目前,Future 型別主要由分散式 RPC 框架使用。
- class torch.futures.Future(*, devices=None)¶
它是
torch._C.Future的封裝,用於封裝可呼叫物件(例如rpc_async())的非同步執行。它還提供了一組 API 來添加回調函式和設定結果。警告
GPU 支援是測試版功能,可能會有所更改。
- add_done_callback(callback)[source][source]¶
將給定的回撥函式附加到此
Future,該函式將在Future完成時執行。可以將多個回撥新增到同一個Future,但無法保證它們的執行順序。回撥必須接受一個引數,即此Future的引用。回撥函式可以使用value()方法獲取值。請注意,如果此Future已經完成,則給定回撥將立即內聯執行。我們建議您使用
then()方法,因為它提供了一種在回撥完成後同步的方式。如果您的回撥不返回任何內容,add_done_callback可能成本更低。但這兩種方法 (then()和add_done_callback) 底層都使用相同的回撥註冊 API。對於 GPU 張量,此方法的行為方式與
then()相同。- 引數
callback (
Future) – 一個Callable,接受一個引數,該引數是此Future的引用。
注意
請注意,如果回撥函式丟擲異常,無論是由於原始 Future 以異常完成並呼叫
fut.wait(),還是由於回撥中的其他程式碼,都必須仔細處理錯誤。例如,如果此回撥稍後完成額外的 futures,這些 futures 不會被標記為以錯誤完成,使用者需要獨立負責處理這些 futures 的完成/等待。- 示例:
>>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5
- done()[source][source]¶
如果此
Future已完成,則返回True。一個Future完成表示它已有結果或異常。如果該值包含駐留在 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,
Future.done()也會返回True,因為在此階段結果已經可用,前提是執行了適當的同步(參見wait())。- 返回型別
- set_exception(result)[source][source]¶
為此
Future設定一個異常,這將把此Future標記為因錯誤完成,並觸發所有附加的回撥。請注意,在此Future上呼叫wait()/value()時,此處設定的異常將內聯丟擲。- 引數
result (BaseException) – 此
Future的異常。
- 示例:
>>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo
- set_result(result)[source][source]¶
為此
Future設定結果,這將把此Future標記為已完成,並觸發所有附加的回撥。請注意,一個Future不能被標記完成兩次。如果結果包含駐留在 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,只要呼叫此方法時,enqueue 這些核心的 stream 被設定為當前 stream,也可以呼叫此方法。簡單來說,只要中間不改變 stream,就可以在啟動這些核心後立即安全地呼叫此方法,無需額外同步。此方法將在所有相關的當前 stream 上記錄事件,並使用它們確保此
Future的所有 consumer 都能正確排程。- 引數
result (object) – 此
Future的結果物件。
- 示例:
>>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join()
- then(callback)[source][source]¶
將給定的回撥函式附加到此
Future,該函式將在Future完成時執行。可以將多個回撥新增到同一個Future,但無法保證它們的執行順序(若要強制特定順序,請考慮鏈式呼叫:fut.then(cb1).then(cb2))。回撥必須接受一個引數,即此Future的引用。回撥函式可以使用value()方法獲取值。請注意,如果此Future已經完成,則給定回撥將立即內聯執行。如果
Future的值包含駐留在 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,也可能會呼叫回撥。但是,呼叫回撥時會設定一些專用的 stream 作為當前 stream(從全域性池中獲取),這些 stream 將與這些核心同步。因此,回撥在此張量上執行的任何操作都將在核心完成後在裝置上排程。換句話說,只要回撥不切換 stream,就可以安全地操作結果而無需額外的同步。這類似於wait()的非阻塞行為。同樣,如果回撥返回的值包含駐留在 GPU 上的張量,即使產生這些張量的核心仍在裝置上執行,只要回撥在執行期間未更改 stream,也可以這樣做。如果需要更改 stream,則必須注意將其與原始 stream(即呼叫回撥時當前的那些 stream)重新同步。
- 引數
callback (
Callable) – 一個Callable,接受此Future作為唯一引數。- 返回
一個新的
Future物件,它持有回撥的返回值,並在給定回撥完成時被標記為已完成。- 返回型別
Future[S]
注意
請注意,如果回撥函式丟擲異常,無論是由於原始 Future 以異常完成並呼叫
fut.wait(),還是由於回撥中的其他程式碼,由then返回的 Future 都將 appropriately 標記遇到的錯誤。但是,如果此回撥稍後完成額外的 futures,這些 futures 不會被標記為以錯誤完成,使用者需要獨立負責處理這些 futures 的完成/等待。- 示例:
>>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None
- value()[source][source]¶
獲取已完成 Future 的值。
此方法只能在呼叫
wait()完成後或在傳遞給then()的回撥函式內部呼叫。在其他情況下,此Future可能尚未持有值,呼叫value()可能會失敗。如果該值包含駐留在 GPU 上的張量,則此方法不會執行任何額外同步。這應事先透過呼叫
wait()單獨完成(回撥內部除外,then()已處理)。- 返回
此
Future持有的值。如果建立值(回撥或 RPC)的函式丟擲錯誤,此value()方法也會丟擲錯誤。- 返回型別
T
- wait()[source][source]¶
阻塞直到此
Future的值準備就緒。如果該值包含駐留在 GPU 上的張量,則會與可能非同步填充這些張量的核心(在裝置上執行)執行額外同步。這種同步是非阻塞的,這意味著
wait()將在當前 stream 中插入必要的指令,以確保 enqueue 到這些 stream 的後續操作將在非同步核心之後正確排程,但一旦完成,wait()將返回,即使這些核心仍在執行。只要不改變 stream,訪問和使用這些值時就不需要進一步同步。- 返回
此
Future持有的值。如果建立值(回撥或 RPC)的函式丟擲錯誤,此wait方法也會丟擲錯誤。- 返回型別
T
- torch.futures.collect_all(futures)[source][source]¶
將提供的
Future物件收集到一個組合的Future中,當所有子 Future 完成時,該組合 Future 完成。- 引數
- 返回
返回一個指向傳入的 Futures 列表的
Future物件。- 返回型別
- 示例:
>>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1