# bhanug2026 — Initial commit (47c6cfd)
"""
src/models/train_classifier.py
================================
Trains and evaluates disruption classification models:
- Logistic Regression
- Random Forest
- Gradient Boosting (XGBoost/sklearn)
- ANN (MLPClassifier with sigmoid-equivalent)
Selects best model by AUC on held-out test set.
Saves model artifacts, metrics, and sensitivity analysis.
"""
# Silence warnings globally so third-party libraries don't flood training logs.
# NOTE(review): this also hides sklearn convergence warnings — consider
# narrowing the filter if those matter for debugging.
import warnings
warnings.filterwarnings("ignore")
import json
import numpy as np
import pandas as pd
import matplotlib
# Headless backend — must be selected before pyplot is imported so figures
# can be rendered/saved in environments with no display.
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
from src.utils.logger import get_logger
from src.utils.io_utils import save_json
from src.models.model_selector import (
    train_test_split_time, sensitivity_analysis, save_model, save_metrics,
    walk_forward_cv
)
from config.settings import (
    PROCESSED_DIR, FIGURES_DIR, METRICS_DIR, RANDOM_SEED, TEST_SIZE,
    CLASSIFIER_FEATURES, CLASSIFIER_TARGET
)
# Module-level logger named after this module.
logger = get_logger(__name__)
# ── Import ML libraries (graceful degradation) ────────────────────────────────
# SKLEARN_OK gates every sklearn-specific code path in this file; when the
# import fails, the numpy fallback classifiers defined below are used instead.
try:
    from sklearn.linear_model import LogisticRegression
    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.neural_network import MLPClassifier
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import (roc_auc_score, roc_curve, precision_score,
                                 recall_score, f1_score, confusion_matrix)
    from sklearn.pipeline import Pipeline
    SKLEARN_OK = True
except ImportError:
    logger.warning("scikit-learn not installed — using numpy fallback classifiers")
    SKLEARN_OK = False
