""" src/models/train_classifier.py ================================ Trains and evaluates disruption classification models: - Logistic Regression - Random Forest - Gradient Boosting (XGBoost/sklearn) - ANN (MLPClassifier with sigmoid-equivalent) Selects best model by AUC on held-out test set. Saves model artifacts, metrics, and sensitivity analysis. """ import warnings warnings.filterwarnings("ignore") import json import numpy as np import pandas as pd import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from pathlib import Path from datetime import datetime from src.utils.logger import get_logger from src.utils.io_utils import save_json from src.models.model_selector import ( train_test_split_time, sensitivity_analysis, save_model, save_metrics, walk_forward_cv ) from config.settings import ( PROCESSED_DIR, FIGURES_DIR, METRICS_DIR, RANDOM_SEED, TEST_SIZE, CLASSIFIER_FEATURES, CLASSIFIER_TARGET ) logger = get_logger(__name__) # ── Import ML libraries (graceful degradation) ──────────────────────────────── try: from sklearn.linear_model import LogisticRegression from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier from sklearn.neural_network import MLPClassifier from sklearn.preprocessing import StandardScaler from sklearn.metrics import (roc_auc_score, roc_curve, precision_score, recall_score, f1_score, confusion_matrix) from sklearn.pipeline import Pipeline SKLEARN_OK = True except ImportError: logger.warning("scikit-learn not installed — using numpy fallback classifiers") SKLEARN_OK = False # ── Numpy Fallback Implementations ─────────────────────────────────────────── class _NumpyLogisticRegression: """Pure numpy logistic regression (SGD, for offline environments).""" def __init__(self, lr=0.01, n_iter=500, seed=42): self.lr = lr; self.n_iter = n_iter; self.seed = seed self.w = None; self.b = 0.0 def _sigmoid(self, z): return 1 / (1 + np.exp(-np.clip(z, -50, 50))) def fit(self, X, y): rng = np.random.default_rng(self.seed) self.w = rng.normal(0, 0.01, X.shape[1]) for _ in range(self.n_iter): z = X @ self.w + self.b pred = self._sigmoid(z) err = pred - y self.w -= self.lr * X.T @ err / len(y) self.b -= self.lr * err.mean() return self def predict_proba(self, X): p = self._sigmoid(X @ self.w + self.b) return np.column_stack([1 - p, p]) def predict(self, X): return (self._sigmoid(X @ self.w + self.b) >= 0.5).astype(int) def get_params(self, **kw): return {"lr": self.lr, "n_iter": self.n_iter} class _NumpyRandomForest: """Simplified Random Forest using numpy (for offline environments).""" def __init__(self, n_estimators=50, max_depth=5, seed=42): self.n_estimators = n_estimators self.max_depth = max_depth self.seed = seed self.trees = [] self.feature_importances_ = None def _build_tree(self, X, y, depth=0): if depth >= self.max_depth or len(set(y)) == 1 or len(y) < 5: return {"leaf": True, "value": np.mean(y)} rng = np.random.default_rng(self.seed + depth) n_feat = max(1, int(np.sqrt(X.shape[1]))) feat_idx = rng.choice(X.shape[1], n_feat, replace=False) best_feat, best_thresh, best_score = 0, 0, float("inf") for fi in feat_idx: thresholds = np.percentile(X[:, fi], [25, 50, 75]) for t in thresholds: left_y = y[X[:, fi] <= t] right_y = y[X[:, fi] > t] if len(left_y) == 0 or len(right_y) == 0: continue score = (len(left_y) * (1 - (left_y.mean()**2 + (1-left_y.mean())**2)) + len(right_y) * (1 - (right_y.mean()**2 + (1-right_y.mean())**2))) if score < best_score: best_score, best_feat, best_thresh = score, fi, t left_mask = X[:, best_feat] <= best_thresh return { "leaf": False, "feat": best_feat, "thresh": best_thresh, "left": self._build_tree(X[left_mask], y[left_mask], depth + 1), "right": self._build_tree(X[~left_mask], y[~left_mask], depth + 1), } def _predict_tree(self, node, x): if node["leaf"]: return node["value"] if x[node["feat"]] <= node["thresh"]: return self._predict_tree(node["left"], x) return self._predict_tree(node["right"], x) def fit(self, X, y): rng = np.random.default_rng(self.seed) self.trees = [] feat_counts = np.zeros(X.shape[1]) for i in range(self.n_estimators): idx = rng.choice(len(X), len(X), replace=True) tree = self._build_tree(X[idx], y[idx]) self.trees.append(tree) self.feature_importances_ = np.ones(X.shape[1]) / X.shape[1] return self def predict_proba(self, X): preds = np.array([[self._predict_tree(t, x) for t in self.trees] for x in X]).mean(axis=1) preds = np.clip(preds, 0, 1) return np.column_stack([1 - preds, preds]) def predict(self, X): return (self.predict_proba(X)[:, 1] >= 0.5).astype(int) def get_params(self, **kw): return {"n_estimators": self.n_estimators} class _NumpyANN: """Simple 2-layer ANN for classification (sigmoid activation).""" def __init__(self, hidden=(16, 8), lr=0.001, n_iter=200, seed=42): self.hidden = hidden; self.lr = lr; self.n_iter = n_iter; self.seed = seed self.weights = []; self.biases = [] def _sigmoid(self, z): return 1 / (1 + np.exp(-np.clip(z, -50, 50))) def _dsigmoid(self, z): s = self._sigmoid(z); return s * (1 - s) def fit(self, X, y): rng = np.random.default_rng(self.seed) layers = [X.shape[1]] + list(self.hidden) + [1] self.weights = [rng.normal(0, np.sqrt(2/layers[i]), (layers[i], layers[i+1])) for i in range(len(layers)-1)] self.biases = [np.zeros(layers[i+1]) for i in range(len(layers)-1)] y_ = y.reshape(-1, 1).astype(float) for epoch in range(self.n_iter): # Forward acts = [X] for W, b in zip(self.weights, self.biases): acts.append(self._sigmoid(acts[-1] @ W + b)) # Backward delta = (acts[-1] - y_) * self._dsigmoid(acts[-1]) for i in reversed(range(len(self.weights))): grad_w = acts[i].T @ delta / len(y_) grad_b = delta.mean(axis=0) if i > 0: delta = delta @ self.weights[i].T * self._dsigmoid(acts[i]) self.weights[i] -= self.lr * grad_w self.biases[i] -= self.lr * grad_b return self def predict_proba(self, X): a = X for W, b in zip(self.weights, self.biases): a = self._sigmoid(a @ W + b) p = a.flatten() return np.column_stack([1 - p, p]) def predict(self, X): return (self.predict_proba(X)[:, 1] >= 0.5).astype(int) def get_params(self, **kw): return {"hidden": self.hidden, "lr": self.lr} # ── Metrics Helpers ─────────────────────────────────────────────────────────── def _compute_metrics(model, X_test, y_test, model_name: str) -> dict: """Compute classification metrics for a trained model.""" try: proba = model.predict_proba(X_test)[:, 1] y_pred = model.predict(X_test) if SKLEARN_OK: fpr, tpr, _ = roc_curve(y_test, proba) auc = roc_auc_score(y_test, proba) else: # AUC via Mann-Whitney U (numerically stable, no sklearn needed) pos_scores = proba[y_test == 1] neg_scores = proba[y_test == 0] n_pos, n_neg = len(pos_scores), len(neg_scores) if n_pos == 0 or n_neg == 0: auc = 0.5 else: # U statistic: rank-based all_scores = np.concatenate([pos_scores, neg_scores]) ranks = np.argsort(np.argsort(all_scores)) + 1 pos_rank_sum = ranks[:n_pos].sum() auc = float((pos_rank_sum - n_pos*(n_pos+1)/2) / (n_pos * n_neg)) auc = np.clip(auc, 0, 1) # Build ROC for plotting thresholds = np.percentile(proba, np.linspace(0, 100, 100))[::-1] fprs, tprs = [0.0], [0.0] n_p, n_n = (y_test==1).sum(), (y_test==0).sum() for t in thresholds: p = (proba >= t).astype(int) fprs.append(((p==1)&(y_test==0)).sum() / (n_n+1e-9)) tprs.append(((p==1)&(y_test==1)).sum() / (n_p+1e-9)) fprs.append(1.0); tprs.append(1.0) fpr = np.array(fprs); tpr = np.array(tprs) if SKLEARN_OK: prec = precision_score(y_test, y_pred, zero_division=0) rec = recall_score(y_test, y_pred, zero_division=0) f1 = f1_score(y_test, y_pred, zero_division=0) else: tp = ((y_pred == 1) & (y_test == 1)).sum() fp = ((y_pred == 1) & (y_test == 0)).sum() fn = ((y_pred == 0) & (y_test == 1)).sum() prec = tp / (tp + fp + 1e-9) rec = tp / (tp + fn + 1e-9) f1 = 2 * prec * rec / (prec + rec + 1e-9) return { "model": model_name, "auc": round(float(auc), 4), "precision": round(float(prec), 4), "recall": round(float(rec), 4), "f1": round(float(f1), 4), "fpr": fpr.tolist(), "tpr": tpr.tolist(), } except Exception as e: logger.error("Metrics error for %s: %s", model_name, e) return {"model": model_name, "auc": 0.0} # ── Main Training Function ──────────────────────────────────────────────────── def train_classifier() -> dict: """ Train all candidate classifiers, select best by AUC, save artifacts. Returns ------- dict with best_model_name, all metrics, and selected model path """ logger.info("=" * 60) logger.info("Training disruption classification models") logger.info("=" * 60) # 1. Load data df = pd.read_csv(PROCESSED_DIR / "features_classification.csv", low_memory=False) logger.info("Loaded %d rows (raw)", len(df)) # Drop rows where the target is NaN (can appear from mis-joined pipeline output) df = df.dropna(subset=[CLASSIFIER_TARGET]).reset_index(drop=True) logger.info("Loaded %d rows after dropping NaN targets", len(df)) # 2. Prepare features X = df[CLASSIFIER_FEATURES].fillna(0).values.astype(float) y = df[CLASSIFIER_TARGET].fillna(0).values.astype(int) n_train = int(len(X) * (1 - TEST_SIZE)) X_train, X_test = X[:n_train], X[n_train:] y_train, y_test = y[:n_train], y[n_train:] # Normalise X_mean = X_train.mean(axis=0) X_std = X_train.std(axis=0) + 1e-8 X_train_n = (X_train - X_mean) / X_std X_test_n = (X_test - X_mean) / X_std logger.info("Train: %d rows | Test: %d rows | Positive rate: %.2f%%", len(X_train), len(X_test), y_train.mean() * 100) # 3. Define candidate models if SKLEARN_OK: candidates = { "Logistic Regression": LogisticRegression( random_state=RANDOM_SEED, max_iter=500, C=0.5), "Random Forest": RandomForestClassifier( n_estimators=100, max_depth=8, random_state=RANDOM_SEED), "Gradient Boosting": GradientBoostingClassifier( n_estimators=100, max_depth=4, learning_rate=0.1, random_state=RANDOM_SEED), "ANN": MLPClassifier( hidden_layer_sizes=(16, 8), activation="logistic", max_iter=500, random_state=RANDOM_SEED, learning_rate_init=0.001), } else: candidates = { "Logistic Regression": _NumpyLogisticRegression(lr=0.05, n_iter=300, seed=RANDOM_SEED), "Random Forest": _NumpyRandomForest(n_estimators=30, max_depth=5, seed=RANDOM_SEED), "ANN": _NumpyANN(hidden=(16, 8), lr=0.01, n_iter=100, seed=RANDOM_SEED), } # 4. Train and evaluate results = [] trained_models = {} for name, model in candidates.items(): logger.info("Training: %s ...", name) try: model.fit(X_train_n, y_train) metrics = _compute_metrics(model, X_test_n, y_test, name) results.append(metrics) trained_models[name] = (model, metrics) logger.info(" AUC=%.4f | Precision=%.4f | Recall=%.4f | F1=%.4f", metrics["auc"], metrics.get("precision", 0), metrics.get("recall", 0), metrics.get("f1", 0)) except Exception as e: logger.error("Training failed for %s: %s", name, e) if not results: logger.error("All models failed — check data") return {} # 5. Select best model by balanced score: 60% AUC + 40% F1 # AUC alone is misleading on imbalanced data (positive rate ~10%). # F1 captures whether the model actually flags any disruptions. def _balanced_score(r): return 0.6 * r.get("auc", 0) + 0.4 * r.get("f1", 0) best = max(results, key=_balanced_score) best_name = best["model"] best_model, _ = trained_models[best_name] logger.info("Best model: %s (AUC=%.4f | F1=%.4f | balanced=%.4f)", best_name, best["auc"], best.get("f1", 0), _balanced_score(best)) # 5b. GridSearchCV — hyperparameter tuning on the best model type # Runs only with sklearn. Uses a targeted param grid to keep runtime ≤60s. grid_search_results = {} if SKLEARN_OK: _param_grids = { "Logistic Regression": { "C": [0.1, 0.5, 1.0, 5.0], "max_iter": [500], }, "Random Forest": { "n_estimators": [50, 100, 200], "max_depth": [5, 8, 12], }, "Gradient Boosting": { "n_estimators": [50, 100], "max_depth": [3, 4], "learning_rate": [0.05, 0.1], }, "ANN": { "hidden_layer_sizes": [(16, 8), (32, 16)], "learning_rate_init": [0.001, 0.005], }, } if best_name in _param_grids: try: from sklearn.model_selection import GridSearchCV as _GridSearchCV _base_map = { "Logistic Regression": LogisticRegression( random_state=RANDOM_SEED, max_iter=500), "Random Forest": RandomForestClassifier( random_state=RANDOM_SEED), "Gradient Boosting": GradientBoostingClassifier( random_state=RANDOM_SEED), "ANN": MLPClassifier( activation="logistic", max_iter=500, random_state=RANDOM_SEED), } _gs = _GridSearchCV( _base_map[best_name], _param_grids[best_name], cv=3, scoring="roc_auc", n_jobs=-1, refit=True, ) _gs.fit(X_train_n, y_train) _tuned = _gs.best_estimator_ _tuned_metrics = _compute_metrics( _tuned, X_test_n, y_test, f"{best_name} (Tuned)") grid_search_results = { "best_params": _gs.best_params_, "best_cv_auc": round(float(_gs.best_score_), 4), "tuned_test_auc": _tuned_metrics.get("auc", 0), "tuned_test_f1": _tuned_metrics.get("f1", 0), } if _balanced_score(_tuned_metrics) > _balanced_score(best): best_model = _tuned best = _tuned_metrics logger.info( "GridSearchCV improved model: AUC=%.4f F1=%.4f params=%s", _tuned_metrics["auc"], _tuned_metrics.get("f1", 0), _gs.best_params_, ) else: logger.info( "GridSearchCV: existing model already optimal " "(cv_auc=%.4f)", _gs.best_score_) except Exception as _ge: logger.warning("GridSearchCV failed (non-fatal): %s", _ge) # 6. Sensitivity analysis for best model (bug-fixed: uses predict_proba) feat_df = pd.DataFrame(X_test_n, columns=CLASSIFIER_FEATURES) sensitivity = sensitivity_analysis(best_model, feat_df) sens_path = METRICS_DIR / "sensitivity_classifier.csv" sensitivity.to_csv(sens_path, index=False) # 6b. SHAP explainability _run_shap_classifier(best_model, best_name, X_train_n, X_test_n, CLASSIFIER_FEATURES) # 6c. Walk-forward (expanding-window) cross-validation # Avoids lookahead bias: each fold trains only on data that precedes the test window wf_cv_results = {} try: from sklearn.base import clone as _clone X_wfcv = pd.DataFrame(X, columns=CLASSIFIER_FEATURES) y_wfcv = pd.Series(y, name=CLASSIFIER_TARGET) def _best_factory(): if SKLEARN_OK: try: return _clone(best_model) except Exception: pass return _NumpyLogisticRegression(lr=0.05, n_iter=300, seed=RANDOM_SEED) wf_cv_results = walk_forward_cv( _best_factory, X_wfcv, y_wfcv, n_splits=5, task="classification") logger.info( "Walk-forward CV — mean_auc=%.4f ± %.4f | mean_f1=%.4f ± %.4f", wf_cv_results.get("mean_auc") or 0, wf_cv_results.get("std_auc") or 0, wf_cv_results.get("mean_f1") or 0, wf_cv_results.get("std_f1") or 0, ) except Exception as _wfe: logger.warning("Walk-forward CV failed (non-fatal): %s", _wfe) # Compute optimal decision threshold (maximises TPR-FPR on test set) # This is important when class imbalance means default 0.5 is suboptimal. _optimal_threshold = 0.5 # fallback try: _probs_test = best_model.predict_proba(X_test_n)[:, 1] _prob_min = float(_probs_test.min()) _prob_max = float(_probs_test.max()) if SKLEARN_OK: from sklearn.metrics import roc_curve as _roc_curve _fpr_arr, _tpr_arr, _thresh_arr = _roc_curve(y_test, _probs_test) _optimal_threshold = float(_thresh_arr[np.argmax(_tpr_arr - _fpr_arr)]) logger.info("Optimal decision threshold: %.4f (positive rate %.1f%%)", _optimal_threshold, y_train.mean() * 100) except Exception as _te: logger.warning("Could not compute optimal threshold: %s", _te) _prob_min, _prob_max = 0.0, 1.0 # Feature importance (if available) fi_data = {} if hasattr(best_model, "feature_importances_"): fi_data = dict(zip(CLASSIFIER_FEATURES, best_model.feature_importances_.tolist())) elif hasattr(best_model, "coef_"): fi_data = dict(zip(CLASSIFIER_FEATURES, np.abs(best_model.coef_[0]).tolist())) # 7. Save model and metrics model_meta = { "best_model": best_name, "scaler_mean": X_mean.tolist(), "scaler_std": X_std.tolist(), "features": CLASSIFIER_FEATURES, "target": CLASSIFIER_TARGET, "all_models": results, "feature_importance": fi_data, "optimal_threshold": _optimal_threshold, "prob_range": [_prob_min, _prob_max], "trained_at": datetime.utcnow().isoformat(), "n_train": int(len(X_train)), "n_test": int(len(X_test)), "positive_rate": float(y_train.mean()), "grid_search": grid_search_results, "walk_forward_cv": wf_cv_results, } save_metrics(model_meta, "classifier") save_model(best_model, "best_classifier") save_model({"mean": X_mean, "std": X_std}, "classifier_scaler") # 8. Plots _plot_roc_curves(results) _plot_feature_importance(sensitivity, best_name) logger.info("✓ Classifier training complete. Best: %s AUC=%.4f", best_name, best["auc"]) return model_meta def _plot_roc_curves(results: list): """Plot ROC curves for all models.""" fig, ax = plt.subplots(figsize=(8, 6)) colors = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728"] for i, r in enumerate(results): if "fpr" in r and "tpr" in r: ax.plot(r["fpr"], r["tpr"], color=colors[i % len(colors)], label=f"{r['model']} (AUC={r['auc']:.3f})", linewidth=2) ax.plot([0, 1], [0, 1], "k--", alpha=0.5, label="Random") ax.set_xlabel("False Positive Rate") ax.set_ylabel("True Positive Rate") ax.set_title("ROC Curves — Disruption Classifier") ax.legend(loc="lower right") ax.grid(True, alpha=0.3) plt.tight_layout() fig.savefig(FIGURES_DIR / "roc_curves_classifier.png", dpi=150) plt.close(fig) logger.info("Saved ROC curves plot") def _plot_feature_importance(sensitivity_df: pd.DataFrame, model_name: str): """Plot feature sensitivity / importance.""" if sensitivity_df.empty: return fig, ax = plt.subplots(figsize=(8, 6)) top = sensitivity_df.head(10) bars = ax.barh(top["feature"][::-1], top["sensitivity"][::-1], color="#1f77b4") ax.set_xlabel("Sensitivity (avg output change)") ax.set_title(f"Feature Sensitivity Analysis — {model_name}") ax.grid(True, alpha=0.3, axis="x") plt.tight_layout() fig.savefig(FIGURES_DIR / "feature_sensitivity_classifier.png", dpi=150) plt.close(fig) logger.info("Saved feature sensitivity plot") def _run_shap_classifier(model, model_name: str, X_train_n, X_test_n, feature_names): """Compute SHAP values for the best classifier and save outputs.""" try: import shap X_train_df = pd.DataFrame(X_train_n, columns=feature_names) X_test_df = pd.DataFrame(X_test_n, columns=feature_names) # Choose explainer based on model type if hasattr(model, "feature_importances_"): # Tree-based (RF, GB): fast TreeExplainer explainer = shap.TreeExplainer(model) shap_values = explainer.shap_values(X_test_df) # For binary classifiers TreeExplainer returns list[class0, class1] if isinstance(shap_values, list) and len(shap_values) == 2: shap_vals = shap_values[1] # class-1 (disruption) else: shap_vals = shap_values else: # Linear / ANN: use LinearExplainer or KernelExplainer (sample 50 rows) background = shap.sample(X_train_df, min(50, len(X_train_df)), random_state=42) if hasattr(model, "coef_"): explainer = shap.LinearExplainer(model, background) shap_vals = explainer.shap_values(X_test_df) else: def _predict_proba_pos(X): return model.predict_proba(X)[:, 1] explainer = shap.KernelExplainer(_predict_proba_pos, background) shap_vals = explainer.shap_values(X_test_df, nsamples=100) # Mean absolute SHAP per feature mean_shap = np.abs(shap_vals).mean(axis=0) shap_df = pd.DataFrame({ "feature": feature_names, "mean_abs_shap": mean_shap.tolist() }).sort_values("mean_abs_shap", ascending=False) shap_df.to_csv(METRICS_DIR / "shap_classifier.csv", index=False) # SHAP summary bar chart fig, ax = plt.subplots(figsize=(8, 5)) colors = ["#1A2B5E" if v > 0.001 else "#CBD5E1" for v in shap_df["mean_abs_shap"]] ax.barh(shap_df["feature"][::-1], shap_df["mean_abs_shap"][::-1], color=colors[::-1]) ax.set_xlabel("Mean |SHAP value| (impact on disruption probability)") ax.set_title(f"SHAP Feature Importance — {model_name} (Classifier)") ax.grid(True, alpha=0.3, axis="x") plt.tight_layout() fig.savefig(FIGURES_DIR / "shap_classifier.png", dpi=150) plt.close(fig) logger.info("✓ SHAP classifier: saved shap_classifier.csv + shap_classifier.png") except Exception as e: logger.warning("SHAP classifier failed (non-fatal): %s", e) if __name__ == "__main__": metrics = train_classifier() print(f"\nBest model: {metrics.get('best_model')}") print(f"AUC: {next((m['auc'] for m in metrics.get('all_models', []) if m['model'] == metrics.get('best_model')), 'N/A')}")