| """ |
| Script to convert Aloha hdf5 data to the LeRobot dataset v2.0 format. |
| |
| Example usage: uv run examples/aloha_real/convert_aloha_data_to_lerobot.py --raw-dir /path/to/raw/data --repo-id <org>/<dataset-name> |
| """ |
|
|
| import dataclasses |
| from pathlib import Path |
| import shutil |
| from typing import Literal |
|
|
| import h5py |
| from lerobot.common.datasets.lerobot_dataset import LEROBOT_HOME |
| from lerobot.common.datasets.lerobot_dataset import LeRobotDataset |
| from lerobot.common.datasets.push_dataset_to_hub._download_raw import download_raw |
| import numpy as np |
| import torch |
| import tqdm |
| import tyro |
|
|
|
|
@dataclasses.dataclass(frozen=True)
class DatasetConfig:
    """Knobs forwarded verbatim to LeRobotDataset.create (see create_empty_dataset)."""

    # Encode camera streams as videos instead of per-frame image files.
    use_videos: bool = True
    # Allowed timestamp deviation (seconds) when LeRobot validates frame timing.
    tolerance_s: float = 0.0001
    # Parallelism of LeRobot's background image writer.
    image_writer_processes: int = 10
    image_writer_threads: int = 5
    # None lets LeRobot choose its default video backend.
    video_backend: str | None = None


# Shared default config so callers can omit the dataset_config argument.
DEFAULT_DATASET_CONFIG = DatasetConfig()
|
|
|
|
def create_empty_dataset(
    repo_id: str,
    robot_type: str,
    mode: Literal["video", "image"] = "video",
    *,
    has_velocity: bool = False,
    has_effort: bool = False,
    dataset_config: DatasetConfig = DEFAULT_DATASET_CONFIG,
) -> LeRobotDataset:
    """Create a fresh local LeRobotDataset with the Aloha feature schema.

    Any existing local dataset under LEROBOT_HOME/repo_id is deleted first.

    Args:
        repo_id: Target dataset id (``<org>/<dataset-name>``).
        robot_type: Stored robot type, e.g. "aloha" or "mobile_aloha".
        mode: Store camera frames as encoded "video" or raw "image" data.
        has_velocity: Include an "observation.velocity" feature.
        has_effort: Include an "observation.effort" feature.
        dataset_config: Extra options forwarded to LeRobotDataset.create.

    Returns:
        The newly created, empty LeRobotDataset (fps fixed at 50).
    """
    motors = [
        "right_waist",
        "right_shoulder",
        "right_elbow",
        "right_forearm_roll",
        "right_wrist_angle",
        "right_wrist_rotate",
        "right_gripper",
        "left_waist",
        "left_shoulder",
        "left_elbow",
        "left_forearm_roll",
        "left_wrist_angle",
        "left_wrist_rotate",
        "left_gripper",
    ]
    cameras = [
        "cam_high",
        "cam_low",
        "cam_left_wrist",
        "cam_right_wrist",
    ]

    def _motor_feature() -> dict:
        # All proprioceptive features (state/action/velocity/effort) share
        # this exact layout; build it in one place instead of four copies.
        return {
            "dtype": "float32",
            "shape": (len(motors),),
            "names": [
                motors,
            ],
        }

    features = {
        "observation.state": _motor_feature(),
        "action": _motor_feature(),
    }

    if has_velocity:
        features["observation.velocity"] = _motor_feature()

    if has_effort:
        features["observation.effort"] = _motor_feature()

    for cam in cameras:
        features[f"observation.images.{cam}"] = {
            "dtype": mode,
            # Aloha cameras record 640x480 RGB frames.
            "shape": (3, 480, 640),
            "names": [
                "channels",
                "height",
                "width",
            ],
        }

    # Start from a clean slate: remove any stale local copy of the dataset.
    # (LEROBOT_HOME / repo_id is already a Path; no extra Path() wrapper needed.)
    if (LEROBOT_HOME / repo_id).exists():
        shutil.rmtree(LEROBOT_HOME / repo_id)

    return LeRobotDataset.create(
        repo_id=repo_id,
        fps=50,
        robot_type=robot_type,
        features=features,
        use_videos=dataset_config.use_videos,
        tolerance_s=dataset_config.tolerance_s,
        image_writer_processes=dataset_config.image_writer_processes,
        image_writer_threads=dataset_config.image_writer_threads,
        video_backend=dataset_config.video_backend,
    )
