""" RoboGen — Synthetic Robotics Dataset Generator generator.py: Physically-plausible episode generation for LeRobot-format parquet datasets. Schema ------ state_0..state_5 — observed joint positions at time t (rad) action_0..action_5 — joint velocity COMMANDS at time t (rad/s) Physics model ------------- 1. Task-specific waypoints (approach → contact → grasp/push → retract) 2. Cubic spline interpolation, clamped boundary (zero velocity at endpoints) 3. Velocities = analytical first derivative of position spline (rad/s) 4. Sensor noise: Gaussian σ_pos=0.002 rad, σ_vel=0.004 rad/s 5. Contact force: spring-damper ramp during contact window Failure modes ------------- grasp_slip — smooth until 60-70%, then position discontinuity + velocity collapse velocity_spike — 1-2 frames with MAD z-score > 6.5 on joint velocity torque_saturation — joint clamped at limit for ≥3 frames (velocity near zero, pos at limit) Z-score note: both injection and detection use robust MAD z-scores so that a single spike value does not inflate the reference statistics. Scorer ------ Set SCORER_PATH env var to ~/Downloads/quality_scorer root; falls back to built-in. """ from __future__ import annotations import os import sys from typing import Dict, List, Optional, Tuple import math import numpy as np import pandas as pd from scipy.interpolate import CubicSpline # ── Scorer import ──────────────────────────────────────────────────────────── _SCORER_PATH = os.environ.get( "SCORER_PATH", os.path.expanduser("~/Downloads/quality_scorer") ) if _SCORER_PATH not in sys.path: sys.path.insert(0, _SCORER_PATH) try: from scorer.scorer import score_dataset as _ext_score_dataset # type: ignore EXTERNAL_SCORER = True except ImportError: EXTERNAL_SCORER = False # ── Robot / task configuration ─────────────────────────────────────────────── ROBOT_CONFIG: Dict[str, Dict] = { "SO-100": { "n_joints": 6, # Per-joint [lo, hi] limits in radians "joint_limits": [ (-3.14, 3.14), # j0 base rotation (-1.57, 1.57), # j1 shoulder pitch (-0.20, 2.80), # j2 elbow (-3.14, 3.14), # j3 wrist roll (-1.57, 1.57), # j4 wrist pitch (-0.10, 1.00), # j5 gripper ], "home": np.array([0.00, -0.52, 1.20, 0.00, -0.72, 0.05]), "targets": { "pick_and_place": np.array([ 0.30, 0.20, 0.80, 0.10, -0.30, 0.50]), "push_object": np.array([ 0.50, 0.12, 1.00, 0.00, -0.50, 0.00]), "grasp_and_lift": np.array([ 0.22, 0.30, 0.48, 0.18, -0.40, 0.28]), "stacking": np.array([ 0.12, 0.42, 0.68, 0.14, -0.30, 0.18]), }, "gripper_joint": 5, "torque_joint_pool": [1, 2, 3, 4], # joints that realistically saturate "max_velocity": 1.5, # rad/s "force_range": (0.5, 12.0), }, "SO-101": { "n_joints": 6, "joint_limits": [ (-3.14, 3.14), (-1.57, 1.57), (-0.20, 2.80), (-3.14, 3.14), (-1.57, 1.57), (-0.10, 1.00), ], "home": np.array([0.00, -0.42, 1.12, 0.00, -0.62, 0.05]), "targets": { "pick_and_place": np.array([ 0.35, 0.24, 0.76, 0.14, -0.26, 0.46]), "push_object": np.array([ 0.54, 0.14, 0.96, 0.04, -0.46, 0.04]), "grasp_and_lift": np.array([ 0.24, 0.34, 0.46, 0.24, -0.36, 0.24]), "stacking": np.array([ 0.14, 0.44, 0.66, 0.19, -0.26, 0.14]), }, "gripper_joint": 5, "torque_joint_pool": [1, 2, 3, 4], "max_velocity": 1.5, "force_range": (0.5, 12.0), }, "Koch": { "n_joints": 6, "joint_limits": [ (-3.14, 3.14), (-1.57, 1.57), (-0.10, 2.50), (-3.14, 3.14), (-1.57, 1.57), (-0.10, 1.00), ], "home": np.array([0.00, -0.60, 1.28, 0.00, -0.82, 0.05]), "targets": { "pick_and_place": np.array([ 0.40, 0.16, 0.88, 0.06, -0.42, 0.58]), "drawer_open_close": np.array([ 0.00, 0.10, 1.10, 0.00, -0.60, 0.00]), "grasp_and_lift": np.array([ 0.28, 0.26, 0.54, 0.19, -0.46, 0.34]), }, "gripper_joint": 5, "torque_joint_pool": [1, 2, 3], "max_velocity": 1.2, "force_range": (0.3, 10.0), }, } TASKS_BY_ROBOT: Dict[str, List[str]] = { "SO-100": ["pick_and_place", "push_object", "grasp_and_lift", "stacking"], "SO-101": ["pick_and_place", "push_object", "grasp_and_lift", "stacking"], "Koch": ["pick_and_place", "drawer_open_close", "grasp_and_lift"], } FAILURE_TYPES = ["grasp_slip", "velocity_spike", "torque_saturation"] FRAMES_PER_EPISODE = 50 DT = 0.02 # seconds/frame at 50 Hz # Community-calibrated thresholds (MAD z-score basis) VELOCITY_SPIKE_Z = 6.5 MISMATCH_THRESHOLD = 0.50 # ── Robust statistics helpers ───────────────────────────────────────────────── def _robust_z(values: np.ndarray) -> np.ndarray: """ Compute per-column MAD z-scores: z = |x - median| / (MAD * 1.4826) The 1.4826 factor makes MAD a consistent estimator of σ under normality. A single outlier barely affects median or MAD, so injected spikes retain their true z-score instead of being pulled down by contaminated reference stats. """ med = np.median(values, axis=0) mad = np.median(np.abs(values - med), axis=0) scale = mad * 1.4826 + 1e-6 return np.abs(values - med) / scale # ── Waypoint factory ────────────────────────────────────────────────────────── def _build_waypoints( task: str, home: np.ndarray, target: np.ndarray, rng: np.random.Generator, ) -> Tuple[np.ndarray, np.ndarray]: """ Return (t_knots_normalised [0..1], waypoint_matrix [n_knots, n_joints]). Waypoints encode task-specific motion primitives; small Gaussian noise ensures episode-to-episode variation. """ n = len(home) ns = lambda s=0.008: rng.normal(0, s, n) if task == "pick_and_place": approach = home + 0.25 * (target - home) + ns() pre_grasp = home + 0.44 * (target - home) + ns() grasp = home + 0.60 * (target - home) + ns(0.003) lift = grasp.copy(); lift[2] = lift[2] * 0.82 + ns(0.003)[2] transport = home + 0.82 * (target - home) + ns() place = target + ns(0.003) t = np.array([0.0, 0.18, 0.38, 0.55, 0.68, 0.83, 1.0]) w = np.vstack([home, approach, pre_grasp, grasp, lift, transport, place]) elif task == "push_object": approach = home + 0.35 * (target - home) + ns() contact = home + 0.56 * (target - home) + ns(0.004) push_end = target + ns(0.006) retract = home + 0.20 * (target - home) + ns() t = np.array([0.0, 0.28, 0.52, 0.73, 1.0]) w = np.vstack([home, approach, contact, push_end, retract]) elif task == "grasp_and_lift": descend = home + 0.28 * (target - home) + ns() pre_grasp = home + 0.52 * (target - home) + ns() grasp = target + ns(0.003) lift = grasp.copy(); lift[2] -= 0.28 + ns(0.005)[2] t = np.array([0.0, 0.22, 0.46, 0.68, 1.0]) w = np.vstack([home, descend, pre_grasp, grasp, lift]) elif task == "stacking": grasp_pos = home + 0.38 * (target - home) + ns() lift_pos = grasp_pos.copy(); lift_pos[2] -= 0.32 hover = home + 0.70 * (target - home) + ns() lower = target + ns(0.004) release = lower.copy(); release[5] += 0.18 t = np.array([0.0, 0.20, 0.40, 0.60, 0.82, 1.0]) w = np.vstack([home, grasp_pos, lift_pos, hover, lower, release]) elif task == "drawer_open_close": extend = home + 0.28 * (target - home) + ns() at_handle = home + 0.52 * (target - home) + ns(0.004) pull_mid = home + 0.74 * (target - home) + ns() open_pos = target + ns(0.006) t = np.array([0.0, 0.26, 0.50, 0.74, 1.0]) w = np.vstack([home, extend, at_handle, pull_mid, open_pos]) else: mid = home + 0.50 * (target - home) + ns() t = np.array([0.0, 0.50, 1.0]) w = np.vstack([home, mid, target]) return t, w # ── Smooth trajectory via cubic spline ──────────────────────────────────────── def _smooth_trajectory( task: str, home: np.ndarray, target: np.ndarray, n_frames: int, rng: np.random.Generator, ) -> Tuple[np.ndarray, np.ndarray]: """ Returns: positions (n_frames, n_joints) rad — joint positions velocities (n_frames, n_joints) rad/s — analytical spline derivative """ n_joints = len(home) t_norm, wpts = _build_waypoints(task, home, target, rng) t_knots = t_norm * (n_frames - 1) t_frames = np.arange(n_frames, dtype=float) pos = np.zeros((n_frames, n_joints)) vel = np.zeros((n_frames, n_joints)) for j in range(n_joints): cs = CubicSpline(t_knots, wpts[:, j], bc_type="clamped") pos[:, j] = cs(t_frames) vel[:, j] = cs(t_frames, 1) / DT # derivative in rad/frame → rad/s pos += rng.normal(0, 0.002, pos.shape) vel += rng.normal(0, 0.004, vel.shape) return pos, vel # ── Contact force model ─────────────────────────────────────────────────────── def _force_profile( n_frames: int, task: str, force_range: Tuple[float, float], rng: np.random.Generator, ) -> np.ndarray: """Spring-damper contact force, non-zero during contact window.""" f_min, f_max = force_range force = np.zeros(n_frames) if task not in {"pick_and_place", "grasp_and_lift", "stacking", "push_object", "drawer_open_close"}: return force c0, c1 = int(0.30 * n_frames), int(0.75 * n_frames) for i in range(n_frames): if c0 <= i <= c1: phase = (i - c0) / max(c1 - c0, 1) ramp = math.sin(phase * math.pi / 2) if phase < 0.30 else 1.0 force[i] = ramp * rng.uniform(f_min * 1.5, f_max) + rng.normal(0, 0.12) else: force[i] = rng.uniform(0, f_min * 0.25) return np.clip(force, 0, f_max + 1) # ── Failure injectors ───────────────────────────────────────────────────────── def _inject_grasp_slip( pos: np.ndarray, vel: np.ndarray, force: np.ndarray, gripper: int, rng: np.random.Generator, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Grasp slip at 60-70% of episode: pos : gripper jumps open (discontinuity ≥ 0.18 rad) vel : transient spike at slip frame, then near-zero (contact lost) force: collapses post-slip """ n = pos.shape[0] sf = int(rng.uniform(0.60, 0.70) * n) # Position discontinuity — gripper opens slip_mag = rng.uniform(0.18, 0.38) pos[sf:, gripper] += slip_mag pos[sf:, max(0, gripper - 1)] += rng.uniform(0.04, 0.12) # Velocity: brief spike at slip frame then collapse v_med = float(np.median(np.abs(vel[:sf, gripper]))) + 0.05 vel[sf, gripper] += v_med * rng.uniform(9.0, 12.0) # big transient vel[sf + 1:, gripper] *= rng.uniform(0.04, 0.12) force[sf:] = np.clip(rng.exponential(0.25, n - sf), 0, 0.5) return pos, vel, force def _inject_velocity_spike( vel: np.ndarray, rng: np.random.Generator, ) -> np.ndarray: """ 1-2 frames with MAD z-score > 6.5 in the middle 60% of episode. Uses robust reference stats (median / MAD) so the injected spike value correctly yields the target z-score when detected later. """ n_frames, n_joints = vel.shape n_spikes = int(rng.integers(1, 3)) zone = np.arange(int(0.20 * n_frames), int(0.80 * n_frames)) frames = rng.choice(zone, size=min(n_spikes, len(zone)), replace=False) v_med = np.median(vel, axis=0) v_mad = np.median(np.abs(vel - v_med), axis=0) * 1.4826 + 1e-4 for sf in frames: j = int(rng.integers(0, n_joints - 1)) # skip gripper z_target = float(rng.uniform(7.5, 9.8)) sign = 1.0 if rng.random() > 0.5 else -1.0 # spike = median + z_target * sigma_MAD → guaranteed MAD z-score ≈ z_target vel[sf, j] = v_med[j] + sign * z_target * v_mad[j] return vel def _inject_torque_saturation( pos: np.ndarray, vel: np.ndarray, joint_limits: List[Tuple[float, float]], torque_pool: List[int], rng: np.random.Generator, ) -> Tuple[np.ndarray, np.ndarray]: """ One arm joint hits its angular limit and holds for ≥3 frames: pos : clamped at limit (within sensor noise) vel : near zero during saturation (joint stalled) """ n = pos.shape[0] j = int(rng.choice(torque_pool)) lo, hi = joint_limits[j] mid_pos = pos[:, j].mean() at_limit = hi if mid_pos > (lo + hi) / 2 else lo sat_start = int(rng.uniform(0.25, 0.62) * n) sat_len = int(rng.integers(3, 9)) sat_end = min(sat_start + sat_len, n) pos[sat_start:sat_end, j] = at_limit + rng.normal(0, 0.001, sat_end - sat_start) vel[sat_start:sat_end, j] = rng.normal(0, 0.005, sat_end - sat_start) return pos, vel # ── Episode builder ─────────────────────────────────────────────────────────── def _build_episode( ep_idx: int, robot: str, task: str, failure_type: str, cfg: Dict, rng: np.random.Generator, ) -> pd.DataFrame: """ Build one LeRobot-compatible episode DataFrame. Columns: state_0..5 (rad), action_0..5 (rad/s), timestamp (s), episode_index, frame_index, task, use_for_training, failure_type, quality_score (placeholder=0) """ n = cfg["n_joints"] home = cfg["home"].copy() target = cfg["targets"][task].copy() + rng.normal(0, 0.025, n) pos, vel = _smooth_trajectory(task, home, target, FRAMES_PER_EPISODE, rng) force = _force_profile(FRAMES_PER_EPISODE, task, cfg["force_range"], rng) if failure_type == "grasp_slip": pos, vel, force = _inject_grasp_slip(pos, vel, force, cfg["gripper_joint"], rng) elif failure_type == "velocity_spike": vel = _inject_velocity_spike(vel, rng) elif failure_type == "torque_saturation": pos, vel = _inject_torque_saturation(pos, vel, cfg["joint_limits"], cfg["torque_joint_pool"], rng) for j, (lo, hi) in enumerate(cfg["joint_limits"]): pos[:, j] = np.clip(pos[:, j], lo, hi) vel = np.clip(vel, -cfg["max_velocity"] * 2, cfg["max_velocity"] * 2) label = failure_type if failure_type != "none" else "success" return pd.DataFrame({ **{f"state_{j}": pos[:, j] for j in range(n)}, **{f"action_{j}": vel[:, j] for j in range(n)}, "timestamp": np.arange(FRAMES_PER_EPISODE, dtype=float) * DT, "episode_index": np.full(FRAMES_PER_EPISODE, ep_idx, dtype=int), "frame_index": np.arange(FRAMES_PER_EPISODE, dtype=int), "task": task, "use_for_training": (failure_type == "none"), "failure_type": label, "quality_score": 0.0, }) # ── Dataset generator ───────────────────────────────────────────────────────── def generate_dataset( robot: str, task: str, n_episodes: int, success_rate: float, force_min: float, force_max: float, enabled_failures: List[str], seed: Optional[int] = None, progress_callback = None, ) -> pd.DataFrame: """ Generate a full synthetic dataset. Parameters ---------- robot : "SO-100" | "SO-101" | "Koch" task : from TASKS_BY_ROBOT[robot] n_episodes : 10–500 success_rate : 0.0–1.0 force_min/max : contact force range override (N) enabled_failures : subset of FAILURE_TYPES seed : reproducibility seed progress_callback : optional (fraction: float, message: str) → None Returns ------- pd.DataFrame, n_episodes * FRAMES_PER_EPISODE rows """ if robot not in ROBOT_CONFIG: raise ValueError(f"Unknown robot '{robot}'. Options: {list(ROBOT_CONFIG)}") if task not in TASKS_BY_ROBOT.get(robot, []): raise ValueError(f"Task '{task}' invalid for {robot}.") if not enabled_failures: enabled_failures = list(FAILURE_TYPES) rng = np.random.default_rng(seed) cfg = {**ROBOT_CONFIG[robot], "force_range": (force_min, force_max)} n_success = int(round(n_episodes * success_rate)) n_fail = n_episodes - n_success pool = (list(enabled_failures) * (n_fail // max(len(enabled_failures), 1) + 1))[:n_fail] manifest = ["none"] * n_success + pool rng.shuffle(manifest) frames = [] for i, ft in enumerate(manifest): if progress_callback and i % max(1, n_episodes // 20) == 0: progress_callback(i / n_episodes, f"Generating episode {i + 1}/{n_episodes}…") frames.append(_build_episode(i, robot, task, ft, cfg, rng)) if progress_callback: progress_callback(0.95, "Concatenating dataset…") df = pd.concat(frames, ignore_index=True) # Embed robot metadata column so scorer can look up joint limits df["robot"] = robot return df # ── Built-in scorer ─────────────────────────────────────────────────────────── def _builtin_score(df: pd.DataFrame) -> Dict: """ Calibrated quality scorer using MAD z-scores (community thresholds). Detection criteria ------------------ velocity_spike : MAD z-score on action columns > 6.5 in any frame grasp_slip : |Δgripper_pos| > 0.10 rad in a single frame torque_saturation : ≥3 consecutive frames with |velocity| < 0.012 rad/s AND position within 3% of known joint limit (uses 'robot' column if present, otherwise ±0.90*π heuristic) mismatch_fraction : (n_spike + slip + sat anomalous frames) / n_frames flagged : mismatch_fraction > 0.50 """ act_cols = [f"action_{j}" for j in range(6)] pos_cols = [f"state_{j}" for j in range(6)] # Try to get joint limits from robot column robot_col = df.get("robot", pd.Series(dtype=str)) first_robot = robot_col.iloc[0] if len(robot_col) else None joint_limits = ( ROBOT_CONFIG[first_robot]["joint_limits"] if first_robot in ROBOT_CONFIG else [(-3.14, 3.14)] * 6 ) rows = [] for ep_idx, ep in df.groupby("episode_index"): acts = ep[act_cols].values poss = ep[pos_cols].values n_f = len(ep) # ── Velocity spike (MAD z-score) ────────────────────────────────── z = _robust_z(acts) spike_mask = z.max(axis=1) > VELOCITY_SPIKE_Z n_spikes = int(spike_mask.sum()) max_z = float(z.max()) # ── Grasp slip ──────────────────────────────────────────────────── gripper_delta = np.abs(np.diff(poss[:, 5])) n_slip_frames = int((gripper_delta > 0.10).sum()) # ── Torque saturation (velocity near-zero AND position near limit) ─ sat_joints = 0 for j in range(5): # joints 0-4; skip gripper (j=5) lo, hi = joint_limits[j] limit_tol = abs(hi - lo) * 0.03 # 3% of range from limit consec = 0 for i in range(n_f): near_limit = ( poss[i, j] >= hi - limit_tol or poss[i, j] <= lo + limit_tol ) near_zero = abs(acts[i, j]) < 0.012 if near_limit and near_zero: consec += 1 if consec >= 3: sat_joints += 1 break else: consec = 0 # ── Mismatch fraction & episode score ───────────────────────────── n_anomalous = n_spikes + min(1, n_slip_frames) + min(1, sat_joints) mismatch_fraction = n_anomalous / max(n_f, 1) flagged = mismatch_fraction > MISMATCH_THRESHOLD penalty = n_spikes * 9.0 + n_slip_frames * 7.0 + sat_joints * 6.0 + mismatch_fraction * 18.0 ep_score = float(np.clip(100.0 - penalty, 0, 100)) rows.append({ "episode_index": ep_idx, "failure_type": ep["failure_type"].iloc[0], "n_spike_frames": n_spikes, "max_velocity_z": round(max_z, 2), "n_slip_frames": n_slip_frames, "n_sat_joints": sat_joints, "mismatch_fraction": round(mismatch_fraction, 4), "episode_score": round(ep_score, 2), "flagged": flagged, }) ep_df = pd.DataFrame(rows) n_eps = len(ep_df) n_flagged = int(ep_df["flagged"].sum()) n_passed = n_eps - n_flagged mean_score = float(ep_df["episode_score"].mean()) band = "Clean" if mean_score >= 80 else ("Review" if mean_score >= 55 else "Flagged") failure_breakdown = ( ep_df[ep_df["failure_type"] != "success"] .groupby("failure_type").size().to_dict() ) return { "overall_score": round(mean_score, 2), "band": band, "n_episodes": n_eps, "n_passed": n_passed, "n_flagged": n_flagged, "mean_mismatch": round(float(ep_df["mismatch_fraction"].mean()), 4), "failure_breakdown": failure_breakdown, "episode_details": ep_df, "scorer_used": "builtin", } # ── Public API ──────────────────────────────────────────────────────────────── def score_dataset(df: pd.DataFrame, progress_callback=None) -> Dict: """Score a dataset; prefers external scorer if SCORER_PATH resolves.""" if progress_callback: progress_callback(0.05, "Running quality checks…") if EXTERNAL_SCORER: try: result = _ext_score_dataset(df) result["scorer_used"] = "external" if progress_callback: progress_callback(1.0, "Scoring complete (external scorer).") return result except Exception as exc: print(f"[RoboGen] External scorer failed ({exc}); using built-in.") result = _builtin_score(df) if progress_callback: progress_callback(1.0, "Scoring complete.") return result def annotate_quality_scores(df: pd.DataFrame, score_result: Dict) -> pd.DataFrame: """Merge per-episode quality scores into the main DataFrame.""" ep_scores = ( score_result["episode_details"][["episode_index", "episode_score"]] .rename(columns={"episode_score": "quality_score"}) ) return df.drop(columns=["quality_score"], errors="ignore").merge( ep_scores, on="episode_index", how="left" ) # ── CLI demo ────────────────────────────────────────────────────────────────── if __name__ == "__main__": pd.set_option("display.max_columns", 20) pd.set_option("display.width", 160) pd.set_option("display.float_format", "{:+.4f}".format) DEMO = [ ("SO-100", "pick_and_place", "none", "SUCCESS "), ("SO-100", "pick_and_place", "grasp_slip", "GRASP SLIP "), ("SO-100", "pick_and_place", "velocity_spike", "VELOCITY SPIKE "), ("SO-100", "pick_and_place", "torque_saturation", "TORQUE SAT. "), ] rng = np.random.default_rng(42) cfg = ROBOT_CONFIG["SO-100"].copy() print("=" * 92) print(" RoboGen — generator.py validation") print(" state_* = joint positions (rad) | action_* = velocity commands (rad/s)") print("=" * 92) for ep_idx, (robot, task, ft, label) in enumerate(DEMO): print(f"\n{'─'*92}") print(f" Episode {ep_idx} │ {label}│ {robot} / {task}") print(f"{'─'*92}") ep = _build_episode(ep_idx, robot, task, ft, cfg, rng) cols = ["frame_index", "state_0", "state_4", "state_5", "action_0", "action_4", "action_5"] sample = pd.concat([ep.head(4), ep.iloc[28:33], ep.tail(4)])[cols] print(sample.to_string(index=False)) acts = ep[[f"action_{j}" for j in range(6)]].values poss = ep[[f"state_{j}" for j in range(6)]].values z = _robust_z(acts) n_spikes = int((z.max(1) > VELOCITY_SPIKE_Z).sum()) max_z = z.max() n_slip = int((np.abs(np.diff(poss[:, 5])) > 0.10).sum()) jl = cfg["joint_limits"] sat_joints = 0 for j in range(5): lo, hi = jl[j] tol = abs(hi - lo) * 0.03 consec = 0 for i in range(len(ep)): nl = poss[i, j] >= hi - tol or poss[i, j] <= lo + tol nz = abs(acts[i, j]) < 0.012 consec = consec + 1 if (nl and nz) else 0 if consec >= 3: sat_joints += 1; break print(f"\n Vel (action) │ mean={acts.mean():+.3f} std={acts.std():.3f} " f"max_MAD_z={max_z:.2f} spike_frames(z>{VELOCITY_SPIKE_Z})={n_spikes}") print(f" Gripper pos │ max_Δ={np.abs(np.diff(poss[:,5])).max():.4f} rad " f"slip_frames(Δ>0.10)={n_slip} sat_joints={sat_joints}") print(f"\n{'='*92}") print(" 10-episode mini-dataset (60% success, all failure modes, seed=99)") print(f"{'='*92}") mini = generate_dataset( robot="SO-100", task="pick_and_place", n_episodes=10, success_rate=0.60, force_min=1.0, force_max=10.0, enabled_failures=list(FAILURE_TYPES), seed=99, ) res = score_dataset(mini) pd.set_option("display.float_format", "{:.2f}".format) print(f"\n Overall score : {res['overall_score']:.1f}/100") print(f" Band : {res['band']}") print(f" Passed/Flagged : {res['n_passed']} / {res['n_flagged']}") print(f" Mean mismatch : {res['mean_mismatch']:.4f}") print(f" Failures : {res['failure_breakdown']}") print(f" Scorer : {res['scorer_used']}") print() cols = ["episode_index","failure_type","n_spike_frames","max_velocity_z", "n_slip_frames","n_sat_joints","mismatch_fraction","episode_score","flagged"] print(res["episode_details"][cols].to_string(index=False)) print(f"\n{'='*92}\n")