alphaforge-quant-system / options_pricer.py
Premchan369's picture
Add ML options pricing with neural network and mispricing detection
2065fdc verified
"""Options Pricing with ML - Neural network for option pricing/IV prediction."""
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from typing import Dict, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
class OptionDataset(Dataset):
"""Dataset for option pricing"""
def __init__(self, X: np.ndarray, y: np.ndarray):
self.X = torch.FloatTensor(X)
self.y = torch.FloatTensor(y).unsqueeze(1)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
class OptionPricingNN(nn.Module):
"""Neural network for option pricing"""
def __init__(self, input_size: int, hidden_sizes: list = [256, 128, 64, 32]):
super().__init__()
layers = []
prev_size = input_size
for hidden_size in hidden_sizes:
layers.extend([
nn.Linear(prev_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.2)
])
prev_size = hidden_size
layers.append(nn.Linear(prev_size, 1))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
class BlackScholes:
"""Analytical Black-Scholes for baseline comparison"""
@staticmethod
def d1(S, K, T, r, sigma):
from scipy.stats import norm
return (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
@staticmethod
def d2(S, K, T, r, sigma):
return BlackScholes.d1(S, K, T, r, sigma) - sigma * np.sqrt(T)
@staticmethod
def call_price(S, K, T, r, sigma):
from scipy.stats import norm
d1 = BlackScholes.d1(S, K, T, r, sigma)
d2 = BlackScholes.d2(S, K, T, r, sigma)
return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
@staticmethod
def put_price(S, K, T, r, sigma):
from scipy.stats import norm
d1 = BlackScholes.d1(S, K, T, r, sigma)
d2 = BlackScholes.d2(S, K, T, r, sigma)
return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
@staticmethod
def implied_volatility(price, S, K, T, r, option_type='call', tol=1e-5, max_iter=100):
"""Find implied volatility using Newton-Raphson"""
sigma = 0.2 # Initial guess
for _ in range(max_iter):
if option_type == 'call':
price_est = BlackScholes.call_price(S, K, T, r, sigma)
else:
price_est = BlackScholes.put_price(S, K, T, r, sigma)
diff = price_est - price
if abs(diff) < tol:
return sigma
# Vega
from scipy.stats import norm
d1 = BlackScholes.d1(S, K, T, r, sigma)
vega = S * norm.pdf(d1) * np.sqrt(T)
if vega < 1e-10:
break
sigma -= diff / vega
sigma = max(sigma, 0.001)
return sigma
class MLOptionsPricer:
"""ML-based options pricing engine"""
def __init__(self, hidden_sizes: list = [256, 128, 64, 32],
device: str = 'cpu'):
self.hidden_sizes = hidden_sizes
self.device = torch.device(device)
self.model = None
self.bs = BlackScholes()
def prepare_features(self, options_df: pd.DataFrame) -> np.ndarray:
"""
Prepare features for ML model
Expected columns: S, K, T, r, sigma_hist, option_type,
S_lag_1, S_lag_2, ..., S_lag_20
"""
features = []
# Core features
features.append(options_df['S'].values)
features.append(options_df['K'].values)
features.append(options_df['T'].values)
features.append(options_df['r'].values)
features.append(options_df['sigma_hist'].values)
features.append((options_df['S'] / options_df['K']).values) # Moneyness
features.append(options_df['T'].values * 252) # Days to expiry
# Option type encoding
features.append((options_df['option_type'] == 'call').astype(float).values)
# Lag features (past 20 days of underlying price)
for i in range(1, 21):
col = f'S_lag_{i}'
if col in options_df.columns:
features.append(options_df[col].values)
return np.column_stack(features)
def fit(self, X_train: np.ndarray, y_train: np.ndarray,
X_val: Optional[np.ndarray] = None, y_val: Optional[np.ndarray] = None,
epochs: int = 100, batch_size: int = 256, lr: float = 1e-3) -> Dict:
"""Train the neural network"""
input_size = X_train.shape[1]
self.model = OptionPricingNN(input_size, self.hidden_sizes).to(self.device)
train_dataset = OptionDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10)
criterion = nn.MSELoss()
metrics = {'train_loss': [], 'val_loss': [], 'val_mae': []}
for epoch in range(epochs):
self.model.train()
epoch_loss = 0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(self.device), y_batch.to(self.device)
optimizer.zero_grad()
pred = self.model(X_batch)
loss = criterion(pred, y_batch)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg_train_loss = epoch_loss / len(train_loader)
metrics['train_loss'].append(avg_train_loss)
# Validation
if X_val is not None and y_val is not None:
self.model.eval()
with torch.no_grad():
X_val_t = torch.FloatTensor(X_val).to(self.device)
y_val_t = torch.FloatTensor(y_val).to(self.device)
val_pred = self.model(X_val_t)
val_loss = criterion(val_pred, y_val_t).item()
val_mae = torch.mean(torch.abs(val_pred - y_val_t)).item()
metrics['val_loss'].append(val_loss)
metrics['val_mae'].append(val_mae)
scheduler.step(val_loss)
if epoch % 10 == 0:
print(f" Epoch {epoch}: train_loss={avg_train_loss:.6f}, "
f"val_loss={val_loss:.6f}, val_mae={val_mae:.4f}")
return metrics
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict option prices"""
if self.model is None:
raise ValueError("Model must be trained before prediction")
self.model.eval()
with torch.no_grad():
X_t = torch.FloatTensor(X).to(self.device)
pred = self.model(X_t).cpu().numpy().flatten()
return pred
def predict_iv(self, options_df: pd.DataFrame, market_prices: np.ndarray) -> np.ndarray:
"""
Predict implied volatility by inverting the model
Uses Black-Scholes as baseline and ML as correction
"""
S = options_df['S'].values
K = options_df['K'].values
T = options_df['T'].values
r = options_df['r'].values
option_type = options_df['option_type'].values
# Get ML prediction
X = self.prepare_features(options_df)
ml_price = self.predict(X)
# Get Black-Scholes baseline
bs_iv = np.array([
self.bs.implied_volatility(
market_prices[i], S[i], K[i], T[i], r[i], option_type[i]
)
for i in range(len(market_prices))
])
# ML-adjusted IV: if ML price differs from market, adjust IV accordingly
ml_iv = np.array([
self.bs.implied_volatility(
ml_price[i], S[i], K[i], T[i], r[i], option_type[i]
)
for i in range(len(ml_price))
])
# Ensemble: weighted average
ensemble_iv = 0.5 * bs_iv + 0.5 * ml_iv
return ensemble_iv
def detect_mispricing(self, options_df: pd.DataFrame,
market_prices: np.ndarray,
threshold: float = 0.05) -> pd.DataFrame:
"""
Detect mispriced options
Returns options where |ML_price - market_price| / market_price > threshold
"""
X = self.prepare_features(options_df)
ml_prices = self.predict(X)
mispricing = (ml_prices - market_prices) / market_prices
result = options_df.copy()
result['ml_price'] = ml_prices
result['market_price'] = market_prices
result['mispricing_pct'] = mispricing * 100
result['signal'] = np.where(
mispricing > threshold, 'OVERPRICED',
np.where(mispricing < -threshold, 'UNDERPRICED', 'FAIR')
)
return result
def generate_synthetic_options(self, n_samples: int = 10000,
S_range: Tuple[float, float] = (50, 200),
K_range: Tuple[float, float] = (50, 200),
T_range: Tuple[float, float] = (0.01, 1.0),
r_range: Tuple[float, float] = (0.01, 0.05),
sigma_range: Tuple[float, float] = (0.1, 0.5)) -> pd.DataFrame:
"""Generate synthetic option data for training"""
np.random.seed(42)
S = np.random.uniform(*S_range, n_samples)
K = np.random.uniform(*K_range, n_samples)
T = np.random.uniform(*T_range, n_samples)
r = np.random.uniform(*r_range, n_samples)
sigma = np.random.uniform(*sigma_range, n_samples)
option_type = np.random.choice(['call', 'put'], n_samples)
# Generate lag features (simulated price history)
lags = {}
for i in range(1, 21):
lags[f'S_lag_{i}'] = S * (1 + np.random.normal(0, 0.01, n_samples))
# Calculate prices using Black-Scholes with noise
prices = []
for i in range(n_samples):
if option_type[i] == 'call':
price = self.bs.call_price(S[i], K[i], T[i], r[i], sigma[i])
else:
price = self.bs.put_price(S[i], K[i], T[i], r[i], sigma[i])
# Add noise
price *= (1 + np.random.normal(0, 0.02))
prices.append(max(price, 0.01))
df = pd.DataFrame({
'S': S,
'K': K,
'T': T,
'r': r,
'sigma_hist': sigma,
'option_type': option_type,
'price': prices,
**lags
})
return df