alphaforge-quant-system / alpha_model.py
Premchan369's picture
Upload alpha_model.py
958d6b7 verified
"""Multi-Asset Alpha Model - Predicts expected returns using LSTM, Transformer, and XGBoost ensemble."""
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.ensemble import GradientBoostingRegressor
from typing import Dict, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
class AlphaDataset(Dataset):
    """PyTorch dataset pairing feature samples with regression targets.

    Args:
        X: Feature array; the first axis indexes samples.
        y: Target array with one entry per sample. A 1-D array is stored as a
            column of shape ``(n, 1)`` so it lines up with the ``(batch, 1)``
            output of the alpha networks. An array that already has two or
            more dimensions (e.g. ``(n, 1)``) is stored unchanged.

    Raises:
        ValueError: If ``X`` and ``y`` do not contain the same number of
            samples.
    """

    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X = torch.FloatTensor(X)
        targets = torch.FloatTensor(y)
        # Only add the trailing axis when it is missing. Unsqueezing a column
        # vector would give (n, 1, 1), which broadcasts against (batch, 1)
        # predictions inside MSELoss instead of failing.
        if targets.dim() == 1:
            targets = targets.unsqueeze(1)
        if len(self.X) != len(targets):
            raise ValueError(
                f"X and y must have the same number of samples, "
                f"got {len(self.X)} and {len(targets)}"
            )
        self.y = targets

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, target)`` pair stored at ``idx``."""
        return self.X[idx], self.y[idx]
class LSTMAlpha(nn.Module):
    """Recurrent alpha model: stacked LSTM followed by a two-layer head.

    The network consumes batch-first sequences ``(batch, seq_len, input_size)``
    and emits one value per sample, shape ``(batch, 1)``.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used between LSTM layers (only when
            more than one layer is stacked) and on the last hidden state.
    """

    def __init__(self, input_size: int, hidden_size: int = 128,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        # Inter-layer dropout is switched off for a single-layer LSTM.
        recurrent_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=recurrent_dropout,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map ``x`` to one prediction per sample using the last time step."""
        sequence_out, _ = self.lstm(x)
        # Only the final time step of the sequence feeds the head.
        last_step = sequence_out[:, -1, :]
        hidden = self.fc1(self.dropout(last_step))
        return self.fc2(self.relu(hidden))