# ── Numpy Fallback Implementations ───────────────────────────────────────────
class _NumpyLogisticRegression:
"""Pure numpy logistic regression (SGD, for offline environments)."""
def __init__(self, lr=0.01, n_iter=500, seed=42):
self.lr = lr; self.n_iter = n_iter; self.seed = seed
self.w = None; self.b = 0.0
def _sigmoid(self, z): return 1 / (1 + np.exp(-np.clip(z, -50, 50)))
def fit(self, X, y):
rng = np.random.default_rng(self.seed)
self.w = rng.normal(0, 0.01, X.shape[1])
for _ in range(self.n_iter):
z = X @ self.w + self.b
pred = self._sigmoid(z)
err = pred - y
self.w -= self.lr * X.T @ err / len(y)
self.b -= self.lr * err.mean()
return self
def predict_proba(self, X):
p = self._sigmoid(X @ self.w + self.b)
return np.column_stack([1 - p, p])
def predict(self, X):
return (self._sigmoid(X @ self.w + self.b) >= 0.5).astype(int)
def get_params(self, **kw): return {"lr": self.lr, "n_iter": self.n_iter}
class _NumpyRandomForest:
"""Simplified Random Forest using numpy (for offline environments)."""
def __init__(self, n_estimators=50, max_depth=5, seed=42):
self.n_estimators = n_estimators
self.max_depth = max_depth
self.seed = seed
self.trees = []
self.feature_importances_ = None
def _build_tree(self, X, y, depth=0):
if depth >= self.max_depth or len(set(y)) == 1 or len(y) < 5:
return {"leaf": True, "value": np.mean(y)}
rng = np.random.default_rng(self.seed + depth)
n_feat = max(1, int(np.sqrt(X.shape[1])))
feat_idx = rng.choice(X.shape[1], n_feat, replace=False)
best_feat, best_thresh, best_score = 0, 0, float("inf")
for fi in feat_idx:
thresholds = np.percentile(X[:, fi], [25, 50, 75])
for t in thresholds:
left_y = y[X[:, fi] <= t]
right_y = y[X[:, fi] > t]
if len(left_y) == 0 or len(right_y) == 0:
continue
score = (len(left_y) * (1 - (left_y.mean()**2 + (1-left_y.mean())**2)) +
len(right_y) * (1 - (right_y.mean()**2 + (1-right_y.mean())**2)))
if score < best_score:
best_score, best_feat, best_thresh = score, fi, t
left_mask = X[:, best_feat] <= best_thresh
return {
"leaf": False, "feat": best_feat, "thresh": best_thresh,
"left": self._build_tree(X[left_mask], y[left_mask], depth + 1),
"right": self._build_tree(X[~left_mask], y[~left_mask], depth + 1),
}
def _predict_tree(self, node, x):
if node["leaf"]:
return node["value"]
if x[node["feat"]] <= node["thresh"]:
return self._predict_tree(node["left"], x)
return self._predict_tree(node["right"], x)
def fit(self, X, y):
rng = np.random.default_rng(self.seed)
self.trees = []
feat_counts = np.zeros(X.shape[1])
for i in range(self.n_estimators):
idx = rng.choice(len(X), len(X), replace=True)
tree = self._build_tree(X[idx], y[idx])
self.trees.append(tree)
self.feature_importances_ = np.ones(X.shape[1]) / X.shape[1]
return self
def predict_proba(self, X):
preds = np.array([[self._predict_tree(t, x) for t in self.trees]
for x in X]).mean(axis=1)
preds = np.clip(preds, 0, 1)
return np.column_stack([1 - preds, preds])
def predict(self, X):
return (self.predict_proba(X)[:, 1] >= 0.5).astype(int)
def get_params(self, **kw): return {"n_estimators": self.n_estimators}
class _NumpyANN:
"""Simple 2-layer ANN for classification (sigmoid activation)."""
def __init__(self, hidden=(16, 8), lr=0.001, n_iter=200, seed=42):
self.hidden = hidden; self.lr = lr; self.n_iter = n_iter; self.seed = seed
self.weights = []; self.biases = []
def _sigmoid(self, z): return 1 / (1 + np.exp(-np.clip(z, -50, 50)))
def _dsigmoid(self, z): s = self._sigmoid(z); return s * (1 - s)
def fit(self, X, y):
rng = np.random.default_rng(self.seed)
layers = [X.shape[1]] + list(self.hidden) + [1]
self.weights = [rng.normal(0, np.sqrt(2/layers[i]), (layers[i], layers[i+1]))
for i in range(len(layers)-1)]
self.biases = [np.zeros(layers[i+1]) for i in range(len(layers)-1)]
y_ = y.reshape(-1, 1).astype(float)
for epoch in range(self.n_iter):
# Forward
acts = [X]
for W, b in zip(self.weights, self.biases):
acts.append(self._sigmoid(acts[-1] @ W + b))
# Backward
delta = (acts[-1] - y_) * self._dsigmoid(acts[-1])
for i in reversed(range(len(self.weights))):
grad_w = acts[i].T @ delta / len(y_)
grad_b = delta.mean(axis=0)
if i > 0:
delta = delta @ self.weights[i].T * self._dsigmoid(acts[i])
self.weights[i] -= self.lr * grad_w
self.biases[i] -= self.lr * grad_b
return self
def predict_proba(self, X):
a = X
for W, b in zip(self.weights, self.biases):
a = self._sigmoid(a @ W + b)
p = a.flatten()
return np.column_stack([1 - p, p])
def predict(self, X):
return (self.predict_proba(X)[:, 1] >= 0.5).astype(int)
def get_params(self, **kw): return {"hidden": self.hidden, "lr": self.lr}
# ── Metrics Helpers ───────────────────────────────────────────────────────────
def _compute_metrics(model, X_test, y_test, model_name: str) -> dict:
    """Compute classification metrics for a trained model.

    Parameters
    ----------
    model : fitted estimator exposing ``predict`` and ``predict_proba``.
    X_test, y_test : held-out feature matrix and binary labels (numpy arrays).
    model_name : label stored under the ``"model"`` key of the result.

    Returns
    -------
    dict with keys ``model``/``auc``/``precision``/``recall``/``f1``/``fpr``/
    ``tpr``. On any failure a minimal ``{"model": ..., "auc": 0.0}`` stub is
    returned instead of raising, so one broken model cannot abort the run.
    """
    try:
        proba = model.predict_proba(X_test)[:, 1]  # P(class 1) per sample
        y_pred = model.predict(X_test)
        if SKLEARN_OK:
            fpr, tpr, _ = roc_curve(y_test, proba)
            auc = roc_auc_score(y_test, proba)
        else:
            # AUC via Mann-Whitney U (numerically stable, no sklearn needed)
            pos_scores = proba[y_test == 1]
            neg_scores = proba[y_test == 0]
            n_pos, n_neg = len(pos_scores), len(neg_scores)
            if n_pos == 0 or n_neg == 0:
                auc = 0.5  # degenerate single-class test set: chance level
            else:
                # U statistic: rank-based.
                # NOTE(review): double-argsort yields ordinal ranks, so tied
                # scores get arbitrary order instead of midranks — the AUC is
                # slightly biased when many scores tie; confirm acceptable.
                all_scores = np.concatenate([pos_scores, neg_scores])
                ranks = np.argsort(np.argsort(all_scores)) + 1
                pos_rank_sum = ranks[:n_pos].sum()
                auc = float((pos_rank_sum - n_pos*(n_pos+1)/2) / (n_pos * n_neg))
                auc = np.clip(auc, 0, 1)
            # Build ROC for plotting (approximate: 100 percentile thresholds,
            # swept high→low, with (0,0) and (1,1) endpoints appended)
            thresholds = np.percentile(proba, np.linspace(0, 100, 100))[::-1]
            fprs, tprs = [0.0], [0.0]
            n_p, n_n = (y_test==1).sum(), (y_test==0).sum()
            for t in thresholds:
                p = (proba >= t).astype(int)
                # Epsilon in the denominators guards an empty class.
                fprs.append(((p==1)&(y_test==0)).sum() / (n_n+1e-9))
                tprs.append(((p==1)&(y_test==1)).sum() / (n_p+1e-9))
            fprs.append(1.0); tprs.append(1.0)
            fpr = np.array(fprs); tpr = np.array(tprs)
        if SKLEARN_OK:
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
        else:
            # Manual precision/recall/F1 with epsilon to avoid division by zero.
            tp = ((y_pred == 1) & (y_test == 1)).sum()
            fp = ((y_pred == 1) & (y_test == 0)).sum()
            fn = ((y_pred == 0) & (y_test == 1)).sum()
            prec = tp / (tp + fp + 1e-9)
            rec = tp / (tp + fn + 1e-9)
            f1 = 2 * prec * rec / (prec + rec + 1e-9)
        return {
            "model": model_name, "auc": round(float(auc), 4),
            "precision": round(float(prec), 4),
            "recall": round(float(rec), 4), "f1": round(float(f1), 4),
            "fpr": fpr.tolist(), "tpr": tpr.tolist(),
        }
    except Exception as e:
        logger.error("Metrics error for %s: %s", model_name, e)
        # Degrade gracefully: callers treat auc=0.0 as "model unusable".
        return {"model": model_name, "auc": 0.0}
