• 教程 >
  • TorchRL 目標:編碼 DDPG 損失
快捷方式

TorchRL 目標:編碼 DDPG 損失

建立日期:2023 年 8 月 14 日 | 最後更新:2025 年 3 月 20 日 | 最後驗證:未驗證

作者Vincent Moens

概述

TorchRL 將強化學習 (RL) 演算法的訓練分解為多個部分,這些部分將在你的訓練指令碼中組合起來:環境、資料收集和儲存、模型以及最後的損失函式。

TorchRL 損失(或稱為“目標”)是包含可訓練引數(策略和值模型)的狀態物件。本教程將指導你從頭開始使用 TorchRL 編寫一個損失函式。

為此,我們將重點關注 DDPG,這是一個相對容易編碼的演算法。深度確定性策略梯度 (DDPG) 是一種簡單的連續控制演算法。它包括學習一個動作-觀測對的引數化值函式,然後學習一個策略,該策略針對給定的觀測輸出最大化此值函式的動作。

你將學到什麼

  • 如何編寫一個損失模組並自定義其值估計器;

  • 如何在 TorchRL 中構建環境,包括變換(例如,資料歸一化)和並行執行;

  • 如何設計策略網路和值網路;

  • 如何高效地從環境中收集資料並將其儲存在回放緩衝區中;

  • 如何在回放緩衝區中儲存軌跡(而不是轉換);

  • 如何評估你的模型。

先決條件

本教程假設你已完成 PPO 教程,該教程概述了 TorchRL 元件和依賴項,例如 tensordict.TensorDicttensordict.nn.TensorDictModules,儘管本教程應該足夠清晰,即使對這些類沒有深入理解也能看懂。

注意

我們的目標並非提供該演算法的 SOTA(State-of-the-Art,最先進)實現,而是為了高層次地闡述 TorchRL 的損失實現以及在該演算法上下文中使用的庫特性。

匯入和設定

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果可用,我們將在 CUDA 上執行策略

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule

TorchRL 提供了一系列可在訓練指令碼中使用的損失函式。其目標是提供易於重用/替換且簽名簡單的損失函式。

TorchRL 損失的主要特點是

  • 它們是狀態物件:它們包含可訓練引數的副本,因此 loss_module.parameters() 提供訓練演算法所需的一切。

  • 它們遵循 TensorDict 約定:torch.nn.Module.forward() 方法將接收一個從回放緩衝區中取樣的 TensorDict 作為輸入,其中包含返回損失值所需的所有資訊。

    >>> data = replay_buffer.sample()
    >>> loss_dict = loss_module(data)
    
  • 它們輸出一個 tensordict.TensorDict 例項,其中損失值寫在 "loss_<smth>" 下面,smth 是描述損失的字串。TensorDict 中的其他鍵可能是有用的指標,可在訓練期間記錄。

    注意

    我們返回獨立損失的原因是允許使用者對不同的引數集使用不同的最佳化器。透過以下方式可以簡單地求和損失:

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

The __init__ 方法

所有損失的父類是 LossModule。與庫中的許多其他元件一樣,其 forward() 方法期望接收一個從經驗回放緩衝區或任何類似資料結構中取樣的 tensordict.TensorDict 例項作為輸入。使用這種格式可以在不同模態下或在模型需要讀取多個條目的複雜設定中重用模組。換句話說,它允許我們編碼一個對輸入資料型別無感知的損失模組,並專注於執行損失函式的基本步驟,僅此而已。

為了使本教程儘可能具有啟發性,我們將獨立展示類的每個方法,並在稍後階段填充該類。

讓我們從 __init__() 方法開始。DDPG 旨在用一個簡單的策略解決控制任務:訓練一個策略以輸出最大化值網路預測的值的動作。因此,我們的損失模組在其建構函式中需要接收兩個網路:一個 actor 網路和一個 value 網路。我們期望它們都是 TensorDict 相容的物件,例如 tensordict.nn.TensorDictModule。我們的損失函式需要計算目標值並將值網路擬合到此目標值,並生成一個動作並擬合策略,使得其值估計被最大化。

The crucial step of the LossModule.__init__() 方法是呼叫 convert_to_functional()。此方法將從模組中提取引數並將其轉換為函式式模組。嚴格來說,這不是必需的,完全可以在沒有它的情況下編寫所有損失。但是,我們鼓勵使用它,原因如下。

