| """Volatility Forecasting Engine - GARCH + LSTM""" |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| from typing import Dict, Tuple, Optional |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
| try: |
| from arch import arch_model |
| ARCH_AVAILABLE = True |
| except ImportError: |
| ARCH_AVAILABLE = False |
| print("arch library not available, GARCH will use fallback") |
|
|
|
|
class LSTMVolatility(nn.Module):
    """Recurrent network that turns a feature sequence into three heads.

    The final LSTM time step is fed to three independent linear layers whose
    outputs are returned as ``(mu, sigma, nu)``:

    * ``mu``    - unconstrained linear output,
    * ``sigma`` - softplus output shifted by ``1e-6`` (strictly positive),
    * ``nu``    - softplus output shifted by ``2.1`` (always above 2.1).
    """

    def __init__(self, input_size: int, hidden_size: int = 64,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        # Dropout is only passed through when layers are stacked; a single
        # layer gets 0.
        inter_layer_dropout = 0 if num_layers <= 1 else dropout
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        # One single-output head per returned quantity.
        self.fc_mu = nn.Linear(hidden_size, 1)
        self.fc_sigma = nn.Linear(hidden_size, 1)
        self.fc_nu = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Return ``(mu, sigma, nu)`` computed from the last time step of ``x``.

        ``x`` is consumed batch-first (the LSTM is built with
        ``batch_first=True``); each returned tensor has one column.
        """
        softplus = nn.functional.softplus
        sequence_out, _state = self.lstm(x)
        last_step = sequence_out[:, -1, :]

        location = self.fc_mu(last_step)
        scale = softplus(self.fc_sigma(last_step)) + 1e-6
        dof = softplus(self.fc_nu(last_step)) + 2.1
        return location, scale, dof
|
|
|
|
class VolatilityEngine:
    """Combined GARCH + LSTM volatility forecasting.

    Fitted models are cached per ticker in ``garch_models`` (arch fit results)
    and ``lstm_models`` (trained ``LSTMVolatility`` networks).
    """

    # Flat volatility returned when no usable GARCH fit exists for a ticker.
    _GARCH_FALLBACK_VOL = 0.2
    # Constant used in place of a real LSTM prediction (see ensemble_forecast).
    _LSTM_PLACEHOLDER_VOL = 0.15
    # Smallest eigenvalue tolerated in a returned covariance matrix.
    _MIN_EIGENVALUE = 1e-8

    def __init__(self, garch_p: int = 1, garch_q: int = 1,
                 garch_dist: str = 't', lstm_hidden: int = 64,
                 device: str = 'cpu'):
        """Store model hyper-parameters and create empty per-ticker caches.

        Args:
            garch_p: ``p`` order passed to ``arch_model``.
            garch_q: ``q`` order passed to ``arch_model``.
            garch_dist: Innovation distribution name passed to ``arch_model``.
            lstm_hidden: Hidden size of each ``LSTMVolatility`` network.
            device: Torch device string used for LSTM training.
        """
        self.garch_p = garch_p
        self.garch_q = garch_q
        self.garch_dist = garch_dist
        self.lstm_hidden = lstm_hidden
        self.device = torch.device(device)
        self.garch_models = {}
        self.lstm_models = {}
        self.forecast_history = []

    def fit_garch(self, returns: pd.Series, ticker: str) -> Optional[Dict]:
        """Fit a GARCH model for a single asset and cache the result.

        Returns are multiplied by 100 before fitting; ``forecast_garch``
        divides the forecast variance by 10000 to undo that scaling.

        Args:
            returns: Return series; missing values are dropped before fitting.
            ticker: Key under which the fit result is cached.

        Returns:
            A dict with ``omega``, ``alpha``, ``beta``, ``aic`` and ``bic``, or
            ``None`` when the ``arch`` library is missing or the fit fails.
        """
        if not ARCH_AVAILABLE:
            print(f"Using rolling volatility fallback for {ticker}")
            return None

        # Best-effort: a failed fit is reported and signalled with None so a
        # multi-asset loop can carry on with the remaining tickers.
        try:
            am = arch_model(
                returns.dropna() * 100,
                vol='Garch', p=self.garch_p, q=self.garch_q,
                dist=self.garch_dist
            )
            res = am.fit(disp='off')
            self.garch_models[ticker] = res

            return {
                'omega': res.params.get('omega', 0),
                'alpha': res.params.get('alpha[1]', 0),
                'beta': res.params.get('beta[1]', 0),
                'aic': res.aic,
                'bic': res.bic
            }
        except Exception as e:
            print(f"GARCH fit failed for {ticker}: {e}")
            return None

    def forecast_garch(self, ticker: str, horizon: int = 5) -> np.ndarray:
        """Generate a GARCH volatility forecast for ``ticker``.

        NOTE(review): the model-based forecast is a per-period volatility (it
        is not multiplied by 252), while the 0.2 fallback looks like an
        annualised level - confirm the intended units with callers.

        Args:
            ticker: Key of a model previously stored by ``fit_garch``.
            horizon: Number of steps ahead to forecast.

        Returns:
            Array of ``horizon`` volatilities; a flat 0.2 array when no fitted
            model is cached for ``ticker`` or forecasting fails.
        """
        fitted = self.garch_models.get(ticker)
        if fitted is None:
            return np.full(horizon, self._GARCH_FALLBACK_VOL)

        try:
            forecasts = fitted.forecast(horizon=horizon)
            # Undo the x100 scaling applied to returns in fit_garch
            # (variance scales with the square, hence 100**2).
            var_forecast = forecasts.variance.values[-1] / 10000
            return np.sqrt(var_forecast)
        except Exception as e:
            print(f"GARCH forecast failed for {ticker}: {e}")
            return np.full(horizon, self._GARCH_FALLBACK_VOL)

    def fit_lstm_volatility(self, X: np.ndarray, y: np.ndarray,
                            ticker: str, epochs: int = 50,
                            batch_size: int = 64, lr: float = 1e-3) -> Dict:
        """Fit an LSTM volatility model by Student-t maximum likelihood.

        The network outputs ``(mu, sigma, nu)`` per sample and is trained with
        Adam on the negative Student-t log-likelihood of ``y``. Batches are
        taken in order (no shuffling). The trained model is switched to eval
        mode and cached under ``ticker``.

        Args:
            X: 3-D array; the last axis is the feature dimension and the first
                axis indexes samples.
            y: One target per sample, shaped ``(N,)`` or ``(N, 1)``.
            ticker: Key under which the trained model is cached.
            epochs: Number of passes over the data.
            batch_size: Number of samples per optimisation step.
            lr: Adam learning rate.

        Returns:
            ``{'loss': [...]}`` with the mean batch loss of every epoch.

        Raises:
            ValueError: If ``X`` is not 3-D, is empty, does not match ``y`` in
                length, or ``batch_size`` is not positive.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        # Validate up front: an empty input would otherwise surface as a
        # ZeroDivisionError and a length mismatch as a broadcasting error.
        if X.ndim != 3:
            raise ValueError(f"X must be 3-dimensional, got {X.ndim} dimension(s)")
        if len(X) == 0:
            raise ValueError("X must contain at least one sample")
        if y.size != len(X):
            raise ValueError(
                f"y must hold one target per sample: {y.size} targets for {len(X)} samples"
            )
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        input_size = X.shape[2]
        model = LSTMVolatility(input_size, self.lstm_hidden).to(self.device)

        X_t = torch.as_tensor(X, dtype=torch.float32, device=self.device)
        # Column vector so it lines up with the (batch, 1) network outputs
        # whether y arrived as (N,) or (N, 1).
        y_t = torch.as_tensor(y, dtype=torch.float32, device=self.device).reshape(-1, 1)

        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        metrics = {'loss': []}

        for epoch in range(epochs):
            model.train()
            total_loss = 0
            n_batches = 0

            for i in range(0, len(X_t), batch_size):
                batch_X = X_t[i:i+batch_size]
                batch_y = y_t[i:i+batch_size]

                optimizer.zero_grad()
                mu, sigma, nu = model(batch_X)

                # Log-density of a location-scale Student-t distribution.
                z = (batch_y - mu) / sigma
                log_likelihood = (
                    torch.lgamma((nu + 1) / 2) -
                    torch.lgamma(nu / 2) -
                    0.5 * torch.log(np.pi * nu) -
                    torch.log(sigma) -
                    ((nu + 1) / 2) * torch.log(1 + z**2 / nu)
                )
                loss = -log_likelihood.mean()

                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                n_batches += 1

            avg_loss = total_loss / n_batches
            metrics['loss'].append(avg_loss)

            if epoch % 10 == 0:
                print(f"  Epoch {epoch}: loss={avg_loss:.6f}")

        # Leave the cached model in inference mode so dropout is not applied
        # when it is later used for prediction.
        model.eval()
        self.lstm_models[ticker] = model
        return metrics

    def compute_realized_volatility(self, returns: pd.Series, window: int = 21) -> pd.Series:
        """Compute rolling realized volatility, annualised with 252 periods.

        Each value is ``sqrt(252 / window * sum(r**2))`` over the trailing
        ``window`` returns; positions without a full window are NaN.
        """
        # raw=True hands the lambda a plain ndarray instead of building a
        # Series per window; the arithmetic is unchanged.
        return returns.rolling(window).apply(
            lambda x: np.sqrt(252 / len(x) * np.sum(x**2)),
            raw=True
        )

    def build_covariance_matrix(self, returns_df: pd.DataFrame,
                                forecast_date: pd.Timestamp,
                                lookback: int = 63,
                                decay: float = 0.94) -> pd.DataFrame:
        """Build an exponentially weighted, annualised covariance matrix.

        Uses the last ``lookback`` rows dated on or before ``forecast_date``.
        Row weights decay geometrically with age (newest row heaviest) and sum
        to one; the covariance is taken around the weighted mean and multiplied
        by 252. Missing values are handled pairwise: each entry is normalised
        by the weight of the rows where both columns are observed.

        Args:
            returns_df: Returns with one column per asset and a sortable index.
            forecast_date: Last date allowed into the estimation window.
            lookback: Maximum number of rows in the window.
            decay: Per-period decay factor in ``(0, 1]``; 1 gives equal weights.

        Returns:
            Square DataFrame indexed and labelled by the asset columns. If all
            entries are finite its smallest eigenvalue is at least 1e-8;
            entries for pairs with no overlapping data are NaN.

        Raises:
            ValueError: If ``decay`` is outside ``(0, 1]`` or fewer than two
                rows fall inside the window.
        """
        if not 0.0 < decay <= 1.0:
            raise ValueError("decay must be in the interval (0, 1]")

        recent_returns = returns_df.loc[
            returns_df.index <= forecast_date
        ].tail(lookback)
        n_obs = len(recent_returns)
        if n_obs < 2:
            raise ValueError(
                f"need at least 2 observations on or before {forecast_date}, got {n_obs}"
            )

        # Oldest row gets decay**(n-1), newest gets decay**0.
        weights = decay ** np.arange(n_obs - 1, -1, -1, dtype=float)
        weights /= weights.sum()
        row_w = weights[:, None]

        values = recent_returns.to_numpy(dtype=float)
        observed = ~np.isnan(values)
        col_w = row_w * observed  # weight each row contributes to each column

        with np.errstate(divide='ignore', invalid='ignore'):
            means = np.where(observed, values * row_w, 0.0).sum(axis=0) / col_w.sum(axis=0)
            centered = np.where(observed, values - means, 0.0)
            # Total weight of the rows where both columns of a pair are present;
            # equals 1 everywhere when nothing is missing.
            pair_w = col_w.T @ observed.astype(float)
            # Weights already sum to one, so no further 1/(n-1) scaling applies.
            cov = (centered * row_w).T @ centered / pair_w
        cov = cov * 252

        # Shift the spectrum so the matrix is safely positive definite.
        if cov.size and np.isfinite(cov).all():
            min_eig = np.linalg.eigvalsh(cov).min()
            if min_eig < self._MIN_EIGENVALUE:
                cov = cov + np.eye(len(cov)) * (self._MIN_EIGENVALUE - min_eig)

        return pd.DataFrame(
            cov, index=recent_returns.columns, columns=recent_returns.columns
        )

    def ensemble_forecast(self, ticker: str, garch_weight: float = 0.3,
                          lstm_weight: float = 0.7, horizon: int = 5) -> np.ndarray:
        """Combine GARCH and LSTM forecasts as a weighted sum.

        NOTE(review): when an LSTM model is cached for ``ticker`` the LSTM leg
        is a constant 0.15 placeholder - the network itself is not evaluated
        here. Without a cached LSTM the GARCH forecast is used for both legs.
        The weights are applied as given and are not normalised.

        Args:
            ticker: Asset key.
            garch_weight: Weight on the GARCH forecast.
            lstm_weight: Weight on the LSTM leg.
            horizon: Number of steps ahead.

        Returns:
            Array of ``horizon`` blended volatilities.
        """
        garch_vol = self.forecast_garch(ticker, horizon)

        if ticker in self.lstm_models:
            lstm_vol = np.full(horizon, self._LSTM_PLACEHOLDER_VOL)
        else:
            lstm_vol = garch_vol

        return garch_weight * garch_vol + lstm_weight * lstm_vol
|
|