VLAwithVariousSpeed / src /openpi /training /data_loader_test.py
Alan0928's picture
Upload folder using huggingface_hub
08ff31f verified
Raw
History Blame Contribute Delete
5.47 kB
import dataclasses
import jax
import numpy as np
from openpi.models import pi0_config
from openpi.training import config as _config
from openpi.training import data_loader as _data_loader
class _RawColumns:
    """Tiny column container exposing ``column_names``, ``with_format`` and key lookup.

    It wraps a mapping of column name -> column values and performs no
    conversion of its own.
    """

    def __init__(self, columns):
        """Keep a reference to ``columns`` and snapshot its keys."""
        # Iterating the mapping yields its keys, in the mapping's own order.
        self.column_names = [*columns]
        self._columns = columns

    def __getitem__(self, key):
        """Return the column stored under ``key`` exactly as it was supplied."""
        table = self._columns
        return table[key]

    def with_format(self, _format):
        """Ignore the requested format and return this same object."""
        return self
class _OnlineSourceDataset:
    """In-memory fake source dataset made of ``length`` synthetic frames.

    Each frame carries:
      * ``actions``: float32 vector of size 7, first component 1.0, rest 0.0
      * ``state``: float32 vector of size 8 filled with the frame number
      * ``episode_index`` / ``task_index``: always 0
      * ``frame_index``: the frame number

    The same data is exposed twice: column-wise through ``hf_dataset`` and
    row-wise through integer indexing on this object.
    """

    def __init__(self, length):
        """Build the column-wise and row-wise views for ``length`` frames."""
        actions = np.zeros((length, 7), dtype=np.float32)
        actions[:, 0] = 1.0
        # Row ``n`` of ``states`` is the constant vector [n, n, ..., n].
        states = np.stack(
            [np.full(8, frame, dtype=np.float32) for frame in range(length)],
            axis=0,
        )

        # Row-wise view: one dict per frame, with 0-d int64 arrays for the indices.
        self._items = [
            {
                "actions": action_row,
                "state": state_row,
                "episode_index": np.asarray(0, dtype=np.int64),
                "frame_index": np.asarray(frame, dtype=np.int64),
                "task_index": np.asarray(0, dtype=np.int64),
            }
            for frame, (action_row, state_row) in enumerate(zip(actions, states))
        ]

        # Column-wise view: each column is a plain Python list with one entry per frame.
        by_column = {
            "actions": list(actions),
            "state": list(states),
            "episode_index": list(np.zeros(length, dtype=np.int64)),
            "frame_index": list(np.arange(length, dtype=np.int64)),
            "task_index": list(np.zeros(length, dtype=np.int64)),
        }
        self.hf_dataset = _RawColumns(by_column)

    def __len__(self):
        """Return the number of frames."""
        return len(self._items)

    def __getitem__(self, index):
        """Return the frame dict at ``index``.

        ``index`` is converted through its ``__index__`` method first, so any
        integer-like object providing that method is accepted.
        """
        position = index.__index__()
        return self._items[position]
def test_online_sliding_chunk_dataset_enumerates_episode_speed_pairs():
    """The dataset exposes one entry per (episode, speed) pair.

    The original author notes that phase and chunk row are random per access;
    this test only checks the reported length.
    """
    speeds = [0.75, 1.0, 1.25]
    chunked = _data_loader.OnlineSlidingChunkDataset(
        _OnlineSourceDataset(12),
        speeds,
        action_horizon=4,
    )
    # The fake source holds a single episode (episode_index is always 0),
    # so three speeds must give exactly three entries.
    assert len(chunked) == 3
def test_online_sliding_chunk_dataset_returns_aligned_chunk_start():
    """A 1.25x sample has mask 1, the right speed tags and the expected shapes."""
    horizon = 4
    chunked = _data_loader.OnlineSlidingChunkDataset(
        _OnlineSourceDataset(12),
        [1.25],
        action_horizon=horizon,
    )
    # Seed NumPy's global RNG after construction and before the access.
    np.random.seed(0)
    sample = chunked[0]

    # The action chunk spans the requested horizon over the 7-dim fake actions.
    assert sample["actions"].shape == (horizon, 7)
    assert float(sample["speed"][0]) == 1.25
    assert sample["speed_label"] == "1p25x"
    assert int(sample["observation_mask"]) == 1
    # The fake source stores state[i] = i * np.ones(8); only the shape of the
    # returned state is checked here, not its contents.
    assert sample["state"].shape == (8,)
