快捷方式

TensorDictReplayBuffer

class torchrl.data.TensorDictReplayBuffer(*, priority_key: str = 'td_error', **kwargs)[source]

圍繞 ReplayBuffer 類構建的 TensorDict 特定包裝器。

關鍵字引數:
  • storage (Storage, 可選) – 要使用的儲存。如果未提供,將建立一個預設的 ListStorage,其 max_size1_000

  • sampler (Sampler, 可選) – 要使用的取樣器。如果未提供,將使用預設的 RandomSampler()。

  • writer (Writer, 可選) – 要使用的寫入器。如果未提供,將使用預設的 RoundRobinWriter

  • collate_fn (callable, 可選) – 合併樣本列表以形成一個 Tensor(s)/輸出的 mini-batch。在從對映式資料集批次載入時使用。預設值將根據儲存型別確定。

  • pin_memory (bool) – 是否應對回放緩衝區樣本呼叫 pin_memory()。

  • prefetch (int, 可選) – 使用多執行緒預取的下一個批次的數量。預設為 None(不進行預取)。

  • transform (Transform, 可選) – 呼叫 sample() 時要執行的轉換。要鏈式呼叫轉換,請使用 Compose 類。轉換應與 tensordict.TensorDict 內容一起使用。如果與其他結構一起使用,則轉換應使用前導鍵 "data" 進行編碼,該鍵將用於從非 TensorDict 內容構建 TensorDict。

  • batch_size (int, 可選) –

    呼叫 sample() 時要使用的批次大小。

    注意

    批次大小可以在構造時透過 batch_size 引數指定,也可以在取樣時指定。當批次大小在整個實驗中保持一致時,應首選前者。如果批次大小可能更改,可以將其傳遞給 sample() 方法。此選項與預取不相容(因為這需要提前知道批次大小),也與具有 drop_last 引數的取樣器不相容。

  • priority_key (str, 可選) – 在新增到此 ReplayBuffer 的 TensorDict 中假定儲存優先順序的鍵。當取樣器型別為 PrioritizedSampler 時使用。預設為 "td_error"

  • dim_extend (int, 可選) –

    指示呼叫 extend() 時要考慮的擴充套件維度。預設為 storage.ndim-1。當使用 dim_extend > 0 時,如果儲存例項化中提供了 ndim 引數,我們建議使用該引數,以便讓儲存知道資料是多維的,並在取樣期間保持儲存容量和批次大小概念的一致性。

    注意

    此引數對 add() 沒有影響,因此在程式碼庫中同時使用 add()extend() 時應謹慎使用。例如

    >>> data = torch.zeros(3, 4)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(10, ndim=2),
    ...     dim_extend=1)
    >>> # these two approaches are equivalent:
    >>> for d in data.unbind(1):
    ...     rb.add(d)
    >>> rb.extend(data)
    

  • generator (torch.Generator, 可選) –

    用於取樣的生成器。為回放緩衝區使用專用的生成器可以實現對種子的精細控制,例如在分散式作業中保持全域性種子不同但回放緩衝區種子相同。預設為 None(全域性預設生成器)。

    警告

    截至目前,該生成器對 transforms 沒有影響。

  • shared (bool, 可選) – 是否使用多程序共享緩衝區。預設為 False

  • compilable (bool, 可選) – 寫入器是否可編譯。如果為 True,則寫入器不能在多個程序之間共享。預設為 False

示例

>>> import torch
>>>
>>> from torchrl.data import LazyTensorStorage, TensorDictReplayBuffer
>>> from tensordict import TensorDict
>>>
>>> torch.manual_seed(0)
>>>
>>> rb = TensorDictReplayBuffer(storage=LazyTensorStorage(10), batch_size=5)
>>> data = TensorDict({"a": torch.ones(10, 3), ("b", "c"): torch.zeros(10, 1, 1)}, [10])
>>> rb.extend(data)
>>> sample = rb.sample(3)
>>> # samples keep track of the index
>>> print(sample)
TensorDict(
    fields={
        a: Tensor(shape=torch.Size([3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([3, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([3]),
            device=cpu,
            is_shared=False),
        index: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.int32, is_shared=False)},
    batch_size=torch.Size([3]),
    device=cpu,
    is_shared=False)
>>> # we can iterate over the buffer
>>> for i, data in enumerate(rb):
...     print(i, data)
...     if i == 2:
...         break
0 TensorDict(
    fields={
        a: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([5]),
            device=cpu,
            is_shared=False),
        index: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.int32, is_shared=False)},
    batch_size=torch.Size([5]),
    device=cpu,
    is_shared=False)
1 TensorDict(
    fields={
        a: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([5]),
            device=cpu,
            is_shared=False),
        index: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.int32, is_shared=False)},
    batch_size=torch.Size([5]),
    device=cpu,
    is_shared=False)
add(data: TensorDictBase) int[source]

向回放緩衝區新增單個元素。

引數:

data (Any) – 要新增到回放緩衝區的資料

返回:

資料在回放緩衝區中的索引。

append_transform(transform: Transform, *, invert: bool = False) ReplayBuffer

在末尾新增轉換。

呼叫 sample 時按順序應用轉換。

引數:

transform (Transform) – 要新增的轉換

關鍵字引數:

invert (bool, 可選) – 如果為 True,則轉換將被反轉(寫入時呼叫正向,讀取時呼叫逆向)。預設為 False

示例

>>> rb = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=4)
>>> data = TensorDict({"a": torch.zeros(10)}, [10])
>>> def t(data):
...     data += 1
...     return data
>>> rb.append_transform(t, invert=True)
>>> rb.extend(data)
>>> assert (data == 1).all()
dump(*args, **kwargs)

