注意
轉到末尾 下載完整示例程式碼。
從流構建 tensordicts¶
在許多實際應用中,資料是持續生成的,並且頻率各不相同。
例如,來自物聯網裝置的感測器讀數、金融交易或社交媒體更新都可以產生需要即時處理和分析的資料流。
處理此類資料流時,通常需要將輸入資料“分桶”成離散塊,以便進行高效處理和分析。然而,處理具有不同頻率或格式的資料流時,這可能會帶來挑戰。
在本教程中,我們將探討如何使用 TensorDict 構建和操作資料流。我們將學習如何建立張量的惰性堆疊、處理非同步資料流,以及緻密化我們的資料以實現高效儲存和處理。
在本教程中,您將學習: - 如何讀取資料流並以固定間隔將其寫入 tensordict 中; - 如何構建將具有異構形狀的內容堆疊在一起的 TensorDict; - 如果需要,如何使用 nested_tensor 將這些張量緻密化到單個儲存中。
堆疊異構 tensordicts¶
在許多實際場景中,資料以具有不同預設頻率的流形式出現。
本教程的目標是將傳入的資料“分桶”,以便以給定的較低頻率讀取和處理。在這種場景下的挑戰是資料可能無法以規則的“矩形”格式表示(即張量的每個維度都明確定義),但可能出現一個數據桶的元素比另一個多的情況,此時我們不能簡單地將它們堆疊在一起。通常,考慮第一和第二個資料桶如下的情況
import torch
from tensordict import TensorDict
bucket0 = TensorDict(stream0=torch.randn(5), stream1=torch.randn(4))
bucket1 = TensorDict(stream0=torch.randn(4), stream1=torch.randn(5))
原則上,由於兩個流的形狀不同,我們無法在記憶體中連續堆疊這兩個 tensordict。幸運的是,TensorDict 提供了一個工具來將具有異構張量形狀的例項組合在一起:LazyStackedTensorDict。要建立惰性堆疊,只需呼叫 lazy_stack()。
data = TensorDict.lazy_stack([bucket0, bucket1], dim=0)
print(data)
LazyStackedTensorDict(
fields={
stream0: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False),
stream1: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False)},
exclusive_fields={
},
batch_size=torch.Size([2]),
device=None,
is_shared=False,
stack_dim=0)
結果資料只是這兩個 tensordict 沿維度 0 堆疊在一起的表示。 LazyStackedTensorDict 支援 TensorDictBase 類的大多數常用操作,以下是一些示例
data_select = data.select("stream0")
data_plus_1 = data + 1
data_apply = data.apply(lambda x: x + 1)
此外,對其進行索引將返回我們用於建立堆疊的原始資料
assert data[0] is bucket0
然而,在某些情況下,人們可能希望獲得底層資料的連續表示。為此,TensorDictBase 提供了一個 densify() 方法,該方法將堆疊可以堆疊的張量,並嘗試將其餘部分表示為 nested_tensor 例項。
data_cont = data.densify()
非同步資料流¶
現在讓我們轉向一個更具體的例子,我們將建立一個函式,該函式以給定頻率流式傳輸資料(在本例中,只是每次迭代增加 1 的整數)。
為了跨執行緒傳遞資料,該函式將使用作為輸入接收到的佇列
import asyncio
from typing import List
async def generate_numbers(frequency: float, queue: asyncio.Queue) -> None:
i = 0
while True:
await asyncio.sleep(1 / frequency)
await queue.put(i)
i += 1
collect_data 函式從佇列中讀取資料,持續給定時間。一旦 timeout 已過,函式將返回。
async def collect_data(queue: asyncio.Queue, timeout: float) -> List[int]:
values = []
# We create a nested `collect` async function in order to be able to stop it as
# soon as timeout is passed (see wait_for below).
async def collect():
nonlocal values
while True:
value = await queue.get()
values.append(value)
task = asyncio.create_task(collect())
try:
await asyncio.wait_for(task, timeout=timeout)
except asyncio.TimeoutError:
task.cancel()
return values
wait7hz 函式從佇列中讀取資料,持續給定時間。
async def wait7hz() -> None:
queue = asyncio.Queue()
generate_task = asyncio.create_task(generate_numbers(7, queue))
collect_data_task = asyncio.create_task(collect_data(queue, timeout=1))
values = await collect_data_task
# The ``generate_task`` has not been terminated
generate_task.cancel()
print(values)
asyncio.run(wait7hz())
from typing import Callable, Dict
[0, 1, 2, 3, 4, 5]
現在我們可以設計一個類,該類繼承自 LazyStackedTensorDict,並讀取來自不同流的資料並將其註冊到單獨的 tensordict 中。 LazyStackedTensorDict 的一個不錯特性是它也可以增量構建,因此我們可以透過擴充套件惰性堆疊來簡單地註冊新傳入的資料,直到收集到足夠的資料。這是 StreamedTensorDict 類的一個實現
from tensordict import LazyStackedTensorDict, NestedKey, TensorDictBase
class StreamedTensorDict(LazyStackedTensorDict):
"""A lazy stack class that can be built from a dictionary of streams."""
@classmethod
async def from_streams(
cls,
streams: Dict[NestedKey, Callable],
timeout: float,
batch_size: int,
densify: bool = True,
) -> TensorDictBase:
td = cls(stack_dim=0)
# We construct a queue for each stream
queues = [asyncio.Queue() for _ in range(len(streams))]
tasks = []
for stream, queue in zip(streams.values(), queues):
task = asyncio.create_task(stream(queue))
tasks.append(task)
for _ in range(batch_size):
values_tasks = []
for queue in queues:
values_task = asyncio.create_task(collect_data(queue, timeout))
values_tasks.append(values_task)
values = await asyncio.gather(*values_tasks)
td.append(TensorDict(dict(zip(streams.keys(), values))))
# Cancel the generator tasks
for task in tasks:
task.cancel()
if densify:
return td.densify(layout=torch.strided)
return td
最後,main 函式將組合流函式 stream0 和 stream1,並將它們傳遞給 StreamedTensorDict.from_streams 方法,該方法將為每個 timeout=1 秒收集 batch_size 批次的資料。
async def main() -> TensorDictBase:
def stream0(queue):
return generate_numbers(frequency=7, queue=queue)
def stream1(queue):
return generate_numbers(frequency=3, queue=queue)
# Running this should take about 10 seconds
return await StreamedTensorDict.from_streams(
{"bucket0": stream0, "bucket1": stream1}, timeout=1, batch_size=10
)
td = asyncio.run(main())
print("TensorDict from stream", td)
TensorDict from stream TensorDict(
fields={
bucket0: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False),
bucket1: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
- 讓我們表示來自兩個流的資料 - 對於 batch_size * timeout * Hz,它應該等於 torch.arange()
<=> 1 * 10 秒 * 3 或 7
print("bucket0 (7Hz, around 70 values)", td["bucket0"].values())
print("bucket1 (3Hz, around 30 values)", td["bucket1"].values())
print("shapes of bucket0 (7Hz, around 70 values)", td["bucket0"]._nested_tensor_size())
bucket0 (7Hz, around 70 values) tensor([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17,
18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,
54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68])
bucket1 (3Hz, around 30 values) tensor([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17,
18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28])
shapes of bucket0 (7Hz, around 70 values) tensor([[6],
[7],
[7],
[7],
[7],
[7],
[7],
[7],
[7],
[7]])
結論¶
在本教程中,我們探討了使用 TensorDict 和非同步資料流的基礎知識。我們學習瞭如何建立張量的惰性堆疊、使用 asyncio 處理非同步資料流,以及緻密化我們的資料以實現高效儲存和處理。
我們還看到了如何使用 TensorDict 和 LazyStackedTensorDict 簡化複雜的資料處理任務,例如對具有不同頻率的資料流進行分桶。透過利用 TensorDict 和 asyncio 的強大功能,您可以構建可擴充套件且高效的資料處理流水線,以處理最苛刻的實際應用。
感謝您跟隨本教程學習!希望您覺得它有所幫助並具有資訊量。
指令碼總執行時間: (0 分 11.023 秒)