| """ |
| RoboGen β Synthetic Robotics Dataset Generator |
| generator.py: Physically-plausible episode generation for LeRobot-format parquet datasets. |
| |
| Schema |
| ------ |
| state_0..state_5 β observed joint positions at time t (rad) |
| action_0..action_5 β joint velocity COMMANDS at time t (rad/s) |
| |
| Physics model |
| ------------- |
| 1. Task-specific waypoints (approach β contact β grasp/push β retract) |
| 2. Cubic spline interpolation, clamped boundary (zero velocity at endpoints) |
| 3. Velocities = analytical first derivative of position spline (rad/s) |
| 4. Sensor noise: Gaussian Ο_pos=0.002 rad, Ο_vel=0.004 rad/s |
| 5. Contact force: spring-damper ramp during contact window |
| |
| Failure modes |
| ------------- |
| grasp_slip β smooth until 60-70%, then position discontinuity + velocity collapse |
| velocity_spike β 1-2 frames with MAD z-score > 6.5 on joint velocity |
| torque_saturation β joint clamped at limit for β₯3 frames (velocity near zero, pos at limit) |
| |
| Z-score note: both injection and detection use robust MAD z-scores so that a single |
| spike value does not inflate the reference statistics. |
| |
| Scorer |
| ------ |
| Set SCORER_PATH env var to ~/Downloads/quality_scorer root; falls back to built-in. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import sys |
| from typing import Dict, List, Optional, Tuple |
| import math |
|
|
| import numpy as np |
| import pandas as pd |
| from scipy.interpolate import CubicSpline |
|
|
| |
|
|
| _SCORER_PATH = os.environ.get( |
| "SCORER_PATH", os.path.expanduser("~/Downloads/quality_scorer") |
| ) |
| if _SCORER_PATH not in sys.path: |
| sys.path.insert(0, _SCORER_PATH) |
|
|
| try: |
| from scorer.scorer import score_dataset as _ext_score_dataset |
| EXTERNAL_SCORER = True |
| except ImportError: |
| EXTERNAL_SCORER = False |
|
|
| |
|
|
| ROBOT_CONFIG: Dict[str, Dict] = { |
| "SO-100": { |
| "n_joints": 6, |
| |
| "joint_limits": [ |
| (-3.14, 3.14), |
| (-1.57, 1.57), |
| (-0.20, 2.80), |
| (-3.14, 3.14), |
| (-1.57, 1.57), |
| (-0.10, 1.00), |
| ], |
| "home": np.array([0.00, -0.52, 1.20, 0.00, -0.72, 0.05]), |
| "targets": { |
| "pick_and_place": np.array([ 0.30, 0.20, 0.80, 0.10, -0.30, 0.50]), |
| "push_object": np.array([ 0.50, 0.12, 1.00, 0.00, -0.50, 0.00]), |
| "grasp_and_lift": np.array([ 0.22, 0.30, 0.48, 0.18, -0.40, 0.28]), |
| "stacking": np.array([ 0.12, 0.42, 0.68, 0.14, -0.30, 0.18]), |
| }, |
| "gripper_joint": 5, |
| "torque_joint_pool": [1, 2, 3, 4], |
| "max_velocity": 1.5, |
| "force_range": (0.5, 12.0), |
| }, |
| "SO-101": { |
| "n_joints": 6, |
| "joint_limits": [ |
| (-3.14, 3.14), (-1.57, 1.57), (-0.20, 2.80), |
| (-3.14, 3.14), (-1.57, 1.57), (-0.10, 1.00), |
| ], |
| "home": np.array([0.00, -0.42, 1.12, 0.00, -0.62, 0.05]), |
| "targets": { |
| "pick_and_place": np.array([ 0.35, 0.24, 0.76, 0.14, -0.26, 0.46]), |
| "push_object": np.array([ 0.54, 0.14, 0.96, 0.04, -0.46, 0.04]), |
| "grasp_and_lift": np.array([ 0.24, 0.34, 0.46, 0.24, -0.36, 0.24]), |
| "stacking": np.array([ 0.14, 0.44, 0.66, 0.19, -0.26, 0.14]), |
| }, |
| "gripper_joint": 5, |
| "torque_joint_pool": [1, 2, 3, 4], |
| "max_velocity": 1.5, |
| "force_range": (0.5, 12.0), |
| }, |
| "Koch": { |
| "n_joints": 6, |
| "joint_limits": [ |
| (-3.14, 3.14), (-1.57, 1.57), (-0.10, 2.50), |
| (-3.14, 3.14), (-1.57, 1.57), (-0.10, 1.00), |
| ], |
| "home": np.array([0.00, -0.60, 1.28, 0.00, -0.82, 0.05]), |
| "targets": { |
| "pick_and_place": np.array([ 0.40, 0.16, 0.88, 0.06, -0.42, 0.58]), |
| "drawer_open_close": np.array([ 0.00, 0.10, 1.10, 0.00, -0.60, 0.00]), |
| "grasp_and_lift": np.array([ 0.28, 0.26, 0.54, 0.19, -0.46, 0.34]), |
| }, |
| "gripper_joint": 5, |
| "torque_joint_pool": [1, 2, 3], |
| "max_velocity": 1.2, |
| "force_range": (0.3, 10.0), |
| }, |
| } |
|
|
| TASKS_BY_ROBOT: Dict[str, List[str]] = { |
| "SO-100": ["pick_and_place", "push_object", "grasp_and_lift", "stacking"], |
| "SO-101": ["pick_and_place", "push_object", "grasp_and_lift", "stacking"], |
| "Koch": ["pick_and_place", "drawer_open_close", "grasp_and_lift"], |
| } |
|
|
| FAILURE_TYPES = ["grasp_slip", "velocity_spike", "torque_saturation"] |
|
|
| FRAMES_PER_EPISODE = 50 |
| DT = 0.02 |
|
|
| |
| VELOCITY_SPIKE_Z = 6.5 |
| MISMATCH_THRESHOLD = 0.50 |
|
|
| |
|
|
| def _robust_z(values: np.ndarray) -> np.ndarray: |
| """ |
| Compute per-column MAD z-scores: z = |x - median| / (MAD * 1.4826) |
| The 1.4826 factor makes MAD a consistent estimator of Ο under normality. |
| A single outlier barely affects median or MAD, so injected spikes retain |
| their true z-score instead of being pulled down by contaminated reference stats. |
| """ |
| med = np.median(values, axis=0) |
| mad = np.median(np.abs(values - med), axis=0) |
| scale = mad * 1.4826 + 1e-6 |
| return np.abs(values - med) / scale |
|
|
|
|
| |
|
|
| def _build_waypoints( |
| task: str, |
| home: np.ndarray, |
| target: np.ndarray, |
| rng: np.random.Generator, |
| ) -> Tuple[np.ndarray, np.ndarray]: |
| """ |
| Return (t_knots_normalised [0..1], waypoint_matrix [n_knots, n_joints]). |
| Waypoints encode task-specific motion primitives; small Gaussian noise |
| ensures episode-to-episode variation. |
| """ |
| n = len(home) |
| ns = lambda s=0.008: rng.normal(0, s, n) |
|
|
| if task == "pick_and_place": |
| approach = home + 0.25 * (target - home) + ns() |
| pre_grasp = home + 0.44 * (target - home) + ns() |
| grasp = home + 0.60 * (target - home) + ns(0.003) |
| lift = grasp.copy(); lift[2] = lift[2] * 0.82 + ns(0.003)[2] |
| transport = home + 0.82 * (target - home) + ns() |
| place = target + ns(0.003) |
| t = np.array([0.0, 0.18, 0.38, 0.55, 0.68, 0.83, 1.0]) |
| w = np.vstack([home, approach, pre_grasp, grasp, lift, transport, place]) |
|
|
| elif task == "push_object": |
| approach = home + 0.35 * (target - home) + ns() |
| contact = home + 0.56 * (target - home) + ns(0.004) |
| push_end = target + ns(0.006) |
| retract = home + 0.20 * (target - home) + ns() |
| t = np.array([0.0, 0.28, 0.52, 0.73, 1.0]) |
| w = np.vstack([home, approach, contact, push_end, retract]) |
|
|
| elif task == "grasp_and_lift": |
| descend = home + 0.28 * (target - home) + ns() |
| pre_grasp = home + 0.52 * (target - home) + ns() |
| grasp = target + ns(0.003) |
| lift = grasp.copy(); lift[2] -= 0.28 + ns(0.005)[2] |
| t = np.array([0.0, 0.22, 0.46, 0.68, 1.0]) |
| w = np.vstack([home, descend, pre_grasp, grasp, lift]) |
|
|
| elif task == "stacking": |
| grasp_pos = home + 0.38 * (target - home) + ns() |
| lift_pos = grasp_pos.copy(); lift_pos[2] -= 0.32 |
| hover = home + 0.70 * (target - home) + ns() |
| lower = target + ns(0.004) |
| release = lower.copy(); release[5] += 0.18 |
| t = np.array([0.0, 0.20, 0.40, 0.60, 0.82, 1.0]) |
| w = np.vstack([home, grasp_pos, lift_pos, hover, lower, release]) |
|
|
| elif task == "drawer_open_close": |
| extend = home + 0.28 * (target - home) + ns() |
| at_handle = home + 0.52 * (target - home) + ns(0.004) |
| pull_mid = home + 0.74 * (target - home) + ns() |
| open_pos = target + ns(0.006) |
| t = np.array([0.0, 0.26, 0.50, 0.74, 1.0]) |
| w = np.vstack([home, extend, at_handle, pull_mid, open_pos]) |
|
|
| else: |
| mid = home + 0.50 * (target - home) + ns() |
| t = np.array([0.0, 0.50, 1.0]) |
| w = np.vstack([home, mid, target]) |
|
|
| return t, w |
|
|
|
|
| |
|
|
| def _smooth_trajectory( |
| task: str, |
| home: np.ndarray, |
| target: np.ndarray, |
| n_frames: int, |
| rng: np.random.Generator, |
| ) -> Tuple[np.ndarray, np.ndarray]: |
| """ |
| Returns: |
| positions (n_frames, n_joints) rad β joint positions |
| velocities (n_frames, n_joints) rad/s β analytical spline derivative |
| """ |
| n_joints = len(home) |
| t_norm, wpts = _build_waypoints(task, home, target, rng) |
| t_knots = t_norm * (n_frames - 1) |
| t_frames = np.arange(n_frames, dtype=float) |
|
|
| pos = np.zeros((n_frames, n_joints)) |
| vel = np.zeros((n_frames, n_joints)) |
|
|
| for j in range(n_joints): |
| cs = CubicSpline(t_knots, wpts[:, j], bc_type="clamped") |
| pos[:, j] = cs(t_frames) |
| vel[:, j] = cs(t_frames, 1) / DT |
|
|
| pos += rng.normal(0, 0.002, pos.shape) |
| vel += rng.normal(0, 0.004, vel.shape) |
| return pos, vel |
|
|
|
|
| |
|
|
| def _force_profile( |
| n_frames: int, |
| task: str, |
| force_range: Tuple[float, float], |
| rng: np.random.Generator, |
| ) -> np.ndarray: |
| """Spring-damper contact force, non-zero during contact window.""" |
| f_min, f_max = force_range |
| force = np.zeros(n_frames) |
| if task not in {"pick_and_place", "grasp_and_lift", "stacking", |
| "push_object", "drawer_open_close"}: |
| return force |
| c0, c1 = int(0.30 * n_frames), int(0.75 * n_frames) |
| for i in range(n_frames): |
| if c0 <= i <= c1: |
| phase = (i - c0) / max(c1 - c0, 1) |
| ramp = math.sin(phase * math.pi / 2) if phase < 0.30 else 1.0 |
| force[i] = ramp * rng.uniform(f_min * 1.5, f_max) + rng.normal(0, 0.12) |
| else: |
| force[i] = rng.uniform(0, f_min * 0.25) |
| return np.clip(force, 0, f_max + 1) |
|
|
|
|
| |
|
|
| def _inject_grasp_slip( |
| pos: np.ndarray, |
| vel: np.ndarray, |
| force: np.ndarray, |
| gripper: int, |
| rng: np.random.Generator, |
| ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: |
| """ |
| Grasp slip at 60-70% of episode: |
| pos : gripper jumps open (discontinuity β₯ 0.18 rad) |
| vel : transient spike at slip frame, then near-zero (contact lost) |
| force: collapses post-slip |
| """ |
| n = pos.shape[0] |
| sf = int(rng.uniform(0.60, 0.70) * n) |
|
|
| |
| slip_mag = rng.uniform(0.18, 0.38) |
| pos[sf:, gripper] += slip_mag |
| pos[sf:, max(0, gripper - 1)] += rng.uniform(0.04, 0.12) |
|
|
| |
| v_med = float(np.median(np.abs(vel[:sf, gripper]))) + 0.05 |
| vel[sf, gripper] += v_med * rng.uniform(9.0, 12.0) |
| vel[sf + 1:, gripper] *= rng.uniform(0.04, 0.12) |
|
|
| force[sf:] = np.clip(rng.exponential(0.25, n - sf), 0, 0.5) |
| return pos, vel, force |
|
|
|
|
| def _inject_velocity_spike( |
| vel: np.ndarray, |
| rng: np.random.Generator, |
| ) -> np.ndarray: |
| """ |
| 1-2 frames with MAD z-score > 6.5 in the middle 60% of episode. |
| Uses robust reference stats (median / MAD) so the injected spike value |
| correctly yields the target z-score when detected later. |
| """ |
| n_frames, n_joints = vel.shape |
| n_spikes = int(rng.integers(1, 3)) |
|
|
| zone = np.arange(int(0.20 * n_frames), int(0.80 * n_frames)) |
| frames = rng.choice(zone, size=min(n_spikes, len(zone)), replace=False) |
|
|
| v_med = np.median(vel, axis=0) |
| v_mad = np.median(np.abs(vel - v_med), axis=0) * 1.4826 + 1e-4 |
|
|
| for sf in frames: |
| j = int(rng.integers(0, n_joints - 1)) |
| z_target = float(rng.uniform(7.5, 9.8)) |
| sign = 1.0 if rng.random() > 0.5 else -1.0 |
| |
| vel[sf, j] = v_med[j] + sign * z_target * v_mad[j] |
|
|
| return vel |
|
|
|
|
| def _inject_torque_saturation( |
| pos: np.ndarray, |
| vel: np.ndarray, |
| joint_limits: List[Tuple[float, float]], |
| torque_pool: List[int], |
| rng: np.random.Generator, |
| ) -> Tuple[np.ndarray, np.ndarray]: |
| """ |
| One arm joint hits its angular limit and holds for β₯3 frames: |
| pos : clamped at limit (within sensor noise) |
| vel : near zero during saturation (joint stalled) |
| """ |
| n = pos.shape[0] |
| j = int(rng.choice(torque_pool)) |
| lo, hi = joint_limits[j] |
|
|
| mid_pos = pos[:, j].mean() |
| at_limit = hi if mid_pos > (lo + hi) / 2 else lo |
|
|
| sat_start = int(rng.uniform(0.25, 0.62) * n) |
| sat_len = int(rng.integers(3, 9)) |
| sat_end = min(sat_start + sat_len, n) |
|
|
| pos[sat_start:sat_end, j] = at_limit + rng.normal(0, 0.001, sat_end - sat_start) |
| vel[sat_start:sat_end, j] = rng.normal(0, 0.005, sat_end - sat_start) |
|
|
| return pos, vel |
|
|
|
|
| |
|
|
| def _build_episode( |
| ep_idx: int, |
| robot: str, |
| task: str, |
| failure_type: str, |
| cfg: Dict, |
| rng: np.random.Generator, |
| ) -> pd.DataFrame: |
| """ |
| Build one LeRobot-compatible episode DataFrame. |
| |
| Columns: state_0..5 (rad), action_0..5 (rad/s), timestamp (s), |
| episode_index, frame_index, task, use_for_training, |
| failure_type, quality_score (placeholder=0) |
| """ |
| n = cfg["n_joints"] |
| home = cfg["home"].copy() |
| target = cfg["targets"][task].copy() + rng.normal(0, 0.025, n) |
|
|
| pos, vel = _smooth_trajectory(task, home, target, FRAMES_PER_EPISODE, rng) |
| force = _force_profile(FRAMES_PER_EPISODE, task, cfg["force_range"], rng) |
|
|
| if failure_type == "grasp_slip": |
| pos, vel, force = _inject_grasp_slip(pos, vel, force, cfg["gripper_joint"], rng) |
| elif failure_type == "velocity_spike": |
| vel = _inject_velocity_spike(vel, rng) |
| elif failure_type == "torque_saturation": |
| pos, vel = _inject_torque_saturation(pos, vel, cfg["joint_limits"], |
| cfg["torque_joint_pool"], rng) |
|
|
| for j, (lo, hi) in enumerate(cfg["joint_limits"]): |
| pos[:, j] = np.clip(pos[:, j], lo, hi) |
|
|
| vel = np.clip(vel, -cfg["max_velocity"] * 2, cfg["max_velocity"] * 2) |
|
|
| label = failure_type if failure_type != "none" else "success" |
|
|
| return pd.DataFrame({ |
| **{f"state_{j}": pos[:, j] for j in range(n)}, |
| **{f"action_{j}": vel[:, j] for j in range(n)}, |
| "timestamp": np.arange(FRAMES_PER_EPISODE, dtype=float) * DT, |
| "episode_index": np.full(FRAMES_PER_EPISODE, ep_idx, dtype=int), |
| "frame_index": np.arange(FRAMES_PER_EPISODE, dtype=int), |
| "task": task, |
| "use_for_training": (failure_type == "none"), |
| "failure_type": label, |
| "quality_score": 0.0, |
| }) |
|
|
|
|
| |
|
|
| def generate_dataset( |
| robot: str, |
| task: str, |
| n_episodes: int, |
| success_rate: float, |
| force_min: float, |
| force_max: float, |
| enabled_failures: List[str], |
| seed: Optional[int] = None, |
| progress_callback = None, |
| ) -> pd.DataFrame: |
| """ |
| Generate a full synthetic dataset. |
| |
| Parameters |
| ---------- |
| robot : "SO-100" | "SO-101" | "Koch" |
| task : from TASKS_BY_ROBOT[robot] |
| n_episodes : 10β500 |
| success_rate : 0.0β1.0 |
| force_min/max : contact force range override (N) |
| enabled_failures : subset of FAILURE_TYPES |
| seed : reproducibility seed |
| progress_callback : optional (fraction: float, message: str) β None |
| |
| Returns |
| ------- |
| pd.DataFrame, n_episodes * FRAMES_PER_EPISODE rows |
| """ |
| if robot not in ROBOT_CONFIG: |
| raise ValueError(f"Unknown robot '{robot}'. Options: {list(ROBOT_CONFIG)}") |
| if task not in TASKS_BY_ROBOT.get(robot, []): |
| raise ValueError(f"Task '{task}' invalid for {robot}.") |
| if not enabled_failures: |
| enabled_failures = list(FAILURE_TYPES) |
|
|
| rng = np.random.default_rng(seed) |
| cfg = {**ROBOT_CONFIG[robot], "force_range": (force_min, force_max)} |
|
|
| n_success = int(round(n_episodes * success_rate)) |
| n_fail = n_episodes - n_success |
| pool = (list(enabled_failures) * (n_fail // max(len(enabled_failures), 1) + 1))[:n_fail] |
| manifest = ["none"] * n_success + pool |
| rng.shuffle(manifest) |
|
|
| frames = [] |
| for i, ft in enumerate(manifest): |
| if progress_callback and i % max(1, n_episodes // 20) == 0: |
| progress_callback(i / n_episodes, f"Generating episode {i + 1}/{n_episodes}β¦") |
| frames.append(_build_episode(i, robot, task, ft, cfg, rng)) |
|
|
| if progress_callback: |
| progress_callback(0.95, "Concatenating datasetβ¦") |
|
|
| df = pd.concat(frames, ignore_index=True) |
| |
| df["robot"] = robot |
| return df |
|
|
|
|
| |
|
|
| def _builtin_score(df: pd.DataFrame) -> Dict: |
| """ |
| Calibrated quality scorer using MAD z-scores (community thresholds). |
| |
| Detection criteria |
| ------------------ |
| velocity_spike : MAD z-score on action columns > 6.5 in any frame |
| grasp_slip : |Ξgripper_pos| > 0.10 rad in a single frame |
| torque_saturation : β₯3 consecutive frames with |velocity| < 0.012 rad/s |
| AND position within 3% of known joint limit |
| (uses 'robot' column if present, otherwise Β±0.90*Ο heuristic) |
| mismatch_fraction : (n_spike + slip + sat anomalous frames) / n_frames |
| flagged : mismatch_fraction > 0.50 |
| """ |
| act_cols = [f"action_{j}" for j in range(6)] |
| pos_cols = [f"state_{j}" for j in range(6)] |
|
|
| |
| robot_col = df.get("robot", pd.Series(dtype=str)) |
| first_robot = robot_col.iloc[0] if len(robot_col) else None |
| joint_limits = ( |
| ROBOT_CONFIG[first_robot]["joint_limits"] |
| if first_robot in ROBOT_CONFIG else [(-3.14, 3.14)] * 6 |
| ) |
|
|
| rows = [] |
| for ep_idx, ep in df.groupby("episode_index"): |
| acts = ep[act_cols].values |
| poss = ep[pos_cols].values |
| n_f = len(ep) |
|
|
| |
| z = _robust_z(acts) |
| spike_mask = z.max(axis=1) > VELOCITY_SPIKE_Z |
| n_spikes = int(spike_mask.sum()) |
| max_z = float(z.max()) |
|
|
| |
| gripper_delta = np.abs(np.diff(poss[:, 5])) |
| n_slip_frames = int((gripper_delta > 0.10).sum()) |
|
|
| |
| sat_joints = 0 |
| for j in range(5): |
| lo, hi = joint_limits[j] |
| limit_tol = abs(hi - lo) * 0.03 |
| consec = 0 |
| for i in range(n_f): |
| near_limit = ( |
| poss[i, j] >= hi - limit_tol or |
| poss[i, j] <= lo + limit_tol |
| ) |
| near_zero = abs(acts[i, j]) < 0.012 |
| if near_limit and near_zero: |
| consec += 1 |
| if consec >= 3: |
| sat_joints += 1 |
| break |
| else: |
| consec = 0 |
|
|
| |
| n_anomalous = n_spikes + min(1, n_slip_frames) + min(1, sat_joints) |
| mismatch_fraction = n_anomalous / max(n_f, 1) |
| flagged = mismatch_fraction > MISMATCH_THRESHOLD |
|
|
| penalty = n_spikes * 9.0 + n_slip_frames * 7.0 + sat_joints * 6.0 + mismatch_fraction * 18.0 |
| ep_score = float(np.clip(100.0 - penalty, 0, 100)) |
|
|
| rows.append({ |
| "episode_index": ep_idx, |
| "failure_type": ep["failure_type"].iloc[0], |
| "n_spike_frames": n_spikes, |
| "max_velocity_z": round(max_z, 2), |
| "n_slip_frames": n_slip_frames, |
| "n_sat_joints": sat_joints, |
| "mismatch_fraction": round(mismatch_fraction, 4), |
| "episode_score": round(ep_score, 2), |
| "flagged": flagged, |
| }) |
|
|
| ep_df = pd.DataFrame(rows) |
| n_eps = len(ep_df) |
| n_flagged = int(ep_df["flagged"].sum()) |
| n_passed = n_eps - n_flagged |
| mean_score = float(ep_df["episode_score"].mean()) |
| band = "Clean" if mean_score >= 80 else ("Review" if mean_score >= 55 else "Flagged") |
|
|
| failure_breakdown = ( |
| ep_df[ep_df["failure_type"] != "success"] |
| .groupby("failure_type").size().to_dict() |
| ) |
|
|
| return { |
| "overall_score": round(mean_score, 2), |
| "band": band, |
| "n_episodes": n_eps, |
| "n_passed": n_passed, |
| "n_flagged": n_flagged, |
| "mean_mismatch": round(float(ep_df["mismatch_fraction"].mean()), 4), |
| "failure_breakdown": failure_breakdown, |
| "episode_details": ep_df, |
| "scorer_used": "builtin", |
| } |
|
|
|
|
| |
|
|
| def score_dataset(df: pd.DataFrame, progress_callback=None) -> Dict: |
| """Score a dataset; prefers external scorer if SCORER_PATH resolves.""" |
| if progress_callback: |
| progress_callback(0.05, "Running quality checksβ¦") |
|
|
| if EXTERNAL_SCORER: |
| try: |
| result = _ext_score_dataset(df) |
| result["scorer_used"] = "external" |
| if progress_callback: |
| progress_callback(1.0, "Scoring complete (external scorer).") |
| return result |
| except Exception as exc: |
| print(f"[RoboGen] External scorer failed ({exc}); using built-in.") |
|
|
| result = _builtin_score(df) |
| if progress_callback: |
| progress_callback(1.0, "Scoring complete.") |
| return result |
|
|
|
|
| def annotate_quality_scores(df: pd.DataFrame, score_result: Dict) -> pd.DataFrame: |
| """Merge per-episode quality scores into the main DataFrame.""" |
| ep_scores = ( |
| score_result["episode_details"][["episode_index", "episode_score"]] |
| .rename(columns={"episode_score": "quality_score"}) |
| ) |
| return df.drop(columns=["quality_score"], errors="ignore").merge( |
| ep_scores, on="episode_index", how="left" |
| ) |
|
|
|
|
| |
|
|
| if __name__ == "__main__": |
| pd.set_option("display.max_columns", 20) |
| pd.set_option("display.width", 160) |
| pd.set_option("display.float_format", "{:+.4f}".format) |
|
|
| DEMO = [ |
| ("SO-100", "pick_and_place", "none", "SUCCESS "), |
| ("SO-100", "pick_and_place", "grasp_slip", "GRASP SLIP "), |
| ("SO-100", "pick_and_place", "velocity_spike", "VELOCITY SPIKE "), |
| ("SO-100", "pick_and_place", "torque_saturation", "TORQUE SAT. "), |
| ] |
|
|
| rng = np.random.default_rng(42) |
| cfg = ROBOT_CONFIG["SO-100"].copy() |
|
|
| print("=" * 92) |
| print(" RoboGen β generator.py validation") |
| print(" state_* = joint positions (rad) | action_* = velocity commands (rad/s)") |
| print("=" * 92) |
|
|
| for ep_idx, (robot, task, ft, label) in enumerate(DEMO): |
| print(f"\n{'β'*92}") |
| print(f" Episode {ep_idx} β {label}β {robot} / {task}") |
| print(f"{'β'*92}") |
| ep = _build_episode(ep_idx, robot, task, ft, cfg, rng) |
|
|
| cols = ["frame_index", "state_0", "state_4", "state_5", |
| "action_0", "action_4", "action_5"] |
| sample = pd.concat([ep.head(4), ep.iloc[28:33], ep.tail(4)])[cols] |
| print(sample.to_string(index=False)) |
|
|
| acts = ep[[f"action_{j}" for j in range(6)]].values |
| poss = ep[[f"state_{j}" for j in range(6)]].values |
|
|
| z = _robust_z(acts) |
| n_spikes = int((z.max(1) > VELOCITY_SPIKE_Z).sum()) |
| max_z = z.max() |
| n_slip = int((np.abs(np.diff(poss[:, 5])) > 0.10).sum()) |
|
|
| jl = cfg["joint_limits"] |
| sat_joints = 0 |
| for j in range(5): |
| lo, hi = jl[j] |
| tol = abs(hi - lo) * 0.03 |
| consec = 0 |
| for i in range(len(ep)): |
| nl = poss[i, j] >= hi - tol or poss[i, j] <= lo + tol |
| nz = abs(acts[i, j]) < 0.012 |
| consec = consec + 1 if (nl and nz) else 0 |
| if consec >= 3: sat_joints += 1; break |
|
|
| print(f"\n Vel (action) β mean={acts.mean():+.3f} std={acts.std():.3f} " |
| f"max_MAD_z={max_z:.2f} spike_frames(z>{VELOCITY_SPIKE_Z})={n_spikes}") |
| print(f" Gripper pos β max_Ξ={np.abs(np.diff(poss[:,5])).max():.4f} rad " |
| f"slip_frames(Ξ>0.10)={n_slip} sat_joints={sat_joints}") |
|
|
| print(f"\n{'='*92}") |
| print(" 10-episode mini-dataset (60% success, all failure modes, seed=99)") |
| print(f"{'='*92}") |
| mini = generate_dataset( |
| robot="SO-100", task="pick_and_place", |
| n_episodes=10, success_rate=0.60, |
| force_min=1.0, force_max=10.0, |
| enabled_failures=list(FAILURE_TYPES), seed=99, |
| ) |
| res = score_dataset(mini) |
| pd.set_option("display.float_format", "{:.2f}".format) |
| print(f"\n Overall score : {res['overall_score']:.1f}/100") |
| print(f" Band : {res['band']}") |
| print(f" Passed/Flagged : {res['n_passed']} / {res['n_flagged']}") |
| print(f" Mean mismatch : {res['mean_mismatch']:.4f}") |
| print(f" Failures : {res['failure_breakdown']}") |
| print(f" Scorer : {res['scorer_used']}") |
| print() |
| cols = ["episode_index","failure_type","n_spike_frames","max_velocity_z", |
| "n_slip_frames","n_sat_joints","mismatch_fraction","episode_score","flagged"] |
| print(res["episode_details"][cols].to_string(index=False)) |
| print(f"\n{'='*92}\n") |
|
|