"""Multi-Asset Alpha Model - Predicts expected returns using LSTM, Transformer, and XGBoost ensemble."""

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.ensemble import GradientBoostingRegressor
from typing import Dict, Tuple, Optional
import warnings

# NOTE: this silences every warning process-wide as soon as the module is imported.
warnings.filterwarnings('ignore')


class AlphaDataset(Dataset):
    """PyTorch dataset pairing feature samples with scalar targets.

    Args:
        X: Feature array, indexed along its first axis (one entry per sample).
        y: One target per sample; shape ``(n,)`` or ``(n, 1)``.

    Raises:
        ValueError: If the number of targets differs from the number of samples.
    """

    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X = torch.FloatTensor(X)
        # Store targets as a single column so they line up with the (batch, 1)
        # model outputs. reshape (rather than unsqueeze) keeps (n, 1) input from
        # becoming (n, 1, 1), which would broadcast inside the loss.
        self.y = torch.FloatTensor(y).reshape(-1, 1)
        if len(self.y) != len(self.X):
            raise ValueError(
                f"X has {len(self.X)} samples but y has {len(self.y)} targets"
            )

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


class LSTMAlpha(nn.Module):
    """LSTM-based alpha model.

    Args:
        input_size: Number of features per time step.
        hidden_size: LSTM hidden state width.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability for the head and, when stacked, between
            LSTM layers.
    """

    def __init__(self, input_size: int, hidden_size: int = 128,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            # Inter-layer dropout is only enabled when there is more than one layer.
            dropout=dropout if num_layers > 1 else 0,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, seq, input_size)`` tensor to ``(batch, 1)`` predictions."""
        out, _ = self.lstm(x)
        # Only the output at the last time step feeds the regression head.
        out = self.dropout(out[:, -1, :])
        out = self.relu(self.fc1(out))
        return self.fc2(out)


class TransformerAlpha(nn.Module):
    """Transformer-encoder alpha model.

    Args:
        input_size: Number of features per time step.
        d_model: Encoder embedding width.
        nhead: Number of attention heads.
        num_layers: Number of encoder layers.
        dropout: Dropout probability for the encoder and the head.
    """

    def __init__(self, input_size: int, d_model: int = 128, nhead: int = 4,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        self.input_proj = nn.Linear(input_size, d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(d_model, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, seq, input_size)`` tensor to ``(batch, 1)`` predictions."""
        x = self.input_proj(x)
        out = self.transformer(x)
        # NOTE(review): no positional encoding is added and the sequence is
        # mean-pooled, so the output does not depend on time-step order.
        out = self.dropout(out.mean(dim=1))
        out = self.relu(self.fc1(out))
        return self.fc2(out)


class XGBoostAlpha:
    """XGBoost-based alpha model (using sklearn GradientBoosting).

    Args:
        max_depth: Maximum depth of each tree.
        learning_rate: Shrinkage applied to each tree's contribution.
        n_estimators: Number of boosting stages.
    """

    def __init__(self, max_depth: int = 6, learning_rate: float = 0.05,
                 n_estimators: int = 200):
        self.model = GradientBoostingRegressor(
            max_depth=max_depth,
            learning_rate=learning_rate,
            n_estimators=n_estimators,
            subsample=0.8,
            random_state=42,
        )

    @staticmethod
    def _flatten(X: np.ndarray) -> np.ndarray:
        """Collapse every axis after the first into one feature axis."""
        return X.reshape(X.shape[0], -1)

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the regressor and return ``self``.

        ``X`` is flattened here to ``(n_samples, -1)``, so sequence-shaped input
        can be passed directly. ``y`` may be ``(n,)`` or ``(n, 1)``.
        """
        self.model.fit(self._flatten(X), np.asarray(y).reshape(-1))
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return one prediction per sample; ``X`` is flattened as in ``fit``."""
        return self.model.predict(self._flatten(X))

    def feature_importances(self) -> np.ndarray:
        """Return the fitted model's importance for each flattened feature."""
        return self.model.feature_importances_


