VLAarchtests2 / VLAarchtests /tests /test_rlbench_dataset_task_metadata.py
lsnu's picture
Add files using upload-large-folder tool
9c74dfe verified
from __future__ import annotations
import pickle
from pathlib import Path
from types import SimpleNamespace
import numpy as np
from PIL import Image
import torch
from torch.utils.data import DataLoader
from sim_rlbench.dataset import RLBenchOfflineChunkDataset, sample_weight_from_action_norms
def _arm(pose_xyz: tuple[float, float, float], gripper_open: float) -> SimpleNamespace:
return SimpleNamespace(
gripper_pose=np.asarray([*pose_xyz, 0.0, 0.0, 0.0, 1.0], dtype=np.float32),
joint_positions=np.linspace(0.0, 0.6, 7, dtype=np.float32),
gripper_open=float(gripper_open),
)
def _obs(step: int) -> SimpleNamespace:
offset = 0.01 * float(step)
return SimpleNamespace(
right=_arm((0.10 + offset, 0.20, 0.30), 1.0),
left=_arm((-0.10 - offset, 0.15, 0.25), 0.0),
)
def _write_rgb_frame(directory: Path, step: int, value: int) -> None:
directory.mkdir(parents=True, exist_ok=True)
image = np.full((8, 8, 3), fill_value=value, dtype=np.uint8)
Image.fromarray(image).save(directory / f"rgb_{step:04d}.png")
def test_rlbench_dataset_emits_task_metadata(tmp_path: Path) -> None:
task_name = "bimanual_push_box"
episode_dir = tmp_path / task_name / "all_variations" / "episodes" / "episode0"
episode_dir.mkdir(parents=True, exist_ok=True)
with (episode_dir / "variation_descriptions.pkl").open("wb") as handle:
pickle.dump(["push the box together"], handle)
with (episode_dir / "low_dim_obs.pkl").open("wb") as handle:
pickle.dump([_obs(0), _obs(1)], handle)
for camera_name, pixel_value in (("front", 32), ("wrist_left", 96), ("wrist_right", 160)):
camera_dir = episode_dir / f"{camera_name}_rgb"
_write_rgb_frame(camera_dir, 0, pixel_value)
_write_rgb_frame(camera_dir, 1, pixel_value + 1)
dataset = RLBenchOfflineChunkDataset(
dataset_root=tmp_path,
tasks=[task_name],
episode_indices=[0],
resolution=8,
chunk_size=2,
history_steps=1,
)
item = dataset[0]
assert item["task"] == task_name
assert item["task_name"] == task_name
assert int(item["task_id"]) == -1
assert item["texts"] == "push the box together"
batch = next(iter(DataLoader(dataset, batch_size=1, shuffle=False)))
assert batch["task_name"] == [task_name]
assert batch["task_id"].tolist() == [-1]
def test_sample_weight_from_action_norms_biases_large_actions() -> None:
weights = sample_weight_from_action_norms(
torch.tensor([0.01, 0.05, 0.20], dtype=torch.float32),
min_norm=0.05,
power=2.0,
)
assert np.allclose(weights.numpy(), np.asarray([1.0, 1.0, 16.0], dtype=np.float32))