#!/usr/bin/env python3 """ DI LeRobot Converter API ======================== Receives episode data (JSON + video URL) from the iOS app, creates a LeRobot v2.0 parquet file, uploads parquet + video to the HuggingFace dataset repo, and updates meta/info.json. Deployed as a HuggingFace Space with Gradio. The iOS app calls the /api/convert endpoint after uploading to GCS. """ import gradio as gr import json import os import tempfile import shutil from pathlib import Path import pandas as pd import numpy as np from huggingface_hub import HfApi, hf_hub_download # Config HF_DATASET_REPO = "DynamicIntelligence/humanoid-robots-training-dataset" HF_TOKEN = os.environ.get("HF_TOKEN", "") GCS_BUCKET = "di_record_intern_data" CHUNKS_SIZE = 100 def convert_episode(episode_json: str) -> str: """ Convert episode data to LeRobot v2.0 format and upload to dataset repo. Input JSON schema: { "episode_index": int, # auto-assigned if -1 "language_instruction": str, "fps": int, "frames": [ { "timestamp": float, "pose": {"x": f, "y": f, "z": f, "yaw": f, "pitch": f, "roll": f}, "left_hand": [x, y, z] or null, "right_hand": [x, y, z] or null }, ... ], "video_gcs_path": str # GCS path to rgb_video.mp4 } """ try: data = json.loads(episode_json) except json.JSONDecodeError as e: return json.dumps({"error": f"Invalid JSON: {e}"}) api = HfApi(token=HF_TOKEN) # Determine episode index episode_index = data.get("episode_index", -1) if episode_index < 0: # Auto-assign: read current info.json to get next index try: info_path = hf_hub_download( repo_id=HF_DATASET_REPO, filename="meta/info.json", repo_type="dataset", token=HF_TOKEN ) with open(info_path) as f: info = json.load(f) episode_index = info.get("total_episodes", 0) except Exception: episode_index = 0 lang = data.get("language_instruction", "") fps = data.get("fps", 30) or 30 frames = data.get("frames", []) num_frames = len(frames) if num_frames == 0: return json.dumps({"error": "No frames in episode data"}) # Build parquet rows rows = [] for i, frame in enumerate(frames): pose = frame.get("pose", {}) cam_x = pose.get("x", 0) cam_y = pose.get("y", 0) cam_z = pose.get("z", 0) cam_roll = pose.get("roll", 0) cam_pitch = pose.get("pitch", 0) cam_yaw = pose.get("yaw", 0) camera_pose = [cam_x, cam_y, cam_z, cam_roll, cam_pitch, cam_yaw] # Hand data: [x, y, z] from end_effector → pad to 9 values (3 joints × xyz) lh = frame.get("left_hand") or [0, 0, 0] rh = frame.get("right_hand") or [0, 0, 0] # Pad single palm position to 3-joint format (wrist=palm, others=0) left_hand = list(lh[:3]) + [0.0] * 6 right_hand = list(rh[:3]) + [0.0] * 6 # Action deltas if i > 0: prev = frames[i - 1] pp = prev.get("pose", {}) prev_cam = [pp.get("x", 0), pp.get("y", 0), pp.get("z", 0), pp.get("roll", 0), pp.get("pitch", 0), pp.get("yaw", 0)] cam_delta = [camera_pose[j] - prev_cam[j] for j in range(6)] plh = prev.get("left_hand") or [0, 0, 0] prh = prev.get("right_hand") or [0, 0, 0] lh_delta = [lh[j] - plh[j] if j < len(lh) and j < len(plh) else 0 for j in range(3)] + [0.0] * 6 rh_delta = [rh[j] - prh[j] if j < len(rh) and j < len(prh) else 0 for j in range(3)] + [0.0] * 6 else: cam_delta = [0.0] * 6 lh_delta = [0.0] * 9 rh_delta = [0.0] * 9 rows.append({ "episode_index": episode_index, "frame_index": i, "timestamp": frame.get("timestamp", i / fps), "observation.camera_pose": camera_pose, "observation.left_hand": left_hand, "observation.right_hand": right_hand, "action.camera_delta": cam_delta, "action.left_hand_delta": lh_delta, "action.right_hand_delta": rh_delta, "language_instruction": lang, "next.done": i == num_frames - 1, }) # Create parquet tmp = Path(tempfile.mkdtemp()) try: df = pd.DataFrame(rows) chunk_idx = episode_index // CHUNKS_SIZE parquet_path = tmp / f"episode_{episode_index:06d}.parquet" df.to_parquet(parquet_path, index=False) # Upload parquet api.upload_file( path_or_fileobj=str(parquet_path), path_in_repo=f"data/chunk-{chunk_idx:03d}/episode_{episode_index:06d}.parquet", repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN, ) # Upload video from GCS if provided video_gcs_path = data.get("video_gcs_path", "") video_gcs_url = data.get("video_gcs_url", "") video_uploaded = False if video_gcs_url: # Download from GCS public URL and re-upload to HF import urllib.request video_local = tmp / "rgb_video.mp4" try: urllib.request.urlretrieve(video_gcs_url, str(video_local)) api.upload_file( path_or_fileobj=str(video_local), path_in_repo=f"videos/chunk-{chunk_idx:03d}/rgb/episode_{episode_index:06d}.mp4", repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN, ) video_uploaded = True except Exception as ve: pass # Video upload is optional # Update meta/info.json try: existing_info_path = hf_hub_download( repo_id=HF_DATASET_REPO, filename="meta/info.json", repo_type="dataset", token=HF_TOKEN ) with open(existing_info_path) as f: info = json.load(f) info["total_episodes"] = max(info.get("total_episodes", 0), episode_index + 1) info["total_frames"] = info.get("total_frames", 0) + num_frames info["splits"] = {"train": f"0:{info['total_episodes']}"} info["total_chunks"] = (info["total_episodes"] - 1) // CHUNKS_SIZE + 1 if video_uploaded: info["total_videos"] = info.get("total_videos", 0) + 1 except Exception: info = build_default_info(episode_index, num_frames) meta_dir = tmp / "meta" meta_dir.mkdir(exist_ok=True) with open(meta_dir / "info.json", "w") as f: json.dump(info, f, indent=2) api.upload_folder( folder_path=str(meta_dir), path_in_repo="meta", repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN, ) result = { "success": True, "episode_index": episode_index, "num_frames": num_frames, "parquet_uploaded": True, "video_uploaded": video_uploaded, "dataset_url": f"https://huggingface.co/datasets/{HF_DATASET_REPO}", } return json.dumps(result) finally: shutil.rmtree(tmp, ignore_errors=True) def build_default_info(episode_index, num_frames): return { "codebase_version": "v2.0", "robot_type": "unknown", "total_episodes": episode_index + 1, "total_frames": num_frames, "total_tasks": 1, "total_videos": 1, "total_chunks": 1, "chunks_size": CHUNKS_SIZE, "fps": 30, "splits": {"train": f"0:{episode_index + 1}"}, "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", "video_path": "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4", "features": { "observation.camera_pose": {"dtype": "float32", "shape": [6], "names": ["x", "y", "z", "roll", "pitch", "yaw"]}, "observation.left_hand": {"dtype": "float32", "shape": [9], "names": ["wrist_x", "wrist_y", "wrist_z", "thumb_x", "thumb_y", "thumb_z", "index_x", "index_y", "index_z"]}, "observation.right_hand": {"dtype": "float32", "shape": [9], "names": ["wrist_x", "wrist_y", "wrist_z", "index_x", "index_y", "index_z", "middle_x", "middle_y", "middle_z"]}, "action.camera_delta": {"dtype": "float32", "shape": [6], "names": ["dx", "dy", "dz", "droll", "dpitch", "dyaw"]}, "action.left_hand_delta": {"dtype": "float32", "shape": [9], "names": ["wrist_dx", "wrist_dy", "wrist_dz", "thumb_dx", "thumb_dy", "thumb_dz", "index_dx", "index_dy", "index_dz"]}, "action.right_hand_delta": {"dtype": "float32", "shape": [9], "names": ["wrist_dx", "wrist_dy", "wrist_dz", "index_dx", "index_dy", "index_dz", "middle_dx", "middle_dy", "middle_dz"]}, "language_instruction": {"dtype": "string", "shape": [1], "names": None}, "timestamp": {"dtype": "float64", "shape": [1], "names": None}, "frame_index": {"dtype": "int64", "shape": [1], "names": None}, "episode_index": {"dtype": "int64", "shape": [1], "names": None}, "next.done": {"dtype": "bool", "shape": [1], "names": None}, "rgb": {"dtype": "video", "shape": [480, 640, 3], "names": ["height", "width", "channels"], "video_info": {"video.fps": 30, "video.codec": "h264", "video.pix_fmt": "yuv420p", "video.is_depth_map": False, "has_audio": False}}, }, "videos": { "rgb": {"video_info": {"video.fps": 30, "video.codec": "h264", "video.pix_fmt": "yuv420p", "video.is_depth_map": False, "has_audio": False}} }, } # Gradio UI (also exposes /api/convert endpoint automatically) demo = gr.Interface( fn=convert_episode, inputs=gr.Textbox(label="Episode JSON", lines=10, placeholder="Paste episode JSON here..."), outputs=gr.Textbox(label="Result"), title="DI LeRobot Converter", description="Converts episode data from DI iOS app to LeRobot v2.0 format and uploads to HuggingFace dataset repo.", api_name="convert", ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)