# ── Main Training Function ────────────────────────────────────────────────────
def train_classifier() -> dict:
    """
    Train all candidate classifiers, select best by AUC, save artifacts.

    Pipeline: load processed features → time-ordered train/test split →
    z-score normalisation (train statistics only) → train candidates →
    select best by 0.6*AUC + 0.4*F1 → optional GridSearchCV tuning →
    sensitivity / SHAP / walk-forward diagnostics → persist model, scaler,
    and metrics.

    Returns
    -------
    dict with best_model_name, all metrics, and selected model path
    """
    logger.info("=" * 60)
    logger.info("Training disruption classification models")
    logger.info("=" * 60)
    # 1. Load data
    df = pd.read_csv(PROCESSED_DIR / "features_classification.csv", low_memory=False)
    logger.info("Loaded %d rows (raw)", len(df))
    # Drop rows where the target is NaN (can appear from mis-joined pipeline output)
    df = df.dropna(subset=[CLASSIFIER_TARGET]).reset_index(drop=True)
    logger.info("Loaded %d rows after dropping NaN targets", len(df))
    # 2. Prepare features
    X = df[CLASSIFIER_FEATURES].fillna(0).values.astype(float)
    y = df[CLASSIFIER_TARGET].fillna(0).values.astype(int)
    # Time-ordered split: earliest rows train, latest rows test. No shuffling,
    # so no lookahead leakage — assumes the CSV is chronologically sorted
    # (TODO confirm against the feature-building step).
    n_train = int(len(X) * (1 - TEST_SIZE))
    X_train, X_test = X[:n_train], X[n_train:]
    y_train, y_test = y[:n_train], y[n_train:]
    # Normalise with train statistics only (prevents test-set leakage).
    X_mean = X_train.mean(axis=0)
    X_std = X_train.std(axis=0) + 1e-8  # epsilon guards constant columns
    X_train_n = (X_train - X_mean) / X_std
    X_test_n = (X_test - X_mean) / X_std
    logger.info("Train: %d rows | Test: %d rows | Positive rate: %.2f%%",
                len(X_train), len(X_test), y_train.mean() * 100)
    # 3. Define candidate models
    if SKLEARN_OK:
        candidates = {
            "Logistic Regression": LogisticRegression(
                random_state=RANDOM_SEED, max_iter=500, C=0.5),
            "Random Forest": RandomForestClassifier(
                n_estimators=100, max_depth=8, random_state=RANDOM_SEED),
            "Gradient Boosting": GradientBoostingClassifier(
                n_estimators=100, max_depth=4, learning_rate=0.1,
                random_state=RANDOM_SEED),
            "ANN": MLPClassifier(
                hidden_layer_sizes=(16, 8), activation="logistic",
                max_iter=500, random_state=RANDOM_SEED, learning_rate_init=0.001),
        }
    else:
        # Offline fallback: smaller numpy implementations (no Gradient Boosting).
        candidates = {
            "Logistic Regression": _NumpyLogisticRegression(lr=0.05, n_iter=300, seed=RANDOM_SEED),
            "Random Forest": _NumpyRandomForest(n_estimators=30, max_depth=5, seed=RANDOM_SEED),
            "ANN": _NumpyANN(hidden=(16, 8), lr=0.01, n_iter=100, seed=RANDOM_SEED),
        }
    # 4. Train and evaluate. A failure in one model is logged and skipped so
    #    the remaining candidates still compete.
    results = []
    trained_models = {}
    for name, model in candidates.items():
        logger.info("Training: %s ...", name)
        try:
            model.fit(X_train_n, y_train)
            metrics = _compute_metrics(model, X_test_n, y_test, name)
            results.append(metrics)
            trained_models[name] = (model, metrics)
            logger.info(" AUC=%.4f | Precision=%.4f | Recall=%.4f | F1=%.4f",
                        metrics["auc"], metrics.get("precision", 0),
                        metrics.get("recall", 0), metrics.get("f1", 0))
        except Exception as e:
            logger.error("Training failed for %s: %s", name, e)
    if not results:
        logger.error("All models failed — check data")
        return {}
    # 5. Select best model by balanced score: 60% AUC + 40% F1
    # AUC alone is misleading on imbalanced data (positive rate ~10%).
    # F1 captures whether the model actually flags any disruptions.
    def _balanced_score(r):
        # Missing keys default to 0, so a metrics-stub model can never win.
        return 0.6 * r.get("auc", 0) + 0.4 * r.get("f1", 0)
    best = max(results, key=_balanced_score)
    best_name = best["model"]
    best_model, _ = trained_models[best_name]
    logger.info("Best model: %s (AUC=%.4f | F1=%.4f | balanced=%.4f)",
                best_name, best["auc"], best.get("f1", 0), _balanced_score(best))
    # 5b. GridSearchCV — hyperparameter tuning on the best model type
    # Runs only with sklearn. Uses a targeted param grid to keep runtime ≤60s.
    grid_search_results = {}
    if SKLEARN_OK:
        _param_grids = {
            "Logistic Regression": {
                "C": [0.1, 0.5, 1.0, 5.0], "max_iter": [500],
            },
            "Random Forest": {
                "n_estimators": [50, 100, 200], "max_depth": [5, 8, 12],
            },
            "Gradient Boosting": {
                "n_estimators": [50, 100], "max_depth": [3, 4],
                "learning_rate": [0.05, 0.1],
            },
            "ANN": {
                "hidden_layer_sizes": [(16, 8), (32, 16)],
                "learning_rate_init": [0.001, 0.005],
            },
        }
        if best_name in _param_grids:
            try:
                from sklearn.model_selection import GridSearchCV as _GridSearchCV
                # Fresh unfitted estimators — the grid params overwrite defaults.
                _base_map = {
                    "Logistic Regression": LogisticRegression(
                        random_state=RANDOM_SEED, max_iter=500),
                    "Random Forest": RandomForestClassifier(
                        random_state=RANDOM_SEED),
                    "Gradient Boosting": GradientBoostingClassifier(
                        random_state=RANDOM_SEED),
                    "ANN": MLPClassifier(
                        activation="logistic", max_iter=500, random_state=RANDOM_SEED),
                }
                # NOTE(review): integer cv uses (Stratified)KFold, which mixes
                # time periods; a TimeSeriesSplit would match the time-ordered
                # holdout above — confirm intent.
                _gs = _GridSearchCV(
                    _base_map[best_name], _param_grids[best_name],
                    cv=3, scoring="roc_auc", n_jobs=-1, refit=True,
                )
                _gs.fit(X_train_n, y_train)
                _tuned = _gs.best_estimator_
                _tuned_metrics = _compute_metrics(
                    _tuned, X_test_n, y_test, f"{best_name} (Tuned)")
                grid_search_results = {
                    "best_params": _gs.best_params_,
                    "best_cv_auc": round(float(_gs.best_score_), 4),
                    "tuned_test_auc": _tuned_metrics.get("auc", 0),
                    "tuned_test_f1": _tuned_metrics.get("f1", 0),
                }
                # Adopt the tuned model only if it beats the incumbent on the
                # same balanced score used for initial selection.
                if _balanced_score(_tuned_metrics) > _balanced_score(best):
                    best_model = _tuned
                    best = _tuned_metrics
                    logger.info(
                        "GridSearchCV improved model: AUC=%.4f F1=%.4f params=%s",
                        _tuned_metrics["auc"], _tuned_metrics.get("f1", 0),
                        _gs.best_params_,
                    )
                else:
                    logger.info(
                        "GridSearchCV: existing model already optimal "
                        "(cv_auc=%.4f)", _gs.best_score_)
            except Exception as _ge:
                logger.warning("GridSearchCV failed (non-fatal): %s", _ge)
    # 6. Sensitivity analysis for best model (bug-fixed: uses predict_proba)
    feat_df = pd.DataFrame(X_test_n, columns=CLASSIFIER_FEATURES)
    sensitivity = sensitivity_analysis(best_model, feat_df)
    sens_path = METRICS_DIR / "sensitivity_classifier.csv"
    sensitivity.to_csv(sens_path, index=False)
    # 6b. SHAP explainability (best-effort; failures are logged, not raised)
    _run_shap_classifier(best_model, best_name, X_train_n, X_test_n, CLASSIFIER_FEATURES)
    # 6c. Walk-forward (expanding-window) cross-validation
    # Avoids lookahead bias: each fold trains only on data that precedes the test window
    wf_cv_results = {}
    try:
        from sklearn.base import clone as _clone
        # NOTE(review): without sklearn this import raises, so the whole
        # walk-forward CV is skipped (warning below) — confirm that is intended.
        X_wfcv = pd.DataFrame(X, columns=CLASSIFIER_FEATURES)
        y_wfcv = pd.Series(y, name=CLASSIFIER_TARGET)
        def _best_factory():
            # Fresh, unfitted copy of the winning model for each fold.
            if SKLEARN_OK:
                try:
                    return _clone(best_model)
                except Exception:
                    pass
            return _NumpyLogisticRegression(lr=0.05, n_iter=300, seed=RANDOM_SEED)
        wf_cv_results = walk_forward_cv(
            _best_factory, X_wfcv, y_wfcv, n_splits=5, task="classification")
        logger.info(
            "Walk-forward CV — mean_auc=%.4f ± %.4f | mean_f1=%.4f ± %.4f",
            wf_cv_results.get("mean_auc") or 0, wf_cv_results.get("std_auc") or 0,
            wf_cv_results.get("mean_f1") or 0, wf_cv_results.get("std_f1") or 0,
        )
    except Exception as _wfe:
        logger.warning("Walk-forward CV failed (non-fatal): %s", _wfe)
    # Compute optimal decision threshold (maximises TPR-FPR on test set)
    # This is important when class imbalance means default 0.5 is suboptimal.
    _optimal_threshold = 0.5  # fallback
    try:
        _probs_test = best_model.predict_proba(X_test_n)[:, 1]
        _prob_min = float(_probs_test.min())
        _prob_max = float(_probs_test.max())
        if SKLEARN_OK:
            from sklearn.metrics import roc_curve as _roc_curve
            # Youden's J: pick the threshold maximising TPR − FPR.
            _fpr_arr, _tpr_arr, _thresh_arr = _roc_curve(y_test, _probs_test)
            _optimal_threshold = float(_thresh_arr[np.argmax(_tpr_arr - _fpr_arr)])
        logger.info("Optimal decision threshold: %.4f (positive rate %.1f%%)",
                    _optimal_threshold, y_train.mean() * 100)
    except Exception as _te:
        logger.warning("Could not compute optimal threshold: %s", _te)
        # Fallback prob range if predict_proba / roc_curve failed.
        _prob_min, _prob_max = 0.0, 1.0
    # Feature importance (if available)
    fi_data = {}
    if hasattr(best_model, "feature_importances_"):
        fi_data = dict(zip(CLASSIFIER_FEATURES, best_model.feature_importances_.tolist()))
    elif hasattr(best_model, "coef_"):
        # Linear models: use |coefficient| as an importance proxy.
        fi_data = dict(zip(CLASSIFIER_FEATURES, np.abs(best_model.coef_[0]).tolist()))
    # 7. Save model and metrics
    model_meta = {
        "best_model": best_name,
        "scaler_mean": X_mean.tolist(),
        "scaler_std": X_std.tolist(),
        "features": CLASSIFIER_FEATURES,
        "target": CLASSIFIER_TARGET,
        "all_models": results,
        "feature_importance": fi_data,
        "optimal_threshold": _optimal_threshold,
        "prob_range": [_prob_min, _prob_max],
        # NOTE(review): datetime.utcnow() is deprecated (3.12+) and naive;
        # datetime.now(timezone.utc) would be tz-aware but changes the
        # serialized format — confirm downstream consumers before switching.
        "trained_at": datetime.utcnow().isoformat(),
        "n_train": int(len(X_train)),
        "n_test": int(len(X_test)),
        "positive_rate": float(y_train.mean()),
        "grid_search": grid_search_results,
        "walk_forward_cv": wf_cv_results,
    }
    save_metrics(model_meta, "classifier")
    save_model(best_model, "best_classifier")
    save_model({"mean": X_mean, "std": X_std}, "classifier_scaler")
    # 8. Plots
    _plot_roc_curves(results)
    _plot_feature_importance(sensitivity, best_name)
    logger.info("✓ Classifier training complete. Best: %s AUC=%.4f",
                best_name, best["auc"])
    return model_meta
