"""Volatility Forecasting Engine - GARCH + LSTM"""
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from typing import Dict, Tuple, Optional
import warnings

warnings.filterwarnings('ignore')

try:
    from arch import arch_model
    ARCH_AVAILABLE = True
except ImportError:
    ARCH_AVAILABLE = False
    print("arch library not available, GARCH will use fallback")


class LSTMVolatility(nn.Module):
    """LSTM for volatility forecasting with distributional output.

    The network reads a sequence and emits the three parameters of a
    Student-t distribution (location, scale, degrees of freedom) from the
    hidden state of the last time step.
    """

    def __init__(self, input_size: int, hidden_size: int = 64,
                 num_layers: int = 2, dropout: float = 0.2):
        """Build the recurrent trunk and the three linear output heads.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            num_layers: Number of stacked LSTM layers.
            dropout: Dropout between LSTM layers; forced to 0 when there is
                only one layer.
        """
        super().__init__()
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        self.fc_mu = nn.Linear(hidden_size, 1)
        self.fc_sigma = nn.Linear(hidden_size, 1)
        self.fc_nu = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Return ``(mu, sigma, nu)``, each of shape ``(batch, 1)``.

        ``x`` is indexed as ``(batch, time, feature)`` because the LSTM is
        built with ``batch_first=True``.
        """
        out, _ = self.lstm(x)
        last_step = out[:, -1, :]
        mu = self.fc_mu(last_step)
        # softplus keeps the scale strictly positive; the epsilon avoids log(0).
        sigma = torch.nn.functional.softplus(self.fc_sigma(last_step)) + 1e-6
        # The 2.1 offset keeps nu above 2, where a Student-t has finite variance.
        nu = torch.nn.functional.softplus(self.fc_nu(last_step)) + 2.1
        return mu, sigma, nu


class VolatilityEngine:
    """Combined GARCH + LSTM volatility forecasting."""

    def __init__(self, garch_p: int = 1, garch_q: int = 1,
                 garch_dist: str = 't', lstm_hidden: int = 64,
                 device: str = 'cpu'):
        """Store model hyper-parameters and create empty model registries.

        Args:
            garch_p: ``p`` order passed to ``arch_model``.
            garch_q: ``q`` order passed to ``arch_model``.
            garch_dist: Innovation distribution name passed to ``arch_model``.
            lstm_hidden: Hidden size used for every ``LSTMVolatility`` model.
            device: Torch device string on which the LSTM models are trained.
        """
        self.garch_p = garch_p
        self.garch_q = garch_q
        self.garch_dist = garch_dist
        self.lstm_hidden = lstm_hidden
        self.device = torch.device(device)
        self.garch_models = {}     # ticker -> fitted arch result
        self.lstm_models = {}      # ticker -> trained LSTMVolatility
        self.forecast_history = []

    def fit_garch(self, returns: pd.Series, ticker: str) -> Optional[Dict]:
        """Fit a GARCH model for a single asset.

        Returns are multiplied by 100 before fitting. The fitted result is
        stored under ``ticker`` in ``self.garch_models``.

        Returns:
            A dict with ``omega``, ``alpha``, ``beta``, ``aic`` and ``bic``,
            or ``None`` when ``arch`` is not installed or the fit fails.
        """
        if not ARCH_AVAILABLE:
            print(f"Using rolling volatility fallback for {ticker}")
            return None

        # Best-effort by design: a failed fit is reported and the caller
        # falls back to the constant forecast in ``forecast_garch``.
        try:
            am = arch_model(
                returns.dropna() * 100,
                vol='Garch',
                p=self.garch_p,
                q=self.garch_q,
                dist=self.garch_dist,
            )
            res = am.fit(disp='off')
            self.garch_models[ticker] = res
            return {
                'omega': res.params.get('omega', 0),
                'alpha': res.params.get('alpha[1]', 0),
                'beta': res.params.get('beta[1]', 0),
                'aic': res.aic,
                'bic': res.bic,
            }
        except Exception as e:
            print(f"GARCH fit failed for {ticker}: {e}")
            return None

    def forecast_garch(self, ticker: str, horizon: int = 5) -> np.ndarray:
        """Generate a GARCH volatility forecast for ``ticker``.

        Returns:
            An array of ``horizon`` volatilities. When no fitted model exists
            for ``ticker`` or forecasting fails, a constant 0.2 is returned.

        NOTE(review): the model path returns volatility at the frequency of
        the fitted returns, while the 0.2 fallback looks like an annualized
        figure - confirm the intended units with the callers.
        """
        if self.garch_models.get(ticker) is None:
            return np.ones(horizon) * 0.2

        try:
            forecasts = self.garch_models[ticker].forecast(horizon=horizon)
            # Undo the x100 return scaling applied in fit_garch (100**2 on variance).
            var_forecast = forecasts.variance.values[-1] / 10000
            return np.sqrt(var_forecast)
        except Exception as e:
            print(f"GARCH forecast failed for {ticker}: {e}")
            return np.ones(horizon) * 0.2

    def fit_lstm_volatility(self, X: np.ndarray, y: np.ndarray, ticker: str,
                            epochs: int = 50, batch_size: int = 64,
                            lr: float = 1e-3) -> Dict:
        """Fit an LSTM volatility model by minimising Student-t negative log-likelihood.

        Args:
            X: Features indexed as ``(sample, time, feature)``.
            y: One target per sample; shape ``(N,)`` or ``(N, 1)``.
            ticker: Key under which the trained model is stored.
            epochs: Number of passes over the data.
            batch_size: Mini-batch size (batches are taken in order).
            lr: Adam learning rate.

        Returns:
            ``{'loss': [...]}`` with the mean batch loss of every epoch.

        Raises:
            ValueError: If ``X`` is empty or ``X`` and ``y`` disagree on the
                number of samples.
        """
        X_t = torch.as_tensor(np.asarray(X), dtype=torch.float32).to(self.device)
        # Force a (N, 1) target so it lines up with mu/sigma/nu; a (N, 1) input
        # would otherwise broadcast into an (N, N, 1) loss.
        y_t = torch.as_tensor(np.asarray(y), dtype=torch.float32).to(self.device)
        y_t = y_t.reshape(-1, 1)

        if len(X_t) == 0:
            raise ValueError("fit_lstm_volatility requires at least one sample")
        if len(y_t) != len(X_t):
            raise ValueError(
                f"X and y must have the same number of samples, "
                f"got {len(X_t)} and {len(y_t)}"
            )

        input_size = X_t.shape[2]
        model = LSTMVolatility(input_size, self.lstm_hidden).to(self.device)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        metrics = {'loss': []}

        for epoch in range(epochs):
            model.train()
            total_loss = 0
            n_batches = 0

            for start in range(0, len(X_t), batch_size):
                batch_X = X_t[start:start + batch_size]
                batch_y = y_t[start:start + batch_size]

                optimizer.zero_grad()
                mu, sigma, nu = model(batch_X)

                # Log-density of a location-scale Student-t distribution.
                z = (batch_y - mu) / sigma
                log_likelihood = (
                    torch.lgamma((nu + 1) / 2)
                    - torch.lgamma(nu / 2)
                    - 0.5 * torch.log(np.pi * nu)
                    - torch.log(sigma)
                    - ((nu + 1) / 2) * torch.log(1 + z**2 / nu)
                )
                loss = -log_likelihood.mean()

                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                n_batches += 1

            avg_loss = total_loss / n_batches
            metrics['loss'].append(avg_loss)

            if epoch % 10 == 0:
                print(f"  Epoch {epoch}: loss={avg_loss:.6f}")

        # Leave the stored model in inference mode so dropout is disabled
        # when it is used for forecasting.
        model.eval()
        self.lstm_models[ticker] = model
        return metrics

    def compute_realized_volatility(self, returns: pd.Series,
                                    window: int = 21) -> pd.Series:
        """Compute rolling realized volatility, scaled by ``sqrt(252)``.

        Each value is ``sqrt(252 / window * sum(r**2))`` over the trailing
        ``window`` observations; incomplete windows yield NaN.
        """
        # raw=True hands the callback a plain ndarray instead of building a
        # Series per window; the computed values are the same.
        return returns.rolling(window).apply(
            lambda x: np.sqrt(252 / len(x) * np.sum(x**2)),
            raw=True,
        )

    def build_covariance_matrix(self, returns_df: pd.DataFrame,
                                forecast_date: pd.Timestamp,
                                lookback: int = 63) -> pd.DataFrame:
        """Build an exponentially weighted covariance matrix, scaled by 252.

        Uses the last ``lookback`` rows dated on or before ``forecast_date``
        with decay 0.94 (most recent row weighted highest). Rows containing
        missing values are dropped before weighting.

        Returns:
            A DataFrame indexed and labelled by the columns of ``returns_df``,
            shifted along the diagonal if needed so that its smallest
            eigenvalue is at least 1e-8.

        Raises:
            ValueError: If no complete observation is available in the window.
        """
        window = returns_df.loc[returns_df.index <= forecast_date].tail(lookback)
        window = window.dropna(how='any')
        n_obs = len(window)
        if n_obs == 0:
            raise ValueError(
                "no complete return observations on or before forecast_date"
            )

        lambda_ = 0.94
        # Exponent n_obs-1 for the oldest row down to 0 for the newest one.
        weights = (1 - lambda_) * lambda_ ** np.arange(n_obs - 1, -1, -1)
        weights = weights / weights.sum()

        values = window.to_numpy(dtype=float)
        centered = values - weights @ values
        # Weighted covariance: sum_i w_i * outer(centered_i, centered_i).
        # The weights already sum to one, so there is no further division by
        # the sample size.
        cov = (centered * weights[:, None]).T @ centered * 252
        cov = (cov + cov.T) / 2  # remove floating-point asymmetry

        cov_matrix = pd.DataFrame(cov, index=window.columns,
                                  columns=window.columns)

        min_eig = np.linalg.eigvalsh(cov).min()
        if min_eig < 1e-8:
            cov_matrix = cov_matrix + np.eye(len(cov_matrix)) * (1e-8 - min_eig)

        return cov_matrix

    def ensemble_forecast(self, ticker: str, garch_weight: float = 0.3,
                          lstm_weight: float = 0.7,
                          horizon: int = 5) -> np.ndarray:
        """Combine GARCH and LSTM forecasts as a weighted sum.

        The LSTM leg is currently a placeholder: when a model is registered
        for ``ticker`` a constant 0.15 is used instead of running the model,
        and when none is registered the GARCH forecast is reused. The weights
        are applied as given (not normalised).
        """
        garch_vol = self.forecast_garch(ticker, horizon)

        if ticker in self.lstm_models:
            lstm_vol = np.ones(horizon) * 0.15
        else:
            lstm_vol = garch_vol

        return garch_weight * garch_vol + lstm_weight * lstm_vol