| |
| """ |
| DI LeRobot Converter API |
| ======================== |
| Receives episode data (JSON + video URL) from the iOS app, |
| creates a LeRobot v2.0 parquet file, uploads parquet + video |
| to the HuggingFace dataset repo, and updates meta/info.json. |
| |
| Deployed as a HuggingFace Space with Gradio. |
| The iOS app calls the /api/convert endpoint after uploading to GCS. |
| """ |
|
|
| import gradio as gr |
| import json |
| import os |
| import tempfile |
| import shutil |
| from pathlib import Path |
|
|
| import pandas as pd |
| import numpy as np |
| from huggingface_hub import HfApi, hf_hub_download |
|
|
| |
# Hugging Face dataset repository that receives parquet, video and meta files.
HF_DATASET_REPO = "DynamicIntelligence/humanoid-robots-training-dataset"

# Hub access token taken from the environment; an empty string when unset.
HF_TOKEN = os.getenv("HF_TOKEN", "")

# GCS bucket name. NOTE(review): not referenced elsewhere in the visible code;
# presumably the bucket the iOS app uploads recordings to -- confirm.
GCS_BUCKET = "di_record_intern_data"

# Number of episodes stored under a single ``chunk-XXX`` directory.
CHUNKS_SIZE = 100
|
|
|
|
# Order of the six camera-pose components in the stored vectors.
_POSE_KEYS = ("x", "y", "z", "roll", "pitch", "yaw")

# Location of the dataset metadata file inside the dataset repo.
_INFO_FILENAME = "meta/info.json"


def _as_float(value):
    """Coerce a JSON number to ``float``; a missing/null value becomes 0.0."""
    return 0.0 if value is None else float(value)


def _camera_pose(frame):
    """Return the frame's camera pose as ``[x, y, z, roll, pitch, yaw]`` floats."""
    pose = frame.get("pose") or {}
    return [_as_float(pose.get(key)) for key in _POSE_KEYS]


def _wrist_position(frame, hand_key):
    """Return three floats for ``frame[hand_key]``, zero-padded if null or short."""
    coords = [_as_float(c) for c in list(frame.get(hand_key) or [])[:3]]
    return coords + [0.0] * (3 - len(coords))


def _build_rows(frames, fps, lang):
    """Build one table row per frame (everything except ``episode_index``).

    Actions are the difference between a frame's observation and the previous
    frame's observation; the first frame gets all-zero actions.  Only the
    first three hand components carry data, the remaining six are zero.
    """
    rows = []
    final_index = len(frames) - 1
    hand_padding = [0.0] * 6
    prev_cam = prev_left = prev_right = None

    for i, frame in enumerate(frames):
        cam = _camera_pose(frame)
        left = _wrist_position(frame, "left_hand")
        right = _wrist_position(frame, "right_hand")

        if i == 0:
            cam_delta = [0.0] * 6
            left_delta = [0.0] * 3
            right_delta = [0.0] * 3
        else:
            cam_delta = [cur - old for cur, old in zip(cam, prev_cam)]
            left_delta = [cur - old for cur, old in zip(left, prev_left)]
            right_delta = [cur - old for cur, old in zip(right, prev_right)]

        # Fall back to a synthetic timestamp when the frame carries none.
        timestamp = frame.get("timestamp")
        if timestamp is None:
            timestamp = i / fps

        rows.append({
            "frame_index": i,
            "timestamp": timestamp,
            "observation.camera_pose": cam,
            "observation.left_hand": left + hand_padding,
            "observation.right_hand": right + hand_padding,
            "action.camera_delta": cam_delta,
            "action.left_hand_delta": left_delta + hand_padding,
            "action.right_hand_delta": right_delta + hand_padding,
            "language_instruction": lang,
            "next.done": i == final_index,
        })
        prev_cam, prev_left, prev_right = cam, left, right

    return rows


