Spaces:
Sleeping
Sleeping
File size: 6,073 Bytes
bb21b5d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | """Train task: FLAML AutoML per algorithm, MLflow logging for every candidate.
We run one FLAML AutoML invocation per algorithm so each produces a clean MLflow
run with that algorithm's best hyperparameters, test metrics, and a saved
sklearn Pipeline (preprocessor + estimator). Champion selection happens in
src.register based on test_macro_f1.
"""
from __future__ import annotations

import logging
import os
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from urllib.request import url2pathname

import mlflow
import mlflow.sklearn
import pandas as pd
from flaml import AutoML
from mlflow.models.signature import infer_signature
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.pipeline import Pipeline

from src import config
from src.preprocess import build_preprocessor
# Module-level logger. Its INFO messages only appear once the caller configures
# logging; the __main__ block of this module does so via logging.basicConfig.
logger = logging.getLogger(__name__)
def _ensure_experiment(name: str, expected_artifact_root: str) -> None:
    """Create the experiment if needed, validate it, and make it active.

    If no experiment named *name* exists, create it with an explicit
    ``artifact_location`` of *expected_artifact_root*. If it does exist and
    its stored artifact location is local (a plain path or a ``file:`` URI)
    whose parent directory does not exist on this filesystem (e.g. an old
    host path baked in when running under Docker), fail fast with a clear
    remediation hint - wiping mlflow/ and mlruns/ resets MLflow state without
    affecting models/champion/. Locations with any other URI scheme (remote
    artifact stores) cannot be checked against the local filesystem and are
    accepted as-is.

    Args:
        name: MLflow experiment name.
        expected_artifact_root: Artifact location used only when the
            experiment has to be created.

    Raises:
        RuntimeError: If an existing experiment's local artifact location is
            not reachable from this filesystem.
    """
    client = mlflow.MlflowClient()
    exp = client.get_experiment_by_name(name)
    if exp is None:
        client.create_experiment(name=name, artifact_location=expected_artifact_root)
    else:
        location = exp.artifact_location
        parsed = urlparse(location)
        if parsed.scheme == "file":
            # Decode the URI path (percent-escapes such as %20, Windows drive
            # form file:///C:/...) rather than stripping a literal "file://",
            # which also missed the single-slash form "file:/abs/path".
            stored = url2pathname(parsed.path)
        elif not parsed.scheme or len(parsed.scheme) == 1:
            # Plain filesystem path; a one-letter "scheme" is a Windows drive
            # letter such as C:\..., not a URI scheme.
            stored = location
        else:
            # Remote store (s3://, mlflow-artifacts:/, ...): no local check.
            stored = None
        if stored is not None and not Path(stored).parent.exists():
            raise RuntimeError(
                f"Experiment '{name}' has artifact_location={stored!r} which is "
                f"not reachable from this filesystem. This usually means the "
                f"experiment was created in a different environment (host vs "
                f"Docker container). Fix: delete mlflow/ and mlruns/ to wipe "
                f"MLflow state, then re-run. models/champion/ is unaffected."
            )
    mlflow.set_experiment(name)
def _load_xy(path: str | os.PathLike[str]) -> tuple[pd.DataFrame, pd.Series]:
    """Read one processed parquet split and return (features, integer target)."""
    frame = pd.read_parquet(path)
    features = frame.drop(columns=[config.TARGET_COL])
    target = frame[config.TARGET_COL].astype(int)
    return features, target


def _roc_auc(y_true: pd.Series, proba: Any, classes: Any = None) -> float:
    """Compute ROC AUC from a ``predict_proba`` matrix.

    With two probability columns, the second (positive-class) column is
    scored. With more columns (multiclass), a one-vs-rest macro-averaged AUC
    over the full matrix is returned; *classes*, when given, labels each
    column in order.
    """
    if proba.shape[1] == 2:
        return float(roc_auc_score(y_true, proba[:, 1]))
    return float(roc_auc_score(y_true, proba, multi_class="ovr", labels=classes))


