#!/usr/bin/env python3
"""
Full benchmark runner for FSD Model with CoT reasoning.

Builds a base model and a CoT-enhanced model, runs the external benchmark
suite on both, prints a side-by-side comparison and saves the results.
"""

import sys
import time
import torch
import numpy as np

# Must run before the project imports below so that `fsd_model` resolves.
sys.path.insert(0, '/app')

from fsd_model.config import VehicleConfig
from fsd_model.model import FullSelfDrivingModel
from fsd_model.data import FSDDataGenerator
from fsd_model.benchmarks import FSDExternalBenchmark
from fsd_model.visualization import format_parameter_count


def build_large_model(enable_cot: bool = True):
    """Build the scaled-up FSD model (larger than v1).

    Args:
        enable_cot: Forwarded to ``FullSelfDrivingModel(enable_cot=...)``.

    Returns:
        A ``(model, config)`` tuple: the constructed ``FullSelfDrivingModel``
        and the default ``VehicleConfig`` it was built with.
    """
    config = VehicleConfig()
    model = FullSelfDrivingModel(
        vehicle_config=config,
        bev_size=200,
        bev_resolution=0.25,
        bev_feature_dim=256,
        num_object_classes=10,
        num_seg_classes=7,
        num_waypoints=20,
        planning_d_model=256,
        future_steps=6,
        num_forecast_modes=6,
        forecast_steps=12,
        num_behaviors=10,
        enable_cot=enable_cot,
        cot_num_actor_queries=64,
        cot_num_road_queries=32,
    )
    return model, config


def build_small_model(enable_cot: bool = True):
    """Build a test-sized model for CPU benchmark.

    Uses smaller BEV / feature / query sizes than ``build_large_model``;
    every other constructor argument is the same.

    Args:
        enable_cot: Forwarded to ``FullSelfDrivingModel(enable_cot=...)``.

    Returns:
        A ``(model, config)`` tuple: the constructed ``FullSelfDrivingModel``
        and the default ``VehicleConfig`` it was built with.
    """
    config = VehicleConfig()
    model = FullSelfDrivingModel(
        vehicle_config=config,
        bev_size=100,
        bev_resolution=0.5,
        bev_feature_dim=128,
        num_object_classes=10,
        num_seg_classes=7,
        num_waypoints=20,
        planning_d_model=128,
        future_steps=6,
        num_forecast_modes=6,
        forecast_steps=12,
        num_behaviors=10,
        enable_cot=enable_cot,
        cot_num_actor_queries=32,
        cot_num_road_queries=16,
    )
    return model, config


def _format_delta(delta, width=12):
    """Return *delta* as a right-aligned, 4-decimal string of *width* chars.

    A leading ``+`` is attached directly to positive values so the sign
    stays adjacent to the digits and every row has the same width.
    """
    sign = "+" if delta > 0 else ""
    return f"{sign}{delta:.4f}".rjust(width)


def _print_comparison(result_no, result_cot):
    """Print the base-vs-CoT metric table followed by the CoT-only metrics."""
    print("\n[4/4] Comparison Summary")
    print("=" * 70)
    print(f"{'Metric':<40} {'Base':>12} {'+ CoT':>12} {'Delta':>12}")
    print("-" * 70)

    # (label, base value, CoT value); arrows mark the "better" direction.
    comparisons = [
        ("Planning L2 avg (m) ↓", result_no.planning.l2_avg, result_cot.planning.l2_avg),
        ("Collision rate avg ↓", result_no.planning.collision_rate_avg, result_cot.planning.collision_rate_avg),
        ("Planning score ↑", result_no.planning.planning_score, result_cot.planning.planning_score),
        ("NDS ↑", result_no.detection.NDS, result_cot.detection.NDS),
        ("mAP ↑", result_no.detection.mAP, result_cot.detection.mAP),
        ("CARLA driving score ↑", result_no.carla.driving_score, result_cot.carla.driving_score),
        ("Route completion % ↑", result_no.carla.route_completion, result_cot.carla.route_completion),
        ("Total collisions ↓", result_no.carla.num_collisions, result_cot.carla.num_collisions),
        ("Min TTC (s) ↑", result_no.safety.min_ttc, result_cot.safety.min_ttc),
        ("Mean TTC (s) ↑", result_no.safety.mean_ttc, result_cot.safety.mean_ttc),
        ("TTC <2s rate ↓", result_no.safety.ttc_below_2s_rate, result_cot.safety.ttc_below_2s_rate),
        ("Speed compliance ↑", result_no.safety.speed_compliance_rate, result_cot.safety.speed_compliance_rate),
        ("Safe following dist ↑", result_no.safety.safe_following_distance_rate, result_cot.safety.safe_following_distance_rate),
        ("Mean jerk (m/s³) ↓", result_no.safety.mean_jerk, result_cot.safety.mean_jerk),
        ("Occ IoU near ↑", result_no.occupancy.iou_near, result_cot.occupancy.iou_near),
        ("Occ IoU far ↑", result_no.occupancy.iou_far, result_cot.occupancy.iou_far),
        ("FPS", result_no.fps, result_cot.fps),
    ]

    # CoT-only metrics (the base model has no value for these).
    cot_only = [
        ("CoT override accuracy ↑", result_cot.safety.cot_override_accuracy),
        ("CoT risk AUC ↑", result_cot.safety.cot_risk_auc),
        ("E-brake precision ↑", result_cot.safety.emergency_brake_precision),
        ("E-brake recall ↑", result_cot.safety.emergency_brake_recall),
        ("E-brake F1 ↑", result_cot.safety.emergency_brake_f1),
    ]

    for name, base, cot in comparisons:
        print(f" {name:<38} {base:>12.4f} {cot:>12.4f} {_format_delta(cot - base)}")

    print("-" * 70)
    print(" CoT-Specific Metrics:")
    for name, value in cot_only:
        print(f" {name:<38} {'—':>12} {value:>12.4f}")
    print("=" * 70)


