• 教程 >
  • 強化學習 (DQN) 教程
快捷方式

強化學習 (DQN) 教程

創建於: 2017 年 3 月 24 日 | 最後更新於: 2024 年 6 月 18 日 | 最後驗證於: 2024 年 11 月 5 日

作者: Adam Paszke

Mark Towers

本教程展示瞭如何使用 PyTorch 在 Gymnasium 中的 CartPole-v1 任務上訓練一個深度 Q 學習 (DQN) 智慧體。

你可能會發現閱讀原始的 深度 Q 學習 (DQN) 論文有所幫助

任務

智慧體必須在兩個動作之間做出決定——向左或向右移動推車——以便連線在上面的杆子保持直立。你可以在 Gymnasium 的網站上找到有關該環境以及其他更具挑戰性環境的更多資訊。

CartPole

CartPole(倒立擺)

當智慧體觀察環境的當前狀態並選擇一個動作時,環境會遷移到新的狀態,並返回一個獎勵,該獎勵表明了動作的後果。在該任務中,每增加一個時間步,獎勵為 +1;如果杆子傾倒得太遠或推車偏離中心超過 2.4 個單位,環境就會終止。這意味著表現更好的場景將執行更長時間,累積更大的回報。

CartPole 任務的設計使得智慧體的輸入是代表環境狀態(位置、速度等)的 4 個實數值。我們將這 4 個輸入不做任何縮放,直接透過一個具有 2 個輸出的小型全連線網路,每個輸出對應一個動作。網路經過訓練,可以根據輸入狀態預測每個動作的期望值。然後選擇具有最高期望值的動作。

軟體包

首先,讓我們匯入所需的軟體包。首先,我們需要用於環境的 gymnasium,使用 pip 安裝。這是原始 OpenAI Gym 專案的一個分支,自 Gym v0.19 以來由同一個團隊維護。如果你在 Google Colab 中執行此程式碼,請執行

%%bash
pip3 install gymnasium[classic_control]

我們還將使用 PyTorch 中的以下內容:

  • 神經網路 (torch.nn)

  • 最佳化 (torch.optim)

  • 自動微分 (torch.autograd)

import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

env = gym.make("CartPole-v1")

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

回放緩衝區

我們將使用經驗回放緩衝區來訓練 DQN。它儲存智慧體觀察到的轉移(transitions),使我們以後可以重用這些資料。透過從中隨機取樣,構建批次的轉移之間會去相關。這已被證明可以極大地穩定和改進 DQN 訓練過程。

為此,我們需要兩個類:

  • Transition - 一個命名元組,表示環境中的單個轉移。它本質上將 (state, action) 對對映到它們的 (next_state, reward) 結果,其中狀態是稍後描述的螢幕差分影像。

  • ReplayMemory - 一個有限大小的迴圈緩衝區,用於儲存最近觀察到的轉移。它還實現了一個 .sample() 方法,用於隨機選擇一批轉移進行訓練。

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

現在,讓我們定義模型。但首先,快速回顧一下 DQN 是什麼。

DQN 演算法

我們的環境是確定性的,因此為了簡單起見,這裡展示的所有方程也都是確定性地構建的。在強化學習文獻中,它們通常還包含對環境中隨機轉移的期望。

我們的目標是訓練一個策略,使其嘗試最大化折扣累積獎勵 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也稱為回報 (return)。折扣因子 \(\gamma\) 應該是一個介於 \(0\)\(1\) 之間的常數,確保求和收斂。較低的 \(\gamma\) 使得不確定遙遠未來的獎勵相對於可以相當確定的近期獎勵對智慧體而言不那麼重要。它也鼓勵智慧體獲取時間上更近的獎勵,而不是時間上遙遠未來等價的獎勵。

Q 學習背後的主要思想是,如果我們有一個函式 \(Q^*: 狀態 \times 動作 \rightarrow \mathbb{R}\),它可以告訴我們在給定狀態下采取某個動作後,我們的回報將是多少,那麼我們就可以輕鬆構建一個最大化獎勵的策略:

\[\pi^*(s) = \arg\!\max_a \ Q^*(s, a) \ ]