def run(time_budget_total: int | None = None) -> dict[str, Any]:
    """Train one FLAML AutoML per algorithm, log each to its own MLflow run.

    Every algorithm in ``config.AUTOML_ESTIMATORS`` gets an equal share of
    the total time budget, with a floor of 15 seconds each (so the real total
    can exceed the requested budget). Each run logs the algorithm name, the
    per-algorithm budget, the random seed, FLAML's best config as ``best_*``
    params, test macro-F1 / ROC AUC, validation loss, fit time, FLAML's best
    iteration, and the fitted sklearn Pipeline (preprocessor + AutoML) under
    the artifact path "model".

    Args:
        time_budget_total: Total search budget in seconds; falls back to
            ``config.AUTOML_TIME_BUDGET`` when None.

    Returns:
        Dict keyed by algorithm name; each value holds ``run_id`` (str),
        ``test_macro_f1``, ``test_roc_auc`` and ``fit_seconds`` (floats).

    Raises:
        ValueError: If ``config.AUTOML_ESTIMATORS`` is empty (previously this
            surfaced as a ZeroDivisionError when splitting the budget).
    """
    estimators = list(config.AUTOML_ESTIMATORS)
    if not estimators:
        raise ValueError("config.AUTOML_ESTIMATORS is empty; nothing to train")

    config.ensure_dirs()
    mlflow.set_tracking_uri(config.MLFLOW_TRACKING_URI)
    _ensure_experiment(config.EXPERIMENT_NAME, str(config.MLFLOW_ARTIFACT_ROOT))

    total = time_budget_total if time_budget_total is not None else config.AUTOML_TIME_BUDGET
    # 15 s floor so tiny budgets still let FLAML try at least a few configs.
    per_algo = max(15, total // len(estimators))
    logger.info("Total budget=%ds, per-algorithm budget=%ds", total, per_algo)

    X_train, y_train = _load_xy(config.PROCESSED_TRAIN)
    X_test, y_test = _load_xy(config.PROCESSED_TEST)
    logger.info("Train shape %s, Test shape %s", X_train.shape, X_test.shape)

    results: dict[str, dict[str, Any]] = {}
    for algo in estimators:
        # Named `active_run` so it does not shadow this function's own name.
        with mlflow.start_run(run_name=f"flaml_{algo}") as active_run:
            t0 = time.perf_counter()
            pipe = Pipeline([
                ("pre", build_preprocessor()),
                ("clf", AutoML()),
            ])
            # Pipeline routes each `clf__<name>` kwarg to AutoML.fit(<name>=...);
            # restricting estimator_list to one algorithm gives one run per algo.
            pipe.fit(
                X_train, y_train,
                clf__time_budget=per_algo,
                clf__metric=config.AUTOML_METRIC,
                clf__task="classification",
                clf__seed=config.RANDOM_SEED,
                clf__estimator_list=[algo],
                clf__verbose=0,
                clf__eval_method="holdout",
            )
            fit_seconds = time.perf_counter() - t0

            clf = pipe.named_steps["clf"]
            y_pred = pipe.predict(X_test)
            test_macro_f1 = float(f1_score(y_test, y_pred, average="macro"))
            test_roc_auc = _roc_auc(
                y_test, pipe.predict_proba(X_test), getattr(clf, "classes_", None)
            )

            mlflow.set_tag("algorithm", algo)
            # One batched call instead of one request per parameter.
            mlflow.log_params({
                "algorithm": algo,
                "time_budget_seconds": per_algo,
                "random_seed": config.RANDOM_SEED,
                **{f"best_{k}": v for k, v in (clf.best_config or {}).items()},
            })
            mlflow.log_metrics({
                "test_macro_f1": test_macro_f1,
                "test_roc_auc": test_roc_auc,
                "val_loss": float(clf.best_loss) if clf.best_loss is not None else float("nan"),
                "fit_seconds": fit_seconds,
                "flaml_best_iteration": int(clf.best_iteration or 0),
            })

            signature = infer_signature(X_test.head(5), y_pred[:5])
            mlflow.sklearn.log_model(
                pipe,
                artifact_path="model",
                signature=signature,
                input_example=X_test.head(5),
            )

            run_id = active_run.info.run_id
            results[algo] = {
                "run_id": run_id,
                "test_macro_f1": test_macro_f1,
                "test_roc_auc": test_roc_auc,
                "fit_seconds": fit_seconds,
            }
            logger.info(
                "[%s] test_macro_f1=%.4f test_roc_auc=%.4f fit=%.1fs run_id=%s",
                algo, test_macro_f1, test_roc_auc, fit_seconds, run_id,
            )
    return results
if __name__ == "__main__":
    # Script entry point. An optional TIME_BUDGET environment variable (whole
    # seconds) overrides the configured total budget; an unset or empty value
    # falls back to the config default. Results are printed best-first by
    # test macro-F1.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(message)s")
    raw_budget = os.environ.get("TIME_BUDGET")
    results = run(time_budget_total=int(raw_budget) if raw_budget else None)

    print("\n=== Per-algorithm results ===")
    ranked = sorted(
        results.items(),
        key=lambda item: item[1]["test_macro_f1"],
        reverse=True,  # stable: ties keep insertion order, as with a negated key
    )
    for algo_name, metrics in ranked:
        print(
            f" {algo_name:<16} test_macro_f1={metrics['test_macro_f1']:.4f} "
            f"test_roc_auc={metrics['test_roc_auc']:.4f} fit={metrics['fit_seconds']:.1f}s "
            f"run_id={metrics['run_id']}"
        )
|