分散式優化器¶
警告
當使用 CUDA 張量時,目前不支援分散式優化器
torch.distributed.optim 公開了 DistributedOptimizer,它採用遠端參數清單 (RRef) 並在參數所在的 worker 上本機執行優化器。分散式優化器可以使用任何本機優化器 基底類別 來應用每個 worker 上的梯度。
- class torch.distributed.optim.DistributedOptimizer(optimizer_class, params_rref, *args, **kwargs)[來源]¶
DistributedOptimizer 採用分散在各個 worker 的參數的遠端參考,並針對每個參數在本機應用指定的優化器。
此類別使用
get_gradients()來擷取特定參數的梯度。對
step()的並行呼叫(無論來自相同或不同的用戶端)將在每個 worker 上序列化,因為每個 worker 的優化器一次只能處理一組梯度。但是,不能保證完整的正向傳播-反向傳播-優化器序列會一次為一個用戶端執行。這表示正在應用的梯度可能與在特定 worker 上執行的最新正向傳播不符。此外,worker 之間沒有保證的排序。DistributedOptimizer 預設會建立啟用 TorchScript 的本機優化器,以便在多執行緒訓練(例如分散式模型平行)的情況下,優化器更新不會被 Python 全域直譯器鎖定 (GIL) 阻塞。目前大多數優化器都啟用了此功能。您也可以按照 PyTorch 教學課程中的 程式碼範例 為您自己的自訂優化器啟用 TorchScript 支援。
- 參數
optimizer_class (optim.Optimizer) – 要在每個 worker 上實例化的優化器類別。
params_rref (list[RRef]) – 要最佳化的本機或遠端參數的 RRef 清單。
args – 要傳遞至每個 worker 上的優化器建構函式的引數。
kwargs – 要傳遞至每個 worker 上的優化器建構函式的引數。
- 範例:
>>> import torch.distributed.autograd as dist_autograd >>> import torch.distributed.rpc as rpc >>> from torch import optim >>> from torch.distributed.optim import DistributedOptimizer >>> >>> with dist_autograd.context() as context_id: >>> # Forward pass. >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> loss = rref1.to_here() + rref2.to_here() >>> >>> # Backward pass. >>> dist_autograd.backward(context_id, [loss.sum()]) >>> >>> # Optimizer. >>> dist_optim = DistributedOptimizer( >>> optim.SGD, >>> [rref1, rref2], >>> lr=0.05, >>> ) >>> dist_optim.step(context_id)
- step(context_id)[來源]¶
執行單一優化步驟。
這將在每個包含要最佳化參數的 worker 上呼叫
torch.optim.Optimizer.step(),並會阻塞直到所有 worker 都返回。提供的context_id將用於擷取包含應應用於參數的梯度的對應context。- 參數
context_id – 我們應該為其執行優化器步驟的 autograd context id。
- class torch.distributed.optim.PostLocalSGDOptimizer(optim, averager)[來源]¶
包裝任意
torch.optim.Optimizer並執行 後置本機 SGD,此優化器在每個步驟都會執行本機優化器。在預熱階段之後,它會在本機優化器應用後定期平均參數。- 參數
optim (Optimizer) – 本機優化器。
averager (ModelAverager) – 用於執行後置本機 SGD 演算法的模型平均器實例。
範例
>>> import torch >>> import torch.distributed as dist >>> import torch.distributed.algorithms.model_averaging.averagers as averagers >>> import torch.nn as nn >>> from torch.distributed.optim import PostLocalSGDOptimizer >>> from torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook import ( >>> PostLocalSGDState, >>> post_localSGD_hook, >>> ) >>> >>> model = nn.parallel.DistributedDataParallel( >>> module, device_ids=[rank], output_device=rank >>> ) >>> >>> # Register a post-localSGD communication hook. >>> state = PostLocalSGDState(process_group=None, subgroup=None, start_localSGD_iter=100) >>> model.register_comm_hook(state, post_localSGD_hook) >>> >>> # Create a post-localSGD optimizer that wraps a local optimizer. >>> # Note that ``warmup_steps`` used in ``PostLocalSGDOptimizer`` must be the same as >>> # ``start_localSGD_iter`` used in ``PostLocalSGDState``. >>> local_optim = torch.optim.SGD(params=model.parameters(), lr=0.01) >>> opt = PostLocalSGDOptimizer( >>> optim=local_optim, >>> averager=averagers.PeriodicModelAverager(period=4, warmup_steps=100) >>> ) >>> >>> # In the first 100 steps, DDP runs global gradient averaging at every step. >>> # After 100 steps, DDP runs gradient averaging within each subgroup (intra-node by default), >>> # and post-localSGD optimizer runs global model averaging every 4 steps after applying the local optimizer. >>> for step in range(0, 200): >>> opt.zero_grad() >>> loss = loss_fn(output, labels) >>> loss.backward() >>> opt.step()
- load_state_dict(state_dict)[來源]¶
這與
torch.optim.Optimizerload_state_dict()相同,但也會將模型平均器的步驟值還原為儲存在提供的state_dict中的值。如果
state_dict中沒有"step"項目,它會發出警告並將模型平均器的步驟初始化為 0。
- state_dict()[source]¶
這與
torch.optim.Optimizerstate_dict()相同,但會在檢查點中添加一個額外的項目來記錄模型平均器的步驟,以確保重新載入不會導致不必要的重新預熱。
- class torch.distributed.optim.ZeroRedundancyOptimizer(params, optimizer_class, process_group=None, parameters_as_bucket_view=False, overlap_with_ddp=False, **defaults)[source]¶
包裝任意
optim.Optimizer並將其狀態分片到群組中的各個rank。共享方式如 ZeRO 所述。
每個rank中的本地優化器實例僅負責更新大約
1 / world_size個參數,因此只需要保留1 / world_size個優化器狀態。在本地更新參數後,每個rank會將其參數廣播到所有其他peer,以保持所有模型副本處於相同狀態。ZeroRedundancyOptimizer可以與torch.nn.parallel.DistributedDataParallel結合使用,以減少每個rank的峰值記憶體消耗。ZeroRedundancyOptimizer使用排序貪婪演算法在每個rank上打包多個參數。每個參數屬於單個rank,並且不會在rank之間分割。分割是任意的,可能與參數註冊或使用順序不符。- 參數
params (
Iterable) – 一個Iterable的torch.Tensor或dict,提供所有將在rank之間分片的參數。- 關鍵字參數
optimizer_class (
torch.nn.Optimizer) – 本地優化器的類別。process_group (
ProcessGroup, optional) –torch.distributedProcessGroup(預設值:由torch.distributed.init_process_group()初始化的dist.group.WORLD)。parameters_as_bucket_view (bool, optional) – 如果為
True,則將參數打包到桶中以加快通訊速度,並且param.data欄位指向不同偏移量的桶視圖;如果為False,則單獨通訊每個單獨的參數,並且每個params.data保持不變(預設值:False)。overlap_with_ddp (bool, optional) – 如果為
True,則step()與DistributedDataParallel的梯度同步重疊;這需要 (1)optimizer_class參數的函數式優化器或具有函數式等效項的優化器,以及 (2) 註冊從ddp_zero_hook.py中的函數之一建構的 DDP 通訊鉤子;參數被打包到與DistributedDataParallel中的桶相匹配的桶中,這意味著parameters_as_bucket_view參數被忽略。如果為False,則step()在反向傳遞後單獨運行(正常情況下)。(預設值:False)**defaults – 任何尾隨參數,將被轉發到本地優化器。
範例
>>> import torch.nn as nn >>> from torch.distributed.optim import ZeroRedundancyOptimizer >>> from torch.nn.parallel import DistributedDataParallel as DDP >>> model = nn.Sequential(*[nn.Linear(2000, 2000).to(rank) for _ in range(20)]) >>> ddp = DDP(model, device_ids=[rank]) >>> opt = ZeroRedundancyOptimizer( >>> ddp.parameters(), >>> optimizer_class=torch.optim.Adam, >>> lr=0.01 >>> ) >>> ddp(inputs).sum().backward() >>> opt.step()
警告
目前,
ZeroRedundancyOptimizer要求所有傳入的參數都是相同的密集類型。警告
如果您傳遞
overlap_with_ddp=True,請注意以下事項:根據當前實作將DistributedDataParallel與ZeroRedundancyOptimizer重疊的方式,前兩到三次訓練迭代不會在優化器步驟中執行參數更新,具體取決於static_graph=False還是static_graph=True。這是因為它需要有關DistributedDataParallel使用的梯度分桶策略的信息,如果static_graph=False,則在第二次正向傳遞之前不會最終確定,如果static_graph=True,則在第三次正向傳遞之前不會最終確定。為了調整這個問題,一種選擇是預先添加虛擬輸入。警告
ZeroRedundancyOptimizer 仍處於實驗階段,可能會有所變更。
- add_param_group(param_group)[source]¶
將參數群組添加到
Optimizer的param_groups中。這在微調預先訓練的網路時很有用,因為凍結的層可以變成可訓練的,並在訓練過程中添加到
Optimizer中。- 參數
param_group (
dict) – 指定要優化的參數和群組特定的優化選項。
警告
此方法處理更新所有分區上的分片,但需要在所有rank上調用。在rank的子集上調用此方法將導致訓練掛起,因為通訊基元是根據託管參數調用的,並且期望所有rank都參與同一組參數。
- consolidate_state_dict(to=0)[source]¶
在目標rank上合併
state_dict列表(每個rank一個)。- 參數
to (int) – 接收優化器狀態的rank(預設值:0)。
- 引發
RuntimeError – 如果
overlap_with_ddp=True並且在此ZeroRedundancyOptimizer實例完全初始化之前調用此方法,則會發生此錯誤,一旦DistributedDataParallel梯度桶被重建,就會發生此錯誤。
警告
這需要在所有rank上調用。
- join_hook(**kwargs)[原始碼]¶
返回 ZeRO 加入鉤子。
它透過在優化器步驟中遮蔽集體通訊,來啟用對不均勻輸入的訓練。
在呼叫此鉤子之前,必須正確設置梯度。
此鉤子不支援任何關鍵字參數;也就是說,
kwargs未被使用。
- load_state_dict(state_dict)[原始碼]¶
從輸入的
state_dict中載入與給定排名相關的狀態,並根據需要更新本地優化器。- 參數
state_dict (dict) – 優化器狀態;應該是呼叫
state_dict()所返回的物件。- 引發
RuntimeError – 如果
overlap_with_ddp=True並且在此ZeroRedundancyOptimizer實例完全初始化之前調用此方法,則會發生此錯誤,一旦DistributedDataParallel梯度桶被重建,就會發生此錯誤。
- state_dict()[原始碼]¶
返回此排名已知的最後一個全局優化器狀態。
- 引發
RuntimeError – 如果
overlap_with_ddp=True且在此ZeroRedundancyOptimizer實例完全初始化之前呼叫此方法,則會發生此錯誤,一旦DistributedDataParallel梯度桶被重建;或者如果在沒有事先呼叫consolidate_state_dict()的情況下呼叫此方法,則會發生此錯誤。- 返回類型