• 教程 >
  • 使用非同步執行實現批次 RPC 處理
快捷方式

使用非同步執行實現批次 RPC 處理

建立時間:2020 年 7 月 28 日 | 最後更新:2024 年 11 月 13 日 | 最後驗證:未驗證

作者Shen Li

注意

編輯github 中檢視和編輯本教程。

前提條件

本教程演示瞭如何使用 @rpc.functions.async_execution 裝飾器構建批次處理 RPC 應用程式,這有助於透過減少阻塞的 RPC 執行緒數量和整合 callee 上的 CUDA 操作來加速訓練。這與 使用 TorchServe 進行批次推理 有著相同的思路。

注意

本教程需要 PyTorch v1.6.0 或更高版本。

基礎知識

先前的教程展示瞭如何使用 torch.distributed.rpc 構建分散式訓練應用程式,但沒有詳細說明在處理 RPC 請求時 callee 端會發生什麼。在 PyTorch v1.5 中,每個 RPC 請求都會阻塞 callee 上的一個執行緒來執行該請求中的函式,直到該函式返回。這適用於許多用例,但有一個注意事項。如果使用者函式因 IO(例如,巢狀 RPC 呼叫)或訊號(例如,等待另一個 RPC 請求解除阻塞)而阻塞,callee 上的 RPC 執行緒將不得不空閒等待直到 IO 完成或訊號事件發生。因此,RPC callee 可能會使用比實際需要更多的執行緒。這個問題的原因是 RPC 將使用者函式視為黑盒,對函式內部發生的事情知之甚少。為了讓使用者函式能夠讓出並釋放 RPC 執行緒,需要向 RPC 系統提供更多提示。

從 v1.6.0 開始,PyTorch 透過引入兩個新概念解決了這個問題

  • 一種封裝非同步執行的 torch.futures.Future 型別,它還支援安裝回調函式。

  • 一個 @rpc.functions.async_execution 裝飾器,它允許應用程式告訴 callee 目標函式將返回一個 future,並且可以在執行期間多次暫停和讓出。

藉助這兩個工具,應用程式程式碼可以將使用者函式分解為多個更小的函式,將它們作為回撥鏈式地連線到 Future 物件上,並返回包含最終結果的 Future。在 callee 端,當獲取到 Future 物件時,它也會將後續的 RPC 響應準備和通訊作為回撥安裝上去,這些回撥將在最終結果準備好時觸發。透過這種方式,callee 不再需要阻塞一個執行緒並等待最終返回值就緒。請參閱 @rpc.functions.async_execution 的 API 文件獲取簡單示例。

除了減少 callee 上的空閒執行緒數量之外,這些工具還有助於使批次 RPC 處理更簡單、更快。本教程的以下兩部分演示瞭如何使用 @rpc.functions.async_execution 裝飾器構建分散式批次更新引數伺服器和批次處理強化學習應用程式。

批次更新引數伺服器

考慮一個同步引數伺服器訓練應用程式,它有一個引數伺服器 (PS) 和多個 trainer。在此應用程式中,PS 持有引數並等待所有 trainer 報告梯度。在每次迭代中,它都會等待接收到所有 trainer 的梯度,然後一次性更新所有引數。下面的程式碼展示了 PS 類的實現。update_and_fetch_model 方法使用 @rpc.functions.async_execution 進行裝飾,並將由 trainer 呼叫。每次呼叫都返回一個 Future 物件,該物件將用更新後的模型填充。大多數 trainer 發起的呼叫只是將梯度累積到 .grad 欄位,立即返回,並讓出 PS 上的 RPC 執行緒。最後到達的 trainer 將觸發最佳化器步驟並消耗所有先前報告的梯度。然後它用更新後的模型設定 future_model,這反過來又透過 Future 物件通知來自其他 trainer 的所有先前請求,並將更新後的模型傳送給所有 trainer。

import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim

num_classes, batch_update_size = 30, 5

class BatchUpdateParameterServer(object):
    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        # Using the RRef to retrieve the local PS instance
        self = ps_rref.local_value()
        with self.lock:
            self.curr_update_size += 1
            # accumulate gradients into .grad field
            for p, g in zip(self.model.parameters(), grads):
                p.grad += g

            # Save the current future_model and return it to make sure the
            # returned Future object holds the correct model even if another
            # thread modifies future_model before this thread returns.
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                # update the model
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                # by settiing the result on the Future object, all previous
                # requests expecting this updated model will be notified and
                # the their responses will be sent accordingly.
                fut.set_result(self.model)
                self.future_model = torch.futures.Future()

        return fut