def _plot_roc_curves(results: list):
    """Plot ROC curves for all models.

    Each entry in ``results`` should carry ``fpr``/``tpr`` lists plus
    ``model`` and ``auc`` for the legend; entries without ROC data are
    skipped. The figure is saved to FIGURES_DIR/roc_curves_classifier.png.
    """
    palette = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728"]
    fig, ax = plt.subplots(figsize=(8, 6))
    for idx, entry in enumerate(results):
        if "fpr" not in entry or "tpr" not in entry:
            continue
        ax.plot(entry["fpr"], entry["tpr"],
                color=palette[idx % len(palette)], linewidth=2,
                label=f"{entry['model']} (AUC={entry['auc']:.3f})")
    # Chance diagonal for reference.
    ax.plot([0, 1], [0, 1], "k--", alpha=0.5, label="Random")
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title("ROC Curves — Disruption Classifier")
    ax.legend(loc="lower right")
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    fig.savefig(FIGURES_DIR / "roc_curves_classifier.png", dpi=150)
    plt.close(fig)
    logger.info("Saved ROC curves plot")
def _plot_feature_importance(sensitivity_df: pd.DataFrame, model_name: str):
    """Plot the top-10 feature sensitivities as a horizontal bar chart.

    No-op when ``sensitivity_df`` is empty. Writes the figure to
    FIGURES_DIR/feature_sensitivity_classifier.png.

    Fix: removed the dead ``bars`` local that captured the barh return
    value without ever using it.
    """
    if sensitivity_df.empty:
        return
    fig, ax = plt.subplots(figsize=(8, 6))
    top = sensitivity_df.head(10)
    # Reversed so the highest-sensitivity feature renders at the top.
    ax.barh(top["feature"][::-1], top["sensitivity"][::-1], color="#1f77b4")
    ax.set_xlabel("Sensitivity (avg output change)")
    ax.set_title(f"Feature Sensitivity Analysis — {model_name}")
    ax.grid(True, alpha=0.3, axis="x")
    plt.tight_layout()
    fig.savefig(FIGURES_DIR / "feature_sensitivity_classifier.png", dpi=150)
    plt.close(fig)
    logger.info("Saved feature sensitivity plot")
