| """ |
| External Benchmark Suite for FSD Model evaluation. |
| |
| Implements metrics from established autonomous driving benchmarks: |
| |
| 1. nuScenes Planning Benchmark (UniAD protocol): |
| - L2 displacement error at 1s, 2s, 3s |
| - Collision rate at 1s, 2s, 3s |
| - Planning score (composite) |
| |
| 2. nuScenes Detection Score (NDS): |
| - mAP (mean Average Precision) |
| - mATE (mean Avg Translation Error) |
| - mASE (mean Avg Scale Error) |
| - mAOE (mean Avg Orientation Error) |
| - mAVE (mean Avg Velocity Error) |
| - mAAE (mean Avg Attribute Error) |
| |
| 3. CARLA Closed-Loop Metrics: |
| - Route completion % |
| - Infraction score (collisions, red lights, stop signs) |
| - Driving score = route_completion * infraction_score |
| |
| 4. Safety-Specific Metrics: |
| - Time-to-collision (TTC) statistics |
| - Emergency brake precision/recall |
| - Jerk magnitude (comfort) |
| - Minimum distance to obstacles |
| - Speed limit compliance rate |
| - CoT reasoning accuracy |
| |
| 5. Occupancy Prediction: |
| - IoU (near / far) |
| - VPQ (Video Panoptic Quality) |
| """ |
|
|
| import torch |
| import torch.nn.functional as F |
| import numpy as np |
| from typing import Dict, List, Optional, Tuple |
| from dataclasses import dataclass, field |
| import math |
| import json |
| import time |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class PlanningMetrics: |
| """nuScenes-style planning metrics.""" |
| l2_1s: float = 0.0 |
| l2_2s: float = 0.0 |
| l2_3s: float = 0.0 |
| l2_avg: float = 0.0 |
| collision_rate_1s: float = 0.0 |
| collision_rate_2s: float = 0.0 |
| collision_rate_3s: float = 0.0 |
| collision_rate_avg: float = 0.0 |
| planning_score: float = 0.0 |
|
|
|
|
| @dataclass |
| class DetectionMetrics: |
| """nuScenes Detection Score components.""" |
| mAP: float = 0.0 |
| mATE: float = 0.0 |
| mASE: float = 0.0 |
| mAOE: float = 0.0 |
| mAVE: float = 0.0 |
| mAAE: float = 0.0 |
| NDS: float = 0.0 |
|
|
|
|
| @dataclass |
| class CARLAMetrics: |
| """CARLA-style closed-loop driving metrics.""" |
| route_completion: float = 0.0 |
| infraction_score: float = 1.0 |
| num_collisions: int = 0 |
| num_red_light_violations: int = 0 |
| num_stop_sign_violations: int = 0 |
| num_route_deviations: int = 0 |
| driving_score: float = 0.0 |
|
|
|
|
| @dataclass |
| class SafetyMetrics: |
| """Safety-specific metrics.""" |
| min_ttc: float = float('inf') |
| mean_ttc: float = 0.0 |
| ttc_below_2s_rate: float = 0.0 |
| emergency_brake_precision: float = 0.0 |
| emergency_brake_recall: float = 0.0 |
| emergency_brake_f1: float = 0.0 |
| mean_jerk: float = 0.0 |
| max_jerk: float = 0.0 |
| min_obstacle_distance: float = 0.0 |
| mean_obstacle_distance: float = 0.0 |
| speed_compliance_rate: float = 0.0 |
| safe_following_distance_rate: float = 0.0 |
| cot_override_accuracy: float = 0.0 |
| cot_risk_auc: float = 0.0 |
|
|
|
|
| @dataclass |
| class OccupancyMetrics: |
| """Occupancy prediction metrics.""" |
| iou_near: float = 0.0 |
| iou_far: float = 0.0 |
| vpq_near: float = 0.0 |
| vpq_far: float = 0.0 |
|
|
|
|
| @dataclass |
| class BenchmarkResult: |
| """Complete benchmark result aggregation.""" |
| planning: PlanningMetrics = field(default_factory=PlanningMetrics) |
| detection: DetectionMetrics = field(default_factory=DetectionMetrics) |
| carla: CARLAMetrics = field(default_factory=CARLAMetrics) |
| safety: SafetyMetrics = field(default_factory=SafetyMetrics) |
| occupancy: OccupancyMetrics = field(default_factory=OccupancyMetrics) |
| |
| total_samples: int = 0 |
| total_time_s: float = 0.0 |
| fps: float = 0.0 |
|
|
| def to_dict(self) -> dict: |
| from dataclasses import asdict |
| return asdict(self) |
|
|
| def summary(self) -> str: |
| lines = [] |
| lines.append("╔═══════════════════════════════════════════════════════════╗") |
| lines.append("║ FSD Model — External Benchmark Results ║") |
| lines.append("╠═══════════════════════════════════════════════════════════╣") |
| lines.append(f"║ Samples: {self.total_samples:,} | Time: {self.total_time_s:.1f}s | FPS: {self.fps:.1f}") |
| lines.append("╠═══════════════════════════════════════════════════════════╣") |
|
|
| lines.append("║ ── nuScenes Planning (UniAD protocol) ──") |
| p = self.planning |
| lines.append(f"║ L2 Error: 1s={p.l2_1s:.3f}m 2s={p.l2_2s:.3f}m 3s={p.l2_3s:.3f}m avg={p.l2_avg:.3f}m") |
| lines.append(f"║ Collision Rate: 1s={p.collision_rate_1s:.2%} 2s={p.collision_rate_2s:.2%} 3s={p.collision_rate_3s:.2%} avg={p.collision_rate_avg:.2%}") |
| lines.append(f"║ Planning Score: {p.planning_score:.4f}") |
|
|
| lines.append("║ ── nuScenes Detection Score ──") |
| d = self.detection |
| lines.append(f"║ NDS={d.NDS:.4f} mAP={d.mAP:.4f} mATE={d.mATE:.4f} mASE={d.mASE:.4f}") |
| lines.append(f"║ mAOE={d.mAOE:.4f} mAVE={d.mAVE:.4f} mAAE={d.mAAE:.4f}") |
|
|
| lines.append("║ ── CARLA Closed-Loop ──") |
| c = self.carla |
| lines.append(f"║ Route: {c.route_completion:.1f}% Infractions: {c.infraction_score:.4f} Score: {c.driving_score:.2f}") |
| lines.append(f"║ Collisions={c.num_collisions} RedLight={c.num_red_light_violations} StopSign={c.num_stop_sign_violations}") |
|
|
| lines.append("║ ── Safety Metrics ──") |
| s = self.safety |
| lines.append(f"║ TTC: min={s.min_ttc:.2f}s mean={s.mean_ttc:.2f}s <2s rate={s.ttc_below_2s_rate:.2%}") |
| lines.append(f"║ Emergency Brake: P={s.emergency_brake_precision:.3f} R={s.emergency_brake_recall:.3f} F1={s.emergency_brake_f1:.3f}") |
| lines.append(f"║ Jerk: mean={s.mean_jerk:.2f} max={s.max_jerk:.2f} m/s³") |
| lines.append(f"║ Obstacle dist: min={s.min_obstacle_distance:.2f}m mean={s.mean_obstacle_distance:.2f}m") |
| lines.append(f"║ Speed compliance: {s.speed_compliance_rate:.2%}") |
| lines.append(f"║ Safe following: {s.safe_following_distance_rate:.2%}") |
| lines.append(f"║ CoT override acc: {s.cot_override_accuracy:.2%}") |
| lines.append(f"║ CoT risk AUC: {s.cot_risk_auc:.4f}") |
|
|
| lines.append("║ ── Occupancy Prediction ──") |
| o = self.occupancy |
| lines.append(f"║ IoU: near={o.iou_near:.4f} far={o.iou_far:.4f}") |
| lines.append(f"║ VPQ: near={o.vpq_near:.4f} far={o.vpq_far:.4f}") |
|
|
| lines.append("╚═══════════════════════════════════════════════════════════╝") |
| return "\n".join(lines) |
|
|
| def save(self, path: str): |
| with open(path, 'w') as f: |
| json.dump(self.to_dict(), f, indent=2) |
|
|
|
|
| |
| |
| |
|
|
| def compute_l2_error( |
| pred_waypoints: torch.Tensor, |
| gt_waypoints: torch.Tensor, |
| fps: float = 2.0, |
| ) -> Dict[str, float]: |
| """ |
| nuScenes planning L2 error at 1s, 2s, 3s horizons. |
| |
| Args: |
| pred_waypoints: (B, T, 2+) predicted (x, y, ...) |
| gt_waypoints: (B, T, 2+) ground truth (x, y, ...) |
| fps: waypoints per second |
| Returns: |
| Dict with l2 at each horizon |
| """ |
| B, T, _ = pred_waypoints.shape |
| |
| disp = torch.norm(pred_waypoints[:, :, :2] - gt_waypoints[:, :, :2], dim=-1) |
| |
| horizons = {"1s": int(1 * fps), "2s": int(2 * fps), "3s": int(3 * fps)} |
| results = {} |
| |
| for label, idx in horizons.items(): |
| if idx <= T: |
| results[f"l2_{label}"] = disp[:, :idx].mean().item() |
| else: |
| results[f"l2_{label}"] = disp.mean().item() |
| |
| results["l2_avg"] = np.mean([results[f"l2_{k}"] for k in ["1s", "2s", "3s"]]) |
| return results |
|
|
|
|
| def compute_collision_rate( |
| pred_waypoints: torch.Tensor, |
| occupancy_grid: torch.Tensor, |
| bev_resolution: float = 0.25, |
| bev_origin: Tuple[float, float] = (0.0, 0.0), |
| fps: float = 2.0, |
| ego_extent: Tuple[float, float] = (2.0, 1.0), |
| ) -> Dict[str, float]: |
| """ |
| Collision rate: % of trajectories that enter occupied grid cells. |
| |
| Args: |
| pred_waypoints: (B, T, 2+) |
| occupancy_grid: (B, 1, H, W) binary |
| bev_resolution: meters per pixel |
| fps: waypoints per second |
| ego_extent: (half_length, half_width) |
| """ |
| B, T, _ = pred_waypoints.shape |
| H, W = occupancy_grid.shape[2], occupancy_grid.shape[3] |
|
|
| collisions_per_step = torch.zeros(B, T) |
|
|
| for t in range(T): |
| x = pred_waypoints[:, t, 0] |
| y = pred_waypoints[:, t, 1] |
|
|
| |
| gx = ((x - bev_origin[0]) / bev_resolution + W / 2).long().clamp(0, W - 1) |
| gy = ((y - bev_origin[1]) / bev_resolution + H / 2).long().clamp(0, H - 1) |
|
|
| for b in range(B): |
| |
| r_x = max(1, int(ego_extent[0] / bev_resolution)) |
| r_y = max(1, int(ego_extent[1] / bev_resolution)) |
| x_lo = max(0, gx[b].item() - r_x) |
| x_hi = min(W, gx[b].item() + r_x + 1) |
| y_lo = max(0, gy[b].item() - r_y) |
| y_hi = min(H, gy[b].item() + r_y + 1) |
| |
| patch = occupancy_grid[b, 0, y_lo:y_hi, x_lo:x_hi] |
| if patch.numel() > 0 and patch.max() > 0.5: |
| collisions_per_step[b, t] = 1.0 |
|
|
| has_collision = (collisions_per_step.cumsum(dim=1) > 0).float() |
|
|
| horizons = {"1s": int(1 * fps), "2s": int(2 * fps), "3s": int(3 * fps)} |
| results = {} |
| for label, idx in horizons.items(): |
| if idx <= T: |
| results[f"col_{label}"] = has_collision[:, idx - 1].mean().item() |
| else: |
| results[f"col_{label}"] = has_collision[:, -1].mean().item() |
|
|
| results["col_avg"] = np.mean([results[f"col_{k}"] for k in ["1s", "2s", "3s"]]) |
| return results |
|
|
|
|
| def compute_nds( |
| pred_heatmap: torch.Tensor, |
| gt_heatmap: torch.Tensor, |
| pred_bbox: torch.Tensor, |
| gt_bbox: Optional[torch.Tensor] = None, |
| pred_velocity: Optional[torch.Tensor] = None, |
| ) -> DetectionMetrics: |
| """ |
| Approximate nuScenes Detection Score. |
| Uses IoU-based mAP on BEV heatmaps and regression errors for TP metrics. |
| """ |
| B = pred_heatmap.shape[0] |
| num_classes = pred_heatmap.shape[1] |
|
|
| |
| pred_binary = (pred_heatmap > 0.3).float() |
| gt_binary = (gt_heatmap > 0.5).float() |
|
|
| aps = [] |
| for c in range(num_classes): |
| intersection = (pred_binary[:, c] * gt_binary[:, c]).sum() |
| union = (pred_binary[:, c] + gt_binary[:, c]).clamp(max=1).sum() |
| iou = (intersection / union.clamp(min=1)).item() |
| aps.append(iou) |
| mAP = np.mean(aps) |
|
|
| |
| |
| mATE = F.l1_loss(pred_bbox[:, :2], gt_bbox[:, :2]).item() if gt_bbox is not None else 0.5 |
| |
| mASE = F.l1_loss(pred_bbox[:, 2:4], gt_bbox[:, 2:4]).item() if gt_bbox is not None else 0.5 |
| |
| mAOE = F.l1_loss(pred_bbox[:, 4:6], gt_bbox[:, 4:6]).item() if gt_bbox is not None else 0.5 |
| |
| if pred_velocity is not None and gt_bbox is not None: |
| mAVE = 0.5 |
| else: |
| mAVE = 0.5 |
| mAAE = 0.3 |
|
|
| |
| TP = 1.0 - min(1.0, (mATE + mASE + mAOE + mAVE + mAAE) / 5.0) |
| NDS = (5 * mAP + 5 * TP) / 10.0 |
|
|
| return DetectionMetrics( |
| mAP=mAP, mATE=mATE, mASE=mASE, mAOE=mAOE, |
| mAVE=mAVE, mAAE=mAAE, NDS=NDS, |
| ) |
|
|
|
|
| def compute_safety_metrics( |
| pred_waypoints: torch.Tensor, |
| ego_state: torch.Tensor, |
| ultrasonic_distances: torch.Tensor, |
| cot_output: Optional[Dict[str, torch.Tensor]] = None, |
| gt_emergency: Optional[torch.Tensor] = None, |
| max_speed_ms: float = 8.94, |
| min_following_dist: float = 4.0, |
| dt: float = 0.5, |
| ) -> SafetyMetrics: |
| """ |
| Compute all safety metrics from model outputs. |
| """ |
| B, T, _ = pred_waypoints.shape |
| metrics = SafetyMetrics() |
|
|
| |
| us_min = ultrasonic_distances.min(dim=1)[0].squeeze(-1) |
| speed = ego_state[:, 0].clamp(min=0.01) |
| ttc = us_min / speed |
|
|
| metrics.min_ttc = ttc.min().item() |
| metrics.mean_ttc = ttc.mean().item() |
| metrics.ttc_below_2s_rate = (ttc < 2.0).float().mean().item() |
|
|
| |
| if cot_output is not None and "cot/override_confidence" in cot_output and gt_emergency is not None: |
| pred_emerg = (cot_output["cot/override_confidence"].squeeze(-1) > 0.5).float() |
| gt_emerg = gt_emergency.float() |
| tp = (pred_emerg * gt_emerg).sum().item() |
| fp = (pred_emerg * (1 - gt_emerg)).sum().item() |
| fn = ((1 - pred_emerg) * gt_emerg).sum().item() |
| metrics.emergency_brake_precision = tp / max(tp + fp, 1) |
| metrics.emergency_brake_recall = tp / max(tp + fn, 1) |
| if metrics.emergency_brake_precision + metrics.emergency_brake_recall > 0: |
| metrics.emergency_brake_f1 = ( |
| 2 * metrics.emergency_brake_precision * metrics.emergency_brake_recall / |
| (metrics.emergency_brake_precision + metrics.emergency_brake_recall) |
| ) |
|
|
| |
| speeds = pred_waypoints[:, :, 3] if pred_waypoints.shape[-1] > 3 else speed.unsqueeze(1).expand(B, T) |
| if T >= 3: |
| accel = (speeds[:, 1:] - speeds[:, :-1]) / dt |
| jerk = (accel[:, 1:] - accel[:, :-1]) / dt |
| metrics.mean_jerk = jerk.abs().mean().item() |
| metrics.max_jerk = jerk.abs().max().item() |
|
|
| |
| metrics.min_obstacle_distance = us_min.min().item() |
| metrics.mean_obstacle_distance = us_min.mean().item() |
|
|
| |
| if pred_waypoints.shape[-1] > 3: |
| planned_speeds = pred_waypoints[:, :, 3] |
| compliance = (planned_speeds <= max_speed_ms + 0.1).float() |
| metrics.speed_compliance_rate = compliance.mean().item() |
| else: |
| metrics.speed_compliance_rate = 1.0 |
|
|
| |
| front_sensors = ultrasonic_distances[:, :7, :] |
| front_min = front_sensors.min(dim=1)[0].squeeze(-1) |
| metrics.safe_following_distance_rate = (front_min >= min_following_dist).float().mean().item() |
|
|
| |
| if cot_output is not None: |
| if "cot/aggregate_risk" in cot_output: |
| risk_pred = cot_output["cot/aggregate_risk"].squeeze(-1) |
| |
| actual_danger = (us_min < 1.5).float() |
| |
| if actual_danger.sum() > 0 and (1 - actual_danger).sum() > 0: |
| metrics.cot_risk_auc = _approx_auc(risk_pred, actual_danger) |
| else: |
| metrics.cot_risk_auc = 0.5 |
|
|
| if "cot/override_confidence" in cot_output: |
| override = cot_output["cot/override_confidence"].squeeze(-1) |
| actual_need = (us_min < 2.0).float() |
| correct = ((override > 0.5) == (actual_need > 0.5)).float() |
| metrics.cot_override_accuracy = correct.mean().item() |
|
|
| return metrics |
|
|
|
|
| def compute_occupancy_metrics( |
| pred_occ: torch.Tensor, |
| gt_occ: torch.Tensor, |
| near_range: int = 60, |
| ) -> OccupancyMetrics: |
| """IoU and VPQ for occupancy prediction.""" |
| B, _, H, W = pred_occ.shape |
| |
| pred_bin = (pred_occ > 0.5).float() |
| gt_bin = (gt_occ > 0.5).float() |
|
|
| |
| h_start = max(0, H // 2 - near_range // 2) |
| w_start = max(0, W // 2 - near_range // 2) |
| pred_near = pred_bin[:, :, h_start:h_start+near_range, w_start:w_start+near_range] |
| gt_near = gt_bin[:, :, h_start:h_start+near_range, w_start:w_start+near_range] |
|
|
| def _iou(p, g): |
| inter = (p * g).sum() |
| union = (p + g).clamp(max=1).sum() |
| return (inter / union.clamp(min=1)).item() |
|
|
| iou_near = _iou(pred_near, gt_near) |
| iou_far = _iou(pred_bin, gt_bin) |
|
|
| |
| vpq_near = iou_near * 0.9 |
| vpq_far = iou_far * 0.85 |
|
|
| return OccupancyMetrics( |
| iou_near=iou_near, iou_far=iou_far, |
| vpq_near=vpq_near, vpq_far=vpq_far, |
| ) |
|
|
|
|
| def compute_carla_metrics( |
| pred_waypoints: torch.Tensor, |
| gt_waypoints: torch.Tensor, |
| occupancy_grid: torch.Tensor, |
| gt_traffic_state: Optional[torch.Tensor] = None, |
| max_speed_ms: float = 8.94, |
| bev_resolution: float = 0.25, |
| ) -> CARLAMetrics: |
| """ |
| CARLA-style closed-loop metrics approximated from open-loop data. |
| """ |
| B, T, _ = pred_waypoints.shape |
| metrics = CARLAMetrics() |
|
|
| |
| gt_dist = torch.norm(gt_waypoints[:, -1, :2] - gt_waypoints[:, 0, :2], dim=-1) |
| pred_progress = torch.norm(pred_waypoints[:, -1, :2] - pred_waypoints[:, 0, :2], dim=-1) |
| completion = (pred_progress / gt_dist.clamp(min=0.1)).clamp(0, 1) |
| metrics.route_completion = completion.mean().item() * 100 |
|
|
| |
| col_results = compute_collision_rate( |
| pred_waypoints, occupancy_grid, bev_resolution=bev_resolution |
| ) |
| metrics.num_collisions = int(col_results["col_avg"] * B) |
|
|
| |
| collision_penalty = 0.5 ** metrics.num_collisions |
| red_light_penalty = 1.0 |
| metrics.infraction_score = collision_penalty * red_light_penalty |
|
|
| metrics.driving_score = metrics.route_completion * metrics.infraction_score / 100 |
|
|
| return metrics |
|
|
|
|
| def _approx_auc(scores: torch.Tensor, labels: torch.Tensor) -> float: |
| """Approximate AUC-ROC using the trapezoidal rule.""" |
| sorted_idx = scores.argsort(descending=True) |
| labels_sorted = labels[sorted_idx] |
| n_pos = labels.sum().item() |
| n_neg = labels.numel() - n_pos |
| if n_pos == 0 or n_neg == 0: |
| return 0.5 |
| tpr_prev, fpr_prev, auc = 0.0, 0.0, 0.0 |
| tp, fp = 0.0, 0.0 |
| for lab in labels_sorted: |
| if lab > 0.5: |
| tp += 1 |
| else: |
| fp += 1 |
| tpr = tp / n_pos |
| fpr = fp / n_neg |
| auc += (fpr - fpr_prev) * (tpr + tpr_prev) / 2 |
| tpr_prev, fpr_prev = tpr, fpr |
| return min(max(auc, 0.0), 1.0) |
|
|
|
|
| |
| |
| |
|
|
| class FSDExternalBenchmark: |
| """ |
| Runs the complete external benchmark suite on the FSD model. |
| |
| Usage: |
| benchmark = FSDExternalBenchmark(model, data_generator, num_scenarios=500) |
| results = benchmark.run() |
| print(results.summary()) |
| results.save("benchmark_results.json") |
| """ |
|
|
| SCENARIOS = ["urban", "highway", "parking", "intersection"] |
| SCENARIO_WEIGHTS = {"urban": 0.4, "highway": 0.2, "parking": 0.15, "intersection": 0.25} |
|
|
| def __init__( |
| self, |
| model, |
| data_generator, |
| num_scenarios: int = 200, |
| batch_size: int = 4, |
| device: str = "cpu", |
| max_speed_ms: float = 8.94, |
| bev_resolution: float = 0.25, |
| has_cot: bool = False, |
| ): |
| self.model = model |
| self.data_gen = data_generator |
| self.num_scenarios = num_scenarios |
| self.batch_size = batch_size |
| self.device = device |
| self.max_speed_ms = max_speed_ms |
| self.bev_resolution = bev_resolution |
| self.has_cot = has_cot |
|
|
| @torch.no_grad() |
| def run(self) -> BenchmarkResult: |
| """Execute the full benchmark and return aggregated results.""" |
| self.model.eval() |
| |
| |
| all_l2, all_col = [], [] |
| all_det = [] |
| all_safety = [] |
| all_occ = [] |
| all_carla = [] |
|
|
| t0 = time.time() |
| total_samples = 0 |
|
|
| scenarios_per_type = max(1, self.num_scenarios // len(self.SCENARIOS)) |
|
|
| for scenario in self.SCENARIOS: |
| n_batches = max(1, scenarios_per_type // self.batch_size) |
|
|
| for _ in range(n_batches): |
| inputs, targets = self.data_gen.generate_batch( |
| batch_size=self.batch_size, |
| scenario=scenario, |
| device=self.device, |
| ) |
|
|
| output = self.model(**inputs) |
| total_samples += self.batch_size |
|
|
| |
| pred_wp = output.get("planning/safe_waypoints", |
| output.get("cot/gated_waypoints", |
| output.get("planning/raw_waypoints"))) |
| gt_wp = targets["gt_waypoints"] |
|
|
| |
| l2 = compute_l2_error(pred_wp, gt_wp, fps=2.0) |
| all_l2.append(l2) |
|
|
| |
| col = compute_collision_rate( |
| pred_wp, targets["gt_occupancy"], |
| bev_resolution=self.bev_resolution, |
| ) |
| all_col.append(col) |
|
|
| |
| det = compute_nds( |
| output["perception/object_heatmap"], |
| targets["gt_heatmap"], |
| output["perception/object_bbox"], |
| gt_bbox=None, |
| ) |
| all_det.append(det) |
|
|
| |
| gt_emergency = (targets["gt_brake"] > 0.5).float() if "gt_brake" in targets else None |
| cot_out = {k: v for k, v in output.items() if k.startswith("cot/")} if self.has_cot else None |
| |
| safety = compute_safety_metrics( |
| pred_wp, inputs["ego_state"], |
| inputs["ultrasonic_distances"], |
| cot_output=cot_out, |
| gt_emergency=gt_emergency, |
| max_speed_ms=self.max_speed_ms, |
| ) |
| all_safety.append(safety) |
|
|
| |
| occ = compute_occupancy_metrics( |
| output["perception/occupancy_current"], |
| targets["gt_occupancy"], |
| ) |
| all_occ.append(occ) |
|
|
| |
| carla = compute_carla_metrics( |
| pred_wp, gt_wp, targets["gt_occupancy"], |
| max_speed_ms=self.max_speed_ms, |
| bev_resolution=self.bev_resolution, |
| ) |
| all_carla.append(carla) |
|
|
| elapsed = time.time() - t0 |
|
|
| |
| result = BenchmarkResult() |
| result.total_samples = total_samples |
| result.total_time_s = elapsed |
| result.fps = total_samples / max(elapsed, 0.001) |
|
|
| |
| result.planning.l2_1s = np.mean([r["l2_1s"] for r in all_l2]) |
| result.planning.l2_2s = np.mean([r["l2_2s"] for r in all_l2]) |
| result.planning.l2_3s = np.mean([r["l2_3s"] for r in all_l2]) |
| result.planning.l2_avg = np.mean([r["l2_avg"] for r in all_l2]) |
| result.planning.collision_rate_1s = np.mean([r["col_1s"] for r in all_col]) |
| result.planning.collision_rate_2s = np.mean([r["col_2s"] for r in all_col]) |
| result.planning.collision_rate_3s = np.mean([r["col_3s"] for r in all_col]) |
| result.planning.collision_rate_avg = np.mean([r["col_avg"] for r in all_col]) |
| result.planning.planning_score = ( |
| (1.0 - result.planning.l2_avg / 5.0) * |
| (1.0 - result.planning.collision_rate_avg) |
| ) |
|
|
| |
| result.detection.mAP = np.mean([d.mAP for d in all_det]) |
| result.detection.NDS = np.mean([d.NDS for d in all_det]) |
| result.detection.mATE = np.mean([d.mATE for d in all_det]) |
| result.detection.mASE = np.mean([d.mASE for d in all_det]) |
| result.detection.mAOE = np.mean([d.mAOE for d in all_det]) |
| result.detection.mAVE = np.mean([d.mAVE for d in all_det]) |
| result.detection.mAAE = np.mean([d.mAAE for d in all_det]) |
|
|
| |
| result.carla.route_completion = np.mean([c.route_completion for c in all_carla]) |
| result.carla.infraction_score = np.mean([c.infraction_score for c in all_carla]) |
| result.carla.driving_score = np.mean([c.driving_score for c in all_carla]) |
| result.carla.num_collisions = sum(c.num_collisions for c in all_carla) |
|
|
| |
| result.safety.min_ttc = min(s.min_ttc for s in all_safety) |
| result.safety.mean_ttc = np.mean([s.mean_ttc for s in all_safety]) |
| result.safety.ttc_below_2s_rate = np.mean([s.ttc_below_2s_rate for s in all_safety]) |
| result.safety.emergency_brake_precision = np.mean([s.emergency_brake_precision for s in all_safety]) |
| result.safety.emergency_brake_recall = np.mean([s.emergency_brake_recall for s in all_safety]) |
| result.safety.emergency_brake_f1 = np.mean([s.emergency_brake_f1 for s in all_safety]) |
| result.safety.mean_jerk = np.mean([s.mean_jerk for s in all_safety]) |
| result.safety.max_jerk = max(s.max_jerk for s in all_safety) |
| result.safety.min_obstacle_distance = min(s.min_obstacle_distance for s in all_safety) |
| result.safety.mean_obstacle_distance = np.mean([s.mean_obstacle_distance for s in all_safety]) |
| result.safety.speed_compliance_rate = np.mean([s.speed_compliance_rate for s in all_safety]) |
| result.safety.safe_following_distance_rate = np.mean([s.safe_following_distance_rate for s in all_safety]) |
| if self.has_cot: |
| result.safety.cot_override_accuracy = np.mean([s.cot_override_accuracy for s in all_safety]) |
| result.safety.cot_risk_auc = np.mean([s.cot_risk_auc for s in all_safety]) |
|
|
| |
| result.occupancy.iou_near = np.mean([o.iou_near for o in all_occ]) |
| result.occupancy.iou_far = np.mean([o.iou_far for o in all_occ]) |
| result.occupancy.vpq_near = np.mean([o.vpq_near for o in all_occ]) |
| result.occupancy.vpq_far = np.mean([o.vpq_far for o in all_occ]) |
|
|
| return result |
|
|