def run_benchmark_comparison(num_scenarios: int = 48, batch_size: int = 4,
                             output_dir: str = "/app"):
    """Run benchmarks with and without CoT and compare.

    Args:
        num_scenarios: Scenarios per benchmark run (the default of 48 is
            kept small so the run is fast on CPU).
        batch_size: Batch size passed to ``FSDExternalBenchmark``.
        output_dir: Directory that receives ``benchmark_base.json`` and
            ``benchmark_cot.json``.

    Returns:
        ``(result_no, result_cot)``: the benchmark results for the base
        model and for the CoT-enhanced model.
    """
    print("=" * 70)
    print(" FSD Model — External Benchmark Suite")
    print(" Comparing: Base Model vs. CoT-Enhanced Model")
    print("=" * 70)

    # ── Build models ──
    print("\n[1/4] Building models...")
    model_no_cot, config = build_small_model(enable_cot=False)
    model_with_cot, _ = build_small_model(enable_cot=True)

    print("\n Base model (no CoT):")
    counts_no = model_no_cot.count_parameters()
    print(format_parameter_count(counts_no))

    print("\n CoT-enhanced model:")
    counts_cot = model_with_cot.count_parameters()
    print(format_parameter_count(counts_cot))

    cot_overhead = counts_cot["total"] - counts_no["total"]
    print(f"\n CoT parameter overhead: {cot_overhead:,} ({cot_overhead/counts_no['total']:.1%} increase)")

    # ── Data generator ──
    # NOTE(review): bev_size=100 presumably has to match the bev_size used
    # in build_small_model — verify before changing either.
    data_gen = FSDDataGenerator(config, bev_size=100, image_size=(120, 160))

    # ── Quick forward pass sanity check ──
    # NOTE(review): the models are not switched to eval mode here; confirm
    # whether FSDExternalBenchmark does that before trusting the numbers.
    print("\n[2/4] Sanity check forward passes...")
    inputs, targets = data_gen.generate_batch(batch_size=2, scenario="urban")
    with torch.no_grad():
        out_no = model_no_cot(**inputs)
        out_cot = model_with_cot(**inputs)

    print(f" Base model outputs: {len(out_no)} keys")
    print(f" CoT model outputs: {len(out_cot)} keys")
    cot_keys = [k for k in out_cot.keys() if k.startswith("cot/")]
    print(f" CoT-specific outputs: {len(cot_keys)} keys")
    for k in sorted(cot_keys)[:10]:
        print(f" {k}: {out_cot[k].shape}")

    # ── Run benchmarks ──
    print(f"\n[3/4] Running external benchmarks ({num_scenarios} scenarios each)...")

    print("\n ── Base Model (no CoT) ──")
    bench_no = FSDExternalBenchmark(
        model_no_cot, data_gen,
        num_scenarios=num_scenarios,
        batch_size=batch_size,
        max_speed_ms=config.max_speed_ms,
        has_cot=False,
    )
    result_no = bench_no.run()
    print(result_no.summary())

    print("\n ── CoT-Enhanced Model ──")
    bench_cot = FSDExternalBenchmark(
        model_with_cot, data_gen,
        num_scenarios=num_scenarios,
        batch_size=batch_size,
        max_speed_ms=config.max_speed_ms,
        has_cot=True,
    )
    result_cot = bench_cot.run()
    print(result_cot.summary())

    # ── Comparison ──
    _print_comparison(result_no, result_cot)

    # ── Show full-size model parameter counts ──
    print("\n Full-size model (production config):")
    model_full, _ = build_large_model(enable_cot=True)
    counts_full = model_full.count_parameters()
    print(format_parameter_count(counts_full))

    # Save results
    base_path = f"{output_dir}/benchmark_base.json"
    cot_path = f"{output_dir}/benchmark_cot.json"
    result_no.save(base_path)
    result_cot.save(cot_path)
    print(f"\nResults saved to {base_path} and {cot_path}")

    return result_no, result_cot


if __name__ == "__main__":
    run_benchmark_comparison()