|
|
|
|
def get_cameras(hdf5_files: list[Path]) -> list[str]:
    """Return the RGB camera names recorded in the first episode file.

    Keys containing "depth" are skipped so only color streams are returned.
    """
    with h5py.File(hdf5_files[0], "r") as ep:
        cameras = []
        for key in ep["/observations/images"]:
            if "depth" not in key:
                cameras.append(key)
        return cameras
|
|
|
|
def has_velocity(hdf5_files: list[Path]) -> bool:
    """Report whether the raw episodes record joint velocities (qvel)."""
    first_episode = hdf5_files[0]
    with h5py.File(first_episode, "r") as ep:
        return "/observations/qvel" in ep
|
|
|
|
def has_effort(hdf5_files: list[Path]) -> bool:
    """Report whether the raw episodes record joint efforts."""
    first_episode = hdf5_files[0]
    with h5py.File(first_episode, "r") as ep:
        return "/observations/effort" in ep
|
|
|
|
def load_raw_images_per_camera(ep: "h5py.File", cameras: list[str]) -> dict[str, np.ndarray]:
    """Load every requested camera's frames from an open episode file.

    Uncompressed streams are stored as 4-D arrays (num_frames, H, W, C) and
    are read directly. Otherwise each entry is a compressed byte buffer per
    frame, decoded with OpenCV and converted from BGR (cv2's default channel
    order) to RGB.

    Args:
        ep: Open h5py.File (or any mapping of dataset paths to arrays).
        cameras: Camera names to read under /observations/images/.

    Returns:
        Mapping of camera name -> (num_frames, H, W, 3) uint8 array.
    """
    imgs_per_cam = {}
    for camera in cameras:
        # Look the dataset up once instead of re-resolving the path per use.
        dataset = ep[f"/observations/images/{camera}"]

        if dataset.ndim == 4:
            # Already raw pixel data: materialize the full array.
            imgs_array = dataset[:]
        else:
            # cv2 is imported lazily: it is only needed for compressed data.
            import cv2

            imgs_array = np.array(
                [cv2.cvtColor(cv2.imdecode(frame, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB) for frame in dataset]
            )

        imgs_per_cam[camera] = imgs_array
    return imgs_per_cam
|
|
|
|
def load_raw_episode_data(
    ep_path: Path,
) -> tuple[dict[str, np.ndarray], torch.Tensor, torch.Tensor, torch.Tensor | None, torch.Tensor | None]:
    """Read a single episode hdf5 file into tensors.

    Returns:
        A tuple (imgs_per_cam, state, action, velocity, effort) where the
        optional velocity/effort entries are None when absent from the file.
    """
    camera_names = [
        "cam_high",
        "cam_low",
        "cam_left_wrist",
        "cam_right_wrist",
    ]

    with h5py.File(ep_path, "r") as ep:
        state = torch.from_numpy(ep["/observations/qpos"][:])
        action = torch.from_numpy(ep["/action"][:])

        # Optional signals: present only in some recordings.
        velocity = torch.from_numpy(ep["/observations/qvel"][:]) if "/observations/qvel" in ep else None
        effort = torch.from_numpy(ep["/observations/effort"][:]) if "/observations/effort" in ep else None

        imgs_per_cam = load_raw_images_per_camera(ep, camera_names)

    return imgs_per_cam, state, action, velocity, effort
|
|
|
|
def populate_dataset(
    dataset: LeRobotDataset,
    hdf5_files: list[Path],
    task: str,
    episodes: list[int] | None = None,
) -> LeRobotDataset:
    """Append the selected raw episodes to `dataset`, frame by frame.

    Args:
        dataset: Empty (or partially filled) LeRobotDataset to populate.
        hdf5_files: Sorted list of raw episode files.
        task: Task label stored with every saved episode.
        episodes: Indices into hdf5_files to convert; all of them when None.

    Returns:
        The same dataset object, for call chaining.
    """
    selected = range(len(hdf5_files)) if episodes is None else episodes

    for ep_idx in tqdm.tqdm(selected):
        imgs_per_cam, state, action, velocity, effort = load_raw_episode_data(hdf5_files[ep_idx])

        num_frames = state.shape[0]
        for i in range(num_frames):
            frame = {
                "observation.state": state[i],
                "action": action[i],
            }

            for camera, img_array in imgs_per_cam.items():
                frame[f"observation.images.{camera}"] = img_array[i]

            if velocity is not None:
                frame["observation.velocity"] = velocity[i]
            if effort is not None:
                frame["observation.effort"] = effort[i]

            dataset.add_frame(frame)

        # One save per raw file: each hdf5 file is one episode.
        dataset.save_episode(task=task)

    return dataset
|
|
|
|
def port_aloha(
    raw_dir: Path,
    repo_id: str,
    raw_repo_id: str | None = None,
    task: str = "DEBUG",
    *,
    episodes: list[int] | None = None,
    push_to_hub: bool = True,
    is_mobile: bool = False,
    mode: Literal["video", "image"] = "image",
    dataset_config: DatasetConfig = DEFAULT_DATASET_CONFIG,
):
    """Convert a directory of Aloha ``episode_*.hdf5`` files to LeRobot v2.0.

    Args:
        raw_dir: Directory holding the raw episode files; downloaded from
            `raw_repo_id` if it does not exist yet.
        repo_id: Target dataset id (``<org>/<dataset-name>``).
        raw_repo_id: Hub id of the raw dataset, required only when raw_dir
            is missing.
        task: Task label stored with every episode.
        episodes: Indices of episodes to convert; all of them when None.
        push_to_hub: Upload the converted dataset when done.
        is_mobile: Record robot_type "mobile_aloha" instead of "aloha".
        mode: Store camera frames as encoded video or raw images.
        dataset_config: Extra options forwarded to LeRobotDataset.create.

    Raises:
        ValueError: If raw_dir is missing and raw_repo_id was not given, or
            if raw_dir contains no episode files.
    """
    # Always start from a clean local copy of the target dataset.
    if (LEROBOT_HOME / repo_id).exists():
        shutil.rmtree(LEROBOT_HOME / repo_id)

    if not raw_dir.exists():
        if raw_repo_id is None:
            raise ValueError("raw_repo_id must be provided if raw_dir does not exist")
        download_raw(raw_dir, repo_id=raw_repo_id)

    hdf5_files = sorted(raw_dir.glob("episode_*.hdf5"))
    if not hdf5_files:
        # Fail early with a clear message instead of an IndexError deep
        # inside has_velocity/has_effort when the directory is empty.
        raise ValueError(f"no episode_*.hdf5 files found in {raw_dir}")

    dataset = create_empty_dataset(
        repo_id,
        robot_type="mobile_aloha" if is_mobile else "aloha",
        mode=mode,
        has_effort=has_effort(hdf5_files),
        has_velocity=has_velocity(hdf5_files),
        dataset_config=dataset_config,
    )
    dataset = populate_dataset(
        dataset,
        hdf5_files,
        task=task,
        episodes=episodes,
    )
    dataset.consolidate()

    if push_to_hub:
        dataset.push_to_hub()
|
|
|
|
if __name__ == "__main__":
    # tyro builds the CLI (flags, defaults, help text) from port_aloha's signature.
    tyro.cli(port_aloha)
|
|