| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
|
|
| import numpy as np |
|
|
|
|
@dataclass
class BenchmarkMetrics:
    """Container for benchmark-level results.

    ``per_task_success`` and ``mean_success`` are mandatory. Every other
    metric is optional and stays ``None`` unless the caller provides it.
    """

    # Required results.
    per_task_success: dict[str, float]
    mean_success: float

    # Optional metrics; ``None`` means "not supplied".
    # NOTE(review): presumably filled from the same-named helper functions
    # in this module -- confirm against the caller.
    visibility_integral: float | None = None
    corridor_availability: float | None = None
    reocclusion_rate: float | None = None
    persistence_horizon_mae: float | None = None
    disturbance_cost: float | None = None
|
|
|
|
@dataclass
class PlannerDiagnostics:
    """Container for planner diagnostic values.

    The first four fields are mandatory. The remaining diagnostics are
    optional and default to ``None`` when they were not supplied.
    """

    # Required diagnostics.
    top1_accuracy: float
    regret: float
    risk_calibration_mse: float
    role_collapse_rate: float

    # Optional diagnostics; ``None`` means "not supplied".
    # NOTE(review): presumably filled from the same-named helper functions
    # in this module -- confirm against the caller.
    proposal_diversity: float | None = None
    planner_score_utility_spearman: float | None = None
    left_right_equivariance_error: float | None = None
    belief_calibration_brier: float | None = None
    reocclusion_calibration_brier: float | None = None
    support_stability_mae: float | None = None
    clearance_auc: float | None = None
|
|
|
|
def mean_success(per_task_success: dict[str, float]) -> float:
    """Average the per-task success values.

    Args:
        per_task_success: Mapping from task name to its success value.

    Returns:
        The arithmetic mean of the mapping's values, or ``0.0`` when the
        mapping is empty.
    """
    if per_task_success:
        scores = list(per_task_success.values())
        return float(np.mean(scores))
    return 0.0
|
|
|
|
def visibility_integral(curve: np.ndarray) -> float:
    """Sum every sample of ``curve``.

    The input is converted to ``float32`` first. No step-size weighting is
    applied, so the result is a plain sum over all elements.

    Args:
        curve: Array-like of curve samples.

    Returns:
        The total of all samples as a Python float (``0.0`` for empty input).
    """
    samples = np.asarray(curve, dtype=np.float32)
    total = samples.sum()
    return float(total)
|
|
|
|
def corridor_availability(corridor_open: np.ndarray) -> float:
    """Return the mean of the corridor-open signal.

    Args:
        corridor_open: Array-like of corridor-open values; converted to
            ``float32`` before averaging.

    Returns:
        The mean over all elements, or ``0.0`` for empty input.
    """
    corridor_open = np.asarray(corridor_open, dtype=np.float32)
    # Averaging an empty array yields NaN plus a RuntimeWarning; return 0.0
    # instead, matching the other helpers in this module.
    if corridor_open.size == 0:
        return 0.0
    return float(corridor_open.mean())
|
|
|
|
def reocclusion_rate(corridor_open: np.ndarray) -> float:
    """Return the fraction of consecutive steps that go from open to closed.

    A step counts as open when its value is ``> 0.5`` and as closed
    otherwise. Consecutive pairs are taken along the first axis.

    Args:
        corridor_open: Array-like of corridor-open values; converted to
            ``float32``.

    Returns:
        The mean of the open-then-closed indicator over all consecutive
        pairs, or ``0.0`` when there are fewer than two steps.
    """
    corridor_open = np.asarray(corridor_open, dtype=np.float32)
    # Pairs are formed along axis 0, so the guard must also check that axis:
    # a (1, T) input has size >= 2 but no pairs, and would average an empty
    # array (NaN). ``size < 2`` is tested first so 0-d input never reaches
    # ``shape[0]``.
    if corridor_open.size < 2 or corridor_open.shape[0] < 2:
        return 0.0
    was_open = corridor_open[:-1] > 0.5
    now_closed = corridor_open[1:] <= 0.5
    return float(np.logical_and(was_open, now_closed).mean())
|
|
|
|
def persistence_horizon_mae(prediction: np.ndarray, target: np.ndarray) -> float:
    """Return the mean absolute error between ``prediction`` and ``target``.

    Both inputs are converted to ``float32`` and combined with NumPy
    broadcasting.

    Args:
        prediction: Array-like of predicted values.
        target: Array-like of reference values.

    Returns:
        The mean of ``|prediction - target|``, or ``0.0`` when
        ``prediction`` is empty.
    """
    prediction = np.asarray(prediction, dtype=np.float32)
    target = np.asarray(target, dtype=np.float32)
    # Same empty-input convention as the other MAE/MSE helpers in this
    # module; without it the mean of an empty array would be NaN.
    if prediction.size == 0:
        return 0.0
    return float(np.abs(prediction - target).mean())
