# FSD-Level5-CoT / run_benchmarks.py
# Reality123b's picture
# Add run_benchmarks.py
# c5b731e verified
#!/usr/bin/env python3
"""
Full benchmark runner for the FSD model with CoT reasoning.

Runs the external benchmark suite on the model with and without CoT, prints a
side-by-side comparison of the results, and saves both result sets.
"""
import sys
import time
import torch
import numpy as np
sys.path.insert(0, '/app')
from fsd_model.config import VehicleConfig
from fsd_model.model import FullSelfDrivingModel
from fsd_model.data import FSDDataGenerator
from fsd_model.benchmarks import FSDExternalBenchmark
from fsd_model.visualization import format_parameter_count
def build_large_model(enable_cot=True):
    """Construct the scaled-up FSD model (larger than v1) and its config.

    Args:
        enable_cot: Forwarded unchanged to ``FullSelfDrivingModel`` as its
            ``enable_cot`` argument.

    Returns:
        A ``(model, config)`` tuple: the ``FullSelfDrivingModel`` instance and
        the default ``VehicleConfig`` it was constructed with.
    """
    config = VehicleConfig()

    # Hyperparameters of the large configuration, gathered in one place.
    hyperparams = {
        "bev_size": 200,
        "bev_resolution": 0.25,
        "bev_feature_dim": 256,
        "num_object_classes": 10,
        "num_seg_classes": 7,
        "num_waypoints": 20,
        "planning_d_model": 256,
        "future_steps": 6,
        "num_forecast_modes": 6,
        "forecast_steps": 12,
        "num_behaviors": 10,
        "enable_cot": enable_cot,
        "cot_num_actor_queries": 64,
        "cot_num_road_queries": 32,
    }

    model = FullSelfDrivingModel(vehicle_config=config, **hyperparams)
    return model, config
def build_small_model(enable_cot=True):
    """Construct a test-sized FSD model for CPU benchmarking, plus its config.

    Args:
        enable_cot: Forwarded unchanged to ``FullSelfDrivingModel`` as its
            ``enable_cot`` argument.

    Returns:
        A ``(model, config)`` tuple: the ``FullSelfDrivingModel`` instance and
        the default ``VehicleConfig`` it was constructed with.
    """
    config = VehicleConfig()

    # Hyperparameters of the small configuration, gathered in one place.
    hyperparams = {
        "bev_size": 100,
        "bev_resolution": 0.5,
        "bev_feature_dim": 128,
        "num_object_classes": 10,
        "num_seg_classes": 7,
        "num_waypoints": 20,
        "planning_d_model": 128,
        "future_steps": 6,
        "num_forecast_modes": 6,
        "forecast_steps": 12,
        "num_behaviors": 10,
        "enable_cot": enable_cot,
        "cot_num_actor_queries": 32,
        "cot_num_road_queries": 16,
    }

    model = FullSelfDrivingModel(vehicle_config=config, **hyperparams)
    return model, config
