torch.futures¶
此套件提供了一個 Future 類型,用於封裝非同步執行和一組工具函數,以簡化對 Future 物件的操作。目前,Future 類型主要由 分散式 RPC 框架 使用。
- class torch.futures.Future(*, devices=None)¶
- 包裝在 - torch._C.Future周圍,用於封裝可呼叫物件的非同步執行,例如- rpc_async()。它還公開了一組 API,用於新增回呼函數和設定結果。- 警告 - GPU 支援是一項測試功能,可能會有所變更。 - add_done_callback(callback)[原始碼]¶
- 將指定的回呼函數附加到此 - Future,該函數將在此- Future完成時執行。可以將多個回呼函數新增到同一個- Future,但無法保證它們的執行順序。回呼函數必須採用一個參數,即對此- Future的參照。回呼函數可以使用- value()方法來取得值。請注意,如果此- Future已完成,則指定的回呼函數將會內嵌執行。- 建議您使用 - then()方法,因為它提供了一種在回呼函數完成後進行同步的方法。- add_done_callback如果您的回呼函數沒有傳回任何值,則可能會更有效率。但是,- then()和- add_done_callback都在幕後使用相同的回呼函數註冊 API。- 對於 GPU 張量,此方法的行為方式與 - then()相同。- 參數
- callback ( - Future) – 一個- Callable,它採用一個參數,即對此- Future的參照。
 - 備註 - 請注意,如果回呼函數擲出錯誤,無論是透過原始 future 以異常完成並呼叫 - fut.wait(),還是透過回呼函數中的其他程式碼,都必須仔細處理錯誤。例如,如果此回呼函數稍後完成了其他 future,則這些 future 不會被標記為以錯誤完成,並且使用者有責任獨立處理這些 future 的完成/等待。- 範例:
- >>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5 
 
 - done()[原始碼]¶
- 如果此 - Future已完成,則傳回- True。如果- Future有結果或異常,則表示已完成。- 如果值包含駐留在 GPU 上的張量,即使填充這些張量的異步內核尚未在裝置上完成執行, - Future.done()也會傳回- True,因為在這個階段結果已經可以使用,前提是執行了適當的同步(請參閱- wait())。- 回傳型別
 
 - set_exception(result)[原始碼]¶
- 為此 - Future設定例外狀況,這會將此- Future標記為因錯誤而完成,並觸發所有附加的回調。請注意,在此- Future上呼叫 wait()/value() 時,將會在行內引發此處設定的例外狀況。- 參數
- result (BaseException) – 此 - Future的例外狀況。
 - 範例:
- >>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo 
 
 - set_result(result)[原始碼]¶
- 設定此 - Future的結果,這會將此- Future標記為已完成,並觸發所有附加的回調。請注意,- Future不能標記為完成兩次。- 如果結果包含駐留在 GPU 上的張量,則即使填充這些張量的異步內核尚未在裝置上完成執行,也可以呼叫此方法,前提是在呼叫此方法時,將這些內核排入佇列的串流設定為目前的串流。簡而言之,只要不在中間切換串流,就可以在啟動這些內核後立即安全地呼叫此方法,而無需任何額外的同步。此方法將記錄所有相關當前串流上的事件,並使用它們來確保此 - Future的所有消費者都能正確排程。- 參數
- result (object) – 此 - Future的結果物件。
 - 範例:
- >>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join() 
 
 - then(callback)[原始碼]¶
- 將給定的回調函數附加到此 - Future,它將在- Future完成時執行。可以將多個回調添加到同一個- Future,但不能保證它們執行的順序(若要強制執行特定順序,請考慮鏈接:- fut.then(cb1).then(cb2))。回調必須採用一個參數,即對此- Future的引用。回調函數可以使用- value()方法來獲取值。請注意,如果此- Future已經完成,則給定的回調將立即在行內執行。- 如果 - Future的值包含駐留在 GPU 上的張量,則可能會在填充這些張量的異步內核尚未完成在裝置上執行時呼叫回調。但是,將使用一些專用串流(從全局池中獲取)作為當前串流來呼叫回調,這些串流將與這些內核同步。因此,回調對這些張量執行的任何操作都將在內核完成後排程在裝置上。換句話說,只要回調不切換串流,它就可以安全地操作結果,而無需任何額外的同步。這類似於- wait()的非阻塞行為。- 同樣,如果回調傳回的值包含駐留在 GPU 上的張量,則即使產生這些張量的內核仍在裝置上執行,它也可以這樣做,只要回調在其執行過程中沒有更改串流即可。如果要更改串流,則必須小心地將它們與原始串流重新同步,也就是在呼叫回調時當前的那些串流。 - 參數
- callback ( - Callable) – 一個將此- Future作為唯一參數的- Callable。
- 回傳
- 一個新的 - Future物件,其中包含- callback的傳回值,並將在給定的- callback完成時標記為已完成。
- 回傳型別
- Future[S] 
 - 備註 - 請注意,如果回調函數拋出異常,無論是通過原始 future 因異常而完成並呼叫 - fut.wait(),還是通過回調中的其他代碼,- then傳回的 future 都將使用遇到的錯誤進行適當標記。但是,如果此回調稍後完成了其他 future,則這些 future 不會被標記為因錯誤而完成,並且用戶有責任獨立處理這些 future 的完成/等待。- 範例:
- >>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None 
 
 
- torch.futures.collect_all(futures)[原始碼]¶
- 將提供的 - Future物件收集到一個單一的組合- Future中,當所有子 future 都完成時,該 future 即完成。- 範例:
- >>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1