"""
src/models/model_selector.py
==============================
Utilities for training multiple candidate models, selecting the best, and
saving results. Used by both classifier and regressor training scripts.
"""

import pickle
import json
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime, timezone

from src.utils.logger import get_logger
from config.settings import MODELS_DIR, METRICS_DIR, RANDOM_SEED

logger = get_logger(__name__)


def train_test_split_time(X: pd.DataFrame, y: pd.Series, test_size: float = 0.2):
    """Chronological train/test split (preserves time order).

    The first ``1 - test_size`` fraction of rows becomes the training set and
    the remaining rows the test set. Rows are never shuffled, so no later
    observation ends up in the training set.

    Parameters
    ----------
    X : pd.DataFrame
        Feature matrix, already in chronological order.
    y : pd.Series
        Target vector, positionally aligned with ``X``.
    test_size : float
        Fraction of rows reserved for the test set, in ``[0, 1]``.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)`` as positional (``iloc``) slices.

    Raises
    ------
    ValueError
        If ``test_size`` is outside ``[0, 1]``.
    """
    if not 0.0 <= test_size <= 1.0:
        raise ValueError(f"test_size must be in [0, 1], got {test_size!r}")
    n = len(X)
    split = int(n * (1 - test_size))
    return X.iloc[:split], X.iloc[split:], y.iloc[:split], y.iloc[split:]


def sensitivity_analysis(model, X: pd.DataFrame, y_pred_col: str = None,
                         n_steps: int = 20, pct_range: float = 0.3) -> pd.DataFrame:
    """
    One-at-a-time sensitivity analysis of a fitted model.

    Each feature in turn is shifted upward by ``pct_range`` times its own
    standard deviation (all other features unchanged), and the mean absolute
    change in the model's continuous output is recorded. Works for both
    classifiers (positive-class probability via ``predict_proba``) and
    regressors (``predict``).

    Parameters
    ----------
    model : fitted estimator
        Must expose ``predict_proba`` or ``predict`` accepting a 2-D array.
    X : pd.DataFrame
        Reference inputs; every column is treated as a feature to perturb.
    y_pred_col : str, optional
        Currently unused; kept for backward compatibility.
    n_steps : int, optional
        Currently unused; kept for backward compatibility.
    pct_range : float
        Size of the shift, as a multiple of each feature's standard deviation.

    Returns
    -------
    pd.DataFrame
        Columns ``feature`` and ``sensitivity``, sorted by sensitivity
        (descending). Features whose standard deviation is zero or undefined
        are skipped; if none remain, an empty frame with those columns is
        returned.
    """
    def _predict_continuous(m, X_arr):
        # Use predict_proba for classifiers so perturbation produces continuous
        # output (discrete 0/1 labels would almost always yield a zero delta).
        if hasattr(m, "predict_proba"):
            proba = m.predict_proba(X_arr)
            return proba[:, 1] if proba.ndim == 2 else proba
        out = m.predict(X_arr)
        return out.astype(float)

    base = _predict_continuous(model, X.values)
    results = []
    for col in X.columns:
        col_std = X[col].std()
        # A constant column (std == 0) cannot be perturbed; a single-row frame
        # yields std == NaN, which would otherwise feed NaNs into the model.
        if not np.isfinite(col_std) or col_std == 0:
            continue
        X_perturbed = X.copy()
        X_perturbed[col] = X[col] + col_std * pct_range
        perturbed = _predict_continuous(model, X_perturbed.values)
        delta = np.abs(perturbed - base).mean()
        results.append({"feature": col, "sensitivity": round(float(delta), 6)})

    # Explicit columns keep sort_values working when every feature was skipped
    # (pd.DataFrame([]) has no "sensitivity" column and would raise KeyError).
    return (pd.DataFrame(results, columns=["feature", "sensitivity"])
            .sort_values("sensitivity", ascending=False))