def run_benchmark_comparison(num_scenarios=48, batch_size=4, output_dir="/app"):
    """Benchmark the small FSD model with and without CoT and compare them.

    Builds two test-sized models (CoT disabled and enabled), runs one sanity
    forward pass through each, runs ``FSDExternalBenchmark`` on both, prints a
    side-by-side metric table, saves both result objects, and finally prints
    the parameter counts of the full-size configuration.

    Args:
        num_scenarios: Number of scenarios passed to each benchmark run.
            The default (48) keeps a CPU run fast.
        batch_size: Batch size passed to each benchmark run.
        output_dir: Directory that receives ``benchmark_base.json`` and
            ``benchmark_cot.json``. It is created if it does not exist.

    Returns:
        A ``(result_no, result_cot)`` tuple: the benchmark result of the base
        model followed by the result of the CoT-enhanced model.
    """
    # Local import so the block does not depend on a file-level import.
    from pathlib import Path

    rule = "=" * 70
    thin_rule = "-" * 70

    print(rule)
    print(" FSD Model — External Benchmark Suite")
    print(" Comparing: Base Model vs. CoT-Enhanced Model")
    print(rule)

    # ── Build models ──
    print("\n[1/4] Building models...")
    model_no_cot, config = build_small_model(enable_cot=False)
    model_with_cot, _ = build_small_model(enable_cot=True)

    # Benchmark in inference mode so train-time layers do not perturb the
    # comparison. NOTE(review): assumes FullSelfDrivingModel is a
    # torch.nn.Module (it is called as model(**inputs) under torch.no_grad).
    model_no_cot.eval()
    model_with_cot.eval()

    print("\n Base model (no CoT):")
    counts_no = model_no_cot.count_parameters()
    print(format_parameter_count(counts_no))
    print("\n CoT-enhanced model:")
    counts_cot = model_with_cot.count_parameters()
    print(format_parameter_count(counts_cot))
    cot_overhead = counts_cot["total"] - counts_no["total"]
    print(f"\n CoT parameter overhead: {cot_overhead:,} ({cot_overhead/counts_no['total']:.1%} increase)")

    # ── Data generator ──
    # bev_size=100 matches the bev_size the small models are built with.
    data_gen = FSDDataGenerator(config, bev_size=100, image_size=(120, 160))

    # ── Quick forward pass sanity check ──
    print("\n[2/4] Sanity check forward passes...")
    inputs, targets = data_gen.generate_batch(batch_size=2, scenario="urban")
    with torch.no_grad():
        out_no = model_no_cot(**inputs)
        out_cot = model_with_cot(**inputs)
    print(f" Base model outputs: {len(out_no)} keys")
    print(f" CoT model outputs: {len(out_cot)} keys")
    cot_keys = [k for k in out_cot if k.startswith("cot/")]
    print(f" CoT-specific outputs: {len(cot_keys)} keys")
    for k in sorted(cot_keys)[:10]:
        print(f" {k}: {out_cot[k].shape}")

    # ── Run benchmarks ──
    print(f"\n[3/4] Running external benchmarks ({num_scenarios} scenarios each)...")
    print("\n ── Base Model (no CoT) ──")
    bench_no = FSDExternalBenchmark(
        model_no_cot, data_gen,
        num_scenarios=num_scenarios, batch_size=batch_size,
        max_speed_ms=config.max_speed_ms,
        has_cot=False,
    )
    result_no = bench_no.run()
    print(result_no.summary())

    print("\n ── CoT-Enhanced Model ──")
    bench_cot = FSDExternalBenchmark(
        model_with_cot, data_gen,
        num_scenarios=num_scenarios, batch_size=batch_size,
        max_speed_ms=config.max_speed_ms,
        has_cot=True,
    )
    result_cot = bench_cot.run()
    print(result_cot.summary())

    # ── Comparison ──
    print("\n[4/4] Comparison Summary")
    print(rule)
    print(f"{'Metric':<40} {'Base':>12} {'+ CoT':>12} {'Delta':>12}")
    print(thin_rule)

    # (label, base value, CoT value) for metrics both models report.
    comparisons = [
        ("Planning L2 avg (m) ↓", result_no.planning.l2_avg, result_cot.planning.l2_avg),
        ("Collision rate avg ↓", result_no.planning.collision_rate_avg, result_cot.planning.collision_rate_avg),
        ("Planning score ↑", result_no.planning.planning_score, result_cot.planning.planning_score),
        ("NDS ↑", result_no.detection.NDS, result_cot.detection.NDS),
        ("mAP ↑", result_no.detection.mAP, result_cot.detection.mAP),
        ("CARLA driving score ↑", result_no.carla.driving_score, result_cot.carla.driving_score),
        ("Route completion % ↑", result_no.carla.route_completion, result_cot.carla.route_completion),
        ("Total collisions ↓", result_no.carla.num_collisions, result_cot.carla.num_collisions),
        ("Min TTC (s) ↑", result_no.safety.min_ttc, result_cot.safety.min_ttc),
        ("Mean TTC (s) ↑", result_no.safety.mean_ttc, result_cot.safety.mean_ttc),
        ("TTC <2s rate ↓", result_no.safety.ttc_below_2s_rate, result_cot.safety.ttc_below_2s_rate),
        ("Speed compliance ↑", result_no.safety.speed_compliance_rate, result_cot.safety.speed_compliance_rate),
        ("Safe following dist ↑", result_no.safety.safe_following_distance_rate, result_cot.safety.safe_following_distance_rate),
        ("Mean jerk (m/s³) ↓", result_no.safety.mean_jerk, result_cot.safety.mean_jerk),
        ("Occ IoU near ↑", result_no.occupancy.iou_near, result_cot.occupancy.iou_near),
        ("Occ IoU far ↑", result_no.occupancy.iou_far, result_cot.occupancy.iou_far),
        ("FPS", result_no.fps, result_cot.fps),
    ]

    # Metrics that only exist for the CoT model; the base column is a dash.
    cot_only = [
        ("CoT override accuracy ↑", "—", result_cot.safety.cot_override_accuracy),
        ("CoT risk AUC ↑", "—", result_cot.safety.cot_risk_auc),
        ("E-brake precision ↑", "—", result_cot.safety.emergency_brake_precision),
        ("E-brake recall ↑", "—", result_cot.safety.emergency_brake_recall),
        ("E-brake F1 ↑", "—", result_cot.safety.emergency_brake_f1),
    ]

    for name, base, cot in comparisons:
        delta = cot - base
        sign = "+" if delta > 0 else ""
        # Format sign and number together, then pad, so the "+" stays attached
        # to the digits and the column is 12 wide regardless of sign.
        delta_text = f"{sign}{delta:.4f}"
        # One leading space + 39 = 40, the width of the header's metric column.
        print(f" {name:<39} {base:>12.4f} {cot:>12.4f} {delta_text:>12}")

    print(thin_rule)
    print(" CoT-Specific Metrics:")
    for name, base, cot in cot_only:
        print(f" {name:<39} {str(base):>12} {cot:>12.4f}")
    print(rule)

    # Save results before the optional full-size build below, so a failure
    # there (e.g. running out of memory) cannot lose the benchmark results.
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    base_path = str(out_dir / "benchmark_base.json")
    cot_path = str(out_dir / "benchmark_cot.json")
    result_no.save(base_path)
    result_cot.save(cot_path)
    print(f"\nResults saved to {base_path} and {cot_path}")

    # ── Show full-size model parameter counts ──
    print("\n Full-size model (production config):")
    model_full, _ = build_large_model(enable_cot=True)
    counts_full = model_full.count_parameters()
    print(format_parameter_count(counts_full))

    return result_no, result_cot
# Script entry point: run the comparison only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_benchmark_comparison()