Spaces:
Sleeping
Sleeping
| """ | |
| Offline preprocessing: download LeRobot episodes and convert to .rrd files. | |
| Downloads 3 episodes each from 6 OpenX datasets covering diverse robot types. | |
| Reads parquet + mp4 files directly (no lerobot dependency needed). | |
| """ | |
| import json | |
| import os | |
| import numpy as np | |
| from pathlib import Path | |
| import rerun as rr | |
| import pandas as pd | |
| import av | |
| from huggingface_hub import hf_hub_download | |
DATA_DIR = Path(__file__).parent / "data"


# ---- Dataset Configuration ----
# chunk_size is how many episodes per chunk directory (1000 for most v2.0 datasets)
# All datasets below share the same episode selection, chunk layout and
# state/action dimensionality; only the per-dataset fields are spelled out.
def _dataset_config(slug, repo_id, robot_type, cameras, fps, num_episodes_total):
    """Assemble one dataset configuration dict.

    ``cameras`` is a sequence of ``(camera_key, camera_label)`` pairs: the
    LeRobot feature key that appears in the video path, and the short label
    used as the Rerun entity name. They are split into the parallel
    ``camera_keys`` / ``camera_labels`` lists the rest of the script reads.
    """
    return {
        "slug": slug,
        "repo_id": repo_id,
        "robot_type": robot_type,
        "episodes": [0, 1, 2],
        "camera_keys": [key for key, _label in cameras],
        "camera_labels": [label for _key, label in cameras],
        "fps": fps,
        "chunk_size": 1000,
        "state_dim": 8,
        "action_dim": 7,
        "num_episodes_total": num_episodes_total,
    }


DATASETS = [
    _dataset_config(
        "fractal20220817_data",
        "IPEC-COMMUNITY/fractal20220817_data_lerobot",
        "Google Robot",
        [("observation.images.image", "main")],
        fps=3,
        num_episodes_total=87212,
    ),
    _dataset_config(
        "droid",
        "IPEC-COMMUNITY/droid_lerobot",
        "Franka Panda",
        [
            ("observation.images.exterior_image_1_left", "exterior_left"),
            ("observation.images.exterior_image_2_left", "exterior_right"),
            ("observation.images.wrist_image_left", "wrist"),
        ],
        fps=15,
        num_episodes_total=92233,
    ),
    _dataset_config(
        "bridge_orig",
        "IPEC-COMMUNITY/bridge_orig_lerobot",
        "WidowX",
        [
            ("observation.images.image_0", "cam_0"),
            ("observation.images.image_1", "cam_1"),
            ("observation.images.image_2", "cam_2"),
            ("observation.images.image_3", "cam_3"),
        ],
        fps=5,
        num_episodes_total=53192,
    ),
    _dataset_config(
        "dobbe",
        "IPEC-COMMUNITY/dobbe_lerobot",
        "Hello Stretch",
        [("observation.images.wrist_image", "wrist")],
        fps=3.75,
        num_episodes_total=5208,
    ),
    _dataset_config(
        "jaco_play",
        "IPEC-COMMUNITY/jaco_play_lerobot",
        "Kinova Jaco",
        [
            ("observation.images.image", "main"),
            ("observation.images.image_wrist", "wrist"),
        ],
        fps=10,
        num_episodes_total=976,
    ),
    _dataset_config(
        "kuka",
        "IPEC-COMMUNITY/kuka_lerobot",
        "KUKA IIWA",
        [("observation.images.image", "main")],
        fps=10,
        num_episodes_total=209880,
    ),
]
def estimate_intrinsics(h, w):
    """Return a rough 3x3 float32 pinhole intrinsics matrix for an ``h`` x ``w`` image.

    Both focal lengths are 1.2 times the longer image side and the principal
    point sits at the image centre; this is a heuristic, not a calibration.
    """
    focal = 1.2 * max(h, w)
    intrinsics = np.eye(3, dtype=np.float32)
    intrinsics[0, 0] = focal
    intrinsics[1, 1] = focal
    intrinsics[0, 2] = w / 2
    intrinsics[1, 2] = h / 2
    return intrinsics
def download_file(repo_id, path, cache_dir=None):
    """Download ``path`` from the Hugging Face *dataset* repo ``repo_id``.

    Returns the local file path, or ``None`` if the download fails for any
    reason. Failure stays best-effort (callers treat ``None`` as "skip this
    file"), but the reason is printed. Otherwise network, authentication or
    rate-limit errors would be indistinguishable from a genuinely missing file.
    """
    try:
        return hf_hub_download(repo_id, path, repo_type="dataset", cache_dir=cache_dir)
    except Exception as err:  # deliberate best-effort boundary; reason is reported below
        # Hub errors can be long multi-line messages; the first line is enough here.
        lines = str(err).strip().splitlines()
        detail = f": {lines[0]}" if lines else ""
        print(f" Download failed for {repo_id}/{path} ({type(err).__name__}{detail})")
        return None
