"""Options Pricing with ML - Neural network for option pricing/IV prediction.""" import numpy as np import pandas as pd import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader from typing import Dict, Tuple, Optional import warnings warnings.filterwarnings('ignore') class OptionDataset(Dataset): """Dataset for option pricing""" def __init__(self, X: np.ndarray, y: np.ndarray): self.X = torch.FloatTensor(X) self.y = torch.FloatTensor(y).unsqueeze(1) def __len__(self): return len(self.X) def __getitem__(self, idx): return self.X[idx], self.y[idx] class OptionPricingNN(nn.Module): """Neural network for option pricing""" def __init__(self, input_size: int, hidden_sizes: list = [256, 128, 64, 32]): super().__init__() layers = [] prev_size = input_size for hidden_size in hidden_sizes: layers.extend([ nn.Linear(prev_size, hidden_size), nn.ReLU(), nn.Dropout(0.2) ]) prev_size = hidden_size layers.append(nn.Linear(prev_size, 1)) self.network = nn.Sequential(*layers) def forward(self, x): return self.network(x) class BlackScholes: """Analytical Black-Scholes for baseline comparison""" @staticmethod def d1(S, K, T, r, sigma): from scipy.stats import norm return (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T)) @staticmethod def d2(S, K, T, r, sigma): return BlackScholes.d1(S, K, T, r, sigma) - sigma * np.sqrt(T) @staticmethod def call_price(S, K, T, r, sigma): from scipy.stats import norm d1 = BlackScholes.d1(S, K, T, r, sigma) d2 = BlackScholes.d2(S, K, T, r, sigma) return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2) @staticmethod def put_price(S, K, T, r, sigma): from scipy.stats import norm d1 = BlackScholes.d1(S, K, T, r, sigma) d2 = BlackScholes.d2(S, K, T, r, sigma) return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1) @staticmethod def implied_volatility(price, S, K, T, r, option_type='call', tol=1e-5, max_iter=100): """Find implied volatility using Newton-Raphson""" sigma = 0.2 # Initial guess for _ in range(max_iter): if option_type == 'call': price_est = BlackScholes.call_price(S, K, T, r, sigma) else: price_est = BlackScholes.put_price(S, K, T, r, sigma) diff = price_est - price if abs(diff) < tol: return sigma # Vega from scipy.stats import norm d1 = BlackScholes.d1(S, K, T, r, sigma) vega = S * norm.pdf(d1) * np.sqrt(T) if vega < 1e-10: break sigma -= diff / vega sigma = max(sigma, 0.001) return sigma class MLOptionsPricer: """ML-based options pricing engine""" def __init__(self, hidden_sizes: list = [256, 128, 64, 32], device: str = 'cpu'): self.hidden_sizes = hidden_sizes self.device = torch.device(device) self.model = None self.bs = BlackScholes() def prepare_features(self, options_df: pd.DataFrame) -> np.ndarray: """ Prepare features for ML model Expected columns: S, K, T, r, sigma_hist, option_type, S_lag_1, S_lag_2, ..., S_lag_20 """ features = [] # Core features features.append(options_df['S'].values) features.append(options_df['K'].values) features.append(options_df['T'].values) features.append(options_df['r'].values) features.append(options_df['sigma_hist'].values) features.append((options_df['S'] / options_df['K']).values) # Moneyness features.append(options_df['T'].values * 252) # Days to expiry # Option type encoding features.append((options_df['option_type'] == 'call').astype(float).values) # Lag features (past 20 days of underlying price) for i in range(1, 21): col = f'S_lag_{i}' if col in options_df.columns: features.append(options_df[col].values) return np.column_stack(features) def fit(self, X_train: np.ndarray, y_train: np.ndarray, X_val: Optional[np.ndarray] = None, y_val: Optional[np.ndarray] = None, epochs: int = 100, batch_size: int = 256, lr: float = 1e-3) -> Dict: """Train the neural network""" input_size = X_train.shape[1] self.model = OptionPricingNN(input_size, self.hidden_sizes).to(self.device) train_dataset = OptionDataset(X_train, y_train) train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) optimizer = torch.optim.Adam(self.model.parameters(), lr=lr) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10) criterion = nn.MSELoss() metrics = {'train_loss': [], 'val_loss': [], 'val_mae': []} for epoch in range(epochs): self.model.train() epoch_loss = 0 for X_batch, y_batch in train_loader: X_batch, y_batch = X_batch.to(self.device), y_batch.to(self.device) optimizer.zero_grad() pred = self.model(X_batch) loss = criterion(pred, y_batch) loss.backward() optimizer.step() epoch_loss += loss.item() avg_train_loss = epoch_loss / len(train_loader) metrics['train_loss'].append(avg_train_loss) # Validation if X_val is not None and y_val is not None: self.model.eval() with torch.no_grad(): X_val_t = torch.FloatTensor(X_val).to(self.device) y_val_t = torch.FloatTensor(y_val).to(self.device) val_pred = self.model(X_val_t) val_loss = criterion(val_pred, y_val_t).item() val_mae = torch.mean(torch.abs(val_pred - y_val_t)).item() metrics['val_loss'].append(val_loss) metrics['val_mae'].append(val_mae) scheduler.step(val_loss) if epoch % 10 == 0: print(f" Epoch {epoch}: train_loss={avg_train_loss:.6f}, " f"val_loss={val_loss:.6f}, val_mae={val_mae:.4f}") return metrics def predict(self, X: np.ndarray) -> np.ndarray: """Predict option prices""" if self.model is None: raise ValueError("Model must be trained before prediction") self.model.eval() with torch.no_grad(): X_t = torch.FloatTensor(X).to(self.device) pred = self.model(X_t).cpu().numpy().flatten() return pred def predict_iv(self, options_df: pd.DataFrame, market_prices: np.ndarray) -> np.ndarray: """ Predict implied volatility by inverting the model Uses Black-Scholes as baseline and ML as correction """ S = options_df['S'].values K = options_df['K'].values T = options_df['T'].values r = options_df['r'].values option_type = options_df['option_type'].values # Get ML prediction X = self.prepare_features(options_df) ml_price = self.predict(X) # Get Black-Scholes baseline bs_iv = np.array([ self.bs.implied_volatility( market_prices[i], S[i], K[i], T[i], r[i], option_type[i] ) for i in range(len(market_prices)) ]) # ML-adjusted IV: if ML price differs from market, adjust IV accordingly ml_iv = np.array([ self.bs.implied_volatility( ml_price[i], S[i], K[i], T[i], r[i], option_type[i] ) for i in range(len(ml_price)) ]) # Ensemble: weighted average ensemble_iv = 0.5 * bs_iv + 0.5 * ml_iv return ensemble_iv def detect_mispricing(self, options_df: pd.DataFrame, market_prices: np.ndarray, threshold: float = 0.05) -> pd.DataFrame: """ Detect mispriced options Returns options where |ML_price - market_price| / market_price > threshold """ X = self.prepare_features(options_df) ml_prices = self.predict(X) mispricing = (ml_prices - market_prices) / market_prices result = options_df.copy() result['ml_price'] = ml_prices result['market_price'] = market_prices result['mispricing_pct'] = mispricing * 100 result['signal'] = np.where( mispricing > threshold, 'OVERPRICED', np.where(mispricing < -threshold, 'UNDERPRICED', 'FAIR') ) return result def generate_synthetic_options(self, n_samples: int = 10000, S_range: Tuple[float, float] = (50, 200), K_range: Tuple[float, float] = (50, 200), T_range: Tuple[float, float] = (0.01, 1.0), r_range: Tuple[float, float] = (0.01, 0.05), sigma_range: Tuple[float, float] = (0.1, 0.5)) -> pd.DataFrame: """Generate synthetic option data for training""" np.random.seed(42) S = np.random.uniform(*S_range, n_samples) K = np.random.uniform(*K_range, n_samples) T = np.random.uniform(*T_range, n_samples) r = np.random.uniform(*r_range, n_samples) sigma = np.random.uniform(*sigma_range, n_samples) option_type = np.random.choice(['call', 'put'], n_samples) # Generate lag features (simulated price history) lags = {} for i in range(1, 21): lags[f'S_lag_{i}'] = S * (1 + np.random.normal(0, 0.01, n_samples)) # Calculate prices using Black-Scholes with noise prices = [] for i in range(n_samples): if option_type[i] == 'call': price = self.bs.call_price(S[i], K[i], T[i], r[i], sigma[i]) else: price = self.bs.put_price(S[i], K[i], T[i], r[i], sigma[i]) # Add noise price *= (1 + np.random.normal(0, 0.02)) prices.append(max(price, 0.01)) df = pd.DataFrame({ 'S': S, 'K': K, 'T': T, 'r': r, 'sigma_hist': sigma, 'option_type': option_type, 'price': prices, **lags }) return df