def test_online_sliding_chunk_dataset_speed_one_fast_path():
    """At speed 1.0 the returned chunk keeps the source's actions[:, 0] == 1.0.

    Per the original author, speed=1.0 bypasses ``transform_episode`` and reads
    the source verbatim; that code path is not visible from this file.
    """
    horizon = 4
    chunked = _data_loader.OnlineSlidingChunkDataset(
        _OnlineSourceDataset(12),
        [1.0],
        action_horizon=horizon,
    )
    np.random.seed(0)
    sample = chunked[0]

    assert float(sample["speed"][0]) == 1.0
    assert int(sample["observation_mask"]) == 1
    assert sample["actions"].shape == (horizon, 7)
    # Every synthetic frame has 1.0 in the first action component, so any
    # chunk copied straight from the source must show the same value.
    leading_component = sample["actions"][:, 0]
    assert np.all(leading_component == 1.0)
def test_torch_data_loader():
    """A bounded loader yields exactly ``num_batches`` batches of the local batch size."""
    batch_size = 4
    model_config = pi0_config.Pi0Config(action_dim=24, action_horizon=50, max_token_len=48)
    fake_data = _data_loader.FakeDataset(model_config, 16)

    produced = list(
        _data_loader.TorchDataLoader(
            fake_data,
            local_batch_size=batch_size,
            num_batches=2,
        )
    )

    assert len(produced) == 2
    # Every leaf of every batch must carry the batch size on its leading axis.
    for batch in produced:
        for leaf in jax.tree.leaves(batch):
            assert leaf.shape[0] == batch_size
def test_torch_data_loader_infinite():
    """Without ``num_batches`` the loader keeps producing batches past one dataset pass."""
    model_config = pi0_config.Pi0Config(action_dim=24, action_horizon=50, max_token_len=48)
    # Four items with a batch size of four is one batch per pass, so drawing
    # ten batches needs more than a single pass over the data.
    fake_data = _data_loader.FakeDataset(model_config, 4)
    stream = iter(_data_loader.TorchDataLoader(fake_data, local_batch_size=4))

    for _step in range(10):
        next(stream)
def test_torch_data_loader_parallel():
    """Same checks as the bounded-loader test, but with two loader workers."""
    batch_size = 4
    model_config = pi0_config.Pi0Config(action_dim=24, action_horizon=50, max_token_len=48)
    fake_data = _data_loader.FakeDataset(model_config, 10)

    produced = list(
        _data_loader.TorchDataLoader(
            fake_data,
            local_batch_size=batch_size,
            num_batches=2,
            num_workers=2,
        )
    )

    assert len(produced) == 2
    for batch in produced:
        for leaf in jax.tree.leaves(batch):
            assert leaf.shape[0] == batch_size
def test_with_fake_dataset():
    """``create_data_loader`` on the "debug" config yields correctly shaped batches."""
    config = _config.get_config("debug")
    produced = list(_data_loader.create_data_loader(config, skip_norm_stats=True, num_batches=2))

    assert len(produced) == 2

    # First pass: every leaf carries the configured batch size on its leading axis.
    for batch in produced:
        for leaf in jax.tree.leaves(batch):
            assert leaf.shape[0] == config.batch_size

    # Second pass: each batch unpacks into two parts and the second part (the
    # actions) has the full expected shape.
    for _, actions in produced:
        expected_shape = (config.batch_size, config.model.action_horizon, config.model.action_dim)
        assert actions.shape == expected_shape
def test_with_real_dataset():
    """``create_data_loader`` on "pi0_aloha_sim" (batch size 4) yields correctly shaped actions."""
    base_config = _config.get_config("pi0_aloha_sim")
    config = dataclasses.replace(base_config, batch_size=4)

    loader = _data_loader.create_data_loader(
        config,
        # Norm stats are skipped because the data may not be available locally.
        skip_norm_stats=True,
        num_batches=2,
        shuffle=True,
    )

    # The loader must expose a data config that matches the one requested.
    assert loader.data_config().repo_id == config.data.repo_id

    produced = list(loader)
    assert len(produced) == 2

    for _, actions in produced:
        expected_shape = (config.batch_size, config.model.action_horizon, config.model.action_dim)
        assert actions.shape == expected_shape