"""Conformal Prediction & Bootstrap Uncertainty Quantification Jane Street doesn't just predict — they NEED to know HOW WRONG they might be. Without uncertainty quantification, you can't size positions or manage risk. Methods: 1. Conformal Prediction: Distribution-free prediction intervals with coverage guarantees 2. Bootstrap Prediction Intervals: Resample to estimate forecast variance 3. Quantile Regression: Predict full distribution, not just point estimate 4. Monte Carlo Dropout: Bayesian approximation for neural nets Guarantee: 95% prediction intervals actually contain 95% of outcomes. This is NOT what a standard MSE loss gives you. Based on: - Shafer & Vovk (2008): "A Tutorial on Conformal Prediction" - Angelopoulos & Bates (2021): "A Gentle Introduction to Conformal Prediction" - Tibshirani et al. (2019): "Conformal Prediction Under Covariate Shift" """ import numpy as np import pandas as pd from typing import Dict, List, Tuple, Optional, Callable from collections import deque import warnings warnings.filterwarnings('ignore') class ConformalPredictor: """ Split conformal prediction for regression/returns forecasting. Steps: 1. Split data into proper training + calibration 2. Train model on proper training 3. Compute nonconformity scores on calibration: |y - y_hat| 4. For prediction: interval = [y_hat - q, y_hat + q] where q = quantile of scores Result: Guaranteed 1-alpha coverage on new iid data. """ def __init__(self, alpha: float = 0.1): """ alpha: miscoverage rate (0.1 = 90% prediction interval) """ self.alpha = alpha self.calibration_scores = [] self.quantile = None def fit(self, y_true_cal: np.ndarray, y_pred_cal: np.ndarray): """ Calibrate on held-out calibration set. y_true_cal: actual values from calibration set y_pred_cal: model predictions on calibration set """ scores = np.abs(y_true_cal - y_pred_cal) self.calibration_scores = scores # Compute (1-alpha) quantile of scores # We need ceiling((n+1)*(1-alpha))/n quantile for exact coverage n = len(scores) q_level = np.ceil((n + 1) * (1 - self.alpha)) / n q_level = min(q_level, 1.0) self.quantile = np.quantile(scores, q_level) return self def predict_interval(self, y_pred: np.ndarray) -> np.ndarray: """ Get prediction intervals. Returns: (n, 2) array of [lower, upper] bounds """ if self.quantile is None: raise ValueError("Must call fit() first") lower = y_pred - self.quantile upper = y_pred + self.quantile return np.column_stack([lower, upper]) def evaluate_coverage(self, y_true_test: np.ndarray, y_pred_test: np.ndarray) -> Dict: """ Evaluate actual coverage on test set. Should be >= 1-alpha for valid conformal prediction. """ intervals = self.predict_interval(y_pred_test) coverage = np.mean((y_true_test >= intervals[:, 0]) & (y_true_test <= intervals[:, 1])) interval_width = np.mean(intervals[:, 1] - intervals[:, 0]) # Average interval width by prediction magnitude relative_width = interval_width / (np.abs(y_pred_test).mean() + 1e-10) return { 'target_coverage': 1 - self.alpha, 'actual_coverage': coverage, 'avg_interval_width': interval_width, 'relative_width': relative_width, 'is_valid': coverage >= 1 - self.alpha - 0.02 # Allow 2% tolerance } class AdaptiveConformalPrediction: """ Adaptive conformal prediction for non-stationary data. Standard conformal assumes iid data. Markets are NOT iid. Solution: Update quantile using online learning. If recent coverage is too low → widen intervals. If recent coverage is too high → narrow intervals (more profit). """ def __init__(self, alpha: float = 0.1, gamma: float = 0.005, # Learning rate for quantile adaptation window_size: int = 100): # Recent window for coverage estimation self.alpha = alpha self.gamma = gamma self.window_size = window_size self.quantile = None self.coverage_history = deque(maxlen=window_size) self.score_history = deque(maxlen=window_size) def update(self, y_true: float, y_pred: float): """ Update quantile with one new observation. Algorithm (Gibbs & Candes 2021): 1. Compute score s = |y - y_pred| 2. Check if in interval: coverage = 1 if s <= quantile else 0 3. Update: quantile += γ * (target_coverage - coverage) """ score = abs(y_true - y_pred) self.score_history.append(score) if self.quantile is None: # Initialize with first score self.quantile = score * 1.5 self.coverage_history.append(1) return # Check coverage in_interval = 1 if score <= self.quantile else 0 self.coverage_history.append(in_interval) # Update quantile target = 1 - self.alpha error = target - in_interval self.quantile += self.gamma * error self.quantile = max(self.quantile, 0.0) def predict_interval(self, y_pred: float) -> Tuple[float, float]: """Get adaptive prediction interval""" if self.quantile is None: return (y_pred - 0.05, y_pred + 0.05) return (y_pred - self.quantile, y_pred + self.quantile) def get_state(self) -> Dict: """Current adaptive state""" if len(self.coverage_history) == 0: return {'quantile': None, 'recent_coverage': 0} return { 'quantile': self.quantile, 'recent_coverage': np.mean(list(self.coverage_history)), 'n_observations': len(self.score_history), 'target_coverage': 1 - self.alpha, 'avg_score': np.mean(list(self.score_history)) } class BootstrapUncertaintyEstimator: """ Bootstrap-based uncertainty estimation. Resample residuals to estimate prediction distribution. Useful when you have a model but no analytical uncertainty. """ def __init__(self, n_bootstrap: int = 1000): self.n_bootstrap = n_bootstrap self.residuals = [] def fit(self, y_true: np.ndarray, y_pred: np.ndarray): """Store residuals from training data""" self.residuals = y_true - y_pred return self def predict_distribution(self, y_pred: float, n_samples: Optional[int] = None) -> np.ndarray: """ Generate bootstrap samples of y = y_pred + resampled_residual. Returns distribution of possible y values. """ n = n_samples or self.n_bootstrap # Resample residuals boot_idx = np.random.choice(len(self.residuals), size=n, replace=True) boot_residuals = self.residuals[boot_idx] return y_pred + boot_residuals def predict_interval(self, y_pred: float, alpha: float = 0.1) -> Tuple[float, float]: """Get (1-alpha) prediction interval via bootstrap""" dist = self.predict_distribution(y_pred) lower = np.percentile(dist, alpha / 2 * 100) upper = np.percentile(dist, (1 - alpha / 2) * 100) return (lower, upper) def predict_quantiles(self, y_pred: float, quantiles: List[float] = [0.1, 0.25, 0.5, 0.75, 0.9]) -> Dict: """Get specific quantiles of prediction distribution""" dist = self.predict_distribution(y_pred, n_samples=10000) return {f'q{int(q*100)}': np.percentile(dist, q * 100) for q in quantiles} class QuantileForecaster: """ Quantile regression forecaster. Instead of predicting mean (MSE), predict arbitrary quantiles. Loss: Pinball loss L(y, ŷ) = α * (y - ŷ) if y > ŷ (1-α) * (ŷ - y) if y <= ŷ Train separate model for each quantile: 0.1, 0.5, 0.9 Benefits: - Asymmetric uncertainty (downside risk > upside potential) - No distributional assumptions - Direct VaR estimation (e.g., q0.05 = 5% VaR) """ def __init__(self, quantiles: List[float] = [0.1, 0.5, 0.9]): self.quantiles = quantiles self.models = {} # quantile -> SimpleQuantileRegressor def _pinball_loss(self, y_true: np.ndarray, y_pred: np.ndarray, alpha: float) -> float: """Pinball/quantile loss""" residuals = y_true - y_pred loss = np.where(residuals > 0, alpha * residuals, (alpha - 1) * residuals) return np.mean(loss) def fit(self, X: np.ndarray, y: np.ndarray, n_iterations: int = 500, lr: float = 0.01): """ Fit quantile regression models via gradient descent. Simple linear quantile regression for demonstration. In practice, use LightGBM/XGBoost quantile regression or neural nets. """ n_features = X.shape[1] for q in self.quantiles: # Initialize weights = np.zeros(n_features) bias = np.mean(y) # Gradient descent for _ in range(n_iterations): preds = X @ weights + bias residuals = y - preds # Gradient of pinball loss grad_w = -X.T @ np.where(residuals > 0, q, q - 1) / len(y) grad_b = -np.mean(np.where(residuals > 0, q, q - 1)) weights -= lr * grad_w bias -= lr * grad_b self.models[q] = {'weights': weights, 'bias': bias} return self def predict(self, X: np.ndarray) -> Dict[float, np.ndarray]: """Predict all quantiles""" predictions = {} for q, model in self.models.items(): preds = X @ model['weights'] + model['bias'] predictions[q] = preds return predictions def predict_interval(self, X: np.ndarray, alpha: float = 0.1) -> np.ndarray: """ Get prediction interval from quantile predictions. Uses q(α/2) and q(1-α/2) as bounds. """ all_preds = self.predict(X) lower_q = alpha / 2 upper_q = 1 - alpha / 2 # Find closest quantiles lower = min(self.quantiles, key=lambda q: abs(q - lower_q)) upper = min(self.quantiles, key=lambda q: abs(q - upper_q)) return np.column_stack([all_preds[lower], all_preds[upper]]) class UncertaintyEnsemble: """ Ensemble multiple uncertainty methods for robust estimates. Combines: - Conformal prediction (distribution-free guarantee) - Bootstrap (residual-based) - Quantile regression (asymmetric uncertainty) Final interval: union or intersection of all three. """ def __init__(self, alpha: float = 0.1): self.alpha = alpha self.conformal = ConformalPredictor(alpha=alpha) self.bootstrap = BootstrapUncertaintyEstimator() self.quantile = QuantileForecaster(quantiles=[0.05, 0.25, 0.5, 0.75, 0.95]) def fit(self, X_cal: np.ndarray, y_cal: np.ndarray, y_pred_cal: np.ndarray): """Fit all uncertainty models on calibration data""" # Conformal self.conformal.fit(y_cal, y_pred_cal) # Bootstrap self.bootstrap.fit(y_cal, y_pred_cal) # Quantile self.quantile.fit(X_cal, y_cal) return self def predict_interval(self, X: np.ndarray, y_pred: np.ndarray, method: str = 'conservative') -> np.ndarray: """ Get ensemble prediction interval. method: - 'conservative': widest interval (union) - 'tight': narrowest interval (intersection) - 'average': mean of all bounds """ # Conformal conf_interval = self.conformal.predict_interval(y_pred) # Bootstrap (pointwise, approximate) boot_lowers = [] boot_uppers = [] for p in y_pred: lo, hi = self.bootstrap.predict_interval(p) boot_lowers.append(lo) boot_uppers.append(hi) boot_interval = np.column_stack([boot_lowers, boot_uppers]) # Quantile quant_interval = self.quantile.predict_interval(X, alpha=self.alpha) if method == 'conservative': lower = np.minimum.reduce([conf_interval[:, 0], boot_interval[:, 0], quant_interval[:, 0]]) upper = np.maximum.reduce([conf_interval[:, 1], boot_interval[:, 1], quant_interval[:, 1]]) elif method == 'tight': lower = np.maximum.reduce([conf_interval[:, 0], boot_interval[:, 0], quant_interval[:, 0]]) upper = np.minimum.reduce([conf_interval[:, 1], boot_interval[:, 1], quant_interval[:, 1]]) else: # average lower = np.mean([conf_interval[:, 0], boot_interval[:, 0], quant_interval[:, 0]], axis=0) upper = np.mean([conf_interval[:, 1], boot_interval[:, 1], quant_interval[:, 1]], axis=0) return np.column_stack([lower, upper]) if __name__ == '__main__': print("=" * 70) print(" UNCERTAINTY QUANTIFICATION & CONFORMAL PREDICTION") print("=" * 70) np.random.seed(42) # Generate data with heteroscedastic noise n = 1000 X = np.random.randn(n, 3) y_true = X[:, 0] * 0.5 + X[:, 1] * 0.3 + np.random.randn(n) * 0.1 # Heteroscedastic noise: larger when |X_0| is large noise_scale = 0.05 + 0.15 * np.abs(X[:, 0]) y_true += np.random.randn(n) * noise_scale # Split n_train = 500 n_cal = 200 n_test = 300 X_train = X[:n_train] y_train = y_true[:n_train] X_cal = X[n_train:n_train+n_cal] y_cal = y_true[n_train:n_train+n_cal] X_test = X[n_train+n_cal:] y_test = y_true[n_train+n_cal:] # Simple linear model beta = np.linalg.lstsq(X_train, y_train, rcond=None)[0] y_pred_cal = X_cal @ beta y_pred_test = X_test @ beta print("\n1. CONFORMAL PREDICTION (90% intervals)") cp = ConformalPredictor(alpha=0.1) cp.fit(y_cal, y_pred_cal) eval_result = cp.evaluate_coverage(y_test, y_pred_test) print(f" Target coverage: {eval_result['target_coverage']*100:.0f}%") print(f" Actual coverage: {eval_result['actual_coverage']*100:.1f}%") print(f" Avg interval width: {eval_result['avg_interval_width']:.4f}") print(f" Valid: {eval_result['is_valid']}") print("\n2. ADAPTIVE CONFORMAL (online)") acp = AdaptiveConformalPrediction(alpha=0.1, gamma=0.01) for i in range(len(y_test)): acp.update(y_test[i], y_pred_test[i]) state = acp.get_state() print(f" Final quantile: {state['quantile']:.4f}") print(f" Recent coverage: {state['recent_coverage']*100:.1f}%") print(f" Target: {state['target_coverage']*100:.0f}%") print("\n3. BOOTSTRAP UNCERTAINTY") boot = BootstrapUncertaintyEstimator(n_bootstrap=1000) boot.fit(y_cal, y_pred_cal) # Test on first prediction lo, hi = boot.predict_interval(y_pred_test[0], alpha=0.1) dist = boot.predict_distribution(y_pred_test[0]) print(f" Point prediction: {y_pred_test[0]:.4f}") print(f" 90% CI: [{lo:.4f}, {hi:.4f}]") print(f" Actual: {y_test[0]:.4f}") print(f" In interval: {lo <= y_test[0] <= hi}") print("\n4. QUANTILE REGRESSION") qf = QuantileForecaster(quantiles=[0.1, 0.5, 0.9]) qf.fit(X_train, y_train, n_iterations=1000, lr=0.01) preds = qf.predict(X_test[:5]) for q, p in preds.items(): print(f" q{int(q*100)}: {p[0]:.4f}") print("\n5. UNCERTAINTY ENSEMBLE") ensemble = UncertaintyEnsemble(alpha=0.1) ensemble.fit(X_cal, y_cal, y_pred_cal) for method in ['conservative', 'tight', 'average']: interval = ensemble.predict_interval(X_test[:5], y_pred_test[:5], method=method) widths = interval[:, 1] - interval[:, 0] print(f" {method:12s}: avg width = {widths.mean():.4f}") print(f"\n KEY INSIGHT:") print(f" Without uncertainty quantification, you're trading BLIND.") print(f" Position size should depend on prediction confidence.") print(f" Kelly criterion: bet size ∝ expected_return / variance") print(f" Conformal gives you GUARANTEED coverage — no assumptions needed.")