"""Regime-conditional stacking meta-learner using LightGBM. Trained ONLY on out-of-fold predictions from base models. Combines base model predictions, GNN embeddings, regime info, stock type, and trailing errors into a single ensemble prediction. """ import logging from typing import Optional import joblib import lightgbm as lgb import numpy as np import pandas as pd from src.models.base import PredictionResult logger = logging.getLogger(__name__) STOCK_TYPES = ["large_cap", "mid_cap", "small_cap", "penny", "etf", "reit"] class EnsembleMetaLearner: """Regime-conditional stacking meta-learner.""" def __init__(self, horizon: int = 5, **kwargs): self.horizon = horizon self.params = { "num_leaves": kwargs.get("num_leaves", 31), "learning_rate": kwargs.get("learning_rate", 0.05), "n_estimators": kwargs.get("n_estimators", 300), "feature_fraction": kwargs.get("feature_fraction", 0.8), "bagging_fraction": kwargs.get("bagging_fraction", 0.8), "bagging_freq": kwargs.get("bagging_freq", 5), "min_child_samples": kwargs.get("min_child_samples", 20), "random_state": 42, "n_jobs": -1, "verbose": -1, } self.direction_meta = None self.magnitude_meta = None self.volatility_meta = None self._dir_map = {-1: 0, 0: 1, 1: 2} self._dir_inv = {0: -1, 1: 0, 2: 1} self.is_fitted = False def build_meta_features( self, base_predictions: dict[str, PredictionResult], gnn_embeddings: Optional[np.ndarray] = None, regime_info: Optional[dict] = None, stock_type: Optional[str] = None, trailing_errors: Optional[dict[str, float]] = None, ) -> pd.DataFrame: """Construct meta-feature matrix from base model outputs. Args: base_predictions: {model_name: PredictionResult} gnn_embeddings: (n_samples, embed_dim) array or None regime_info: {regime_labels: array, regime_probabilities: array} or None stock_type: one of 6 types, one-hot encoded trailing_errors: {model_name: recent_error} for dynamic weighting """ if not base_predictions: raise ValueError("base_predictions must not be empty") n_samples = None features = {} # Base model predictions as features for name, pred in base_predictions.items(): n_samples = len(pred.direction) features[f"{name}_direction"] = pred.direction features[f"{name}_magnitude"] = pred.magnitude features[f"{name}_volatility"] = pred.volatility features[f"{name}_confidence"] = pred.confidence # Direction probabilities (3 columns per model) for i in range(pred.direction_proba.shape[1]): features[f"{name}_dir_prob_{i}"] = pred.direction_proba[:, i] # GNN embeddings if gnn_embeddings is not None: for i in range(gnn_embeddings.shape[1]): features[f"gnn_emb_{i}"] = gnn_embeddings[:, i] # Regime info if regime_info is not None: if "regime_labels" in regime_info: features["regime_label"] = regime_info["regime_labels"] if "regime_probabilities" in regime_info: probs = regime_info["regime_probabilities"] if probs.ndim == 2: for i in range(probs.shape[1]): features[f"regime_prob_{i}"] = probs[:, i] # Stock type one-hot if stock_type is not None and n_samples is not None: for st in STOCK_TYPES: features[f"type_{st}"] = np.ones(n_samples) if st == stock_type else np.zeros(n_samples) # Trailing errors (broadcast to all samples as dynamic weight signal) if trailing_errors is not None and n_samples is not None: for name, error in trailing_errors.items(): features[f"{name}_trailing_error"] = np.full(n_samples, error) return pd.DataFrame(features) def fit( self, meta_X: pd.DataFrame, y: pd.DataFrame, ) -> "EnsembleMetaLearner": """Train meta-learner on OOF predictions only.""" dir_col = f"direction_{self.horizon}d" mag_col = f"magnitude_{self.horizon}d" vol_col = f"volatility_{self.horizon}d" callbacks = [lgb.log_evaluation(0)] # Direction meta-classifier y_dir = y[dir_col].fillna(0).astype(int).map(self._dir_map).values self.direction_meta = lgb.LGBMClassifier( **self.params, objective="multiclass", num_class=3 ) self.direction_meta.fit(meta_X, y_dir, callbacks=callbacks) # Magnitude meta-regressor self.magnitude_meta = lgb.LGBMRegressor( **self.params, objective="regression" ) self.magnitude_meta.fit(meta_X, y[mag_col].fillna(0), callbacks=callbacks) # Volatility meta-regressor self.volatility_meta = lgb.LGBMRegressor( **self.params, objective="regression" ) self.volatility_meta.fit(meta_X, y[vol_col].fillna(0), callbacks=callbacks) self.is_fitted = True return self def predict(self, meta_X: pd.DataFrame) -> PredictionResult: """Generate ensemble predictions.""" if not self.is_fitted: raise RuntimeError("Meta-learner not fitted") dir_proba = self.direction_meta.predict_proba(meta_X) direction = np.array([self._dir_inv[i] for i in np.argmax(dir_proba, axis=1)]) magnitude = self.magnitude_meta.predict(meta_X) volatility = self.volatility_meta.predict(meta_X) confidence = np.max(dir_proba, axis=1) return PredictionResult( direction=direction, direction_proba=dir_proba, magnitude=magnitude, volatility=volatility, confidence=confidence, ) def save(self, path: str) -> None: """Save meta-learner to disk.""" joblib.dump( { "direction_meta": self.direction_meta, "magnitude_meta": self.magnitude_meta, "volatility_meta": self.volatility_meta, "params": self.params, "horizon": self.horizon, }, path, ) @classmethod def load(cls, path: str) -> "EnsembleMetaLearner": """Load meta-learner from disk.""" data = joblib.load(path) model = cls(horizon=data["horizon"]) model.direction_meta = data["direction_meta"] model.magnitude_meta = data["magnitude_meta"] model.volatility_meta = data["volatility_meta"] model.params = data["params"] model.is_fitted = True return model