class TransformerAlpha(nn.Module):
    """Transformer-encoder alpha model with mean pooling over time.

    Each time step of a batch-first input ``(batch, seq_len, input_size)`` is
    projected to ``d_model`` dimensions and run through a stack of encoder
    layers. The encoder output is averaged over the sequence axis and mapped
    by a two-layer head to shape ``(batch, 1)``. No positional encoding is
    added in this module.

    Args:
        input_size: Number of features per time step.
        d_model: Embedding width used inside the encoder.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability inside the encoder layers and on the
            pooled representation.
    """

    def __init__(self, input_size: int, d_model: int = 128,
                 nhead: int = 4, num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        self.input_proj = nn.Linear(input_size, d_model)
        # Feed-forward block is four times the model width.
        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=4 * d_model,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers)
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(d_model, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map ``x`` to one prediction per sample."""
        embedded = self.input_proj(x)
        encoded = self.transformer(embedded)
        # Collapse the sequence axis by averaging over time steps.
        pooled = self.dropout(encoded.mean(dim=1))
        hidden = self.relu(self.fc1(pooled))
        return self.fc2(hidden)
class XGBoostAlpha:
    """Gradient-boosted tree alpha model.

    Despite the name, the estimator is scikit-learn's
    ``GradientBoostingRegressor`` built with ``subsample=0.8`` and
    ``random_state=42``.

    Args:
        max_depth: Maximum depth of each tree.
        learning_rate: Shrinkage applied to each boosting stage.
        n_estimators: Number of boosting stages.
    """

    def __init__(self, max_depth: int = 6, learning_rate: float = 0.05,
                 n_estimators: int = 200):
        self.model = GradientBoostingRegressor(
            max_depth=max_depth,
            learning_rate=learning_rate,
            n_estimators=n_estimators,
            subsample=0.8,
            random_state=42,
        )

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the regressor and return ``self``.

        ``X`` is reshaped here to ``(n_samples, -1)``, so a windowed array
        such as ``(n_samples, lookback, features)`` can be passed directly.
        """
        flat_features = X.reshape(X.shape[0], -1)
        self.model.fit(flat_features, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return predictions for ``X`` after flattening each sample to a row."""
        flat_features = X.reshape(X.shape[0], -1)
        return self.model.predict(flat_features)

    def feature_importances(self) -> np.ndarray:
        """Return the fitted estimator's ``feature_importances_`` array."""
        return self.model.feature_importances_
class AlphaEnsemble:
    """Weighted ensemble of the LSTM, Transformer, and boosted-tree alpha models.

    The two neural networks are trained with Adam on an MSE objective; the
    tree model is fitted on the same samples. Predictions are combined as a
    weighted sum using ``weights``.

    Args:
        input_size: Number of features per time step.
        seq_len: Length of each input sequence (stored, not otherwise used
            by this class).
        lstm_hidden: Hidden size of the LSTM model.
        lstm_layers: Number of LSTM layers.
        trans_d_model: Embedding width of the Transformer model.
        trans_nhead: Number of attention heads of the Transformer model.
        trans_layers: Number of Transformer encoder layers.
        xgb_depth: Tree depth of the boosted-tree model.
        xgb_lr: Learning rate of the boosted-tree model.
        xgb_estimators: Number of boosting stages of the boosted-tree model.
        weights: Mapping with the keys ``'lstm'``, ``'transformer'`` and
            ``'xgboost'``. Defaults to 0.3 / 0.3 / 0.4.
        device: Torch device string for the neural networks.
    """

    def __init__(self, input_size: int, seq_len: int,
                 lstm_hidden: int = 128, lstm_layers: int = 2,
                 trans_d_model: int = 128, trans_nhead: int = 4, trans_layers: int = 2,
                 xgb_depth: int = 6, xgb_lr: float = 0.05, xgb_estimators: int = 200,
                 weights: Optional[Dict[str, float]] = None,
                 device: str = 'cpu'):
        self.device = torch.device(device)
        self.seq_len = seq_len
        self.input_size = input_size
        # Models
        self.lstm = LSTMAlpha(input_size, lstm_hidden, lstm_layers).to(self.device)
        self.transformer = TransformerAlpha(input_size, trans_d_model,
                                            trans_nhead, trans_layers).to(self.device)
        self.xgboost = XGBoostAlpha(xgb_depth, xgb_lr, xgb_estimators)
        # Weights
        self.weights = weights or {'lstm': 0.3, 'transformer': 0.3, 'xgboost': 0.4}
        self.is_fitted = False
        self.ic_history = []
        self.feature_drift_history = []

    def fit(self, X_train: np.ndarray, y_train: np.ndarray,
            X_val: Optional[np.ndarray] = None, y_val: Optional[np.ndarray] = None,
            epochs: int = 50, batch_size: int = 64, lr: float = 1e-4) -> Dict:
        """Train all three models.

        Args:
            X_train: Training features; the first axis indexes samples.
            y_train: Training targets, one per sample (1-D or column vector).
            X_val: Optional validation features.
            y_val: Optional validation targets. Validation metrics are only
                computed when both ``X_val`` and ``y_val`` are given.
            epochs: Number of epochs for each neural network.
            batch_size: Mini-batch size for the neural networks.
            lr: Adam learning rate for the neural networks.

        Returns:
            Dict with per-model metrics: the ``_train_nn`` metric dicts under
            ``'lstm'`` and ``'transformer'``, and ``{'ic': ...}`` under
            ``'xgboost'`` (``None`` when no validation data was supplied).
        """
        # Train LSTM
        print("Training LSTM alpha model...")
        lstm_metrics = self._train_nn(self.lstm, X_train, y_train,
                                      X_val, y_val, epochs, batch_size, lr)
        # Train Transformer
        print("Training Transformer alpha model...")
        trans_metrics = self._train_nn(self.transformer, X_train, y_train,
                                       X_val, y_val, epochs, batch_size, lr)
        # Train XGBoost
        print("Training XGBoost alpha model...")
        self.xgboost.fit(X_train, np.asarray(y_train).reshape(-1))
        # An IC needs both validation features and targets; passing X_val
        # alone must not crash.
        xgb_ic = None
        if X_val is not None and y_val is not None:
            xgb_ic = self._compute_ic(self.xgboost.predict(X_val), y_val)
        self.is_fitted = True
        return {
            'lstm': lstm_metrics,
            'transformer': trans_metrics,
            'xgboost': {'ic': xgb_ic}
        }

    def _train_nn(self, model: nn.Module, X_train: np.ndarray, y_train: np.ndarray,
                  X_val: Optional[np.ndarray], y_val: Optional[np.ndarray],
                  epochs: int, batch_size: int, lr: float) -> Dict:
        """Train one neural network with Adam on an MSE loss.

        Returns:
            Dict with the lists ``'train_loss'`` (mean batch loss per epoch),
            ``'val_loss'`` and ``'val_ic'``. The validation lists stay empty
            unless both ``X_val`` and ``y_val`` are provided.
        """
        # Hand the dataset 1-D targets so each batch target is (batch, 1),
        # matching the network output even if the caller passed a column vector.
        train_dataset = AlphaDataset(X_train, np.asarray(y_train).reshape(-1))
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        criterion = nn.MSELoss()
        metrics = {'train_loss': [], 'val_loss': [], 'val_ic': []}

        has_val = X_val is not None and y_val is not None
        if has_val:
            # Validation data does not change between epochs: convert once.
            X_val_t = torch.FloatTensor(X_val).to(self.device)
            # Flatten so the element-wise error below cannot broadcast a
            # (n,) prediction against a (n, 1) target into an (n, n) matrix.
            y_val_flat = np.asarray(y_val, dtype=float).reshape(-1)

        for epoch in range(epochs):
            model.train()
            epoch_loss = 0
            for X_batch, y_batch in train_loader:
                X_batch, y_batch = X_batch.to(self.device), y_batch.to(self.device)
                optimizer.zero_grad()
                pred = model(X_batch)
                loss = criterion(pred, y_batch)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
            metrics['train_loss'].append(epoch_loss / len(train_loader))

            # Validation
            if has_val:
                model.eval()
                with torch.no_grad():
                    val_pred = model(X_val_t).cpu().numpy().flatten()
                val_loss = float(np.mean((val_pred - y_val_flat) ** 2))
                val_ic = self._compute_ic(val_pred, y_val_flat)
                metrics['val_loss'].append(val_loss)
                metrics['val_ic'].append(val_ic)

            if epoch % 10 == 0:
                progress = f" Epoch {epoch}: train_loss={metrics['train_loss'][-1]:.6f}"
                # Validation figures exist only when validation data was given.
                if has_val:
                    progress += f", val_loss={val_loss:.6f}, val_ic={val_ic:.4f}"
                print(progress)
        return metrics

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Generate ensemble predictions.

        Args:
            X: Feature array; the first axis indexes samples.

        Returns:
            1-D array with the weighted sum of the three model predictions.

        Raises:
            ValueError: If ``fit`` has not been called.
        """
        if not self.is_fitted:
            raise ValueError("Models must be fitted before prediction")
        self.lstm.eval()
        self.transformer.eval()
        with torch.no_grad():
            X_t = torch.FloatTensor(X).to(self.device)
            lstm_pred = self.lstm(X_t).cpu().numpy().flatten()
            trans_pred = self.transformer(X_t).cpu().numpy().flatten()
        # XGBoost prediction
        xgb_pred = self.xgboost.predict(X)
        # Weighted ensemble
        ensemble = (self.weights['lstm'] * lstm_pred +
                    self.weights['transformer'] * trans_pred +
                    self.weights['xgboost'] * xgb_pred)
        return ensemble

    def _compute_ic(self, pred: np.ndarray, actual: np.ndarray) -> float:
        """Compute the Information Coefficient (Spearman rank correlation).

        Inputs are flattened, so 1-D arrays and column vectors are treated
        alike. Pairs containing NaN are dropped.

        Returns:
            The rank correlation as a Python float, or ``0.0`` when either
            input is ``None``, fewer than 10 valid pairs remain, or the
            correlation is undefined (NaN).

        Raises:
            ValueError: If the inputs hold a different number of values.
        """
        if pred is None or actual is None:
            return 0.0
        pred_flat = np.asarray(pred, dtype=float).reshape(-1)
        actual_flat = np.asarray(actual, dtype=float).reshape(-1)
        if pred_flat.size != actual_flat.size:
            raise ValueError(
                f"pred and actual must have the same number of values, "
                f"got {pred_flat.size} and {actual_flat.size}"
            )
        if pred_flat.size < 10:
            return 0.0
        mask = ~(np.isnan(pred_flat) | np.isnan(actual_flat))
        if mask.sum() < 10:
            return 0.0
        from scipy.stats import spearmanr
        ic, _ = spearmanr(pred_flat[mask], actual_flat[mask])
        return float(ic) if not np.isnan(ic) else 0.0

    def track_ic(self, pred: np.ndarray, actual: np.ndarray):
        """Compute the IC for ``pred`` vs ``actual``, record it, and return it."""
        ic = self._compute_ic(pred, actual)
        self.ic_history.append(ic)
        return ic

    def track_feature_drift(self, X_current: np.ndarray, X_reference: np.ndarray,
                            y_current: Optional[np.ndarray] = None,
                            y_reference: Optional[np.ndarray] = None):
        """Track feature-importance drift between two data windows.

        A fresh boosted-tree probe is fitted on each window and the L1
        distance between the two importance vectors is recorded. When targets
        for a window are not supplied, the fitted ensemble tree model's own
        predictions on that window are used as pseudo-targets, so the probe
        measures which features drive the model's output on that data.

        Fitting a probe on constant targets (as an all-zero placeholder would
        be) yields trees without splits and an all-zero importance vector,
        which would make the metric independent of the data.

        Args:
            X_current: Features of the current window.
            X_reference: Features of the reference window.
            y_current: Optional targets for the current window.
            y_reference: Optional targets for the reference window.

        Returns:
            The L1 distance between the two importance vectors; it is also
            appended to ``feature_drift_history``.

        Raises:
            ValueError: If the ensemble has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Models must be fitted before tracking feature drift")
        current_imp = self._window_importances(X_current, y_current)
        ref_imp = self._window_importances(X_reference, y_reference)
        # L1 distance between the two importance distributions
        drift = float(np.sum(np.abs(current_imp - ref_imp)))
        self.feature_drift_history.append(drift)
        return drift

    def _window_importances(self, X: np.ndarray,
                            y: Optional[np.ndarray] = None) -> np.ndarray:
        """Fit a default-configured probe on one window and return its importances."""
        if y is None:
            targets = self.xgboost.predict(X)
        else:
            targets = np.asarray(y, dtype=float).reshape(-1)
        probe = XGBoostAlpha()
        probe.fit(X, targets)
        return probe.feature_importances()