class AlphaEnsemble:
    """Ensemble of LSTM, Transformer, and XGBoost alpha models.

    Args:
        input_size: Number of features per time step.
        seq_len: Sequence length; stored on the instance.
        lstm_hidden: Hidden size of the LSTM model.
        lstm_layers: Number of LSTM layers.
        trans_d_model: Embedding width of the Transformer model.
        trans_nhead: Number of Transformer attention heads.
        trans_layers: Number of Transformer encoder layers.
        xgb_depth: Tree depth of the gradient-boosting model.
        xgb_lr: Learning rate of the gradient-boosting model.
        xgb_estimators: Number of boosting stages.
        weights: Blend weights keyed by ``'lstm'``, ``'transformer'`` and
            ``'xgboost'``; defaults to 0.3 / 0.3 / 0.4.
        device: Torch device string for the neural models.
    """

    def __init__(self, input_size: int, seq_len: int,
                 lstm_hidden: int = 128, lstm_layers: int = 2,
                 trans_d_model: int = 128, trans_nhead: int = 4,
                 trans_layers: int = 2,
                 xgb_depth: int = 6, xgb_lr: float = 0.05,
                 xgb_estimators: int = 200,
                 weights: Optional[Dict[str, float]] = None,
                 device: str = 'cpu'):
        self.device = torch.device(device)
        self.seq_len = seq_len
        self.input_size = input_size

        # Models
        self.lstm = LSTMAlpha(input_size, lstm_hidden, lstm_layers).to(self.device)
        self.transformer = TransformerAlpha(
            input_size, trans_d_model, trans_nhead, trans_layers
        ).to(self.device)
        self.xgboost = XGBoostAlpha(xgb_depth, xgb_lr, xgb_estimators)

        # Weights
        self.weights = weights or {'lstm': 0.3, 'transformer': 0.3, 'xgboost': 0.4}

        self.is_fitted = False
        self.ic_history = []
        self.feature_drift_history = []

    def fit(self, X_train: np.ndarray, y_train: np.ndarray,
            X_val: Optional[np.ndarray] = None,
            y_val: Optional[np.ndarray] = None,
            epochs: int = 50, batch_size: int = 64, lr: float = 1e-4) -> Dict:
        """Train all three models and return their metrics.

        Validation metrics are produced only when both ``X_val`` and ``y_val``
        are given.

        Returns:
            ``{'lstm': ..., 'transformer': ..., 'xgboost': {'ic': ...}}`` where
            the neural entries are the dicts returned by ``_train_nn`` and the
            XGBoost IC is ``None`` without validation data.
        """
        print("Training LSTM alpha model...")
        lstm_metrics = self._train_nn(self.lstm, X_train, y_train,
                                      X_val, y_val, epochs, batch_size, lr)

        print("Training Transformer alpha model...")
        trans_metrics = self._train_nn(self.transformer, X_train, y_train,
                                       X_val, y_val, epochs, batch_size, lr)

        print("Training XGBoost alpha model...")
        self.xgboost.fit(X_train, y_train)

        # An IC needs targets as well as features; X_val alone is not enough.
        xgb_ic = None
        if X_val is not None and y_val is not None:
            xgb_ic = self._compute_ic(self.xgboost.predict(X_val), y_val)

        self.is_fitted = True

        return {
            'lstm': lstm_metrics,
            'transformer': trans_metrics,
            'xgboost': {'ic': xgb_ic},
        }

    def _train_nn(self, model: nn.Module,
                  X_train: np.ndarray, y_train: np.ndarray,
                  X_val: Optional[np.ndarray], y_val: Optional[np.ndarray],
                  epochs: int, batch_size: int, lr: float) -> Dict:
        """Train one neural model with Adam on MSE loss.

        Returns:
            Dict with per-epoch lists ``'train_loss'``, ``'val_loss'`` and
            ``'val_ic'``; the validation lists stay empty when no validation
            set is supplied.
        """
        train_loader = DataLoader(AlphaDataset(X_train, y_train),
                                  batch_size=batch_size, shuffle=True)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        metrics = {'train_loss': [], 'val_loss': [], 'val_ic': []}

        has_val = X_val is not None and y_val is not None
        if has_val:
            # Built once: the validation data does not change between epochs.
            X_val_t = torch.FloatTensor(X_val).to(self.device)
            # Flattened so (n, 1) targets cannot broadcast against the (n,)
            # predictions into an (n, n) error matrix.
            y_val_flat = np.asarray(y_val, dtype=float).reshape(-1)

        for epoch in range(epochs):
            model.train()
            epoch_loss = 0.0

            for X_batch, y_batch in train_loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device)

                optimizer.zero_grad()
                loss = criterion(model(X_batch), y_batch)
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

            metrics['train_loss'].append(epoch_loss / len(train_loader))

            if not has_val:
                if epoch % 10 == 0:
                    print(f" Epoch {epoch}: train_loss={metrics['train_loss'][-1]:.6f}")
                continue

            # Validation
            model.eval()
            with torch.no_grad():
                val_pred = model(X_val_t).cpu().numpy().flatten()

            val_loss = float(np.mean((val_pred - y_val_flat) ** 2))
            val_ic = self._compute_ic(val_pred, y_val_flat)

            metrics['val_loss'].append(val_loss)
            metrics['val_ic'].append(val_ic)

            if epoch % 10 == 0:
                print(f" Epoch {epoch}: train_loss={metrics['train_loss'][-1]:.6f}, "
                      f"val_loss={val_loss:.6f}, val_ic={val_ic:.4f}")

        return metrics

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the weighted blend of the three models' predictions.

        Raises:
            ValueError: If ``fit`` has not completed.
        """
        if not self.is_fitted:
            raise ValueError("Models must be fitted before prediction")

        self.lstm.eval()
        self.transformer.eval()
        with torch.no_grad():
            X_t = torch.FloatTensor(X).to(self.device)
            lstm_pred = self.lstm(X_t).cpu().numpy().flatten()
            trans_pred = self.transformer(X_t).cpu().numpy().flatten()

        xgb_pred = self.xgboost.predict(X)

        # Weighted ensemble
        return (self.weights['lstm'] * lstm_pred +
                self.weights['transformer'] * trans_pred +
                self.weights['xgboost'] * xgb_pred)

    def _compute_ic(self, pred: np.ndarray, actual: np.ndarray) -> float:
        """Compute the Information Coefficient (Spearman rank correlation).

        Returns 0.0 when either input is missing, when fewer than 10 pairs
        remain after dropping NaNs, or when the correlation is undefined.
        """
        if pred is None or actual is None:
            return 0.0

        pred = np.asarray(pred, dtype=float).reshape(-1)
        actual = np.asarray(actual, dtype=float).reshape(-1)
        if len(pred) < 10:
            return 0.0

        mask = ~(np.isnan(pred) | np.isnan(actual))
        if mask.sum() < 10:
            return 0.0

        from scipy.stats import spearmanr
        ic, _ = spearmanr(pred[mask], actual[mask])
        return 0.0 if np.isnan(ic) else float(ic)

    def track_ic(self, pred: np.ndarray, actual: np.ndarray):
        """Compute the IC for one batch, append it to ``ic_history``, return it."""
        ic = self._compute_ic(pred, actual)
        self.ic_history.append(ic)
        return ic

    def _window_importances(self, X: np.ndarray,
                            y: Optional[np.ndarray]) -> np.ndarray:
        """Fit a fresh surrogate on one data window and return its importances."""
        # Without realized targets, label the window with the fitted model's own
        # predictions. A constant target would give split-free trees and
        # all-zero importances, so the comparison would carry no information.
        targets = self.xgboost.predict(X) if y is None else y
        return XGBoostAlpha().fit(X, targets).feature_importances()

    def track_feature_drift(self, X_current: np.ndarray, X_reference: np.ndarray,
                            y_current: Optional[np.ndarray] = None,
                            y_reference: Optional[np.ndarray] = None) -> float:
        """Track feature-importance drift between two data windows.

        A default-configured ``XGBoostAlpha`` surrogate is fitted on each window
        and the L1 distance between the two importance vectors is appended to
        ``feature_drift_history``. Both windows must flatten to the same number
        of features.

        Args:
            X_current: Features of the current window.
            X_reference: Features of the reference window.
            y_current: Optional targets for the current window; when omitted the
                fitted gradient-boosting model's predictions are used instead.
            y_reference: Optional targets for the reference window; same
                fallback as ``y_current``.

        Returns:
            The L1 distance between the two importance vectors. Each vector
            sums to 1 (or is all zeros for a degenerate fit), so the value lies
            in ``[0, 2]``.
        """
        current_imp = self._window_importances(X_current, y_current)
        ref_imp = self._window_importances(X_reference, y_reference)

        # L1 distance between the two importance distributions.
        drift = float(np.sum(np.abs(current_imp - ref_imp)))
        self.feature_drift_history.append(drift)
        return drift