|
|
|
|
def mean_disturbance_cost(values: np.ndarray) -> float:
    """Average the given disturbance-cost values.

    Args:
        values: Array-like of costs; converted to ``float32``.

    Returns:
        The mean over all elements, or ``0.0`` for empty input.
    """
    costs = np.asarray(values, dtype=np.float32)
    return float(costs.mean()) if costs.size != 0 else 0.0
|
|
|
|
def planner_top1_accuracy(pred_scores: np.ndarray, oracle_utility: np.ndarray) -> float:
    """Return how often the top-scored entry matches the top-utility entry.

    The argmax is taken over the last axis of each input and the two index
    arrays are compared element-wise.

    Args:
        pred_scores: Array-like of predicted scores.
        oracle_utility: Array-like of oracle utilities.

    Returns:
        The fraction of matching argmax indices, or ``0.0`` when
        ``pred_scores`` is empty.
    """
    scores = np.asarray(pred_scores)
    utility = np.asarray(oracle_utility)
    if scores.size == 0:
        return 0.0
    predicted_best = scores.argmax(axis=-1)
    oracle_best = utility.argmax(axis=-1)
    hits = predicted_best == oracle_best
    return float(hits.mean())
|
|
|
|
def planner_regret(selected_indices: np.ndarray, oracle_utility: np.ndarray) -> float:
    """Return the mean utility gap between the best and the selected entry.

    For each row the selected entry's utility is subtracted from the row's
    maximum utility (taken over the last axis), and the gaps are averaged.

    Args:
        selected_indices: Array-like of chosen column indices, one per row;
            converted to ``int64``.
        oracle_utility: Array-like of utilities; converted to ``float32``.
            Indexed as ``[row, selected_index]``.

    Returns:
        The mean regret, or ``0.0`` when ``oracle_utility`` is empty.
    """
    chosen = np.asarray(selected_indices, dtype=np.int64)
    utility = np.asarray(oracle_utility, dtype=np.float32)
    if utility.size == 0:
        return 0.0
    rows = np.arange(chosen.shape[0])
    achieved = utility[rows, chosen]
    best = utility.max(axis=-1)
    gap = best - achieved
    return float(gap.mean())
|
|
|
|
def risk_calibration_mse(predicted_risk: np.ndarray, realized_risk: np.ndarray) -> float:
    """Return the mean squared error between predicted and realized risk.

    Args:
        predicted_risk: Array-like of predicted risk; converted to ``float32``.
        realized_risk: Array-like of realized risk; converted to ``float32``.

    Returns:
        The mean of the squared differences, or ``0.0`` when
        ``predicted_risk`` is empty.
    """
    forecast = np.asarray(predicted_risk, dtype=np.float32)
    outcome = np.asarray(realized_risk, dtype=np.float32)
    if forecast.size == 0:
        return 0.0
    residual = forecast - outcome
    return float(np.mean(residual**2))
|
|
|
|
def proposal_diversity(proposal_chunks: np.ndarray) -> float:
    """Return the mean pairwise L1 distance between distinct proposals.

    The input must be 4-D. Axis 0 is kept as-is, axis 1 indexes the proposals
    being compared, and the remaining two axes are flattened into one feature
    vector per proposal. The distance between two proposals is the mean
    absolute difference of their feature vectors. Self-distances are
    excluded from the average.

    Args:
        proposal_chunks: Array-like of proposals; converted to ``float32``.

    Returns:
        The mean distance over all ordered pairs of distinct proposals.
        ``0.0`` is returned when the input is not 4-D, has at most one
        proposal along axis 1, or contains no elements.
    """
    proposal_chunks = np.asarray(proposal_chunks, dtype=np.float32)
    if proposal_chunks.ndim != 4 or proposal_chunks.shape[1] <= 1:
        return 0.0
    # A zero-size input would make ``reshape(..., -1)`` ambiguous (ValueError)
    # or leave an empty array to average (NaN); report no diversity instead.
    if proposal_chunks.size == 0:
        return 0.0
    batch_size, num_proposals = proposal_chunks.shape[:2]
    flat = proposal_chunks.reshape(batch_size, num_proposals, -1)
    # Broadcast to all ordered proposal pairs within each leading-axis entry.
    diffs = flat[:, :, None, :] - flat[:, None, :, :]
    distances = np.abs(diffs).mean(axis=-1)
    # num_proposals >= 2 here, so the off-diagonal mask is never empty.
    off_diagonal = distances[:, ~np.eye(num_proposals, dtype=bool)]
    return float(off_diagonal.mean())
