到期計時器¶
到期計時器與代理程式在同一個程序上設定,並可在你的指令碼中使用,以處理停滯的 worker。當你進入一個可能停滯的程式碼塊時,可以獲取一個到期計時器,它會指示計時器伺服器在 worker 未能在自行設定的到期截止日期前釋放計時器時終止該程序。
用法
import torchelastic.timer as timer
import torchelastic.agent.server as agent
def main():
start_method = "spawn"
message_queue = mp.get_context(start_method).Queue()
server = timer.LocalTimerServer(message, max_interval=0.01)
server.start() # non-blocking
spec = WorkerSpec(
fn=trainer_func,
args=(message_queue,),
...<OTHER_PARAMS...>)
agent = agent.LocalElasticAgent(spec, start_method)
agent.run()
def trainer_func(message_queue):
timer.configure(timer.LocalTimerClient(message_queue))
with timer.expires(after=60): # 60 second expiry
# do some work
在上面的示例中,如果 trainer_func 執行時間超過 60 秒,則 worker 程序將被終止,並且代理程式會重試 worker 組。
客戶端方法¶
- torch.distributed.elastic.timer.configure(timer_client)[source][source]¶
配置計時器客戶端。必須在使用
expires之前呼叫。
- torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source][source]¶
獲取一個倒計時計時器,它將在距現在
after秒後到期,除非其所包裹的程式碼塊在該時間範圍內完成。當計時器到期時,此 worker 有資格被“收割”(reaped)。“收割”的確切含義取決於客戶端實現。在大多數情況下,“收割”意味著終止 worker 程序。請注意,worker **不**保證在time.now() + after時刻精確地被收割,而是 worker “有資格”被收割,並且客戶端與之通訊的TimerServer最終將決定何時以及如何收割具有到期計時器的 worker。用法
torch.distributed.elastic.timer.configure(LocalTimerClient()) with expires(after=10): torch.distributed.all_reduce(...)
伺服器/客戶端實現¶
以下是 torchelastic 提供的計時器伺服器和客戶端對。
注意
計時器伺服器和客戶端必須始終成對實現和使用,因為伺服器和客戶端之間存在訊息協議。
以下是基於 multiprocess.Queue 實現的一對計時器伺服器和客戶端。
- class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source][source]¶
與
LocalTimerClient配合使用的伺服器。客戶端預計是執行此伺服器的父程序的子程序。作業中的每個主機都應在本地啟動自己的計時器伺服器,每個伺服器例項管理本地 worker 的計時器(在同一主機上的程序中執行)。
- class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source][source]¶
`LocalTimerServer` 的客戶端。此客戶端旨在用於執行
LocalTimerServer的同一主機上,並使用 pid 唯一標識 worker。這在具有多個 GPU 裝置的主機上為每個 GPU 啟動一個子程序(訓練器)的情況下特別有用。
以下是基於命名管道實現的另一對計時器伺服器和客戶端。
- class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source][source]¶
與
FileTimerClient配合使用的伺服器。客戶端預計與執行此伺服器的程序在同一主機上執行。作業中的每個主機都應在本地啟動自己的計時器伺服器,每個伺服器例項管理本地 worker 的計時器(在同一主機上的程序中執行)。
- class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source][source]¶
`FileTimerServer` 的客戶端。此客戶端旨在用於執行
FileTimerServer的同一主機上,並使用 pid 唯一標識 worker。此客戶端使用命名管道向FileTimerServer傳送計時器請求。此客戶端是生產者,而FileTimerServer是消費者。多個客戶端可以與同一個FileTimerServer配合使用。- 引數
file_path (str) – str,FIFO 特殊檔案的路徑。
FileTimerServer必須透過呼叫 os.mkfifo() 建立它。signal – signal,用於終止程序的訊號。使用負數或零訊號不會終止程序。
編寫自定義計時器伺服器/客戶端¶
要編寫自己的計時器伺服器和客戶端,請分別擴充套件伺服器的 torch.distributed.elastic.timer.TimerServer 和客戶端的 torch.distributed.elastic.timer.TimerClient。TimerRequest 物件用於在伺服器和客戶端之間傳遞訊息。
- class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source][source]¶
表示倒計時計時器獲取和釋放的資料物件,用於
TimerClient和TimerServer之間。負數的expiration_time應解釋為“釋放”請求。注意
worker_id的型別是實現特定的。它是TimerServer和TimerClient實現用於唯一標識 worker 的任何內容。
- class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source][source]¶
監視活動計時器並及時使其到期的實體。此伺服器負責收割(reaping)具有到期計時器的 worker。