# lsnu's picture
# Add files using upload-large-folder tool
# 504ec88 verified
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
@dataclass
class BenchmarkMetrics:
    """Container for benchmark results: per-task success plus optional scalar metrics.

    ``per_task_success`` and ``mean_success`` are required; every other field
    defaults to ``None``.
    """

    # Success value per entry; keys are strings (presumably task names -- confirm with the producer).
    per_task_success: dict[str, float]
    # Aggregate success value (presumably the mean of ``per_task_success`` -- confirm with the producer).
    mean_success: float
    # NOTE(review): the optional fields below share their names with module-level
    # metric helpers; presumably they store those helpers' outputs -- TODO confirm
    # against the code that builds this object.
    visibility_integral: float | None = None
    corridor_availability: float | None = None
    reocclusion_rate: float | None = None
    persistence_horizon_mae: float | None = None
    disturbance_cost: float | None = None
@dataclass
class PlannerDiagnostics:
    """Container for planner diagnostic scalars.

    The first four fields are required; the remaining fields default to
    ``None``.
    """

    # Required diagnostics.
    # NOTE(review): these names match the ``planner_top1_accuracy``,
    # ``planner_regret``, ``risk_calibration_mse`` and ``role_collapse_rate``
    # helpers in this module; presumably they hold those results -- TODO confirm.
    top1_accuracy: float
    regret: float
    risk_calibration_mse: float
    role_collapse_rate: float
    # Optional diagnostics; ``None`` is the default for each field.
    proposal_diversity: float | None = None
    planner_score_utility_spearman: float | None = None
    left_right_equivariance_error: float | None = None
    belief_calibration_brier: float | None = None
    reocclusion_calibration_brier: float | None = None
    support_stability_mae: float | None = None
    clearance_auc: float | None = None
def mean_success(per_task_success: dict[str, float]) -> float:
    """Return the arithmetic mean of the mapping's values.

    Args:
        per_task_success: Mapping from string keys to success values.

    Returns:
        The mean of all values as a Python ``float``; ``0.0`` when the
        mapping is empty.
    """
    rates = list(per_task_success.values())
    return float(np.mean(rates)) if rates else 0.0
def visibility_integral(curve: np.ndarray) -> float:
    """Return the sum of every element of ``curve``.

    The input is converted to a float32 array first; all elements are summed
    regardless of the array's shape. An empty input sums to ``0.0``.
    """
    samples = np.asarray(curve, dtype=np.float32)
    total = samples.sum()
    return float(total)
def corridor_availability(corridor_open: np.ndarray) -> float:
    """Return the mean of ``corridor_open`` after conversion to float32.

    Args:
        corridor_open: Array-like of values; all elements are averaged
            regardless of shape.

    Returns:
        The mean as a Python ``float``; ``0.0`` for an empty input.
    """
    corridor_open = np.asarray(corridor_open, dtype=np.float32)
    # An empty mean would be NaN (with a RuntimeWarning); follow the 0.0
    # convention the other metrics in this module use for empty inputs.
    if corridor_open.size == 0:
        return 0.0
    return float(corridor_open.mean())
def reocclusion_rate(corridor_open: np.ndarray) -> float:
    """Return the fraction of consecutive steps that go from open to closed.

    A step counts when the earlier sample (along the first axis) is ``> 0.5``
    and the following sample is ``<= 0.5``.

    Returns:
        The fraction as a Python ``float``; ``0.0`` when the input has fewer
        than two elements.
    """
    states = np.asarray(corridor_open, dtype=np.float32)
    if states.size < 2:
        return 0.0
    was_open = states[:-1] > 0.5
    now_closed = states[1:] <= 0.5
    return float((was_open & now_closed).mean())
def persistence_horizon_mae(prediction: np.ndarray, target: np.ndarray) -> float:
    """Return the mean absolute error between ``prediction`` and ``target``.

    Both inputs are converted to float32 and subtracted elementwise (NumPy
    broadcasting rules apply).

    Returns:
        The mean absolute difference as a Python ``float``; ``0.0`` when
        ``prediction`` is empty.
    """
    prediction = np.asarray(prediction, dtype=np.float32)
    target = np.asarray(target, dtype=np.float32)
    # Match the other MAE/MSE helpers in this module: an empty prediction
    # yields 0.0 instead of the NaN an empty mean would produce.
    if prediction.size == 0:
        return 0.0
    return float(np.abs(prediction - target).mean())