dumps() 的別名。

dumps(path)

將回放緩衝區儲存到指定路徑的磁碟上。

引數:

path (Pathstr) – 儲存回放緩衝區的路徑。

示例

>>> import tempfile
>>> import tqdm
>>> from torchrl.data import LazyMemmapStorage, TensorDictReplayBuffer
>>> from torchrl.data.replay_buffers.samplers import PrioritizedSampler, RandomSampler
>>> import torch
>>> from tensordict import TensorDict
>>> # Build and populate the replay buffer
>>> S = 1_000_000
>>> sampler = PrioritizedSampler(S, 1.1, 1.0)
>>> # sampler = RandomSampler()
>>> storage = LazyMemmapStorage(S)
>>> rb = TensorDictReplayBuffer(storage=storage, sampler=sampler)
>>>
>>> for _ in tqdm.tqdm(range(100)):
...     td = TensorDict({"obs": torch.randn(100, 3, 4), "next": {"obs": torch.randn(100, 3, 4)}, "td_error": torch.rand(100)}, [100])
...     rb.extend(td)
...     sample = rb.sample(32)
...     rb.update_tensordict_priority(sample)
>>> # save and load the buffer
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     rb.dumps(tmpdir)
...
...     sampler = PrioritizedSampler(S, 1.1, 1.0)
...     # sampler = RandomSampler()
...     storage = LazyMemmapStorage(S)
...     rb_load = TensorDictReplayBuffer(storage=storage, sampler=sampler)
...     rb_load.loads(tmpdir)
...     assert len(rb) == len(rb_load)
empty()

清空回放緩衝區並將遊標重置為 0。

extend(tensordicts: TensorDictBase) Tensor[source]

使用可迭代物件中包含的一個或多個元素擴充套件回放緩衝區。

如果存在,將呼叫逆向轉換。

引數:

data (iterable) – 要新增到回放緩衝區的資料集合。

返回:

新增到回放緩衝區的資料的索引。

警告

extend() 在處理值列表時可能具有模糊的簽名,這些值列表應被解釋為 PyTree(在這種情況下,列表中的所有元素將作為儲存的 PyTree 中的一個切片放置到儲存中)或要逐個新增的值列表。為了解決這個問題,TorchRL 在列表和元組之間做出了明確區分:元組將被視為 PyTree,而(頂層)列表將被解釋為要逐個新增到緩衝區的一系列值。對於 ListStorage 例項,只能提供未繫結的元素(不能是 PyTree)。

insert_transform(index: int, transform: Transform, *, invert: bool = False) ReplayBuffer

插入轉換。

呼叫 sample 時按順序執行轉換。

引數:
  • index (int) – 插入轉換的位置。

  • transform (Transform) – 要新增的轉換

關鍵字引數:

invert (bool, 可選) – 如果為 True,則轉換將被反轉(寫入時呼叫正向,讀取時呼叫逆向)。預設為 False

load(*args, **kwargs)

loads() 的別名。

loads(path)

從給定路徑載入回放緩衝區狀態。

緩衝區應具有匹配的元件,並且是使用 dumps() 儲存的。

引數:

path (Pathstr) – 回放緩衝區儲存的路徑。

更多資訊請參見 dumps()

register_load_hook(hook: Callable[[Any], Any])

為儲存註冊一個載入鉤子。

注意

當前,儲存回放緩衝區時鉤子不會被序列化:每次建立緩衝區時必須手動重新初始化它們。

register_save_hook(hook: Callable[[Any], Any])

為儲存註冊一個儲存鉤子。

注意

當前,儲存回放緩衝區時鉤子不會被序列化:每次建立緩衝區時必須手動重新初始化它們。

sample(batch_size: int | None = None, return_info: bool = False, include_info: bool = None) TensorDictBase[source]

從回放緩衝區取樣一批資料。

使用 Sampler 取樣索引,並從 Storage 中檢索它們。

引數:
  • batch_size (int, 可選) – 要收集的資料大小。如果未提供,此方法將按照取樣器指示的批次大小進行取樣。

  • return_info (bool) – 是否返回資訊。如果為 True,結果是一個元組 (data, info)。如果為 False,結果是資料。

返回:

一個 TensorDict,包含從回放緩衝區中選定的批次資料。如果設定了 return_info 標誌,則返回包含該 TensorDict 和 info 的元組。

property sampler

回放緩衝區的取樣器。

取樣器必須是 Sampler 的例項。

save(*args, **kwargs)

dumps() 的別名。

set_sampler(sampler: Sampler)

在回放緩衝區中設定一個新的取樣器並返回先前的取樣器。

set_storage(storage: Storage, collate_fn: Callable | None = None)

在回放緩衝區中設定一個新的儲存並返回先前的儲存。

引數:
  • storage (Storage) – 緩衝區的新儲存。

  • collate_fn (callable, 可選) – 如果提供,collate_fn 將設定為此值。否則,它將重置為預設值。

set_writer(writer: Writer)

在回放緩衝區中設定一個新的寫入器並返回先前的寫入器。

property storage

回放緩衝區的儲存。

儲存必須是 Storage 的例項。

property write_count

透過 add 和 extend 方法到目前為止寫入緩衝區的總條目數。

property writer

回放緩衝區的寫入器。

寫入器必須是 Writer 的例項。

文件

訪問全面的 PyTorch 開發者文件

檢視文件

教程

獲取面向初學者和高階開發者的深度教程

檢視教程

資源

查詢開發資源並獲得問題解答

檢視資源