"""Assess NSGA-II repeatability and budget convergence for paper Pareto fronts. The canonical fronts in ``reports/pareto_fronts`` are deliberately small enough to regenerate on a laptop. This script runs the same evaluator-backed pipeline across several seeds and generation budgets, then writes summary artifacts that support the manuscript's optimizer-robustness claim. """ from __future__ import annotations import argparse import json import sys import time from pathlib import Path from typing import Any import numpy as np import pandas as pd from pymoo.indicators.hv import HV REPO_ROOT = Path(__file__).resolve().parents[1] if str(REPO_ROOT) not in sys.path: sys.path.insert(0, str(REPO_ROOT)) from roverdevkit.mission.scenarios import list_scenarios, load_scenario from roverdevkit.terramechanics.soils import get_soil_parameters from roverdevkit.tradespace.optimizer import ( DEFAULT_OBJECTIVES, DESIGN_BOUNDS, NSGA2Runner, OptimizationConstraint, ) from roverdevkit.validation.rover_rediscovery import _scenario_panel_orientation from scripts.generate_pareto_fronts import ( DEFAULT_EVALUATOR_EVAL_CAP, DEFAULT_RANGE_FLOOR_KM, SCENARIO_OVERRIDES, ) DEFAULT_SEEDS = (12, 112) DEFAULT_GENERATIONS = (30, 60, 90) MASS_NORM_MAX_KG = 80.0 SLOPE_NORM_MAX_DEG = 45.0 def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: p = argparse.ArgumentParser(description=__doc__) p.add_argument( "--out-dir", type=Path, default=Path("reports") / "optimizer_robustness", help="Directory for optimizer-robustness CSV/JSON/Markdown artifacts.", ) p.add_argument( "--scenarios", nargs="+", default=None, help="Scenario names to run. Defaults to all canonical scenarios.", ) p.add_argument("--population-size", type=int, default=50) p.add_argument( "--generations", nargs="+", type=int, default=list(DEFAULT_GENERATIONS), help="Generation budgets to test.", ) p.add_argument( "--seeds", nargs="+", type=int, default=list(DEFAULT_SEEDS), help="Random seeds to run at each generation budget.", ) return p.parse_args(argv) def main(argv: list[str] | None = None) -> int: args = _parse_args(argv) out_dir = args.out_dir fronts_dir = out_dir / "fronts" fronts_dir.mkdir(parents=True, exist_ok=True) scenarios = _scenario_names(args.scenarios) range_floor = OptimizationConstraint( target="range_km", sense="min", value=DEFAULT_RANGE_FLOOR_KM ) rows: list[dict[str, Any]] = [] for scenario_name in scenarios: scenario = load_scenario(scenario_name) override = SCENARIO_OVERRIDES.get(scenario_name) objectives = DEFAULT_OBJECTIVES if override is None else override.objectives constraints = ( (range_floor,) if override is None else (range_floor, *override.extra_constraints) ) if override is not None and override.traverse_distance_m is not None: scenario = scenario.model_copy( update={"traverse_distance_m": override.traverse_distance_m} ) soil = get_soil_parameters(scenario.soil_simulant) panel_tilt_deg, panel_azimuth_deg = _scenario_panel_orientation(scenario) max_generations = max(args.generations) max_budget_fronts: list[pd.DataFrame] = [] for generations in args.generations: for seed in args.seeds: t0 = time.perf_counter() result = NSGA2Runner( scenario, soil, backend="evaluator", objectives=objectives, constraints=constraints, population_size=args.population_size, n_generations=generations, seed=seed, evaluator_eval_cap=DEFAULT_EVALUATOR_EVAL_CAP, panel_tilt_deg=panel_tilt_deg, panel_azimuth_deg=panel_azimuth_deg, ).run() elapsed_s = time.perf_counter() - t0 front = result.to_frame() front.insert(0, "seed", seed) front.insert(0, "generations", generations) front.insert(0, "scenario_name", scenario_name) front_path = ( fronts_dir / f"front_{scenario_name}_g{generations}_s{seed}.csv" ) front.to_csv(front_path, index=False) if generations == max_generations: max_budget_fronts.append(front) row = _summarize_front( front, scenario_name=scenario_name, generations=generations, seed=seed, elapsed_s=elapsed_s, traverse_distance_km=scenario.traverse_distance_m / 1000.0, objectives=objectives, front_csv=front_path, ) rows.append(row) print( f"{scenario_name} g={generations} seed={seed}: " f"hv={row['normalized_hypervolume']:.3f}, " f"n={row['pareto_size']} ({elapsed_s:.1f} s)", flush=True, ) reference = ( pd.concat(max_budget_fronts, ignore_index=True) if max_budget_fronts else pd.DataFrame() ) for row in rows: if row["scenario_name"] != scenario_name: continue front = pd.read_csv(row["front_csv"]) row["median_distance_to_max_budget_front"] = _median_distance_to_reference( front, reference, traverse_distance_km=scenario.traverse_distance_m / 1000.0, objectives=objectives, ) per_run = pd.DataFrame(rows) per_run_path = out_dir / "optimizer_robustness_runs.csv" per_run.to_csv(per_run_path, index=False) summary = _aggregate(per_run) summary_path = out_dir / "optimizer_robustness_summary.csv" summary.to_csv(summary_path, index=False) manifest = { "population_size": args.population_size, "generations": args.generations, "seeds": args.seeds, "scenarios": scenarios, "per_run_csv": str(per_run_path), "summary_csv": str(summary_path), } (out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n") _write_markdown(out_dir / "optimizer_robustness_report.md", summary, manifest) return 0 def _scenario_names(raw: list[str] | None) -> list[str]: allowed = set(list_scenarios()) values = list_scenarios() if raw is None else raw unknown = sorted(set(values) - allowed) if unknown: raise ValueError(f"unknown scenario(s) {unknown}; allowed: {sorted(allowed)}") return list(values) def _summarize_front( front: pd.DataFrame, *, scenario_name: str, generations: int, seed: int, elapsed_s: float, traverse_distance_km: float, objectives: tuple[Any, ...], front_csv: Path, ) -> dict[str, Any]: if front.empty: return { "scenario_name": scenario_name, "generations": generations, "seed": seed, "elapsed_s": elapsed_s, "pareto_size": 0, "normalized_hypervolume": 0.0, "front_csv": str(front_csv), } return { "scenario_name": scenario_name, "generations": generations, "seed": seed, "elapsed_s": elapsed_s, "pareto_size": len(front), "normalized_hypervolume": _normalized_hypervolume( front, traverse_distance_km=traverse_distance_km, objectives=objectives, ), "max_range_km": float(front["range_km"].max()), "min_mass_kg": float(front["total_mass_kg"].min()), "max_slope_capability_deg": float(front["slope_capability_deg"].max()), "four_wheel_pct": float((front["n_wheels"] == 4).mean() * 100.0), "width_floor_pct": float( (front["wheel_width_m"] <= DESIGN_BOUNDS["wheel_width_m"][0] + 1e-3).mean() * 100.0 ), "radius_ceiling_pct": float( (front["wheel_radius_m"] >= DESIGN_BOUNDS["wheel_radius_m"][1] - 1e-3).mean() * 100.0 ), "grouser_ceiling_pct": float( ( front["grouser_height_m"] >= DESIGN_BOUNDS["grouser_height_m"][1] - 1e-4 ).mean() * 100.0 ), "front_csv": str(front_csv), } def _aggregate(per_run: pd.DataFrame) -> pd.DataFrame: numeric = [ "normalized_hypervolume", "median_distance_to_max_budget_front", "max_range_km", "min_mass_kg", "max_slope_capability_deg", "four_wheel_pct", "width_floor_pct", "radius_ceiling_pct", "grouser_ceiling_pct", ] grouped = per_run.groupby(["scenario_name", "generations"], sort=True) out = grouped[numeric].agg(["mean", "std", "min", "max"]).reset_index() out.columns = [ "_".join(str(part) for part in col if part) for col in out.columns.to_flat_index() ] out["n_runs"] = grouped.size().to_numpy() return out def _normalized_hypervolume( front: pd.DataFrame, *, traverse_distance_km: float, objectives: tuple[Any, ...], ) -> float: F = _normalized_objectives(front, traverse_distance_km, objectives) if F.size == 0: return 0.0 return float(HV(ref_point=np.ones(F.shape[1]) * 1.05).do(F)) def _median_distance_to_reference( front: pd.DataFrame, reference: pd.DataFrame, *, traverse_distance_km: float, objectives: tuple[Any, ...], ) -> float: if front.empty or reference.empty: return float("nan") F = _normalized_objectives(front, traverse_distance_km, objectives) R = _normalized_objectives(reference, traverse_distance_km, objectives) distances = np.sqrt(((F[:, None, :] - R[None, :, :]) ** 2).sum(axis=2)) return float(np.median(np.min(distances, axis=1))) def _normalized_objectives( front: pd.DataFrame, traverse_distance_km: float, objectives: tuple[Any, ...], ) -> np.ndarray: values: list[np.ndarray] = [] for objective in objectives: target = objective.target raw = front[target].to_numpy(dtype=float) if target == "range_km": norm = raw / max(traverse_distance_km, 1e-9) elif target == "total_mass_kg": norm = raw / MASS_NORM_MAX_KG elif target == "slope_capability_deg": norm = raw / SLOPE_NORM_MAX_DEG else: raise ValueError(f"unsupported objective target {target!r}") norm = np.clip(norm, 0.0, 1.0) values.append(norm if objective.direction == "min" else 1.0 - norm) return np.column_stack(values) def _write_markdown(path: Path, summary: pd.DataFrame, manifest: dict[str, Any]) -> None: cols = [ "scenario_name", "generations", "n_runs", "normalized_hypervolume_mean", "normalized_hypervolume_std", "median_distance_to_max_budget_front_mean", "four_wheel_pct_mean", "width_floor_pct_mean", ] lines = [ "# Optimizer Robustness", "", ( f"Population size {manifest['population_size']}; seeds " f"{manifest['seeds']}; generation budgets {manifest['generations']}." ), "", _markdown_table(summary[cols]), "", "Per-run CSV: `optimizer_robustness_runs.csv`.", "Summary CSV: `optimizer_robustness_summary.csv`.", "", ] path.write_text("\n".join(lines)) def _markdown_table(df: pd.DataFrame) -> str: headers = list(df.columns) lines = [ "| " + " | ".join(headers) + " |", "| " + " | ".join("---" for _ in headers) + " |", ] for _, row in df.iterrows(): values = [] for header in headers: value = row[header] if isinstance(value, float): values.append(f"{value:.3f}") else: values.append(str(value)) lines.append("| " + " | ".join(values) + " |") return "\n".join(lines) if __name__ == "__main__": sys.exit(main())