def read_video_frames(video_path):
    """Decode every frame of the first video stream in ``video_path``.

    Returns a list of ``(H, W, 3)`` RGB numpy arrays (``rgb24``), in decode
    order. The container is closed even if decoding raises part-way through.
    """
    with av.open(video_path) as container:
        stream = container.streams.video[0]
        return [frame.to_ndarray(format="rgb24") for frame in container.decode(stream)]
# Where each camera frustum is placed in the 3D view; cameras beyond the
# fourth wrap around to the start of the list.
_CAMERA_POSITIONS = [
    (0.8, 0.2, 0.6),
    (-0.8, 0.2, 0.6),
    (0.2, 0.8, 0.4),
    (-0.2, -0.8, 0.4),
]
# Entity names for the components of `observation.state` / `action`, in order.
# Rows with fewer components only log their leading entries.
_STATE_LABELS = ["x", "y", "z", "rx", "ry", "rz", "rw", "gripper"]
_ACTION_LABELS = ["ax", "ay", "az", "aroll", "apitch", "ayaw", "agripper"]


def _episode_repo_paths(ep_idx, chunk_size):
    """Return ``(chunk, ep_str, parquet_path)`` locating one episode in the repo layout."""
    chunk = ep_idx // chunk_size
    ep_str = f"episode_{ep_idx:06d}"
    return chunk, ep_str, f"data/chunk-{chunk:03d}/{ep_str}.parquet"


def _load_camera_frames(repo_id, chunk, ep_str, camera_keys, camera_labels):
    """Download and decode each camera's video.

    Returns ``{camera_label: [frames...]}`` containing only cameras whose
    video was found and decoded to at least one frame.
    """
    camera_frames = {}
    for key, label in zip(camera_keys, camera_labels):
        video_path = f"videos/chunk-{chunk:03d}/{key}/{ep_str}.mp4"
        local_video = download_file(repo_id, video_path)
        if local_video is None:
            print(f" Camera '{label}': video not found, skipping")
            continue
        frames = read_video_frames(local_video)
        if not frames:
            # An empty video would break the per-camera resolution lookup and
            # would force the shared frame count to zero.
            print(f" Camera '{label}': video has no frames, skipping")
            continue
        camera_frames[label] = frames
        print(f" Camera '{label}': {len(frames)} frames decoded")
    return camera_frames


def _flattened_column(df, column):
    """Return one flattened numpy array per row of ``df[column]``, or None if the column is absent."""
    if column not in df.columns:
        return None
    return [np.asarray(value).flatten() for value in df[column]]


def _log_components(prefix, labels, values):
    """Log each value as a scalar at ``{prefix}/{label}``; surplus labels or values are ignored."""
    for label, value in zip(labels, values):
        rr.log(f"{prefix}/{label}", rr.Scalars([float(value)]))


def _log_camera_setup(camera_frames):
    """Log a static transform and pinhole per camera, sized from that camera's own frames."""
    for i, (label, frames) in enumerate(camera_frames.items()):
        cam_path = f"cameras/{label}"
        # Each camera gets its own resolution; rigs may mix image sizes.
        h, w = frames[0].shape[:2]
        rr.log(
            cam_path,
            rr.Transform3D(translation=_CAMERA_POSITIONS[i % len(_CAMERA_POSITIONS)]),
            static=True,
        )
        rr.log(
            f"{cam_path}/image",
            rr.Pinhole(
                image_from_camera=estimate_intrinsics(h, w),
                resolution=[w, h],
                camera_xyz=rr.ViewCoordinates.RDF,
                image_plane_distance=0.5,
            ),
            static=True,
        )


def _log_timeline(camera_frames, states, actions, num_frames, fps):
    """Log per-frame images and state/action scalars on the ``frame`` and ``time`` timelines.

    Only the first ``min(len(frames))`` frames are logged, so every camera has
    an image at every logged step. State/action rows are logged only while the
    parquet has a row for that step (``frame_i < num_frames``).
    """
    n_video_frames = min(len(frames) for frames in camera_frames.values())
    for frame_i in range(n_video_frames):
        rr.set_time("frame", sequence=frame_i)
        rr.set_time("time", duration=frame_i / fps)
        for label, frames in camera_frames.items():
            rr.log(f"cameras/{label}/image", rr.Image(frames[frame_i], color_model="RGB"))
        if frame_i >= num_frames:
            continue
        if states is not None:
            _log_components("state", _STATE_LABELS, states[frame_i])
        if actions is not None:
            _log_components("action", _ACTION_LABELS, actions[frame_i])


