Spaces:
Sleeping
Sleeping
"""Regime-conditional stacking meta-learner using LightGBM.

Trained ONLY on out-of-fold predictions from base models. Combines base model
predictions, GNN embeddings, regime info, stock type, and trailing errors
into a single ensemble prediction.
"""
| import logging | |
| from typing import Optional | |
| import joblib | |
| import lightgbm as lgb | |
| import numpy as np | |
| import pandas as pd | |
| from src.models.base import PredictionResult | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Recognised stock categories. The list order fixes the order of the one-hot
# ``type_<name>`` columns emitted by ``EnsembleMetaLearner.build_meta_features``.
STOCK_TYPES = [
    "large_cap",
    "mid_cap",
    "small_cap",
    "penny",
    "etf",
    "reit",
]
class EnsembleMetaLearner:
    """Regime-conditional stacking meta-learner.

    Holds three LightGBM estimators (a 3-class direction classifier, a
    magnitude regressor and a volatility regressor) that are all fitted on the
    same meta-feature matrix. The matrix is meant to be built from
    out-of-fold base-model predictions; this class does not enforce that, so
    it is the caller's responsibility.
    """

    def __init__(self, horizon: int = 5, **kwargs):
        """Create an unfitted meta-learner.

        Args:
            horizon: Forecast horizon; selects the target columns
                ``direction_{horizon}d``, ``magnitude_{horizon}d`` and
                ``volatility_{horizon}d`` in ``fit``.
            **kwargs: Optional overrides for ``num_leaves``, ``learning_rate``,
                ``n_estimators``, ``feature_fraction``, ``bagging_fraction``,
                ``bagging_freq`` and ``min_child_samples``. Any other key is
                ignored.
        """
        self.horizon = horizon

        tunable_defaults = {
            "num_leaves": 31,
            "learning_rate": 0.05,
            "n_estimators": 300,
            "feature_fraction": 0.8,
            "bagging_fraction": 0.8,
            "bagging_freq": 5,
            "min_child_samples": 20,
        }
        self.params = {
            key: kwargs.get(key, default)
            for key, default in tunable_defaults.items()
        }
        # These three are deliberately fixed and cannot be overridden.
        self.params.update(random_state=42, n_jobs=-1, verbose=-1)

        self.direction_meta = None
        self.magnitude_meta = None
        self.volatility_meta = None

        # Direction labels {-1, 0, 1} are shifted to {0, 1, 2} for the
        # classifier and shifted back when predicting.
        self._dir_map = {-1: 0, 0: 1, 1: 2}
        self._dir_inv = {0: -1, 1: 0, 2: 1}
        self.is_fitted = False

    @staticmethod
    def _require_rows(label: str, values, expected: int) -> None:
        """Raise ``ValueError`` if array-like *values* does not have *expected* rows."""
        if np.ndim(values) == 0:
            # Scalars are broadcast by pandas, so there is nothing to check.
            return
        if len(values) != expected:
            raise ValueError(
                f"{label} has {len(values)} rows, expected {expected}"
            )

    def build_meta_features(
        self,
        base_predictions: "dict[str, PredictionResult]",
        gnn_embeddings: Optional[np.ndarray] = None,
        regime_info: Optional[dict] = None,
        stock_type: Optional[str] = None,
        trailing_errors: Optional[dict[str, float]] = None,
    ) -> pd.DataFrame:
        """Construct the meta-feature matrix from base model outputs.

        Args:
            base_predictions: ``{model_name: PredictionResult}``. Each result
                contributes its direction, magnitude, volatility, confidence
                and one column per ``direction_proba`` column.
            gnn_embeddings: ``(n_samples, embed_dim)`` array or None.
            regime_info: Optional dict; the keys ``regime_labels`` and
                ``regime_probabilities`` are used when present. Probabilities
                are only expanded when they are 2-D.
            stock_type: Optional category, one-hot encoded over
                ``STOCK_TYPES``. A value not in ``STOCK_TYPES`` yields
                all-zero columns.
            trailing_errors: ``{model_name: recent_error}``; each scalar is
                broadcast to every row.

        Returns:
            DataFrame with one row per sample and one column per feature.

        Raises:
            ValueError: If ``base_predictions`` is empty or the inputs do not
                all describe the same number of samples.
        """
        if not base_predictions:
            raise ValueError("base_predictions must not be empty")

        features = {}
        n_samples = None

        # Base model predictions as features.
        for name, pred in base_predictions.items():
            n_rows = len(pred.direction)
            if n_samples is None:
                n_samples = n_rows
            elif n_rows != n_samples:
                # Fail here with a clear message instead of letting pandas
                # raise an opaque length error at the very end.
                raise ValueError(
                    f"base model '{name}' has {n_rows} samples, "
                    f"expected {n_samples}"
                )
            features[f"{name}_direction"] = pred.direction
            features[f"{name}_magnitude"] = pred.magnitude
            features[f"{name}_volatility"] = pred.volatility
            features[f"{name}_confidence"] = pred.confidence
            # One column per direction-probability class.
            for i in range(pred.direction_proba.shape[1]):
                features[f"{name}_dir_prob_{i}"] = pred.direction_proba[:, i]

        # GNN embeddings, one column per embedding dimension.
        if gnn_embeddings is not None:
            self._require_rows("gnn_embeddings", gnn_embeddings, n_samples)
            for i in range(gnn_embeddings.shape[1]):
                features[f"gnn_emb_{i}"] = gnn_embeddings[:, i]

        # Regime info.
        if regime_info is not None:
            if "regime_labels" in regime_info:
                labels = regime_info["regime_labels"]
                self._require_rows("regime_labels", labels, n_samples)
                features["regime_label"] = labels
            if "regime_probabilities" in regime_info:
                probs = regime_info["regime_probabilities"]
                if probs.ndim == 2:
                    self._require_rows("regime_probabilities", probs, n_samples)
                    for i in range(probs.shape[1]):
                        features[f"regime_prob_{i}"] = probs[:, i]

        # Stock type one-hot.
        if stock_type is not None:
            if stock_type not in STOCK_TYPES:
                logger.warning(
                    "Unknown stock_type %r; all type_* columns will be zero",
                    stock_type,
                )
            for st in STOCK_TYPES:
                features[f"type_{st}"] = np.full(n_samples, float(st == stock_type))

        # Trailing errors, broadcast to all samples as a dynamic weight signal.
        if trailing_errors is not None:
            for name, error in trailing_errors.items():
                features[f"{name}_trailing_error"] = np.full(n_samples, error)

        return pd.DataFrame(features)

    def fit(
        self,
        meta_X: pd.DataFrame,
        y: pd.DataFrame,
    ) -> "EnsembleMetaLearner":
        """Train the three meta-models on the given meta-features.

        Args:
            meta_X: Meta-feature matrix, e.g. from ``build_meta_features``.
            y: Targets containing ``direction_{h}d`` (values in {-1, 0, 1}),
                ``magnitude_{h}d`` and ``volatility_{h}d`` for this horizon.
                Missing target values are filled with 0.

        Returns:
            ``self``, to allow call chaining.

        Raises:
            KeyError: If a required target column is absent from ``y``.
            ValueError: If ``meta_X`` and ``y`` differ in length, or the
                direction column holds a label outside {-1, 0, 1}.
        """
        dir_col = f"direction_{self.horizon}d"
        mag_col = f"magnitude_{self.horizon}d"
        vol_col = f"volatility_{self.horizon}d"

        missing = [col for col in (dir_col, mag_col, vol_col) if col not in y.columns]
        if missing:
            raise KeyError(f"y is missing target columns: {missing}")
        if len(meta_X) != len(y):
            raise ValueError(
                f"meta_X has {len(meta_X)} rows but y has {len(y)} rows"
            )

        # Encode direction labels; unmapped labels come back as NaN and would
        # otherwise be handed to LightGBM silently.
        raw_direction = y[dir_col].fillna(0).astype(int)
        encoded = raw_direction.map(self._dir_map)
        unmapped = encoded.isna()
        if unmapped.any():
            bad_labels = sorted(set(raw_direction[unmapped].tolist()))
            raise ValueError(
                f"Unexpected direction labels {bad_labels}; "
                f"expected values in {sorted(self._dir_map)}"
            )
        y_dir = encoded.astype(int).values
        if encoded.nunique() < len(self._dir_map):
            logger.warning(
                "Only %d of %d direction classes present in training targets",
                encoded.nunique(),
                len(self._dir_map),
            )

        callbacks = [lgb.log_evaluation(0)]

        # Direction meta-classifier.
        self.direction_meta = lgb.LGBMClassifier(
            **self.params, objective="multiclass", num_class=3
        )
        self.direction_meta.fit(meta_X, y_dir, callbacks=callbacks)

        # Magnitude meta-regressor.
        self.magnitude_meta = lgb.LGBMRegressor(
            **self.params, objective="regression"
        )
        self.magnitude_meta.fit(meta_X, y[mag_col].fillna(0), callbacks=callbacks)

        # Volatility meta-regressor.
        self.volatility_meta = lgb.LGBMRegressor(
            **self.params, objective="regression"
        )
        self.volatility_meta.fit(meta_X, y[vol_col].fillna(0), callbacks=callbacks)

        self.is_fitted = True
        return self

    def _align_direction_proba(self, proba) -> np.ndarray:
        """Return *proba* with one column per encoded direction class.

        When the classifier exposes fewer probability columns than there are
        direction classes, the columns are scattered to the positions named
        by its ``classes_`` attribute and the unseen classes get probability
        0. In every other case the input is returned unchanged.
        """
        proba = np.asarray(proba)
        n_classes = len(self._dir_inv)
        classes = getattr(self.direction_meta, "classes_", None)
        if proba.ndim != 2 or proba.shape[1] == n_classes or classes is None:
            return proba
        class_idx = np.asarray(classes)
        if len(class_idx) != proba.shape[1]:
            return proba
        if not all(int(c) in self._dir_inv for c in class_idx):
            return proba
        aligned = np.zeros((proba.shape[0], n_classes), dtype=float)
        aligned[:, class_idx.astype(int)] = proba
        return aligned

    def predict(self, meta_X: pd.DataFrame) -> "PredictionResult":
        """Generate ensemble predictions.

        Args:
            meta_X: Meta-feature matrix with the columns used in ``fit``.

        Returns:
            PredictionResult whose ``direction`` is the arg-max class mapped
            back to {-1, 0, 1}, ``direction_proba`` the class probabilities,
            ``confidence`` the highest class probability per row, and
            ``magnitude`` / ``volatility`` the regressor outputs.

        Raises:
            RuntimeError: If the meta-learner has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Meta-learner not fitted")

        dir_proba = self._align_direction_proba(
            self.direction_meta.predict_proba(meta_X)
        )
        best_class = np.argmax(dir_proba, axis=1)
        direction = np.array([self._dir_inv[i] for i in best_class])

        return PredictionResult(
            direction=direction,
            direction_proba=dir_proba,
            magnitude=self.magnitude_meta.predict(meta_X),
            volatility=self.volatility_meta.predict(meta_X),
            confidence=np.max(dir_proba, axis=1),
        )

    def save(self, path: str) -> None:
        """Save the three estimators, the params and the horizon to *path*."""
        payload = {
            "direction_meta": self.direction_meta,
            "magnitude_meta": self.magnitude_meta,
            "volatility_meta": self.volatility_meta,
            "params": self.params,
            "horizon": self.horizon,
        }
        joblib.dump(payload, path)

    @classmethod
    def load(cls, path: str) -> "EnsembleMetaLearner":
        """Load a meta-learner previously written by ``save``.

        The returned instance is marked fitted only when all three stored
        estimators are present.
        """
        # NOTE(security): joblib.load unpickles arbitrary objects; only call
        # this on files from a trusted source.
        data = joblib.load(path)

        model = cls(horizon=data["horizon"])
        model.params = data["params"]
        model.direction_meta = data["direction_meta"]
        model.magnitude_meta = data["magnitude_meta"]
        model.volatility_meta = data["volatility_meta"]
        model.is_fitted = all(
            estimator is not None
            for estimator in (
                model.direction_meta,
                model.magnitude_meta,
                model.volatility_meta,
            )
        )
        return model