robogen / generator.py
HaptalAI's picture
Initial RoboGen v1
f734d80 verified
Raw
History Blame Contribute Delete
28.2 kB
"""
RoboGen β€” Synthetic Robotics Dataset Generator
generator.py: Physically-plausible episode generation for LeRobot-format parquet datasets.
Schema
------
state_0..state_5 β€” observed joint positions at time t (rad)
action_0..action_5 β€” joint velocity COMMANDS at time t (rad/s)
Physics model
-------------
1. Task-specific waypoints (approach β†’ contact β†’ grasp/push β†’ retract)
2. Cubic spline interpolation, clamped boundary (zero velocity at endpoints)
3. Velocities = analytical first derivative of position spline (rad/s)
4. Sensor noise: Gaussian Οƒ_pos=0.002 rad, Οƒ_vel=0.004 rad/s
5. Contact force: spring-damper ramp during contact window
Failure modes
-------------
grasp_slip β€” smooth until 60-70%, then position discontinuity + velocity collapse
velocity_spike β€” 1-2 frames with MAD z-score > 6.5 on joint velocity
torque_saturation β€” joint clamped at limit for β‰₯3 frames (velocity near zero, pos at limit)
Z-score note: both injection and detection use robust MAD z-scores so that a single
spike value does not inflate the reference statistics.
Scorer
------
Set SCORER_PATH env var to ~/Downloads/quality_scorer root; falls back to built-in.
"""
from __future__ import annotations
import os
import sys
from typing import Dict, List, Optional, Tuple
import math
import numpy as np
import pandas as pd
from scipy.interpolate import CubicSpline
# ── Scorer import ────────────────────────────────────────────────────────────
_SCORER_PATH = os.environ.get(
"SCORER_PATH", os.path.expanduser("~/Downloads/quality_scorer")
)
if _SCORER_PATH not in sys.path:
sys.path.insert(0, _SCORER_PATH)
try:
from scorer.scorer import score_dataset as _ext_score_dataset # type: ignore
EXTERNAL_SCORER = True
except ImportError:
EXTERNAL_SCORER = False
# ── Robot / task configuration ───────────────────────────────────────────────
ROBOT_CONFIG: Dict[str, Dict] = {
"SO-100": {
"n_joints": 6,
# Per-joint [lo, hi] limits in radians
"joint_limits": [
(-3.14, 3.14), # j0 base rotation
(-1.57, 1.57), # j1 shoulder pitch
(-0.20, 2.80), # j2 elbow
(-3.14, 3.14), # j3 wrist roll
(-1.57, 1.57), # j4 wrist pitch
(-0.10, 1.00), # j5 gripper
],
"home": np.array([0.00, -0.52, 1.20, 0.00, -0.72, 0.05]),
"targets": {
"pick_and_place": np.array([ 0.30, 0.20, 0.80, 0.10, -0.30, 0.50]),
"push_object": np.array([ 0.50, 0.12, 1.00, 0.00, -0.50, 0.00]),
"grasp_and_lift": np.array([ 0.22, 0.30, 0.48, 0.18, -0.40, 0.28]),
"stacking": np.array([ 0.12, 0.42, 0.68, 0.14, -0.30, 0.18]),
},
"gripper_joint": 5,
"torque_joint_pool": [1, 2, 3, 4], # joints that realistically saturate
"max_velocity": 1.5, # rad/s
"force_range": (0.5, 12.0),
},
"SO-101": {
"n_joints": 6,
"joint_limits": [
(-3.14, 3.14), (-1.57, 1.57), (-0.20, 2.80),
(-3.14, 3.14), (-1.57, 1.57), (-0.10, 1.00),
],
"home": np.array([0.00, -0.42, 1.12, 0.00, -0.62, 0.05]),
"targets": {
"pick_and_place": np.array([ 0.35, 0.24, 0.76, 0.14, -0.26, 0.46]),
"push_object": np.array([ 0.54, 0.14, 0.96, 0.04, -0.46, 0.04]),
"grasp_and_lift": np.array([ 0.24, 0.34, 0.46, 0.24, -0.36, 0.24]),
"stacking": np.array([ 0.14, 0.44, 0.66, 0.19, -0.26, 0.14]),
},
"gripper_joint": 5,
"torque_joint_pool": [1, 2, 3, 4],
"max_velocity": 1.5,
"force_range": (0.5, 12.0),
},
"Koch": {
"n_joints": 6,
"joint_limits": [
(-3.14, 3.14), (-1.57, 1.57), (-0.10, 2.50),
(-3.14, 3.14), (-1.57, 1.57), (-0.10, 1.00),
],
"home": np.array([0.00, -0.60, 1.28, 0.00, -0.82, 0.05]),
"targets": {
"pick_and_place": np.array([ 0.40, 0.16, 0.88, 0.06, -0.42, 0.58]),
"drawer_open_close": np.array([ 0.00, 0.10, 1.10, 0.00, -0.60, 0.00]),
"grasp_and_lift": np.array([ 0.28, 0.26, 0.54, 0.19, -0.46, 0.34]),
},
"gripper_joint": 5,
"torque_joint_pool": [1, 2, 3],
"max_velocity": 1.2,
"force_range": (0.3, 10.0),
},
}
TASKS_BY_ROBOT: Dict[str, List[str]] = {
"SO-100": ["pick_and_place", "push_object", "grasp_and_lift", "stacking"],
"SO-101": ["pick_and_place", "push_object", "grasp_and_lift", "stacking"],
"Koch": ["pick_and_place", "drawer_open_close", "grasp_and_lift"],
}
FAILURE_TYPES = ["grasp_slip", "velocity_spike", "torque_saturation"]
FRAMES_PER_EPISODE = 50
DT = 0.02 # seconds/frame at 50 Hz
# Community-calibrated thresholds (MAD z-score basis)
VELOCITY_SPIKE_Z = 6.5
MISMATCH_THRESHOLD = 0.50
# ── Robust statistics helpers ─────────────────────────────────────────────────
def _robust_z(values: np.ndarray) -> np.ndarray:
"""
Compute per-column MAD z-scores: z = |x - median| / (MAD * 1.4826)
The 1.4826 factor makes MAD a consistent estimator of Οƒ under normality.
A single outlier barely affects median or MAD, so injected spikes retain
their true z-score instead of being pulled down by contaminated reference stats.
"""
med = np.median(values, axis=0)
mad = np.median(np.abs(values - med), axis=0)
scale = mad * 1.4826 + 1e-6
return np.abs(values - med) / scale
# ── Waypoint factory ──────────────────────────────────────────────────────────
def _build_waypoints(
task: str,
home: np.ndarray,
target: np.ndarray,
rng: np.random.Generator,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Return (t_knots_normalised [0..1], waypoint_matrix [n_knots, n_joints]).
Waypoints encode task-specific motion primitives; small Gaussian noise
ensures episode-to-episode variation.
"""
n = len(home)
ns = lambda s=0.008: rng.normal(0, s, n)
if task == "pick_and_place":
approach = home + 0.25 * (target - home) + ns()
pre_grasp = home + 0.44 * (target - home) + ns()
grasp = home + 0.60 * (target - home) + ns(0.003)
lift = grasp.copy(); lift[2] = lift[2] * 0.82 + ns(0.003)[2]
transport = home + 0.82 * (target - home) + ns()
place = target + ns(0.003)
t = np.array([0.0, 0.18, 0.38, 0.55, 0.68, 0.83, 1.0])
w = np.vstack([home, approach, pre_grasp, grasp, lift, transport, place])
elif task == "push_object":
approach = home + 0.35 * (target - home) + ns()
contact = home + 0.56 * (target - home) + ns(0.004)
push_end = target + ns(0.006)
retract = home + 0.20 * (target - home) + ns()
t = np.array([0.0, 0.28, 0.52, 0.73, 1.0])
w = np.vstack([home, approach, contact, push_end, retract])
elif task == "grasp_and_lift":
descend = home + 0.28 * (target - home) + ns()
pre_grasp = home + 0.52 * (target - home) + ns()
grasp = target + ns(0.003)
lift = grasp.copy(); lift[2] -= 0.28 + ns(0.005)[2]
t = np.array([0.0, 0.22, 0.46, 0.68, 1.0])
w = np.vstack([home, descend, pre_grasp, grasp, lift])
elif task == "stacking":
grasp_pos = home + 0.38 * (target - home) + ns()
lift_pos = grasp_pos.copy(); lift_pos[2] -= 0.32
hover = home + 0.70 * (target - home) + ns()
lower = target + ns(0.004)
release = lower.copy(); release[5] += 0.18
t = np.array([0.0, 0.20, 0.40, 0.60, 0.82, 1.0])
w = np.vstack([home, grasp_pos, lift_pos, hover, lower, release])
elif task == "drawer_open_close":
extend = home + 0.28 * (target - home) + ns()
at_handle = home + 0.52 * (target - home) + ns(0.004)
pull_mid = home + 0.74 * (target - home) + ns()
open_pos = target + ns(0.006)
t = np.array([0.0, 0.26, 0.50, 0.74, 1.0])
w = np.vstack([home, extend, at_handle, pull_mid, open_pos])
else:
mid = home + 0.50 * (target - home) + ns()
t = np.array([0.0, 0.50, 1.0])
w = np.vstack([home, mid, target])
return t, w
# ── Smooth trajectory via cubic spline ────────────────────────────────────────
def _smooth_trajectory(
task: str,
home: np.ndarray,
target: np.ndarray,
n_frames: int,
rng: np.random.Generator,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Returns:
positions (n_frames, n_joints) rad β€” joint positions
velocities (n_frames, n_joints) rad/s β€” analytical spline derivative
"""
n_joints = len(home)
t_norm, wpts = _build_waypoints(task, home, target, rng)
t_knots = t_norm * (n_frames - 1)
t_frames = np.arange(n_frames, dtype=float)
pos = np.zeros((n_frames, n_joints))
vel = np.zeros((n_frames, n_joints))
for j in range(n_joints):
cs = CubicSpline(t_knots, wpts[:, j], bc_type="clamped")
pos[:, j] = cs(t_frames)
vel[:, j] = cs(t_frames, 1) / DT # derivative in rad/frame β†’ rad/s
pos += rng.normal(0, 0.002, pos.shape)
vel += rng.normal(0, 0.004, vel.shape)
return pos, vel
# ── Contact force model ───────────────────────────────────────────────────────
def _force_profile(
n_frames: int,
task: str,
force_range: Tuple[float, float],
rng: np.random.Generator,
) -> np.ndarray:
"""Spring-damper contact force, non-zero during contact window."""
f_min, f_max = force_range
force = np.zeros(n_frames)
if task not in {"pick_and_place", "grasp_and_lift", "stacking",
"push_object", "drawer_open_close"}:
return force
c0, c1 = int(0.30 * n_frames), int(0.75 * n_frames)
for i in range(n_frames):
if c0 <= i <= c1:
phase = (i - c0) / max(c1 - c0, 1)
ramp = math.sin(phase * math.pi / 2) if phase < 0.30 else 1.0
force[i] = ramp * rng.uniform(f_min * 1.5, f_max) + rng.normal(0, 0.12)
else:
force[i] = rng.uniform(0, f_min * 0.25)
return np.clip(force, 0, f_max + 1)
# ── Failure injectors ─────────────────────────────────────────────────────────
def _inject_grasp_slip(
pos: np.ndarray,
vel: np.ndarray,
force: np.ndarray,
gripper: int,
rng: np.random.Generator,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Grasp slip at 60-70% of episode:
pos : gripper jumps open (discontinuity β‰₯ 0.18 rad)
vel : transient spike at slip frame, then near-zero (contact lost)
force: collapses post-slip
"""
n = pos.shape[0]
sf = int(rng.uniform(0.60, 0.70) * n)
# Position discontinuity β€” gripper opens
slip_mag = rng.uniform(0.18, 0.38)
pos[sf:, gripper] += slip_mag
pos[sf:, max(0, gripper - 1)] += rng.uniform(0.04, 0.12)
# Velocity: brief spike at slip frame then collapse
v_med = float(np.median(np.abs(vel[:sf, gripper]))) + 0.05
vel[sf, gripper] += v_med * rng.uniform(9.0, 12.0) # big transient
vel[sf + 1:, gripper] *= rng.uniform(0.04, 0.12)
force[sf:] = np.clip(rng.exponential(0.25, n - sf), 0, 0.5)
return pos, vel, force
def _inject_velocity_spike(
vel: np.ndarray,
rng: np.random.Generator,
) -> np.ndarray:
"""
1-2 frames with MAD z-score > 6.5 in the middle 60% of episode.
Uses robust reference stats (median / MAD) so the injected spike value
correctly yields the target z-score when detected later.
"""
n_frames, n_joints = vel.shape
n_spikes = int(rng.integers(1, 3))
zone = np.arange(int(0.20 * n_frames), int(0.80 * n_frames))
frames = rng.choice(zone, size=min(n_spikes, len(zone)), replace=False)
v_med = np.median(vel, axis=0)
v_mad = np.median(np.abs(vel - v_med), axis=0) * 1.4826 + 1e-4
for sf in frames:
j = int(rng.integers(0, n_joints - 1)) # skip gripper
z_target = float(rng.uniform(7.5, 9.8))
sign = 1.0 if rng.random() > 0.5 else -1.0
# spike = median + z_target * sigma_MAD β†’ guaranteed MAD z-score β‰ˆ z_target
vel[sf, j] = v_med[j] + sign * z_target * v_mad[j]
return vel
def _inject_torque_saturation(
pos: np.ndarray,
vel: np.ndarray,
joint_limits: List[Tuple[float, float]],
torque_pool: List[int],
rng: np.random.Generator,
) -> Tuple[np.ndarray, np.ndarray]:
"""
One arm joint hits its angular limit and holds for β‰₯3 frames:
pos : clamped at limit (within sensor noise)
vel : near zero during saturation (joint stalled)
"""
n = pos.shape[0]
j = int(rng.choice(torque_pool))
lo, hi = joint_limits[j]
mid_pos = pos[:, j].mean()
at_limit = hi if mid_pos > (lo + hi) / 2 else lo
sat_start = int(rng.uniform(0.25, 0.62) * n)
sat_len = int(rng.integers(3, 9))
sat_end = min(sat_start + sat_len, n)
pos[sat_start:sat_end, j] = at_limit + rng.normal(0, 0.001, sat_end - sat_start)
vel[sat_start:sat_end, j] = rng.normal(0, 0.005, sat_end - sat_start)
return pos, vel
# ── Episode builder ───────────────────────────────────────────────────────────
def _build_episode(
ep_idx: int,
robot: str,
task: str,
failure_type: str,
cfg: Dict,
rng: np.random.Generator,
) -> pd.DataFrame:
"""
Build one LeRobot-compatible episode DataFrame.
Columns: state_0..5 (rad), action_0..5 (rad/s), timestamp (s),
episode_index, frame_index, task, use_for_training,
failure_type, quality_score (placeholder=0)
"""
n = cfg["n_joints"]
home = cfg["home"].copy()
target = cfg["targets"][task].copy() + rng.normal(0, 0.025, n)
pos, vel = _smooth_trajectory(task, home, target, FRAMES_PER_EPISODE, rng)
force = _force_profile(FRAMES_PER_EPISODE, task, cfg["force_range"], rng)
if failure_type == "grasp_slip":
pos, vel, force = _inject_grasp_slip(pos, vel, force, cfg["gripper_joint"], rng)
elif failure_type == "velocity_spike":
vel = _inject_velocity_spike(vel, rng)
elif failure_type == "torque_saturation":
pos, vel = _inject_torque_saturation(pos, vel, cfg["joint_limits"],
cfg["torque_joint_pool"], rng)
for j, (lo, hi) in enumerate(cfg["joint_limits"]):
pos[:, j] = np.clip(pos[:, j], lo, hi)
vel = np.clip(vel, -cfg["max_velocity"] * 2, cfg["max_velocity"] * 2)
label = failure_type if failure_type != "none" else "success"
return pd.DataFrame({
**{f"state_{j}": pos[:, j] for j in range(n)},
**{f"action_{j}": vel[:, j] for j in range(n)},
"timestamp": np.arange(FRAMES_PER_EPISODE, dtype=float) * DT,
"episode_index": np.full(FRAMES_PER_EPISODE, ep_idx, dtype=int),
"frame_index": np.arange(FRAMES_PER_EPISODE, dtype=int),
"task": task,
"use_for_training": (failure_type == "none"),
"failure_type": label,
"quality_score": 0.0,
})
# ── Dataset generator ─────────────────────────────────────────────────────────
def generate_dataset(
robot: str,
task: str,
n_episodes: int,
success_rate: float,
force_min: float,
force_max: float,
enabled_failures: List[str],
seed: Optional[int] = None,
progress_callback = None,
) -> pd.DataFrame:
"""
Generate a full synthetic dataset.
Parameters
----------
robot : "SO-100" | "SO-101" | "Koch"
task : from TASKS_BY_ROBOT[robot]
n_episodes : 10–500
success_rate : 0.0–1.0
force_min/max : contact force range override (N)
enabled_failures : subset of FAILURE_TYPES
seed : reproducibility seed
progress_callback : optional (fraction: float, message: str) β†’ None
Returns
-------
pd.DataFrame, n_episodes * FRAMES_PER_EPISODE rows
"""
if robot not in ROBOT_CONFIG:
raise ValueError(f"Unknown robot '{robot}'. Options: {list(ROBOT_CONFIG)}")
if task not in TASKS_BY_ROBOT.get(robot, []):
raise ValueError(f"Task '{task}' invalid for {robot}.")
if not enabled_failures:
enabled_failures = list(FAILURE_TYPES)
rng = np.random.default_rng(seed)
cfg = {**ROBOT_CONFIG[robot], "force_range": (force_min, force_max)}
n_success = int(round(n_episodes * success_rate))
n_fail = n_episodes - n_success
pool = (list(enabled_failures) * (n_fail // max(len(enabled_failures), 1) + 1))[:n_fail]
manifest = ["none"] * n_success + pool
rng.shuffle(manifest)
frames = []
for i, ft in enumerate(manifest):
if progress_callback and i % max(1, n_episodes // 20) == 0:
progress_callback(i / n_episodes, f"Generating episode {i + 1}/{n_episodes}…")
frames.append(_build_episode(i, robot, task, ft, cfg, rng))
if progress_callback:
progress_callback(0.95, "Concatenating dataset…")
df = pd.concat(frames, ignore_index=True)
# Embed robot metadata column so scorer can look up joint limits
df["robot"] = robot
return df
# ── Built-in scorer ───────────────────────────────────────────────────────────
def _builtin_score(df: pd.DataFrame) -> Dict:
"""
Calibrated quality scorer using MAD z-scores (community thresholds).
Detection criteria
------------------
velocity_spike : MAD z-score on action columns > 6.5 in any frame
grasp_slip : |Ξ”gripper_pos| > 0.10 rad in a single frame
torque_saturation : β‰₯3 consecutive frames with |velocity| < 0.012 rad/s
AND position within 3% of known joint limit
(uses 'robot' column if present, otherwise Β±0.90*Ο€ heuristic)
mismatch_fraction : (n_spike + slip + sat anomalous frames) / n_frames
flagged : mismatch_fraction > 0.50
"""
act_cols = [f"action_{j}" for j in range(6)]
pos_cols = [f"state_{j}" for j in range(6)]
# Try to get joint limits from robot column
robot_col = df.get("robot", pd.Series(dtype=str))
first_robot = robot_col.iloc[0] if len(robot_col) else None
joint_limits = (
ROBOT_CONFIG[first_robot]["joint_limits"]
if first_robot in ROBOT_CONFIG else [(-3.14, 3.14)] * 6
)
rows = []
for ep_idx, ep in df.groupby("episode_index"):
acts = ep[act_cols].values
poss = ep[pos_cols].values
n_f = len(ep)
# ── Velocity spike (MAD z-score) ──────────────────────────────────
z = _robust_z(acts)
spike_mask = z.max(axis=1) > VELOCITY_SPIKE_Z
n_spikes = int(spike_mask.sum())
max_z = float(z.max())
# ── Grasp slip ────────────────────────────────────────────────────
gripper_delta = np.abs(np.diff(poss[:, 5]))
n_slip_frames = int((gripper_delta > 0.10).sum())
# ── Torque saturation (velocity near-zero AND position near limit) ─
sat_joints = 0
for j in range(5): # joints 0-4; skip gripper (j=5)
lo, hi = joint_limits[j]
limit_tol = abs(hi - lo) * 0.03 # 3% of range from limit
consec = 0
for i in range(n_f):
near_limit = (
poss[i, j] >= hi - limit_tol or
poss[i, j] <= lo + limit_tol
)
near_zero = abs(acts[i, j]) < 0.012
if near_limit and near_zero:
consec += 1
if consec >= 3:
sat_joints += 1
break
else:
consec = 0
# ── Mismatch fraction & episode score ─────────────────────────────
n_anomalous = n_spikes + min(1, n_slip_frames) + min(1, sat_joints)
mismatch_fraction = n_anomalous / max(n_f, 1)
flagged = mismatch_fraction > MISMATCH_THRESHOLD
penalty = n_spikes * 9.0 + n_slip_frames * 7.0 + sat_joints * 6.0 + mismatch_fraction * 18.0
ep_score = float(np.clip(100.0 - penalty, 0, 100))
rows.append({
"episode_index": ep_idx,
"failure_type": ep["failure_type"].iloc[0],
"n_spike_frames": n_spikes,
"max_velocity_z": round(max_z, 2),
"n_slip_frames": n_slip_frames,
"n_sat_joints": sat_joints,
"mismatch_fraction": round(mismatch_fraction, 4),
"episode_score": round(ep_score, 2),
"flagged": flagged,
})
ep_df = pd.DataFrame(rows)
n_eps = len(ep_df)
n_flagged = int(ep_df["flagged"].sum())
n_passed = n_eps - n_flagged
mean_score = float(ep_df["episode_score"].mean())
band = "Clean" if mean_score >= 80 else ("Review" if mean_score >= 55 else "Flagged")
failure_breakdown = (
ep_df[ep_df["failure_type"] != "success"]
.groupby("failure_type").size().to_dict()
)
return {
"overall_score": round(mean_score, 2),
"band": band,
"n_episodes": n_eps,
"n_passed": n_passed,
"n_flagged": n_flagged,
"mean_mismatch": round(float(ep_df["mismatch_fraction"].mean()), 4),
"failure_breakdown": failure_breakdown,
"episode_details": ep_df,
"scorer_used": "builtin",
}
# ── Public API ────────────────────────────────────────────────────────────────
def score_dataset(df: pd.DataFrame, progress_callback=None) -> Dict:
"""Score a dataset; prefers external scorer if SCORER_PATH resolves."""
if progress_callback:
progress_callback(0.05, "Running quality checks…")
if EXTERNAL_SCORER:
try:
result = _ext_score_dataset(df)
result["scorer_used"] = "external"
if progress_callback:
progress_callback(1.0, "Scoring complete (external scorer).")
return result
except Exception as exc:
print(f"[RoboGen] External scorer failed ({exc}); using built-in.")
result = _builtin_score(df)
if progress_callback:
progress_callback(1.0, "Scoring complete.")
return result
def annotate_quality_scores(df: pd.DataFrame, score_result: Dict) -> pd.DataFrame:
"""Merge per-episode quality scores into the main DataFrame."""
ep_scores = (
score_result["episode_details"][["episode_index", "episode_score"]]
.rename(columns={"episode_score": "quality_score"})
)
return df.drop(columns=["quality_score"], errors="ignore").merge(
ep_scores, on="episode_index", how="left"
)
# ── CLI demo ──────────────────────────────────────────────────────────────────
if __name__ == "__main__":
pd.set_option("display.max_columns", 20)
pd.set_option("display.width", 160)
pd.set_option("display.float_format", "{:+.4f}".format)
DEMO = [
("SO-100", "pick_and_place", "none", "SUCCESS "),
("SO-100", "pick_and_place", "grasp_slip", "GRASP SLIP "),
("SO-100", "pick_and_place", "velocity_spike", "VELOCITY SPIKE "),
("SO-100", "pick_and_place", "torque_saturation", "TORQUE SAT. "),
]
rng = np.random.default_rng(42)
cfg = ROBOT_CONFIG["SO-100"].copy()
print("=" * 92)
print(" RoboGen β€” generator.py validation")
print(" state_* = joint positions (rad) | action_* = velocity commands (rad/s)")
print("=" * 92)
for ep_idx, (robot, task, ft, label) in enumerate(DEMO):
print(f"\n{'─'*92}")
print(f" Episode {ep_idx} β”‚ {label}β”‚ {robot} / {task}")
print(f"{'─'*92}")
ep = _build_episode(ep_idx, robot, task, ft, cfg, rng)
cols = ["frame_index", "state_0", "state_4", "state_5",
"action_0", "action_4", "action_5"]
sample = pd.concat([ep.head(4), ep.iloc[28:33], ep.tail(4)])[cols]
print(sample.to_string(index=False))
acts = ep[[f"action_{j}" for j in range(6)]].values
poss = ep[[f"state_{j}" for j in range(6)]].values
z = _robust_z(acts)
n_spikes = int((z.max(1) > VELOCITY_SPIKE_Z).sum())
max_z = z.max()
n_slip = int((np.abs(np.diff(poss[:, 5])) > 0.10).sum())
jl = cfg["joint_limits"]
sat_joints = 0
for j in range(5):
lo, hi = jl[j]
tol = abs(hi - lo) * 0.03
consec = 0
for i in range(len(ep)):
nl = poss[i, j] >= hi - tol or poss[i, j] <= lo + tol
nz = abs(acts[i, j]) < 0.012
consec = consec + 1 if (nl and nz) else 0
if consec >= 3: sat_joints += 1; break
print(f"\n Vel (action) β”‚ mean={acts.mean():+.3f} std={acts.std():.3f} "
f"max_MAD_z={max_z:.2f} spike_frames(z>{VELOCITY_SPIKE_Z})={n_spikes}")
print(f" Gripper pos β”‚ max_Ξ”={np.abs(np.diff(poss[:,5])).max():.4f} rad "
f"slip_frames(Ξ”>0.10)={n_slip} sat_joints={sat_joints}")
print(f"\n{'='*92}")
print(" 10-episode mini-dataset (60% success, all failure modes, seed=99)")
print(f"{'='*92}")
mini = generate_dataset(
robot="SO-100", task="pick_and_place",
n_episodes=10, success_rate=0.60,
force_min=1.0, force_max=10.0,
enabled_failures=list(FAILURE_TYPES), seed=99,
)
res = score_dataset(mini)
pd.set_option("display.float_format", "{:.2f}".format)
print(f"\n Overall score : {res['overall_score']:.1f}/100")
print(f" Band : {res['band']}")
print(f" Passed/Flagged : {res['n_passed']} / {res['n_flagged']}")
print(f" Mean mismatch : {res['mean_mismatch']:.4f}")
print(f" Failures : {res['failure_breakdown']}")
print(f" Scorer : {res['scorer_used']}")
print()
cols = ["episode_index","failure_type","n_spike_frames","max_velocity_z",
"n_slip_frames","n_sat_joints","mismatch_fraction","episode_score","flagged"]
print(res["episode_details"][cols].to_string(index=False))
print(f"\n{'='*92}\n")