""" Offline preprocessing: download LeRobot episodes and convert to .rrd files. Downloads 3 episodes each from 6 OpenX datasets covering diverse robot types. Reads parquet + mp4 files directly (no lerobot dependency needed). """ import json import os import numpy as np from pathlib import Path import rerun as rr import pandas as pd import av from huggingface_hub import hf_hub_download DATA_DIR = Path(__file__).parent / "data" # ---- Dataset Configuration ---- # chunk_size is how many episodes per chunk directory (1000 for most v2.0 datasets) DATASETS = [ { "slug": "fractal20220817_data", "repo_id": "IPEC-COMMUNITY/fractal20220817_data_lerobot", "robot_type": "Google Robot", "episodes": [0, 1, 2], "camera_keys": ["observation.images.image"], "camera_labels": ["main"], "fps": 3, "chunk_size": 1000, "state_dim": 8, "action_dim": 7, "num_episodes_total": 87212, }, { "slug": "droid", "repo_id": "IPEC-COMMUNITY/droid_lerobot", "robot_type": "Franka Panda", "episodes": [0, 1, 2], "camera_keys": [ "observation.images.exterior_image_1_left", "observation.images.exterior_image_2_left", "observation.images.wrist_image_left", ], "camera_labels": ["exterior_left", "exterior_right", "wrist"], "fps": 15, "chunk_size": 1000, "state_dim": 8, "action_dim": 7, "num_episodes_total": 92233, }, { "slug": "bridge_orig", "repo_id": "IPEC-COMMUNITY/bridge_orig_lerobot", "robot_type": "WidowX", "episodes": [0, 1, 2], "camera_keys": [ "observation.images.image_0", "observation.images.image_1", "observation.images.image_2", "observation.images.image_3", ], "camera_labels": ["cam_0", "cam_1", "cam_2", "cam_3"], "fps": 5, "chunk_size": 1000, "state_dim": 8, "action_dim": 7, "num_episodes_total": 53192, }, { "slug": "dobbe", "repo_id": "IPEC-COMMUNITY/dobbe_lerobot", "robot_type": "Hello Stretch", "episodes": [0, 1, 2], "camera_keys": ["observation.images.wrist_image"], "camera_labels": ["wrist"], "fps": 3.75, "chunk_size": 1000, "state_dim": 8, "action_dim": 7, "num_episodes_total": 5208, }, { "slug": "jaco_play", "repo_id": "IPEC-COMMUNITY/jaco_play_lerobot", "robot_type": "Kinova Jaco", "episodes": [0, 1, 2], "camera_keys": [ "observation.images.image", "observation.images.image_wrist", ], "camera_labels": ["main", "wrist"], "fps": 10, "chunk_size": 1000, "state_dim": 8, "action_dim": 7, "num_episodes_total": 976, }, { "slug": "kuka", "repo_id": "IPEC-COMMUNITY/kuka_lerobot", "robot_type": "KUKA IIWA", "episodes": [0, 1, 2], "camera_keys": ["observation.images.image"], "camera_labels": ["main"], "fps": 10, "chunk_size": 1000, "state_dim": 8, "action_dim": 7, "num_episodes_total": 209880, }, ] def estimate_intrinsics(h, w): """Estimate pinhole intrinsics from image resolution.""" fx = fy = max(h, w) * 1.2 cx, cy = w / 2, h / 2 return np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float32) def download_file(repo_id, path, cache_dir=None): """Download a file from HF and return local path. Returns None if not found.""" try: return hf_hub_download(repo_id, path, repo_type="dataset", cache_dir=cache_dir) except Exception: return None def read_video_frames(video_path): """Decode all frames from an MP4 file. Returns list of (H, W, C) numpy arrays.""" container = av.open(video_path) stream = container.streams.video[0] frames = [] for frame in container.decode(stream): img = frame.to_ndarray(format="rgb24") # (H, W, 3) uint8 frames.append(img) container.close() return frames def process_episode(ds_config, ep_idx): """Download and convert a single episode to .rrd format. Returns (rrd_path, num_frames) or (None, None) on failure.""" slug = ds_config["slug"] repo_id = ds_config["repo_id"] chunk_size = ds_config["chunk_size"] camera_keys = ds_config["camera_keys"] camera_labels = ds_config["camera_labels"] fps = ds_config["fps"] out_dir = DATA_DIR / slug out_dir.mkdir(parents=True, exist_ok=True) rrd_path = out_dir / f"episode_{ep_idx:06d}.rrd" if rrd_path.exists(): # Get frame count from cached parquet chunk = ep_idx // chunk_size ep_str = f"episode_{ep_idx:06d}" pq_path = f"data/chunk-{chunk:03d}/{ep_str}.parquet" local_pq = download_file(repo_id, pq_path) num_frames = len(pd.read_parquet(local_pq)) if local_pq else "?" print(f" Episode {ep_idx}: already exists ({rrd_path.stat().st_size / 1024:.0f} KB)") return rrd_path, num_frames chunk = ep_idx // chunk_size ep_str = f"episode_{ep_idx:06d}" # Download parquet file pq_path = f"data/chunk-{chunk:03d}/{ep_str}.parquet" local_pq = download_file(repo_id, pq_path) if local_pq is None: print(f" Episode {ep_idx}: parquet not found ({pq_path})") return None, None # Read parquet data df = pd.read_parquet(local_pq) num_frames = len(df) print(f" Episode {ep_idx}: {num_frames} frames, parquet OK, downloading videos...") # Download and decode video for each camera camera_frames = {} for ck, cl in zip(camera_keys, camera_labels): video_path = f"videos/chunk-{chunk:03d}/{ck}/{ep_str}.mp4" local_video = download_file(repo_id, video_path) if local_video is None: print(f" Camera '{cl}': video not found, skipping") continue frames = read_video_frames(local_video) camera_frames[cl] = frames print(f" Camera '{cl}': {len(frames)} frames decoded") if not camera_frames: print(f" Episode {ep_idx}: no video data found, skipping") return None, None # Create .rrd recording rec_id = f"{slug}_ep{ep_idx:06d}" rr.init("openx_viewer", recording_id=rec_id) # Determine camera positions (spread around the scene) positions = [ (0.8, 0.2, 0.6), (-0.8, 0.2, 0.6), (0.2, 0.8, 0.4), (-0.2, -0.8, 0.4), ] # Get image resolution from first frame of first camera first_cam = list(camera_frames.keys())[0] first_img = camera_frames[first_cam][0] h, w = first_img.shape[:2] # Log camera setups (static) for i, (cl, frames) in enumerate(camera_frames.items()): pos = positions[i % len(positions)] cam_path = f"cameras/{cl}" rr.log(cam_path, rr.Transform3D(translation=pos), static=True) rr.log( f"{cam_path}/image", rr.Pinhole( image_from_camera=estimate_intrinsics(h, w), resolution=[w, h], camera_xyz=rr.ViewCoordinates.RDF, image_plane_distance=0.5, ), static=True, ) # Note: subsequent cameras might have different resolutions but we keep same intrinsics # as they're estimates anyway # State/action labels state_cols = [c for c in df.columns if c == "observation.state"] action_cols = [c for c in df.columns if c == "action"] state_labels = ["x", "y", "z", "rx", "ry", "rz", "rw", "gripper"] action_labels = ["ax", "ay", "az", "aroll", "apitch", "ayaw", "agripper"] # Log frames n_video_frames = min(len(frames) for frames in camera_frames.values()) for frame_i in range(n_video_frames): rr.set_time_sequence("frame", frame_i) rr.set_time_seconds("time", frame_i / fps) # Log camera images for cl, frames in camera_frames.items(): if frame_i < len(frames): rr.log(f"cameras/{cl}/image", rr.Image(frames[frame_i], color_model="RGB")) # Log state if available and within bounds if state_cols and frame_i < num_frames: state = np.asarray(df.iloc[frame_i][state_cols[0]]).flatten() for j, label in enumerate(state_labels): if j < len(state): rr.log(f"state/{label}", rr.Scalars([float(state[j])])) # Log action if available and within bounds if action_cols and frame_i < num_frames: action = np.asarray(df.iloc[frame_i][action_cols[0]]).flatten() for j, label in enumerate(action_labels): if j < len(action): rr.log(f"action/{label}", rr.Scalars([float(action[j])])) rr.save(str(rrd_path)) rr.disconnect() size_kb = rrd_path.stat().st_size / 1024 print(f" Episode {ep_idx}: saved {size_kb:.0f} KB") return rrd_path, num_frames def process_dataset(ds_config): """Process all episodes for a dataset.""" slug = ds_config["slug"] print(f"\n{'='*60}") print(f"{ds_config['robot_type']} — {ds_config['repo_id']}") print(f" Cameras: {ds_config['camera_labels']}") print(f" Episodes: {ds_config['episodes']}") manifest_entry = { "slug": slug, "repo_id": ds_config["repo_id"], "robot_type": ds_config["robot_type"], "fps": ds_config["fps"], "camera_keys": ds_config["camera_keys"], "camera_labels": ds_config["camera_labels"], "state_dim": ds_config.get("state_dim", 8), "action_dim": ds_config.get("action_dim", 7), "num_episodes_total": ds_config.get("num_episodes_total", 0), "episodes_available": [], } for ep_idx in ds_config["episodes"]: rrd_path, num_frames = process_episode(ds_config, ep_idx) if rrd_path: manifest_entry["episodes_available"].append({ "index": ep_idx, "frames": num_frames, "file": f"data/{slug}/episode_{ep_idx:06d}.rrd", }) return manifest_entry def main(): manifests = [] for ds_config in DATASETS: meta = process_dataset(ds_config) manifests.append(meta) manifest_path = DATA_DIR / "manifest.json" manifest_path.write_text(json.dumps(manifests, indent=2)) print(f"\n{'='*60}") print(f"Manifest written to {manifest_path}") print(f"Total datasets: {len(manifests)}") total_eps = sum(len(m["episodes_available"]) for m in manifests) print(f"Total episodes: {total_eps}") if __name__ == "__main__": main()