File size: 4,499 Bytes
1aa566a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Model training with MLflow experiment tracking."""
from __future__ import annotations

import time
from pathlib import Path
from typing import Optional

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

from src.data.preprocessing import Preprocessor
from src.utils.config import settings, resolve
from src.utils.logging_config import get_logger

log = get_logger(__name__)


class ModelTrainer:
    """Train and evaluate a GradientBoosting model with MLflow tracking.

    Constructing an instance creates a ``Preprocessor`` and points MLflow at
    the tracking URI and experiment named in ``settings.mlflow``.
    """

    def __init__(self) -> None:
        self.preprocessor = Preprocessor()
        self._setup_mlflow()

    def train(
        self,
        df: pd.DataFrame,
        run_name: Optional[str] = None,
        tags: Optional[dict] = None,
    ) -> dict:
        """Train a new model on `df` inside a fresh MLflow run.

        The frame is passed through ``self.preprocessor.transform_with_target``,
        split with ``train_test_split`` (test size and seed come from
        ``settings.model``), fitted, evaluated on the held-out split, and the
        parameters, metrics, model and feature importances are logged to MLflow.

        Args:
            df: Training data; must yield a target from the preprocessor.
            run_name: MLflow run name. Defaults to ``train_<unix timestamp>``.
            tags: Optional tags to attach to the MLflow run.

        Returns:
            A dict with keys: ``model``, ``metrics``, ``feature_importances``,
            ``run_id``, ``artifact_uri`` and ``preprocessor``.

        Raises:
            ValueError: If the preprocessor returns no target for `df`.
        """
        X, y = self.preprocessor.transform_with_target(df)
        if y is None:
            raise ValueError("Training DataFrame must contain the target column.")

        hp = settings.model.hyperparams
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=settings.model.evaluation.test_size,
            random_state=hp.random_state,
        )

        model = GradientBoostingRegressor(
            n_estimators=hp.n_estimators,
            max_depth=hp.max_depth,
            learning_rate=hp.learning_rate,
            subsample=hp.subsample,
            min_samples_split=hp.min_samples_split,
            random_state=hp.random_state,
        )

        with mlflow.start_run(run_name=run_name or f"train_{int(time.time())}") as run:
            mlflow.set_tags(tags or {})
            # Log every hyperparameter that is passed to the estimator so the
            # tracked run fully describes the model that was fitted.
            mlflow.log_params({
                "n_estimators": hp.n_estimators,
                "max_depth": hp.max_depth,
                "learning_rate": hp.learning_rate,
                "subsample": hp.subsample,
                "min_samples_split": hp.min_samples_split,
                "random_state": hp.random_state,
                "train_samples": len(X_train),
                "test_samples": len(X_test),
            })

            log.info("Training GradientBoosting on %d samples ...", len(X_train))
            t0 = time.perf_counter()
            model.fit(X_train, y_train)
            train_time = time.perf_counter() - t0

            metrics = self._evaluate(model, X_test, y_test)
            metrics["train_time_sec"] = round(train_time, 2)

            mlflow.log_metrics(metrics)
            mlflow.sklearn.log_model(model, artifact_path="model")

            fi = self._feature_importances(model)
            fi_path = self._write_feature_importances(
                fi, resolve("data/logs/feature_importances.json")
            )
            mlflow.log_artifact(str(fi_path))

            # Both values are read while the run is still active.
            run_id = run.info.run_id
            artifact_uri = mlflow.get_artifact_uri("model")

        log.info(
            "Training complete — RMSE=%.4f, MAE=%.4f, R2=%.4f (run_id=%s)",
            metrics["rmse"], metrics["mae"], metrics["r2"], run_id,
        )

        return {
            "model": model,
            "metrics": metrics,
            "feature_importances": fi,
            "run_id": run_id,
            "artifact_uri": artifact_uri,
            "preprocessor": self.preprocessor,
        }

    def _evaluate(
        self,
        model: GradientBoostingRegressor,
        X: pd.DataFrame,
        y: pd.Series,
    ) -> dict:
        """Return RMSE, MAE and R2 of `model` on (`X`, `y`), rounded to 4 places."""
        y_pred = model.predict(X)
        rmse = float(np.sqrt(mean_squared_error(y, y_pred)))
        mae = float(mean_absolute_error(y, y_pred))
        r2 = float(r2_score(y, y_pred))
        return {"rmse": round(rmse, 4), "mae": round(mae, 4), "r2": round(r2, 4)}

    def _feature_importances(self, model: GradientBoostingRegressor) -> pd.DataFrame:
        """Return a ``feature``/``importance`` frame sorted by descending importance.

        Feature names come from ``self.preprocessor.feature_names()`` and are
        paired positionally with ``model.feature_importances_``.

        Raises:
            ValueError: If the number of names and importances differ.
        """
        names = list(self.preprocessor.feature_names())
        importances = model.feature_importances_
        # Fail with an explicit message instead of pandas' generic
        # "arrays must be of the same length" error.
        if len(names) != len(importances):
            raise ValueError(
                f"Preprocessor reports {len(names)} feature names but the model "
                f"has {len(importances)} feature importances."
            )
        return (
            pd.DataFrame({"feature": names, "importance": importances})
            .sort_values("importance", ascending=False)
            .reset_index(drop=True)
        )

    @staticmethod
    def _write_feature_importances(fi: pd.DataFrame, path: Path) -> Path:
        """Write `fi` to `path` as indented JSON records and return the path.

        Parent directories are created first, so a missing log directory does
        not abort a run after the model has already been fitted.
        """
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        fi.to_json(path, orient="records", indent=2)
        return path

    def _setup_mlflow(self) -> None:
        """Point MLflow at the configured tracking location and experiment."""
        # NOTE(review): `.as_uri()` is called on the resolved value, so
        # `resolve` is expected to return an absolute path object; a remote
        # (e.g. http) tracking URI would presumably not work here — confirm.
        tracking_uri = resolve(settings.mlflow.tracking_uri)
        mlflow.set_tracking_uri(tracking_uri.as_uri())
        mlflow.set_experiment(settings.mlflow.experiment_name)
        log.info("MLflow tracking -> %s", tracking_uri)