math-backend / e2e_forecast_model.py
engineportf's picture
Upload folder using huggingface_hub
558db1e verified
Raw
History Blame Contribute Delete
19.1 kB
"""
End-to-End Portfolio Forecast Model
=====================================
A neural network that learns to predict expected returns optimised for
*portfolio construction quality*, not forecast accuracy.
Architecture
------------
Raw Features → Per-Asset Encoder (shared MLP)
             → Cross-Sectional Multi-Head Attention
             → μ_predicted + vol_scale
             → DifferentiablePortfolioLayer → w*
             → Loss = −Sharpe(w*, realised) | SPO+ regret
The cross-asset attention is the critical structural choice: it lets
AAPL's features affect TLT's predicted return because the downstream
optimizer cares about covariance, not just individual forecasts.
Papers
------
- Amos & Kolter (2017), OptNet.
- Agrawal et al. (2019), cvxpylayers.
- Elmachtoub & Grigas (2022), Smart Predict-then-Optimize (SPO+).
"""
import os
import hashlib
import pickle
import numpy as np
import pandas as pd
try:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
_TORCH_AVAILABLE = True
except ImportError:
_TORCH_AVAILABLE = False
class _MockTorch:
Tensor = object
torch = _MockTorch()
class _MockNN:
Module = object
nn = _MockNN()
DataLoader = object
TensorDataset = object
from typing import Tuple, Dict, Optional
from config import logger, Color
if _TORCH_AVAILABLE:
from differentiable_optimizer import DifferentiablePortfolioLayer
else:
DifferentiablePortfolioLayer = None
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 1. NEURAL ARCHITECTURE
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class PortfolioForecastNetwork(nn.Module):
    """
    Predicts per-asset expected returns optimised for decision quality.

    The network itself only maps features to ``(mu, vol_scale)``; it is
    meant to be trained with a portfolio-level loss (Sharpe ratio or
    decision regret) so that gradients flow back from the downstream
    optimisation layer into these weights, rather than with a plain MSE
    forecast loss.

    Args
    ----
    n_assets   : number of assets in the cross-section (stored for reference;
                 the layers themselves are agnostic to it).
    n_features : number of input features per asset.
    hidden_dim : width of the first encoder layer. The attention embedding
                 width is ``hidden_dim // 2``.
    num_heads  : number of attention heads; must divide ``hidden_dim // 2``.
    dropout    : dropout probability used in the encoder and the attention.

    Raises
    ------
    ValueError : if ``hidden_dim // 2`` is not a positive multiple of
                 ``num_heads``.
    """

    def __init__(self, n_assets: int, n_features: int,
                 hidden_dim: int = 64, num_heads: int = 4,
                 dropout: float = 0.1):
        super().__init__()

        embed_dim = hidden_dim // 2
        # Fail early with an actionable message instead of the bare
        # assertion raised deep inside nn.MultiheadAttention.
        if embed_dim < 1 or num_heads < 1 or embed_dim % num_heads != 0:
            raise ValueError(
                f"hidden_dim // 2 (= {embed_dim}) must be a positive multiple "
                f"of num_heads (= {num_heads})"
            )

        self.n_assets = n_assets
        self.embed_dim = embed_dim

        # ── Per-asset feature encoder (weights shared across assets) ──
        # nn.Linear acts on the last dimension, so the same MLP is applied
        # to every asset of every sample.
        self.asset_encoder = nn.Sequential(
            nn.Linear(n_features, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, embed_dim),
            nn.GELU(),
        )

        # ── Cross-sectional attention ──
        # Assets attend to each other, so one asset's features can influence
        # another asset's predicted return.
        self.cross_asset_attention = nn.MultiheadAttention(
            embed_dim=embed_dim,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True,
        )

        # ── Output heads ──
        self.return_head = nn.Linear(embed_dim, 1)
        # Diagonal volatility scaling (Softplus keeps it non-negative)
        self.vol_adjustment = nn.Sequential(
            nn.Linear(embed_dim, 1),
            nn.Softplus(),
        )

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Args
        ----
        x : (batch, n_assets, n_features)

        Returns
        -------
        mu        : (batch, n_assets) predicted expected returns
        vol_scale : (batch, n_assets) predicted volatility scaling factors
        """
        # Encode each asset independently (shared weights)
        encoded = self.asset_encoder(x)                          # (B, N, H/2)

        # Self-attention across the asset axis, plus a residual connection
        attended, _ = self.cross_asset_attention(encoded, encoded, encoded)
        attended = attended + encoded                            # (B, N, H/2)

        mu = self.return_head(attended).squeeze(-1)              # (B, N)
        vol_scale = self.vol_adjustment(attended).squeeze(-1)    # (B, N)
        return mu, vol_scale
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 2. TRAINER
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class E2EPortfolioTrainer:
    """
    Trains ``PortfolioForecastNetwork`` using portfolio performance as the
    loss function, back-propagating through the differentiable optimisation
    layer.

    Loss functions
    --------------
    sharpe : maximise Sharpe ratio
    spo    : decision regret against the hindsight-optimal portfolio
             (in the spirit of Elmachtoub & Grigas 2022)

    Any ``loss_type`` other than ``"spo"`` falls back to the Sharpe loss.
    """

    def __init__(self, n_assets: int, n_features: int,
                 risk_factor: float = 3.0,
                 loss_type: str = "spo",
                 hidden_dim: int = 64,
                 lr: float = 1e-3,
                 device: str = "cpu",
                 scheduler_t_max: int = 100):
        """
        Args
        ----
        n_assets        : number of assets in the cross-section.
        n_features      : number of input features per asset.
        risk_factor     : forwarded to ``DifferentiablePortfolioLayer``.
        loss_type       : ``"spo"`` or ``"sharpe"`` (see class docstring).
        hidden_dim      : forwarded to ``PortfolioForecastNetwork``.
        lr              : AdamW learning rate.
        device          : torch device string for the forecast network.
        scheduler_t_max : ``T_max`` of the cosine annealing schedule, in
                          epochs (one scheduler step is taken per epoch).

        Raises
        ------
        ImportError : if the differentiable optimisation layer could not be
                      imported (the module sets it to ``None`` when PyTorch
                      is unavailable).
        """
        if DifferentiablePortfolioLayer is None:
            raise ImportError(
                "E2EPortfolioTrainer requires PyTorch and the differentiable "
                "optimisation layer, which are not available."
            )

        self.device = torch.device(device)
        self.loss_type = loss_type
        self.n_assets = n_assets
        self.n_features = n_features

        self.forecast_net = PortfolioForecastNetwork(
            n_assets, n_features, hidden_dim,
        ).to(self.device)
        self.opt_layer = DifferentiablePortfolioLayer(
            n_assets, risk_factor,
        )
        self.optimizer = torch.optim.AdamW(
            self.forecast_net.parameters(),
            lr=lr,
            weight_decay=1e-4,
        )
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer, T_max=scheduler_t_max,
        )

    # ------------------------------------------------------------------ #
    #  Numerical helpers                                                   #
    # ------------------------------------------------------------------ #
    @staticmethod
    def _safe_cholesky(cov: np.ndarray, jitter: float = 1e-6) -> np.ndarray:
        """
        Lower Cholesky factor of ``cov + jitter * I`` as float32.

        Falls back to the identity matrix when ``cov`` is not square,
        contains NaN/inf, or is not positive definite after regularisation.
        The explicit finiteness check matters because not every LAPACK build
        rejects NaN input, which would otherwise leak NaNs into the solver.
        """
        n = cov.shape[0]
        if cov.ndim == 2 and cov.shape == (n, n) and np.isfinite(cov).all():
            try:
                factor = np.linalg.cholesky(cov + np.eye(n) * jitter)
            except np.linalg.LinAlgError:
                factor = None
            if factor is not None and np.isfinite(factor).all():
                return factor.astype(np.float32)
        return np.eye(n, dtype=np.float32)

    # ------------------------------------------------------------------ #
    #  Loss functions                                                      #
    # ------------------------------------------------------------------ #
    def _portfolio_sharpe_loss(
        self,
        weights: torch.Tensor,
        realized_returns: torch.Tensor,
        rfr: float = 0.04,
        trading_days: int = 252,
    ) -> torch.Tensor:
        """
        Negative (un-annualised) Sharpe ratio as training loss.

        weights          : (batch, n_assets)
        realized_returns : (batch, horizon, n_assets)

        With ``horizon > 1`` a Sharpe ratio is computed per sample along the
        horizon axis and averaged over the batch. With ``horizon == 1`` (the
        shape produced by ``build_feature_tensors``) a per-sample standard
        deviation is undefined, so the batch itself is used as the series of
        observations. A single observation in total still yields NaN, which
        the training loop skips.
        """
        # Portfolio return at each step in the horizon (B, H)
        port_rets = (realized_returns * weights.unsqueeze(1)).sum(dim=-1)
        excess = port_rets - rfr / trading_days

        if excess.shape[1] == 1:
            # (B, 1) -> (1, B): treat the batch as the time axis.
            excess = excess.transpose(0, 1)

        mean_excess = excess.mean(dim=1)
        std_excess = excess.std(dim=1) + 1e-8
        sharpe = mean_excess / std_excess
        return -sharpe.mean()

    def _spo_loss(
        self,
        predicted_mu: torch.Tensor,
        realized_returns: torch.Tensor,
        L: torch.Tensor,
        predicted_weights: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        """
        Decision-regret loss (after Elmachtoub & Grigas 2022).

        Penalises the gap between the realised portfolio value achieved by
        the predicted weights and by the oracle weights (known only in
        hindsight).

        NOTE: this is the regret itself, differentiated through the
        optimisation layer, not the convex SPO+ surrogate from the paper.

        Args
        ----
        predicted_mu      : (batch, n_assets) network output.
        realized_returns  : (batch, horizon, n_assets).
        L                 : (batch, n_assets, n_assets) Cholesky factors.
        predicted_weights : optional weights already obtained from
                            ``self.opt_layer(predicted_mu, L)``; passing them
                            avoids solving the same problem a second time.
        """
        if predicted_weights is None:
            # Routed through forward() so solver args stay consistent with
            # any future changes to the opt layer.
            predicted_weights = self.opt_layer(predicted_mu, L)

        # Oracle weights from realised returns (available only in training)
        realized_mean = realized_returns.mean(dim=1).detach()
        with torch.no_grad():
            # NOTE(review): the oracle solve calls the inner layer directly
            # with its own solver arguments; keep these in sync with
            # DifferentiablePortfolioLayer.forward() if that changes.
            oracle_weights, = self.opt_layer.layer(
                realized_mean, L,
                solver_args={"solve_method": "SCS", "eps": 1e-4, "max_iters": 5000},
            )

        # Decision regret
        oracle_value = (oracle_weights * realized_mean).sum(dim=-1)
        predicted_value = (predicted_weights * realized_mean).sum(dim=-1)
        regret = oracle_value - predicted_value
        return regret.mean()

    # ------------------------------------------------------------------ #
    #  Data preparation                                                    #
    # ------------------------------------------------------------------ #
    def build_feature_tensors(
        self,
        features_dict: Dict[str, pd.DataFrame],
        returns_df: pd.DataFrame,
        horizon: int = 21,
        cov_halflife: int = 126,
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, list, pd.DatetimeIndex]:
        """
        Converts a per-ticker dict of feature frames into aligned tensors
        for batch training.

        Feature columns are every column of the first ticker's frame except
        ``"target"`` and ``"ret"``; all other tickers are assumed to share
        them. A missing ``"target"`` column is treated as all zeros.

        Args
        ----
        features_dict : ticker -> DataFrame indexed by date.
        returns_df    : returns with one column per ticker; used only for
                        the EWMA covariance.
        horizon       : currently unused; the target is a single step.
        cov_halflife  : half-life (in rows) of the EWMA covariance.

        Returns
        -------
        X          : (T, n_assets, n_features)
        y          : (T, 1, n_assets) value of the ``target`` column
        cov_L      : (T, n_assets, n_assets) Cholesky of rolling EWMA
                     covariance; identity where it cannot be computed
        tickers    : list
        common_idx : index of dates shared by all tickers, sorted

        An empty ``features_dict`` yields zero-length tensors.
        """
        tickers = list(features_dict.keys())
        n_assets = len(tickers)
        if n_assets == 0:
            return (
                torch.zeros(0, 0, 0, dtype=torch.float32),
                torch.zeros(0, 1, 0, dtype=torch.float32),
                torch.zeros(0, 0, 0, dtype=torch.float32),
                tickers,
                pd.DatetimeIndex([]),
            )

        # Align time indices across all assets
        all_dfs = {t: features_dict[t].dropna() for t in tickers}
        common_idx = all_dfs[tickers[0]].index
        for t in tickers[1:]:
            common_idx = common_idx.intersection(all_dfs[t].index)
        common_idx = common_idx.sort_values()
        T = len(common_idx)

        feature_cols = [c for c in all_dfs[tickers[0]].columns
                        if c not in ("target", "ret")]

        # ── Build X and y arrays (one vectorised slice per ticker) ──
        feat_blocks, target_blocks = [], []
        for t in tickers:
            aligned = all_dfs[t].loc[common_idx]
            feat_blocks.append(aligned[feature_cols].to_numpy(dtype=np.float32))
            if "target" in aligned.columns:
                target_blocks.append(aligned["target"].to_numpy(dtype=np.float32))
            else:
                target_blocks.append(np.zeros(T, dtype=np.float32))

        # Stack tickers along axis 1: (T, N, F) and (T, N)
        X = torch.tensor(np.stack(feat_blocks, axis=1), dtype=torch.float32)
        # y is a single-step target; reshape to (T, 1, N) for the horizon dim
        y = torch.tensor(
            np.stack(target_blocks, axis=1), dtype=torch.float32,
        ).unsqueeze(1)

        # ── Rolling Cholesky of EWMA covariance ──
        # Start from identity so any date that cannot be factorised keeps a
        # valid (if uninformative) factor.
        cov_L = torch.eye(n_assets, dtype=torch.float32).repeat(T, 1, 1)
        if T == 0:
            return X, y, cov_L, tickers, common_idx

        # Use the actual returns aligned to this common index for EWMA cov
        aligned_rets = returns_df[tickers].reindex(common_idx).fillna(0.0)
        ewma_cov = aligned_rets.ewm(halflife=cov_halflife).cov()
        for i, date in enumerate(common_idx):
            try:
                cov_slice = np.asarray(
                    ewma_cov.loc[date], dtype=np.float64,
                ).reshape(n_assets, n_assets)
            except (KeyError, ValueError, TypeError):
                continue
            # The earliest rows of an EWMA covariance are NaN;
            # _safe_cholesky maps those to identity.
            cov_L[i] = torch.from_numpy(self._safe_cholesky(cov_slice))

        return X, y, cov_L, tickers, common_idx

    # ------------------------------------------------------------------ #
    #  Training loop                                                       #
    # ------------------------------------------------------------------ #
    def _train_epoch(self, train_loader, epoch: int, rfr: float) -> float:
        """Run one optimisation pass; return the mean loss over used batches."""
        self.forecast_net.train()
        epoch_loss = 0.0
        n_batches = 0

        for X_batch, y_batch, L_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)
            L_batch = L_batch.to(self.device)

            self.optimizer.zero_grad()

            # Forward: predict returns + vol scaling
            mu_pred, vol_scale = self.forecast_net(X_batch)
            # Scale Cholesky rows by the predicted vol adjustment
            L_scaled = L_batch * vol_scale.unsqueeze(-1)

            # Both the predicted and the oracle solve can fail; a failed
            # batch is logged and skipped rather than aborting training.
            try:
                weights = self.opt_layer(mu_pred, L_scaled)
                if self.loss_type == "spo":
                    loss = self._spo_loss(
                        mu_pred, y_batch, L_scaled,
                        predicted_weights=weights,
                    )
                else:  # "sharpe" or fallback
                    loss = self._portfolio_sharpe_loss(weights, y_batch, rfr)
            except Exception as e:
                logger.warning(f"E2E: solver failed in epoch {epoch}: {e}")
                continue

            if not torch.isfinite(loss):
                continue

            loss.backward()
            # Gradient clipping is critical for stability through the opt layer
            torch.nn.utils.clip_grad_norm_(
                self.forecast_net.parameters(), max_norm=1.0,
            )
            self.optimizer.step()

            epoch_loss += loss.item()
            n_batches += 1

        return epoch_loss / max(n_batches, 1)

    def _validate(self, X_val: torch.Tensor, y_val: torch.Tensor,
                  L_val: torch.Tensor, rfr: float) -> Tuple[float, float]:
        """
        Return ``(val_loss, val_sharpe)`` on the hold-out set.

        Both values are NaN when the set is empty or the evaluation fails.
        """
        self.forecast_net.eval()
        nan = float("nan")
        if X_val.shape[0] == 0:
            return nan, nan

        with torch.no_grad():
            try:
                mu_val, vol_val = self.forecast_net(X_val)
                L_val_scaled = L_val * vol_val.unsqueeze(-1)
                w_val = self.opt_layer(mu_val, L_val_scaled)
                val_loss = self._portfolio_sharpe_loss(w_val, y_val, rfr).item()
                port_rets_val = (y_val.squeeze(1) * w_val).sum(-1)
                val_sharpe = float(
                    (port_rets_val.mean() / (port_rets_val.std() + 1e-8))
                    * np.sqrt(252)
                )
            except Exception as e:
                logger.warning(f"E2E: validation failed: {e}")
                return nan, nan
        return val_loss, val_sharpe

    def train(
        self,
        features_dict: Dict[str, pd.DataFrame],
        returns_df: pd.DataFrame,
        n_epochs: int = 50,
        batch_size: int = 32,
        val_split: float = 0.2,
        rfr: float = 0.04,
        silent: bool = False,
    ) -> Dict[str, list]:
        """
        Full training loop.

        The data is split chronologically (no shuffling): the last
        ``val_split`` fraction of samples is held out for validation.

        Returns
        -------
        dict with per-epoch lists ``train_loss``, ``val_loss`` and
        ``val_sharpe``; all empty when fewer than 10 aligned samples exist.
        """
        X, y, cov_L, _tickers, _idx = self.build_feature_tensors(
            features_dict, returns_df,
        )
        history = {"train_loss": [], "val_loss": [], "val_sharpe": []}

        T = X.shape[0]
        if T < 10:
            logger.warning(f"E2E: Only {T} aligned samples — too few to train. "
                           "Returning empty history.")
            return history

        # Strict temporal split, no shuffling
        train_end = int(T * (1 - val_split))
        train_ds = TensorDataset(X[:train_end], y[:train_end], cov_L[:train_end])
        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=False)

        # The validation tensors never change, so move them to the device once.
        X_val = X[train_end:].to(self.device)
        y_val = y[train_end:].to(self.device)
        L_val = cov_L[train_end:].to(self.device)

        for epoch in range(n_epochs):
            avg_train = self._train_epoch(train_loader, epoch, rfr)
            self.scheduler.step()

            # ── Validation ──
            val_loss, val_sharpe = self._validate(X_val, y_val, L_val, rfr)

            history["train_loss"].append(avg_train)
            history["val_loss"].append(val_loss)
            history["val_sharpe"].append(val_sharpe)

            if not silent and epoch % 10 == 0:
                print(
                    f" E2E Epoch {epoch:3d} │ "
                    f"Train Loss: {avg_train:+.4f} │ "
                    f"Val Sharpe: {val_sharpe:+.3f}"
                )

        return history

    # ------------------------------------------------------------------ #
    #  Inference                                                           #
    # ------------------------------------------------------------------ #
    def predict(
        self,
        features_dict: Dict[str, pd.DataFrame],
        base_cov: pd.DataFrame,
    ) -> Tuple[pd.Series, pd.Series]:
        """
        Run inference on the latest complete feature row of every ticker.

        Args
        ----
        features_dict : ticker -> DataFrame; the last row without NaNs is used.
        base_cov      : covariance matrix labelled by ticker; entries missing
                        for a ticker are treated as zero.

        Returns
        -------
        (mu, weights) : two Series indexed by ticker holding the predicted
                        expected returns and the optimiser's weights.

        Raises
        ------
        ValueError : if ``features_dict`` is empty.
        """
        tickers = list(features_dict.keys())
        n = len(tickers)
        if n == 0:
            raise ValueError("predict() requires at least one asset in features_dict")

        self.forecast_net.eval()

        # Build latest feature vector
        latest_features = []
        feature_cols: Optional[list] = None
        for t in tickers:
            df = features_dict[t].dropna()
            if feature_cols is None:
                feature_cols = [c for c in df.columns
                                if c not in ("target", "ret")]
            latest_features.append(
                df[feature_cols].iloc[-1].values.astype(np.float32)
            )

        X = torch.tensor(
            np.array(latest_features), dtype=torch.float32,
        ).unsqueeze(0).to(self.device)                            # (1, N, F)

        # Cholesky of base covariance (identity if it cannot be factorised)
        Sigma = base_cov.reindex(index=tickers, columns=tickers).fillna(0).values
        L_np = self._safe_cholesky(np.asarray(Sigma, dtype=np.float64))
        L = torch.from_numpy(L_np).unsqueeze(0).to(self.device)   # (1, N, N)

        with torch.no_grad():
            mu_pred, vol_scale = self.forecast_net(X)
            L_scaled = L * vol_scale.unsqueeze(-1)
            weights = self.opt_layer(mu_pred, L_scaled)

        # .cpu() makes this work for non-CPU devices; reshape(-1) keeps a 1-D
        # array even for a single asset (squeeze() would collapse it to 0-D).
        mu_series = pd.Series(
            mu_pred.detach().cpu().numpy().reshape(-1), index=tickers,
        )
        w_series = pd.Series(
            weights.detach().cpu().numpy().reshape(-1), index=tickers,
        )
        return mu_series, w_series
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 3. CACHE HELPERS
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def _cache_key(tickers: list, n_obs: int, rfr: float) -> str:
    """
    Build a deterministic hex digest identifying a cached model.

    The key is independent of ticker order and depends on the ticker set,
    the number of observations and the risk-free rate (rounded to six
    decimals). MD5 is used purely as a fingerprint, not for security.
    """
    canonical = sorted(tickers)
    payload = f"{canonical}_{n_obs}_{rfr:.6f}".encode()
    digest = hashlib.md5(payload)
    return digest.hexdigest()
def save_model(trainer: E2EPortfolioTrainer, cache_dir: str,
               tickers: list, n_obs: int, rfr: float) -> str:
    """
    Persist the forecast network's weights to the cache directory.

    The directory is created if needed. Only ``forecast_net``'s state dict
    is written (via ``torch.save``), under a file name derived from
    ``_cache_key(tickers, n_obs, rfr)``.

    Returns
    -------
    Path of the file that was written.
    """
    file_name = f"e2e_{_cache_key(tickers, n_obs, rfr)}.pkl"
    os.makedirs(cache_dir, exist_ok=True)
    target = os.path.join(cache_dir, file_name)
    weights = trainer.forecast_net.state_dict()
    torch.save(weights, target)
    return target
def load_model(trainer: E2EPortfolioTrainer, cache_dir: str,
               tickers: list, n_obs: int, rfr: float) -> bool:
    """
    Load cached forecast-network weights into ``trainer`` if present.

    The file looked up is the one ``save_model`` writes for the same
    ``(tickers, n_obs, rfr)`` combination.

    Returns
    -------
    True when a cache file existed and was loaded, False when none exists.
    Errors from a corrupt file or a mismatching architecture propagate
    from ``torch.load`` / ``load_state_dict``.
    """
    key = _cache_key(tickers, n_obs, rfr)
    path = os.path.join(cache_dir, f"e2e_{key}.pkl")
    if not os.path.exists(path):
        return False

    # Remap tensors onto the trainer's device so a checkpoint written from a
    # GPU can still be loaded on a CPU-only host (None keeps torch's default).
    state = torch.load(
        path,
        map_location=getattr(trainer, "device", None),
        weights_only=True,
    )
    trainer.forecast_net.load_state_dict(state)
    return True