TorchRL 這樣做的原因是 RL 演算法經常使用不同的引數集執行同一個模型,這些引數集被稱為“可訓練”引數和“目標”引數。“可訓練”引數是最佳化器需要擬合的引數。“目標”引數通常是前者的一個副本,帶有一定的時間延遲(絕對延遲或透過移動平均稀釋)。這些目標引數用於計算與下一個觀測值關聯的值。為值模型使用一組與當前配置不完全匹配的目標引數的優點之一是,它們為正在計算的值函式提供了一個悲觀的界限。請注意下面的 create_target_params 關鍵字引數:此引數告訴 convert_to_functional() 方法在損失模組中建立一組用於計算目標值的目標引數。如果將其設定為 False(例如,對於 actor 網路),target_actor_network_params 屬性仍然可以訪問,但這將僅返回 actor 引數的一個 分離的 版本。

稍後,我們將看到在 TorchRL 中如何更新目標引數。

from tensordict.nn import TensorDictModule, TensorDictSequential


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

值估計器損失方法

在許多 RL 演算法中,值網路(或 Q 值網路)是基於經驗值估計進行訓練的。這可以是自舉的 (TD(0),低方差,高偏差),意味著目標值僅使用下一個獎勵獲得,別無其他;或者可以是 Monte-Carlo 估計 (TD(1)),在這種情況下將使用整個後續獎勵序列(高方差,低偏差)。也可以使用中間估計器 (TD(\(\lambda\))) 來折衷偏差和方差。TorchRL 透過 ValueEstimators 列舉類使得使用其中一種估計器變得容易,該類包含所有已實現的值估計器的指標。這裡我們定義預設值函式。我們將採用最簡單的版本 (TD(0)),稍後展示如何更改它。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我們還需要根據使用者的查詢,向 DDPG 提供一些關於如何構建值估計器的指令。根據提供的估計器,我們將構建用於訓練時的相應模組

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

The make_value_estimator 方法可以呼叫,但並非必須:如果未呼叫,LossModule 將使用其預設估計器查詢此方法。

Actor 損失方法

RL 演算法的核心是 actor 的訓練損失。對於 DDPG,此函式非常簡單:我們只需要計算使用策略計算出的動作所關聯的值,並最佳化 actor 權重以最大化此值。

在計算此值時,我們必須確保將值引數從圖中取出,否則 actor 和 value 的損失會混淆。為此,可以使用 hold_out_params() 函式。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

Value 損失方法

現在我們需要最佳化我們的值網路引數。為此,我們將依賴於我們類的值估計器

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

在 forward 呼叫中整合各部分

唯一缺少的部分是 forward 方法,它將把 value 損失和 actor 損失結合起來,收整合本值並將其寫入一個 TensorDict 返回給使用者。

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

現在我們有了損失函式,就可以用它來訓練一個策略來解決控制任務。

環境

在大多數演算法中,首先需要處理的是環境的構建,因為它決定了訓練指令碼的其餘部分。

對於此示例,我們將使用 "cheetah" 任務。目標是讓半獵豹儘可能快地奔跑。

在 TorchRL 中,可以透過依賴 dm_controlgym 來建立此類任務

env = GymEnv("HalfCheetah-v4")

env = DMControlEnv("cheetah", "run")

預設情況下,這些環境停用渲染。從狀態進行訓練通常比從影像進行訓練更容易。為了保持簡單,我們僅專注於從狀態學習。要將畫素傳遞給 env.step() 收集的 tensordicts,只需將 from_pixels=True 引數傳遞給建構函式

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我們編寫一個 make_env() 輔助函式,它將使用上面考慮的兩個後端之一(dm-controlgym)建立環境。

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

Transforms(變換)

現在我們有了一個基礎環境,可能想要修改其表示形式使其更適合策略。在 TorchRL 中,變換被新增到基礎環境中的專門 torchr.envs.TransformedEnv 類中。

  • 在 DDPG 中,通常會使用一些啟發式值來重新縮放獎勵。在此示例中,我們將獎勵乘以 5。

  • 如果我們使用 dm_control,構建模擬器(使用雙精度數字)與我們的指令碼(可能使用單精度數字)之間的介面也很重要。這種轉換是雙向的:當呼叫 env.step() 時,我們的動作需要以雙精度表示,而輸出需要轉換為單精度。DoubleToFloat 變換正是執行此操作:in_keys 列表指的是需要從雙精度轉換為單精度的鍵,而 in_keys_inv 指的是在傳遞給環境之前需要轉換為雙精度的鍵。

  • 我們使用 CatTensors 變換將狀態鍵連線在一起。

  • 最後,我們也保留了對狀態進行歸一化的可能性:我們將在稍後計算歸一化常數。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

並行執行

以下輔助函式允許我們並行執行環境。並行執行環境可以顯著加快資料收集吞吐量。使用變換環境時,我們需要選擇是為每個環境單獨執行變換,還是集中資料並批次進行變換。這兩種方法都很容易編碼

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

