| """Options Pricing with ML - Neural network for option pricing/IV prediction.""" |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| from torch.utils.data import Dataset, DataLoader |
| from typing import Dict, Tuple, Optional |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class OptionDataset(Dataset):
    """Torch dataset pairing option feature rows with their target values.

    Both arrays are converted to float32 tensors once, at construction.
    The targets receive an extra axis at position 1, so a 1-D target array
    of length N is stored with shape (N, 1).
    """

    def __init__(self, X: np.ndarray, y: np.ndarray):
        feature_tensor = torch.FloatTensor(X)
        target_tensor = torch.FloatTensor(y)
        self.X = feature_tensor
        # Add the trailing axis so targets line up with an (N, 1) model output.
        self.y = target_tensor.unsqueeze(dim=1)

    def __len__(self):
        """Return the number of feature rows."""
        n_rows = len(self.X)
        return n_rows

    def __getitem__(self, idx):
        """Return the ``(features, target)`` pair stored at ``idx``."""
        features = self.X[idx]
        target = self.y[idx]
        return features, target
|
|
|
|
class OptionPricingNN(nn.Module):
    """Fully connected regressor that maps a feature row to one scalar.

    The network is a stack of ``Linear -> ReLU -> Dropout`` groups, one per
    entry of ``hidden_sizes``, followed by a final ``Linear`` layer with a
    single output unit.

    Args:
        input_size: Number of input features per sample.
        hidden_sizes: Width of each hidden layer, in order. ``None`` selects
            the default ``[256, 128, 64, 32]``. An empty sequence yields a
            single linear layer.
        dropout: Dropout probability applied after every hidden activation.
    """

    def __init__(self, input_size: int, hidden_sizes: Optional[list] = None,
                 dropout: float = 0.2):
        super().__init__()

        # A ``None`` sentinel instead of a list literal in the signature:
        # a mutable default is shared by every call of the constructor.
        if hidden_sizes is None:
            hidden_sizes = [256, 128, 64, 32]

        layers = []
        prev_size = input_size
        for hidden_size in hidden_sizes:
            layers.extend([
                nn.Linear(prev_size, hidden_size),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
            prev_size = hidden_size

        # Regression head: one output value per sample.
        layers.append(nn.Linear(prev_size, 1))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the network output."""
        return self.network(x)
|
|
|
|
class BlackScholes:
    """Analytical Black-Scholes formulas used as a baseline for comparison.

    All methods are static. The pricing formulas are written with NumPy
    operations, so they accept scalars as well as arrays; the implied
    volatility solver works on scalars only (it uses ``abs``/``max`` and
    scalar comparisons). ``scipy`` is imported lazily inside the methods
    that need it.

    Common arguments:
        S: Underlying price.
        K: Strike price.
        T: Time to expiry (the code requires ``T > 0``; ``T == 0`` divides
            by zero).
        r: Risk-free rate.
        sigma: Volatility (must be non-zero for the same reason).
    """

    @staticmethod
    def d1(S, K, T, r, sigma):
        """Return the d1 term of the Black-Scholes formula."""
        return (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))

    @staticmethod
    def d2(S, K, T, r, sigma):
        """Return the d2 term, ``d1 - sigma * sqrt(T)``."""
        return BlackScholes.d1(S, K, T, r, sigma) - sigma * np.sqrt(T)

    @staticmethod
    def call_price(S, K, T, r, sigma):
        """Return the Black-Scholes price of a European call."""
        from scipy.stats import norm

        d1 = BlackScholes.d1(S, K, T, r, sigma)
        # Derive d2 from the d1 already in hand rather than recomputing d1.
        d2 = d1 - sigma * np.sqrt(T)
        return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

    @staticmethod
    def put_price(S, K, T, r, sigma):
        """Return the Black-Scholes price of a European put."""
        from scipy.stats import norm

        d1 = BlackScholes.d1(S, K, T, r, sigma)
        # Derive d2 from the d1 already in hand rather than recomputing d1.
        d2 = d1 - sigma * np.sqrt(T)
        return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)

    @staticmethod
    def implied_volatility(price, S, K, T, r, option_type='call', tol=1e-5, max_iter=100):
        """Find implied volatility using Newton-Raphson.

        Starts from ``sigma = 0.2`` and iterates on the pricing error, using
        vega as the derivative.

        Args:
            price: Observed option price to match.
            S, K, T, r: Black-Scholes inputs (see the class docstring).
            option_type: ``'call'`` prices a call; any other value prices a put.
            tol: Absolute price error at which the search stops.
            max_iter: Maximum number of Newton steps.

        Returns:
            The volatility whose model price is within ``tol`` of ``price``.
            If the search does not converge - ``max_iter`` is exhausted or
            vega drops below ``1e-10`` - the last iterate is returned as a
            best-effort estimate; no error is raised.
        """
        # Imported once here instead of on every Newton iteration.
        from scipy.stats import norm

        if option_type == 'call':
            price_fn = BlackScholes.call_price
        else:
            price_fn = BlackScholes.put_price

        sigma = 0.2
        for _ in range(max_iter):
            diff = price_fn(S, K, T, r, sigma) - price
            if abs(diff) < tol:
                return sigma

            d1 = BlackScholes.d1(S, K, T, r, sigma)
            vega = S * norm.pdf(d1) * np.sqrt(T)

            # A vanishing derivative would make the Newton step explode.
            if vega < 1e-10:
                break

            sigma -= diff / vega
            # Keep the iterate strictly positive so d1 stays well defined.
            sigma = max(sigma, 0.001)

        return sigma
|
|
|
|
class MLOptionsPricer:
    """ML-based options pricing engine.

    Bundles an ``OptionPricingNN`` regressor with a ``BlackScholes`` baseline
    and provides feature construction, training, price prediction, an
    implied-volatility estimate, mispricing signals and synthetic data.

    Args:
        hidden_sizes: Hidden layer widths handed to ``OptionPricingNN``.
            ``None`` selects ``[256, 128, 64, 32]``.
        device: Torch device string the model and tensors are moved to.
    """

    def __init__(self, hidden_sizes: Optional[list] = None,
                 device: str = 'cpu'):
        # Resolve the default here (not in the signature) and copy, so no two
        # instances ever share one list object.
        if hidden_sizes is None:
            self.hidden_sizes = [256, 128, 64, 32]
        else:
            self.hidden_sizes = list(hidden_sizes)
        self.device = torch.device(device)
        self.model = None  # created by fit()
        self.bs = BlackScholes()

    def prepare_features(self, options_df: pd.DataFrame) -> np.ndarray:
        """
        Prepare features for ML model

        Expected columns: S, K, T, r, sigma_hist, option_type,
        S_lag_1, S_lag_2, ..., S_lag_20

        The lag columns are optional: each ``S_lag_i`` is appended only when
        present, so the feature width depends on the frame's columns.

        Returns:
            Array with one row per option and the columns, in order:
            S, K, T, r, sigma_hist, S / K, T * 252, call indicator
            (1.0 for ``'call'``, 0.0 otherwise), then the available lags.
        """
        features = [
            options_df['S'].values,
            options_df['K'].values,
            options_df['T'].values,
            options_df['r'].values,
            options_df['sigma_hist'].values,
            (options_df['S'] / options_df['K']).values,
            # T rescaled by 252 - presumably trading days per year; confirm.
            options_df['T'].values * 252,
            (options_df['option_type'] == 'call').astype(float).values,
        ]

        features.extend(
            options_df[col].values
            for col in (f'S_lag_{i}' for i in range(1, 21))
            if col in options_df.columns
        )

        return np.column_stack(features)

    def fit(self, X_train: np.ndarray, y_train: np.ndarray,
            X_val: Optional[np.ndarray] = None, y_val: Optional[np.ndarray] = None,
            epochs: int = 100, batch_size: int = 256, lr: float = 1e-3) -> Dict:
        """Train the neural network

        A fresh ``OptionPricingNN`` is built on every call and trained with
        Adam on an MSE loss. When both ``X_val`` and ``y_val`` are given, the
        validation loss and MAE are recorded each epoch and drive a
        ``ReduceLROnPlateau`` scheduler; otherwise only the training loss is
        tracked and the learning rate stays fixed. Progress is printed every
        10 epochs.

        Args:
            X_train: Training features, one row per sample.
            y_train: Training targets, one value per sample.
            X_val: Optional validation features.
            y_val: Optional validation targets.
            epochs: Number of passes over the training data.
            batch_size: Mini-batch size for the training loader.
            lr: Initial Adam learning rate.

        Returns:
            Dict with per-epoch lists ``train_loss``, ``val_loss`` and
            ``val_mae`` (the latter two stay empty without validation data).
        """
        input_size = X_train.shape[1]
        self.model = OptionPricingNN(input_size, self.hidden_sizes).to(self.device)

        train_dataset = OptionDataset(X_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10)
        criterion = nn.MSELoss()

        has_val = X_val is not None and y_val is not None
        if has_val:
            # Built once: the validation tensors do not change between epochs.
            X_val_t = torch.FloatTensor(X_val).to(self.device)
            # The model emits (N, 1). Targets must have the same shape, or the
            # loss and MAE silently broadcast to an (N, N) matrix and report
            # meaningless numbers.
            y_val_t = torch.FloatTensor(y_val).reshape(-1, 1).to(self.device)

        metrics = {'train_loss': [], 'val_loss': [], 'val_mae': []}

        for epoch in range(epochs):
            self.model.train()
            epoch_loss = 0
            for X_batch, y_batch in train_loader:
                X_batch, y_batch = X_batch.to(self.device), y_batch.to(self.device)
                optimizer.zero_grad()
                pred = self.model(X_batch)
                loss = criterion(pred, y_batch)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_train_loss = epoch_loss / len(train_loader)
            metrics['train_loss'].append(avg_train_loss)

            if has_val:
                self.model.eval()
                with torch.no_grad():
                    val_pred = self.model(X_val_t)
                    val_loss = criterion(val_pred, y_val_t).item()
                    val_mae = torch.mean(torch.abs(val_pred - y_val_t)).item()
                metrics['val_loss'].append(val_loss)
                metrics['val_mae'].append(val_mae)

                scheduler.step(val_loss)

                if epoch % 10 == 0:
                    print(f" Epoch {epoch}: train_loss={avg_train_loss:.6f}, "
                          f"val_loss={val_loss:.6f}, val_mae={val_mae:.4f}")
            elif epoch % 10 == 0:
                # No validation metrics exist in this branch; report only
                # the training loss instead of touching undefined names.
                print(f" Epoch {epoch}: train_loss={avg_train_loss:.6f}")

        return metrics

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict option prices

        Args:
            X: Feature matrix laid out as produced by ``prepare_features``.

        Returns:
            1-D array with one prediction per row of ``X``.

        Raises:
            ValueError: If no model has been set (``fit`` was never called).
        """
        if self.model is None:
            raise ValueError("Model must be trained before prediction")

        self.model.eval()
        with torch.no_grad():
            X_t = torch.FloatTensor(X).to(self.device)
            pred = self.model(X_t).cpu().numpy().flatten()

        return pred

    def predict_iv(self, options_df: pd.DataFrame, market_prices: np.ndarray) -> np.ndarray:
        """
        Predict implied volatility by inverting the model
        Uses Black-Scholes as baseline and ML as correction

        Two implied volatilities are solved per option with
        ``BlackScholes.implied_volatility``: one from the market price and
        one from the model's predicted price. The result is their equal
        weighted average.

        Args:
            options_df: Frame with the columns ``prepare_features`` expects.
            market_prices: Observed price per row of ``options_df``.

        Returns:
            1-D array of blended implied volatilities.

        Raises:
            ValueError: If ``market_prices`` and ``options_df`` differ in
                length, or if the model has not been trained.
        """
        # Positional array, so a pandas Series is never indexed by label.
        market_prices = np.asarray(market_prices)
        if len(market_prices) != len(options_df):
            raise ValueError(
                "market_prices must have one entry per row of options_df"
            )

        S = options_df['S'].values
        K = options_df['K'].values
        T = options_df['T'].values
        r = options_df['r'].values
        option_type = options_df['option_type'].values

        X = self.prepare_features(options_df)
        ml_price = self.predict(X)

        # Implied vol backed out of the observed market prices.
        bs_iv = np.array([
            self.bs.implied_volatility(price, s, k, t, rate, kind)
            for price, s, k, t, rate, kind
            in zip(market_prices, S, K, T, r, option_type)
        ])

        # Implied vol backed out of the model's own prices.
        ml_iv = np.array([
            self.bs.implied_volatility(price, s, k, t, rate, kind)
            for price, s, k, t, rate, kind
            in zip(ml_price, S, K, T, r, option_type)
        ])

        ensemble_iv = 0.5 * bs_iv + 0.5 * ml_iv

        return ensemble_iv

    def detect_mispricing(self, options_df: pd.DataFrame,
                          market_prices: np.ndarray,
                          threshold: float = 0.05) -> pd.DataFrame:
        """
        Detect mispriced options

        Every input row is returned (nothing is filtered out), annotated
        with the model price and a signal derived from the relative gap
        ``(ML_price - market_price) / market_price``.

        Args:
            options_df: Frame with the columns ``prepare_features`` expects.
            market_prices: Observed price per row of ``options_df``. A zero
                price yields an infinite or NaN gap for that row.
            threshold: Relative gap beyond which a row is flagged.

        Returns:
            Copy of ``options_df`` with added columns ``ml_price``,
            ``market_price``, ``mispricing_pct`` (the gap times 100) and
            ``signal`` (``'OVERPRICED'`` when the gap exceeds ``threshold``,
            ``'UNDERPRICED'`` when it is below ``-threshold``, else ``'FAIR'``).
        """
        # Positional array, so assignment below cannot realign on an index.
        market_prices = np.asarray(market_prices, dtype=float)

        X = self.prepare_features(options_df)
        ml_prices = self.predict(X)

        mispricing = (ml_prices - market_prices) / market_prices

        result = options_df.copy()
        result['ml_price'] = ml_prices
        result['market_price'] = market_prices
        result['mispricing_pct'] = mispricing * 100
        # NOTE(review): a positive gap means the model price is ABOVE the
        # market price, yet the row is labelled 'OVERPRICED'. Confirm the
        # label is meant to describe the model price and not the market
        # quote; the existing convention is kept unchanged here.
        result['signal'] = np.where(
            mispricing > threshold, 'OVERPRICED',
            np.where(mispricing < -threshold, 'UNDERPRICED', 'FAIR')
        )

        return result

    def generate_synthetic_options(self, n_samples: int = 10000,
                                   S_range: Tuple[float, float] = (50, 200),
                                   K_range: Tuple[float, float] = (50, 200),
                                   T_range: Tuple[float, float] = (0.01, 1.0),
                                   r_range: Tuple[float, float] = (0.01, 0.05),
                                   sigma_range: Tuple[float, float] = (0.1, 0.5)) -> pd.DataFrame:
        """Generate synthetic option data for training

        Inputs are drawn uniformly from the given ranges, priced with
        Black-Scholes, perturbed by multiplicative Gaussian noise (standard
        deviation 0.02) and floored at 0.01. Twenty ``S_lag_i`` columns are
        filled with independently jittered copies of ``S`` (noise standard
        deviation 0.01); they are not a real price history.

        Args:
            n_samples: Number of options to generate.
            S_range: (low, high) bounds for the underlying price.
            K_range: (low, high) bounds for the strike.
            T_range: (low, high) bounds for time to expiry.
            r_range: (low, high) bounds for the risk-free rate.
            sigma_range: (low, high) bounds for volatility.

        Returns:
            Frame with columns S, K, T, r, sigma_hist, option_type, price and
            S_lag_1 ... S_lag_20.
        """
        # NOTE(review): this reseeds NumPy's GLOBAL generator, so every call
        # returns the same data and resets random state for the rest of the
        # process. Kept as-is because callers may rely on it.
        np.random.seed(42)

        S = np.random.uniform(*S_range, n_samples)
        K = np.random.uniform(*K_range, n_samples)
        T = np.random.uniform(*T_range, n_samples)
        r = np.random.uniform(*r_range, n_samples)
        sigma = np.random.uniform(*sigma_range, n_samples)
        option_type = np.random.choice(['call', 'put'], n_samples)

        lags = {
            f'S_lag_{i}': S * (1 + np.random.normal(0, 0.01, n_samples))
            for i in range(1, 21)
        }

        # Price the whole batch at once: the Black-Scholes formulas are
        # array-aware, so no per-sample Python loop is needed.
        call_prices = self.bs.call_price(S, K, T, r, sigma)
        put_prices = self.bs.put_price(S, K, T, r, sigma)
        clean_prices = np.where(option_type == 'call', call_prices, put_prices)

        # One noise draw per sample, then floor so prices stay positive.
        noise = np.random.normal(0, 0.02, n_samples)
        prices = np.maximum(clean_prices * (1 + noise), 0.01)

        df = pd.DataFrame({
            'S': S,
            'K': K,
            'T': T,
            'r': r,
            'sigma_hist': sigma,
            'option_type': option_type,
            'price': prices,
            **lags
        })

        return df
|
|