# NOTE: removed scraped VCS metadata that was not valid Python
# (was: "bhanug2026 / Initial commit / 47c6cfd").
"""
src/models/model_selector.py
==============================
Utilities for training multiple candidate models, selecting the best,
and saving results. Used by both classifier and regressor training scripts.
"""
import json
import pickle
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
import pandas as pd

from config.settings import MODELS_DIR, METRICS_DIR, RANDOM_SEED
from src.utils.logger import get_logger
logger = get_logger(__name__)
def train_test_split_time(X: pd.DataFrame, y: pd.Series,
                          test_size: float = 0.2):
    """
    Split X/y chronologically: the first ``1 - test_size`` fraction of rows
    becomes the training set and the remainder the test set.

    Row order is preserved, so no future observation can leak into the
    training portion (unlike a shuffled split).

    Returns
    -------
    (X_train, X_test, y_train, y_test)
    """
    cutoff = int(len(X) * (1 - test_size))
    X_train, X_test = X.iloc[:cutoff], X.iloc[cutoff:]
    y_train, y_test = y.iloc[:cutoff], y.iloc[cutoff:]
    return X_train, X_test, y_train, y_test
def sensitivity_analysis(model, X: pd.DataFrame, y_pred_col: str = None,
                         n_steps: int = 20, pct_range: float = 0.3) -> pd.DataFrame:
    """
    One-at-a-time sensitivity analysis: shift each feature upward by
    ``pct_range`` standard deviations and measure the mean absolute change
    in the model's continuous output.

    Works for both classifiers (via ``predict_proba``) and regressors
    (via ``predict``).

    Parameters
    ----------
    model : fitted estimator exposing ``predict`` (and optionally
        ``predict_proba``).
    X : pd.DataFrame
        Feature matrix the model was trained on.
    y_pred_col, n_steps :
        Currently unused; kept for backward compatibility with callers.
    pct_range : float
        Perturbation size in units of each feature's std (default 0.3).

    Returns
    -------
    pd.DataFrame with columns ``["feature", "sensitivity"]`` sorted by
    sensitivity descending. Empty (but correctly-columned) frame when no
    feature could be perturbed.
    """
    # Use predict_proba for classifiers so perturbation produces a
    # continuous output (not discrete 0/1 labels that always delta to 0).
    def _predict_continuous(m, X_arr):
        if hasattr(m, "predict_proba"):
            proba = m.predict_proba(X_arr)
            return proba[:, 1] if proba.ndim == 2 else proba
        return m.predict(X_arr).astype(float)

    base = _predict_continuous(model, X.values)
    results = []
    for col in X.columns:
        col_std = X[col].std()
        # BUG FIX: skip non-finite stds too — a single-row X gives
        # std() == NaN (ddof=1), which slipped past the old `== 0` check
        # and produced NaN sensitivities.
        if not np.isfinite(col_std) or col_std == 0:
            continue
        X_perturbed = X.copy()
        X_perturbed[col] = X[col] + col_std * pct_range
        perturbed = _predict_continuous(model, X_perturbed.values)
        delta = np.abs(perturbed - base).mean()
        results.append({"feature": col, "sensitivity": round(float(delta), 6)})
    # BUG FIX: an all-constant X used to raise KeyError inside sort_values
    # on an empty DataFrame; return a correctly-shaped empty frame instead.
    if not results:
        return pd.DataFrame(columns=["feature", "sensitivity"])
    return pd.DataFrame(results).sort_values("sensitivity", ascending=False)