為了利用 PyTorch 的向量化能力,我們採用第一種方法

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip 將多個步驟與一個動作打包在一起 如果 > 1,則需要調整其他幀計數(例如 frames_per_batch、total_frames),以確保在實驗中收集的總幀數一致。這很重要,因為增加 frame-skip 但保持總幀數不變可能看似作弊:綜合比較,使用 frame-skip 為 2 收集的 10M 元素資料集和使用 frame-skip 為 1 收集的資料集的與環境互動的實際比例是 2:1!簡而言之,在使用幀跳過時,應謹慎處理訓練指令碼的幀計數,因為這可能導致訓練策略之間的比較產生偏差。

縮放獎勵有助於我們控制訊號幅度,從而實現更高效的學習。

reward_scaling = 5.0

我們還定義了軌跡何時被截斷。一千步(如果 frame-skip = 2 則為 500 步)對於獵豹任務來說是一個不錯的數字

max_frames_per_traj = 500

觀測值歸一化

為了計算歸一化統計資料,我們在環境中執行任意數量的隨機步驟,並計算收集到的觀測值的均值和標準差。可以使用 ObservationNorm.init_stats() 方法來達到此目的。要獲取彙總統計資料,我們建立一個虛擬環境,執行一定數量的步驟,在一定數量的步驟中收集資料並計算其彙總統計資料。

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

歸一化統計資料

使用 ObservationNorm 進行統計計算使用的隨機步驟數

init_env_steps = 5000

transform_state_dict = get_env_stats()

每個資料收集器中的環境數量

env_per_collector = 4

我們將之前計算出的統計資料傳遞給環境以歸一化其輸出

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import CompositeSpec

構建模型

現在我們轉向模型的設定。正如我們所見,DDPG 需要一個值網路(訓練用於估計狀態-動作對的值)和一個引數化的 actor(學習如何選擇最大化此值的動作)。

回想一下,構建一個 TorchRL 模組需要兩個步驟

  • 編寫將用作網路的 torch.nn.Module

  • 將網路包裝在 tensordict.nn.TensorDictModule 中,其中透過指定輸入和輸出鍵來處理資料流。

在更復雜的場景中,也可以使用 tensordict.nn.TensorDictSequential

Q 值網路包裝在 ValueOperator 中,它會自動將 Q 值網路的 out_keys 設定為 "state_action_value",並將其他值網路的 out_keys 設定為 "state_value"

TorchRL 提供了原始論文中提出的 DDPG 網路的內建版本。這些可以在 DdpgMlpActorDdpgMlpQNet 下找到。

由於我們使用 lazy 模組,在能夠將策略從一個裝置移動到另一個裝置並執行其他操作之前,需要例項化這些 lazy 模組。因此,用少量資料樣本執行模組是一個好的實踐。為此,我們從環境規範中生成假資料。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessModule,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=CompositeSpec(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)

探索

根據原始論文的建議,策略被傳遞到一個 OrnsteinUhlenbeckProcessModule 探索模組中。我們來定義 OU 噪聲達到其最小值之前的幀數

annealing_frames = 1_000_000

actor_model_explore = TensorDictSequential(
    actor,
    OrnsteinUhlenbeckProcessModule(
        spec=actor.spec.clone(),
        annealing_num_steps=annealing_frames,
    ).to(device),
)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

資料收集器

TorchRL 提供了專門的類,幫助你透過在環境中執行策略來收集資料。這些“資料收集器”會迭代地計算給定時間要執行的動作,然後在環境中執行一步,並在需要時重置環境。資料收集器的設計旨在幫助開發者嚴格控制每批資料的幀數、收集的同步/非同步特性以及分配給資料收集的資源(例如 GPU、工作程序數等)。

這裡我們將使用 SyncDataCollector,這是一個簡單的單程序資料收集器。TorchRL 還提供了其他收集器,例如 MultiaSyncDataCollector,它以非同步方式執行 rollout(例如,在最佳化策略的同時收集資料,從而解耦訓練和資料收集)。

需要指定的引數有

  • 一個環境工廠或一個環境,

  • 策略,

  • 收集器被視為空之前總共的幀數,

  • 每條軌跡的最大幀數(對於非終止環境很有用,例如 dm_control 環境)。

    注意

    傳遞給收集器的 max_frames_per_traj 將產生在用於推理的環境中註冊一個新的 StepCounter 變換的效果。我們可以手動達到同樣的結果,就像我們在這個指令碼中所做的一樣。

還需要傳遞

  • 收集的每個批次中的幀數,

  • 獨立於策略執行的隨機步驟數,

  • 用於策略執行的裝置

  • 用於在資料傳遞給主程序之前儲存資料的裝置。

訓練期間將使用的總幀數應約為 1M。

total_frames = 10_000  # 1_000_000

在外部迴圈的每次迭代中,收集器返回的幀數等於每個子軌跡的長度乘以每個收集器中並行執行的環境數量。