|
|
|
|
def planner_score_utility_spearman(pred_scores: np.ndarray, oracle_utility: np.ndarray) -> float:
    """Return the mean rank correlation between scores and oracle utility.

    Ranks are computed along the last axis with a double ``argsort``, so
    tied values receive distinct ranks rather than averaged ones. Each row's
    ranks are mean-centred and correlated. Rows whose normaliser is at most
    ``1e-6`` contribute a correlation of zero to the average.

    Args:
        pred_scores: Array-like of predicted scores; converted to ``float32``.
        oracle_utility: Array-like of oracle utilities; converted to
            ``float32``.

    Returns:
        The mean per-row correlation, or ``0.0`` when ``pred_scores`` is
        empty or no row has a usable normaliser.
    """
    scores = np.asarray(pred_scores, dtype=np.float32)
    utility = np.asarray(oracle_utility, dtype=np.float32)
    if scores.size == 0:
        return 0.0

    def _centered_ranks(values: np.ndarray) -> np.ndarray:
        # argsort of argsort turns values into 0-based ranks per row.
        ranks = values.argsort(axis=-1).argsort(axis=-1).astype(np.float32)
        return ranks - ranks.mean(axis=-1, keepdims=True)

    score_ranks = _centered_ranks(scores)
    utility_ranks = _centered_ranks(utility)
    norm = np.sqrt((score_ranks**2).sum(axis=-1) * (utility_ranks**2).sum(axis=-1))
    usable = norm > 1e-6
    if not np.any(usable):
        return 0.0
    rho = np.zeros_like(norm)
    covariance = (score_ranks[usable] * utility_ranks[usable]).sum(axis=-1)
    rho[usable] = covariance / norm[usable]
    return float(rho.mean())
|
|
|
|
def left_right_equivariance_error(pred: np.ndarray, swapped_target: np.ndarray) -> float:
    """Return the mean absolute difference between ``pred`` and ``swapped_target``.

    Args:
        pred: Array-like of predictions; converted to ``float32``.
        swapped_target: Array-like of reference values; converted to
            ``float32``.

    Returns:
        The mean of ``|pred - swapped_target|``, or ``0.0`` when either
        input is empty.
    """
    prediction = np.asarray(pred, dtype=np.float32)
    reference = np.asarray(swapped_target, dtype=np.float32)
    if 0 in (prediction.size, reference.size):
        return 0.0
    deviation = np.abs(prediction - reference)
    return float(deviation.mean())
|
|
|
|
def belief_calibration_brier(predicted_belief: np.ndarray, target_belief: np.ndarray) -> float:
    """Return the mean squared difference between predicted and target belief.

    Args:
        predicted_belief: Array-like of predicted beliefs; converted to
            ``float32``.
        target_belief: Array-like of target beliefs; converted to ``float32``.

    Returns:
        The mean of the squared differences, or ``0.0`` when
        ``predicted_belief`` is empty.
    """
    belief = np.asarray(predicted_belief, dtype=np.float32)
    reference = np.asarray(target_belief, dtype=np.float32)
    if belief.size == 0:
        return 0.0
    squared_error = (belief - reference) ** 2
    return float(squared_error.mean())
|
|
|
|
def reocclusion_calibration_brier(predicted_reocclusion: np.ndarray, target_reocclusion: np.ndarray) -> float:
    """Return the mean squared difference between predicted and target reocclusion.

    Args:
        predicted_reocclusion: Array-like of predictions; converted to
            ``float32``.
        target_reocclusion: Array-like of targets; converted to ``float32``.

    Returns:
        The mean of the squared differences, or ``0.0`` when
        ``predicted_reocclusion`` is empty.
    """
    forecast = np.asarray(predicted_reocclusion, dtype=np.float32)
    observed = np.asarray(target_reocclusion, dtype=np.float32)
    if forecast.size == 0:
        return 0.0
    return float(np.mean((forecast - observed) ** 2))
