roverdevkit / scripts /tune_baselines.py
jjreif's picture
Deploy roverdevkit @ 2676a67
b3d14e3
Raw
History Blame Contribute Delete
21.2 kB
"""Optuna-tune the XGBoost surrogate baselines on a Parquet dataset.
Companion to ``scripts/run_baselines.py``. This script tunes only
XGBoost (per-target regressors + the ``stalled`` feasibility classifier);
the rationale for the scope is in
``roverdevkit.surrogate.tuning`` module docstring.
Outputs (under ``--out-dir``):
- ``tuned_summary.csv`` — one row per ``(target, kind)`` with the val
objective the tuner achieved, the test-set metric on the refit
model, and the tuning wall-clock.
- ``tuned_best_params.json`` — best hyperparameters per target,
including the early-stopping-best ``n_estimators``.
- ``tuned_test_metrics.parquet`` — long-format ``(target, metric, value,
scenario_family)`` frame for the tuned models on the test split,
schema-compatible with the untuned ``metrics_long.parquet`` so a
sibling Notebook / table can concat them.
- ``study_<target>.csv`` — ``study.trials_dataframe()`` per target
for the writeup (objective trace, parameter samples, durations).
- ``tuned_registry_sanity.csv`` — Layer-1 registry-rover predictions
for the tuned models, same schema as ``run_baselines.py``'s
``registry_sanity.csv`` so primary vs diagnostic targets and
``is_primary`` are handled identically.
Examples
--------
::
# Full v4 tuning run (50 trials per target, ~10-20 min on 8 cores)
python scripts/tune_baselines.py \\
--dataset data/analytical/lhs_v4.parquet \\
--out-dir reports/tuned_v4
# Smoke (10 trials per target, no classifier, ~1 min)
python scripts/tune_baselines.py \\
--dataset data/analytical/lhs_v4.parquet \\
--out-dir /tmp/tune_smoke \\
--n-trials 10 --no-classifier
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from sklearn.metrics import (
f1_score,
mean_absolute_percentage_error,
mean_squared_error,
r2_score,
roc_auc_score,
)
from roverdevkit.surrogate.baselines import (
ACCEPTANCE_GATES,
LAYER1_PRIMARY_TARGETS,
_row_for_registry_rover, # type: ignore[reportPrivateUsage]
)
from roverdevkit.surrogate.dataset import read_parquet
from roverdevkit.surrogate.features import (
FEASIBILITY_COLUMN,
PRIMARY_REGRESSION_TARGETS,
SCENARIO_CATEGORICAL_COLUMNS,
build_feature_matrix,
valid_rows,
)
from roverdevkit.surrogate.tuning import (
TuningResult,
tune_xgboost_classifier,
tune_xgboost_regressor,
)
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the tuning script.

    ``argv`` is handed straight to ``ArgumentParser.parse_args``. The module
    docstring doubles as the ``--help`` description and is rendered verbatim
    (``RawDescriptionHelpFormatter``).
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # One (flag, add_argument-kwargs) entry per option; registration order is
    # the order shown in ``--help``.
    option_table = (
        ("--dataset", {"type": Path, "required": True}),
        ("--out-dir", {"type": Path, "required": True}),
        ("--seed", {"type": int, "default": 42}),
        (
            "--n-trials",
            {
                "type": int,
                "default": 50,
                "help": "Optuna trials per target (default 50).",
            },
        ),
        (
            "--timeout-seconds",
            {
                "type": float,
                "default": None,
                "help": "Per-target tuning wall-clock cap. Default: no cap.",
            },
        ),
        (
            "--targets",
            {
                "nargs": "+",
                "default": PRIMARY_REGRESSION_TARGETS,
                "help": "Regression targets to tune. Defaults to the four primary targets.",
            },
        ),
        (
            "--no-classifier",
            {
                "action": "store_true",
                "help": "Skip tuning the stalled feasibility classifier.",
            },
        ),
        (
            "--no-registry-check",
            {
                "action": "store_true",
                "help": "Skip the tuned registry-rover Layer-1 sanity check.",
            },
        ),
        (
            "--n-jobs",
            {
                "type": int,
                "default": -1,
                "help": "Plumbed through to XGBoost (-1 = all cores).",
            },
        ),
        (
            "--log-level",
            {
                "default": "INFO",
                "choices": ("DEBUG", "INFO", "WARNING", "ERROR"),
            },
        ),
    )
    for flag, spec in option_table:
        parser.add_argument(flag, **spec)
    return parser.parse_args(argv)
def _split_xy(
    df: pd.DataFrame, target: str, *, feasible_only: bool
) -> tuple[pd.DataFrame, np.ndarray]:
    """Return the ``(X, y)`` pair for ``target`` on one dataset split.

    The frame is first passed through ``valid_rows``. With
    ``feasible_only=True`` (regression path) rows whose
    ``FEASIBILITY_COLUMN`` is truthy are dropped and ``y`` keeps the column's
    own dtype; with ``feasible_only=False`` (classifier path) every valid row
    is kept and ``y`` is cast to ``int``.
    """
    frame = valid_rows(df)
    if feasible_only:
        # The feasibility column flags the *infeasible* (stalled) rows, so the
        # flag is inverted to keep only the feasible ones for regression.
        stalled = frame[FEASIBILITY_COLUMN].astype(bool)
        keep = (~stalled).to_numpy()
        frame = frame.loc[keep]

    features = build_feature_matrix(frame)
    labels = frame[target].to_numpy()
    if feasible_only:
        return features, labels
    # Classifier labels stay as raw 0/1 integers.
    return features, labels.astype(int)
def _regression_metrics_with_family(
    df_test: pd.DataFrame,
    y_pred: np.ndarray,
    *,
    target: str,
    algorithm: str,
) -> pd.DataFrame:
    """Score regression predictions overall and per scenario family.

    Mirrors the per-family metric layout in ``evaluate_baselines``.

    Parameters
    ----------
    df_test
        Rows holding the ground truth in column ``target``. ``y_pred`` must be
        positionally aligned with these rows.
    y_pred
        One prediction per row of ``df_test``.
    target
        Ground-truth column name; copied into the ``target`` field.
    algorithm
        Label copied into the ``algorithm`` field of every output row.

    Returns
    -------
    pd.DataFrame
        Long-format frame with columns ``algorithm``, ``target``, ``split``
        (always ``"test"``), ``scenario_family``, ``metric`` and ``value``.
        The pseudo-family ``"__all__"`` covers every row; a ``scenario_family``
        column, when present, adds one group per observed family. Groups with
        fewer than two rows are skipped.

    Raises
    ------
    ValueError
        If ``y_pred`` and ``df_test`` differ in length.
    """
    y_pred = np.asarray(y_pred)
    if len(y_pred) != len(df_test):
        raise ValueError(
            f"y_pred has {len(y_pred)} entries but df_test has {len(df_test)} rows"
        )
    y_true = df_test[target].to_numpy()

    # Group membership is tracked with positional boolean masks rather than
    # index labels: a label-based ``index.isin`` lookup would pull rows from
    # other families into a group whenever the frame's index is not unique.
    masks: list[tuple[str, np.ndarray]] = [
        ("__all__", np.ones(len(df_test), dtype=bool))
    ]
    if "scenario_family" in df_test.columns:
        family = df_test["scenario_family"]
        # groupby supplies the group keys and their iteration order.
        for fam, _ in df_test.groupby("scenario_family", observed=True):
            member = (family == fam).to_numpy(dtype=bool, na_value=False)
            masks.append((str(fam), member))

    rows: list[dict[str, Any]] = []
    for fam, member in masks:
        y_true_g = y_true[member]
        y_pred_g = y_pred[member]
        if len(y_true_g) < 2:
            # Too few rows for a meaningful score.
            continue
        metrics = {
            "r2": float(r2_score(y_true_g, y_pred_g)),
            "rmse": float(np.sqrt(mean_squared_error(y_true_g, y_pred_g))),
            "mape": float(mean_absolute_percentage_error(y_true_g, y_pred_g)),
            "n": float(len(y_true_g)),
        }
        for metric, value in metrics.items():
            rows.append(
                {
                    "algorithm": algorithm,
                    "target": target,
                    "split": "test",
                    "scenario_family": fam,
                    "metric": metric,
                    "value": value,
                }
            )
    return pd.DataFrame(rows)
def _classification_metrics_with_family(
    df_test: pd.DataFrame,
    y_score: np.ndarray,
) -> pd.DataFrame:
    """Score classifier outputs overall and per scenario family.

    Parameters
    ----------
    df_test
        Rows holding the true labels in ``FEASIBILITY_COLUMN`` (cast to
        ``int``). ``y_score`` must be positionally aligned with these rows.
    y_score
        Positive-class score per row; thresholded at ``0.5`` to obtain the
        hard predictions used for F1 and accuracy.

    Returns
    -------
    pd.DataFrame
        Long-format frame with columns ``algorithm`` (always
        ``"xgboost_tuned"``), ``target``, ``split`` (always ``"test"``),
        ``scenario_family``, ``metric`` and ``value``. Metrics are ``auc``
        (NaN when a group holds a single class), ``f1``, ``accuracy``, ``n``
        and ``positive_rate``. The pseudo-family ``"__all__"`` covers every
        row; groups with fewer than two rows are skipped.

    Raises
    ------
    ValueError
        If ``y_score`` and ``df_test`` differ in length.
    """
    y_score = np.asarray(y_score)
    if len(y_score) != len(df_test):
        raise ValueError(
            f"y_score has {len(y_score)} entries but df_test has {len(df_test)} rows"
        )
    y_true = df_test[FEASIBILITY_COLUMN].astype(int).to_numpy()
    y_pred = (y_score >= 0.5).astype(int)

    # Group membership is tracked with positional boolean masks rather than
    # index labels: a label-based ``index.isin`` lookup would pull rows from
    # other families into a group whenever the frame's index is not unique.
    masks: list[tuple[str, np.ndarray]] = [
        ("__all__", np.ones(len(df_test), dtype=bool))
    ]
    if "scenario_family" in df_test.columns:
        family = df_test["scenario_family"]
        # groupby supplies the group keys and their iteration order.
        for fam, _ in df_test.groupby("scenario_family", observed=True):
            member = (family == fam).to_numpy(dtype=bool, na_value=False)
            masks.append((str(fam), member))

    rows: list[dict[str, Any]] = []
    for fam, member in masks:
        y_true_g = y_true[member]
        y_score_g = y_score[member]
        y_pred_g = y_pred[member]
        if len(y_true_g) < 2:
            continue
        # ROC AUC is undefined when only one class is present in the group.
        auc = (
            float("nan")
            if len(np.unique(y_true_g)) < 2
            else float(roc_auc_score(y_true_g, y_score_g))
        )
        metrics = {
            "auc": auc,
            "f1": float(f1_score(y_true_g, y_pred_g, zero_division=0)),
            "accuracy": float((y_pred_g == y_true_g).mean()),
            "n": float(len(y_true_g)),
            "positive_rate": float(y_true_g.mean()),
        }
        for metric, value in metrics.items():
            rows.append(
                {
                    "algorithm": "xgboost_tuned",
                    "target": FEASIBILITY_COLUMN,
                    "split": "test",
                    "scenario_family": fam,
                    "metric": metric,
                    "value": value,
                }
            )
    return pd.DataFrame(rows)
def _build_training_categories(df: pd.DataFrame) -> dict[str, tuple[str, ...]]:
    """Collect the sorted string levels of each scenario categorical column.

    Mirrors ``fit_baselines``' captured-categories logic so the tuned
    registry-rover sanity check uses the same codebook as the untuned one.
    Columns listed in ``SCENARIO_CATEGORICAL_COLUMNS`` but absent from the
    feature matrix are left out of the result.
    """
    features = build_feature_matrix(valid_rows(df))
    return {
        name: tuple(sorted(str(level) for level in features[name].astype(str).unique()))
        for name in SCENARIO_CATEGORICAL_COLUMNS
        if name in features.columns
    }
def main(argv: list[str] | None = None) -> int:
    """Tune the XGBoost surrogates and write every report under ``--out-dir``.

    Steps, in order:

    1. Load the Parquet dataset and slice it on the ``split`` column
       (``train`` / ``val`` / ``test``).
    2. For each ``--targets`` entry, tune a regressor on the feasible rows and
       score the refit model on the feasible test rows.
    3. Unless ``--no-classifier`` is set, tune the feasibility classifier on
       all valid rows and score it on the valid test rows.
    4. Write ``tuned_summary.csv``, ``tuned_best_params.json``, one
       ``study_<target>.csv`` per tuned model, ``tuned_test_metrics.parquet``
       and ``tuned_acceptance_gate.csv``; print the gate and summary tables.
    5. Unless ``--no-registry-check`` is set, run the registry-rover sanity
       check. This step is best effort: a failure is logged, not raised.

    Parameters
    ----------
    argv
        Command-line arguments, forwarded to ``_parse_args``.

    Returns
    -------
    int
        Always ``0``; errors outside the registry check propagate as
        exceptions.
    """
    args = _parse_args(argv)
    logging.basicConfig(
        level=args.log_level,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )
    log = logging.getLogger("tune_baselines")
    args.out_dir.mkdir(parents=True, exist_ok=True)

    log.info("loading dataset from %s", args.dataset)
    df = read_parquet(args.dataset)
    df_train = df[df["split"] == "train"]
    df_val = df[df["split"] == "val"]
    df_test = df[df["split"] == "test"]
    log.info("train=%d val=%d test=%d", len(df_train), len(df_val), len(df_test))

    summary_rows: list[dict[str, Any]] = []
    best_params: dict[str, dict[str, Any]] = {}
    metrics_frames: list[pd.DataFrame] = []
    fitted_regressors: dict[str, Any] = {}
    fitted_classifier: Any | None = None

    # The cleaned / feasible-only test frames do not depend on the target, so
    # they are built once instead of once per tuned model.
    # Schema v6: ``stalled`` is True for infeasible rows, so it is negated to
    # keep the feasible-only test rows the regression metrics expect. These
    # are the same filters ``_split_xy`` applies, which keeps the frames
    # positionally aligned with the predictions made on ``X_te``.
    df_test_clean = valid_rows(df_test)
    feas_mask = (~df_test_clean[FEASIBILITY_COLUMN].astype(bool)).to_numpy()
    df_test_feas = df_test_clean.loc[feas_mask]

    # --- regression tuning loop ----------------------------------------
    for target in args.targets:
        log.info("[regressor] tuning target=%s (n_trials=%d)", target, args.n_trials)
        X_tr, y_tr = _split_xy(df_train, target, feasible_only=True)
        X_va, y_va = _split_xy(df_val, target, feasible_only=True)
        # Test labels are read from ``df_test_feas`` by the metrics helper.
        X_te, _ = _split_xy(df_test, target, feasible_only=True)

        result: TuningResult = tune_xgboost_regressor(
            X_tr,
            y_tr,
            X_va,
            y_va,
            target=target,
            n_trials=args.n_trials,
            timeout_seconds=args.timeout_seconds,
            random_state=args.seed,
            n_jobs=args.n_jobs,
        )

        # Score on test
        y_te_pred = np.asarray(result.final_model.predict(X_te))
        m = _regression_metrics_with_family(
            df_test_feas, y_te_pred, target=target, algorithm="xgboost_tuned"
        )
        metrics_frames.append(m)
        fitted_regressors[target] = result.final_model

        test_overall = m.query("scenario_family == '__all__'").set_index("metric")["value"]
        log.info(
            " done in %.1fs over %d trials; val R²=%.4f, test R²=%.4f, RMSE=%.3f",
            result.elapsed_seconds,
            result.n_trials,
            result.val_score,
            float(test_overall.get("r2", float("nan"))),
            float(test_overall.get("rmse", float("nan"))),
        )
        summary_rows.append(
            {
                "target": target,
                "kind": "regressor",
                "n_trials": result.n_trials,
                "tuning_seconds": result.elapsed_seconds,
                "val_objective": result.val_score,
                "val_objective_metric": "r2",
                "test_r2": float(test_overall.get("r2", float("nan"))),
                "test_rmse": float(test_overall.get("rmse", float("nan"))),
                "test_mape": float(test_overall.get("mape", float("nan"))),
                "best_n_estimators": int(result.best_params.get("n_estimators", -1)),
                "best_max_depth": int(result.best_params.get("max_depth", -1)),
                "best_learning_rate": float(result.best_params.get("learning_rate", float("nan"))),
            }
        )
        best_params[target] = {k: _coerce_for_json(v) for k, v in result.best_params.items()}

        # Persist the trial frame
        result.study_df.to_csv(args.out_dir / f"study_{target}.csv", index=False)

    # --- classifier tuning ---------------------------------------------
    if not args.no_classifier:
        log.info("[classifier] tuning target=%s", FEASIBILITY_COLUMN)
        X_tr, y_tr = _split_xy(df_train, FEASIBILITY_COLUMN, feasible_only=False)
        X_va, y_va = _split_xy(df_val, FEASIBILITY_COLUMN, feasible_only=False)
        X_te, _ = _split_xy(df_test, FEASIBILITY_COLUMN, feasible_only=False)

        result_cls: TuningResult = tune_xgboost_classifier(
            X_tr,
            y_tr,
            X_va,
            y_va,
            target=FEASIBILITY_COLUMN,
            n_trials=args.n_trials,
            timeout_seconds=args.timeout_seconds,
            random_state=args.seed,
            n_jobs=args.n_jobs,
        )
        fitted_classifier = result_cls.final_model

        # Column 1 of ``predict_proba`` is the positive-class score.
        y_te_score = np.asarray(result_cls.final_model.predict_proba(X_te))[:, 1]
        m = _classification_metrics_with_family(df_test_clean, y_te_score)
        metrics_frames.append(m)

        test_overall = m.query("scenario_family == '__all__'").set_index("metric")["value"]
        log.info(
            " done in %.1fs over %d trials; val AUC=%.4f, test AUC=%.4f, F1=%.4f",
            result_cls.elapsed_seconds,
            result_cls.n_trials,
            result_cls.val_score,
            float(test_overall.get("auc", float("nan"))),
            float(test_overall.get("f1", float("nan"))),
        )
        summary_rows.append(
            {
                "target": FEASIBILITY_COLUMN,
                "kind": "classifier",
                "n_trials": result_cls.n_trials,
                "tuning_seconds": result_cls.elapsed_seconds,
                "val_objective": result_cls.val_score,
                "val_objective_metric": "auc",
                "test_auc": float(test_overall.get("auc", float("nan"))),
                "test_f1": float(test_overall.get("f1", float("nan"))),
                "test_accuracy": float(test_overall.get("accuracy", float("nan"))),
                "best_n_estimators": int(result_cls.best_params.get("n_estimators", -1)),
                "best_max_depth": int(result_cls.best_params.get("max_depth", -1)),
                "best_learning_rate": float(
                    result_cls.best_params.get("learning_rate", float("nan"))
                ),
            }
        )
        best_params[FEASIBILITY_COLUMN] = {
            k: _coerce_for_json(v) for k, v in result_cls.best_params.items()
        }
        result_cls.study_df.to_csv(args.out_dir / f"study_{FEASIBILITY_COLUMN}.csv", index=False)

    # --- write reports -------------------------------------------------
    summary_df = pd.DataFrame(summary_rows)
    summary_path = args.out_dir / "tuned_summary.csv"
    summary_df.to_csv(summary_path, index=False)
    log.info("wrote %s", summary_path)

    params_path = args.out_dir / "tuned_best_params.json"
    # Explicit encoding so the file does not depend on the platform locale.
    params_path.write_text(json.dumps(best_params, indent=2), encoding="utf-8")
    log.info("wrote %s", params_path)

    if metrics_frames:
        metrics_long = pd.concat(metrics_frames, ignore_index=True)
        metrics_path = args.out_dir / "tuned_test_metrics.parquet"
        metrics_long.to_parquet(metrics_path, index=False)
        log.info("wrote %s (%d rows)", metrics_path, len(metrics_long))

        # Acceptance summary against the project plan thresholds.
        # Every gated target is listed: one that was not tuned in this run has
        # no metrics, so its observed value is NaN and it counts as failing.
        gate_rows: list[dict[str, Any]] = []
        for tgt, thresholds in ACCEPTANCE_GATES.items():
            sub = metrics_long.query("target == @tgt and scenario_family == '__all__'").set_index(
                "metric"
            )["value"]
            row = {"target": tgt, "thresholds": json.dumps(thresholds)}
            passes = True
            for m_name, threshold in thresholds.items():
                v = float(sub.get(m_name, float("nan")))
                row[f"{m_name}_observed"] = v
                row[f"{m_name}_threshold"] = threshold
                # A gate passes only when the observed value is present and
                # at least the threshold.
                passes = passes and not np.isnan(v) and v >= threshold
            row["passes"] = passes
            gate_rows.append(row)
        gate_df = pd.DataFrame(gate_rows)
        gate_path = args.out_dir / "tuned_acceptance_gate.csv"
        gate_df.to_csv(gate_path, index=False)
        log.info(
            "wrote %s; tuned passes %d/%d", gate_path, int(gate_df["passes"].sum()), len(gate_df)
        )
        print("\n=== Tuned XGBoost acceptance gate (test, all families) ===", flush=True)
        with pd.option_context("display.max_columns", None, "display.width", 200):
            print(gate_df.to_string(index=False))

    print("\n=== Tuned XGBoost summary ===", flush=True)
    with pd.option_context("display.max_columns", None, "display.width", 200):
        print(summary_df.to_string(index=False))

    # --- Layer-1 registry-rover sanity check ---------------------------
    if not args.no_registry_check and (fitted_regressors or fitted_classifier is not None):
        log.info("running tuned registry-rover sanity check...")
        try:
            sanity = _tuned_registry_sanity(df, fitted_regressors, fitted_classifier)
            sanity_path = args.out_dir / "tuned_registry_sanity.csv"
            sanity.to_csv(sanity_path, index=False)
            log.info("wrote %s (%d rows)", sanity_path, len(sanity))
            _print_registry_summary(sanity)
        except Exception as exc:  # pragma: no cover — diagnostic, not fatal
            # Still non-fatal, but the traceback is kept: ``str(exc)`` alone
            # (e.g. a bare key name for ``KeyError``) rarely shows the cause.
            log.warning("tuned registry-rover sanity check failed: %s", exc, exc_info=True)

    return 0
def _coerce_for_json(value: Any) -> Any:
    """Convert one hyperparameter value into something ``json.dumps`` accepts.

    NumPy boolean, floating and integer scalars are unwrapped to the matching
    built-in type. ``None`` and native ``bool`` / ``int`` / ``float`` / ``str``
    values pass through unchanged. Anything else falls back to ``str(value)``.
    """
    if value is None:
        # Keep missing values as JSON ``null`` rather than the string "None".
        return None
    if isinstance(value, np.bool_):
        # ``np.bool_`` is neither ``np.integer`` nor a ``bool`` subclass, so
        # without this branch it would be written as the string "True"/"False".
        return bool(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, (bool, int, float, str)):
        return value
    return str(value)
def _tuned_registry_sanity(
    df: pd.DataFrame,
    regressors: dict[str, Any],
    classifier: Any | None,
) -> pd.DataFrame:
    """Apply tuned models to the registry-rover Layer-1 inputs.

    For each registry rover, one row is emitted per tuned regressor and, when
    ``classifier`` is given, one more for the feasibility classifier. Each row
    carries ``rover``, ``algorithm``, ``target``, ``predicted``,
    ``evaluator``, ``abs_error`` (the signed difference predicted minus
    evaluator), ``rel_error`` and ``is_primary``.
    """
    categories = _build_training_categories(df)
    primary = set(LAYER1_PRIMARY_TARGETS)
    nan = float("nan")

    records: list[dict[str, Any]] = []
    for rover_name in ("Pragyan", "Yutu-2", "MoonRanger", "Rashid-1"):
        features, reference = _row_for_registry_rover(
            rover_name, training_categories=categories
        )

        for target_name, regressor in regressors.items():
            predicted = float(np.asarray(regressor.predict(features))[0])
            expected = float(reference[target_name])
            delta = predicted - expected
            # Relative error is undefined for a zero reference value.
            if expected != 0:
                relative = delta / expected
            else:
                relative = nan
            records.append(
                {
                    "rover": rover_name,
                    "algorithm": "xgboost_tuned",
                    "target": target_name,
                    "predicted": predicted,
                    "evaluator": expected,
                    "abs_error": delta,
                    "rel_error": relative,
                    "is_primary": target_name in primary,
                }
            )

        if classifier is None:
            continue

        # Column 1 of ``predict_proba`` is the positive-class probability.
        probability = float(np.asarray(classifier.predict_proba(features))[0, 1])
        truth = float(bool(reference[FEASIBILITY_COLUMN]))
        records.append(
            {
                "rover": rover_name,
                "algorithm": "xgboost_tuned",
                "target": FEASIBILITY_COLUMN,
                "predicted": probability,
                "evaluator": truth,
                "abs_error": probability - truth,
                "rel_error": nan,
                "is_primary": FEASIBILITY_COLUMN in primary,
            }
        )
    return pd.DataFrame(records)
def _print_registry_summary(sanity: pd.DataFrame) -> None:
    """Print the registry sanity tables to stdout.

    Rows flagged ``is_primary`` are reported first: a rover-by-target table of
    the median absolute relative error (in percent) for the regression
    targets, then the per-rover classifier hit rate at a 0.5 threshold. All
    remaining rows are reported as the scenario-OOD diagnostic table.
    """

    def _median_abs_rel_pct(frame: pd.DataFrame) -> pd.DataFrame:
        # Rover x target table of median |rel_error|, in percent.
        with_pct = frame.assign(abs_pct=lambda d: 100 * d["rel_error"].abs())
        medians = with_pct.groupby(["rover", "target"])["abs_pct"].median()
        return medians.unstack("target")

    is_primary = sanity["is_primary"]
    primary_rows = sanity[is_primary]
    diagnostic_rows = sanity[~is_primary]

    print("\n=== Tuned registry sanity (PRIMARY) ===", flush=True)

    is_feasibility = primary_rows["target"] == FEASIBILITY_COLUMN
    regression_rows = primary_rows[~is_feasibility]
    if not regression_rows.empty:
        table = _median_abs_rel_pct(regression_rows)
        print("Median |rel_error| (%):")
        print(table.round(2).to_string())

    classifier_rows = primary_rows[is_feasibility]
    if not classifier_rows.empty:
        scored = classifier_rows.assign(
            hit=lambda d: (d["predicted"] >= 0.5).astype(int) == d["evaluator"].astype(int)
        )
        accuracy = scored.groupby("rover")["hit"].mean()
        table = accuracy.rename("classifier_accuracy").to_frame()
        print("\nClassifier accuracy (stalled):")
        print(table.round(3).to_string())

    if diagnostic_rows.empty:
        return
    print("\n=== Tuned registry sanity (SCENARIO-OOD diagnostic) ===", flush=True)
    table = _median_abs_rel_pct(diagnostic_rows)
    print("Median |rel_error| (%):")
    print(table.round(2).to_string())
# Script entry point: the return value of ``main`` becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())