然而,我們並不瞭解世界的一切,因此無法直接獲取 \(Q^*\)。但是,由於神經網路是萬能函式逼近器,我們可以簡單地建立一個神經網路並訓練它來模擬 \(Q^*\)

對於我們的訓練更新規則,我們將利用這樣一個事實:任何策略的 \(Q\) 函式都遵循 Bellman 方程:

\[Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s')) \ ]

等式兩邊的差值稱為時序差分誤差 \(\delta\)

\[\delta = Q(s, a) - (r + \gamma \max_a' Q(s', a)) \ ]

為了最小化這個誤差,我們將使用 Huber 損失。Huber 損失在誤差較小時表現得像均方誤差,在誤差較大時則像平均絕對誤差——這使得它在 \(Q\) 的估計值非常嘈雜時對異常值更具魯棒性。我們在從回放緩衝區中取樣的一批轉移 \(B\) 上計算這個損失:

\[\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)\]
\[\text{where} \quad \mathcal{L}(\delta) = \begin{cases} \frac{1}{2}{\delta^2} & \text{for } |\delta| \le 1, \\ |\delta| - \frac{1}{2} & \text{otherwise.} \end{cases}\]

Q 網路

我們的模型將是一個前饋神經網路,它接收當前螢幕補丁與先前螢幕補丁之間的差異作為輸入。它有兩個輸出,分別代表 \(Q(s, \mathrm{left})\)\(Q(s, \mathrm{right})\)(其中 \(s\) 是網路的輸入)。實際上,網路試圖預測在當前輸入下采取每個動作的期望回報

class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

訓練

超引數和輔助函式

此單元例項化了我們的模型及其最佳化器,並定義了一些輔助函式:

  • select_action - 將根據 ε-greedy 策略選擇一個動作。簡單來說,我們有時會使用模型來選擇動作,有時會均勻取樣一個動作。選擇隨機動作的機率將從 EPS_START 開始,並指數衰減到 EPS_ENDEPS_DECAY 控制衰減速率。

  • plot_durations - 一個輔助函式,用於繪製每回合的持續時間,以及過去 100 回合的平均值(這是官方評估中使用的衡量標準)。圖表將位於包含主訓練迴圈的單元格下方,並在每回合後更新。

# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

訓練迴圈

最後是訓練模型的程式碼。

在這裡,你可以找到執行單步最佳化的 optimize_model 函式。它首先採樣一批資料,將所有張量連線成一個,計算 \(Q(s_t, a_t)\)\(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),並將它們組合成我們的損失。根據定義,如果 \(s\) 是終止狀態,我們將 \(V(s)\) 設定為 0。我們還使用一個目標網路來計算 \(V(s_{t+1})\) 以增加穩定性。目標網路在每一步都會透過超引數 TAU 控制的軟更新進行更新,該超引數之前已定義。

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1).values
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

下面是主訓練迴圈。開始時,我們重置環境並獲取初始 state 張量。然後,我們取樣一個動作,執行它,觀察下一個狀態和獎勵(總是 1),並最佳化模型一次。當回合結束(模型失敗)時,我們重新開始迴圈。

在下面,如果 GPU 可用,num_episodes 設定為 600,否則安排 50 回合,以便訓練不會花費太長時間。然而,50 回合不足以觀察到 CartPole 上的良好效能。你應該會看到模型在 600 個訓練回合內持續達到 500 步。訓練強化學習智慧體可能是一個嘈雜的過程,因此如果未觀察到收斂,重新開始訓練可能會產生更好的結果。

if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()
Result
/usr/local/lib/python3.10/dist-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:

`np.bool8` is a deprecated alias for `np.bool_`.  (Deprecated NumPy 1.24)

Complete

這裡是說明整體資料流的圖示。

../_images/reinforcement_learning_diagram.jpg

動作隨機選擇或基於策略選擇,從 gym 環境獲取下一步樣本。我們將結果記錄在回放緩衝區中,並在每次迭代時執行最佳化步驟。最佳化從回放緩衝區中隨機選取一批資料來訓練新策略。“較舊的” target_net 也用於最佳化中計算期望的 Q 值。每一步都會對其權重進行軟更新。

指令碼總執行時間: ( 2 分 56.082 秒)

Gallery 由 Sphinx-Gallery 生成

文件

訪問 PyTorch 的全面開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源