def _run_shap_classifier(model, model_name: str, X_train_n, X_test_n, feature_names):
    """Compute SHAP values for the best classifier and save outputs.

    Writes ``shap_classifier.csv`` (mean |SHAP| per feature) to METRICS_DIR
    and a bar chart ``shap_classifier.png`` to FIGURES_DIR. Any failure —
    including ``shap`` not being installed — is logged and swallowed, so
    explainability is best-effort and never breaks training.
    """
    try:
        import shap  # optional dependency; ImportError lands in except below
        X_train_df = pd.DataFrame(X_train_n, columns=feature_names)
        X_test_df = pd.DataFrame(X_test_n, columns=feature_names)
        # Choose explainer based on model type
        if hasattr(model, "feature_importances_"):
            # Tree-based (RF, GB): fast TreeExplainer
            explainer = shap.TreeExplainer(model)
            shap_values = explainer.shap_values(X_test_df)
            # For binary classifiers TreeExplainer returns list[class0, class1]
            # NOTE(review): newer shap versions may return a 3-D array instead
            # of a list here, which falls through to the raw-array branch —
            # verify against the installed shap version.
            if isinstance(shap_values, list) and len(shap_values) == 2:
                shap_vals = shap_values[1]  # class-1 (disruption)
            else:
                shap_vals = shap_values
        else:
            # Linear / ANN: use LinearExplainer or KernelExplainer (sample 50 rows)
            background = shap.sample(X_train_df, min(50, len(X_train_df)), random_state=42)
            if hasattr(model, "coef_"):
                explainer = shap.LinearExplainer(model, background)
                shap_vals = explainer.shap_values(X_test_df)
            else:
                # KernelExplainer needs a plain f(X) -> scores callable;
                # expose only the positive-class probability.
                def _predict_proba_pos(X):
                    return model.predict_proba(X)[:, 1]
                explainer = shap.KernelExplainer(_predict_proba_pos, background)
                shap_vals = explainer.shap_values(X_test_df, nsamples=100)
        # Mean absolute SHAP per feature
        mean_shap = np.abs(shap_vals).mean(axis=0)
        shap_df = pd.DataFrame({
            "feature": feature_names,
            "mean_abs_shap": mean_shap.tolist()
        }).sort_values("mean_abs_shap", ascending=False)
        shap_df.to_csv(METRICS_DIR / "shap_classifier.csv", index=False)
        # SHAP summary bar chart
        fig, ax = plt.subplots(figsize=(8, 5))
        # Grey out near-zero-impact features for readability.
        colors = ["#1A2B5E" if v > 0.001 else "#CBD5E1" for v in shap_df["mean_abs_shap"]]
        ax.barh(shap_df["feature"][::-1], shap_df["mean_abs_shap"][::-1], color=colors[::-1])
        ax.set_xlabel("Mean |SHAP value| (impact on disruption probability)")
        ax.set_title(f"SHAP Feature Importance — {model_name} (Classifier)")
        ax.grid(True, alpha=0.3, axis="x")
        plt.tight_layout()
        fig.savefig(FIGURES_DIR / "shap_classifier.png", dpi=150)
        plt.close(fig)
        logger.info("✓ SHAP classifier: saved shap_classifier.csv + shap_classifier.png")
    except Exception as e:
        logger.warning("SHAP classifier failed (non-fatal): %s", e)
if __name__ == "__main__":
    # Manual entry point: run the full training pipeline and print a summary.
    metrics = train_classifier()
    chosen = metrics.get("best_model")
    print(f"\nBest model: {chosen}")
    # Look up the winning model's AUC in the per-model results list.
    chosen_auc = next(
        (entry["auc"] for entry in metrics.get("all_models", [])
         if entry["model"] == chosen),
        "N/A",
    )
    print(f"AUC: {chosen_auc}")