|
|
|
|
def support_stability_mae(predicted: np.ndarray, target: np.ndarray) -> float:
    """Return the mean absolute error between ``predicted`` and ``target``.

    Args:
        predicted: Array-like of predictions; converted to ``float32``.
        target: Array-like of reference values; converted to ``float32``.

    Returns:
        The mean of ``|predicted - target|``, or ``0.0`` when ``predicted``
        is empty.
    """
    estimate = np.asarray(predicted, dtype=np.float32)
    reference = np.asarray(target, dtype=np.float32)
    if estimate.size == 0:
        return 0.0
    absolute_error = np.abs(estimate - reference)
    return float(absolute_error.mean())
|
|
|
|
def clearance_auc(predicted: np.ndarray, target: np.ndarray) -> float:
    """Return the ROC AUC of ``predicted`` against binarised ``target``.

    Both inputs are flattened. A target counts as positive when it is
    ``> 0.5``. The AUC is computed from the rank sum of the positives
    (Mann-Whitney U statistic) using tie-averaged ranks, so equal scores
    contribute one half regardless of input order.

    Args:
        predicted: Array-like of scores; converted to ``float32``.
        target: Array-like of labels; converted to ``float32``. Must have
            the same number of elements as ``predicted``.

    Returns:
        The AUC in ``[0, 1]``, or ``0.0`` when either class is absent.
    """
    predicted = np.asarray(predicted, dtype=np.float32).reshape(-1)
    target = np.asarray(target, dtype=np.float32).reshape(-1)
    positives = target > 0.5
    # Python ints keep the count products below free of fixed-width overflow.
    num_pos = int(positives.sum())
    num_neg = int(positives.size) - num_pos
    if num_pos == 0 or num_neg == 0:
        return 0.0
    # Tie-averaged 0-based ranks: every member of a group of equal scores
    # gets the midpoint of the rank positions that group occupies. Plain
    # argsort ranks would make the result depend on the order of tied items.
    _, inverse, counts = np.unique(predicted, return_inverse=True, return_counts=True)
    first_rank = np.cumsum(counts) - counts
    mid_rank = first_rank + (counts - 1) / 2.0
    ranks = mid_rank[inverse.reshape(-1)]
    # float64 accumulation; a float32 rank sum loses precision on large inputs.
    rank_sum = float(ranks[positives].sum())
    u_statistic = rank_sum - num_pos * (num_pos - 1) / 2.0
    return float(u_statistic / (num_pos * num_neg))
|
|
|
|
def role_collapse_rate(
    action_chunks: np.ndarray,
    arm_role_logits: np.ndarray | None = None,
    action_threshold: float = 1e-2,
    role_threshold: float = 0.1,
) -> float:
    """Return the fraction of samples whose two arms are considered collapsed.

    The last axis of ``action_chunks`` is split into its first seven entries
    (treated as the right arm) and the remainder (treated as the left arm).
    A sample is collapsed when the mean absolute difference between the two
    halves, averaged over the last two axes, is at most ``action_threshold``.

    When ``arm_role_logits`` is given, a softmax over its last axis is taken
    and a sample is also collapsed when the mean absolute difference between
    the role distributions at index 0 and index 1 of the second-to-last axis
    is at most ``role_threshold``.

    Args:
        action_chunks: Array-like of actions; converted to ``float32``.
            NOTE(review): the two halves must broadcast, so the last axis
            is presumably 14 wide -- confirm against the caller.
        arm_role_logits: Optional array-like of per-arm role logits.
        action_threshold: Upper bound on the action gap for a collapse.
        role_threshold: Upper bound on the role gap for a collapse.

    Returns:
        The mean of the collapse indicator over all samples.
    """
    chunks = np.asarray(action_chunks, dtype=np.float32)
    arm_delta = np.abs(chunks[..., :7] - chunks[..., 7:])
    is_collapsed = arm_delta.mean(axis=(-1, -2)) <= action_threshold

    if arm_role_logits is None:
        return float(is_collapsed.mean())

    logits = np.asarray(arm_role_logits, dtype=np.float32)
    # Subtract the per-row maximum before exponentiating for stability.
    unnormalized = np.exp(logits - logits.max(axis=-1, keepdims=True))
    normalizer = np.clip(unnormalized.sum(axis=-1, keepdims=True), 1e-6, None)
    probs = unnormalized / normalizer
    role_delta = np.abs(probs[..., 0, :] - probs[..., 1, :]).mean(axis=-1)
    is_collapsed = np.logical_or(is_collapsed, role_delta <= role_threshold)
    return float(is_collapsed.mean())
|
|