{html.escape(task_name)}
{html.escape(io)}
#!/usr/bin/env python3 """ Render a polished Ropedia Xperience-10M 12-task infographic. The task names, inputs, and metrics are read from results/episode_task_suite/summary_report.json. The output is a deterministic PNG rendered from HTML/CSS so the labels stay legible and inspectable. """ from __future__ import annotations import argparse import base64 import html import io import json import os import subprocess import tempfile from pathlib import Path ROOT = Path(__file__).resolve().parents[1] SUMMARY_PATH = ROOT / "results/episode_task_suite/summary_report.json" DEFAULT_BASE = ROOT / "docs/assets/task_suite_infographic_base.png" DEFAULT_SAMPLE_DIR = ROOT.parent / "data/sample/xperience-10m-sample" DROPBOX_SAMPLE_DIR = Path.home() / "Library/CloudStorage/Dropbox/Ropedia/data/sample/xperience-10m-sample" DEFAULT_OUTPUT = ROOT / "docs/assets/task_suite_infographic.png" CANVAS_WIDTH = 1800 CANVAS_HEIGHT = 6600 THUMB_WIDTH = 880 THUMB_HEIGHT = 520 GROUPS = [ { "name": "Label + State", "tone": "teal", "color": "#9bdfff", "soft": "#071d20", "tasks": [ ("timeline_action", "supervised"), ("timeline_subtask", "supervised"), ("next_action", "supervised"), ], }, { "name": "Prediction + Reconstruction", "tone": "blue", "color": "#ccffa0", "soft": "#10210a", "tasks": [ ("hand_trajectory_forecast", "forecast"), ("modality_reconstruction", "forecast"), ("contact_prediction", "supervised"), ], }, { "name": "Grounding + Retrieval", "tone": "amber", "color": "#7ae5c3", "soft": "#092019", "tasks": [ ("caption_grounding", "retrieval"), ("cross_modal_retrieval", "retrieval"), ("object_relevance", "supervised"), ], }, { "name": "Temporal Diagnostics", "tone": "red", "color": "#d8f4a5", "soft": "#1b210d", "tasks": [ ("transition_detection", "diagnostic"), ("temporal_order", "diagnostic"), ("misalignment_detection", "diagnostic"), ], }, ] MODALITIES = [ ("video", "visual stream", "6 synchronized camera MP4 streams", "RGB/fisheye/stereo frame statistics"), ("audio", "acoustic stream", "audio stream embedded in MP4", "audio feature group"), ("depth", "geometry map", "depth map + confidence channel", "spatial geometry feature block"), ("pose / SLAM", "camera pose", "trajectory + sparse SLAM map", "position + orientation features"), ("motion capture", "human motion", "body + hand joint tracks", "3D mocap feature statistics"), ("inertial", "wearable sensor", "accelerometer + gyroscope", "wearable motion statistics"), ("language", "semantic annotation", "object tags + action captions", "task labels + semantic targets"), ] HAND_EDGES = [ (0, 1), (1, 2), (2, 3), (3, 4), (0, 5), (5, 6), (6, 7), (7, 8), (0, 9), (9, 10), (10, 11), (11, 12), (0, 13), (13, 14), (14, 15), (15, 16), (0, 17), (17, 18), (18, 19), (19, 20), ] def image_data_uri(image, fmt: str = "PNG", quality: int = 92) -> str: buffer = io.BytesIO() save_kwargs = {"format": fmt} if fmt.upper() in {"JPEG", "JPG"}: save_kwargs.update({"quality": quality, "optimize": True}) image.save(buffer, **save_kwargs) encoded = base64.b64encode(buffer.getvalue()).decode("ascii") mime = "jpeg" if fmt.upper() in {"JPEG", "JPG"} else "png" return f"data:image/{mime};base64,{encoded}" def make_canvas(size=(THUMB_WIDTH, THUMB_HEIGHT), color=(2, 5, 2)): from PIL import Image return Image.new("RGB", size, color) def fit_image(image, size=(THUMB_WIDTH, THUMB_HEIGHT)): from PIL import ImageOps return ImageOps.fit(image.convert("RGB"), size, method=3, centering=(0.5, 0.5)) def read_video_frame(video_path: Path, frame_index: int = 2400): import cv2 from PIL import Image cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): raise RuntimeError(f"Could not open video: {video_path}") total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) if total: frame_index = max(0, min(frame_index, total - 1)) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index) ok, frame = cap.read() cap.release() if not ok: raise RuntimeError(f"Could not read frame {frame_index} from {video_path}") frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) return Image.fromarray(frame) def draw_label(draw, xy, text, fill=(244, 248, 239), size=18): from PIL import ImageFont try: font = ImageFont.truetype("/System/Library/Fonts/Supplemental/Arial Bold.ttf", size) except Exception: font = ImageFont.load_default() draw.text(xy, text, fill=fill, font=font) def video_thumb(sample_dir: Path) -> str: from PIL import Image, ImageDraw gutter = 18 panel_width = (THUMB_WIDTH - gutter) // 2 fish = fit_image(read_video_frame(sample_dir / "fisheye_cam0.mp4", 2450), (panel_width, THUMB_HEIGHT)) stereo_path = sample_dir / "stereo_left.mp4" stereo = fit_image(read_video_frame(stereo_path, 2450), (panel_width, THUMB_HEIGHT)) if stereo_path.exists() else fish.copy() canvas = make_canvas() canvas.paste(fish, (0, 0)) canvas.paste(stereo, (panel_width + gutter, 0)) draw = ImageDraw.Draw(canvas, "RGBA") draw.rounded_rectangle((panel_width - 4, 0, panel_width + gutter + 4, THUMB_HEIGHT), radius=0, fill=(2, 5, 2, 220)) draw_label(draw, (18, 20), "fisheye", fill=(255, 255, 255), size=22) draw_label(draw, (panel_width + gutter + 18, 20), "stereo", fill=(255, 255, 255), size=22) return image_data_uri(canvas, "JPEG") def colorize(values): import numpy as np stops = np.array([ [2, 5, 2], [58, 136, 102], [122, 229, 195], [167, 240, 120], [216, 244, 165], ], dtype=np.float32) x = np.clip(values, 0, 1) scaled = x * (len(stops) - 1) lo = np.floor(scaled).astype(int) hi = np.clip(lo + 1, 0, len(stops) - 1) frac = scaled - lo rgb = stops[lo] * (1 - frac[..., None]) + stops[hi] * frac[..., None] return rgb.astype("uint8") def depth_thumb(h5) -> str: import numpy as np from PIL import Image, ImageDraw gutter = 18 panel_width = (THUMB_WIDTH - gutter) // 2 frame = np.array(h5["depth/depth"][2450], dtype=np.float32) valid = np.isfinite(frame) lo, hi = np.percentile(frame[valid], [3, 97]) norm = (frame - lo) / max(hi - lo, 1e-6) rgb = colorize(norm) depth = fit_image(Image.fromarray(rgb), (panel_width, THUMB_HEIGHT)) conf = np.array(h5["depth/confidence"][2450], dtype=np.uint8) conf_img = Image.fromarray(conf, mode="L").convert("RGB") conf_img = fit_image(conf_img, (panel_width, THUMB_HEIGHT)) canvas = make_canvas() canvas.paste(depth, (0, 0)) canvas.paste(conf_img, (panel_width + gutter, 0)) draw = ImageDraw.Draw(canvas, "RGBA") draw.rounded_rectangle((0, 0, 158, 44), radius=8, fill=(2, 5, 2, 178)) draw.rounded_rectangle((panel_width + gutter, 0, panel_width + gutter + 220, 44), radius=8, fill=(2, 5, 2, 178)) draw_label(draw, (14, 11), "depth", fill=(255, 255, 255), size=22) draw_label(draw, (panel_width + gutter + 14, 11), "confidence", fill=(255, 255, 255), size=22) return image_data_uri(canvas, "JPEG") def audio_thumb(sample_dir: Path) -> str: import numpy as np from PIL import ImageDraw canvas = make_canvas() draw = ImageDraw.Draw(canvas, "RGBA") try: raw = subprocess.run( [ "ffmpeg", "-v", "error", "-ss", "45", "-t", "6", "-i", str(sample_dir / "fisheye_cam0.mp4"), "-ac", "1", "-ar", "16000", "-f", "s16le", "pipe:1", ], check=True, stdout=subprocess.PIPE, ).stdout samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32) if len(samples) == 0: raise RuntimeError("empty audio stream") samples = samples / max(float(np.max(np.abs(samples))), 1.0) bins = 220 trimmed = samples[: bins * max(1, len(samples) // bins)] chunks = np.array_split(trimmed, bins) rms = np.array([np.sqrt(np.mean(chunk * chunk)) if len(chunk) else 0.0 for chunk in chunks]) waveform = np.array([float(np.mean(chunk)) if len(chunk) else 0.0 for chunk in chunks]) baseline = THUMB_HEIGHT - 72 for i, value in enumerate(rms): x = 18 + i / max(bins - 1, 1) * (THUMB_WIDTH - 36) h = 14 + np.clip(value * 158, 0, 158) draw.line((x, baseline, x, baseline - h), fill=(167, 240, 120, 170), width=2) points = [] for i, value in enumerate(waveform): x = 18 + i / max(bins - 1, 1) * (THUMB_WIDTH - 36) y = 126 - np.clip(value, -1, 1) * 82 points.append((x, y)) draw.line(points, fill=(122, 229, 195, 220), width=2) except Exception: for i in range(48): x = 22 + i * 8 h = 16 + (i % 7) * 7 draw.rounded_rectangle((x, THUMB_HEIGHT - 72 - h, x + 4, THUMB_HEIGHT - 72), radius=2, fill=(167, 240, 120, 170)) draw_label(draw, (18, 18), "Audio waveform", fill=(244, 248, 239), size=22) return image_data_uri(canvas, "PNG") def normalize_points(points, width, height, pad=16): import numpy as np xy = points[:, :2].copy() lo = np.percentile(xy, 2, axis=0) hi = np.percentile(xy, 98, axis=0) span = np.maximum(hi - lo, 1e-6) norm = (xy - lo) / span norm = np.clip(norm, 0, 1) norm[:, 1] = 1 - norm[:, 1] out = np.empty_like(norm) out[:, 0] = pad + norm[:, 0] * (width - pad * 2) out[:, 1] = pad + norm[:, 1] * (height - pad * 2) return out def slam_thumb(h5) -> str: import numpy as np from PIL import ImageDraw canvas = make_canvas() draw = ImageDraw.Draw(canvas, "RGBA") points = np.array(h5["slam/point_cloud"], dtype=np.float64) points = points[np.isfinite(points).all(axis=1)] if len(points) > 2600: points = points[np.linspace(0, len(points) - 1, 2600).astype(int)] xy = normalize_points(points[:, [0, 2, 1]], THUMB_WIDTH, THUMB_HEIGHT) z = points[:, 1] z_norm = (z - np.percentile(z, 2)) / max(np.percentile(z, 98) - np.percentile(z, 2), 1e-6) colors = colorize(z_norm) for (x, y), color in zip(xy, colors): draw.ellipse((x - 1.2, y - 1.2, x + 1.2, y + 1.2), fill=tuple(color.tolist()) + (165,)) traj = np.array(h5["slam/trans_xyz"][:2450:36], dtype=np.float64) traj_xy = normalize_points(traj[:, [0, 2, 1]], THUMB_WIDTH, THUMB_HEIGHT) for a, b in zip(traj_xy[:-1], traj_xy[1:]): draw.line((a[0], a[1], b[0], b[1]), fill=(167, 240, 120, 205), width=2) draw_label(draw, (18, 18), "camera pose + SLAM map", fill=(244, 248, 239), size=22) return image_data_uri(canvas, "PNG") def imu_thumb(h5) -> str: import numpy as np from PIL import ImageDraw canvas = make_canvas() draw = ImageDraw.Draw(canvas, "RGBA") key_idx = int(h5["imu/keyframe_indices"][2450]) accel = np.array(h5["imu/accel_xyz"][max(0, key_idx - 220): key_idx + 220], dtype=np.float64) gyro = np.array(h5["imu/gyro_xyz"][max(0, key_idx - 220): key_idx + 220], dtype=np.float64) series = [accel[:, 0], accel[:, 1], accel[:, 2], gyro[:, 0], gyro[:, 1], gyro[:, 2]] colors = [(167, 240, 120), (122, 229, 195), (155, 223, 255), (216, 244, 165), (244, 248, 239), (165, 175, 162)] for row in range(6): y = 68 + row * 44 draw.line((18, y, THUMB_WIDTH - 18, y), fill=(167, 240, 120, 48), width=1) for values, color in zip(series, colors): values = values[:420] if len(values) < 2: continue lo, hi = np.percentile(values, [3, 97]) norm = (values - lo) / max(hi - lo, 1e-6) pts = [] for i, v in enumerate(norm): x = 18 + i / max(len(values) - 1, 1) * (THUMB_WIDTH - 36) y = THUMB_HEIGHT - 48 - np.clip(v, 0, 1) * (THUMB_HEIGHT - 116) pts.append((x, y)) draw.line(pts, fill=color + (200,), width=2) draw_label(draw, (18, 18), "inertial accel / gyro", fill=(244, 248, 239), size=22) return image_data_uri(canvas, "PNG") def mocap_thumb(h5) -> str: import numpy as np from PIL import ImageDraw canvas = make_canvas() draw = ImageDraw.Draw(canvas, "RGBA") body = np.array(h5["full_body_mocap/keypoints"][2450], dtype=np.float32) left = np.array(h5["hand_mocap/left_joints_3d"][2450], dtype=np.float32) right = np.array(h5["hand_mocap/right_joints_3d"][2450], dtype=np.float32) all_points = np.concatenate([body, left, right], axis=0) lo = np.percentile(all_points[:, :2], 2, axis=0) hi = np.percentile(all_points[:, :2], 98, axis=0) span = np.maximum(hi - lo, 1e-6) def project(points, x_offset, width): xy = (points[:, :2] - lo) / span xy[:, 1] = 1 - xy[:, 1] xy[:, 0] = x_offset + xy[:, 0] * width xy[:, 1] = 72 + xy[:, 1] * (THUMB_HEIGHT - 136) return xy body_xy = project(body, 28, 270) for x, y in body_xy: draw.ellipse((x - 2.4, y - 2.4, x + 2.4, y + 2.4), fill=(167, 240, 120, 185)) for a, b in zip(body_xy[:-1], body_xy[1:]): draw.line((a[0], a[1], b[0], b[1]), fill=(167, 240, 120, 82), width=1) for points, x_offset, color in [(left, 392, (122, 229, 195)), (right, 562, (216, 244, 165))]: xy = project(points, x_offset, 126) for a, b in HAND_EDGES: draw.line((xy[a][0], xy[a][1], xy[b][0], xy[b][1]), fill=color + (180,), width=2) for x, y in xy: draw.ellipse((x - 2.4, y - 2.4, x + 2.4, y + 2.4), fill=color + (220,)) draw_label(draw, (18, 18), "body + hand mocap", fill=(244, 248, 239), size=22) return image_data_uri(canvas, "PNG") def text_thumb(h5) -> str: from PIL import ImageDraw width = THUMB_WIDTH raw = h5["caption"][()] if isinstance(raw, bytes): raw = raw.decode("utf-8", errors="replace") data = json.loads(raw) segment = data["segments"][0] objects = sorted({item for values in segment.get("objects", {}).values() for item in values})[:5] actions = [a.get("label", "") for a in segment.get("Current Action", [])][:2] canvas = make_canvas((width, THUMB_HEIGHT)) draw = ImageDraw.Draw(canvas, "RGBA") draw_label(draw, (28, 24), "language annotation", fill=(244, 248, 239), size=28) y = 82 for label in objects: chip_width = 52 + len(label) * 16 draw.rounded_rectangle((28, y, 28 + chip_width, y + 38), radius=8, fill=(7, 18, 7, 235), outline=(167, 240, 120, 170), width=2) draw_label(draw, (44, y + 8), label, fill=(244, 248, 239), size=18) y += 47 x = 340 y = 92 for action in actions: wrapped = action[:66] + ("..." if len(action) > 66 else "") draw.rounded_rectangle((x, y, width - 28, y + 54), radius=9, fill=(7, 18, 7, 235), outline=(122, 229, 195, 180), width=2) draw_label(draw, (x + 22, y + 15), wrapped, fill=(244, 248, 239), size=20) y += 68 return image_data_uri(canvas, "PNG") def load_sample_thumbnails(sample_dir: Path | None) -> dict[str, str]: if sample_dir is None or not sample_dir.exists(): return {} hdf5_path = sample_dir / "annotation.hdf5" required = [sample_dir / "fisheye_cam0.mp4", hdf5_path] if not all(path.exists() for path in required): return {} try: import h5py thumbnails = {"video": video_thumb(sample_dir), "audio": audio_thumb(sample_dir)} with h5py.File(hdf5_path, "r") as h5: thumbnails.update({ "depth": depth_thumb(h5), "pose / SLAM": slam_thumb(h5), "motion capture": mocap_thumb(h5), "inertial": imu_thumb(h5), "language": text_thumb(h5), }) return thumbnails except Exception as exc: print(f"Warning: could not build sample modality thumbnails: {exc}") return {} def valid_sample_dir(sample_dir: Path | None) -> bool: if sample_dir is None: return False return (sample_dir / "annotation.hdf5").exists() and (sample_dir / "fisheye_cam0.mp4").exists() def resolve_sample_dir(sample_dir: Path | None) -> Path | None: candidates: list[Path] = [] env_sample_dir = os.environ.get("XPERIENCE10M_SAMPLE_DIR") if env_sample_dir: candidates.append(Path(env_sample_dir).expanduser()) workspace = os.environ.get("WORKSPACE") if workspace: candidates.append(Path(workspace).expanduser() / "data/sample/xperience-10m-sample") if sample_dir is not None: candidates.append(sample_dir) candidates.extend([ DEFAULT_SAMPLE_DIR, DROPBOX_SAMPLE_DIR, ]) for candidate in candidates: if valid_sample_dir(candidate): return candidate return sample_dir def load_summary() -> dict: return json.loads(SUMMARY_PATH.read_text(encoding="utf-8")) def fmt(value: float) -> str: return f"{float(value):.4f}" def metric_for(task_name: str, metrics: dict) -> tuple[str, str]: if task_name == "hand_trajectory_forecast": return "MPJPE", fmt(metrics["mpjpe"]) if task_name == "cross_modal_retrieval": return "top-5", fmt(metrics["top5_accuracy"]) if task_name == "caption_grounding": return "MRR", fmt(metrics["mrr"]) if task_name == "object_relevance": return "micro-F1", fmt(metrics["micro_f1"]) if task_name == "modality_reconstruction": return "R2", fmt(metrics["r2"]) if task_name in {"temporal_order", "misalignment_detection"}: return "F1", fmt(metrics["f1"]) if "macro_f1" in metrics: return "macro-F1", fmt(metrics["macro_f1"]) if "accuracy" in metrics: return "accuracy", fmt(metrics["accuracy"]) raise KeyError(f"No main metric configured for {task_name}") def short_io(task_name: str, metrics: dict) -> str: custom = { "timeline_action": "all featurized modalities -> action label", "timeline_subtask": "all featurized modalities -> subtask label", "transition_detection": "all featurized modalities -> boundary vs steady", "next_action": "window at t -> action at t+20 frames", "hand_trajectory_forecast": "all featurized modalities -> future hand joints", "contact_prediction": "non-contact modalities -> contact state", "object_relevance": "non-caption feature blocks -> relevant objects", "caption_grounding": "text query -> matching sensor window", "cross_modal_retrieval": "motion / IMU / camera -> depth / video match", "modality_reconstruction": "motion / IMU / camera -> depth / video vector", "temporal_order": "two adjacent windows -> correct order", "misalignment_detection": "motion + visual pair -> aligned or shifted", } return custom.get(task_name, metrics.get("input", "")) def task_card(task_name: str, kind: str, metrics: dict, group: dict, index: int, neural_metrics: dict | None = None) -> str: label, value = metric_for(task_name, metrics) neural_html = "" if neural_metrics and "error" not in neural_metrics: neural_label, neural_value = metric_for(task_name, neural_metrics) neural_html = f"""
{html.escape(io)}
{html.escape(sample_text)}
{html.escape(feature_text)}
A clean map from synchronized multimodal windows to 12 research task heads, comparing minimal heads with neural MLP results. Next milestone: Qwen3-Omni fine-tuning with sensor-bridge evaluation.