| """ |
| src/models/model_selector.py |
| ============================== |
| Utilities for training multiple candidate models, selecting the best, |
| and saving results. Used by both classifier and regressor training scripts. |
| """ |
|
|
import json
import pickle
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
import pandas as pd

from config.settings import MODELS_DIR, METRICS_DIR, RANDOM_SEED
from src.utils.logger import get_logger
|
|
# Module-level logger keyed to this module's import path.
logger = get_logger(__name__)
|
|
|
|
def train_test_split_time(X: pd.DataFrame, y: pd.Series,
                          test_size: float = 0.2):
    """Split features/target chronologically.

    The first ``1 - test_size`` fraction of rows becomes the training set
    and the remaining tail the test set, preserving time order (no shuffle).

    Returns ``(X_train, X_test, y_train, y_test)``.
    """
    cut = int(len(X) * (1 - test_size))
    X_train, X_test = X.iloc[:cut], X.iloc[cut:]
    y_train, y_test = y.iloc[:cut], y.iloc[cut:]
    return X_train, X_test, y_train, y_test
|
|
|
|
def sensitivity_analysis(model, X: pd.DataFrame, y_pred_col: str = None,
                         n_steps: int = 20, pct_range: float = 0.3) -> pd.DataFrame:
    """
    One-at-a-time sensitivity analysis: shift each feature upward by
    ``pct_range`` standard deviations and measure the mean absolute change
    in the model's continuous output. Works for both classifiers (uses the
    positive-class probability when ``predict_proba`` exists) and regressors.

    Parameters
    ----------
    model : fitted estimator
        Must expose ``predict`` (and optionally ``predict_proba``).
    X : pd.DataFrame
        Baseline feature matrix to perturb, one column at a time.
    y_pred_col : str, optional
        Unused; retained for backward compatibility with existing callers.
    n_steps : int
        Unused; retained for backward compatibility with existing callers.
    pct_range : float
        Perturbation size in units of each feature's standard deviation
        (default 0.3). Note: perturbation is one-sided (+), not ±.

    Returns
    -------
    pd.DataFrame
        Columns ``["feature", "sensitivity"]`` sorted by sensitivity
        descending. Constant (zero-std) features are skipped; if every
        feature is constant, an empty frame with those columns is returned.
    """

    def _predict_continuous(m, X_arr):
        # Prefer positive-class probability so classifier output varies
        # smoothly under small perturbations (hard labels mostly wouldn't).
        if hasattr(m, "predict_proba"):
            proba = m.predict_proba(X_arr)
            return proba[:, 1] if proba.ndim == 2 else proba
        return m.predict(X_arr).astype(float)

    base = _predict_continuous(model, X.values)

    results = []
    for col in X.columns:
        col_std = X[col].std()
        if col_std == 0:
            continue  # constant feature: perturbation would be a no-op
        X_perturbed = X.copy()
        X_perturbed[col] = X[col] + col_std * pct_range
        perturbed = _predict_continuous(model, X_perturbed.values)
        delta = np.abs(perturbed - base).mean()
        results.append({"feature": col, "sensitivity": round(float(delta), 6)})

    # Guard the empty case: sort_values("sensitivity") on a DataFrame built
    # from an empty list would raise KeyError (no such column exists).
    if not results:
        return pd.DataFrame(columns=["feature", "sensitivity"])
    return pd.DataFrame(results).sort_values("sensitivity", ascending=False)
