多程序包 - torch.multiprocessing¶
torch.multiprocessing 是原生 multiprocessing 模組的包裝器。
它註冊了自定義的 reducer,使用共享記憶體來提供不同程序中相同資料的共享檢視。一旦張量/儲存(storage)被移動到共享記憶體(參見 share_memory_()),就可以在不進行任何複製的情況下將其傳送到其他程序。
該 API 與原始模組 100% 相容 - 只需將 import multiprocessing 改為 import torch.multiprocessing,就可以使透過佇列傳送或透過其他機制共享的所有張量移動到共享記憶體。
由於 API 的相似性,我們不對該包的大部分內容進行文件記錄,建議參考原始模組的優秀文件。
警告
如果主程序突然退出(例如由於接收到訊號),Python 的 multiprocessing 有時會無法清理其子程序。這是一個已知的問題,因此如果在中斷直譯器後看到任何資源洩漏,很可能就是這種情況發生了。
策略管理¶
- torch.multiprocessing.set_sharing_strategy(new_strategy)[source][source]¶
設定共享 CPU 張量的策略。
- 引數
new_strategy (str) – 所選策略的名稱。應為
get_all_sharing_strategies()返回的值之一。
共享 CUDA 張量¶
程序間共享 CUDA 張量僅在 Python 3 中受支援,使用 spawn 或 forkserver 啟動方法。
與 CPU 張量不同,傳送程序需要保留原始張量,只要接收程序保留張量的副本。引用計數是在內部實現的,但要求使用者遵循以下最佳實踐。
警告
如果消費者程序因致命訊號異常終止,只要傳送程序仍在執行,共享張量可能會永久保留在記憶體中。
消費者應儘快釋放記憶體。
## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)
2. 使生產者程序執行,直到所有消費者退出。這將防止生產者程序釋放仍在被消費者使用的記憶體的情況。
## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
不要傳遞接收到的張量。
# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()
共享策略¶
本節簡要概述了不同共享策略的工作原理。請注意,它僅適用於 CPU 張量 - CUDA 張量將始終使用 CUDA API 進行共享,因為這是它們可以共享的唯一方式。
檔案描述符 - file_descriptor¶
注意
這是預設策略(macOS 和 OS X 上不支援此策略)。
此策略將使用檔案描述符作為共享記憶體控制代碼。每當儲存(storage)被移動到共享記憶體時,從 shm_open 獲取的檔案描述符會與物件一起快取,當將其傳送到其他程序時,檔案描述符將(例如透過 UNIX 套接字)傳輸給它。接收者也會快取檔案描述符並對其進行 mmap,以獲取儲存資料的共享檢視。
請注意,如果共享大量張量,此策略將大部分時間保持大量檔案描述符處於開啟狀態。如果您的系統對開啟的檔案描述符數量有限制較低且無法提高,應使用 file_system 策略。
檔案系統 - file_system¶
此策略將使用提供給 shm_open 的檔名來標識共享記憶體區域。這樣做的好處是不需要實現快取從 shm_open 獲取的檔案描述符,但同時容易出現共享記憶體洩漏。檔案建立後不能立即刪除,因為其他程序需要訪問它來開啟其檢視。如果程序發生致命崩潰或被殺死,並且沒有呼叫儲存(storage)的解構函式,檔案將保留在系統中。這是非常嚴重的問題,因為它們會持續佔用記憶體,直到系統重啟或手動釋放它們。
為了解決共享記憶體檔案洩漏的問題,torch.multiprocessing 將啟動一個名為 torch_shm_manager 的守護程序,該程序將與當前程序組隔離,並跟蹤所有共享記憶體分配。一旦所有連線到它的程序退出,它將等待片刻以確保沒有新的連線,並遍歷該組分配的所有共享記憶體檔案。如果發現其中任何檔案仍然存在,它們將被解除分配。我們已經測試了這種方法,並證明它能夠應對各種故障。儘管如此,如果您的系統具有足夠高的限制,並且 file_descriptor 是一種受支援的策略,我們不建議切換到此策略。
生成子程序¶
注意
適用於 Python >= 3.4。
這依賴於 Python 的 multiprocessing 包中的 spawn 啟動方法。
透過建立 Process 例項並呼叫 join 等待它們完成,可以生成多個子程序來執行某個函式。這種方法在處理單個子程序時效果很好,但在處理多個程序時可能會出現潛在問題。
具體來說,按順序連線(joining)程序意味著它們將按順序終止。如果不是這樣,並且第一個程序沒有終止,則程序終止將不會被注意到。此外,沒有原生機制用於錯誤傳播。
下面的 spawn 函式解決了這些問題,並負責錯誤傳播、亂序終止,並在檢測到其中一個程序發生錯誤時主動終止其他程序。
- torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[source][source]¶
生成
nprocs個程序,這些程序使用args執行fn。如果其中一個程序以非零退出狀態退出,其餘程序將被殺死,並丟擲異常,指示終止原因。如果在子程序中捕獲到異常,該異常將被轉發,其回溯資訊將包含在父程序中丟擲的異常中。
- 引數
fn (函式) –
此函式作為生成的程序的入口點被呼叫。此函式必須在模組的頂層定義,以便可以被序列化(pickled)和生成(spawned)。這是 multiprocessing 強制的要求。
該函式以
fn(i, *args)的形式呼叫,其中i是程序索引,args是透傳的引數元組。args (元組) – 傳遞給
fn的引數。nprocs (int) – 要生成的程序數。
join (bool) – 對所有程序執行阻塞式連線(join)。
daemon (bool) – 生成程序的守護程序(daemon)標誌。如果設定為 True,將建立守護程序。
start_method (str) – (已棄用) 此方法將始終使用
spawn作為啟動方法。要使用不同的啟動方法,請使用start_processes()。
- 返回
如果
join為True則返回 None,如果join為False則返回ProcessContext
- class torch.multiprocessing.SpawnContext[source][source]¶
當呼叫
spawn()時且join=False時返回。