對於 trainer,它們都使用 PS 中的同一組引數進行初始化。在每次迭代中,每個 trainer 首先執行前向和後向傳播以在本地生成梯度。然後,每個 trainer 使用 RPC 將其梯度報告給 PS,並透過同一 RPC 請求的返回值取回更新後的引數。在 trainer 的實現中,目標函式是否標記了 @rpc.functions.async_execution 沒有區別。trainer 只需使用 rpc_sync 呼叫 update_and_fetch_model,這將在 trainer 上阻塞,直到返回更新後的模型。

batch_size, image_w, image_h  = 20, 64, 64

class Trainer(object):
    def __init__(self, ps_rref):
        self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(6):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        # get initial model parameters
        m = self.ps_rref.rpc_sync().get_model().cuda()
        # start training
        for inputs, labels in self.get_next_batch():
            self.loss_fn(m(inputs), labels).backward()
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()

本教程跳過了啟動多個程序的程式碼,請參考 examples 倉庫獲取完整實現。注意,不使用 @rpc.functions.async_execution 裝飾器也可以實現批次處理。然而,這將需要在 PS 上阻塞更多 RPC 執行緒,或者使用另一輪 RPC 來獲取更新後的模型,後一種方法會增加程式碼複雜性和通訊開銷。

本節使用一個簡單的引數伺服器訓練示例來展示如何使用 @rpc.functions.async_execution 裝飾器實現批次 RPC 應用程式。在下一節中,我們將使用批次處理重新實現先前 分散式 RPC 框架入門 教程中的強化學習示例,並演示其對訓練速度的影響。

批次處理 CartPole Solver

本節使用 OpenAI Gym 中的 CartPole-v1 作為示例,展示批次處理 RPC 對效能的影響。請注意,由於目標是演示 @rpc.functions.async_execution 的用法,而不是構建最好的 CartPole solver 或解決大多數不同的 RL 問題,我們使用非常簡單的策略和獎勵計算方法,重點關注多 observer 單 agent 的批次 RPC 實現。我們使用與先前教程類似的 Policy 模型,如下所示。與之前的教程相比,區別在於其建構函式接受一個額外的 batch 引數,該引數控制 F.softmaxdim 引數,因為批次處理時,forward 函式中的 x 引數包含來自多個 observer 的狀態,因此維度需要相應改變。其餘部分保持不變。

import argparse
import torch.nn as nn
import torch.nn.functional as F

parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
                    help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                    help='number of episodes (default: 10)')
args = parser.parse_args()

torch.manual_seed(args.seed)

class Policy(nn.Module):
    def __init__(self, batch=True):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
        self.dim = 2 if batch else 1

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=self.dim)

Observer 的建構函式也做了相應的調整。它還接受一個 batch 引數,該引數控制它使用哪個 Agent 函式來選擇動作。在批次模式下,它會呼叫 Agent 上的 select_action_batch 函式(稍後會介紹),該函式將使用 @rpc.functions.async_execution 進行裝飾。

import gym
import torch.distributed.rpc as rpc

class Observer:
    def __init__(self, batch=True):
        self.id = rpc.get_worker_info().id - 1
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.select_action = Agent.select_action_batch if batch else Agent.select_action

與之前的教程 分散式 RPC 框架入門 相比,observer 的行為略有不同。它不是在環境停止時退出,而是在每個 episode 中始終執行 n_steps 次迭代。當環境返回時,observer 只需重置環境並重新開始。透過這種設計,agent 將從每個 observer 接收固定數量的狀態,因此可以將它們打包到一個固定大小的張量中。在每一步中,Observer 使用 RPC 將其狀態傳送給 Agent,並透過返回值獲取動作。在每個 episode 結束時,它將所有步驟的獎勵返回給 Agent。請注意,這個 run_episode 函式將由 Agent 使用 RPC 呼叫。因此,此函式中的 rpc_sync 呼叫將是一個巢狀的 RPC 呼叫。我們也可以將此函式標記為 @rpc.functions.async_execution,以避免阻塞 Observer 上的一個執行緒。然而,由於瓶頸在 Agent 而不是 Observer,阻塞 Observer 程序上的一個執行緒應該是可以接受的。

import torch

class Observer:
    ...

    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), NUM_STEPS
        rewards = torch.zeros(n_steps)
        start_step = 0
        for step in range(n_steps):
            state = torch.from_numpy(state).float().unsqueeze(0)
            # send the state to the agent to get an action
            action = rpc.rpc_sync(
                agent_rref.owner(),
                self.select_action,
                args=(agent_rref, self.id, state)
            )

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)
            rewards[step] = reward

            if done or step + 1 >= n_steps:
                curr_rewards = rewards[start_step:(step + 1)]
                R = 0
                for i in range(curr_rewards.numel() -1, -1, -1):
                    R = curr_rewards[i] + args.gamma * R
                    curr_rewards[i] = R
                state = self.env.reset()
                if start_step == 0:
                    ep_reward = min(ep_reward, step - start_step + 1)
                start_step = step + 1

        return [rewards, ep_reward]

