快捷方式

到期計時器

到期計時器與代理程式在同一個程序上設定,並可在你的指令碼中使用,以處理停滯的 worker。當你進入一個可能停滯的程式碼塊時,可以獲取一個到期計時器,它會指示計時器伺服器在 worker 未能在自行設定的到期截止日期前釋放計時器時終止該程序。

用法

import torchelastic.timer as timer
import torchelastic.agent.server as agent

def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message, max_interval=0.01)
    server.start() # non-blocking

    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    agent = agent.LocalElasticAgent(spec, start_method)
    agent.run()

def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60 second expiry
        # do some work

在上面的示例中,如果 trainer_func 執行時間超過 60 秒,則 worker 程序將被終止,並且代理程式會重試 worker 組。

客戶端方法

torch.distributed.elastic.timer.configure(timer_client)[source][source]

配置計時器客戶端。必須在使用 expires 之前呼叫。

torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source][source]

獲取一個倒計時計時器,它將在距現在 after 秒後到期,除非其所包裹的程式碼塊在該時間範圍內完成。當計時器到期時,此 worker 有資格被“收割”(reaped)。“收割”的確切含義取決於客戶端實現。在大多數情況下,“收割”意味著終止 worker 程序。請注意,worker **不**保證在 time.now() + after 時刻精確地被收割,而是 worker “有資格”被收割,並且客戶端與之通訊的 TimerServer 最終將決定何時以及如何收割具有到期計時器的 worker。

用法

torch.distributed.elastic.timer.configure(LocalTimerClient())
with expires(after=10):
    torch.distributed.all_reduce(...)

伺服器/客戶端實現

以下是 torchelastic 提供的計時器伺服器和客戶端對。

注意

計時器伺服器和客戶端必須始終成對實現和使用,因為伺服器和客戶端之間存在訊息協議。

以下是基於 multiprocess.Queue 實現的一對計時器伺服器和客戶端。

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source][source]

LocalTimerClient 配合使用的伺服器。客戶端預計是執行此伺服器的父程序的子程序。作業中的每個主機都應在本地啟動自己的計時器伺服器,每個伺服器例項管理本地 worker 的計時器(在同一主機上的程序中執行)。

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source][source]

`LocalTimerServer` 的客戶端。此客戶端旨在用於執行 LocalTimerServer 的同一主機上,並使用 pid 唯一標識 worker。這在具有多個 GPU 裝置的主機上為每個 GPU 啟動一個子程序(訓練器)的情況下特別有用。

以下是基於命名管道實現的另一對計時器伺服器和客戶端。

class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source][source]

FileTimerClient 配合使用的伺服器。客戶端預計與執行此伺服器的程序在同一主機上執行。作業中的每個主機都應在本地啟動自己的計時器伺服器,每個伺服器例項管理本地 worker 的計時器(在同一主機上的程序中執行)。

引數
  • file_path (str) – str,將要建立的 FIFO 特殊檔案的路徑。

  • max_interval (float) – float,每個看門狗迴圈的最大間隔時間(秒)。

  • daemon (bool) – bool,是否以守護模式執行看門狗執行緒。守護執行緒不會阻塞程序停止。

  • log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – Callable[[Dict[str, str]], None],用於記錄 JSON 格式事件的可選回撥函式。

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source][source]

`FileTimerServer` 的客戶端。此客戶端旨在用於執行 FileTimerServer 的同一主機上,並使用 pid 唯一標識 worker。此客戶端使用命名管道向 FileTimerServer 傳送計時器請求。此客戶端是生產者,而 FileTimerServer 是消費者。多個客戶端可以與同一個 FileTimerServer 配合使用。

引數
  • file_path (str) – str,FIFO 特殊檔案的路徑。FileTimerServer 必須透過呼叫 os.mkfifo() 建立它。

  • signal – signal,用於終止程序的訊號。使用負數或零訊號不會終止程序。

編寫自定義計時器伺服器/客戶端

要編寫自己的計時器伺服器和客戶端,請分別擴充套件伺服器的 torch.distributed.elastic.timer.TimerServer 和客戶端的 torch.distributed.elastic.timer.TimerClientTimerRequest 物件用於在伺服器和客戶端之間傳遞訊息。

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source][source]

表示倒計時計時器獲取和釋放的資料物件,用於 TimerClientTimerServer 之間。負數的 expiration_time 應解釋為“釋放”請求。

注意

worker_id 的型別是實現特定的。它是 TimerServerTimerClient 實現用於唯一標識 worker 的任何內容。

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source][source]

監視活動計時器並及時使其到期的實體。此伺服器負責收割(reaping)具有到期計時器的 worker。

abstract clear_timers(worker_ids)[source][source]

清除給定 worker_ids 的所有計時器。

abstract get_expired_timers(deadline)[source][source]

返回每個 worker_id 的所有到期計時器。到期計時器是指 expiration_time 小於或等於提供的截止日期的計時器。

返回型別

dict[str, list[torch.distributed.elastic.timer.api.TimerRequest]]

abstract register_timers(timer_requests)[source][source]

處理傳入的計時器請求並將其註冊到伺服器。計時器請求可以是獲取計時器請求,也可以是釋放計時器請求。expiration_time 為負數的計時器請求應解釋為釋放計時器請求。

class torch.distributed.elastic.timer.TimerClient[source][source]

透過與 TimerServer 通訊來獲取和釋放倒計時計時器的客戶端庫。

abstract acquire(scope_id, expiration_time)[source][source]

為持有此客戶端物件的 worker 獲取一個計時器,給定 scope_idexpiration_time。通常將計時器註冊到 TimerServer

abstract release(scope_id)[source][source]

釋放此客戶端代表的 worker 上 scope_id 的計時器。呼叫此方法後,該作用域上的倒計時計時器不再生效。

除錯資訊日誌記錄

torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(run_id, expired_timers)[source][source]

文件

查閱 PyTorch 全面開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲得解答

檢視資源