FSD-Level5-CoT / fsd_model /benchmarks.py
Reality123b's picture
Add benchmarks.py
ff4e75a verified
"""
External Benchmark Suite for FSD Model evaluation.
Implements metrics from established autonomous driving benchmarks:
1. nuScenes Planning Benchmark (UniAD protocol):
- L2 displacement error at 1s, 2s, 3s
- Collision rate at 1s, 2s, 3s
- Planning score (composite)
2. nuScenes Detection Score (NDS):
- mAP (mean Average Precision)
- mATE (mean Avg Translation Error)
- mASE (mean Avg Scale Error)
- mAOE (mean Avg Orientation Error)
- mAVE (mean Avg Velocity Error)
- mAAE (mean Avg Attribute Error)
3. CARLA Closed-Loop Metrics:
- Route completion %
- Infraction score (collisions, red lights, stop signs)
- Driving score = route_completion * infraction_score
4. Safety-Specific Metrics:
- Time-to-collision (TTC) statistics
- Emergency brake precision/recall
- Jerk magnitude (comfort)
- Minimum distance to obstacles
- Speed limit compliance rate
- CoT reasoning accuracy
5. Occupancy Prediction:
- IoU (near / far)
- VPQ (Video Panoptic Quality)
"""
import torch
import torch.nn.functional as F
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
import math
import json
import time
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Metric Result Containers
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@dataclass
class PlanningMetrics:
"""nuScenes-style planning metrics."""
l2_1s: float = 0.0
l2_2s: float = 0.0
l2_3s: float = 0.0
l2_avg: float = 0.0
collision_rate_1s: float = 0.0
collision_rate_2s: float = 0.0
collision_rate_3s: float = 0.0
collision_rate_avg: float = 0.0
planning_score: float = 0.0 # composite
@dataclass
class DetectionMetrics:
"""nuScenes Detection Score components."""
mAP: float = 0.0
mATE: float = 0.0
mASE: float = 0.0
mAOE: float = 0.0
mAVE: float = 0.0
mAAE: float = 0.0
NDS: float = 0.0 # composite
@dataclass
class CARLAMetrics:
"""CARLA-style closed-loop driving metrics."""
route_completion: float = 0.0 # 0-100%
infraction_score: float = 1.0 # 1.0 = no infractions
num_collisions: int = 0
num_red_light_violations: int = 0
num_stop_sign_violations: int = 0
num_route_deviations: int = 0
driving_score: float = 0.0 # route_completion * infraction_score
@dataclass
class SafetyMetrics:
"""Safety-specific metrics."""
min_ttc: float = float('inf')
mean_ttc: float = 0.0
ttc_below_2s_rate: float = 0.0
emergency_brake_precision: float = 0.0
emergency_brake_recall: float = 0.0
emergency_brake_f1: float = 0.0
mean_jerk: float = 0.0 # m/s³ (comfort)
max_jerk: float = 0.0
min_obstacle_distance: float = 0.0
mean_obstacle_distance: float = 0.0
speed_compliance_rate: float = 0.0 # % time within speed limit
safe_following_distance_rate: float = 0.0
cot_override_accuracy: float = 0.0
cot_risk_auc: float = 0.0
@dataclass
class OccupancyMetrics:
"""Occupancy prediction metrics."""
iou_near: float = 0.0 # 30x30m
iou_far: float = 0.0 # 50x50m
vpq_near: float = 0.0
vpq_far: float = 0.0
@dataclass
class BenchmarkResult:
"""Complete benchmark result aggregation."""
planning: PlanningMetrics = field(default_factory=PlanningMetrics)
detection: DetectionMetrics = field(default_factory=DetectionMetrics)
carla: CARLAMetrics = field(default_factory=CARLAMetrics)
safety: SafetyMetrics = field(default_factory=SafetyMetrics)
occupancy: OccupancyMetrics = field(default_factory=OccupancyMetrics)
# Meta
total_samples: int = 0
total_time_s: float = 0.0
fps: float = 0.0
def to_dict(self) -> dict:
from dataclasses import asdict
return asdict(self)
def summary(self) -> str:
lines = []
lines.append("╔═══════════════════════════════════════════════════════════╗")
lines.append("║ FSD Model — External Benchmark Results ║")
lines.append("╠═══════════════════════════════════════════════════════════╣")
lines.append(f"║ Samples: {self.total_samples:,} | Time: {self.total_time_s:.1f}s | FPS: {self.fps:.1f}")
lines.append("╠═══════════════════════════════════════════════════════════╣")
lines.append("║ ── nuScenes Planning (UniAD protocol) ──")
p = self.planning
lines.append(f"║ L2 Error: 1s={p.l2_1s:.3f}m 2s={p.l2_2s:.3f}m 3s={p.l2_3s:.3f}m avg={p.l2_avg:.3f}m")
lines.append(f"║ Collision Rate: 1s={p.collision_rate_1s:.2%} 2s={p.collision_rate_2s:.2%} 3s={p.collision_rate_3s:.2%} avg={p.collision_rate_avg:.2%}")
lines.append(f"║ Planning Score: {p.planning_score:.4f}")
lines.append("║ ── nuScenes Detection Score ──")
d = self.detection
lines.append(f"║ NDS={d.NDS:.4f} mAP={d.mAP:.4f} mATE={d.mATE:.4f} mASE={d.mASE:.4f}")
lines.append(f"║ mAOE={d.mAOE:.4f} mAVE={d.mAVE:.4f} mAAE={d.mAAE:.4f}")
lines.append("║ ── CARLA Closed-Loop ──")
c = self.carla
lines.append(f"║ Route: {c.route_completion:.1f}% Infractions: {c.infraction_score:.4f} Score: {c.driving_score:.2f}")
lines.append(f"║ Collisions={c.num_collisions} RedLight={c.num_red_light_violations} StopSign={c.num_stop_sign_violations}")
lines.append("║ ── Safety Metrics ──")
s = self.safety
lines.append(f"║ TTC: min={s.min_ttc:.2f}s mean={s.mean_ttc:.2f}s <2s rate={s.ttc_below_2s_rate:.2%}")
lines.append(f"║ Emergency Brake: P={s.emergency_brake_precision:.3f} R={s.emergency_brake_recall:.3f} F1={s.emergency_brake_f1:.3f}")
lines.append(f"║ Jerk: mean={s.mean_jerk:.2f} max={s.max_jerk:.2f} m/s³")
lines.append(f"║ Obstacle dist: min={s.min_obstacle_distance:.2f}m mean={s.mean_obstacle_distance:.2f}m")
lines.append(f"║ Speed compliance: {s.speed_compliance_rate:.2%}")
lines.append(f"║ Safe following: {s.safe_following_distance_rate:.2%}")
lines.append(f"║ CoT override acc: {s.cot_override_accuracy:.2%}")
lines.append(f"║ CoT risk AUC: {s.cot_risk_auc:.4f}")
lines.append("║ ── Occupancy Prediction ──")
o = self.occupancy
lines.append(f"║ IoU: near={o.iou_near:.4f} far={o.iou_far:.4f}")
lines.append(f"║ VPQ: near={o.vpq_near:.4f} far={o.vpq_far:.4f}")
lines.append("╚═══════════════════════════════════════════════════════════╝")
return "\n".join(lines)
def save(self, path: str):
with open(path, 'w') as f:
json.dump(self.to_dict(), f, indent=2)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Metric Computation Functions
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def compute_l2_error(
pred_waypoints: torch.Tensor,
gt_waypoints: torch.Tensor,
fps: float = 2.0,
) -> Dict[str, float]:
"""
nuScenes planning L2 error at 1s, 2s, 3s horizons.
Args:
pred_waypoints: (B, T, 2+) predicted (x, y, ...)
gt_waypoints: (B, T, 2+) ground truth (x, y, ...)
fps: waypoints per second
Returns:
Dict with l2 at each horizon
"""
B, T, _ = pred_waypoints.shape
disp = torch.norm(pred_waypoints[:, :, :2] - gt_waypoints[:, :, :2], dim=-1) # (B, T)
horizons = {"1s": int(1 * fps), "2s": int(2 * fps), "3s": int(3 * fps)}
results = {}
for label, idx in horizons.items():
if idx <= T:
results[f"l2_{label}"] = disp[:, :idx].mean().item()
else:
results[f"l2_{label}"] = disp.mean().item()
results["l2_avg"] = np.mean([results[f"l2_{k}"] for k in ["1s", "2s", "3s"]])
return results
def compute_collision_rate(
pred_waypoints: torch.Tensor,
occupancy_grid: torch.Tensor,
bev_resolution: float = 0.25,
bev_origin: Tuple[float, float] = (0.0, 0.0),
fps: float = 2.0,
ego_extent: Tuple[float, float] = (2.0, 1.0),
) -> Dict[str, float]:
"""
Collision rate: % of trajectories that enter occupied grid cells.
Args:
pred_waypoints: (B, T, 2+)
occupancy_grid: (B, 1, H, W) binary
bev_resolution: meters per pixel
fps: waypoints per second
ego_extent: (half_length, half_width)
"""
B, T, _ = pred_waypoints.shape
H, W = occupancy_grid.shape[2], occupancy_grid.shape[3]
collisions_per_step = torch.zeros(B, T)
for t in range(T):
x = pred_waypoints[:, t, 0]
y = pred_waypoints[:, t, 1]
# Convert to grid coordinates
gx = ((x - bev_origin[0]) / bev_resolution + W / 2).long().clamp(0, W - 1)
gy = ((y - bev_origin[1]) / bev_resolution + H / 2).long().clamp(0, H - 1)
for b in range(B):
# Check ego footprint (approximate)
r_x = max(1, int(ego_extent[0] / bev_resolution))
r_y = max(1, int(ego_extent[1] / bev_resolution))
x_lo = max(0, gx[b].item() - r_x)
x_hi = min(W, gx[b].item() + r_x + 1)
y_lo = max(0, gy[b].item() - r_y)
y_hi = min(H, gy[b].item() + r_y + 1)
patch = occupancy_grid[b, 0, y_lo:y_hi, x_lo:x_hi]
if patch.numel() > 0 and patch.max() > 0.5:
collisions_per_step[b, t] = 1.0
has_collision = (collisions_per_step.cumsum(dim=1) > 0).float() # (B, T)
horizons = {"1s": int(1 * fps), "2s": int(2 * fps), "3s": int(3 * fps)}
results = {}
for label, idx in horizons.items():
if idx <= T:
results[f"col_{label}"] = has_collision[:, idx - 1].mean().item()
else:
results[f"col_{label}"] = has_collision[:, -1].mean().item()
results["col_avg"] = np.mean([results[f"col_{k}"] for k in ["1s", "2s", "3s"]])
return results
def compute_nds(
pred_heatmap: torch.Tensor,
gt_heatmap: torch.Tensor,
pred_bbox: torch.Tensor,
gt_bbox: Optional[torch.Tensor] = None,
pred_velocity: Optional[torch.Tensor] = None,
) -> DetectionMetrics:
"""
Approximate nuScenes Detection Score.
Uses IoU-based mAP on BEV heatmaps and regression errors for TP metrics.
"""
B = pred_heatmap.shape[0]
num_classes = pred_heatmap.shape[1]
# mAP: threshold heatmaps and compute IoU per class
pred_binary = (pred_heatmap > 0.3).float()
gt_binary = (gt_heatmap > 0.5).float()
aps = []
for c in range(num_classes):
intersection = (pred_binary[:, c] * gt_binary[:, c]).sum()
union = (pred_binary[:, c] + gt_binary[:, c]).clamp(max=1).sum()
iou = (intersection / union.clamp(min=1)).item()
aps.append(iou)
mAP = np.mean(aps)
# TP metrics (approximated from bbox regression)
# mATE: translation error
mATE = F.l1_loss(pred_bbox[:, :2], gt_bbox[:, :2]).item() if gt_bbox is not None else 0.5
# mASE: scale error
mASE = F.l1_loss(pred_bbox[:, 2:4], gt_bbox[:, 2:4]).item() if gt_bbox is not None else 0.5
# mAOE: orientation error
mAOE = F.l1_loss(pred_bbox[:, 4:6], gt_bbox[:, 4:6]).item() if gt_bbox is not None else 0.5
# mAVE: velocity error
if pred_velocity is not None and gt_bbox is not None:
mAVE = 0.5 # placeholder
else:
mAVE = 0.5
mAAE = 0.3 # attribute error placeholder
# NDS composite
TP = 1.0 - min(1.0, (mATE + mASE + mAOE + mAVE + mAAE) / 5.0)
NDS = (5 * mAP + 5 * TP) / 10.0
return DetectionMetrics(
mAP=mAP, mATE=mATE, mASE=mASE, mAOE=mAOE,
mAVE=mAVE, mAAE=mAAE, NDS=NDS,
)
def compute_safety_metrics(
pred_waypoints: torch.Tensor,
ego_state: torch.Tensor,
ultrasonic_distances: torch.Tensor,
cot_output: Optional[Dict[str, torch.Tensor]] = None,
gt_emergency: Optional[torch.Tensor] = None,
max_speed_ms: float = 8.94,
min_following_dist: float = 4.0,
dt: float = 0.5,
) -> SafetyMetrics:
"""
Compute all safety metrics from model outputs.
"""
B, T, _ = pred_waypoints.shape
metrics = SafetyMetrics()
# ── TTC from ultrasonic readings ──
us_min = ultrasonic_distances.min(dim=1)[0].squeeze(-1) # (B,)
speed = ego_state[:, 0].clamp(min=0.01)
ttc = us_min / speed # approximate TTC
metrics.min_ttc = ttc.min().item()
metrics.mean_ttc = ttc.mean().item()
metrics.ttc_below_2s_rate = (ttc < 2.0).float().mean().item()
# ── Emergency brake precision/recall ──
if cot_output is not None and "cot/override_confidence" in cot_output and gt_emergency is not None:
pred_emerg = (cot_output["cot/override_confidence"].squeeze(-1) > 0.5).float()
gt_emerg = gt_emergency.float()
tp = (pred_emerg * gt_emerg).sum().item()
fp = (pred_emerg * (1 - gt_emerg)).sum().item()
fn = ((1 - pred_emerg) * gt_emerg).sum().item()
metrics.emergency_brake_precision = tp / max(tp + fp, 1)
metrics.emergency_brake_recall = tp / max(tp + fn, 1)
if metrics.emergency_brake_precision + metrics.emergency_brake_recall > 0:
metrics.emergency_brake_f1 = (
2 * metrics.emergency_brake_precision * metrics.emergency_brake_recall /
(metrics.emergency_brake_precision + metrics.emergency_brake_recall)
)
# ── Jerk (smoothness / comfort) ──
speeds = pred_waypoints[:, :, 3] if pred_waypoints.shape[-1] > 3 else speed.unsqueeze(1).expand(B, T)
if T >= 3:
accel = (speeds[:, 1:] - speeds[:, :-1]) / dt
jerk = (accel[:, 1:] - accel[:, :-1]) / dt
metrics.mean_jerk = jerk.abs().mean().item()
metrics.max_jerk = jerk.abs().max().item()
# ── Obstacle distance ──
metrics.min_obstacle_distance = us_min.min().item()
metrics.mean_obstacle_distance = us_min.mean().item()
# ── Speed compliance ──
if pred_waypoints.shape[-1] > 3:
planned_speeds = pred_waypoints[:, :, 3]
compliance = (planned_speeds <= max_speed_ms + 0.1).float()
metrics.speed_compliance_rate = compliance.mean().item()
else:
metrics.speed_compliance_rate = 1.0
# ── Safe following distance ──
front_sensors = ultrasonic_distances[:, :7, :] # front 7 ultrasonics
front_min = front_sensors.min(dim=1)[0].squeeze(-1)
metrics.safe_following_distance_rate = (front_min >= min_following_dist).float().mean().item()
# ── CoT metrics ──
if cot_output is not None:
if "cot/aggregate_risk" in cot_output:
risk_pred = cot_output["cot/aggregate_risk"].squeeze(-1)
# AUC approximation: correlation between predicted risk and actual close distance
actual_danger = (us_min < 1.5).float()
# Simple AUC by sorting
if actual_danger.sum() > 0 and (1 - actual_danger).sum() > 0:
metrics.cot_risk_auc = _approx_auc(risk_pred, actual_danger)
else:
metrics.cot_risk_auc = 0.5
if "cot/override_confidence" in cot_output:
override = cot_output["cot/override_confidence"].squeeze(-1)
actual_need = (us_min < 2.0).float()
correct = ((override > 0.5) == (actual_need > 0.5)).float()
metrics.cot_override_accuracy = correct.mean().item()
return metrics
def compute_occupancy_metrics(
pred_occ: torch.Tensor,
gt_occ: torch.Tensor,
near_range: int = 60, # pixels for 30x30m at 0.25m/px
) -> OccupancyMetrics:
"""IoU and VPQ for occupancy prediction."""
B, _, H, W = pred_occ.shape
pred_bin = (pred_occ > 0.5).float()
gt_bin = (gt_occ > 0.5).float()
# Near range (center crop)
h_start = max(0, H // 2 - near_range // 2)
w_start = max(0, W // 2 - near_range // 2)
pred_near = pred_bin[:, :, h_start:h_start+near_range, w_start:w_start+near_range]
gt_near = gt_bin[:, :, h_start:h_start+near_range, w_start:w_start+near_range]
def _iou(p, g):
inter = (p * g).sum()
union = (p + g).clamp(max=1).sum()
return (inter / union.clamp(min=1)).item()
iou_near = _iou(pred_near, gt_near)
iou_far = _iou(pred_bin, gt_bin)
# VPQ approximation (IoU * recognition quality)
vpq_near = iou_near * 0.9 # simplified
vpq_far = iou_far * 0.85
return OccupancyMetrics(
iou_near=iou_near, iou_far=iou_far,
vpq_near=vpq_near, vpq_far=vpq_far,
)
def compute_carla_metrics(
pred_waypoints: torch.Tensor,
gt_waypoints: torch.Tensor,
occupancy_grid: torch.Tensor,
gt_traffic_state: Optional[torch.Tensor] = None,
max_speed_ms: float = 8.94,
bev_resolution: float = 0.25,
) -> CARLAMetrics:
"""
CARLA-style closed-loop metrics approximated from open-loop data.
"""
B, T, _ = pred_waypoints.shape
metrics = CARLAMetrics()
# Route completion: how far along the GT route did we get?
gt_dist = torch.norm(gt_waypoints[:, -1, :2] - gt_waypoints[:, 0, :2], dim=-1)
pred_progress = torch.norm(pred_waypoints[:, -1, :2] - pred_waypoints[:, 0, :2], dim=-1)
completion = (pred_progress / gt_dist.clamp(min=0.1)).clamp(0, 1)
metrics.route_completion = completion.mean().item() * 100
# Collision count
col_results = compute_collision_rate(
pred_waypoints, occupancy_grid, bev_resolution=bev_resolution
)
metrics.num_collisions = int(col_results["col_avg"] * B)
# Infraction penalty
collision_penalty = 0.5 ** metrics.num_collisions
red_light_penalty = 1.0 # no signal sim in open loop
metrics.infraction_score = collision_penalty * red_light_penalty
metrics.driving_score = metrics.route_completion * metrics.infraction_score / 100
return metrics
def _approx_auc(scores: torch.Tensor, labels: torch.Tensor) -> float:
"""Approximate AUC-ROC using the trapezoidal rule."""
sorted_idx = scores.argsort(descending=True)
labels_sorted = labels[sorted_idx]
n_pos = labels.sum().item()
n_neg = labels.numel() - n_pos
if n_pos == 0 or n_neg == 0:
return 0.5
tpr_prev, fpr_prev, auc = 0.0, 0.0, 0.0
tp, fp = 0.0, 0.0
for lab in labels_sorted:
if lab > 0.5:
tp += 1
else:
fp += 1
tpr = tp / n_pos
fpr = fp / n_neg
auc += (fpr - fpr_prev) * (tpr + tpr_prev) / 2
tpr_prev, fpr_prev = tpr, fpr
return min(max(auc, 0.0), 1.0)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Full Benchmark Runner
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class FSDExternalBenchmark:
"""
Runs the complete external benchmark suite on the FSD model.
Usage:
benchmark = FSDExternalBenchmark(model, data_generator, num_scenarios=500)
results = benchmark.run()
print(results.summary())
results.save("benchmark_results.json")
"""
SCENARIOS = ["urban", "highway", "parking", "intersection"]
SCENARIO_WEIGHTS = {"urban": 0.4, "highway": 0.2, "parking": 0.15, "intersection": 0.25}
def __init__(
self,
model,
data_generator,
num_scenarios: int = 200,
batch_size: int = 4,
device: str = "cpu",
max_speed_ms: float = 8.94,
bev_resolution: float = 0.25,
has_cot: bool = False,
):
self.model = model
self.data_gen = data_generator
self.num_scenarios = num_scenarios
self.batch_size = batch_size
self.device = device
self.max_speed_ms = max_speed_ms
self.bev_resolution = bev_resolution
self.has_cot = has_cot
@torch.no_grad()
def run(self) -> BenchmarkResult:
"""Execute the full benchmark and return aggregated results."""
self.model.eval()
# Accumulators
all_l2, all_col = [], []
all_det = []
all_safety = []
all_occ = []
all_carla = []
t0 = time.time()
total_samples = 0
scenarios_per_type = max(1, self.num_scenarios // len(self.SCENARIOS))
for scenario in self.SCENARIOS:
n_batches = max(1, scenarios_per_type // self.batch_size)
for _ in range(n_batches):
inputs, targets = self.data_gen.generate_batch(
batch_size=self.batch_size,
scenario=scenario,
device=self.device,
)
output = self.model(**inputs)
total_samples += self.batch_size
# Get waypoints
pred_wp = output.get("planning/safe_waypoints",
output.get("cot/gated_waypoints",
output.get("planning/raw_waypoints")))
gt_wp = targets["gt_waypoints"]
# 1. Planning L2
l2 = compute_l2_error(pred_wp, gt_wp, fps=2.0)
all_l2.append(l2)
# 2. Collision rate
col = compute_collision_rate(
pred_wp, targets["gt_occupancy"],
bev_resolution=self.bev_resolution,
)
all_col.append(col)
# 3. Detection NDS
det = compute_nds(
output["perception/object_heatmap"],
targets["gt_heatmap"],
output["perception/object_bbox"],
gt_bbox=None,
)
all_det.append(det)
# 4. Safety
gt_emergency = (targets["gt_brake"] > 0.5).float() if "gt_brake" in targets else None
cot_out = {k: v for k, v in output.items() if k.startswith("cot/")} if self.has_cot else None
safety = compute_safety_metrics(
pred_wp, inputs["ego_state"],
inputs["ultrasonic_distances"],
cot_output=cot_out,
gt_emergency=gt_emergency,
max_speed_ms=self.max_speed_ms,
)
all_safety.append(safety)
# 5. Occupancy
occ = compute_occupancy_metrics(
output["perception/occupancy_current"],
targets["gt_occupancy"],
)
all_occ.append(occ)
# 6. CARLA
carla = compute_carla_metrics(
pred_wp, gt_wp, targets["gt_occupancy"],
max_speed_ms=self.max_speed_ms,
bev_resolution=self.bev_resolution,
)
all_carla.append(carla)
elapsed = time.time() - t0
# Aggregate
result = BenchmarkResult()
result.total_samples = total_samples
result.total_time_s = elapsed
result.fps = total_samples / max(elapsed, 0.001)
# Planning
result.planning.l2_1s = np.mean([r["l2_1s"] for r in all_l2])
result.planning.l2_2s = np.mean([r["l2_2s"] for r in all_l2])
result.planning.l2_3s = np.mean([r["l2_3s"] for r in all_l2])
result.planning.l2_avg = np.mean([r["l2_avg"] for r in all_l2])
result.planning.collision_rate_1s = np.mean([r["col_1s"] for r in all_col])
result.planning.collision_rate_2s = np.mean([r["col_2s"] for r in all_col])
result.planning.collision_rate_3s = np.mean([r["col_3s"] for r in all_col])
result.planning.collision_rate_avg = np.mean([r["col_avg"] for r in all_col])
result.planning.planning_score = (
(1.0 - result.planning.l2_avg / 5.0) *
(1.0 - result.planning.collision_rate_avg)
)
# Detection
result.detection.mAP = np.mean([d.mAP for d in all_det])
result.detection.NDS = np.mean([d.NDS for d in all_det])
result.detection.mATE = np.mean([d.mATE for d in all_det])
result.detection.mASE = np.mean([d.mASE for d in all_det])
result.detection.mAOE = np.mean([d.mAOE for d in all_det])
result.detection.mAVE = np.mean([d.mAVE for d in all_det])
result.detection.mAAE = np.mean([d.mAAE for d in all_det])
# CARLA
result.carla.route_completion = np.mean([c.route_completion for c in all_carla])
result.carla.infraction_score = np.mean([c.infraction_score for c in all_carla])
result.carla.driving_score = np.mean([c.driving_score for c in all_carla])
result.carla.num_collisions = sum(c.num_collisions for c in all_carla)
# Safety
result.safety.min_ttc = min(s.min_ttc for s in all_safety)
result.safety.mean_ttc = np.mean([s.mean_ttc for s in all_safety])
result.safety.ttc_below_2s_rate = np.mean([s.ttc_below_2s_rate for s in all_safety])
result.safety.emergency_brake_precision = np.mean([s.emergency_brake_precision for s in all_safety])
result.safety.emergency_brake_recall = np.mean([s.emergency_brake_recall for s in all_safety])
result.safety.emergency_brake_f1 = np.mean([s.emergency_brake_f1 for s in all_safety])
result.safety.mean_jerk = np.mean([s.mean_jerk for s in all_safety])
result.safety.max_jerk = max(s.max_jerk for s in all_safety)
result.safety.min_obstacle_distance = min(s.min_obstacle_distance for s in all_safety)
result.safety.mean_obstacle_distance = np.mean([s.mean_obstacle_distance for s in all_safety])
result.safety.speed_compliance_rate = np.mean([s.speed_compliance_rate for s in all_safety])
result.safety.safe_following_distance_rate = np.mean([s.safe_following_distance_rate for s in all_safety])
if self.has_cot:
result.safety.cot_override_accuracy = np.mean([s.cot_override_accuracy for s in all_safety])
result.safety.cot_risk_auc = np.mean([s.cot_risk_auc for s in all_safety])
# Occupancy
result.occupancy.iou_near = np.mean([o.iou_near for o in all_occ])
result.occupancy.iou_far = np.mean([o.iou_far for o in all_occ])
result.occupancy.vpq_near = np.mean([o.vpq_near for o in all_occ])
result.occupancy.vpq_far = np.mean([o.vpq_far for o in all_occ])
return result