def mean_disturbance_cost(values: np.ndarray) -> float:
    """Return the mean of ``values`` (as float32), or ``0.0`` when empty."""
    costs = np.asarray(values, dtype=np.float32)
    return float(costs.mean()) if costs.size else 0.0
def planner_top1_accuracy(pred_scores: np.ndarray, oracle_utility: np.ndarray) -> float:
    """Return how often the top-scored entry matches the top-utility entry.

    The argmax is taken over the last axis of both inputs, and the result is
    the mean agreement between the two argmax arrays.

    Returns:
        Agreement rate as a Python ``float``; ``0.0`` when ``pred_scores`` is
        empty.
    """
    scores = np.asarray(pred_scores)
    utility = np.asarray(oracle_utility)
    if not scores.size:
        return 0.0
    predicted_best = np.argmax(scores, axis=-1)
    oracle_best = np.argmax(utility, axis=-1)
    return float(np.mean(predicted_best == oracle_best))
def planner_regret(selected_indices: np.ndarray, oracle_utility: np.ndarray) -> float:
    """Return the mean gap between the best utility and the selected utility.

    For each row ``i`` the selected utility is
    ``oracle_utility[i, selected_indices[i]]`` and the best utility is the
    maximum over the last axis.

    Returns:
        Mean of ``best - selected`` as a Python ``float``; ``0.0`` when
        ``oracle_utility`` is empty.
    """
    choices = np.asarray(selected_indices, dtype=np.int64)
    utility = np.asarray(oracle_utility, dtype=np.float32)
    if utility.size == 0:
        return 0.0
    rows = np.arange(choices.shape[0])
    achieved = utility[rows, choices]
    best = utility.max(axis=-1)
    gap = best - achieved
    return float(gap.mean())
def risk_calibration_mse(predicted_risk: np.ndarray, realized_risk: np.ndarray) -> float:
    """Return the mean squared error between predicted and realized risk.

    Both inputs are converted to float32 before the elementwise difference.

    Returns:
        The mean squared difference as a Python ``float``; ``0.0`` when
        ``predicted_risk`` is empty.
    """
    forecast = np.asarray(predicted_risk, dtype=np.float32)
    outcome = np.asarray(realized_risk, dtype=np.float32)
    if not forecast.size:
        return 0.0
    squared_error = np.square(forecast - outcome)
    return float(np.mean(squared_error))
def proposal_diversity(proposal_chunks: np.ndarray) -> float:
    """Return the mean pairwise L1 distance between distinct proposals.

    The input must be 4-D. Axis 0 is kept as-is, axis 1 indexes the proposals
    being compared, and the last two axes are flattened into one feature
    vector per proposal. The distance between two proposals is the mean
    absolute difference of their feature vectors; the result averages that
    distance over every ordered pair of *different* proposals and over axis 0.

    Returns:
        The mean off-diagonal distance as a Python ``float``. ``0.0`` when the
        input is not 4-D, has at most one proposal along axis 1, or is empty.
    """
    proposal_chunks = np.asarray(proposal_chunks, dtype=np.float32)
    if proposal_chunks.ndim != 4 or proposal_chunks.shape[1] <= 1:
        return 0.0
    if proposal_chunks.size == 0:
        # reshape(..., -1) cannot infer a dimension for some empty shapes and
        # an empty mean would be NaN; there is nothing to compare anyway.
        return 0.0
    batch, num_proposals = proposal_chunks.shape[:2]
    flat = proposal_chunks.reshape(batch, num_proposals, -1)
    # Broadcast to (batch, proposals, proposals, features) for all pairs.
    diffs = flat[:, :, None, :] - flat[:, None, :, :]
    distances = np.abs(diffs).mean(axis=-1)
    # num_proposals >= 2 here, so the off-diagonal mask is never empty.
    off_diagonal = ~np.eye(num_proposals, dtype=bool)
    return float(distances[:, off_diagonal].mean())
