使用 PyTorch 編寫分散式應用程式¶
建立時間:2017 年 10 月 06 日 | 最後更新時間:2025 年 02 月 20 日 | 最後驗證時間:2024 年 11 月 05 日
作者:Séb Arnold
注意
在 github 上檢視和編輯此教程。
先決條件
在這個簡短的教程中,我們將回顧 PyTorch 的分散式包。我們將瞭解如何設定分散式環境,使用不同的通訊策略,並探討包的一些內部機制。
設定¶
PyTorch 中包含的分散式包(即 torch.distributed)使研究人員和實踐者能夠輕鬆地跨程序和機器叢集並行化計算。為此,它利用訊息傳遞語義,允許每個程序與其他任何程序通訊資料。與多程序 (torch.multiprocessing) 包不同,程序可以使用不同的通訊後端,並且不侷限於在同一臺機器上執行。
為了開始使用,我們需要能夠同時執行多個程序。如果你有計算叢集的訪問許可權,你應該諮詢你的本地系統管理員或使用你喜歡的協調工具(例如,pdsh、clustershell 或 slurm)。出於本教程的目的,我們將使用單臺機器並使用以下模板生成多個程序。
"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def run(rank, size):
""" Distributed function to be implemented later. """
pass
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
world_size = 2
processes = []
if "google.colab" in sys.modules:
print("Running in Google Colab")
mp.get_context("spawn")
else:
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
上面的指令碼會生成兩個程序,它們將各自設定分散式環境,初始化程序組 (dist.init_process_group),並最終執行給定的 run 函式。
讓我們看看 init_process 函式。它確保每個程序都能透過一個主節點協調,使用相同的 IP 地址和埠。請注意,我們使用了 gloo 後端,但還有其他後端可用。(參見 第 5.1 節)我們將在本教程的最後介紹 dist.init_process_group 中的“魔法”,但它本質上是透過共享程序位置來實現彼此通訊。
點對點通訊¶
傳送和接收 (Send and Recv)¶
資料從一個程序傳輸到另一個程序稱為點對點通訊。這可以透過 send 和 recv 函式或其立即對應的 isend 和 irecv 函式來實現。
"""Blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上面的示例中,兩個程序都以一個零張量開始,然後程序 0 增加該張量並將其傳送給程序 1,以便它們都最終得到 1.0。請注意,程序 1 需要分配記憶體來儲存它將接收的資料。
還要注意 send/recv 是阻塞的:兩個程序都會阻塞直到通訊完成。另一方面,立即操作是非阻塞的;指令碼繼續執行,方法返回一個 Work 物件,我們可以選擇對其呼叫 wait()。
"""Non-blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
使用立即操作時,我們必須謹慎使用傳送和接收的張量。由於我們不知道資料何時會傳輸到另一個程序,我們不應該修改傳送的張量,也不應該在 req.wait() 完成之前訪問接收的張量。換句話說,
在
dist.isend()之後寫入tensor將導致未定義的行為。在
dist.irecv()之後讀取tensor將導致未定義的行為,直到req.wait()執行完成。
然而,在 req.wait() 執行完成後,我們可以保證通訊已經發生,並且儲存在 tensor[0] 中的值為 1.0。
點對點通訊在我們需要更精細地控制程序間通訊時非常有用。它們可以用於實現複雜的演算法,例如 百度 DeepSpeech 或 Facebook 大規模實驗中使用的演算法。(參見 第 4.1 節)
集體通訊¶
分散 (Scatter)¶ |
收集 (Gather)¶ |
規約 (Reduce)¶ |
全域性規約 (All-Reduce)¶ |
廣播 (Broadcast)¶ |
全域性收集 (All-Gather)¶ |
與點對點通訊相反,集體通訊允許跨組中的所有程序進行通訊模式。一個組是我們所有程序的子集。要建立一個組,我們可以將一個等級列表傳遞給 dist.new_group(group)。預設情況下,集體通訊在所有程序上執行,這也稱為世界。例如,為了獲得所有程序上所有張量的總和,我們可以使用 dist.all_reduce(tensor, op, group) 集體操作。
""" All-Reduce example."""
def run(rank, size):
""" Simple collective communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由於我們想要組中所有張量的總和,因此我們使用 dist.ReduceOp.SUM 作為規約運算元。一般來說,任何可交換的數學運算都可以用作運算元。PyTorch 開箱即用地提供了許多此類運算元,它們都在元素級別上工作:
dist.ReduceOp.SUM (求和),dist.ReduceOp.PRODUCT (乘積),dist.ReduceOp.MAX (最大值),dist.ReduceOp.MIN (最小值),dist.ReduceOp.BAND (按位與),dist.ReduceOp.BOR (按位或),dist.ReduceOp.BXOR (按位異或),dist.ReduceOp.PREMUL_SUM (前乘求和).
支援的運算元完整列表可在此處找到。
除了 dist.all_reduce(tensor, op, group) 外,PyTorch 中當前還實現了許多其他集體操作。以下是一些支援的集體操作:
dist.broadcast(tensor, src, group):將tensor從src複製到所有其他程序。dist.reduce(tensor, dst, op, group):將op應用於每個tensor並將結果儲存在等級為dst的程序中。dist.all_reduce(tensor, op, group):與 reduce 相同,但結果儲存在所有程序中。dist.scatter(tensor, scatter_list, src, group):將scatter_list[i]的第 \(i^{\text{th}}\) 個張量複製到第 \(i^{\text{th}}\) 個程序。dist.gather(tensor, gather_list, dst, group):從所有程序將tensor收集到等級為dst的程序的gather_list中。dist.all_gather(tensor_list, tensor, group):將tensor從所有程序複製到所有程序上的tensor_list中。dist.barrier(group):阻塞 group 中的所有程序,直到每個程序都進入此函式。dist.all_to_all(output_tensor_list, input_tensor_list, group):將輸入張量列表分散到組中的所有程序,並在輸出列表中返回收集到的張量列表。
透過查閱 PyTorch 分散式模組的最新文件,可以找到支援的集體操作完整列表 (連結)。
分散式訓練¶
注意:你可以在這個 GitHub 倉庫中找到本節的示例指令碼。
既然我們瞭解了分散式模組的工作原理,接下來讓我們用它來編寫一些有用的東西。我們的目標是複製 DistributedDataParallel 的功能。當然,這只是一個教學示例,在實際應用中,你應該使用上面連結的官方的、經過充分測試和最佳化的版本。
簡單來說,我們想要實現隨機梯度下降的分散式版本。我們的指令碼將讓所有程序計算各自模型在其批處理資料上的梯度,然後對這些梯度求平均。為了確保在改變程序數量時獲得相似的收斂結果,我們首先需要對資料集進行分割槽。(你也可以使用 torch.utils.data.random_split,而不是下面的程式碼片段。)
""" Dataset partitioning helper """
class Partition(object):
def __init__(self, data, index):
self.data = data
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random() # from random import Random
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, partition):
return Partition(self.data, self.partitions[partition])
有了上面的程式碼片段,我們現在可以使用以下幾行程式碼輕鬆地對任何資料集進行分割槽:
""" Partitioning MNIST """
def partition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 // size
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假設我們有 2 個副本,那麼每個程序將擁有一個包含 60000 / 2 = 30000 個樣本的 train_set。我們還將批次大小除以副本數量,以保持整體批次大小為 128。
現在我們可以編寫通常的前向-反向-最佳化訓練程式碼,並新增一個函式呼叫來平均我們模型的梯度。(以下內容主要受到官方 PyTorch MNIST 示例的啟發。)
""" Distributed Synchronous SGD Example """
def run(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
剩下要實現的是 average_gradients(model) 函式,它簡單地接受一個模型並對它在整個世界範圍內的梯度進行平均。
""" Gradient averaging. """
def average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
瞧!我們成功地實現了分散式同步 SGD,並且可以在大型計算機叢集上訓練任何模型。
注意:雖然上一句話在技術上是正確的,但要實現生產級別的同步 SGD 版本,還需要更多的技巧。再次強調,請使用經過測試和最佳化的版本。
我們自己的環形 All-reduce¶
作為額外的挑戰,想象一下我們想要實現 DeepSpeech 高效的環形 All-reduce。使用點對點集體操作實現這一點相當容易。
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = send.clone()
recv_buff = send.clone()
accum = send.clone()
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv_buff[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send_buff[:]
send_req.wait()
recv[:] = accum[:]
在上面的指令碼中,allreduce(send, recv) 函式的簽名與 PyTorch 中的有所不同。它接受一個 recv 張量,並將所有 send 張量的總和儲存在其中。作為一個留給讀者的練習,我們的版本與 DeepSpeech 的實現還有一個區別:他們的實現將梯度張量分成塊,以便最佳地利用通訊頻寬。(提示:torch.chunk)
高階主題¶
現在我們準備好探索 torch.distributed 的一些更高階功能。由於內容很多,本節分為兩個子節:
通訊後端:在這裡我們學習如何使用 MPI 和 Gloo 進行 GPU 到 GPU 的通訊。
初始化方法:在這裡我們瞭解如何在
dist.init_process_group()中最好地設定初始協調階段。
通訊後端¶
torch.distributed 最優雅的方面之一是它能夠抽象並在不同後端之上構建。如前所述,PyTorch 中實現了多個後端。其中一些最受歡迎的是 Gloo、NCCL 和 MPI。根據所需用例,它們各自具有不同的規範和權衡。支援函式對比表可在此處找到。
Gloo 後端
到目前為止,我們已經廣泛使用了 Gloo 後端。作為一個開發平臺,它非常方便,因為它包含在預編譯的 PyTorch 二進位制檔案中,並且在 Linux (從 0.2 版本開始) 和 macOS (從 1.3 版本開始) 上都可以工作。它支援 CPU 上的所有點對點和集體操作,以及 GPU 上的所有集體操作。Gloo 後端對 CUDA 張量的集體操作實現不如 NCCL 後端提供的最佳化得好。
正如你肯定已經注意到的,如果你將 model 放在 GPU 上,我們的分散式 SGD 示例將無法工作。為了使用多個 GPU,我們還需要進行以下修改:
使用
device = torch.device("cuda:{}".format(rank))model = Net()\(\rightarrow\)model = Net().to(device)使用
data, target = data.to(device), target.to(device)
經過以上修改,我們的模型現在正在兩個 GPU 上訓練,你可以使用 watch nvidia-smi 監控它們的利用率。
MPI 後端
訊息傳遞介面 (MPI) 是高效能計算領域的一個標準化工具。它允許進行點對點和集體通訊,並且是 torch.distributed API 的主要靈感來源。存在多種 MPI 實現(例如 Open-MPI、MVAPICH2、Intel MPI),每種都針對不同目的進行了最佳化。使用 MPI 後端的優勢在於 MPI 在大型計算機叢集上的廣泛可用性和高最佳化水平。一些 最新的 實現還能夠利用 CUDA IPC 和 GPU Direct 技術來避免透過 CPU 進行記憶體複製。
遺憾的是,PyTorch 的二進位制檔案不能包含 MPI 實現,我們需要手動重新編譯。幸運的是,這個過程相當簡單,因為在編譯時,PyTorch 會自行查詢可用的 MPI 實現。以下步驟透過從原始碼安裝 PyTorch 來安裝 MPI 後端。
建立並激活你的 Anaconda 環境,按照指南安裝所有先決條件,但暫不執行
python setup.py install。選擇並安裝你喜歡的 MPI 實現。請注意,啟用 CUDA-aware MPI 可能需要一些額外步驟。在本例中,我們將堅持使用不帶 GPU 支援的 Open-MPI:
conda install -c conda-forge openmpi現在,進入你克隆的 PyTorch 倉庫,執行
python setup.py install。
為了測試我們新安裝的後端,需要進行一些修改。
將
if __name__ == '__main__':下的內容替換為init_process(0, 0, run, backend='mpi')。執行
mpirun -n 4 python myscript.py。
這些更改的原因是 MPI 在啟動程序之前需要建立自己的環境。MPI 還會啟動自己的程序並執行 初始化方法 中描述的握手,這使得 init_process_group 函式的 rank 和 size 引數變得多餘。這實際上非常強大,因為你可以向 mpirun 傳遞額外的引數,以便為每個程序量身定製計算資源。(比如每個程序的核心數,手動將機器分配給特定的 rank,以及 更多內容)這樣做,你應該會獲得與其他通訊後端相同的熟悉輸出。
NCCL 後端
NCCL 後端 提供了針對 CUDA tensors 的集體操作的最佳化實現。如果你的集體操作只使用 CUDA tensors,那麼考慮使用此後端以獲得最佳效能。包含 CUDA 支援的預構建二進位制檔案已包含 NCCL 後端。
初始化方法¶
在本教程的最後,讓我們研究一下我們最初呼叫的函式:dist.init_process_group(backend, init_method)。具體來說,我們將討論負責每個程序之間初步協調步驟的各種初始化方法。這些方法使你能夠定義如何完成這種協調。
初始化方法的選擇取決於你的硬體設定,某些方法可能比其他方法更適合。除了以下各節之外,請參閱官方文件以獲取更多資訊。
環境變數
在本教程中,我們一直使用環境變數初始化方法。透過在所有機器上設定以下四個環境變數,所有程序都將能夠正確連線到 master,獲取關於其他程序的資訊,並最終與它們握手。
MASTER_PORT: 將託管 rank 為 0 的程序的機器上的一個空閒埠。MASTER_ADDR: 將託管 rank 為 0 的程序的機器的 IP 地址。WORLD_SIZE: 程序總數,master 知道要等待多少個 worker。RANK: 每個程序的 rank,以便它們知道自己是 master 還是 worker。
共享檔案系統
共享檔案系統要求所有程序都能訪問共享檔案系統,並將透過共享檔案進行協調。這意味著每個程序都會開啟檔案,寫入其資訊,並等待直到所有程序都完成寫入。之後,所有所需資訊即可供所有程序使用。為了避免競態條件,檔案系統必須支援透過 fcntl 進行鎖定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP
透過 TCP 進行初始化可以透過提供 rank 為 0 的程序的 IP 地址和一個可到達的埠號來實現。在此,所有 worker 都能夠連線到 rank 為 0 的程序並交換如何相互通訊的資訊。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
致謝
我要感謝 PyTorch 開發人員在實現、文件和測試方面所做的出色工作。當代碼不清楚時,我總能依靠文件或測試來找到答案。特別感謝 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 對早期草稿提供了富有見地的評論並回答了問題。





