| """Multi-Asset Alpha Model - Predicts expected returns using LSTM, Transformer, and XGBoost ensemble.""" |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| from torch.utils.data import Dataset, DataLoader |
| from sklearn.ensemble import GradientBoostingRegressor |
| from typing import Dict, Tuple, Optional |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class AlphaDataset(Dataset):
    """PyTorch dataset wrapping feature sequences and targets for alpha model training."""

    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y).unsqueeze(1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

class LSTMAlpha(nn.Module):
    """LSTM-based alpha model."""

    def __init__(self, input_size: int, hidden_size: int = 128,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size, hidden_size, num_layers,
            batch_first=True, dropout=dropout if num_layers > 1 else 0
        )
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        out, _ = self.lstm(x)
        # Regress on the hidden state of the last time step
        out = self.dropout(out[:, -1, :])
        out = self.relu(self.fc1(out))
        return self.fc2(out)

class TransformerAlpha(nn.Module):
    """Transformer-encoder-based alpha model."""

    def __init__(self, input_size: int, d_model: int = 128,
                 nhead: int = 4, num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        self.input_proj = nn.Linear(input_size, d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead,
            dim_feedforward=d_model * 4, dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(d_model, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Project features to d_model, encode, then mean-pool over time
        x = self.input_proj(x)
        out = self.transformer(x)
        out = self.dropout(out.mean(dim=1))
        out = self.relu(self.fc1(out))
        return self.fc2(out)

class XGBoostAlpha:
    """Gradient-boosted tree alpha model (sklearn's GradientBoostingRegressor
    standing in for XGBoost)."""

    def __init__(self, max_depth: int = 6, learning_rate: float = 0.05,
                 n_estimators: int = 200):
        self.model = GradientBoostingRegressor(
            max_depth=max_depth,
            learning_rate=learning_rate,
            n_estimators=n_estimators,
            subsample=0.8,
            random_state=42
        )

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Flattens X from (n_samples, lookback, features) to 2-D before fitting."""
        n_samples = X.shape[0]
        X_flat = X.reshape(n_samples, -1)
        self.model.fit(X_flat, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        n_samples = X.shape[0]
        X_flat = X.reshape(n_samples, -1)
        return self.model.predict(X_flat)

    def feature_importances(self) -> np.ndarray:
        return self.model.feature_importances_

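# XGBoostAlpha above is backed by sklearn rather than the real XGBoost library.
# A drop-in variant using the actual `xgboost` package might look like the
# sketch below (the name TrueXGBoostAlpha is illustrative; only standard
# XGBRegressor parameters are used):
try:
    import xgboost as xgb

    class TrueXGBoostAlpha(XGBoostAlpha):
        """XGBoostAlpha variant backed by the actual xgboost library."""

        def __init__(self, max_depth: int = 6, learning_rate: float = 0.05,
                     n_estimators: int = 200):
            self.model = xgb.XGBRegressor(
                max_depth=max_depth,
                learning_rate=learning_rate,
                n_estimators=n_estimators,
                subsample=0.8,
                random_state=42,
            )
except ImportError:
    # xgboost not installed; the sklearn-backed XGBoostAlpha above still works
    pass
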
class AlphaEnsemble:
    """Ensemble of LSTM, Transformer, and XGBoost alpha models."""

    def __init__(self, input_size: int, seq_len: int,
                 lstm_hidden: int = 128, lstm_layers: int = 2,
                 trans_d_model: int = 128, trans_nhead: int = 4, trans_layers: int = 2,
                 xgb_depth: int = 6, xgb_lr: float = 0.05, xgb_estimators: int = 200,
                 weights: Optional[Dict[str, float]] = None,
                 device: str = 'cpu'):
        self.device = torch.device(device)
        self.seq_len = seq_len
        self.input_size = input_size

        self.lstm = LSTMAlpha(input_size, lstm_hidden, lstm_layers).to(self.device)
        self.transformer = TransformerAlpha(input_size, trans_d_model,
                                            trans_nhead, trans_layers).to(self.device)
        self.xgboost = XGBoostAlpha(xgb_depth, xgb_lr, xgb_estimators)

        self.weights = weights or {'lstm': 0.3, 'transformer': 0.3, 'xgboost': 0.4}

        self.is_fitted = False
        self.ic_history = []
        self.feature_drift_history = []

    def fit(self, X_train: np.ndarray, y_train: np.ndarray,
            X_val: Optional[np.ndarray] = None, y_val: Optional[np.ndarray] = None,
            epochs: int = 50, batch_size: int = 64, lr: float = 1e-4) -> Dict:
        """Train all three models and return their training metrics."""
        print("Training LSTM alpha model...")
        lstm_metrics = self._train_nn(self.lstm, X_train, y_train,
                                      X_val, y_val, epochs, batch_size, lr)

        print("Training Transformer alpha model...")
        trans_metrics = self._train_nn(self.transformer, X_train, y_train,
                                       X_val, y_val, epochs, batch_size, lr)

        print("Training XGBoost alpha model...")
        self.xgboost.fit(X_train, y_train)
        xgb_pred = self.xgboost.predict(X_val) if X_val is not None else None
        xgb_ic = self._compute_ic(xgb_pred, y_val) if xgb_pred is not None else None

        self.is_fitted = True

        return {
            'lstm': lstm_metrics,
            'transformer': trans_metrics,
            'xgboost': {'ic': xgb_ic}
        }

    def _train_nn(self, model: nn.Module, X_train: np.ndarray, y_train: np.ndarray,
                  X_val: Optional[np.ndarray], y_val: Optional[np.ndarray],
                  epochs: int, batch_size: int, lr: float) -> Dict:
        """Train a single neural network model, tracking loss and validation IC."""
        train_dataset = AlphaDataset(X_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        metrics = {'train_loss': [], 'val_loss': [], 'val_ic': []}

        for epoch in range(epochs):
            model.train()
            epoch_loss = 0.0
            for X_batch, y_batch in train_loader:
                X_batch, y_batch = X_batch.to(self.device), y_batch.to(self.device)
                optimizer.zero_grad()
                pred = model(X_batch)
                loss = criterion(pred, y_batch)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            metrics['train_loss'].append(epoch_loss / len(train_loader))

            # Validation pass: MSE and rank IC against realized returns
            if X_val is not None and y_val is not None:
                model.eval()
                with torch.no_grad():
                    X_val_t = torch.FloatTensor(X_val).to(self.device)
                    val_pred = model(X_val_t).cpu().numpy().flatten()
                val_loss = np.mean((val_pred - y_val) ** 2)
                val_ic = self._compute_ic(val_pred, y_val)
                metrics['val_loss'].append(val_loss)
                metrics['val_ic'].append(val_ic)

                if epoch % 10 == 0:
                    print(f"  Epoch {epoch}: train_loss={metrics['train_loss'][-1]:.6f}, "
                          f"val_loss={val_loss:.6f}, val_ic={val_ic:.4f}")

        return metrics

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Generate weighted ensemble predictions."""
        if not self.is_fitted:
            raise ValueError("Models must be fitted before prediction")

        X_t = torch.FloatTensor(X).to(self.device)

        self.lstm.eval()
        self.transformer.eval()
        with torch.no_grad():
            lstm_pred = self.lstm(X_t).cpu().numpy().flatten()
            trans_pred = self.transformer(X_t).cpu().numpy().flatten()

        xgb_pred = self.xgboost.predict(X)

        # Weighted average of the three model outputs
        return (self.weights['lstm'] * lstm_pred +
                self.weights['transformer'] * trans_pred +
                self.weights['xgboost'] * xgb_pred)

    def _compute_ic(self, pred: np.ndarray, actual: np.ndarray) -> float:
        """Compute the Information Coefficient (Spearman rank correlation)."""
        if pred is None or actual is None or len(pred) < 10:
            return 0.0
        mask = ~(np.isnan(pred) | np.isnan(actual))
        if mask.sum() < 10:
            return 0.0
        ic, _ = spearmanr(pred[mask], actual[mask])
        return ic if not np.isnan(ic) else 0.0

    def track_ic(self, pred: np.ndarray, actual: np.ndarray) -> float:
        """Compute the IC for one period and append it to the running history."""
        ic = self._compute_ic(pred, actual)
        self.ic_history.append(ic)
        return ic

    def track_feature_drift(self, X_recent: np.ndarray, y_recent: np.ndarray) -> float:
        """Track feature-importance drift: refit a fresh tree model on the most
        recent window and compare its importances against the fitted model's."""
        base_imp = self.xgboost.feature_importances()

        recent_model = XGBoostAlpha()
        recent_model.fit(X_recent, y_recent)
        recent_imp = recent_model.feature_importances()

        # L1 distance between the two importance vectors as a drift score
        drift = np.sum(np.abs(recent_imp - base_imp))
        self.feature_drift_history.append(drift)
        return drift

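
# Minimal smoke-test sketch on synthetic data. The shapes, the synthetic target
# construction, and the epoch count are illustrative assumptions, not tuned
# settings.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n_samples, seq_len, n_features = 500, 20, 8
    X = rng.standard_normal((n_samples, seq_len, n_features)).astype(np.float32)
    # Noisy linear function of the last time step as a stand-in return target
    beta = rng.standard_normal(n_features)
    y = 0.01 * (X[:, -1, :] @ beta) + 0.01 * rng.standard_normal(n_samples)

    split = int(n_samples * 0.8)
    ensemble = AlphaEnsemble(input_size=n_features, seq_len=seq_len)
    ensemble.fit(X[:split], y[:split], X[split:], y[split:], epochs=5)

    preds = ensemble.predict(X[split:])
    print(f"Out-of-sample IC: {ensemble.track_ic(preds, y[split:]):.4f}")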