換句話說,我們期望收集器返回的批次的形狀為 [env_per_collector, traj_len],其中 traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

評估器:構建你的記錄器物件

由於訓練資料是使用某種探索策略獲得的,因此我們的演算法的真實效能需要在確定性模式下評估。我們使用一個專門的類 Recorder 來完成此操作,該類以給定頻率在環境中執行策略,並返回從這些模擬中獲得的一些統計資料。

以下輔助函式構建此物件

from torchrl.trainers import Recorder


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = Recorder(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.DETERMINISTIC,
        record_interval=record_interval,
    )
    return recorder_obj

我們將每收集 10 個批次記錄一次效能

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

回放緩衝區

回放緩衝區有兩種型別:優先回放(使用某種誤差訊號使某些條目比其他條目有更高的取樣可能性)和常規的迴圈經驗回放。

TorchRL 回放緩衝區是可組合的:可以選擇儲存、取樣和寫入策略。也可以使用記憶體對映陣列在物理記憶體上儲存張量。以下函式負責建立具有所需超引數的回放緩衝區

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我們將把回放緩衝區儲存在磁碟上的一個臨時目錄中

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

回放緩衝區儲存和批次大小

TorchRL 重放緩衝區計算第一維度的元素數量。由於我們將向緩衝區輸入軌跡,因此需要透過將其除以資料收集器產生的子軌跡長度來調整緩衝區大小。關於批大小,我們的取樣策略是首先採樣長度為 traj_len=200 的軌跡,然後再選擇長度為 random_crop_len=25 的子軌跡來計算損失。這種策略平衡了儲存特定長度的完整軌跡的需求與為損失函式提供足夠多樣性樣本的需求。下圖展示了資料流:一個收集器在每個批次中獲取 8 幀資料,並行執行 2 個環境,將資料饋送到包含 1000 條軌跡的重放緩衝區,並從每條軌跡中取樣長度為 2 個時間步的子軌跡。

Storing trajectories in the replay buffer

我們先從緩衝區中儲存的幀數開始

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

優先順序重放緩衝區預設停用

prb = False

我們還需要定義每收集一個批次資料執行多少次更新。這被稱為更新/資料比率或 UTD 比率

update_to_data = 64

我們將使用長度為 25 的軌跡來計算損失

random_crop_len = 25

在原始論文中,作者在每收集一幀資料時使用一個包含 64 個元素的批次進行一次更新。在這裡,我們複製相同的比率,但在每次批次收集時執行多次更新。我們調整了批次大小,以達到相同的每幀更新次數比率。

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

損失模組構建

我們使用剛剛建立的 actor 和 qnet 構建我們的損失模組。由於我們需要更新目標引數,我們_必須_建立一個目標網路更新器。

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

讓我們使用 TD(lambda) 估計器!

loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda, device=device)

注意

離策略(Off-policy)通常使用 TD(0) 估計器。在這裡,我們使用了 TD(\(\lambda\)) 估計器,這會引入一些偏差,因為某個狀態之後的軌跡是使用過時的策略收集的。這個技巧,以及資料收集中使用的多步技巧,是“hack”的替代版本,儘管它們在回報估計中引入了偏差,但在實踐中我們發現它們通常效果良好。

目標網路更新器

目標網路是離策略強化學習演算法的關鍵組成部分。透過 HardUpdateSoftUpdate 類可以輕鬆更新目標網路引數。它們以損失模組作為引數構建,並在訓練迴圈的適當位置透過呼叫 updater.step() 完成更新。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

最佳化器

最後,我們將為策略網路和值網路使用 Adam 最佳化器

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

訓練策略

現在我們已經構建了所有需要的模組,訓練迴圈非常簡單直觀。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore[1].step(current_frames)

collector.shutdown()
del collector

實驗結果

我們繪製了訓練期間的平均獎勵圖。我們可以看到我們的策略很好地學會了如何解決任務。

注意

如上所述,為了獲得更合理的效能,請使用更大的 total_frames 值,例如 1M。

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()

結論

在本教程中,我們學習瞭如何以 DDPG 的具體示例為基礎,在 TorchRL 中編寫一個損失模組。

關鍵要點包括

  • 如何使用 LossModule 類編寫一個新的損失元件;

  • 如何使用(或不使用)目標網路,以及如何更新其引數;

  • 如何建立與損失模組相關的最佳化器。

後續步驟

要在此損失模組上進一步迭代,我們可以考慮

指令碼總執行時間: ( 0 分鐘 0.000 秒)

由 Sphinx-Gallery 生成的畫廊

文件

查閱 PyTorch 的完整開發者文件

檢視文件

教程

獲取針對初學者和高階開發人員的深度教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源