Spaces:
Running
Running
| """Sweep required obstacle height and locate the rocker-bogie crossover. | |
| Re-runs the canonical evaluator-backed NSGA-II pipeline at increasing | |
| ``MissionScenario.required_obstacle_height_m`` values and records when | |
| six-wheel rocker-bogie architectures enter the Pareto set. | |
| Outputs under ``reports/architecture_obstacle_crossover/``: | |
| - ``front_<scenario>__hobs_<value>.csv`` | |
| - ``crossover_summary.csv`` | |
| - ``manifest.json`` | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| _SCRIPTS_DIR = Path(__file__).resolve().parent | |
| _REPO_ROOT = _SCRIPTS_DIR.parent | |
| for _p in (str(_REPO_ROOT), str(_SCRIPTS_DIR)): | |
| if _p not in sys.path: | |
| sys.path.insert(0, _p) | |
| import pandas as pd # noqa: E402 | |
| from roverdevkit.mission.scenarios import list_scenarios, load_scenario # noqa: E402 | |
| from roverdevkit.schema import ScenarioName # noqa: E402 | |
| from roverdevkit.terramechanics.soils import get_soil_parameters # noqa: E402 | |
| from roverdevkit.tradespace.optimizer import ( # noqa: E402 | |
| DEFAULT_OBJECTIVES, | |
| NSGA2Runner, | |
| OptimizationConstraint, | |
| ) | |
| from roverdevkit.validation.rover_rediscovery import _scenario_panel_orientation # noqa: E402 | |
| from generate_pareto_fronts import ( # noqa: E402 | |
| DEFAULT_EVALUATOR_EVAL_CAP, | |
| DEFAULT_GENERATIONS, | |
| DEFAULT_POPULATION_SIZE, | |
| DEFAULT_RANGE_FLOOR_KM, | |
| SCENARIO_OVERRIDES, | |
| ) | |
| DEFAULT_H_OBS_M: tuple[float, ...] = ( | |
| 0.0, | |
| 0.02, | |
| 0.04, | |
| 0.06, | |
| 0.08, | |
| 0.10, | |
| 0.12, | |
| 0.14, | |
| 0.16, | |
| 0.18, | |
| 0.20, | |
| 0.22, | |
| ) | |
| def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: | |
| p = argparse.ArgumentParser( | |
| description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter | |
| ) | |
| p.add_argument( | |
| "--out-dir", | |
| type=Path, | |
| default=Path("reports") / "architecture_obstacle_crossover", | |
| ) | |
| p.add_argument("--scenarios", nargs="+", default=None) | |
| p.add_argument( | |
| "--h-obs-m", | |
| nargs="+", | |
| type=float, | |
| default=list(DEFAULT_H_OBS_M), | |
| help="Required obstacle heights to sweep (m).", | |
| ) | |
| p.add_argument("--population-size", type=int, default=DEFAULT_POPULATION_SIZE) | |
| p.add_argument("--generations", type=int, default=DEFAULT_GENERATIONS) | |
| p.add_argument("--seed", type=int, default=12) | |
| return p.parse_args(argv) | |
| def _scenario_names(raw: list[str] | None) -> list[ScenarioName]: | |
| allowed = set(list_scenarios()) | |
| values = list_scenarios() if raw is None else raw | |
| unknown = sorted(set(values) - allowed) | |
| if unknown: | |
| raise ValueError(f"unknown scenario(s) {unknown}; allowed: {sorted(allowed)}") | |
| return [name for name in values] # type: ignore[list-item] | |
| def _summary_row( | |
| scenario_name: str, | |
| h_obs_m: float, | |
| front: pd.DataFrame, | |
| ) -> dict[str, Any]: | |
| n = len(front) | |
| front_empty = n == 0 or "mobility_architecture" not in front.columns | |
| if front_empty: | |
| return { | |
| "scenario_name": scenario_name, | |
| "required_obstacle_height_m": h_obs_m, | |
| "n_points": n, | |
| "front_empty": True, | |
| "frac_rocker_bogie": float("nan"), | |
| "frac_rigid_4wheel": float("nan"), | |
| "min_mass_kg": float("nan"), | |
| "max_range_km": float("nan"), | |
| "median_obstacle_capability_m": float("nan"), | |
| } | |
| rocker = front["mobility_architecture"] == "rocker_bogie_6wheel" | |
| return { | |
| "scenario_name": scenario_name, | |
| "required_obstacle_height_m": h_obs_m, | |
| "n_points": n, | |
| "front_empty": False, | |
| "frac_rocker_bogie": float(rocker.mean()), | |
| "frac_rigid_4wheel": float((~rocker).mean()), | |
| "min_mass_kg": float(front["total_mass_kg"].min()), | |
| "max_range_km": float(front["range_km"].max()), | |
| "median_obstacle_capability_m": float(front["obstacle_capability_m"].median()), | |
| } | |
| def _rocker_summary_label(row: dict[str, Any]) -> str: | |
| if row.get("front_empty"): | |
| return "empty" | |
| frac = row["frac_rocker_bogie"] | |
| if frac != frac: # NaN | |
| return "n/a" | |
| return f"{frac:.0%}" | |
| def main(argv: list[str] | None = None) -> int: | |
| args = _parse_args(argv) | |
| out_dir = args.out_dir | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| scenarios = _scenario_names(args.scenarios) | |
| h_values = [float(h) for h in args.h_obs_m] | |
| range_floor = OptimizationConstraint( | |
| target="range_km", sense="min", value=DEFAULT_RANGE_FLOOR_KM | |
| ) | |
| obstacle_floor = OptimizationConstraint( | |
| target="obstacle_margin_m", sense="min", value=0.0 | |
| ) | |
| summary_rows: list[dict[str, Any]] = [] | |
| manifest: list[dict[str, Any]] = [] | |
| for i, scenario_name in enumerate(scenarios): | |
| override = SCENARIO_OVERRIDES.get(scenario_name) | |
| objectives = override.objectives if override else DEFAULT_OBJECTIVES | |
| extra = override.extra_constraints if override else () | |
| panel_tilt_deg, panel_azimuth_deg = _scenario_panel_orientation( | |
| load_scenario(scenario_name) | |
| ) | |
| for j, h_obs_m in enumerate(h_values): | |
| scenario = load_scenario(scenario_name).model_copy( | |
| update={"required_obstacle_height_m": h_obs_m} | |
| ) | |
| if override is not None and override.traverse_distance_m is not None: | |
| scenario = scenario.model_copy( | |
| update={"traverse_distance_m": override.traverse_distance_m} | |
| ) | |
| soil = get_soil_parameters(scenario.soil_simulant) | |
| constraints: tuple[OptimizationConstraint, ...] = ( | |
| range_floor, | |
| *extra, | |
| ) | |
| if h_obs_m > 0.0: | |
| constraints = (*constraints, obstacle_floor) | |
| seed = args.seed + i * 100 + j | |
| t0 = time.perf_counter() | |
| result = NSGA2Runner( | |
| scenario, | |
| soil, | |
| backend="evaluator", | |
| objectives=objectives, | |
| constraints=constraints, | |
| population_size=args.population_size, | |
| n_generations=args.generations, | |
| seed=seed, | |
| evaluator_eval_cap=DEFAULT_EVALUATOR_EVAL_CAP, | |
| panel_tilt_deg=panel_tilt_deg, | |
| panel_azimuth_deg=panel_azimuth_deg, | |
| ).run() | |
| elapsed_s = time.perf_counter() - t0 | |
| front = result.to_frame() | |
| tag = f"{h_obs_m:.3f}".replace(".", "p") | |
| front_path = out_dir / f"front_{scenario_name}__hobs_{tag}.csv" | |
| front.insert(0, "scenario_name", scenario_name) | |
| front.insert(1, "required_obstacle_height_m", h_obs_m) | |
| front.to_csv(front_path, index=False) | |
| summary_rows.append(_summary_row(scenario_name, h_obs_m, front)) | |
| row = summary_rows[-1] | |
| manifest.append( | |
| { | |
| "scenario_name": scenario_name, | |
| "required_obstacle_height_m": h_obs_m, | |
| "seed": seed, | |
| "elapsed_s": elapsed_s, | |
| "pareto_size": len(result.design_vectors), | |
| "front_empty": row["front_empty"], | |
| "front_csv": str(front_path), | |
| } | |
| ) | |
| print( | |
| f"{scenario_name} h={h_obs_m:.3f} m: " | |
| f"rocker={_rocker_summary_label(row)} " | |
| f"({len(front)} pts, {elapsed_s:.1f}s)", | |
| flush=True, | |
| ) | |
| summary = pd.DataFrame(summary_rows) | |
| summary_path = out_dir / "crossover_summary.csv" | |
| summary.to_csv(summary_path, index=False) | |
| manifest_path = out_dir / "manifest.json" | |
| manifest_path.write_text(json.dumps(manifest, indent=2) + "\n") | |
| print(f"wrote {summary_path}") | |
| return 0 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |