| """ |
| Phase 8 β Explainability Engine (SHAP + ElasticNet Coefficients) |
| |
| Consumes outputs from Phase 6 (trained models + preprocessors + test data). |
| No model retraining. No feature engineering. |
| |
| Produces: |
| data/explainability/ |
| shap_global_xgb_{asset}.csv - mean |SHAP| per feature (XGBoost) |
| shap_global_en_{asset}.csv - mean |SHAP| per feature (ElasticNet) |
| shap_values_xgb_{asset}.csv - full SHAP matrix, test set (XGBoost) |
| shap_values_en_{asset}.csv - full SHAP matrix, test set (ElasticNet) |
| elastic_net_coefficients_{asset}.csv - actual EN coefficients from pkl model |
| feature_importance_comparison.csv - unified table: XGB gain | SHAP | EN coeff |
| latest_prediction_explanation.json - per-asset local explanation, most recent row |
| explainability_report.json - full summary |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import time |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
|
|
| import joblib |
| import numpy as np |
| import pandas as pd |
| import shap |
|
|
| logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s") |
| logger = logging.getLogger(__name__) |
|
|
| ASSETS = ["spx", "ndx", "gold", "btc"] |
| MONTHS_PER_YEAR = 12 |
|
|
|
|
| class _Timer: |
| def __init__(self) -> None: |
| self._t0 = time.time() |
|
|
| def log(self, tag: str, msg: str) -> None: |
| elapsed = time.time() - self._t0 |
| m, s = divmod(int(elapsed), 60) |
| h, m = divmod(m, 60) |
| logger.info(f"[{h:02d}:{m:02d}:{s:02d}] [{tag}] {msg}") |
|
|
|
|
| class ExplainabilityEngine: |
| """ |
| Phase 8: SHAP-based explainability + ElasticNet coefficient analysis. |
| |
| Loads Phase 6 pkl models + preprocessors, computes SHAP values on the |
| out-of-sample test set, and extracts EN coefficients from the model objects. |
| """ |
|
|
| def __init__(self, backend_root: str | Path) -> None: |
| self.root = Path(backend_root) |
| self.timer = _Timer() |
|
|
| self.models_dir = self.root / "models" / "phase6" |
| self.cache_dir = self.models_dir / "training_cache" |
| self.output_dir = self.root / "data" / "explainability" |
| self.output_dir.mkdir(parents=True, exist_ok=True) |
|
|
| |
|
|
| def run(self) -> dict: |
| self.timer.log("START", "Phase 8 Explainability started") |
|
|
| report: dict = {"assets": {}, "cross_asset_summary": {}} |
| comparison_rows: List[dict] = [] |
|
|
| for asset in ASSETS: |
| self.timer.log(asset.upper(), f"Processing {asset.upper()}") |
| result = self._process_asset(asset) |
| report["assets"][asset] = result |
| comparison_rows.extend(result.pop("_comparison_rows")) |
|
|
| |
| comparison_df = pd.DataFrame(comparison_rows) |
| comparison_df.to_csv(self.output_dir / "feature_importance_comparison.csv", index=False) |
| self.timer.log("COMPARE", f"Feature importance comparison saved ({len(comparison_df)} rows)") |
|
|
| |
| report["cross_asset_summary"] = self._cross_asset_summary(comparison_df) |
|
|
| |
| with open(self.output_dir / "explainability_report.json", "w", encoding="utf-8") as f: |
| json.dump(report, f, indent=2, default=str) |
|
|
| self.timer.log("END", "Phase 8 completed successfully") |
| return report |
|
|
| |
|
|
| def _process_asset(self, asset: str) -> dict: |
| |
| feature_cols = self._load_feature_columns(asset) |
| test_df = self._load_test_snapshot(asset) |
| X_test_raw, y_test = self._split_xy(test_df, asset) |
|
|
| |
| en_model, en_pre = self._load_model(asset, "elastic_net") |
| X_test_en = en_pre.transform(X_test_raw) |
| transformed_cols = self._get_transformed_columns(asset) |
|
|
| en_coeffs_df = self._extract_en_coefficients(en_model, transformed_cols, asset) |
| shap_en_df, shap_en_global = self._compute_shap_linear( |
| en_model, en_pre, X_test_raw, transformed_cols, asset |
| ) |
|
|
| |
| xgb_model, xgb_pre = self._load_model(asset, "xgboost") |
| X_test_xgb = xgb_pre.transform(X_test_raw) |
|
|
| shap_xgb_df, shap_xgb_global = self._compute_shap_tree( |
| xgb_model, X_test_xgb, transformed_cols, asset |
| ) |
|
|
| xgb_gain_df = self._load_xgb_gain_importance(asset) |
|
|
| |
| local_explanation = self._explain_latest_prediction( |
| asset, X_test_raw, y_test, |
| en_model, en_pre, shap_en_df, |
| xgb_model, xgb_pre, shap_xgb_df, |
| transformed_cols, |
| ) |
|
|
| |
| comparison_rows = self._build_comparison_rows( |
| asset, transformed_cols, shap_xgb_global, shap_en_global, en_coeffs_df, xgb_gain_df |
| ) |
|
|
| self.timer.log(asset.upper(), "Done") |
|
|
| return { |
| "en_active_features": int((en_coeffs_df["coefficient"] != 0).sum()), |
| "en_total_features": len(en_coeffs_df), |
| "xgb_top_feature": shap_xgb_global.iloc[0]["feature"] if not shap_xgb_global.empty else None, |
| "en_top_feature": shap_en_global.iloc[0]["feature"] if not shap_en_global.empty and shap_en_global.iloc[0]["mean_abs_shap"] > 0 else "all_zero", |
| "latest_explanation": local_explanation, |
| "_comparison_rows": comparison_rows, |
| } |
|
|
| |
|
|
| def _compute_shap_tree( |
| self, |
| model, |
| X_transformed: np.ndarray, |
| feature_names: List[str], |
| asset: str, |
| ) -> Tuple[pd.DataFrame, pd.DataFrame]: |
| explainer = shap.TreeExplainer(model) |
| shap_values = explainer.shap_values(X_transformed) |
|
|
| shap_df = pd.DataFrame(shap_values, columns=feature_names) |
| shap_df.to_csv(self.output_dir / f"shap_values_xgb_{asset}.csv", index=False) |
|
|
| global_df = pd.DataFrame({ |
| "feature": feature_names, |
| "mean_abs_shap": np.abs(shap_values).mean(axis=0), |
| }).sort_values("mean_abs_shap", ascending=False).reset_index(drop=True) |
|
|
| global_df.to_csv(self.output_dir / f"shap_global_xgb_{asset}.csv", index=False) |
| self.timer.log(asset.upper(), f"XGBoost SHAP computed β top feature: {global_df.iloc[0]['feature']} ({global_df.iloc[0]['mean_abs_shap']:.5f})") |
|
|
| return shap_df, global_df |
|
|
| |
|
|
| def _compute_shap_linear( |
| self, |
| model, |
| preprocessor, |
| X_raw: pd.DataFrame, |
| feature_names: List[str], |
| asset: str, |
| ) -> Tuple[pd.DataFrame, pd.DataFrame]: |
| X_transformed = preprocessor.transform(X_raw) |
|
|
| |
| explainer = shap.LinearExplainer(model, X_transformed, feature_perturbation="interventional") |
| shap_values = explainer.shap_values(X_transformed) |
|
|
| shap_df = pd.DataFrame(shap_values, columns=feature_names) |
| shap_df.to_csv(self.output_dir / f"shap_values_en_{asset}.csv", index=False) |
|
|
| global_df = pd.DataFrame({ |
| "feature": feature_names, |
| "mean_abs_shap": np.abs(shap_values).mean(axis=0), |
| }).sort_values("mean_abs_shap", ascending=False).reset_index(drop=True) |
|
|
| global_df.to_csv(self.output_dir / f"shap_global_en_{asset}.csv", index=False) |
| top = global_df.iloc[0] |
| self.timer.log(asset.upper(), f"ElasticNet SHAP computed β top feature: {top['feature']} ({top['mean_abs_shap']:.5f})") |
|
|
| return shap_df, global_df |
|
|
| |
|
|
| def _extract_en_coefficients( |
| self, |
| model, |
| feature_names: List[str], |
| asset: str, |
| ) -> pd.DataFrame: |
| """ |
| Extract actual coefficients from the ElasticNetCV pkl object. |
| The saved CSV has all-zeros due to heavy regularization β the pkl has the truth. |
| """ |
| coef = np.asarray(model.coef_).flatten() |
| intercept = float(model.intercept_) if hasattr(model, "intercept_") else 0.0 |
|
|
| df = pd.DataFrame({ |
| "feature": feature_names, |
| "coefficient": coef, |
| "abs_coefficient": np.abs(coef), |
| "direction": np.where(coef > 0, "positive", np.where(coef < 0, "negative", "zero")), |
| }).sort_values("abs_coefficient", ascending=False).reset_index(drop=True) |
|
|
| df.attrs["intercept"] = intercept |
|
|
| |
| intercept_row = pd.DataFrame([{ |
| "feature": "__intercept__", |
| "coefficient": intercept, |
| "abs_coefficient": abs(intercept), |
| "direction": "positive" if intercept >= 0 else "negative", |
| }]) |
| df = pd.concat([df, intercept_row], ignore_index=True) |
|
|
| df.to_csv(self.output_dir / f"elastic_net_coefficients_{asset}.csv", index=False) |
| n_active = int((np.abs(coef) > 0).sum()) |
| self.timer.log(asset.upper(), f"EN coefficients extracted β {n_active}/{len(coef)} active, intercept={intercept:.6f}") |
|
|
| return df |
|
|
| |
|
|
| def _explain_latest_prediction( |
| self, |
| asset: str, |
| X_test_raw: pd.DataFrame, |
| y_test: pd.Series, |
| en_model, en_pre, |
| shap_en_df: pd.DataFrame, |
| xgb_model, xgb_pre, |
| shap_xgb_df: pd.DataFrame, |
| feature_names: List[str], |
| ) -> dict: |
| """ |
| Explain the most recent (last row of test set) prediction for each model. |
| Returns top-5 drivers from each model. |
| """ |
| last_idx = -1 |
|
|
| |
| raw_values = X_test_raw.iloc[last_idx].to_dict() |
|
|
| |
| X_last_en = en_pre.transform(X_test_raw.iloc[[last_idx]]) |
| en_pred = float(en_model.predict(X_last_en)[0]) |
| en_shap_last = shap_en_df.iloc[last_idx].to_dict() |
| en_top5 = sorted(en_shap_last.items(), key=lambda x: abs(x[1]), reverse=True)[:5] |
|
|
| |
| X_last_xgb = xgb_pre.transform(X_test_raw.iloc[[last_idx]]) |
| xgb_pred = float(xgb_model.predict(X_last_xgb)[0]) |
| xgb_shap_last = shap_xgb_df.iloc[last_idx].to_dict() |
| xgb_top5 = sorted(xgb_shap_last.items(), key=lambda x: abs(x[1]), reverse=True)[:5] |
|
|
| actual = float(y_test.iloc[last_idx]) |
|
|
| explanation = { |
| "asset": asset, |
| "actual_return": round(actual, 6), |
| "elastic_net": { |
| "prediction": round(en_pred, 6), |
| "error": round(en_pred - actual, 6), |
| "top_drivers": [ |
| {"feature": f, "shap_value": round(v, 6), "direction": "bullish" if v > 0 else "bearish"} |
| for f, v in en_top5 |
| ], |
| }, |
| "xgboost": { |
| "prediction": round(xgb_pred, 6), |
| "error": round(xgb_pred - actual, 6), |
| "top_drivers": [ |
| {"feature": f, "shap_value": round(v, 6), "direction": "bullish" if v > 0 else "bearish"} |
| for f, v in xgb_top5 |
| ], |
| }, |
| "raw_feature_snapshot": {k: round(float(v), 6) if isinstance(v, (int, float, np.floating)) else str(v) |
| for k, v in raw_values.items()}, |
| } |
|
|
| return explanation |
|
|
| |
|
|
| def _cross_asset_summary(self, comparison_df: pd.DataFrame) -> dict: |
| """Average XGB SHAP importance across all assets β universal drivers.""" |
| if "xgb_mean_abs_shap" not in comparison_df.columns: |
| return {} |
|
|
| avg = ( |
| comparison_df.groupby("feature")["xgb_mean_abs_shap"] |
| .mean() |
| .sort_values(ascending=False) |
| .reset_index() |
| ) |
| avg.columns = ["feature", "avg_xgb_shap_across_assets"] |
| avg.to_csv(self.output_dir / "cross_asset_shap_importance.csv", index=False) |
|
|
| return { |
| "top5_universal_drivers": avg.head(5).to_dict(orient="records"), |
| "note": "Average XGBoost SHAP importance across SPX, NDX, Gold", |
| } |
|
|
| |
|
|
| def _build_comparison_rows( |
| self, |
| asset: str, |
| feature_names: List[str], |
| shap_xgb_global: pd.DataFrame, |
| shap_en_global: pd.DataFrame, |
| en_coeffs_df: pd.DataFrame, |
| xgb_gain_df: pd.DataFrame, |
| ) -> List[dict]: |
| |
| xgb_shap_map = dict(zip(shap_xgb_global["feature"], shap_xgb_global["mean_abs_shap"])) |
| en_shap_map = dict(zip(shap_en_global["feature"], shap_en_global["mean_abs_shap"])) |
| en_coeff_map = {} |
| for _, row in en_coeffs_df.iterrows(): |
| if row["feature"] != "__intercept__": |
| en_coeff_map[row["feature"]] = row["coefficient"] |
|
|
| xgb_gain_map = {} |
| if xgb_gain_df is not None: |
| xgb_gain_map = dict(zip(xgb_gain_df["feature"], xgb_gain_df["importance"])) |
|
|
| rows = [] |
| for feat in feature_names: |
| rows.append({ |
| "asset": asset, |
| "feature": feat, |
| "xgb_gain": round(xgb_gain_map.get(feat, 0.0), 6), |
| "xgb_mean_abs_shap": round(xgb_shap_map.get(feat, 0.0), 6), |
| "en_mean_abs_shap": round(en_shap_map.get(feat, 0.0), 6), |
| "en_coefficient": round(en_coeff_map.get(feat, 0.0), 6), |
| "en_direction": ("positive" if en_coeff_map.get(feat, 0.0) > 0 |
| else "negative" if en_coeff_map.get(feat, 0.0) < 0 |
| else "zero"), |
| }) |
| return rows |
|
|
| |
|
|
| def _load_model(self, asset: str, model_type: str): |
| if model_type == "elastic_net": |
| model_path = self.models_dir / f"elastic_net_{asset}.pkl" |
| pre_path = self.models_dir / f"elastic_net_{asset}_preprocessor.pkl" |
| else: |
| model_path = self.models_dir / f"xgb_{asset}.pkl" |
| pre_path = self.models_dir / f"xgb_{asset}_preprocessor.pkl" |
|
|
| model = joblib.load(model_path) |
| pre = joblib.load(pre_path) |
| return model, pre |
|
|
| def _load_test_snapshot(self, asset: str) -> pd.DataFrame: |
| path = self.cache_dir / f"test_snapshot_{asset}.csv" |
| df = pd.read_csv(path) |
| df["date"] = pd.to_datetime(df["date"], format="mixed") |
| return df.sort_values("date").reset_index(drop=True) |
|
|
| def _load_feature_columns(self, asset: str) -> List[str]: |
| path = self.models_dir / f"elastic_net_{asset}_feature_columns.json" |
| with open(path, encoding="utf-8") as f: |
| data = json.load(f) |
| return data["raw_feature_columns"] |
|
|
| def _get_transformed_columns(self, asset: str) -> List[str]: |
| path = self.models_dir / f"elastic_net_{asset}_feature_columns.json" |
| with open(path, encoding="utf-8") as f: |
| data = json.load(f) |
| return data["transformed_feature_columns"] |
|
|
| def _split_xy(self, df: pd.DataFrame, asset: str) -> Tuple[pd.DataFrame, pd.Series]: |
| target_col = f"{asset}_target" |
| raw_cols = self._load_feature_columns(asset) |
|
|
| |
| drop_cols = [c for c in df.columns if c not in raw_cols and c != target_col] |
| X = df[raw_cols].copy() |
| y = df[target_col].copy() |
| return X, y |
|
|
| def _load_xgb_gain_importance(self, asset: str) -> Optional[pd.DataFrame]: |
| path = self.models_dir / "feature_importance" / f"xgb_{asset}_importance.csv" |
| if path.exists(): |
| return pd.read_csv(path) |
| return None |
|
|