def _read_dataset_info():
    """Download and parse ``meta/info.json`` from the dataset repo.

    Returns the parsed dict, or ``None`` when the file does not exist in the
    repo yet.  Every other failure (network outage, auth error, malformed
    file) is raised so that callers never mistake "could not read" for
    "does not exist".
    """
    from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

    try:
        info_path = hf_hub_download(
            repo_id=HF_DATASET_REPO, filename=_INFO_FILENAME,
            repo_type="dataset", token=HF_TOKEN,
        )
    except LocalEntryNotFoundError:
        # Subclass of EntryNotFoundError raised when the Hub is unreachable and
        # nothing is cached: the file may well exist, so do not swallow it.
        raise
    except EntryNotFoundError:
        return None

    with open(info_path, encoding="utf-8") as f:
        info = json.load(f)
    if not isinstance(info, dict):
        raise ValueError(f"{_INFO_FILENAME} is not a JSON object")
    return info


def convert_episode(episode_json: str) -> str:
    """
    Convert episode data to LeRobot v2.0 format and upload to dataset repo.

    Input JSON schema:
    {
        "episode_index": int,            # auto-assigned if negative, null or absent
        "language_instruction": str,
        "fps": int,                      # defaults to 30 when absent, null or 0
        "frames": [
            {
                "timestamp": float,      # defaults to frame_index / fps
                "pose": {"x": f, "y": f, "z": f, "yaw": f, "pitch": f, "roll": f},
                "left_hand": [x, y, z] or null,
                "right_hand": [x, y, z] or null
            }, ...
        ],
        "video_gcs_url": str             # http(s) URL the video is downloaded from
    }

    A "video_gcs_path" key may be present in the payload but is not used.

    All input validation happens before any network access.  The parquet
    file is uploaded first, then (best effort) the video, then the updated
    meta/info.json.

    Returns:
        A JSON string.  On success it contains "success", "episode_index",
        "num_frames", "parquet_uploaded", "video_uploaded", "dataset_url"
        and, if the video step failed, "video_error".  On failure it
        contains an "error" message.

    Raises:
        Exceptions raised while uploading to the Hub are not caught and
        propagate to the caller.
    """
    try:
        data = json.loads(episode_json)
    except (TypeError, json.JSONDecodeError) as e:
        return json.dumps({"error": f"Invalid JSON: {e}"})

    if not isinstance(data, dict):
        return json.dumps({"error": "Episode data must be a JSON object"})

    frames = data.get("frames") or []
    if not isinstance(frames, list) or not all(isinstance(f, dict) for f in frames):
        return json.dumps({"error": "frames must be a list of objects"})
    num_frames = len(frames)
    if num_frames == 0:
        return json.dumps({"error": "No frames in episode data"})

    episode_index = data.get("episode_index", -1)
    if episode_index is None:
        episode_index = -1
    if isinstance(episode_index, bool) or not isinstance(episode_index, int):
        return json.dumps({"error": "episode_index must be an integer"})

    fps = data.get("fps", 30) or 30
    if isinstance(fps, bool) or not isinstance(fps, (int, float)) or fps <= 0:
        return json.dumps({"error": "fps must be a positive number"})

    lang = data.get("language_instruction", "")

    try:
        frame_rows = _build_rows(frames, fps, lang)
    except (AttributeError, TypeError, ValueError) as e:
        return json.dumps({"error": f"Invalid frame data: {e}"})

    api = HfApi(token=HF_TOKEN)

    if episode_index < 0:
        # Next free index = current episode count.  A read failure must abort:
        # silently falling back to 0 would overwrite an existing episode.
        try:
            existing_info = _read_dataset_info()
        except Exception as e:
            return json.dumps({
                "error": f"Could not read {_INFO_FILENAME} to assign an episode index: {e}",
            })
        episode_index = (existing_info or {}).get("total_episodes", 0)

    # episode_index goes first so the column order matches earlier uploads.
    rows = [{"episode_index": episode_index, **row} for row in frame_rows]

    tmp = Path(tempfile.mkdtemp())
    try:
        chunk_idx = episode_index // CHUNKS_SIZE
        episode_name = f"episode_{episode_index:06d}"

        parquet_path = tmp / f"{episode_name}.parquet"
        pd.DataFrame(rows).to_parquet(parquet_path, index=False)
        api.upload_file(
            path_or_fileobj=str(parquet_path),
            path_in_repo=f"data/chunk-{chunk_idx:03d}/{episode_name}.parquet",
            repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN,
        )

        # The video is best effort: a failure is reported in the result
        # instead of aborting the conversion.
        video_gcs_url = data.get("video_gcs_url", "")
        video_uploaded = False
        video_error = None
        if video_gcs_url:
            import urllib.parse
            import urllib.request

            video_local = tmp / "rgb_video.mp4"
            try:
                # The URL comes from the request; only allow http(s) so that
                # e.g. file:// cannot be used to read local files.
                scheme = urllib.parse.urlparse(video_gcs_url).scheme
                if scheme not in ("http", "https"):
                    raise ValueError("video_gcs_url must be an http(s) URL")
                urllib.request.urlretrieve(video_gcs_url, str(video_local))
                api.upload_file(
                    path_or_fileobj=str(video_local),
                    path_in_repo=f"videos/chunk-{chunk_idx:03d}/rgb/{episode_name}.mp4",
                    repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN,
                )
                video_uploaded = True
            except Exception as ve:
                video_error = str(ve)

        try:
            info = _read_dataset_info()
        except Exception as e:
            # Do not replace existing metadata with defaults just because it
            # could not be read; report the partial result instead.
            return json.dumps({
                "success": False,
                "error": f"Episode uploaded but {_INFO_FILENAME} could not be read: {e}",
                "episode_index": episode_index,
                "num_frames": num_frames,
                "parquet_uploaded": True,
                "video_uploaded": video_uploaded,
            })

        if info is None:
            # First episode in the repo: start from the default metadata and
            # correct the counters that depend on this particular upload.
            info = build_default_info(episode_index, num_frames)
            info["total_videos"] = 1 if video_uploaded else 0
            info["total_chunks"] = episode_index // CHUNKS_SIZE + 1
        else:
            info["total_episodes"] = max(info.get("total_episodes", 0), episode_index + 1)
            # NOTE: re-uploading an existing episode_index adds its frames again.
            info["total_frames"] = info.get("total_frames", 0) + num_frames
            info["splits"] = {"train": f"0:{info['total_episodes']}"}
            info["total_chunks"] = (info["total_episodes"] - 1) // CHUNKS_SIZE + 1
            if video_uploaded:
                info["total_videos"] = info.get("total_videos", 0) + 1

        meta_dir = tmp / "meta"
        meta_dir.mkdir(exist_ok=True)
        with open(meta_dir / "info.json", "w", encoding="utf-8") as f:
            json.dump(info, f, indent=2)

        api.upload_folder(
            folder_path=str(meta_dir), path_in_repo="meta",
            repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN,
        )

        result = {
            "success": True,
            "episode_index": episode_index,
            "num_frames": num_frames,
            "parquet_uploaded": True,
            "video_uploaded": video_uploaded,
            "dataset_url": f"https://huggingface.co/datasets/{HF_DATASET_REPO}",
        }
        if video_error is not None:
            result["video_error"] = video_error
        return json.dumps(result)

    finally:
        # Always remove the scratch directory, including on early return/raise.
        shutil.rmtree(tmp, ignore_errors=True)