Agent 的建構函式也接受一個 batch 引數,該引數控制動作機率如何進行批次處理。在批次模式下,saved_log_probs 包含一個張量列表,其中每個張量包含所有 observer 在一步中的動作機率。在非批次處理模式下,saved_log_probs 是一個字典,其中鍵是 observer ID,值是該 observer 的動作機率列表。

import threading
from torch.distributed.rpc import RRef

class Agent:
    def __init__(self, world_size, batch=True):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.policy = Policy(batch).cuda()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.running_reward = 0

        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
            self.rewards[ob_info.id] = []

        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        self.batch = batch
        self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.future_actions = torch.futures.Future()
        self.lock = threading.Lock()
        self.pending_states = len(self.ob_rrefs)

非批次處理模式下的 select_acion 簡單地透過策略執行狀態,儲存動作機率,然後立即將動作返回給 observer。

from torch.distributions import Categorical

class Agent:
    ...

    @staticmethod
    def select_action(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        probs = self.policy(state.cuda())
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

啟用批次處理後,狀態儲存在一個 2D 張量 self.states 中,使用 observer ID 作為行 ID。然後,它透過在批次生成的 self.future_actions Future 物件上安裝回調函式來鏈式生成一個 Future,該 Future 將填充該 observer ID 索引到的特定行。最後到達的 observer 一次性將所有批次狀態透過策略執行,並相應地設定 self.future_actions。當這種情況發生時,安裝在 self.future_actions 上的所有回撥函式將被觸發,它們的返回值將用於填充鏈式生成的 Future 物件,這反過來又通知 Agent 為來自其他 observer 的所有先前 RPC 請求準備和通訊響應。

class Agent:
    ...

    @staticmethod
    @rpc.functions.async_execution
    def select_action_batch(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        self.states[ob_id].copy_(state)
        future_action = self.future_actions.then(
            lambda future_actions: future_actions.wait()[ob_id].item()
        )

        with self.lock:
            self.pending_states -= 1
            if self.pending_states == 0:
                self.pending_states = len(self.ob_rrefs)
                probs = self.policy(self.states.cuda())
                m = Categorical(probs)
                actions = m.sample()
                self.saved_log_probs.append(m.log_prob(actions).t()[0])
                future_actions = self.future_actions
                self.future_actions = torch.futures.Future()
                future_actions.set_result(actions.cpu())
        return future_action

現在我們來定義不同的 RPC 函式是如何組合在一起的。Agent 控制每個 episode 的執行。它首先使用 rpc_async 在所有 observer 上啟動 episode,並阻塞在返回的 future 上,這些 future 將填充 observer 獎勵。請注意,下面的程式碼使用 RRef 助手 ob_rref.rpc_async()ob_rref RRef 的所有者上使用提供的引數啟動 run_episode 函式。然後它將儲存的動作機率和返回的 observer 獎勵轉換為期望的資料格式,並啟動訓練步驟。最後,它重置所有狀態並返回當前 episode 的獎勵。此函式是執行一個 episode 的入口點。

class Agent:
    ...

    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))

        # wait until all obervers have finished this episode
        rets = torch.futures.wait_all(futs)
        rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
        ep_rewards = sum([ret[1] for ret in rets]) / len(rets)

        # stack saved probs into one tensor
        if self.batch:
            probs = torch.stack(self.saved_log_probs)
        else:
            probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
            probs = torch.stack(probs)

        policy_loss = -probs * rewards / len(rets)
        policy_loss.sum().backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

        # reset variables
        self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)

        # calculate running rewards
        self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
        return ep_rewards, self.running_reward

其餘程式碼是正常的程序啟動和日誌記錄,類似於其他 RPC 教程。在本教程中,所有 observer 都被動地等待來自 agent 的命令。請參考 examples 倉庫獲取完整實現。

def run_worker(rank, world_size, n_episode, batch, print_log=True):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size, batch)
        for i_episode in range(n_episode):
            last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)

            if print_log:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i_episode, last_reward, running_reward))
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from agents
    rpc.shutdown()


def main():
    for world_size in range(2, 12):
        delays = []
        for batch in [True, False]:
            tik = time.time()
            mp.spawn(
                run_worker,
                args=(world_size, args.num_episode, batch),
                nprocs=world_size,
                join=True
            )
            tok = time.time()
            delays.append(tok - tik)

        print(f"{world_size}, {delays[0]}, {delays[1]}")


if __name__ == '__main__':
    main()

批次 RPC 有助於將動作推理整合到更少的 CUDA 操作中,從而降低了分攤開銷。上面的 main 函式使用不同數量的 observer(從 1 到 10),在批次和非批次模式下執行相同的程式碼。下圖繪製了使用預設引數值在不同 world size 下的執行時間。結果證實了我們的預期,即批次處理有助於加速訓練。

文件

查閱 PyTorch 的完整開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲取問題解答

檢視資源