注意
前往末尾下載完整的示例程式碼。
TorchRL 訓練器:DQN 示例¶
TorchRL 提供了一個通用的 Trainer 類來處理你的訓練迴圈。訓練器執行一個巢狀迴圈,其中外層迴圈是資料收集,內層迴圈消耗這些資料或從回放緩衝區中檢索到的資料來訓練模型。在此訓練迴圈的各個點,可以附加鉤子(hook)並在給定的間隔執行。
在本教程中,我們將使用訓練器類從頭開始訓練一個 DQN 演算法來解決 CartPole(倒立擺)任務。
主要收穫
構建訓練器及其基本元件:資料收集器、損失模組、回放緩衝區和最佳化器。
向訓練器新增鉤子(hook),例如日誌記錄器、目標網路更新器等。
訓練器是完全可定製的,並提供大量功能。本教程圍繞其構建展開。我們將首先詳細介紹如何構建庫的每個元件,然後使用 Trainer 類將這些部分組合起來。
在此過程中,我們還將關注庫的其他一些方面
如何在 TorchRL 中構建環境,包括轉換(例如資料歸一化、幀拼接、調整大小和轉換為灰度圖)和並行執行。與我們在 DDPG 教程中所做的不同,我們將對畫素而不是狀態向量進行歸一化。
如何設計
QValueActor物件,即一個估算動作值並選取估算回報最高動作的 actor;如何高效地從環境中收集資料並將其儲存在回放緩衝區中;
如何使用多步(multi-step),這是一種用於離策略演算法的簡單預處理步驟;
最後是如何評估你的模型。
先決條件:我們建議你先透過 PPO 教程熟悉 torchrl。
DQN¶
DQN (深度 Q-學習) 是深度強化學習的開創性工作。
從高層次上看,該演算法相當簡單:Q-學習包括學習一個狀態-動作值表,這樣當遇到任何特定狀態時,我們只需查詢值最高的動作就知道應該選擇哪個動作。這種簡單的設定要求動作和狀態是離散的,否則無法構建查詢表。
DQN 使用一個神經網路,它將狀態-動作空間對映到一個值(標量)空間,這攤銷了儲存和探索所有可能狀態-動作組合的成本:如果過去沒有見過某個狀態,我們仍然可以透過我們的神經網路將它與各種可用動作一起傳入,並獲得每個可用動作的插值值。
我們將解決經典的控制問題——倒立擺(CartPole)。根據檢索該環境的 Gymnasium 文件所述
我們的目標不是提供該演算法的 SOTA(最先進)實現,而是為了在該演算法的背景下提供 TorchRL 功能的高階說明。
import os
import uuid
import torch
from torch import nn
from torchrl.collectors import MultiaSyncDataCollector, SyncDataCollector
from torchrl.data import LazyMemmapStorage, MultiStep, TensorDictReplayBuffer
from torchrl.envs import (
EnvCreator,
ExplorationType,
ParallelEnv,
RewardScaling,
StepCounter,
)
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
CatFrames,
Compose,
GrayScale,
ObservationNorm,
Resize,
ToTensorImage,
TransformedEnv,
)
from torchrl.modules import DuelingCnnDQNet, EGreedyModule, QValueActor
from torchrl.objectives import DQNLoss, SoftUpdate
from torchrl.record.loggers.csv import CSVLogger
from torchrl.trainers import (
LogScalar,
LogValidationReward,
ReplayBufferTrainer,
Trainer,
UpdateWeights,
)
def is_notebook() -> bool:
try:
shell = get_ipython().__class__.__name__
if shell == "ZMQInteractiveShell":
return True # Jupyter notebook or qtconsole
elif shell == "TerminalInteractiveShell":
return False # Terminal running IPython
else:
return False # Other type (?)
except NameError:
return False # Probably standard Python interpreter
讓我們開始瞭解我們的演算法所需的各種部分
環境;
策略(以及我們歸類到“模型”下的相關模組);
資料收集器,它使策略在環境中進行互動並提供訓練資料;
用於儲存訓練資料的回放緩衝區;
損失模組,它計算目標函式以訓練我們的策略最大化回報;
最佳化器,它根據我們的損失執行引數更新。
附加模組包括日誌記錄器、記錄器(在“eval”模式下執行策略)和目標網路更新器。有了所有這些元件,很容易看出在訓練指令碼中可能會放錯或誤用某個元件。訓練器就在那裡為你協調一切!
構建環境¶
首先,讓我們編寫一個輔助函式來輸出一個環境。像往常一樣,“原始”環境在實踐中可能過於簡單,我們需要一些資料轉換來將其輸出暴露給策略。
我們將使用七種轉換
StepCounter用於計算每個軌跡中的步數;ToTensorImage會將[W, H, C]uint8 張量轉換為形狀為[C, W, H]且值在[0, 1]範圍內的浮點張量;RewardScaling以減小回報的尺度;GrayScale會將我們的影像轉換為灰度圖;Resize會將影像大小調整為 64x64 格式;CatFrames會將任意數量的連續幀(N=4)沿通道維度拼接成單個張量。這很有用,因為單張影像不包含關於倒立擺運動的資訊。需要透過迴圈神經網路或使用幀堆疊來記錄過去的觀察和動作。ObservationNorm會根據一些自定義的彙總統計資訊對我們的觀察進行歸一化。
實際上,我們的環境構建器有兩個引數
parallel:決定是否必須並行執行多個環境。我們在ParallelEnv之後堆疊轉換,以利用裝置上操作的向量化,儘管這在技術上也可以用於每個環境連線到其自己的轉換集的情況。obs_norm_sd將包含用於ObservationNorm轉換的歸一化常數。
def make_env(
parallel=False,
obs_norm_sd=None,
num_workers=1,
):
if obs_norm_sd is None:
obs_norm_sd = {"standard_normal": True}
if parallel:
def maker():
return GymEnv(
"CartPole-v1",
from_pixels=True,
pixels_only=True,
device=device,
)
base_env = ParallelEnv(
num_workers,
EnvCreator(maker),
# Don't create a sub-process if we have only one worker
serial_for_single=True,
mp_start_method=mp_context,
)
else:
base_env = GymEnv(
"CartPole-v1",
from_pixels=True,
pixels_only=True,
device=device,
)
env = TransformedEnv(
base_env,
Compose(
StepCounter(), # to count the steps of each trajectory
ToTensorImage(),
RewardScaling(loc=0.0, scale=0.1),
GrayScale(),
Resize(64, 64),
CatFrames(4, in_keys=["pixels"], dim=-3),
ObservationNorm(in_keys=["pixels"], **obs_norm_sd),
),
)
return env
計算歸一化常數¶
為了歸一化影像,我們不希望使用完整的 [C, W, H] 歸一化掩碼獨立歸一化每個畫素,而是使用形狀更簡單的 [C, 1, 1] 歸一化常數集(位置和尺度引數)。我們將使用 init_stats() 的 reduce_dim 引數來指示哪些維度必須被約簡,並使用 keep_dims 引數來確保不是所有維度都在過程中消失。
def get_norm_stats():
test_env = make_env()
test_env.transform[-1].init_stats(
num_iter=1000, cat_dim=0, reduce_dim=[-1, -2, -4], keep_dims=(-1, -2)
)
obs_norm_sd = test_env.transform[-1].state_dict()
# let's check that normalizing constants have a size of ``[C, 1, 1]`` where
# ``C=4`` (because of :class:`~torchrl.envs.CatFrames`).
print("state dict of the observation norm:", obs_norm_sd)
test_env.close()
del test_env
return obs_norm_sd
構建模型(深度 Q-網路)¶
以下函式構建一個 DuelingCnnDQNet 物件,它是一個簡單的 CNN,後接一個兩層 MLP。這裡使用的唯一技巧是動作值(即向左和向右動作值)是使用以下公式計算的
其中 \(\mathbb{v}\) 是我們的動作值向量,\(b\) 是一個 \(\mathbb{R}^n \rightarrow 1\) 函式,\(v\) 是一個 \(\mathbb{R}^n \rightarrow \mathbb{R}^m\) 函式,其中 \(n = \# obs\)(觀測數)且 \(m = \# actions\)(動作數)。
我們的網路被包裝在 QValueActor 中,它將讀取狀態-動作值,選取具有最大值的那個,並將所有這些結果寫入輸入的 tensordict.TensorDict 中。
def make_model(dummy_env):
cnn_kwargs = {
"num_cells": [32, 64, 64],
"kernel_sizes": [6, 4, 3],
"strides": [2, 2, 1],
"activation_class": nn.ELU,
# This can be used to reduce the size of the last layer of the CNN
# "squeeze_output": True,
# "aggregator_class": nn.AdaptiveAvgPool2d,
# "aggregator_kwargs": {"output_size": (1, 1)},
}
mlp_kwargs = {
"depth": 2,
"num_cells": [
64,
64,
],
"activation_class": nn.ELU,
}
net = DuelingCnnDQNet(
dummy_env.action_spec.shape[-1], 1, cnn_kwargs, mlp_kwargs
).to(device)
net.value[-1].bias.data.fill_(init_bias)
actor = QValueActor(net, in_keys=["pixels"], spec=dummy_env.action_spec).to(device)
# init actor: because the model is composed of lazy conv/linear layers,
# we must pass a fake batch of data through it to instantiate them.
tensordict = dummy_env.fake_tensordict()
actor(tensordict)
# we join our actor with an EGreedyModule for data collection
exploration_module = EGreedyModule(
spec=dummy_env.action_spec,
annealing_num_steps=total_frames,
eps_init=eps_greedy_val,
eps_end=eps_greedy_val_env,
)
actor_explore = TensorDictSequential(actor, exploration_module)
return actor, actor_explore
資料收集與儲存¶
回放緩衝區¶
回放緩衝區在離策略強化學習演算法(如 DQN)中扮演著核心角色。它們構成了我們在訓練期間進行取樣的資料集。
在這裡,我們將使用常規取樣策略,儘管優先順序回放緩衝區 (RB) 可以顯著提高效能。
我們使用 LazyMemmapStorage 類將儲存放在磁碟上。這種儲存以惰性方式建立:只有當第一個資料批次傳遞給它時,它才會被例項化。
這種儲存的唯一要求是,寫入時傳遞給它的資料必須始終具有相同的形狀。
buffer_scratch_dir = tempfile.TemporaryDirectory().name
def get_replay_buffer(buffer_size, n_optim, batch_size, device):
replay_buffer = TensorDictReplayBuffer(
batch_size=batch_size,
storage=LazyMemmapStorage(buffer_size, scratch_dir=buffer_scratch_dir),
prefetch=n_optim,
transform=lambda td: td.to(device),
)
return replay_buffer
資料收集器¶
與 PPO 和 DDPG 中一樣,我們將使用資料收集器作為外層迴圈中的資料載入器。
我們選擇以下配置:我們將在一系列不同的收集器中並行同步執行多個並行環境,這些收集器本身並行但非同步執行。
注意
此功能僅在使用 Python 多程序庫的“spawn”啟動方法執行程式碼時可用。如果本教程直接作為指令碼執行(因此使用“fork”方法),我們將使用常規的 SyncDataCollector。
這種配置的優勢在於,我們可以平衡批次執行的計算量與我們希望非同步執行的計算量。我們鼓勵讀者透過修改收集器的數量(即傳遞給收集器的環境構造器的數量)以及每個收集器中並行執行的環境數量(由 num_workers 超引數控制)來試驗收集速度受到的影響。
收集器的裝置可以透過 device(通用)、policy_device、env_device 和 storing_device 引數完全引數化。 storing_device 引數將修改收集資料的儲存位置:如果我們收集的批次資料量相當大,我們可能希望將它們儲存在與計算發生裝置不同的位置。對於像我們這樣的非同步資料收集器,不同的儲存裝置意味著我們收集的資料每次不會駐留在同一裝置上,這是我們的訓練迴圈必須考慮的問題。為簡單起見,我們將所有子收集器的裝置設定為相同的值。
def get_collector(
stats,
num_collectors,
actor_explore,
frames_per_batch,
total_frames,
device,
):
# We can't use nested child processes with mp_start_method="fork"
if is_fork:
cls = SyncDataCollector
env_arg = make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
else:
cls = MultiaSyncDataCollector
env_arg = [
make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
] * num_collectors
data_collector = cls(
env_arg,
policy=actor_explore,
frames_per_batch=frames_per_batch,
total_frames=total_frames,
# this is the default behavior: the collector runs in ``"random"`` (or explorative) mode
exploration_type=ExplorationType.RANDOM,
# We set the all the devices to be identical. Below is an example of
# heterogeneous devices
device=device,
storing_device=device,
split_trajs=False,
postproc=MultiStep(gamma=gamma, n_steps=5),
)
return data_collector
損失函式¶
構建我們的損失函式很簡單:我們只需要向 DQNLoss 類提供模型和一些超引數即可。
目標引數¶
許多離策略強化學習演算法在估計下一狀態或狀態-動作對的值時都使用了“目標引數”的概念。目標引數是模型引數的延遲副本。由於它們的預測與當前模型配置的預測不匹配,它們透過對估計值設定悲觀界限來幫助學習。這是一種強大的技巧(稱為“雙 Q-學習”),在類似演算法中無處不在。
def get_loss_module(actor, gamma):
loss_module = DQNLoss(actor, delay_value=True)
loss_module.make_value_estimator(gamma=gamma)
target_updater = SoftUpdate(loss_module, eps=0.995)
return loss_module, target_updater
超引數¶
讓我們從超引數開始。以下設定在實踐中應該能很好地工作,並且演算法的效能希望不會對這些引數的微小變化過於敏感。
is_fork = multiprocessing.get_start_method() == "fork"
device = (
torch.device(0)
if torch.cuda.is_available() and not is_fork
else torch.device("cpu")
)
最佳化器¶
# the learning rate of the optimizer
lr = 2e-3
# weight decay
wd = 1e-5
# the beta parameters of Adam
betas = (0.9, 0.999)
# Optimization steps per batch collected (aka UPD or updates per data)
n_optim = 8
DQN 引數¶
gamma 衰減因子
gamma = 0.99
平滑目標網路更新衰減引數。這大致對應於硬目標網路更新的 1/tau 間隔
tau = 0.02
資料收集和回放緩衝區¶
注意
已註釋掉用於適當訓練的值。
在環境中收集的總幀數。在其他實現中,使用者定義最大episode數。對於我們的資料收集器來說,這更難做到,因為它們返回 N 幀收集資料的批次,其中 N 是常數。然而,當收集到一定數量的 episode 時,可以透過中斷訓練迴圈來輕鬆實現相同的 episode 數限制。
total_frames = 5_000 # 500000
用於初始化回放緩衝區的隨機幀數。
init_random_frames = 100 # 1000
每個收集批次中的幀數。
frames_per_batch = 32 # 128
每個最佳化步驟從回放緩衝區取樣的幀數
batch_size = 32 # 256
回放緩衝區的幀數大小
buffer_size = min(total_frames, 100000)
每個資料收集器中並行執行的環境數量
num_workers = 2 # 8
num_collectors = 2 # 4
環境和探索¶
我們設定了 Epsilon-greedy 探索中 epsilon 因子的初始值和最終值。由於我們的策略是確定性的,探索至關重要:沒有它,唯一的隨機性來源將是環境重置。
eps_greedy_val = 0.1
eps_greedy_val_env = 0.005
為了加快學習速度,我們將價值網路的最後一層偏差設定為預定義的值(這不是強制的)
init_bias = 2.0
注意
為了加快教程的渲染速度,total_frames 超引數被設定為一個非常小的值。要獲得合理的效能,請使用更大的值,例如 500000
構建訓練器¶
TorchRL 的 Trainer 類建構函式接受以下僅關鍵字引數
collectorloss_moduleoptimizerlogger:日誌記錄器可以是total_frames:此引數定義了訓練器的生命週期。frame_skip:使用 frame-skip 時,必須告知收集器,以便準確計算收集的幀數等。告知訓練器此引數並非強制,但有助於在總幀數(預算)固定但 frame-skip 可變的情況下進行更公平的比較。
stats = get_norm_stats()
test_env = make_env(parallel=False, obs_norm_sd=stats)
# Get model
actor, actor_explore = make_model(test_env)
loss_module, target_net_updater = get_loss_module(actor, gamma)
collector = get_collector(
stats=stats,
num_collectors=num_collectors,
actor_explore=actor_explore,
frames_per_batch=frames_per_batch,
total_frames=total_frames,
device=device,
)
optimizer = torch.optim.Adam(
loss_module.parameters(), lr=lr, weight_decay=wd, betas=betas
)
exp_name = f"dqn_exp_{uuid.uuid1()}"
tmpdir = tempfile.TemporaryDirectory()
logger = CSVLogger(exp_name=exp_name, log_dir=tmpdir.name)
warnings.warn(f"log dir: {logger.experiment.log_dir}")
我們可以控制標量應該多久記錄一次。在這裡,我們將其設定為一個低值,因為我們的訓練迴圈很短
log_interval = 500
trainer = Trainer(
collector=collector,
total_frames=total_frames,
frame_skip=1,
loss_module=loss_module,
optimizer=optimizer,
logger=logger,
optim_steps_per_batch=n_optim,
log_interval=log_interval,
)
註冊鉤子¶
註冊鉤子可以透過兩種不同的方式實現
如果鉤子有,
register()方法是首選。只需將訓練器作為輸入提供,鉤子將以預設名稱註冊到預設位置。對於某些鉤子,註冊可能相當複雜:ReplayBufferTrainer需要 3 個鉤子(extend、sample和update_priority),實現起來可能很麻煩。
buffer_hook = ReplayBufferTrainer(
get_replay_buffer(buffer_size, n_optim, batch_size=batch_size, device=device),
flatten_tensordicts=True,
)
buffer_hook.register(trainer)
weight_updater = UpdateWeights(collector, update_weights_interval=1)
weight_updater.register(trainer)
recorder = LogValidationReward(
record_interval=100, # log every 100 optimization steps
record_frames=1000, # maximum number of frames in the record
frame_skip=1,
policy_exploration=actor_explore,
environment=test_env,
exploration_type=ExplorationType.DETERMINISTIC,
log_keys=[("next", "reward")],
out_keys={("next", "reward"): "rewards"},
log_pbar=True,
)
recorder.register(trainer)
探索模組的 epsilon 因子也會進行退火
trainer.register_op("post_steps", actor_explore[1].step, frames=frames_per_batch)
任何可呼叫物件(包括
TrainerHookBase的子類)都可以使用register_op()進行註冊。在這種情況下,必須顯式傳遞一個位置 ()。此方法提供了對鉤子位置的更多控制,但也需要對訓練器機制有更深入的瞭解。請查閱訓練器文件以獲取訓練器鉤子的詳細描述。
trainer.register_op("post_optim", target_net_updater.step)
我們也可以記錄訓練獎勵。請注意,對於 CartPole 來說,這隻有有限的意義,因為獎勵總是 1。折扣獎勵總和的最大化不是透過獲得更高的獎勵,而是透過讓倒立擺存活更長時間。這將在進度條中顯示的 total_rewards 值中反映出來。
log_reward = LogScalar(log_pbar=True)
log_reward.register(trainer)
注意
如果需要,可以將多個最佳化器連結到訓練器。在這種情況下,每個最佳化器將與損失字典中的一個欄位繫結。請查閱 OptimizerHook 以瞭解更多資訊。
現在我們準備好訓練我們的演算法了!只需簡單呼叫 trainer.train(),我們將獲得日誌記錄的結果。
trainer.train()
現在我們可以快速檢查包含結果的 CSV 檔案。
def print_csv_files_in_folder(folder_path):
"""
Find all CSV files in a folder and prints the first 10 lines of each file.
Args:
folder_path (str): The relative path to the folder.
"""
csv_files = []
output_str = ""
for dirpath, _, filenames in os.walk(folder_path):
for file in filenames:
if file.endswith(".csv"):
csv_files.append(os.path.join(dirpath, file))
for csv_file in csv_files:
output_str += f"File: {csv_file}\n"
with open(csv_file, "r") as f:
for i, line in enumerate(f):
if i == 10:
break
output_str += line.strip() + "\n"
output_str += "\n"
print(output_str)
print_csv_files_in_folder(logger.experiment.log_dir)
trainer.shutdown()
del trainer
結論與潛在改進¶
在本教程中,我們學習了
如何編寫訓練器,包括構建其元件並在訓練器中註冊它們;
如何編寫 DQN 演算法,包括如何使用
QValueNetwork建立一個選取價值最高動作的策略;如何構建多程序資料收集器;
本教程可能的改進包括
還可以使用優先順序回放緩衝區。這將為價值準確性最差的樣本賦予更高的優先順序。在文件的回放緩衝區部分瞭解更多資訊。
分散式損失(更多資訊請參閱
DistributionalDQNLoss)。更高階的探索技術,例如
NoisyLinear層等。