|
|
|
|
def build_default_info(episode_index, num_frames):
    """Build the initial ``meta/info.json`` content for the dataset.

    Used when the dataset repo has no usable ``meta/info.json`` yet.

    Args:
        episode_index: Index of the episode being added; the dataset is
            described as holding episodes ``0 .. episode_index``.
        num_frames: Number of frames in that episode.

    Returns:
        A dict describing counters, path templates and the feature schema.
        ``total_chunks`` is derived from ``episode_index`` and
        ``CHUNKS_SIZE`` so that it stays correct when the first recorded
        episode does not fall into chunk 0.  ``total_videos`` is always 1;
        callers that did not upload a video must adjust it themselves.
    """
    total_episodes = episode_index + 1

    # Same description is stored under features["rgb"] and videos["rgb"];
    # each gets its own copy so later mutation of one cannot affect the other.
    video_info = {
        "video.fps": 30,
        "video.codec": "h264",
        "video.pix_fmt": "yuv420p",
        "video.is_depth_map": False,
        "has_audio": False,
    }

    # NOTE(review): the left-hand and right-hand name lists differ
    # (wrist/thumb/index vs wrist/index/middle). Kept exactly as they were --
    # confirm which layout is intended.
    return {
        "codebase_version": "v2.0",
        "robot_type": "unknown",
        "total_episodes": total_episodes,
        "total_frames": num_frames,
        "total_tasks": 1,
        "total_videos": 1,
        "total_chunks": (total_episodes - 1) // CHUNKS_SIZE + 1,
        "chunks_size": CHUNKS_SIZE,
        "fps": 30,
        "splits": {"train": f"0:{total_episodes}"},
        "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet",
        "video_path": "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4",
        "features": {
            "observation.camera_pose": {
                "dtype": "float32", "shape": [6],
                "names": ["x", "y", "z", "roll", "pitch", "yaw"],
            },
            "observation.left_hand": {
                "dtype": "float32", "shape": [9],
                "names": ["wrist_x", "wrist_y", "wrist_z", "thumb_x", "thumb_y", "thumb_z",
                          "index_x", "index_y", "index_z"],
            },
            "observation.right_hand": {
                "dtype": "float32", "shape": [9],
                "names": ["wrist_x", "wrist_y", "wrist_z", "index_x", "index_y", "index_z",
                          "middle_x", "middle_y", "middle_z"],
            },
            "action.camera_delta": {
                "dtype": "float32", "shape": [6],
                "names": ["dx", "dy", "dz", "droll", "dpitch", "dyaw"],
            },
            "action.left_hand_delta": {
                "dtype": "float32", "shape": [9],
                "names": ["wrist_dx", "wrist_dy", "wrist_dz", "thumb_dx", "thumb_dy",
                          "thumb_dz", "index_dx", "index_dy", "index_dz"],
            },
            "action.right_hand_delta": {
                "dtype": "float32", "shape": [9],
                "names": ["wrist_dx", "wrist_dy", "wrist_dz", "index_dx", "index_dy",
                          "index_dz", "middle_dx", "middle_dy", "middle_dz"],
            },
            "language_instruction": {"dtype": "string", "shape": [1], "names": None},
            "timestamp": {"dtype": "float64", "shape": [1], "names": None},
            "frame_index": {"dtype": "int64", "shape": [1], "names": None},
            "episode_index": {"dtype": "int64", "shape": [1], "names": None},
            "next.done": {"dtype": "bool", "shape": [1], "names": None},
            "rgb": {
                "dtype": "video", "shape": [480, 640, 3],
                "names": ["height", "width", "channels"],
                "video_info": dict(video_info),
            },
        },
        "videos": {
            "rgb": {"video_info": dict(video_info)},
        },
    }
|
|
|
|
| |
# Gradio front end: one text box carrying the episode JSON in, one text box
# carrying the JSON result out. ``api_name`` publishes the function as the
# "convert" API endpoint.
demo = gr.Interface(
    title="DI LeRobot Converter",
    description=(
        "Converts episode data from DI iOS app to LeRobot v2.0 format "
        "and uploads to HuggingFace dataset repo."
    ),
    api_name="convert",
    fn=convert_episode,
    inputs=gr.Textbox(
        lines=10,
        label="Episode JSON",
        placeholder="Paste episode JSON here...",
    ),
    outputs=gr.Textbox(label="Result"),
)


if __name__ == "__main__":
    # Serve on every network interface, port 7860.
    demo.launch(server_port=7860, server_name="0.0.0.0")
|
|