File size: 6,073 Bytes
bb21b5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""Train task: FLAML AutoML per algorithm, MLflow logging for every candidate.

We run one FLAML AutoML invocation per algorithm so each produces a clean MLflow
run with that algorithm's best hyperparameters, test metrics, and a saved
sklearn Pipeline (preprocessor + estimator). Champion selection happens in
src.register based on test_macro_f1.
"""

from __future__ import annotations

import logging
import os
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from urllib.request import url2pathname

import mlflow
import mlflow.sklearn
import pandas as pd
from flaml import AutoML
from mlflow.models.signature import infer_signature
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.pipeline import Pipeline

from src import config
from src.preprocess import build_preprocessor

logger = logging.getLogger(__name__)


def _ensure_experiment(name: str, expected_artifact_root: str) -> None:
    """Create the experiment with an explicit artifact_location if it doesn't exist.

    If it does exist but its stored artifact_location points at a path that
    isn't writable from here (e.g. an old host path baked in when running
    under Docker), fail fast with a clear remediation hint - wiping mlflow/
    and mlruns/ resets MLflow state without affecting models/champion/.
    """
    client = mlflow.MlflowClient()
    exp = client.get_experiment_by_name(name)
    if exp is None:
        client.create_experiment(name=name, artifact_location=expected_artifact_root)
    else:
        stored = exp.artifact_location.replace("file://", "")
        if not Path(stored).parent.exists():
            raise RuntimeError(
                f"Experiment '{name}' has artifact_location={stored!r} which is "
                f"not reachable from this filesystem. This usually means the "
                f"experiment was created in a different environment (host vs "
                f"Docker container). Fix: delete mlflow/ and mlruns/ to wipe "
                f"MLflow state, then re-run. models/champion/ is unaffected."
            )
    mlflow.set_experiment(name)


def run(time_budget_total: int | None = None) -> dict[str, Any]:
    """Train one FLAML AutoML per algorithm, log each to MLflow.

    Returns a dict keyed by algorithm with test metrics for quick inspection.
    """
    config.ensure_dirs()
    mlflow.set_tracking_uri(config.MLFLOW_TRACKING_URI)
    _ensure_experiment(config.EXPERIMENT_NAME, str(config.MLFLOW_ARTIFACT_ROOT))

    total = time_budget_total if time_budget_total is not None else config.AUTOML_TIME_BUDGET
    per_algo = max(15, total // len(config.AUTOML_ESTIMATORS))
    logger.info("Total budget=%ds, per-algorithm budget=%ds", total, per_algo)

    train_df = pd.read_parquet(config.PROCESSED_TRAIN)
    test_df = pd.read_parquet(config.PROCESSED_TEST)
    X_train = train_df.drop(columns=[config.TARGET_COL])
    y_train = train_df[config.TARGET_COL].astype(int)
    X_test = test_df.drop(columns=[config.TARGET_COL])
    y_test = test_df[config.TARGET_COL].astype(int)
    logger.info("Train shape %s, Test shape %s", X_train.shape, X_test.shape)

    results: dict[str, dict[str, float]] = {}

    for algo in config.AUTOML_ESTIMATORS:
        with mlflow.start_run(run_name=f"flaml_{algo}") as run:
            t0 = time.perf_counter()
            pipe = Pipeline([
                ("pre", build_preprocessor()),
                ("clf", AutoML()),
            ])
            pipe.fit(
                X_train, y_train,
                clf__time_budget=per_algo,
                clf__metric=config.AUTOML_METRIC,
                clf__task="classification",
                clf__seed=config.RANDOM_SEED,
                clf__estimator_list=[algo],
                clf__verbose=0,
                clf__eval_method="holdout",
            )
            fit_seconds = time.perf_counter() - t0

            y_pred = pipe.predict(X_test)
            y_proba = pipe.predict_proba(X_test)[:, 1]
            test_macro_f1 = float(f1_score(y_test, y_pred, average="macro"))
            test_roc_auc = float(roc_auc_score(y_test, y_proba))

            clf = pipe.named_steps["clf"]
            mlflow.set_tag("algorithm", algo)
            mlflow.log_param("algorithm", algo)
            mlflow.log_param("time_budget_seconds", per_algo)
            mlflow.log_param("random_seed", config.RANDOM_SEED)
            for k, v in (clf.best_config or {}).items():
                mlflow.log_param(f"best_{k}", v)
            mlflow.log_metrics({
                "test_macro_f1": test_macro_f1,
                "test_roc_auc": test_roc_auc,
                "val_loss": float(clf.best_loss) if clf.best_loss is not None else float("nan"),
                "fit_seconds": fit_seconds,
                "flaml_best_iteration": int(clf.best_iteration or 0),
            })

            signature = infer_signature(X_test.head(5), y_pred[:5])
            mlflow.sklearn.log_model(
                pipe,
                artifact_path="model",
                signature=signature,
                input_example=X_test.head(5),
            )

            results[algo] = {
                "run_id": run.info.run_id,
                "test_macro_f1": test_macro_f1,
                "test_roc_auc": test_roc_auc,
                "fit_seconds": fit_seconds,
            }
            logger.info(
                "[%s] test_macro_f1=%.4f test_roc_auc=%.4f fit=%.1fs run_id=%s",
                algo, test_macro_f1, test_roc_auc, fit_seconds, run.info.run_id,
            )

    return results


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(message)s")
    # TIME_BUDGET (seconds), when set to a non-empty value, overrides the
    # configured total budget; otherwise run() falls back to its default.
    raw_budget = os.environ.get("TIME_BUDGET")
    summary = run(time_budget_total=int(raw_budget) if raw_budget else None)

    print("\n=== Per-algorithm results ===")
    # Best macro-F1 first.
    ranked = sorted(summary.items(), key=lambda item: -item[1]["test_macro_f1"])
    for name, metrics in ranked:
        line = (
            f"  {name:<16} test_macro_f1={metrics['test_macro_f1']:.4f}  "
            f"test_roc_auc={metrics['test_roc_auc']:.4f}  fit={metrics['fit_seconds']:.1f}s  "
            f"run_id={metrics['run_id']}"
        )
        print(line)