def process_episode(ds_config, ep_idx):
    """Download one LeRobot episode and convert it to a Rerun ``.rrd`` file.

    The output is ``DATA_DIR/<slug>/episode_<idx:06d>.rrd``. If that file
    already exists it is reused, and only the parquet is fetched to report the
    frame count (``"?"`` if the parquet cannot be fetched).

    Returns ``(rrd_path, num_frames)``, where ``num_frames`` is the parquet
    row count. Returns ``(None, None)`` if the parquet is missing or no camera
    video could be decoded.
    """
    slug = ds_config["slug"]
    repo_id = ds_config["repo_id"]
    fps = ds_config["fps"]
    out_dir = DATA_DIR / slug
    out_dir.mkdir(parents=True, exist_ok=True)
    rrd_path = out_dir / f"episode_{ep_idx:06d}.rrd"
    chunk, ep_str, pq_path = _episode_repo_paths(ep_idx, ds_config["chunk_size"])

    if rrd_path.exists():
        # Cached conversion: only the frame count is needed for the manifest.
        local_pq = download_file(repo_id, pq_path)
        num_frames = len(pd.read_parquet(local_pq)) if local_pq else "?"
        print(f" Episode {ep_idx}: already exists ({rrd_path.stat().st_size / 1024:.0f} KB)")
        return rrd_path, num_frames

    local_pq = download_file(repo_id, pq_path)
    if local_pq is None:
        print(f" Episode {ep_idx}: parquet not found ({pq_path})")
        return None, None
    df = pd.read_parquet(local_pq)
    num_frames = len(df)
    print(f" Episode {ep_idx}: {num_frames} frames, parquet OK, downloading videos...")

    camera_frames = _load_camera_frames(
        repo_id, chunk, ep_str, ds_config["camera_keys"], ds_config["camera_labels"]
    )
    if not camera_frames:
        print(f" Episode {ep_idx}: no video data found, skipping")
        return None, None

    # Extract the vectors once instead of materialising a row Series per frame.
    states = _flattened_column(df, "observation.state")
    actions = _flattened_column(df, "action")

    # Stream to disk from the start (Rerun expects save() before logging) rather
    # than buffering the whole episode in memory. Write under a temporary name
    # so an interrupted run never leaves a truncated file that the exists()
    # cache check above would later accept as complete.
    partial_path = rrd_path.with_name(f"{rrd_path.stem}.partial.rrd")
    rr.init("openx_viewer", recording_id=f"{slug}_ep{ep_idx:06d}")
    rr.save(str(partial_path))
    try:
        _log_camera_setup(camera_frames)
        _log_timeline(camera_frames, states, actions, num_frames, fps)
    finally:
        rr.disconnect()  # closes the file sink opened by rr.save
    partial_path.replace(rrd_path)

    size_kb = rrd_path.stat().st_size / 1024
    print(f" Episode {ep_idx}: saved {size_kb:.0f} KB")
    return rrd_path, num_frames
def process_dataset(ds_config):
    """Convert every configured episode of one dataset and build its manifest entry.

    The entry carries the dataset's identity, fps, cameras and state/action
    dimensions, plus ``episodes_available``. That list has one record
    (index, frame count, relative .rrd path) per episode that was converted
    successfully.
    """
    slug = ds_config["slug"]
    separator = "=" * 60
    print(f"\n{separator}")
    print(f"{ds_config['robot_type']} — {ds_config['repo_id']}")
    print(f" Cameras: {ds_config['camera_labels']}")
    print(f" Episodes: {ds_config['episodes']}")

    # Key order here is the key order in manifest.json.
    required_keys = ("slug", "repo_id", "robot_type", "fps", "camera_keys", "camera_labels")
    entry = {key: ds_config[key] for key in required_keys}
    entry["state_dim"] = ds_config.get("state_dim", 8)
    entry["action_dim"] = ds_config.get("action_dim", 7)
    entry["num_episodes_total"] = ds_config.get("num_episodes_total", 0)
    available = entry["episodes_available"] = []

    for ep_idx in ds_config["episodes"]:
        rrd_path, num_frames = process_episode(ds_config, ep_idx)
        if not rrd_path:
            continue
        available.append(
            {
                "index": ep_idx,
                "frames": num_frames,
                "file": f"data/{slug}/episode_{ep_idx:06d}.rrd",
            }
        )
    return entry
def main():
    """Convert every dataset in ``DATASETS`` and write ``DATA_DIR/manifest.json``.

    The manifest is a JSON list with one entry per dataset, as returned by
    ``process_dataset``. A short summary is printed at the end.
    """
    manifests = [process_dataset(ds_config) for ds_config in DATASETS]

    # process_episode creates the per-dataset folders, but with no datasets
    # configured DATA_DIR would not exist yet and write_text would fail.
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    manifest_path = DATA_DIR / "manifest.json"
    manifest_path.write_text(json.dumps(manifests, indent=2), encoding="utf-8")

    print(f"\n{'='*60}")
    print(f"Manifest written to {manifest_path}")
    print(f"Total datasets: {len(manifests)}")
    total_eps = sum(len(m["episodes_available"]) for m in manifests)
    print(f"Total episodes: {total_eps}")


if __name__ == "__main__":
    main()