alphaforge-quant-system / volatility_model.py
Premchan369's picture
Upload volatility_model.py
a50d1b8 verified
raw
history blame
6.78 kB
"""Volatility Forecasting Engine - GARCH + LSTM"""
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from typing import Dict, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
try:
from arch import arch_model
ARCH_AVAILABLE = True
except ImportError:
ARCH_AVAILABLE = False
print("arch library not available, GARCH will use fallback")
class LSTMVolatility(nn.Module):
"""LSTM for volatility forecasting with distributional output"""
def __init__(self, input_size: int, hidden_size: int = 64,
num_layers: int = 2, dropout: float = 0.2):
super().__init__()
self.lstm = nn.LSTM(
input_size, hidden_size, num_layers,
batch_first=True, dropout=dropout if num_layers > 1 else 0
)
self.fc_mu = nn.Linear(hidden_size, 1)
self.fc_sigma = nn.Linear(hidden_size, 1)
self.fc_nu = nn.Linear(hidden_size, 1)
def forward(self, x):
out, _ = self.lstm(x)
out = out[:, -1, :]
mu = self.fc_mu(out)
sigma = torch.nn.functional.softplus(self.fc_sigma(out)) + 1e-6
nu = torch.nn.functional.softplus(self.fc_nu(out)) + 2.1
return mu, sigma, nu
class VolatilityEngine:
"""Combined GARCH + LSTM volatility forecasting"""
def __init__(self, garch_p: int = 1, garch_q: int = 1,
garch_dist: str = 't', lstm_hidden: int = 64,
device: str = 'cpu'):
self.garch_p = garch_p
self.garch_q = garch_q
self.garch_dist = garch_dist
self.lstm_hidden = lstm_hidden
self.device = torch.device(device)
self.garch_models = {}
self.lstm_models = {}
self.forecast_history = []
def fit_garch(self, returns: pd.Series, ticker: str) -> Optional[Dict]:
"""Fit GARCH model for a single asset"""
if not ARCH_AVAILABLE:
print(f"Using rolling volatility fallback for {ticker}")
return None
try:
am = arch_model(
returns.dropna() * 100,
vol='Garch', p=self.garch_p, q=self.garch_q,
dist=self.garch_dist
)
res = am.fit(disp='off')
self.garch_models[ticker] = res
return {
'omega': res.params.get('omega', 0),
'alpha': res.params.get('alpha[1]', 0),
'beta': res.params.get('beta[1]', 0),
'aic': res.aic,
'bic': res.bic
}
except Exception as e:
print(f"GARCH fit failed for {ticker}: {e}")
return None
def forecast_garch(self, ticker: str, horizon: int = 5) -> np.ndarray:
"""Generate GARCH volatility forecast"""
if ticker not in self.garch_models or self.garch_models[ticker] is None:
return np.ones(horizon) * 0.2
try:
forecasts = self.garch_models[ticker].forecast(horizon=horizon)
var_forecast = forecasts.variance.values[-1] / 10000
return np.sqrt(var_forecast)
except Exception as e:
print(f"GARCH forecast failed for {ticker}: {e}")
return np.ones(horizon) * 0.2
def fit_lstm_volatility(self, X: np.ndarray, y: np.ndarray,
ticker: str, epochs: int = 50,
batch_size: int = 64, lr: float = 1e-3) -> Dict:
"""Fit LSTM volatility model"""
input_size = X.shape[2]
model = LSTMVolatility(input_size, self.lstm_hidden).to(self.device)
X_t = torch.FloatTensor(X).to(self.device)
y_t = torch.FloatTensor(y).to(self.device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
metrics = {'loss': []}
for epoch in range(epochs):
model.train()
total_loss = 0
n_batches = 0
for i in range(0, len(X_t), batch_size):
batch_X = X_t[i:i+batch_size]
batch_y = y_t[i:i+batch_size]
optimizer.zero_grad()
mu, sigma, nu = model(batch_X)
z = (batch_y.unsqueeze(1) - mu) / sigma
log_likelihood = (
torch.lgamma((nu + 1) / 2) -
torch.lgamma(nu / 2) -
0.5 * torch.log(np.pi * nu) -
torch.log(sigma) -
((nu + 1) / 2) * torch.log(1 + z**2 / nu)
)
loss = -log_likelihood.mean()
loss.backward()
optimizer.step()
total_loss += loss.item()
n_batches += 1
avg_loss = total_loss / n_batches
metrics['loss'].append(avg_loss)
if epoch % 10 == 0:
print(f" Epoch {epoch}: loss={avg_loss:.6f}")
self.lstm_models[ticker] = model
return metrics
def compute_realized_volatility(self, returns: pd.Series, window: int = 21) -> pd.Series:
"""Compute realized volatility"""
return returns.rolling(window).apply(
lambda x: np.sqrt(252 / len(x) * np.sum(x**2))
)
def build_covariance_matrix(self, returns_df: pd.DataFrame,
forecast_date: pd.Timestamp,
lookback: int = 63) -> pd.DataFrame:
"""Build forecasted covariance matrix"""
recent_returns = returns_df.loc[
returns_df.index <= forecast_date
].tail(lookback)
lambda_ = 0.94
weights = np.array([(1 - lambda_) * lambda_**i for i in range(len(recent_returns))])
weights = weights[::-1]
weights /= weights.sum()
weighted_returns = recent_returns.multiply(np.sqrt(weights), axis=0)
cov_matrix = weighted_returns.cov() * 252
eigenvalues = np.linalg.eigvalsh(cov_matrix.values)
min_eig = eigenvalues.min()
if min_eig < 1e-8:
cov_matrix = cov_matrix + np.eye(len(cov_matrix)) * (1e-8 - min_eig)
return cov_matrix
def ensemble_forecast(self, ticker: str, garch_weight: float = 0.3,
lstm_weight: float = 0.7, horizon: int = 5) -> np.ndarray:
"""Combine GARCH and LSTM forecasts"""
garch_vol = self.forecast_garch(ticker, horizon)
if ticker in self.lstm_models:
lstm_vol = np.ones(horizon) * 0.15
else:
lstm_vol = garch_vol
return garch_weight * garch_vol + lstm_weight * lstm_vol