# Extracted from a hosted file view (revision 1aa566a, file size 4,499 bytes).
"""Model training with MLflow experiment tracking."""
from __future__ import annotations
import time
from pathlib import Path
from typing import Optional
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from src.data.preprocessing import Preprocessor
from src.utils.config import settings, resolve
from src.utils.logging_config import get_logger
# Module-level logger, obtained from the project's logging helper using this module's name.
log = get_logger(__name__)
class ModelTrainer:
    """Train and evaluate a GradientBoosting model with MLflow tracking.

    Constructing a trainer creates a ``Preprocessor`` and points MLflow at the
    tracking location and experiment named in ``settings.mlflow``.
    """

    def __init__(self) -> None:
        """Create the preprocessor and configure MLflow."""
        self.preprocessor = Preprocessor()
        self._setup_mlflow()

    def train(
        self,
        df: pd.DataFrame,
        run_name: Optional[str] = None,
        tags: Optional[dict] = None,
    ) -> dict:
        """Train a new model on `df` and record the run in MLflow.

        Args:
            df: Training data. The preprocessor must be able to extract a
                target from it (i.e. return a non-``None`` ``y``).
            run_name: Name of the MLflow run. Defaults to
                ``train_<unix timestamp>``.
            tags: Optional tags set on the MLflow run.

        Returns:
            A dict with the keys ``model``, ``metrics``,
            ``feature_importances``, ``run_id``, ``artifact_uri`` and
            ``preprocessor``.

        Raises:
            ValueError: If the preprocessor returns no target for `df`.
        """
        X, y = self.preprocessor.transform_with_target(df)
        if y is None:
            raise ValueError("Training DataFrame must contain the target column.")

        hp = settings.model.hyperparams
        test_size = settings.model.evaluation.test_size

        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=test_size,
            random_state=hp.random_state,
        )

        model = GradientBoostingRegressor(
            n_estimators=hp.n_estimators,
            max_depth=hp.max_depth,
            learning_rate=hp.learning_rate,
            subsample=hp.subsample,
            min_samples_split=hp.min_samples_split,
            random_state=hp.random_state,
        )

        with mlflow.start_run(run_name=run_name or f"train_{int(time.time())}") as run:
            mlflow.set_tags(tags or {})
            # Log every setting that influences the fitted model or the
            # split, so a run can be reproduced from its tracked params alone.
            mlflow.log_params({
                "n_estimators": hp.n_estimators,
                "max_depth": hp.max_depth,
                "learning_rate": hp.learning_rate,
                "subsample": hp.subsample,
                "min_samples_split": hp.min_samples_split,
                "random_state": hp.random_state,
                "test_size": test_size,
                "train_samples": len(X_train),
                "test_samples": len(X_test),
            })

            log.info("Training GradientBoosting on %d samples ...", len(X_train))
            t0 = time.perf_counter()
            model.fit(X_train, y_train)
            train_time = time.perf_counter() - t0

            metrics = self._evaluate(model, X_test, y_test)
            metrics["train_time_sec"] = round(train_time, 2)
            mlflow.log_metrics(metrics)

            mlflow.sklearn.log_model(model, artifact_path="model")

            fi = self._feature_importances(model)
            fi_path = resolve("data/logs/feature_importances.json")
            # The log directory may not exist yet (e.g. on a fresh checkout);
            # create it so the JSON dump below cannot fail on a missing folder.
            Path(fi_path).parent.mkdir(parents=True, exist_ok=True)
            fi.to_json(fi_path, orient="records", indent=2)
            mlflow.log_artifact(str(fi_path))

            run_id = run.info.run_id
            # Queried while the run is still active so the URI belongs to it.
            artifact_uri = mlflow.get_artifact_uri("model")

        log.info(
            "Training complete — RMSE=%.4f, MAE=%.4f, R2=%.4f (run_id=%s)",
            metrics["rmse"], metrics["mae"], metrics["r2"], run_id,
        )
        return {
            "model": model,
            "metrics": metrics,
            "feature_importances": fi,
            "run_id": run_id,
            "artifact_uri": artifact_uri,
            "preprocessor": self.preprocessor,
        }

    def _evaluate(
        self,
        model: GradientBoostingRegressor,
        X: pd.DataFrame,
        y: pd.Series,
    ) -> dict:
        """Score `model` on `X`/`y`.

        Returns:
            A dict with ``rmse``, ``mae`` and ``r2``, each rounded to four
            decimal places.
        """
        y_pred = model.predict(X)
        rmse = float(np.sqrt(mean_squared_error(y, y_pred)))
        mae = float(mean_absolute_error(y, y_pred))
        r2 = float(r2_score(y, y_pred))
        return {"rmse": round(rmse, 4), "mae": round(mae, 4), "r2": round(r2, 4)}

    def _feature_importances(self, model: GradientBoostingRegressor) -> pd.DataFrame:
        """Pair the preprocessor's feature names with the model's importances.

        Returns:
            A DataFrame with columns ``feature`` and ``importance``, sorted by
            importance in descending order with a fresh 0..n-1 index.
        """
        names = self.preprocessor.feature_names()
        importances = model.feature_importances_
        return (
            pd.DataFrame({"feature": names, "importance": importances})
            .sort_values("importance", ascending=False)
            .reset_index(drop=True)
        )

    def _setup_mlflow(self) -> None:
        """Point MLflow at the configured tracking location and experiment."""
        # NOTE(review): `as_uri()` implies `resolve` returns a pathlib-style
        # path, presumably absolute — confirm against `src.utils.config`.
        tracking_uri = resolve(settings.mlflow.tracking_uri)
        mlflow.set_tracking_uri(tracking_uri.as_uri())
        mlflow.set_experiment(settings.mlflow.experiment_name)
        log.info("MLflow tracking -> %s", tracking_uri)
|