Premchan369 commited on
Commit
a50d1b8
·
verified ·
1 Parent(s): d2e9075

Upload volatility_model.py

Browse files
Files changed (1) hide show
  1. volatility_model.py +183 -0
volatility_model.py ADDED
@@ -0,0 +1,183 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Volatility Forecasting Engine - GARCH + LSTM"""
2
+ import numpy as np
3
+ import pandas as pd
4
+ import torch
5
+ import torch.nn as nn
6
+ from typing import Dict, Tuple, Optional
7
+ import warnings
8
+ warnings.filterwarnings('ignore')
9
+
10
+ try:
11
+ from arch import arch_model
12
+ ARCH_AVAILABLE = True
13
+ except ImportError:
14
+ ARCH_AVAILABLE = False
15
+ print("arch library not available, GARCH will use fallback")
16
+
17
+
18
+ class LSTMVolatility(nn.Module):
19
+ """LSTM for volatility forecasting with distributional output"""
20
+ def __init__(self, input_size: int, hidden_size: int = 64,
21
+ num_layers: int = 2, dropout: float = 0.2):
22
+ super().__init__()
23
+ self.lstm = nn.LSTM(
24
+ input_size, hidden_size, num_layers,
25
+ batch_first=True, dropout=dropout if num_layers > 1 else 0
26
+ )
27
+ self.fc_mu = nn.Linear(hidden_size, 1)
28
+ self.fc_sigma = nn.Linear(hidden_size, 1)
29
+ self.fc_nu = nn.Linear(hidden_size, 1)
30
+
31
+ def forward(self, x):
32
+ out, _ = self.lstm(x)
33
+ out = out[:, -1, :]
34
+ mu = self.fc_mu(out)
35
+ sigma = torch.nn.functional.softplus(self.fc_sigma(out)) + 1e-6
36
+ nu = torch.nn.functional.softplus(self.fc_nu(out)) + 2.1
37
+ return mu, sigma, nu
38
+
39
+
40
+ class VolatilityEngine:
41
+ """Combined GARCH + LSTM volatility forecasting"""
42
+
43
+ def __init__(self, garch_p: int = 1, garch_q: int = 1,
44
+ garch_dist: str = 't', lstm_hidden: int = 64,
45
+ device: str = 'cpu'):
46
+ self.garch_p = garch_p
47
+ self.garch_q = garch_q
48
+ self.garch_dist = garch_dist
49
+ self.lstm_hidden = lstm_hidden
50
+ self.device = torch.device(device)
51
+ self.garch_models = {}
52
+ self.lstm_models = {}
53
+ self.forecast_history = []
54
+
55
+ def fit_garch(self, returns: pd.Series, ticker: str) -> Optional[Dict]:
56
+ """Fit GARCH model for a single asset"""
57
+ if not ARCH_AVAILABLE:
58
+ print(f"Using rolling volatility fallback for {ticker}")
59
+ return None
60
+
61
+ try:
62
+ am = arch_model(
63
+ returns.dropna() * 100,
64
+ vol='Garch', p=self.garch_p, q=self.garch_q,
65
+ dist=self.garch_dist
66
+ )
67
+ res = am.fit(disp='off')
68
+ self.garch_models[ticker] = res
69
+
70
+ return {
71
+ 'omega': res.params.get('omega', 0),
72
+ 'alpha': res.params.get('alpha[1]', 0),
73
+ 'beta': res.params.get('beta[1]', 0),
74
+ 'aic': res.aic,
75
+ 'bic': res.bic
76
+ }
77
+ except Exception as e:
78
+ print(f"GARCH fit failed for {ticker}: {e}")
79
+ return None
80
+
81
+ def forecast_garch(self, ticker: str, horizon: int = 5) -> np.ndarray:
82
+ """Generate GARCH volatility forecast"""
83
+ if ticker not in self.garch_models or self.garch_models[ticker] is None:
84
+ return np.ones(horizon) * 0.2
85
+
86
+ try:
87
+ forecasts = self.garch_models[ticker].forecast(horizon=horizon)
88
+ var_forecast = forecasts.variance.values[-1] / 10000
89
+ return np.sqrt(var_forecast)
90
+ except Exception as e:
91
+ print(f"GARCH forecast failed for {ticker}: {e}")
92
+ return np.ones(horizon) * 0.2
93
+
94
+ def fit_lstm_volatility(self, X: np.ndarray, y: np.ndarray,
95
+ ticker: str, epochs: int = 50,
96
+ batch_size: int = 64, lr: float = 1e-3) -> Dict:
97
+ """Fit LSTM volatility model"""
98
+ input_size = X.shape[2]
99
+ model = LSTMVolatility(input_size, self.lstm_hidden).to(self.device)
100
+
101
+ X_t = torch.FloatTensor(X).to(self.device)
102
+ y_t = torch.FloatTensor(y).to(self.device)
103
+
104
+ optimizer = torch.optim.Adam(model.parameters(), lr=lr)
105
+ metrics = {'loss': []}
106
+
107
+ for epoch in range(epochs):
108
+ model.train()
109
+ total_loss = 0
110
+ n_batches = 0
111
+
112
+ for i in range(0, len(X_t), batch_size):
113
+ batch_X = X_t[i:i+batch_size]
114
+ batch_y = y_t[i:i+batch_size]
115
+
116
+ optimizer.zero_grad()
117
+ mu, sigma, nu = model(batch_X)
118
+
119
+ z = (batch_y.unsqueeze(1) - mu) / sigma
120
+ log_likelihood = (
121
+ torch.lgamma((nu + 1) / 2) -
122
+ torch.lgamma(nu / 2) -
123
+ 0.5 * torch.log(np.pi * nu) -
124
+ torch.log(sigma) -
125
+ ((nu + 1) / 2) * torch.log(1 + z**2 / nu)
126
+ )
127
+ loss = -log_likelihood.mean()
128
+
129
+ loss.backward()
130
+ optimizer.step()
131
+
132
+ total_loss += loss.item()
133
+ n_batches += 1
134
+
135
+ avg_loss = total_loss / n_batches
136
+ metrics['loss'].append(avg_loss)
137
+
138
+ if epoch % 10 == 0:
139
+ print(f" Epoch {epoch}: loss={avg_loss:.6f}")
140
+
141
+ self.lstm_models[ticker] = model
142
+ return metrics
143
+
144
+ def compute_realized_volatility(self, returns: pd.Series, window: int = 21) -> pd.Series:
145
+ """Compute realized volatility"""
146
+ return returns.rolling(window).apply(
147
+ lambda x: np.sqrt(252 / len(x) * np.sum(x**2))
148
+ )
149
+
150
+ def build_covariance_matrix(self, returns_df: pd.DataFrame,
151
+ forecast_date: pd.Timestamp,
152
+ lookback: int = 63) -> pd.DataFrame:
153
+ """Build forecasted covariance matrix"""
154
+ recent_returns = returns_df.loc[
155
+ returns_df.index <= forecast_date
156
+ ].tail(lookback)
157
+
158
+ lambda_ = 0.94
159
+ weights = np.array([(1 - lambda_) * lambda_**i for i in range(len(recent_returns))])
160
+ weights = weights[::-1]
161
+ weights /= weights.sum()
162
+
163
+ weighted_returns = recent_returns.multiply(np.sqrt(weights), axis=0)
164
+ cov_matrix = weighted_returns.cov() * 252
165
+
166
+ eigenvalues = np.linalg.eigvalsh(cov_matrix.values)
167
+ min_eig = eigenvalues.min()
168
+ if min_eig < 1e-8:
169
+ cov_matrix = cov_matrix + np.eye(len(cov_matrix)) * (1e-8 - min_eig)
170
+
171
+ return cov_matrix
172
+
173
+ def ensemble_forecast(self, ticker: str, garch_weight: float = 0.3,
174
+ lstm_weight: float = 0.7, horizon: int = 5) -> np.ndarray:
175
+ """Combine GARCH and LSTM forecasts"""
176
+ garch_vol = self.forecast_garch(ticker, horizon)
177
+
178
+ if ticker in self.lstm_models:
179
+ lstm_vol = np.ones(horizon) * 0.15
180
+ else:
181
+ lstm_vol = garch_vol
182
+
183
+ return garch_weight * garch_vol + lstm_weight * lstm_vol