儲存 TensorDict 和 tensorclass 物件¶
雖然我們可以直接使用 save() 儲存 tensordict,但這會建立一個包含整個資料結構內容的單一檔案。很容易想到在某些情況下這樣做不是最優的!
TensorDict 序列化 API 主要依賴於 MemoryMappedTensor,它用於將張量獨立寫入磁碟,其資料結構模仿了 TensorDict 的結構。
與依賴 pickle 的 PyTorch save() 方法相比,TensorDict 的序列化速度可以快一個數量級。本文件解釋瞭如何使用 TensorDict 建立和互動儲存在磁碟上的資料。
儲存記憶體對映的 TensorDict¶
當 tensordict 被轉儲為 mmap 資料結構時,每個條目對應一個單獨的 *.memmap 檔案,目錄結構由鍵結構決定:通常,巢狀的鍵對應於子目錄。
將資料結構儲存為結構化的記憶體對映張量集具有以下優點
可以部分載入儲存的資料。如果一個大型模型儲存在磁碟上,但只需將部分權重載入到在單獨指令碼中建立的模組上,則只有這些權重會被載入到記憶體中。
儲存資料是安全的:使用 pickle 庫序列化大型資料結構可能不安全,因為反序列化(unpickling)可以執行任意程式碼。TensorDict 的載入 API 只從儲存的 json 檔案和磁碟上儲存的記憶體緩衝區讀取預選欄位。
儲存速度快:由於資料被寫入多個獨立檔案,我們可以透過啟動多個併發執行緒來分攤 IO 開銷,每個執行緒都獨立訪問一個專用檔案。
儲存的資料結構很清晰:目錄樹能反映資料內容。
然而,這種方法也有一些缺點
並非所有資料型別都可以儲存。
tensorclass允許儲存任何非張量資料:如果這些資料可以用 json 檔案表示,則會使用 json 格式。否則,非張量資料將作為備用方案獨立使用save()儲存。NonTensorData類可用於在常規TensorDict例項中表示非張量資料。
tensordict 的記憶體對映 API 依賴於四個核心方法:memmap_()、memmap()、memmap_like() 和 load_memmap()。
memmap_() 和 memmap() 方法會將資料寫入磁碟,可以選擇是否修改包含資料的 tensordict 例項。這些方法可用於將模型序列化到磁碟(我們使用多個執行緒來加快序列化速度)。
>>> model = nn.Transformer()
>>> weights = TensorDict.from_module(model)
>>> weights_disk = weights.memmap("/path/to/saved/dir", num_threads=32)
>>> new_weights = TensorDict.load_memmap("/path/to/saved/dir")
>>> assert (weights_disk == new_weights).all()
當資料集需要在磁碟上預分配時,應使用 memmap_like(),典型用法如下:
>>> def make_datum(): # used for illustration purposes
... return TensorDict({"image": torch.randint(255, (3, 64, 64)), "label": 0}, batch_size=[])
>>> dataset_size = 1_000_000
>>> datum = make_datum() # creates a single instance of a TensorDict datapoint
>>> data = datum.expand(dataset_size) # does NOT require more memory usage than datum, since it's only a view on datum!
>>> data_disk = data.memmap_like("/path/to/data") # creates the two memory-mapped tensors on disk
>>> del data # data is not needed anymore
如上所示,在將 TensorDict` 中的條目轉換為 MemoryMappedTensor 時,可以控制記憶體對映儲存到磁碟的位置,以便它們持久存在並在以後載入。另一方面,也可以使用檔案系統。為此,只需在上述三個序列化方法中忽略 prefix 引數即可。
當指定 prefix 時,資料結構遵循 TensorDict 的結構
>>> import torch
>>> from tensordict import TensorDict
>>> td = TensorDict({"a": torch.rand(10), "b": {"c": torch.rand(10)}}, [10])
>>> td.memmap_(prefix="tensordict")
產生以下目錄結構
tensordict
├── a.memmap
├── b
│ ├── c.memmap
│ └── meta.json
└── meta.json
meta.json 檔案包含重建 tensordict 的所有相關資訊,例如裝置、批次大小以及 tensordict 的子型別。這意味著 load_memmap() 將能夠重建複雜的巢狀結構,其中子 tensordict 可以具有與父 tensordict 不同的型別。
>>> from tensordict import TensorDict, tensorclass, TensorDictBase
>>> from tensordict.utils import print_directory_tree
>>> import torch
>>> import tempfile
>>> td_list = [TensorDict({"item": i}, batch_size=[]) for i in range(4)]
>>> @tensorclass
... class MyClass:
... data: torch.Tensor
... metadata: str
>>> tc = MyClass(torch.randn(3), metadata="some text", batch_size=[])
>>> data = TensorDict({"td_list": torch.stack(td_list), "tensorclass": tc}, [])
>>> with tempfile.TemporaryDirectory() as tempdir:
... data.memmap_(tempdir)
...
... loaded_data = TensorDictBase.load_memmap(tempdir)
... assert (loaded_data == data).all()
... print_directory_tree(tempdir)
tmpzy1jcaoq/
tensorclass/
_tensordict/
data.memmap
meta.json
meta.json
td_list/
0/
item.memmap
meta.json
1/
item.memmap
meta.json
3/
item.memmap
meta.json
2/
item.memmap
meta.json
meta.json
meta.json
處理現有的 MemoryMappedTensor¶
如果 TensorDict` 已包含 MemoryMappedTensor 條目,則可能有幾種行為。
如果未指定
prefix且memmap()被呼叫兩次,則生成的 TensorDict 將包含與原始 TensorDict 相同的資料。>>> td = TensorDict({"a": 1}, []) >>> td0 = td.memmap() >>> td1 = td0.memmap() >>> td0["a"] is td1["a"] True
如果指定了
prefix且與現有MemoryMappedTensor例項的 prefix 不同,則會引發異常,除非傳入 copy_existing=True。>>> with tempfile.TemporaryDirectory() as tmpdir_0: ... td0 = td.memmap(tmpdir_0) ... td0 = td.memmap(tmpdir_0) # works, results are just overwritten ... with tempfile.TemporaryDirectory() as tmpdir_1: ... td1 = td0.memmap(tmpdir_1) ... td_load = TensorDict.load_memmap(tmpdir_1) # works! ... assert (td_load == td).all() ... with tempfile.TemporaryDirectory() as tmpdir_1: ... td_load = TensorDict.load_memmap(tmpdir_1) # breaks!
此功能是為了防止使用者無意中將記憶體對映張量從一個位置複製到另一個位置而實現的。
TorchSnapshot 相容性¶
警告
由於 torchsnapshot 的維護正在停止。因此,我們將不再為 tensordict 與此庫的相容性實現新功能。
TensorDict 與 PyTorch 檢查點庫 torchsnapshot 相容。TorchSnapshot 將獨立儲存您的每個張量,其資料結構模仿您的 tensordict 或 tensorclass 的結構。此外,TensorDict 自然內建了在不將完整張量載入到記憶體的情況下儲存和載入磁碟上大型資料集所需的工具:換句話說,tensordict + torchsnapshot 的組合使得可以將大小達數百 Gb 的張量載入到預分配的 MemmapTensor 上,而無需一次性將其全部載入 RAM。
主要有兩種用例:儲存和載入適合記憶體的 tensordict,以及使用 MemmapTensor 儲存和載入儲存在磁碟上的 tensordict。
一般用例:記憶體載入¶
如果您的目標 tensordict 未預分配,則此方法適用。這提供了靈活性(您可以將任何 tensordict 載入到您的 tensordict 上,您無需事先知道其內容),並且此方法比其他方法更容易編碼。但是,如果您的張量非常大且不適合記憶體,此方法可能會失效。此外,它不允許您直接載入到您選擇的裝置上。
儲存操作需要記住的兩個主要命令是
>>> state = {"state": tensordict_source}
>>> snapshot = torchsnapshot.Snapshot.take(app_state=state, path="/path/to/my/snapshot")
要載入到目標 tensordict 上,您只需載入快照並更新 tensordict。在此方法的底層,它將呼叫 tensordict_target.load_state_dict(state_dict),這意味著 state_dict 將首先完全載入到記憶體中,然後載入到目標 tensordict 上。
>>> snapshot = Snapshot(path="/path/to/my/snapshot")
>>> state_target = {"state": tensordict_target}
>>> snapshot.restore(app_state=state_target)
以下是一個完整的示例
>>> import uuid
>>> import torchsnapshot
>>> from tensordict import TensorDict
>>> import torch
>>>
>>> tensordict_source = TensorDict({"a": torch.randn(3), "b": {"c": torch.randn(3)}}, [])
>>> state = {"state": tensordict}
>>> path = f"/tmp/{uuid.uuid4()}"
>>> snapshot = torchsnapshot.Snapshot.take(app_state=state, path=path)
>>> # later
>>> snapshot = torchsnapshot.Snapshot(path=path)
>>> tensordict2 = TensorDict()
>>> target_state = {
>>> "state": tensordict2
>>> }
>>> snapshot.restore(app_state=target_state)
>>> assert (tensordict == tensordict2).all()
儲存和載入大型資料集¶
如果資料集太大無法完全載入記憶體,上述方法很容易失效。我們利用 torchsnapshot 的功能,將張量分小塊載入到其預分配的目標位置。這要求您瞭解目標資料將具有的形狀、裝置等資訊,但這對於能夠對模型或資料載入進行檢查點操作來說是一個很小的代價!
與之前的示例不同,我們將不再使用 TensorDict 的 load_state_dict() 方法,而是使用從目標物件獲取的 state_dict,然後用儲存的資料重新填充它。
同樣,兩行程式碼足以儲存資料
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tensordict_source.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path="/path/to/my/snapshot")
我們一直在使用 torchsnapshot.StateDict,並且明確呼叫了 my_tensordict_source.state_dict(keep_vars=True),這與之前的示例不同。現在,要將其載入到目標 tensordict 上
>>> snapshot = Snapshot(path="/path/to/my/snapshot")
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tensordict_target.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
在此示例中,載入完全由 torchsnapshot 處理,即沒有呼叫 TensorDict.load_state_dict()。
注意
這有兩個重要影響
由於
LazyStackedTensorDict.state_dict()(以及其他惰性 tensordict 類)在執行某些操作後返回資料的副本,因此載入到 state-dict 不會更新原始類。但是,由於 state_dict() 操作受到支援,這不會引發錯誤。類似地,由於 state-dict 是原地更新的,而 tensordict 沒有使用
TensorDict.update()或TensorDict.set()進行更新,目標 tensordict 中缺失的鍵將不會被注意到。
以下是一個完整的示例
>>> td = TensorDict({"a": torch.randn(3), "b": TensorDict({"c": torch.randn(3, 1)}, [3, 1])}, [3])
>>> td.memmap_()
>>> assert isinstance(td["b", "c"], MemmapTensor)
>>>
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=td.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path=f"/tmp/{uuid.uuid4()}")
>>>
>>>
>>> td_dest = TensorDict({"a": torch.zeros(3), "b": TensorDict({"c": torch.zeros(3, 1)}, [3, 1])}, [3])
>>> td_dest.memmap_()
>>> assert isinstance(td_dest["b", "c"], MemmapTensor)
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=td_dest.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
>>> # sanity check
>>> assert (td_dest == td).all()
>>> assert (td_dest["b"].batch_size == td["b"].batch_size)
>>> assert isinstance(td_dest["b", "c"], MemmapTensor)
最後,tensorclass 也支援此功能。程式碼與上面相當相似
>>> from __future__ import annotations
>>> import uuid
>>> from typing import Union, Optional
>>>
>>> import torchsnapshot
>>> from tensordict import TensorDict, MemmapTensor
>>> import torch
>>> from tensordict.prototype import tensorclass
>>>
>>> @tensorclass
>>> class MyClass:
... x: torch.Tensor
... y: Optional[MyClass]=None
...
>>> tc = MyClass(x=torch.randn(3), y=MyClass(x=torch.randn(3), batch_size=[]), batch_size=[])
>>> tc.memmap_()
>>> assert isinstance(tc.y.x, MemmapTensor)
>>>
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tc.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path=f"/tmp/{uuid.uuid4()}")
>>>
>>> tc_dest = MyClass(x=torch.randn(3), y=MyClass(x=torch.randn(3), batch_size=[]), batch_size=[])
>>> tc_dest.memmap_()
>>> assert isinstance(tc_dest.y.x, MemmapTensor)
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tc_dest.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
>>>
>>> assert (tc_dest == tc).all()
>>> assert (tc_dest.y.batch_size == tc.y.batch_size)
>>> assert isinstance(tc_dest.y.x, MemmapTensor)