def walk_forward_cv(model_factory, X: pd.DataFrame, y: pd.Series,
                    n_splits: int = 5, task: str = "classification") -> dict:
    """
    Walk-forward (expanding window) cross-validation for time-series data.

    Avoids lookahead bias by ensuring training data always precedes test data
    chronologically. This is the correct validation strategy for temporal
    datasets: a random k-fold would leak future observations into training
    folds.

    The first fold trains on the first 40 % of rows (at least 20 rows). Each
    following fold extends the training window by one fold of rows. The last
    fold's test window runs to the end of the data, so rows left over by the
    integer fold-size division are still evaluated. A ``StandardScaler`` is
    fitted on each training window only and then applied to its test window.

    Parameters
    ----------
    model_factory : callable
        Returns a freshly instantiated (unfitted) sklearn model.
    X : pd.DataFrame
        Feature matrix, **already in chronological order**.
    y : pd.Series
        Target vector, same ordering as X.
    n_splits : int
        Maximum number of expanding-window folds (default 5). Fewer folds are
        produced when the data runs out.
    task : str
        "classification" → reports AUC + F1.
        Any other value is treated as regression → reports RMSE + R².

    Returns
    -------
    dict with keys:
        folds   – per-fold metrics list
        mean_*  – mean across folds (None if no fold produced that metric)
        std_*   – std across folds (None if no fold produced that metric)
    or ``{"folds": [], "error": "insufficient_data"}`` when no fold fits.

    Notes
    -----
    * Classification folds whose test window contains a single class report
      ``auc`` and ``f1`` as None and are excluded from the aggregates.
    * AUC is computed from ``predict_proba`` when available, otherwise from
      ``decision_function``, falling back to ``predict`` as a last resort.
    * Regression folds with fewer than two test rows report ``r2`` as None,
      because R² is undefined there.

    Raises
    ------
    ValueError
        If ``n_splits`` is smaller than 1.
    """
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import roc_auc_score, f1_score, mean_squared_error, r2_score

    if n_splits < 1:
        raise ValueError(f"n_splits must be >= 1, got {n_splits!r}")

    n = len(X)
    # Minimum training size: first 40 % of data so folds are meaningful
    min_train = max(int(n * 0.4), 20)
    fold_size = max((n - min_train) // n_splits, 1)

    folds = []
    for i in range(n_splits):
        train_end = min_train + i * fold_size
        if train_end >= n:
            break
        # The final fold absorbs the remainder of the integer division above
        # so the most recent rows are always part of some test window.
        test_end = n if i == n_splits - 1 else min(train_end + fold_size, n)

        X_tr, X_te = X.iloc[:train_end].values, X.iloc[train_end:test_end].values
        y_tr, y_te = y.iloc[:train_end].values, y.iloc[train_end:test_end].values

        # Fit scaling statistics on the training window only (no leakage).
        scaler = StandardScaler()
        X_tr_s = scaler.fit_transform(X_tr)
        X_te_s = scaler.transform(X_te)

        model = model_factory()
        model.fit(X_tr_s, y_tr)

        fold_metrics = {"fold": i + 1, "train_size": train_end, "test_size": len(y_te)}
        if task == "classification":
            if len(np.unique(y_te)) < 2:
                # AUC is undefined with a single class in the test window; F1
                # is skipped too so both metrics aggregate over the same folds.
                fold_metrics.update({"auc": None, "f1": None})
            else:
                # AUC needs continuous scores; hard labels collapse the ROC curve.
                if hasattr(model, "predict_proba"):
                    scores = model.predict_proba(X_te_s)[:, 1]
                elif hasattr(model, "decision_function"):
                    scores = model.decision_function(X_te_s)
                else:
                    scores = model.predict(X_te_s)
                pred = model.predict(X_te_s)
                fold_metrics["auc"] = round(float(roc_auc_score(y_te, scores)), 4)
                fold_metrics["f1"] = round(float(f1_score(y_te, pred, zero_division=0)), 4)
        else:
            pred = model.predict(X_te_s)
            fold_metrics["rmse"] = round(float(np.sqrt(mean_squared_error(y_te, pred))), 4)
            # R² needs at least two samples; sklearn would return NaN (with a
            # warning), which would poison the cross-fold mean.
            fold_metrics["r2"] = (round(float(r2_score(y_te, pred)), 4)
                                  if len(y_te) >= 2 else None)
        folds.append(fold_metrics)

    if not folds:
        return {"folds": [], "error": "insufficient_data"}

    result = {"folds": folds}
    metric_names = ("auc", "f1") if task == "classification" else ("rmse", "r2")
    for metric in metric_names:
        values = [f[metric] for f in folds if f.get(metric) is not None]
        result[f"mean_{metric}"] = round(float(np.mean(values)), 4) if values else None
        result[f"std_{metric}"] = round(float(np.std(values)), 4) if values else None

    logger.info("Walk-forward CV (%s, %d folds): %s", task, len(folds),
                {k: v for k, v in result.items() if k.startswith("mean_")})
    return result


def save_model(model, name: str) -> Path:
    """Pickle ``model`` to ``MODELS_DIR / "<name>.pkl"`` and return the path.

    The target directory is created first if it does not exist yet.
    """
    path = MODELS_DIR / f"{name}.pkl"
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(model, f)
    logger.info("Saved model: %s", path.name)
    return path


def load_model(name: str):
    """Load a pickled model from ``MODELS_DIR / "<name>.pkl"``.

    Returns None (and logs a warning) if the file does not exist.
    """
    path = MODELS_DIR / f"{name}.pkl"
    if not path.exists():
        logger.warning("Model not found: %s", path)
        return None
    # SECURITY: unpickling can execute arbitrary code; only load files this
    # project wrote itself, never artifacts from an untrusted source.
    with open(path, "rb") as f:
        return pickle.load(f)


def save_metrics(metrics: dict, name: str) -> Path:
    """Save ``metrics`` plus a ``saved_at`` timestamp as JSON.

    The file is written to ``METRICS_DIR / "<name>_metrics.json"``, whose
    directory is created if needed. ``saved_at`` is a naive ISO-8601 UTC
    timestamp. Values that JSON cannot encode natively are stringified via
    ``default=str``. The caller's dict is not modified.

    Returns the path of the written file.
    """
    # Build a new dict instead of mutating the caller's ``metrics``.
    # datetime.utcnow() is deprecated (Python 3.12+); this produces the same
    # naive-UTC ISO string it did.
    payload = {
        **metrics,
        "saved_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }
    path = METRICS_DIR / f"{name}_metrics.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, default=str)
    logger.info("Saved metrics: %s", path.name)
    return path


def load_metrics(name: str) -> dict:
    """Load metrics saved by ``save_metrics``; return ``{}`` if the file is missing."""
    path = METRICS_DIR / f"{name}_metrics.json"
    if not path.exists():
        return {}
    with open(path, encoding="utf-8") as f:
        return json.load(f)