|
|
|
|
def walk_forward_cv(model_factory, X: pd.DataFrame, y: pd.Series,
                    n_splits: int = 5, task: str = "classification") -> dict:
    """
    Walk-forward (expanding window) cross-validation for time-series data.

    Avoids lookahead bias by ensuring training data always precedes test data
    chronologically. This is the correct validation strategy for temporal
    datasets: a random k-fold would leak future observations into training folds.

    Parameters
    ----------
    model_factory : callable
        Returns a freshly instantiated (unfitted) sklearn model.
    X : pd.DataFrame
        Feature matrix, **already in chronological order**.
    y : pd.Series
        Target vector, same ordering as X.
    n_splits : int
        Number of expanding-window folds (default 5).
        Each fold adds 1/n_splits of the data to the training set.
    task : str
        "classification" → reports AUC + F1.
        "regression" → reports RMSE + R².

    Returns
    -------
    dict with keys:
        folds   – per-fold metrics list
        mean_*  – mean across folds
        std_*   – std across folds
    On too little data for even one fold: {"folds": [], "error": "insufficient_data"}.
    """
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import roc_auc_score, f1_score, mean_squared_error, r2_score
    # NOTE: removed a redundant local `import numpy as np` that shadowed the
    # module-level import.

    n = len(X)
    # Reserve at least 40% of the data (never fewer than 20 rows) for the
    # first training window so early folds aren't fit on tiny samples.
    min_train = max(int(n * 0.4), 20)
    fold_size = max(int((n - min_train) / n_splits), 1)

    folds = []
    for i in range(n_splits):
        train_end = min_train + i * fold_size
        test_end = min(train_end + fold_size, n)
        if train_end >= n or test_end > n:
            break  # no data left for another fold

        X_tr, X_te = X.iloc[:train_end].values, X.iloc[train_end:test_end].values
        y_tr, y_te = y.iloc[:train_end].values, y.iloc[train_end:test_end].values

        # Fit the scaler on the training window only — fitting on the full
        # series would leak future statistics into the past.
        scaler = StandardScaler()
        X_tr_s = scaler.fit_transform(X_tr)
        X_te_s = scaler.transform(X_te)

        model = model_factory()
        model.fit(X_tr_s, y_tr)

        fold_metrics = {"fold": i + 1, "train_size": train_end, "test_size": len(y_te)}
        if task == "classification":
            if len(np.unique(y_te)) < 2:
                # AUC/F1 are undefined when the test window holds one class;
                # record None and exclude from the aggregates below.
                fold_metrics.update({"auc": None, "f1": None})
            else:
                proba = (model.predict_proba(X_te_s)[:, 1]
                         if hasattr(model, "predict_proba")
                         else model.predict(X_te_s))
                pred = model.predict(X_te_s)
                fold_metrics["auc"] = round(float(roc_auc_score(y_te, proba)), 4)
                fold_metrics["f1"] = round(float(f1_score(y_te, pred, zero_division=0)), 4)
        else:
            pred = model.predict(X_te_s)
            fold_metrics["rmse"] = round(float(np.sqrt(mean_squared_error(y_te, pred))), 4)
            fold_metrics["r2"] = round(float(r2_score(y_te, pred)), 4)

        folds.append(fold_metrics)

    if not folds:
        return {"folds": [], "error": "insufficient_data"}

    result = {"folds": folds}
    if task == "classification":
        # Drop degenerate (single-class) folds from the aggregates.
        aucs = [f["auc"] for f in folds if f.get("auc") is not None]
        f1s = [f["f1"] for f in folds if f.get("f1") is not None]
        result.update({
            "mean_auc": round(float(np.mean(aucs)), 4) if aucs else None,
            "std_auc": round(float(np.std(aucs)), 4) if aucs else None,
            "mean_f1": round(float(np.mean(f1s)), 4) if f1s else None,
            "std_f1": round(float(np.std(f1s)), 4) if f1s else None,
        })
    else:
        rmses = [f["rmse"] for f in folds]
        r2s = [f["r2"] for f in folds]
        result.update({
            "mean_rmse": round(float(np.mean(rmses)), 4),
            "std_rmse": round(float(np.std(rmses)), 4),
            "mean_r2": round(float(np.mean(r2s)), 4),
            "std_r2": round(float(np.std(r2s)), 4),
        })

    logger.info("Walk-forward CV (%s, %d folds): %s",
                task, len(folds),
                {k: v for k, v in result.items() if k.startswith("mean_")})
    return result
|
|
|
|
def save_model(model, name: str) -> Path:
    """Serialize *model* with pickle to MODELS_DIR/<name>.pkl and return the path."""
    out_path = MODELS_DIR / f"{name}.pkl"
    with out_path.open("wb") as fh:
        pickle.dump(model, fh)
    logger.info("Saved model: %s", out_path.name)
    return out_path
|
|
|
|
def load_model(name: str):
    """Unpickle and return the model saved as MODELS_DIR/<name>.pkl.

    Returns None (after logging a warning) when the file does not exist.
    """
    model_path = MODELS_DIR / f"{name}.pkl"
    if model_path.exists():
        with model_path.open("rb") as fh:
            return pickle.load(fh)
    logger.warning("Model not found: %s", model_path)
    return None
|
|
|
|
def save_metrics(metrics: dict, name: str) -> Path:
    """Save a metrics dict as pretty-printed JSON in METRICS_DIR.

    A ``"saved_at"`` UTC timestamp is stamped onto the *written* payload;
    the caller's dict is left untouched (the previous version mutated it
    in place as a side effect). Non-JSON-serializable values are
    stringified via ``default=str``.

    Returns the path of the written file.
    """
    # Shallow-copy first so the caller's dict is not mutated.
    payload = dict(metrics)
    # Timezone-aware timestamp; datetime.utcnow() is naive and deprecated.
    payload["saved_at"] = datetime.now(timezone.utc).isoformat()
    path = METRICS_DIR / f"{name}_metrics.json"
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, default=str)
    logger.info("Saved metrics: %s", path.name)
    return path
|
|
|
|
def load_metrics(name: str) -> dict:
    """Load a previously saved metrics JSON; return {} when the file is absent."""
    metrics_path = METRICS_DIR / f"{name}_metrics.json"
    if not metrics_path.exists():
        return {}
    with open(metrics_path, encoding="utf-8") as fh:
        return json.load(fh)
|
|