"""Optuna-tune the XGBoost surrogate baselines on a Parquet dataset. Companion to ``scripts/run_baselines.py``. This script tunes only XGBoost (per-target regressors + the ``stalled`` feasibility classifier); the rationale for the scope is in ``roverdevkit.surrogate.tuning`` module docstring. Outputs (under ``--out-dir``): - ``tuned_summary.csv`` — one row per ``(target, kind)`` with the val objective the tuner achieved, the test-set metric on the refit model, and the tuning wall-clock. - ``tuned_best_params.json`` — best hyperparameters per target, including the early-stopping-best ``n_estimators``. - ``tuned_test_metrics.parquet`` — long-format ``(target, metric, value, scenario_family)`` frame for the tuned models on the test split, schema-compatible with the untuned ``metrics_long.parquet`` so a sibling Notebook / table can concat them. - ``study_.csv`` — ``study.trials_dataframe()`` per target for the writeup (objective trace, parameter samples, durations). - ``tuned_registry_sanity.csv`` — Layer-1 registry-rover predictions for the tuned models, same schema as ``run_baselines.py``'s ``registry_sanity.csv`` so primary vs diagnostic targets and ``is_primary`` are handled identically. Examples -------- :: # Full v4 tuning run (50 trials per target, ~10-20 min on 8 cores) python scripts/tune_baselines.py \\ --dataset data/analytical/lhs_v4.parquet \\ --out-dir reports/tuned_v4 # Smoke (10 trials per target, no classifier, ~1 min) python scripts/tune_baselines.py \\ --dataset data/analytical/lhs_v4.parquet \\ --out-dir /tmp/tune_smoke \\ --n-trials 10 --no-classifier """ from __future__ import annotations import argparse import json import logging import sys from pathlib import Path from typing import Any import numpy as np import pandas as pd from sklearn.metrics import ( f1_score, mean_absolute_percentage_error, mean_squared_error, r2_score, roc_auc_score, ) from roverdevkit.surrogate.baselines import ( ACCEPTANCE_GATES, LAYER1_PRIMARY_TARGETS, _row_for_registry_rover, # type: ignore[reportPrivateUsage] ) from roverdevkit.surrogate.dataset import read_parquet from roverdevkit.surrogate.features import ( FEASIBILITY_COLUMN, PRIMARY_REGRESSION_TARGETS, SCENARIO_CATEGORICAL_COLUMNS, build_feature_matrix, valid_rows, ) from roverdevkit.surrogate.tuning import ( TuningResult, tune_xgboost_classifier, tune_xgboost_regressor, ) def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: p = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) p.add_argument("--dataset", type=Path, required=True) p.add_argument("--out-dir", type=Path, required=True) p.add_argument("--seed", type=int, default=42) p.add_argument( "--n-trials", type=int, default=50, help="Optuna trials per target (default 50).", ) p.add_argument( "--timeout-seconds", type=float, default=None, help="Per-target tuning wall-clock cap. Default: no cap.", ) p.add_argument( "--targets", nargs="+", default=PRIMARY_REGRESSION_TARGETS, help="Regression targets to tune. Defaults to the four primary targets.", ) p.add_argument( "--no-classifier", action="store_true", help="Skip tuning the stalled feasibility classifier.", ) p.add_argument( "--no-registry-check", action="store_true", help="Skip the tuned registry-rover Layer-1 sanity check.", ) p.add_argument( "--n-jobs", type=int, default=-1, help="Plumbed through to XGBoost (-1 = all cores).", ) p.add_argument( "--log-level", default="INFO", choices=("DEBUG", "INFO", "WARNING", "ERROR"), ) return p.parse_args(argv) def _split_xy( df: pd.DataFrame, target: str, *, feasible_only: bool ) -> tuple[pd.DataFrame, np.ndarray]: """Build the (X, y) view for one target on a single split. Regression targets see only feasible rows; classification sees all valid (status == 'ok') rows. """ df_clean = valid_rows(df) if feasible_only: # Schema v6 (v6 schema update): ``FEASIBILITY_COLUMN`` is now ``stalled`` # with positive class = infeasible, so we negate before masking # to keep only the feasible (non-stalled) regression rows. The # classifier path keeps the raw 0/1 labels (1 = stalled = the # positive failure class). mask = (~df_clean[FEASIBILITY_COLUMN].astype(bool)).to_numpy() df_clean = df_clean.loc[mask] X = build_feature_matrix(df_clean) y = df_clean[target].to_numpy() if not feasible_only: y = y.astype(int) return X, y def _regression_metrics_with_family( df_test: pd.DataFrame, y_pred: np.ndarray, *, target: str, algorithm: str, ) -> pd.DataFrame: """Mirror the per-family metric layout in ``evaluate_baselines``.""" rows: list[dict[str, Any]] = [] groups: list[tuple[str, pd.DataFrame]] = [("__all__", df_test)] if "scenario_family" in df_test.columns: for fam, sub in df_test.groupby("scenario_family", observed=True): groups.append((str(fam), sub)) for fam, sub in groups: idx = df_test.index.isin(sub.index) y_true_g = df_test.loc[idx, target].to_numpy() y_pred_g = y_pred[idx] if len(y_true_g) < 2: continue metrics = { "r2": float(r2_score(y_true_g, y_pred_g)), "rmse": float(np.sqrt(mean_squared_error(y_true_g, y_pred_g))), "mape": float(mean_absolute_percentage_error(y_true_g, y_pred_g)), "n": float(len(y_true_g)), } for metric, value in metrics.items(): rows.append( { "algorithm": algorithm, "target": target, "split": "test", "scenario_family": fam, "metric": metric, "value": value, } ) return pd.DataFrame(rows) def _classification_metrics_with_family( df_test: pd.DataFrame, y_score: np.ndarray, ) -> pd.DataFrame: rows: list[dict[str, Any]] = [] y_true = df_test[FEASIBILITY_COLUMN].astype(int).to_numpy() y_pred = (y_score >= 0.5).astype(int) groups: list[tuple[str, pd.DataFrame]] = [("__all__", df_test)] if "scenario_family" in df_test.columns: for fam, sub in df_test.groupby("scenario_family", observed=True): groups.append((str(fam), sub)) for fam, sub in groups: idx = df_test.index.isin(sub.index) y_true_g = y_true[idx] y_score_g = y_score[idx] y_pred_g = y_pred[idx] if len(y_true_g) < 2: continue auc = ( float("nan") if len(np.unique(y_true_g)) < 2 else float(roc_auc_score(y_true_g, y_score_g)) ) metrics = { "auc": auc, "f1": float(f1_score(y_true_g, y_pred_g, zero_division=0)), "accuracy": float((y_pred_g == y_true_g).mean()), "n": float(len(y_true_g)), "positive_rate": float(y_true_g.mean()), } for metric, value in metrics.items(): rows.append( { "algorithm": "xgboost_tuned", "target": FEASIBILITY_COLUMN, "split": "test", "scenario_family": fam, "metric": metric, "value": value, } ) return pd.DataFrame(rows) def _build_training_categories(df: pd.DataFrame) -> dict[str, tuple[str, ...]]: """Mirror ``fit_baselines``' captured-categories logic so the tuned registry-rover sanity check uses the same codebook as the untuned one.""" out: dict[str, tuple[str, ...]] = {} df_clean = valid_rows(df) X_all = build_feature_matrix(df_clean) for col in SCENARIO_CATEGORICAL_COLUMNS: if col in X_all.columns: uniq = X_all[col].astype(str).unique() out[col] = tuple(sorted(str(x) for x in uniq)) return out def main(argv: list[str] | None = None) -> int: args = _parse_args(argv) logging.basicConfig( level=args.log_level, format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("tune_baselines") args.out_dir.mkdir(parents=True, exist_ok=True) log.info("loading dataset from %s", args.dataset) df = read_parquet(args.dataset) df_train = df[df["split"] == "train"] df_val = df[df["split"] == "val"] df_test = df[df["split"] == "test"] log.info("train=%d val=%d test=%d", len(df_train), len(df_val), len(df_test)) summary_rows: list[dict[str, Any]] = [] best_params: dict[str, dict[str, Any]] = {} metrics_frames: list[pd.DataFrame] = [] fitted_regressors: dict[str, Any] = {} fitted_classifier: Any | None = None # --- regression tuning loop ---------------------------------------- for target in args.targets: log.info("[regressor] tuning target=%s (n_trials=%d)", target, args.n_trials) X_tr, y_tr = _split_xy(df_train, target, feasible_only=True) X_va, y_va = _split_xy(df_val, target, feasible_only=True) X_te, y_te = _split_xy(df_test, target, feasible_only=True) result: TuningResult = tune_xgboost_regressor( X_tr, y_tr, X_va, y_va, target=target, n_trials=args.n_trials, timeout_seconds=args.timeout_seconds, random_state=args.seed, n_jobs=args.n_jobs, ) # Score on test y_te_pred = np.asarray(result.final_model.predict(X_te)) df_test_feas = valid_rows(df_test) # Schema v6: negate ``stalled`` (True == infeasible) to keep the # feasible-only test rows the regression metrics expect. feas_mask = (~df_test_feas[FEASIBILITY_COLUMN].astype(bool)).to_numpy() df_test_feas = df_test_feas.loc[feas_mask] m = _regression_metrics_with_family( df_test_feas, y_te_pred, target=target, algorithm="xgboost_tuned" ) metrics_frames.append(m) fitted_regressors[target] = result.final_model test_overall = m.query("scenario_family == '__all__'").set_index("metric")["value"] log.info( " done in %.1fs over %d trials; val R²=%.4f, test R²=%.4f, RMSE=%.3f", result.elapsed_seconds, result.n_trials, result.val_score, float(test_overall.get("r2", float("nan"))), float(test_overall.get("rmse", float("nan"))), ) summary_rows.append( { "target": target, "kind": "regressor", "n_trials": result.n_trials, "tuning_seconds": result.elapsed_seconds, "val_objective": result.val_score, "val_objective_metric": "r2", "test_r2": float(test_overall.get("r2", float("nan"))), "test_rmse": float(test_overall.get("rmse", float("nan"))), "test_mape": float(test_overall.get("mape", float("nan"))), "best_n_estimators": int(result.best_params.get("n_estimators", -1)), "best_max_depth": int(result.best_params.get("max_depth", -1)), "best_learning_rate": float(result.best_params.get("learning_rate", float("nan"))), } ) best_params[target] = {k: _coerce_for_json(v) for k, v in result.best_params.items()} # Persist the trial frame result.study_df.to_csv(args.out_dir / f"study_{target}.csv", index=False) # --- classifier tuning --------------------------------------------- if not args.no_classifier: log.info("[classifier] tuning target=%s", FEASIBILITY_COLUMN) X_tr, y_tr = _split_xy(df_train, FEASIBILITY_COLUMN, feasible_only=False) X_va, y_va = _split_xy(df_val, FEASIBILITY_COLUMN, feasible_only=False) X_te, y_te = _split_xy(df_test, FEASIBILITY_COLUMN, feasible_only=False) result_cls: TuningResult = tune_xgboost_classifier( X_tr, y_tr, X_va, y_va, target=FEASIBILITY_COLUMN, n_trials=args.n_trials, timeout_seconds=args.timeout_seconds, random_state=args.seed, n_jobs=args.n_jobs, ) fitted_classifier = result_cls.final_model y_te_score = np.asarray(result_cls.final_model.predict_proba(X_te))[:, 1] df_test_clean = valid_rows(df_test) m = _classification_metrics_with_family(df_test_clean, y_te_score) metrics_frames.append(m) test_overall = m.query("scenario_family == '__all__'").set_index("metric")["value"] log.info( " done in %.1fs over %d trials; val AUC=%.4f, test AUC=%.4f, F1=%.4f", result_cls.elapsed_seconds, result_cls.n_trials, result_cls.val_score, float(test_overall.get("auc", float("nan"))), float(test_overall.get("f1", float("nan"))), ) summary_rows.append( { "target": FEASIBILITY_COLUMN, "kind": "classifier", "n_trials": result_cls.n_trials, "tuning_seconds": result_cls.elapsed_seconds, "val_objective": result_cls.val_score, "val_objective_metric": "auc", "test_auc": float(test_overall.get("auc", float("nan"))), "test_f1": float(test_overall.get("f1", float("nan"))), "test_accuracy": float(test_overall.get("accuracy", float("nan"))), "best_n_estimators": int(result_cls.best_params.get("n_estimators", -1)), "best_max_depth": int(result_cls.best_params.get("max_depth", -1)), "best_learning_rate": float( result_cls.best_params.get("learning_rate", float("nan")) ), } ) best_params[FEASIBILITY_COLUMN] = { k: _coerce_for_json(v) for k, v in result_cls.best_params.items() } result_cls.study_df.to_csv(args.out_dir / f"study_{FEASIBILITY_COLUMN}.csv", index=False) # --- write reports ------------------------------------------------- summary_df = pd.DataFrame(summary_rows) summary_path = args.out_dir / "tuned_summary.csv" summary_df.to_csv(summary_path, index=False) log.info("wrote %s", summary_path) params_path = args.out_dir / "tuned_best_params.json" params_path.write_text(json.dumps(best_params, indent=2)) log.info("wrote %s", params_path) if metrics_frames: metrics_long = pd.concat(metrics_frames, ignore_index=True) metrics_path = args.out_dir / "tuned_test_metrics.parquet" metrics_long.to_parquet(metrics_path, index=False) log.info("wrote %s (%d rows)", metrics_path, len(metrics_long)) # Acceptance summary against the project plan thresholds gate_rows: list[dict[str, Any]] = [] for tgt, thresholds in ACCEPTANCE_GATES.items(): sub = metrics_long.query("target == @tgt and scenario_family == '__all__'").set_index( "metric" )["value"] row = {"target": tgt, "thresholds": json.dumps(thresholds)} passes = True for m_name, threshold in thresholds.items(): v = float(sub.get(m_name, float("nan"))) row[f"{m_name}_observed"] = v row[f"{m_name}_threshold"] = threshold passes = passes and not np.isnan(v) and v >= threshold row["passes"] = passes gate_rows.append(row) gate_df = pd.DataFrame(gate_rows) gate_path = args.out_dir / "tuned_acceptance_gate.csv" gate_df.to_csv(gate_path, index=False) log.info( "wrote %s; tuned passes %d/%d", gate_path, int(gate_df["passes"].sum()), len(gate_df) ) print("\n=== Tuned XGBoost acceptance gate (test, all families) ===", flush=True) with pd.option_context("display.max_columns", None, "display.width", 200): print(gate_df.to_string(index=False)) print("\n=== Tuned XGBoost summary ===", flush=True) with pd.option_context("display.max_columns", None, "display.width", 200): print(summary_df.to_string(index=False)) # --- Layer-1 registry-rover sanity check --------------------------- if not args.no_registry_check and (fitted_regressors or fitted_classifier is not None): log.info("running tuned registry-rover sanity check...") try: sanity = _tuned_registry_sanity(df, fitted_regressors, fitted_classifier) sanity_path = args.out_dir / "tuned_registry_sanity.csv" sanity.to_csv(sanity_path, index=False) log.info("wrote %s (%d rows)", sanity_path, len(sanity)) _print_registry_summary(sanity) except Exception as exc: # pragma: no cover — diagnostic, not fatal log.warning("tuned registry-rover sanity check failed: %s", exc) return 0 def _coerce_for_json(value: Any) -> Any: if isinstance(value, (np.floating,)): return float(value) if isinstance(value, (np.integer,)): return int(value) if isinstance(value, (bool, int, float, str)): return value return str(value) def _tuned_registry_sanity( df: pd.DataFrame, regressors: dict[str, Any], classifier: Any | None, ) -> pd.DataFrame: """Apply tuned models to the registry-rover Layer-1 inputs.""" training_categories = _build_training_categories(df) primary_targets = set(LAYER1_PRIMARY_TARGETS) rovers = ("Pragyan", "Yutu-2", "MoonRanger", "Rashid-1") rows: list[dict[str, Any]] = [] for rover in rovers: X_row, evaluator_metrics = _row_for_registry_rover( rover, training_categories=training_categories ) for target, model in regressors.items(): y_hat = float(np.asarray(model.predict(X_row))[0]) y_true = float(evaluator_metrics[target]) rows.append( { "rover": rover, "algorithm": "xgboost_tuned", "target": target, "predicted": y_hat, "evaluator": y_true, "abs_error": y_hat - y_true, "rel_error": (y_hat - y_true) / y_true if y_true != 0 else float("nan"), "is_primary": target in primary_targets, } ) if classifier is not None: p = float(np.asarray(classifier.predict_proba(X_row))[0, 1]) y_true_bool = bool(evaluator_metrics[FEASIBILITY_COLUMN]) rows.append( { "rover": rover, "algorithm": "xgboost_tuned", "target": FEASIBILITY_COLUMN, "predicted": p, "evaluator": float(y_true_bool), "abs_error": p - float(y_true_bool), "rel_error": float("nan"), "is_primary": FEASIBILITY_COLUMN in primary_targets, } ) return pd.DataFrame(rows) def _print_registry_summary(sanity: pd.DataFrame) -> None: primary = sanity[sanity["is_primary"]] diagnostic = sanity[~sanity["is_primary"]] print("\n=== Tuned registry sanity (PRIMARY) ===", flush=True) reg = primary[primary["target"] != FEASIBILITY_COLUMN] if not reg.empty: s = ( reg.assign(abs_pct=lambda d: 100 * d["rel_error"].abs()) .groupby(["rover", "target"])["abs_pct"] .median() .unstack("target") ) print("Median |rel_error| (%):") print(s.round(2).to_string()) clf = primary[primary["target"] == FEASIBILITY_COLUMN] if not clf.empty: s = ( clf.assign( hit=lambda d: (d["predicted"] >= 0.5).astype(int) == d["evaluator"].astype(int) ) .groupby("rover")["hit"] .mean() .rename("classifier_accuracy") .to_frame() ) print("\nClassifier accuracy (stalled):") print(s.round(3).to_string()) if not diagnostic.empty: print("\n=== Tuned registry sanity (SCENARIO-OOD diagnostic) ===", flush=True) s = ( diagnostic.assign(abs_pct=lambda d: 100 * d["rel_error"].abs()) .groupby(["rover", "target"])["abs_pct"] .median() .unstack("target") ) print("Median |rel_error| (%):") print(s.round(2).to_string()) if __name__ == "__main__": sys.exit(main())