# Generated by Claude Code — 2026-02-13
"""Conformal prediction for calibrated risk bounds.

Provides distribution-free prediction sets with guaranteed marginal coverage:

    P(true_label ∈ prediction_set) ≥ 1 - alpha

This directly addresses NASA CARA's criticism about uncertainty quantification
in ML-based collision risk assessment. Instead of a single probability, we
output a prediction set (e.g., {LOW, MODERATE}) that provably covers the
true risk tier at the specified confidence level.

Method: Split conformal prediction (Vovk et al. 2005, Lei et al. 2018)
- Calibrate on a held-out set separate from training AND model selection
- Compute nonconformity scores
- Use a quantile of the calibration scores to construct prediction sets at test time

References:
- Vovk, Gammerman, Shafer (2005), "Algorithmic Learning in a Random World"
- Lei et al. (2018), "Distribution-Free Predictive Inference for Regression"
- Angelopoulos & Bates (2021), "A Gentle Introduction to Conformal Prediction"
"""
from dataclasses import dataclass

import numpy as np

@dataclass
class ConformalResult:
    """Result of conformal prediction for a single example."""

    prediction_set: list[str]  # e.g., ["LOW", "MODERATE"]
    set_size: int              # |prediction_set|
    risk_prob: float           # raw model probability
    lower_bound: float         # lower probability bound
    upper_bound: float         # upper probability bound

class ConformalPredictor:
    """Split conformal prediction for binary risk classification.

    Workflow (see the usage sketch after this class definition):
    1. Train the model on the training set
    2. Select the model (early stopping) on the validation set
    3. calibrate() on a SEPARATE calibration set (held out from training and validation)
    4. predict() on test data with a coverage guarantee

    The calibration set must NOT be used for training or model selection,
    otherwise the coverage guarantee is invalidated.
    """

    # Risk tiers with probability thresholds [lower, upper)
    TIERS = {
        "LOW": (0.0, 0.10),
        "MODERATE": (0.10, 0.40),
        "HIGH": (0.40, 0.70),
        "CRITICAL": (0.70, 1.0),
    }

    def __init__(self):
        self.q_hat = None       # conformal quantile of nonconformity scores
        self.q_residual = None  # conformal quantile of |prob - label| residuals
        self.alpha = None
        self.n_cal = 0
        self.is_calibrated = False

    def calibrate(
        self,
        cal_probs: np.ndarray,
        cal_labels: np.ndarray,
        alpha: float = 0.10,
    ) -> dict:
        """Calibrate the conformal predictor on a held-out calibration set.

        Args:
            cal_probs: Model predicted probabilities on the calibration set, shape (n,)
            cal_labels: True binary labels on the calibration set, shape (n,)
            alpha: Desired miscoverage rate; 1 - alpha is the coverage level.
                alpha=0.10 → 90% coverage guarantee.

        Returns:
            Calibration summary dict with quantiles and statistics
        """
        n = len(cal_probs)
        if n < 10:
            raise ValueError(f"Calibration set too small: {n} examples (need >= 10)")
        self.alpha = alpha
        self.n_cal = n

        # Nonconformity score: how "wrong" is the model on each calibration example?
        # For binary classification with probabilities:
        #   score = 1 - P(true class)
        # High score = model is wrong/uncertain
        scores = np.where(
            cal_labels == 1,
            1.0 - cal_probs,  # positive: score = 1 - P(positive)
            cal_probs,        # negative: score = P(positive) = 1 - P(negative)
        )
        # Conformal quantile with the finite-sample correction:
        #   q_hat = the ceil((n+1)(1-alpha))/n empirical quantile of the scores
        # method="higher" picks the ceiling order statistic, as the guarantee requires.
        adjusted_level = np.ceil((n + 1) * (1 - alpha)) / n
        adjusted_level = min(adjusted_level, 1.0)
        self.q_hat = float(np.quantile(scores, adjusted_level, method="higher"))

        # For prediction intervals on the probability itself, we also compute a
        # quantile of the calibration residuals |P(positive) - is_positive|,
        # used to construct lower/upper probability bounds at test time.
        residuals = np.abs(cal_probs - cal_labels.astype(float))
        self.q_residual = float(np.quantile(residuals, adjusted_level, method="higher"))
        self.is_calibrated = True

        # Report calibration statistics
        empirical_coverage = np.mean(scores <= self.q_hat)
        summary = {
            "alpha": alpha,
            "target_coverage": 1 - alpha,
            "n_calibration": n,
            "q_hat": self.q_hat,
            "q_residual": self.q_residual,
            "empirical_coverage_cal": float(empirical_coverage),
            "mean_score": float(scores.mean()),
            "median_score": float(np.median(scores)),
            "cal_pos_rate": float(cal_labels.mean()),
        }
        print(f"  Conformal calibration (alpha={alpha}):")
        print(f"    Calibration set: {n} examples ({cal_labels.sum():.0f} positive)")
        print(f"    q_hat (nonconformity): {self.q_hat:.4f}")
        print(f"    q_residual: {self.q_residual:.4f}")
        print(f"    Empirical coverage (cal): {empirical_coverage:.4f}")
        return summary

    def predict(self, test_probs: np.ndarray) -> list[ConformalResult]:
        """Produce conformal prediction sets for test examples.

        For each test example, returns:
        - Prediction set: the set of risk tiers that could contain the true risk
        - Probability bounds: a [lower, upper] interval on the true probability

        Coverage guarantee: P(true_tier ∈ prediction_set) ≥ 1 - alpha
        """
        if not self.is_calibrated:
            raise RuntimeError("Must call calibrate() before predict()")
        results = []
        for p in test_probs:
            # Probability bounds from the residual quantile
            lower = max(0.0, p - self.q_residual)
            upper = min(1.0, p + self.q_residual)
            # Prediction set: all tiers that overlap with [lower, upper]
            pred_set = []
            for tier_name, (tier_lo, tier_hi) in self.TIERS.items():
                if lower < tier_hi and upper > tier_lo:
                    pred_set.append(tier_name)
            results.append(ConformalResult(
                prediction_set=pred_set,
                set_size=len(pred_set),
                risk_prob=float(p),
                lower_bound=lower,
                upper_bound=upper,
            ))
        return results

    def evaluate(
        self,
        test_probs: np.ndarray,
        test_labels: np.ndarray,
    ) -> dict:
        """Evaluate conformal prediction on a test set.

        Reports:
        - Marginal coverage: fraction of test examples whose true tier
          falls within the prediction set
        - Average set size: how informative the predictions are
        - Coverage by label: per-class coverage for positives and negatives
        - Efficiency: 1 - (avg_set_size / n_tiers)
        """
        if not self.is_calibrated:
            raise RuntimeError("Must call calibrate() before evaluate()")
        results = self.predict(test_probs)

        # Map labels to tiers for the coverage check
        def label_to_tier(prob: float) -> str:
            for tier_name, (lo, hi) in self.TIERS.items():
                if lo <= prob < hi:
                    return tier_name
            return "CRITICAL"  # prob == 1.0

        # True "tier" based on the actual label (binary: 0 or 1)
        true_tiers = [label_to_tier(float(l)) for l in test_labels]

        # Marginal coverage: does the prediction set contain the true tier?
        covered = [
            true_tier in result.prediction_set
            for true_tier, result in zip(true_tiers, results)
        ]
        marginal_coverage = np.mean(covered)

        # Average set size
        set_sizes = [r.set_size for r in results]
        avg_set_size = np.mean(set_sizes)

        # Coverage by true label value
        pos_mask = test_labels == 1
        neg_mask = test_labels == 0
        pos_coverage = np.mean([c for c, m in zip(covered, pos_mask) if m]) if pos_mask.sum() > 0 else 0.0
        neg_coverage = np.mean([c for c, m in zip(covered, neg_mask) if m]) if neg_mask.sum() > 0 else 0.0

        # Set size distribution
        size_counts = {}
        for s in set_sizes:
            size_counts[s] = size_counts.get(s, 0) + 1

        # Efficiency: smaller sets = more informative
        efficiency = 1.0 - (avg_set_size / len(self.TIERS))

        # Interval width statistics
        widths = [r.upper_bound - r.lower_bound for r in results]

        metrics = {
            "alpha": self.alpha,
            "target_coverage": 1 - self.alpha,
            "marginal_coverage": float(marginal_coverage),
            "coverage_guarantee_met": bool(marginal_coverage >= (1 - self.alpha - 0.01)),
            "avg_set_size": float(avg_set_size),
            "efficiency": float(efficiency),
            "positive_coverage": float(pos_coverage),
            "negative_coverage": float(neg_coverage),
            "set_size_distribution": {str(k): v for k, v in sorted(size_counts.items())},
            "n_test": len(test_labels),
            "mean_interval_width": float(np.mean(widths)),
            "median_interval_width": float(np.median(widths)),
        }
        print(f"\n  Conformal Prediction Evaluation (alpha={self.alpha}):")
        print(f"    Target coverage: {1 - self.alpha:.1%}")
        print(f"    Marginal coverage: {marginal_coverage:.1%} "
              f"{'OK' if metrics['coverage_guarantee_met'] else 'VIOLATION'}")
        print(f"    Positive coverage: {pos_coverage:.1%}")
        print(f"    Negative coverage: {neg_coverage:.1%}")
        print(f"    Avg set size: {avg_set_size:.2f} / {len(self.TIERS)} tiers")
        print(f"    Efficiency: {efficiency:.1%}")
        print(f"    Mean interval: [{np.mean([r.lower_bound for r in results]):.3f}, "
              f"{np.mean([r.upper_bound for r in results]):.3f}]")
        print(f"    Set size dist: {size_counts}")
        return metrics

    def save_state(self) -> dict:
        """Serialize calibration state for checkpoint saving."""
        if not self.is_calibrated:
            return {"is_calibrated": False}
        return {
            "is_calibrated": True,
            "alpha": self.alpha,
            "q_hat": self.q_hat,
            "q_residual": self.q_residual,
            "n_cal": self.n_cal,
            "tiers": {k: list(v) for k, v in self.TIERS.items()},
        }

    @classmethod
    def from_state(cls, state: dict) -> "ConformalPredictor":
        """Restore a predictor from serialized state."""
        obj = cls()
        if state.get("is_calibrated", False):
            obj.alpha = state["alpha"]
            obj.q_hat = state["q_hat"]
            obj.q_residual = state["q_residual"]
            obj.n_cal = state["n_cal"]
            obj.is_calibrated = True
        return obj
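

# Minimal usage sketch (illustrative only): walks through the workflow described in
# the ConformalPredictor docstring on synthetic, randomly generated probabilities and
# labels. The data and the helper name _example_single_level are made up to exercise
# the API; real calibration/test splits come from the actual pipeline.
def _example_single_level(seed: int = 0) -> None:
    rng = np.random.default_rng(seed)
    # Hypothetical calibration and test sets: labels drawn from noisy probabilities
    cal_probs = rng.uniform(0.0, 1.0, size=500)
    cal_labels = (rng.uniform(size=500) < cal_probs).astype(int)
    test_probs = rng.uniform(0.0, 1.0, size=200)
    test_labels = (rng.uniform(size=200) < test_probs).astype(int)

    cp = ConformalPredictor()
    cp.calibrate(cal_probs, cal_labels, alpha=0.10)  # 90% target coverage
    cp.evaluate(test_probs, test_labels)

    # State round-trip: the restored predictor produces identical prediction sets
    restored = ConformalPredictor.from_state(cp.save_state())
    assert restored.predict(test_probs[:5]) == cp.predict(test_probs[:5])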


def run_conformal_at_multiple_levels(
    cal_probs: np.ndarray,
    cal_labels: np.ndarray,
    test_probs: np.ndarray,
    test_labels: np.ndarray,
    alphas: list[float] | None = None,
) -> dict:
    """Run conformal prediction at multiple coverage levels.

    Useful for reporting: "at 90% coverage, avg set size = X;
    at 95%, avg set size = Y; at 99%, avg set size = Z".
    """
    if alphas is None:
        alphas = [0.01, 0.05, 0.10, 0.20]
    all_results = {}
    for alpha in alphas:
        cp = ConformalPredictor()
        cp.calibrate(cal_probs, cal_labels, alpha=alpha)
        eval_metrics = cp.evaluate(test_probs, test_labels)
        all_results[f"alpha_{alpha}"] = {
            "conformal_metrics": eval_metrics,
            "conformal_state": cp.save_state(),
        }
    return all_results
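

# Illustrative entry point (a sketch, not part of the original pipeline): runs the
# multi-level sweep on synthetic data so the module can be smoke-tested standalone.
# All arrays below are randomly generated; substitute real calibration/test splits
# from the collision-risk pipeline in practice.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    n_cal, n_test = 1000, 400
    cal_p = rng.beta(2, 5, size=n_cal)   # hypothetical model probabilities
    cal_y = (rng.uniform(size=n_cal) < cal_p).astype(int)
    test_p = rng.beta(2, 5, size=n_test)
    test_y = (rng.uniform(size=n_test) < test_p).astype(int)

    results = run_conformal_at_multiple_levels(cal_p, cal_y, test_p, test_y)
    for level, payload in results.items():
        m = payload["conformal_metrics"]
        print(f"{level}: coverage={m['marginal_coverage']:.3f}, "
              f"avg_set_size={m['avg_set_size']:.2f}")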