def walk_forward_cv(model_factory, X: pd.DataFrame, y: pd.Series,
                    n_splits: int = 5, task: str = "classification") -> dict:
    """
    Walk-forward (expanding window) cross-validation for time-series data.

    Avoids lookahead bias by ensuring training data always precedes test data
    chronologically. This is the correct validation strategy for temporal
    datasets: a random k-fold would leak future observations into training folds.

    Parameters
    ----------
    model_factory : callable
        Returns a freshly instantiated (unfitted) sklearn model.
    X : pd.DataFrame
        Feature matrix, **already in chronological order**.
    y : pd.Series
        Target vector, same ordering as X.
    n_splits : int
        Number of expanding-window folds (default 5).
        Each fold adds 1/n_splits of the data to the training set.
    task : str
        "classification" → reports AUC + F1.
        "regression"     → reports RMSE + R².

    Returns
    -------
    dict with keys:
        folds  – per-fold metrics list
        mean_* – mean across folds
        std_*  – std across folds
    or ``{"folds": [], "error": "insufficient_data"}`` when no fold fits.
    """
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import roc_auc_score, f1_score, mean_squared_error, r2_score
    n = len(X)
    # Minimum training size: first 40 % of data so folds are meaningful.
    min_train = max(int(n * 0.4), 20)
    fold_size = max(int((n - min_train) / n_splits), 1)
    # NOTE: integer division may leave a small tail of rows never used as a
    # test fold; kept as-is to preserve the existing fold layout.
    folds = []
    for i in range(n_splits):
        train_end = min_train + i * fold_size
        test_end = min(train_end + fold_size, n)
        if train_end >= n or test_end > n:
            break
        X_tr, X_te = X.iloc[:train_end].values, X.iloc[train_end:test_end].values
        y_tr, y_te = y.iloc[:train_end].values, y.iloc[train_end:test_end].values
        # Scaler is fit on the training window only — no test-set leakage.
        scaler = StandardScaler()
        X_tr_s = scaler.fit_transform(X_tr)
        X_te_s = scaler.transform(X_te)
        model = model_factory()
        model.fit(X_tr_s, y_tr)
        fold_metrics = {"fold": i + 1, "train_size": train_end, "test_size": len(y_te)}
        if task == "classification":
            if len(np.unique(y_te)) < 2:
                # Only one class in this fold's test set — AUC undefined, skip.
                fold_metrics.update({"auc": None, "f1": None})
            else:
                # BUG FIX: prefer a continuous score for AUC. Previously a
                # model without predict_proba fed hard 0/1 predict() labels
                # to roc_auc_score; decision_function gives a proper ranking.
                if hasattr(model, "predict_proba"):
                    score = model.predict_proba(X_te_s)[:, 1]
                elif hasattr(model, "decision_function"):
                    score = model.decision_function(X_te_s)
                else:
                    score = model.predict(X_te_s)
                pred = model.predict(X_te_s)
                fold_metrics["auc"] = round(float(roc_auc_score(y_te, score)), 4)
                fold_metrics["f1"] = round(float(f1_score(y_te, pred, zero_division=0)), 4)
        else:
            pred = model.predict(X_te_s)
            fold_metrics["rmse"] = round(float(np.sqrt(mean_squared_error(y_te, pred))), 4)
            fold_metrics["r2"] = round(float(r2_score(y_te, pred)), 4)
        folds.append(fold_metrics)
    if not folds:
        return {"folds": [], "error": "insufficient_data"}
    result = {"folds": folds}
    if task == "classification":
        # Folds skipped for single-class test sets carry None — exclude them.
        aucs = [f["auc"] for f in folds if f.get("auc") is not None]
        f1s = [f["f1"] for f in folds if f.get("f1") is not None]
        result.update({
            "mean_auc": round(float(np.mean(aucs)), 4) if aucs else None,
            "std_auc": round(float(np.std(aucs)), 4) if aucs else None,
            "mean_f1": round(float(np.mean(f1s)), 4) if f1s else None,
            "std_f1": round(float(np.std(f1s)), 4) if f1s else None,
        })
    else:
        rmses = [f["rmse"] for f in folds]
        r2s = [f["r2"] for f in folds]
        result.update({
            "mean_rmse": round(float(np.mean(rmses)), 4),
            "std_rmse": round(float(np.std(rmses)), 4),
            "mean_r2": round(float(np.mean(r2s)), 4),
            "std_r2": round(float(np.std(r2s)), 4),
        })
    logger.info("Walk-forward CV (%s, %d folds): %s",
                task, len(folds),
                {k: v for k, v in result.items() if k.startswith("mean_")})
    return result
def save_model(model, name: str) -> Path:
    """Serialize *model* with pickle into MODELS_DIR and return the file path."""
    target = MODELS_DIR / f"{name}.pkl"
    target.write_bytes(pickle.dumps(model))
    logger.info("Saved model: %s", target.name)
    return target
def load_model(name: str):
    """Unpickle and return the model *name* from MODELS_DIR, or None if absent."""
    path = MODELS_DIR / f"{name}.pkl"
    if not path.exists():
        logger.warning("Model not found: %s", path)
        return None
    return pickle.loads(path.read_bytes())
def save_metrics(metrics: dict, name: str) -> Path:
    """
    Save *metrics* as pretty-printed JSON in METRICS_DIR and return the path.

    A timezone-aware UTC timestamp is recorded under "saved_at".

    BUG FIX: the caller's dict is no longer mutated (a "saved_at" key used to
    be injected into the input object), and the deprecated, naive
    ``datetime.utcnow()`` is replaced with ``datetime.now(timezone.utc)``.
    """
    payload = {**metrics, "saved_at": datetime.now(timezone.utc).isoformat()}
    path = METRICS_DIR / f"{name}_metrics.json"
    with open(path, "w", encoding="utf-8") as f:
        # default=str stringifies non-JSON-native values (numpy scalars, Paths).
        json.dump(payload, f, indent=2, default=str)
    logger.info("Saved metrics: %s", path.name)
    return path
def load_metrics(name: str) -> dict:
    """Load a previously saved metrics JSON from METRICS_DIR; {} when missing."""
    path = METRICS_DIR / f"{name}_metrics.json"
    if not path.exists():
        return {}
    return json.loads(path.read_text(encoding="utf-8"))