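# Source path: roverdevkit/scripts/run_optimizer_robustness.py
# Web file-viewer residue kept for provenance (not part of the program):
#   "jjreif's picture", "Deploy roverdevkit @ 2676a67", b3d14e3,
#   "Raw", "History Blame Contribute Delete", 12.6 kB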
"""Assess NSGA-II repeatability and budget convergence for paper Pareto fronts.
The canonical fronts in ``reports/pareto_fronts`` are deliberately small enough
to regenerate on a laptop. This script runs the same evaluator-backed pipeline
across several seeds and generation budgets, then writes summary artifacts that
support the manuscript's optimizer-robustness claim.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from pymoo.indicators.hv import HV
# Directory one level above the folder that contains this script (presumably
# the repository root -- confirm against the checkout layout). It is pushed to
# the front of sys.path, once, so that the `roverdevkit` and `scripts` imports
# that follow resolve when this file is executed directly.
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
from roverdevkit.mission.scenarios import list_scenarios, load_scenario
from roverdevkit.terramechanics.soils import get_soil_parameters
from roverdevkit.tradespace.optimizer import (
DEFAULT_OBJECTIVES,
DESIGN_BOUNDS,
NSGA2Runner,
OptimizationConstraint,
)
from roverdevkit.validation.rover_rediscovery import _scenario_panel_orientation
from scripts.generate_pareto_fronts import (
DEFAULT_EVALUATOR_EVAL_CAP,
DEFAULT_RANGE_FLOOR_KM,
SCENARIO_OVERRIDES,
)
# Random seeds used when --seeds is not given on the command line.
DEFAULT_SEEDS = (12, 112)
# Generation budgets used when --generations is not given; main() pools the
# fronts from the largest budget into the per-scenario reference set.
DEFAULT_GENERATIONS = (30, 60, 90)
# Divisor applied to total_mass_kg in _normalized_objectives before the result
# is clipped to [0, 1].
MASS_NORM_MAX_KG = 80.0
# Divisor applied to slope_capability_deg in _normalized_objectives before the
# result is clipped to [0, 1].
SLOPE_NORM_MAX_DEG = 45.0
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the robustness sweep.

    Args:
        argv: Argument strings to parse; ``None`` lets ``argparse`` fall back
            to ``sys.argv[1:]``.

    Returns:
        Namespace carrying ``out_dir``, ``scenarios``, ``population_size``,
        ``generations`` and ``seeds``.
    """
    parser = argparse.ArgumentParser(description=__doc__)

    # Where to write, and which scenarios to sweep.
    parser.add_argument(
        "--out-dir",
        type=Path,
        default=Path("reports", "optimizer_robustness"),
        help="Directory for optimizer-robustness CSV/JSON/Markdown artifacts.",
    )
    parser.add_argument(
        "--scenarios",
        nargs="+",
        default=None,
        help="Scenario names to run. Defaults to all canonical scenarios.",
    )

    # Optimizer budget.
    parser.add_argument("--population-size", type=int, default=50)

    # The two integer-list options share everything but flag, default and help.
    int_list_options = (
        ("--generations", DEFAULT_GENERATIONS, "Generation budgets to test."),
        ("--seeds", DEFAULT_SEEDS, "Random seeds to run at each generation budget."),
    )
    for flag, default_values, help_text in int_list_options:
        parser.add_argument(
            flag,
            nargs="+",
            type=int,
            default=[*default_values],
            help=help_text,
        )

    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Run the seed x generation-budget sweep and write the robustness artifacts.

    For every selected scenario, NSGA-II is run with the ``"evaluator"``
    backend once per (generation budget, seed) pair. Each Pareto front is
    written to ``<out-dir>/fronts`` and summarised. The fronts obtained at the
    largest generation budget are pooled into a per-scenario reference set, and
    every run of that scenario gets the median nearest-neighbour distance of
    its front to that reference.

    Files written under ``--out-dir``: one CSV per front,
    ``optimizer_robustness_runs.csv``, ``optimizer_robustness_summary.csv``,
    ``manifest.json`` and ``optimizer_robustness_report.md``.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0``; the entry-point guard uses it as the process exit status.
    """
    args = _parse_args(argv)
    out_dir = args.out_dir
    fronts_dir = out_dir / "fronts"
    fronts_dir.mkdir(parents=True, exist_ok=True)

    scenarios = _scenario_names(args.scenarios)
    range_floor = OptimizationConstraint(
        target="range_km", sense="min", value=DEFAULT_RANGE_FLOOR_KM
    )
    # Loop-invariant: the budget whose fronts form the reference set.
    max_generations = max(args.generations)

    rows: list[dict[str, Any]] = []
    for scenario_name in scenarios:
        scenario = load_scenario(scenario_name)

        # Per-scenario overrides may swap the objectives, append constraints
        # and replace the traverse distance; the range floor always applies.
        override = SCENARIO_OVERRIDES.get(scenario_name)
        if override is None:
            objectives = DEFAULT_OBJECTIVES
            constraints = (range_floor,)
        else:
            objectives = override.objectives
            constraints = (range_floor, *override.extra_constraints)
            if override.traverse_distance_m is not None:
                scenario = scenario.model_copy(
                    update={"traverse_distance_m": override.traverse_distance_m}
                )

        soil = get_soil_parameters(scenario.soil_simulant)
        panel_tilt_deg, panel_azimuth_deg = _scenario_panel_orientation(scenario)
        traverse_distance_km = scenario.traverse_distance_m / 1000.0

        # Keep each run's front next to its summary row so the distance pass
        # below works on the in-memory frames instead of re-parsing the CSVs
        # that were just written.
        scenario_runs: list[tuple[dict[str, Any], pd.DataFrame]] = []
        max_budget_fronts: list[pd.DataFrame] = []

        for generations in args.generations:
            for seed in args.seeds:
                t0 = time.perf_counter()
                result = NSGA2Runner(
                    scenario,
                    soil,
                    backend="evaluator",
                    objectives=objectives,
                    constraints=constraints,
                    population_size=args.population_size,
                    n_generations=generations,
                    seed=seed,
                    evaluator_eval_cap=DEFAULT_EVALUATOR_EVAL_CAP,
                    panel_tilt_deg=panel_tilt_deg,
                    panel_azimuth_deg=panel_azimuth_deg,
                ).run()
                elapsed_s = time.perf_counter() - t0

                # Inserting at position 0 three times leaves the leading
                # columns ordered scenario_name, generations, seed.
                front = result.to_frame()
                front.insert(0, "seed", seed)
                front.insert(0, "generations", generations)
                front.insert(0, "scenario_name", scenario_name)
                front_path = (
                    fronts_dir / f"front_{scenario_name}_g{generations}_s{seed}.csv"
                )
                front.to_csv(front_path, index=False)
                if generations == max_generations:
                    max_budget_fronts.append(front)

                row = _summarize_front(
                    front,
                    scenario_name=scenario_name,
                    generations=generations,
                    seed=seed,
                    elapsed_s=elapsed_s,
                    traverse_distance_km=traverse_distance_km,
                    objectives=objectives,
                    front_csv=front_path,
                )
                rows.append(row)
                scenario_runs.append((row, front))
                print(
                    f"{scenario_name} g={generations} seed={seed}: "
                    f"hv={row['normalized_hypervolume']:.3f}, "
                    f"n={row['pareto_size']} ({elapsed_s:.1f} s)",
                    flush=True,
                )

        # Pool every seed's front at the largest budget into one reference set.
        reference = (
            pd.concat(max_budget_fronts, ignore_index=True)
            if max_budget_fronts
            else pd.DataFrame()
        )
        # The reference is only complete once all runs of the scenario are
        # done, so the distance column is filled in a second pass.
        for row, front in scenario_runs:
            row["median_distance_to_max_budget_front"] = _median_distance_to_reference(
                front,
                reference,
                traverse_distance_km=traverse_distance_km,
                objectives=objectives,
            )

    per_run = pd.DataFrame(rows)
    per_run_path = out_dir / "optimizer_robustness_runs.csv"
    per_run.to_csv(per_run_path, index=False)

    summary = _aggregate(per_run)
    summary_path = out_dir / "optimizer_robustness_summary.csv"
    summary.to_csv(summary_path, index=False)

    manifest = {
        "population_size": args.population_size,
        "generations": args.generations,
        "seeds": args.seeds,
        "scenarios": scenarios,
        "per_run_csv": str(per_run_path),
        "summary_csv": str(summary_path),
    }
    (out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n")
    _write_markdown(out_dir / "optimizer_robustness_report.md", summary, manifest)
    return 0
def _scenario_names(raw: list[str] | None) -> list[str]:
    """Resolve and validate the scenario selection.

    Args:
        raw: Names requested on the command line, or ``None`` to select every
            scenario reported by ``list_scenarios()``.

    Returns:
        A new list with the selected names in their given order.

    Raises:
        ValueError: If any requested name is not reported by
            ``list_scenarios()``.
    """
    known = set(list_scenarios())
    if raw is None:
        requested = list_scenarios()
    else:
        requested = raw

    unknown = sorted(name for name in set(requested) if name not in known)
    if unknown:
        raise ValueError(f"unknown scenario(s) {unknown}; allowed: {sorted(known)}")
    return list(requested)
def _summarize_front(
    front: pd.DataFrame,
    *,
    scenario_name: str,
    generations: int,
    seed: int,
    elapsed_s: float,
    traverse_distance_km: float,
    objectives: tuple[Any, ...],
    front_csv: Path,
) -> dict[str, Any]:
    """Condense one Pareto front into a flat per-run summary row.

    The returned dict always has the same keys in the same order. For an empty
    front the size and hypervolume are ``0`` and every front-derived metric is
    ``NaN``, so a table built from these rows keeps a stable set of columns
    even when no run produced a feasible design.

    Args:
        front: Pareto front of one optimizer run.
        scenario_name: Scenario the run belongs to.
        generations: Generation budget of the run.
        seed: Random seed of the run.
        elapsed_s: Wall-clock duration of the run in seconds.
        traverse_distance_km: Passed on to the hypervolume normalisation.
        objectives: Objectives the run optimised; passed on to the
            hypervolume normalisation.
        front_csv: Path the front was written to (stored as a string).

    Returns:
        Summary row: identification fields, ``pareto_size``,
        ``normalized_hypervolume``, range/mass/slope extremes, the percentage
        of four-wheel designs, the percentage of designs within a small
        tolerance of the first (width) or second (radius, grouser) entry of
        the matching ``DESIGN_BOUNDS`` item, and ``front_csv``.
    """
    metric_keys = (
        "max_range_km",
        "min_mass_kg",
        "max_slope_capability_deg",
        "four_wheel_pct",
        "width_floor_pct",
        "radius_ceiling_pct",
        "grouser_ceiling_pct",
    )
    is_empty = front.empty
    row: dict[str, Any] = {
        "scenario_name": scenario_name,
        "generations": generations,
        "seed": seed,
        "elapsed_s": elapsed_s,
        "pareto_size": 0 if is_empty else len(front),
    }

    if is_empty:
        # No designs: report zero hypervolume and NaN placeholders so the row
        # has the same schema as a populated one.
        row["normalized_hypervolume"] = 0.0
        row.update(dict.fromkeys(metric_keys, float("nan")))
        row["front_csv"] = str(front_csv)
        return row

    def _percent(mask: pd.Series) -> float:
        """Return the share of ``True`` entries in *mask* as a percentage."""
        return float(mask.mean() * 100.0)

    width_floor = DESIGN_BOUNDS["wheel_width_m"][0] + 1e-3
    radius_ceiling = DESIGN_BOUNDS["wheel_radius_m"][1] - 1e-3
    grouser_ceiling = DESIGN_BOUNDS["grouser_height_m"][1] - 1e-4

    row["normalized_hypervolume"] = _normalized_hypervolume(
        front,
        traverse_distance_km=traverse_distance_km,
        objectives=objectives,
    )
    row["max_range_km"] = float(front["range_km"].max())
    row["min_mass_kg"] = float(front["total_mass_kg"].min())
    row["max_slope_capability_deg"] = float(front["slope_capability_deg"].max())
    row["four_wheel_pct"] = _percent(front["n_wheels"] == 4)
    row["width_floor_pct"] = _percent(front["wheel_width_m"] <= width_floor)
    row["radius_ceiling_pct"] = _percent(front["wheel_radius_m"] >= radius_ceiling)
    row["grouser_ceiling_pct"] = _percent(front["grouser_height_m"] >= grouser_ceiling)
    row["front_csv"] = str(front_csv)
    return row
def _aggregate(per_run: pd.DataFrame) -> pd.DataFrame:
    """Aggregate per-run metrics by scenario and generation budget.

    Args:
        per_run: One row per optimizer run with at least ``scenario_name`` and
            ``generations`` columns. Metric columns that are absent (for
            example when every front was empty) are treated as all-``NaN``.

    Returns:
        One row per (scenario, generations) group, sorted by the group keys,
        with ``<metric>_mean``, ``<metric>_std``, ``<metric>_min`` and
        ``<metric>_max`` columns for every metric plus ``n_runs``.
    """
    numeric = [
        "normalized_hypervolume",
        "median_distance_to_max_budget_front",
        "max_range_km",
        "min_mass_kg",
        "max_slope_capability_deg",
        "four_wheel_pct",
        "width_floor_pct",
        "radius_ceiling_pct",
        "grouser_ceiling_pct",
    ]
    # Selecting a missing column on the groupby raises KeyError; add absent
    # metrics as NaN on a copy so the caller's frame is left untouched.
    missing = [name for name in numeric if name not in per_run.columns]
    if missing:
        per_run = per_run.assign(**{name: float("nan") for name in missing})

    grouped = per_run.groupby(["scenario_name", "generations"], sort=True)
    out = grouped[numeric].agg(["mean", "std", "min", "max"]).reset_index()
    # Flatten ("metric", "stat") column tuples to "metric_stat"; the group-key
    # columns have an empty second level and keep their plain name.
    out.columns = [
        "_".join(str(part) for part in col if part)
        for col in out.columns.to_flat_index()
    ]
    # Group sizes come back in the same sorted key order as the aggregate.
    out["n_runs"] = grouped.size().to_numpy()
    return out
def _normalized_hypervolume(
    front: pd.DataFrame,
    *,
    traverse_distance_km: float,
    objectives: tuple[Any, ...],
) -> float:
    """Return the hypervolume of *front* in normalised objective space.

    The objectives are first mapped by ``_normalized_objectives``; the
    hypervolume is then measured against a reference point of ``1.05`` in
    every dimension. A front that yields no points scores ``0.0``.

    Args:
        front: Pareto front holding one column per objective target.
        traverse_distance_km: Scale used to normalise ``range_km``.
        objectives: Objectives defining the dimensions of the space.

    Returns:
        The hypervolume indicator as a plain ``float``.
    """
    points = _normalized_objectives(front, traverse_distance_km, objectives)
    if points.size == 0:
        return 0.0

    n_dimensions = points.shape[1]
    reference_point = np.full(n_dimensions, 1.05)
    indicator = HV(ref_point=reference_point)
    return float(indicator.do(points))
def _median_distance_to_reference(
    front: pd.DataFrame,
    reference: pd.DataFrame,
    *,
    traverse_distance_km: float,
    objectives: tuple[Any, ...],
) -> float:
    """Return the median nearest-neighbour distance from *front* to *reference*.

    Both frames are mapped into normalised objective space; for each point of
    *front* the Euclidean distance to its closest *reference* point is taken,
    and the median of those distances is returned.

    Args:
        front: Pareto front being assessed.
        reference: Pooled reference front.
        traverse_distance_km: Scale used to normalise ``range_km``.
        objectives: Objectives defining the dimensions of the space.

    Returns:
        The median distance, or ``NaN`` when either frame is empty.
    """
    if front.empty:
        return float("nan")
    if reference.empty:
        return float("nan")

    front_points = _normalized_objectives(front, traverse_distance_km, objectives)
    reference_points = _normalized_objectives(
        reference, traverse_distance_km, objectives
    )
    # Broadcast to every (front point, reference point) pair.
    deltas = front_points[:, None, :] - reference_points[None, :, :]
    pairwise = np.sqrt(np.square(deltas).sum(axis=2))
    nearest = pairwise.min(axis=1)
    return float(np.median(nearest))
def _normalized_objectives(
    front: pd.DataFrame,
    traverse_distance_km: float,
    objectives: tuple[Any, ...],
) -> np.ndarray:
    """Map objective columns of *front* onto ``[0, 1]`` with smaller-is-better.

    Each objective column is divided by a target-specific scale and clipped to
    ``[0, 1]``. Objectives whose ``direction`` is not ``"min"`` are flipped
    (``1 - value``) so that every output column is to be minimised.

    Args:
        front: Frame holding one column per objective ``target``.
        traverse_distance_km: Scale for ``range_km`` (floored at ``1e-9`` to
            avoid dividing by zero).
        objectives: Objects exposing ``target`` and ``direction`` attributes.

    Returns:
        Array with one row per design and one column per objective.

    Raises:
        ValueError: If an objective target has no normalisation rule.
    """
    columns: list[np.ndarray] = []
    for objective in objectives:
        name = objective.target
        raw_values = front[name].to_numpy(dtype=float)

        # Pick the divisor for this target; unknown targets are rejected.
        if name == "range_km":
            scale = max(traverse_distance_km, 1e-9)
        elif name == "total_mass_kg":
            scale = MASS_NORM_MAX_KG
        elif name == "slope_capability_deg":
            scale = SLOPE_NORM_MAX_DEG
        else:
            raise ValueError(f"unsupported objective target {name!r}")

        unit = np.clip(raw_values / scale, 0.0, 1.0)
        if objective.direction != "min":
            unit = 1.0 - unit
        columns.append(unit)
    return np.column_stack(columns)
def _write_markdown(path: Path, summary: pd.DataFrame, manifest: dict[str, Any]) -> None:
    """Write the Markdown robustness report to *path*.

    The report has a title, one line describing the sweep settings taken from
    *manifest*, a table with a fixed selection of *summary* columns, and
    pointers to the two CSV artifacts.

    Args:
        path: Destination file.
        summary: Aggregated table; must contain the reported columns.
        manifest: Run settings; ``population_size``, ``seeds`` and
            ``generations`` are read.
    """
    report_columns = [
        "scenario_name",
        "generations",
        "n_runs",
        "normalized_hypervolume_mean",
        "normalized_hypervolume_std",
        "median_distance_to_max_budget_front_mean",
        "four_wheel_pct_mean",
        "width_floor_pct_mean",
    ]
    settings_line = (
        f"Population size {manifest['population_size']}; seeds "
        f"{manifest['seeds']}; generation budgets {manifest['generations']}."
    )
    table = _markdown_table(summary[report_columns])

    # The trailing empty entry makes the file end with a newline.
    sections = (
        "# Optimizer Robustness",
        "",
        settings_line,
        "",
        table,
        "",
        "Per-run CSV: `optimizer_robustness_runs.csv`.",
        "Summary CSV: `optimizer_robustness_summary.csv`.",
        "",
    )
    path.write_text("\n".join(sections))
def _markdown_table(df: pd.DataFrame) -> str:
    """Render *df* as a pipe-delimited Markdown table.

    Floats are printed with three decimals; every other value goes through
    ``str``. The frame's index is not rendered.

    Args:
        df: Table to render; its column labels must be strings.

    Returns:
        Header row, separator row and one line per data row, joined with
        newlines (no trailing newline).
    """
    headers = list(df.columns)

    def _line(cells) -> str:
        """Wrap already-formatted cells in Markdown row delimiters."""
        return "| " + " | ".join(cells) + " |"

    def _cell(value) -> str:
        """Format a single table value."""
        return f"{value:.3f}" if isinstance(value, float) else str(value)

    rendered = [_line(headers), _line("---" for _ in headers)]
    rendered.extend(
        _line(_cell(record[header]) for header in headers)
        for _, record in df.iterrows()
    )
    return "\n".join(rendered)
# Script entry point: run the sweep and hand main()'s return value to the
# interpreter as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())