def _average_ranks_last_axis(values: np.ndarray) -> np.ndarray:
    """Return 0-based ranks along the last axis, giving tied entries their mean rank."""
    values = np.atleast_1d(values)
    length = values.shape[-1]
    order = np.argsort(values, axis=-1, kind="stable")
    ordered = np.take_along_axis(values, order, axis=-1)
    # True wherever the sorted value differs from its left neighbour.
    changes = ordered[..., 1:] != ordered[..., :-1]
    edge = np.ones(values.shape[:-1] + (1,), dtype=bool)
    positions = np.arange(length)
    # For every sorted position, the first and last index of its run of ties.
    run_first = np.maximum.accumulate(
        np.where(np.concatenate([edge, changes], axis=-1), positions, 0),
        axis=-1,
    )
    run_last = np.minimum.accumulate(
        np.where(np.concatenate([changes, edge], axis=-1), positions, length - 1)[..., ::-1],
        axis=-1,
    )[..., ::-1]
    mean_rank = ((run_first + run_last) / 2.0).astype(np.float32)
    ranks = np.empty(values.shape, dtype=np.float32)
    # Scatter the per-run mean rank back to the original element order.
    np.put_along_axis(ranks, order, mean_rank, axis=-1)
    return ranks


def planner_score_utility_spearman(pred_scores: np.ndarray, oracle_utility: np.ndarray) -> float:
    """Return the mean Spearman rank correlation between scores and utility.

    Ranks are computed along the last axis with ties sharing their average
    rank, so a row of identical scores has zero rank variance instead of an
    arbitrary ordering. The Pearson correlation of the ranks is then computed
    per row.

    Rows whose correlation is undefined (zero rank variance in either input)
    contribute ``0.0`` to the average.

    Returns:
        The mean per-row correlation as a Python ``float``; ``0.0`` when
        ``pred_scores`` is empty or no row has a defined correlation.
    """
    pred_scores = np.asarray(pred_scores, dtype=np.float32)
    oracle_utility = np.asarray(oracle_utility, dtype=np.float32)
    if pred_scores.size == 0:
        return 0.0
    pred_rank = _average_ranks_last_axis(pred_scores)
    oracle_rank = _average_ranks_last_axis(oracle_utility)
    pred_rank = pred_rank - pred_rank.mean(axis=-1, keepdims=True)
    oracle_rank = oracle_rank - oracle_rank.mean(axis=-1, keepdims=True)
    denom = np.sqrt((pred_rank**2).sum(axis=-1) * (oracle_rank**2).sum(axis=-1))
    valid = denom > 1e-6
    if not np.any(valid):
        return 0.0
    numerator = (pred_rank * oracle_rank).sum(axis=-1)
    # Substitute a safe divisor on invalid rows; their result is forced to 0.
    corr = np.where(valid, numerator / np.where(valid, denom, 1.0), 0.0)
    return float(corr.mean())
def left_right_equivariance_error(pred: np.ndarray, swapped_target: np.ndarray) -> float:
    """Return the mean absolute difference between ``pred`` and ``swapped_target``.

    Both inputs are converted to float32 first.

    Returns:
        The mean absolute difference as a Python ``float``; ``0.0`` when
        either input is empty.
    """
    output = np.asarray(pred, dtype=np.float32)
    reference = np.asarray(swapped_target, dtype=np.float32)
    if not (output.size and reference.size):
        return 0.0
    deviation = np.abs(output - reference)
    return float(deviation.mean())
def belief_calibration_brier(predicted_belief: np.ndarray, target_belief: np.ndarray) -> float:
    """Return the mean squared difference between predicted and target belief.

    Both inputs are converted to float32 first.

    Returns:
        The mean squared difference as a Python ``float``; ``0.0`` when
        ``predicted_belief`` is empty.
    """
    belief = np.asarray(predicted_belief, dtype=np.float32)
    truth = np.asarray(target_belief, dtype=np.float32)
    if not belief.size:
        return 0.0
    residual = belief - truth
    return float(np.mean(residual * residual))
def reocclusion_calibration_brier(predicted_reocclusion: np.ndarray, target_reocclusion: np.ndarray) -> float:
    """Return the mean squared difference between predicted and target reocclusion.

    Both inputs are converted to float32 first.

    Returns:
        The mean squared difference as a Python ``float``; ``0.0`` when
        ``predicted_reocclusion`` is empty.
    """
    estimate = np.asarray(predicted_reocclusion, dtype=np.float32)
    if estimate.size == 0:
        # The target is still converted, as before, so invalid targets raise.
        np.asarray(target_reocclusion, dtype=np.float32)
        return 0.0
    observed = np.asarray(target_reocclusion, dtype=np.float32)
    error = estimate - observed
    return float(np.square(error).mean())
