| """ |
| src/models/train_classifier.py |
| ================================ |
| Trains and evaluates disruption classification models: |
| - Logistic Regression |
| - Random Forest |
| - Gradient Boosting (XGBoost/sklearn) |
| - ANN (MLPClassifier with sigmoid-equivalent) |
| |
| Selects best model by AUC on held-out test set. |
| Saves model artifacts, metrics, and sensitivity analysis. |
| """ |

import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from datetime import datetime, timezone
from src.utils.logger import get_logger
from src.models.model_selector import (
    sensitivity_analysis, save_model, save_metrics, walk_forward_cv
)
from config.settings import (
    PROCESSED_DIR, FIGURES_DIR, METRICS_DIR, RANDOM_SEED, TEST_SIZE,
    CLASSIFIER_FEATURES, CLASSIFIER_TARGET
)

logger = get_logger(__name__)

# Optional scikit-learn stack; fall back to the numpy classifiers below if absent.
try:
    from sklearn.linear_model import LogisticRegression
    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.neural_network import MLPClassifier
    from sklearn.metrics import (roc_auc_score, roc_curve, precision_score,
                                 recall_score, f1_score)
    SKLEARN_OK = True
except ImportError:
    logger.warning("scikit-learn not installed — using numpy fallback classifiers")
    SKLEARN_OK = False


# --- Numpy fallback classifiers (offline environments without scikit-learn) ---

class _NumpyLogisticRegression:
    """Pure numpy logistic regression (batch gradient descent, for offline environments)."""
    def __init__(self, lr=0.01, n_iter=500, seed=42):
        self.lr = lr; self.n_iter = n_iter; self.seed = seed
        self.w = None; self.b = 0.0

    def _sigmoid(self, z): return 1 / (1 + np.exp(-np.clip(z, -50, 50)))

    def fit(self, X, y):
        rng = np.random.default_rng(self.seed)
        self.w = rng.normal(0, 0.01, X.shape[1])
        for _ in range(self.n_iter):
            z = X @ self.w + self.b
            pred = self._sigmoid(z)
            err = pred - y
            self.w -= self.lr * X.T @ err / len(y)
            self.b -= self.lr * err.mean()
        return self

    def predict_proba(self, X):
        p = self._sigmoid(X @ self.w + self.b)
        return np.column_stack([1 - p, p])

    def predict(self, X):
        return (self._sigmoid(X @ self.w + self.b) >= 0.5).astype(int)

    def get_params(self, **kw): return {"lr": self.lr, "n_iter": self.n_iter}


class _NumpyRandomForest:
    """Simplified Random Forest using numpy (for offline environments)."""
    def __init__(self, n_estimators=50, max_depth=5, seed=42):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.seed = seed
        self.trees = []
        self.feature_importances_ = None

    def _build_tree(self, X, y, depth=0):
        if depth >= self.max_depth or len(set(y)) == 1 or len(y) < 5:
            return {"leaf": True, "value": np.mean(y)}
        rng = np.random.default_rng(self.seed + depth)
        n_feat = max(1, int(np.sqrt(X.shape[1])))
        feat_idx = rng.choice(X.shape[1], n_feat, replace=False)
        best_feat, best_thresh, best_score = 0, 0, float("inf")
        for fi in feat_idx:
            thresholds = np.percentile(X[:, fi], [25, 50, 75])
            for t in thresholds:
                left_y = y[X[:, fi] <= t]
                right_y = y[X[:, fi] > t]
                if len(left_y) == 0 or len(right_y) == 0:
                    continue
                # Count-weighted Gini impurity of the candidate split
                score = (len(left_y) * (1 - (left_y.mean()**2 + (1-left_y.mean())**2)) +
                         len(right_y) * (1 - (right_y.mean()**2 + (1-right_y.mean())**2)))
                if score < best_score:
                    best_score, best_feat, best_thresh = score, fi, t
        if best_score == float("inf"):
            # No valid split found: stop here rather than recurse on empty nodes
            return {"leaf": True, "value": np.mean(y)}
        left_mask = X[:, best_feat] <= best_thresh
        return {
            "leaf": False, "feat": best_feat, "thresh": best_thresh,
            "left": self._build_tree(X[left_mask], y[left_mask], depth + 1),
            "right": self._build_tree(X[~left_mask], y[~left_mask], depth + 1),
        }

    def _predict_tree(self, node, x):
        if node["leaf"]:
            return node["value"]
        if x[node["feat"]] <= node["thresh"]:
            return self._predict_tree(node["left"], x)
        return self._predict_tree(node["right"], x)

    def fit(self, X, y):
        rng = np.random.default_rng(self.seed)
        self.trees = []
        for _ in range(self.n_estimators):
            idx = rng.choice(len(X), len(X), replace=True)   # bootstrap sample
            self.trees.append(self._build_tree(X[idx], y[idx]))
        # Placeholder: uniform importances (this fallback does not track splits)
        self.feature_importances_ = np.ones(X.shape[1]) / X.shape[1]
        return self

    def predict_proba(self, X):
        preds = np.array([[self._predict_tree(t, x) for t in self.trees]
                          for x in X]).mean(axis=1)
        preds = np.clip(preds, 0, 1)
        return np.column_stack([1 - preds, preds])

    def predict(self, X):
        return (self.predict_proba(X)[:, 1] >= 0.5).astype(int)

    def get_params(self, **kw): return {"n_estimators": self.n_estimators}


class _NumpyANN:
    """Simple ANN with two hidden layers for classification (sigmoid activations)."""
    def __init__(self, hidden=(16, 8), lr=0.001, n_iter=200, seed=42):
        self.hidden = hidden; self.lr = lr; self.n_iter = n_iter; self.seed = seed
        self.weights = []; self.biases = []

    def _sigmoid(self, z): return 1 / (1 + np.exp(-np.clip(z, -50, 50)))

    def fit(self, X, y):
        rng = np.random.default_rng(self.seed)
        layers = [X.shape[1]] + list(self.hidden) + [1]
        self.weights = [rng.normal(0, np.sqrt(2/layers[i]), (layers[i], layers[i+1]))
                        for i in range(len(layers)-1)]
        self.biases = [np.zeros(layers[i+1]) for i in range(len(layers)-1)]

        y_ = y.reshape(-1, 1).astype(float)
        for _ in range(self.n_iter):
            # Forward pass: cache every layer's activation for backprop
            acts = [X]
            for W, b in zip(self.weights, self.biases):
                acts.append(self._sigmoid(acts[-1] @ W + b))
            # Backward pass (squared-error loss). Given a cached activation
            # a = sigmoid(z), the derivative is a * (1 - a); the sigmoid must
            # not be applied a second time.
            delta = (acts[-1] - y_) * acts[-1] * (1 - acts[-1])
            for i in reversed(range(len(self.weights))):
                grad_w = acts[i].T @ delta / len(y_)
                grad_b = delta.mean(axis=0)
                if i > 0:
                    delta = delta @ self.weights[i].T * acts[i] * (1 - acts[i])
                self.weights[i] -= self.lr * grad_w
                self.biases[i] -= self.lr * grad_b
        return self

    def predict_proba(self, X):
        a = X
        for W, b in zip(self.weights, self.biases):
            a = self._sigmoid(a @ W + b)
        p = a.flatten()
        return np.column_stack([1 - p, p])

    def predict(self, X):
        return (self.predict_proba(X)[:, 1] >= 0.5).astype(int)

    def get_params(self, **kw): return {"hidden": self.hidden, "lr": self.lr}


# --- Metrics ---

def _compute_metrics(model, X_test, y_test, model_name: str) -> dict:
    """Compute classification metrics for a trained model."""
    try:
        proba = model.predict_proba(X_test)[:, 1]
        y_pred = model.predict(X_test)

        if SKLEARN_OK:
            fpr, tpr, _ = roc_curve(y_test, proba)
            auc = roc_auc_score(y_test, proba)
        else:
            # Fallback AUC via the rank-based Mann-Whitney U statistic
            pos_scores = proba[y_test == 1]
            neg_scores = proba[y_test == 0]
            n_pos, n_neg = len(pos_scores), len(neg_scores)
            if n_pos == 0 or n_neg == 0:
                auc = 0.5
            else:
                all_scores = np.concatenate([pos_scores, neg_scores])
                ranks = np.argsort(np.argsort(all_scores)) + 1
                pos_rank_sum = ranks[:n_pos].sum()
                auc = float((pos_rank_sum - n_pos*(n_pos+1)/2) / (n_pos * n_neg))
                auc = np.clip(auc, 0, 1)
            # Approximate ROC curve by sweeping probability percentiles
            thresholds = np.percentile(proba, np.linspace(0, 100, 100))[::-1]
            fprs, tprs = [0.0], [0.0]
            n_p, n_n = (y_test==1).sum(), (y_test==0).sum()
            for t in thresholds:
                p = (proba >= t).astype(int)
                fprs.append(((p==1)&(y_test==0)).sum() / (n_n+1e-9))
                tprs.append(((p==1)&(y_test==1)).sum() / (n_p+1e-9))
            fprs.append(1.0); tprs.append(1.0)
            fpr = np.array(fprs); tpr = np.array(tprs)

        if SKLEARN_OK:
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
        else:
            tp = ((y_pred == 1) & (y_test == 1)).sum()
            fp = ((y_pred == 1) & (y_test == 0)).sum()
            fn = ((y_pred == 0) & (y_test == 1)).sum()
            prec = tp / (tp + fp + 1e-9)
            rec = tp / (tp + fn + 1e-9)
            f1 = 2 * prec * rec / (prec + rec + 1e-9)

        return {
            "model": model_name, "auc": round(float(auc), 4),
            "precision": round(float(prec), 4),
            "recall": round(float(rec), 4), "f1": round(float(f1), 4),
            "fpr": fpr.tolist(), "tpr": tpr.tolist(),
        }
    except Exception as e:
        logger.error("Metrics error for %s: %s", model_name, e)
        return {"model": model_name, "auc": 0.0}


# --- Training entry point ---

def train_classifier() -> dict:
    """
    Train all candidate classifiers, select the best by a balanced
    AUC/F1 score, and save artifacts.

    Returns
    -------
    dict of model metadata: best model name, per-model metrics,
    tuning and cross-validation results
    """
    logger.info("=" * 60)
    logger.info("Training disruption classification models")
    logger.info("=" * 60)

    # Load engineered features
    df = pd.read_csv(PROCESSED_DIR / "features_classification.csv", low_memory=False)
    logger.info("Loaded %d rows (raw)", len(df))

    # Drop rows with missing targets
    df = df.dropna(subset=[CLASSIFIER_TARGET]).reset_index(drop=True)
    logger.info("Loaded %d rows after dropping NaN targets", len(df))

    # Feature matrix / target vector
    X = df[CLASSIFIER_FEATURES].fillna(0).values.astype(float)
    y = df[CLASSIFIER_TARGET].fillna(0).values.astype(int)

    # Chronological split (rows are assumed time-ordered): earliest rows
    # train, latest rows test, no shuffling
    n_train = int(len(X) * (1 - TEST_SIZE))
    X_train, X_test = X[:n_train], X[n_train:]
    y_train, y_test = y[:n_train], y[n_train:]

    # Standardize with train statistics only, to avoid test-set leakage
    X_mean = X_train.mean(axis=0)
    X_std = X_train.std(axis=0) + 1e-8
    X_train_n = (X_train - X_mean) / X_std
    X_test_n = (X_test - X_mean) / X_std

    logger.info("Train: %d rows | Test: %d rows | Positive rate: %.2f%%",
                len(X_train), len(X_test), y_train.mean() * 100)

    # Candidate models
    if SKLEARN_OK:
        candidates = {
            "Logistic Regression": LogisticRegression(
                random_state=RANDOM_SEED, max_iter=500, C=0.5),
            "Random Forest": RandomForestClassifier(
                n_estimators=100, max_depth=8, random_state=RANDOM_SEED),
            "Gradient Boosting": GradientBoostingClassifier(
                n_estimators=100, max_depth=4, learning_rate=0.1,
                random_state=RANDOM_SEED),
            "ANN": MLPClassifier(
                hidden_layer_sizes=(16, 8), activation="logistic",
                max_iter=500, random_state=RANDOM_SEED, learning_rate_init=0.001),
        }
    else:
        candidates = {
            "Logistic Regression": _NumpyLogisticRegression(lr=0.05, n_iter=300, seed=RANDOM_SEED),
            "Random Forest": _NumpyRandomForest(n_estimators=30, max_depth=5, seed=RANDOM_SEED),
            "ANN": _NumpyANN(hidden=(16, 8), lr=0.01, n_iter=100, seed=RANDOM_SEED),
        }

    # Train and evaluate each candidate
    results = []
    trained_models = {}

    for name, model in candidates.items():
        logger.info("Training: %s ...", name)
        try:
            model.fit(X_train_n, y_train)
            metrics = _compute_metrics(model, X_test_n, y_test, name)
            results.append(metrics)
            trained_models[name] = (model, metrics)
            logger.info("  AUC=%.4f | Precision=%.4f | Recall=%.4f | F1=%.4f",
                        metrics["auc"], metrics.get("precision", 0),
                        metrics.get("recall", 0), metrics.get("f1", 0))
        except Exception as e:
            logger.error("Training failed for %s: %s", name, e)

    if not results:
        logger.error("All models failed — check data")
        return {}

    # Select the best model by a balanced score: AUC captures ranking quality,
    # F1 guards against degenerate thresholds on the imbalanced target.
    def _balanced_score(r):
        return 0.6 * r.get("auc", 0) + 0.4 * r.get("f1", 0)

    best = max(results, key=_balanced_score)
    best_name = best["model"]
    best_model, _ = trained_models[best_name]
    logger.info("Best model: %s (AUC=%.4f | F1=%.4f | balanced=%.4f)",
                best_name, best["auc"], best.get("f1", 0), _balanced_score(best))

    # Optional hyperparameter tuning of the winning model. Note that
    # GridSearchCV's stratified folds ignore time order; the walk-forward CV
    # below provides the time-aware check.
    grid_search_results = {}
    if SKLEARN_OK:
        _param_grids = {
            "Logistic Regression": {
                "C": [0.1, 0.5, 1.0, 5.0], "max_iter": [500],
            },
            "Random Forest": {
                "n_estimators": [50, 100, 200], "max_depth": [5, 8, 12],
            },
            "Gradient Boosting": {
                "n_estimators": [50, 100], "max_depth": [3, 4],
                "learning_rate": [0.05, 0.1],
            },
            "ANN": {
                "hidden_layer_sizes": [(16, 8), (32, 16)],
                "learning_rate_init": [0.001, 0.005],
            },
        }
        if best_name in _param_grids:
            try:
                from sklearn.model_selection import GridSearchCV as _GridSearchCV
                _base_map = {
                    "Logistic Regression": LogisticRegression(
                        random_state=RANDOM_SEED, max_iter=500),
                    "Random Forest": RandomForestClassifier(
                        random_state=RANDOM_SEED),
                    "Gradient Boosting": GradientBoostingClassifier(
                        random_state=RANDOM_SEED),
                    "ANN": MLPClassifier(
                        activation="logistic", max_iter=500, random_state=RANDOM_SEED),
                }
                _gs = _GridSearchCV(
                    _base_map[best_name], _param_grids[best_name],
                    cv=3, scoring="roc_auc", n_jobs=-1, refit=True,
                )
                _gs.fit(X_train_n, y_train)
                _tuned = _gs.best_estimator_
                _tuned_metrics = _compute_metrics(
                    _tuned, X_test_n, y_test, f"{best_name} (Tuned)")
                grid_search_results = {
                    "best_params": _gs.best_params_,
                    "best_cv_auc": round(float(_gs.best_score_), 4),
                    "tuned_test_auc": _tuned_metrics.get("auc", 0),
                    "tuned_test_f1": _tuned_metrics.get("f1", 0),
                }
                if _balanced_score(_tuned_metrics) > _balanced_score(best):
                    best_model = _tuned
                    best = _tuned_metrics
                    logger.info(
                        "GridSearchCV improved model: AUC=%.4f F1=%.4f params=%s",
                        _tuned_metrics["auc"], _tuned_metrics.get("f1", 0),
                        _gs.best_params_,
                    )
                else:
                    logger.info(
                        "GridSearchCV: existing model already optimal "
                        "(cv_auc=%.4f)", _gs.best_score_)
            except Exception as _ge:
                logger.warning("GridSearchCV failed (non-fatal): %s", _ge)

    # Sensitivity analysis on the normalized test features
    feat_df = pd.DataFrame(X_test_n, columns=CLASSIFIER_FEATURES)
    sensitivity = sensitivity_analysis(best_model, feat_df)
    sens_path = METRICS_DIR / "sensitivity_classifier.csv"
    sensitivity.to_csv(sens_path, index=False)

    # SHAP explainability (optional, non-fatal)
    _run_shap_classifier(best_model, best_name, X_train_n, X_test_n, CLASSIFIER_FEATURES)

    # Walk-forward cross-validation over the full, time-ordered series.
    # sklearn's clone is imported lazily inside the factory so the numpy
    # fallback path still runs when scikit-learn is absent.
    wf_cv_results = {}
    try:
        X_wfcv = pd.DataFrame(X, columns=CLASSIFIER_FEATURES)
        y_wfcv = pd.Series(y, name=CLASSIFIER_TARGET)
        def _best_factory():
            if SKLEARN_OK:
                try:
                    from sklearn.base import clone as _clone
                    return _clone(best_model)
                except Exception:
                    pass
            return _NumpyLogisticRegression(lr=0.05, n_iter=300, seed=RANDOM_SEED)
        wf_cv_results = walk_forward_cv(
            _best_factory, X_wfcv, y_wfcv, n_splits=5, task="classification")
        logger.info(
            "Walk-forward CV — mean_auc=%.4f ± %.4f | mean_f1=%.4f ± %.4f",
            wf_cv_results.get("mean_auc") or 0, wf_cv_results.get("std_auc") or 0,
            wf_cv_results.get("mean_f1") or 0, wf_cv_results.get("std_f1") or 0,
        )
    except Exception as _wfe:
        logger.warning("Walk-forward CV failed (non-fatal): %s", _wfe)

    # Optimal decision threshold via Youden's J statistic (maximize TPR - FPR);
    # the numpy fallback path keeps the default 0.5
    _optimal_threshold = 0.5
    try:
        _probs_test = best_model.predict_proba(X_test_n)[:, 1]
        _prob_min = float(_probs_test.min())
        _prob_max = float(_probs_test.max())
        if SKLEARN_OK:
            from sklearn.metrics import roc_curve as _roc_curve
            _fpr_arr, _tpr_arr, _thresh_arr = _roc_curve(y_test, _probs_test)
            _optimal_threshold = float(_thresh_arr[np.argmax(_tpr_arr - _fpr_arr)])
        logger.info("Optimal decision threshold: %.4f (positive rate %.1f%%)",
                    _optimal_threshold, y_train.mean() * 100)
    except Exception as _te:
        logger.warning("Could not compute optimal threshold: %s", _te)
        _prob_min, _prob_max = 0.0, 1.0
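
    # A numpy-only Youden search that could serve the no-sklearn path would
    # look roughly like this (sketch, hypothetical):
    #     cands = np.unique(_probs_test)
    #     j = [(((_probs_test >= t) & (y_test == 1)).sum() / max((y_test == 1).sum(), 1)
    #           - ((_probs_test >= t) & (y_test == 0)).sum() / max((y_test == 0).sum(), 1))
    #          for t in cands]
    #     _optimal_threshold = float(cands[int(np.argmax(j))])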

    # Feature importance: tree importances or absolute linear coefficients
    fi_data = {}
    if hasattr(best_model, "feature_importances_"):
        fi_data = dict(zip(CLASSIFIER_FEATURES, best_model.feature_importances_.tolist()))
    elif hasattr(best_model, "coef_"):
        fi_data = dict(zip(CLASSIFIER_FEATURES, np.abs(best_model.coef_[0]).tolist()))

    # Persist metadata, the model itself, and the scaler statistics
    model_meta = {
        "best_model": best_name,
        "scaler_mean": X_mean.tolist(),
        "scaler_std": X_std.tolist(),
        "features": CLASSIFIER_FEATURES,
        "target": CLASSIFIER_TARGET,
        "all_models": results,
        "feature_importance": fi_data,
        "optimal_threshold": _optimal_threshold,
        "prob_range": [_prob_min, _prob_max],
        "trained_at": datetime.now(timezone.utc).isoformat(),
        "n_train": int(len(X_train)),
        "n_test": int(len(X_test)),
        "positive_rate": float(y_train.mean()),
        "grid_search": grid_search_results,
        "walk_forward_cv": wf_cv_results,
    }
    save_metrics(model_meta, "classifier")
    save_model(best_model, "best_classifier")
    save_model({"mean": X_mean, "std": X_std}, "classifier_scaler")

    # Diagnostic plots
    _plot_roc_curves(results)
    _plot_feature_importance(sensitivity, best_name)

    logger.info("✓ Classifier training complete. Best: %s AUC=%.4f",
                best_name, best["auc"])
    return model_meta


def _plot_roc_curves(results: list):
    """Plot ROC curves for all models."""
    fig, ax = plt.subplots(figsize=(8, 6))
    colors = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728"]
    for i, r in enumerate(results):
        if "fpr" in r and "tpr" in r:
            ax.plot(r["fpr"], r["tpr"], color=colors[i % len(colors)],
                    label=f"{r['model']} (AUC={r['auc']:.3f})", linewidth=2)
    ax.plot([0, 1], [0, 1], "k--", alpha=0.5, label="Random")
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title("ROC Curves — Disruption Classifier")
    ax.legend(loc="lower right")
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    fig.savefig(FIGURES_DIR / "roc_curves_classifier.png", dpi=150)
    plt.close(fig)
    logger.info("Saved ROC curves plot")


def _plot_feature_importance(sensitivity_df: pd.DataFrame, model_name: str):
    """Plot feature sensitivity / importance."""
    if sensitivity_df.empty:
        return
    fig, ax = plt.subplots(figsize=(8, 6))
    top = sensitivity_df.head(10)
    ax.barh(top["feature"][::-1], top["sensitivity"][::-1], color="#1f77b4")
    ax.set_xlabel("Sensitivity (avg output change)")
    ax.set_title(f"Feature Sensitivity Analysis — {model_name}")
    ax.grid(True, alpha=0.3, axis="x")
    plt.tight_layout()
    fig.savefig(FIGURES_DIR / "feature_sensitivity_classifier.png", dpi=150)
    plt.close(fig)
    logger.info("Saved feature sensitivity plot")


def _run_shap_classifier(model, model_name: str, X_train_n, X_test_n, feature_names):
    """Compute SHAP values for the best classifier and save outputs."""
    try:
        import shap
        X_train_df = pd.DataFrame(X_train_n, columns=feature_names)
        X_test_df = pd.DataFrame(X_test_n, columns=feature_names)

        # Pick an explainer appropriate to the model family
        if hasattr(model, "feature_importances_"):
            # Tree-based models: fast, exact TreeExplainer
            explainer = shap.TreeExplainer(model)
            shap_values = explainer.shap_values(X_test_df)
            # Binary classifiers may return a list [class-0 values, class-1 values]
            if isinstance(shap_values, list) and len(shap_values) == 2:
                shap_vals = shap_values[1]
            else:
                shap_vals = shap_values
        else:
            # Other models: linear explainer when coefficients exist, otherwise
            # the slower model-agnostic KernelExplainer on a sampled background
            background = shap.sample(X_train_df, min(50, len(X_train_df)), random_state=42)
            if hasattr(model, "coef_"):
                explainer = shap.LinearExplainer(model, background)
                shap_vals = explainer.shap_values(X_test_df)
            else:
                def _predict_proba_pos(X):
                    return model.predict_proba(X)[:, 1]
                explainer = shap.KernelExplainer(_predict_proba_pos, background)
                shap_vals = explainer.shap_values(X_test_df, nsamples=100)

        # Mean absolute SHAP value per feature
        mean_shap = np.abs(shap_vals).mean(axis=0)
        shap_df = pd.DataFrame({
            "feature": feature_names,
            "mean_abs_shap": mean_shap.tolist()
        }).sort_values("mean_abs_shap", ascending=False)
        shap_df.to_csv(METRICS_DIR / "shap_classifier.csv", index=False)

        # Bar plot of SHAP importances
        fig, ax = plt.subplots(figsize=(8, 5))
        colors = ["#1A2B5E" if v > 0.001 else "#CBD5E1" for v in shap_df["mean_abs_shap"]]
        ax.barh(shap_df["feature"][::-1], shap_df["mean_abs_shap"][::-1], color=colors[::-1])
        ax.set_xlabel("Mean |SHAP value| (impact on disruption probability)")
        ax.set_title(f"SHAP Feature Importance — {model_name} (Classifier)")
        ax.grid(True, alpha=0.3, axis="x")
        plt.tight_layout()
        fig.savefig(FIGURES_DIR / "shap_classifier.png", dpi=150)
        plt.close(fig)
        logger.info("✓ SHAP classifier: saved shap_classifier.csv + shap_classifier.png")

    except Exception as e:
        logger.warning("SHAP classifier failed (non-fatal): %s", e)


if __name__ == "__main__":
    metrics = train_classifier()
    print(f"\nBest model: {metrics.get('best_model')}")
    best_auc = next((m["auc"] for m in metrics.get("all_models", [])
                     if m["model"] == metrics.get("best_model")), "N/A")
    print(f"AUC: {best_auc}")
|