def support_stability_mae(predicted: np.ndarray, target: np.ndarray) -> float:
    """Return the mean absolute error between ``predicted`` and ``target``.

    Both inputs are converted to float32 first.

    Returns:
        The mean absolute difference as a Python ``float``; ``0.0`` when
        ``predicted`` is empty.
    """
    estimate = np.asarray(predicted, dtype=np.float32)
    reference = np.asarray(target, dtype=np.float32)
    if not estimate.size:
        return 0.0
    absolute_error = np.absolute(estimate - reference)
    return float(absolute_error.mean())
def clearance_auc(predicted: np.ndarray, target: np.ndarray) -> float:
    """Return the ROC AUC of ``predicted`` against binarized ``target``.

    Both inputs are flattened. Targets ``> 0.5`` are positives and the rest
    negatives. The AUC comes from the rank-sum (Mann-Whitney U) statistic,
    with tied predictions sharing their average rank, so a
    positive/negative pair with equal scores counts as half a correct
    ordering.

    Returns:
        The AUC as a Python ``float`` in ``[0, 1]``; ``0.0`` when either
        class is absent.
    """
    predicted = np.asarray(predicted, dtype=np.float32).reshape(-1)
    target = np.asarray(target, dtype=np.float32).reshape(-1)
    positives = target > 0.5
    num_pos = int(positives.sum())
    num_neg = int(positives.size) - num_pos
    if num_pos == 0 or num_neg == 0:
        return 0.0
    # Each distinct score occupies the sorted positions
    # [run_end - count, run_end - 1]; the average 0-based rank of that run
    # is the midpoint. Ranks are float64 to stay exact on large inputs.
    _, inverse, counts = np.unique(predicted, return_inverse=True, return_counts=True)
    run_end = np.cumsum(counts)
    average_rank = (2 * run_end - counts - 1) / 2.0
    ranks = average_rank[inverse.reshape(-1)]
    pos_rank_sum = float(ranks[positives].sum())
    # Remove the rank mass positives take from one another (0-based ranks).
    u_statistic = pos_rank_sum - num_pos * (num_pos - 1) / 2.0
    return float(u_statistic / (num_pos * num_neg))
def role_collapse_rate(
    action_chunks: np.ndarray,
    arm_role_logits: np.ndarray | None = None,
    action_threshold: float = 1e-2,
    role_threshold: float = 0.1,
    arm_action_dim: int = 7,
) -> float:
    """Return the fraction of samples whose two arms are (nearly) indistinguishable.

    The last axis of ``action_chunks`` is split at ``arm_action_dim`` into a
    "right" part (first ``arm_action_dim`` entries) and a "left" part (the
    rest). A sample is flagged as collapsed when the mean absolute difference
    between the two parts, averaged over the last two axes, is at most
    ``action_threshold``.

    When ``arm_role_logits`` is given, a softmax is applied over its last
    axis and indices 0 and 1 of the second-to-last axis are compared. A
    sample is also flagged when the mean absolute difference between those
    two probability vectors is at most ``role_threshold``.

    Args:
        action_chunks: Array with at least two axes; the two parts of the
            last axis must have equal (or broadcastable) widths.
        arm_role_logits: Optional logits whose second-to-last axis has at
            least two entries.
        action_threshold: Upper bound on the action gap for a collapse.
        role_threshold: Upper bound on the role-probability gap for a collapse.
        arm_action_dim: Split index on the last axis; defaults to 7, the
            value that was previously hard-coded.

    Returns:
        The fraction of flagged samples as a Python ``float``; ``0.0`` when
        there are no samples.
    """
    action_chunks = np.asarray(action_chunks, dtype=np.float32)
    right_actions = action_chunks[..., :arm_action_dim]
    left_actions = action_chunks[..., arm_action_dim:]
    action_gap = np.mean(np.abs(right_actions - left_actions), axis=(-1, -2))
    collapsed = action_gap <= action_threshold
    if arm_role_logits is not None:
        arm_role_logits = np.asarray(arm_role_logits, dtype=np.float32)
        # Subtract the max before exponentiating for a numerically stable softmax.
        role_probs = np.exp(arm_role_logits - arm_role_logits.max(axis=-1, keepdims=True))
        role_probs = role_probs / np.clip(role_probs.sum(axis=-1, keepdims=True), 1e-6, None)
        role_gap = np.mean(np.abs(role_probs[..., 0, :] - role_probs[..., 1, :]), axis=-1)
        collapsed = np.logical_or(collapsed, role_gap <= role_threshold)
    # An empty batch would average to NaN; use the module's 0.0 convention.
    if collapsed.size == 0:
        return 0.0
    return float(collapsed.mean())