| | """ |
| | MSCI Threshold Calibration |
| | |
| | Calibrates MSCI thresholds using ROC analysis to find optimal |
| | classification boundaries for "coherent" vs "incoherent" samples. |
| | |
| | Key analyses: |
| | - ROC curve: MSCI as classifier |
| | - AUC (Area Under Curve) |
| | - Optimal threshold via Youden's J statistic |
| | - Precision-Recall analysis |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import json |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Any, Dict, List, Optional, Tuple |
| | import numpy as np |
| | from scipy import stats |
| |
|
| |
|
@dataclass
class CalibrationResult:
    """Result of threshold calibration.

    Bundles the optimal MSCI cut-off with the classification metrics
    observed at that threshold, plus the ROC curve used to find it.
    """

    # MSCI cut-off that maximizes Youden's J
    optimal_threshold: float
    # J = sensitivity + specificity - 1 at the optimal threshold
    youden_j: float
    # Area under the ROC curve
    auc: float
    sensitivity_at_optimal: float
    specificity_at_optimal: float
    precision_at_optimal: float
    f1_at_optimal: float
    # Full curve: {"thresholds": [...], "tpr": [...], "fpr": [...]}
    roc_curve: Dict[str, List[float]]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dictionary (e.g., for JSON output)."""
        field_names = (
            "optimal_threshold",
            "youden_j",
            "auc",
            "sensitivity_at_optimal",
            "specificity_at_optimal",
            "precision_at_optimal",
            "f1_at_optimal",
            "roc_curve",
        )
        return {name: getattr(self, name) for name in field_names}
| |
|
| |
|
class ThresholdCalibrator:
    """
    Calibrates MSCI thresholds for coherence classification.

    Uses human judgments as the validation target to find the optimal
    MSCI threshold that maximizes discrimination between coherent
    and incoherent samples. Note: human judgments serve as the
    best available reference, not absolute ground truth.
    """

    def __init__(self, human_threshold: float = 0.6):
        """
        Initialize calibrator.

        Args:
            human_threshold: Human score at or above which a sample is
                labeled "coherent" (e.g., 0.6 = 3/5 or higher on a
                Likert scale)
        """
        self.human_threshold = human_threshold

    def _binarize_human(self, human_scores: List[float]) -> List[int]:
        """Map human scores to binary labels (1 = coherent) using human_threshold."""
        return [1 if h >= self.human_threshold else 0 for h in human_scores]

    @staticmethod
    def _confusion_counts(
        y_true: List[int],
        y_pred: List[int],
    ) -> Tuple[int, int, int, int]:
        """Return confusion-matrix counts (tp, fp, tn, fn) in a single pass."""
        tp = fp = tn = fn = 0
        for yt, yp in zip(y_true, y_pred):
            if yt == 1:
                if yp == 1:
                    tp += 1
                else:
                    fn += 1
            else:
                if yp == 1:
                    fp += 1
                else:
                    tn += 1
        return tp, fp, tn, fn

    def compute_roc_curve(
        self,
        msci_scores: List[float],
        human_scores: List[float],
        n_thresholds: int = 100,
    ) -> Tuple[List[float], List[float], List[float]]:
        """
        Compute ROC curve points.

        Args:
            msci_scores: MSCI scores (predictor)
            human_scores: Human scores (validation target, normalized 0-1)
            n_thresholds: Number of threshold points

        Returns:
            Tuple of (thresholds, tpr_list, fpr_list)
        """
        y_true = self._binarize_human(human_scores)

        # Sweep slightly beyond the observed MSCI range so the curve
        # includes the all-positive and all-negative endpoints.
        min_msci = min(msci_scores)
        max_msci = max(msci_scores)
        thresholds = np.linspace(min_msci - 0.01, max_msci + 0.01, n_thresholds)

        tpr_list: List[float] = []
        fpr_list: List[float] = []

        for threshold in thresholds:
            y_pred = [1 if m >= threshold else 0 for m in msci_scores]
            tp, fp, tn, fn = self._confusion_counts(y_true, y_pred)

            # Guard against division by zero when one class is absent.
            tpr_list.append(tp / (tp + fn) if (tp + fn) > 0 else 0.0)
            fpr_list.append(fp / (fp + tn) if (fp + tn) > 0 else 0.0)

        return list(thresholds), tpr_list, fpr_list

    def compute_auc(
        self,
        fpr_list: List[float],
        tpr_list: List[float],
    ) -> float:
        """
        Compute Area Under ROC Curve using trapezoidal rule.

        Args:
            fpr_list: False positive rates
            tpr_list: True positive rates

        Returns:
            AUC value
        """
        # Integrate over FPR in ascending order (points arrive in
        # threshold order, not FPR order).
        sorted_points = sorted(zip(fpr_list, tpr_list))
        sorted_fpr = [p[0] for p in sorted_points]
        sorted_tpr = [p[1] for p in sorted_points]

        auc = 0.0
        for i in range(1, len(sorted_fpr)):
            auc += (sorted_fpr[i] - sorted_fpr[i - 1]) * (sorted_tpr[i] + sorted_tpr[i - 1]) / 2

        return auc

    def find_optimal_threshold(
        self,
        thresholds: List[float],
        tpr_list: List[float],
        fpr_list: List[float],
    ) -> Tuple[float, float, int]:
        """
        Find optimal threshold using Youden's J statistic.

        J = sensitivity + specificity - 1 = TPR - FPR

        Args:
            thresholds: MSCI threshold values
            tpr_list: True positive rates
            fpr_list: False positive rates

        Returns:
            Tuple of (optimal_threshold, youden_j, optimal_index)
        """
        youden_j = [tpr - fpr for tpr, fpr in zip(tpr_list, fpr_list)]
        # np.argmax returns the FIRST index of the maximum, i.e. the
        # lowest threshold achieving the best J.
        optimal_idx = int(np.argmax(youden_j))

        return thresholds[optimal_idx], youden_j[optimal_idx], optimal_idx

    def calibrate(
        self,
        msci_scores: List[float],
        human_scores: List[float],
    ) -> CalibrationResult:
        """
        Perform full threshold calibration.

        Args:
            msci_scores: MSCI scores
            human_scores: Human coherence scores (normalized 0-1)

        Returns:
            CalibrationResult with optimal threshold and metrics

        Raises:
            ValueError: If the score lists differ in length or contain
                fewer than 10 samples.
        """
        if len(msci_scores) != len(human_scores):
            raise ValueError("Score lists must have same length")

        if len(msci_scores) < 10:
            raise ValueError("Need at least 10 samples for calibration")

        thresholds, tpr_list, fpr_list = self.compute_roc_curve(
            msci_scores, human_scores
        )

        auc = self.compute_auc(fpr_list, tpr_list)

        optimal_threshold, youden_j, opt_idx = self.find_optimal_threshold(
            thresholds, tpr_list, fpr_list
        )

        # Sensitivity/specificity read directly off the chosen ROC point.
        sensitivity = tpr_list[opt_idx]
        specificity = 1 - fpr_list[opt_idx]

        # Precision/F1 require the confusion matrix at the optimal threshold.
        y_true = self._binarize_human(human_scores)
        y_pred = [1 if m >= optimal_threshold else 0 for m in msci_scores]
        tp, fp, tn, fn = self._confusion_counts(y_true, y_pred)

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = sensitivity
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        return CalibrationResult(
            optimal_threshold=optimal_threshold,
            youden_j=youden_j,
            auc=auc,
            sensitivity_at_optimal=sensitivity,
            specificity_at_optimal=specificity,
            precision_at_optimal=precision,
            f1_at_optimal=f1,
            roc_curve={
                "thresholds": thresholds,
                "tpr": tpr_list,
                "fpr": fpr_list,
            },
        )

    def calibrate_from_human_eval(
        self,
        human_eval_path: Path,
    ) -> CalibrationResult:
        """
        Calibrate from human evaluation session.

        Args:
            human_eval_path: Path to human evaluation session JSON

        Returns:
            CalibrationResult
        """
        from src.evaluation.human_eval_schema import EvaluationSession

        session = EvaluationSession.load(Path(human_eval_path))

        msci_scores: List[float] = []
        human_scores: List[float] = []

        # Explicit None check: a legitimate MSCI score of 0.0 is falsy
        # and must not be dropped by truthiness.
        sample_msci = {
            s.sample_id: s.msci_score
            for s in session.samples
            if s.msci_score is not None
        }

        for evaluation in session.evaluations:
            # Skip re-ratings and evaluations with no matching MSCI score.
            if evaluation.is_rerating:
                continue
            if evaluation.sample_id not in sample_msci:
                continue

            msci_scores.append(sample_msci[evaluation.sample_id])
            human_scores.append(evaluation.weighted_score())

        return self.calibrate(msci_scores, human_scores)

    def evaluate_thresholds(
        self,
        msci_scores: List[float],
        human_scores: List[float],
        thresholds: List[float],
    ) -> Dict[str, Dict[str, float]]:
        """
        Evaluate classification performance at multiple thresholds.

        Args:
            msci_scores: MSCI scores
            human_scores: Human scores
            thresholds: Thresholds to evaluate

        Returns:
            Dict mapping threshold (formatted to 3 decimals) to
            performance metrics
        """
        y_true = self._binarize_human(human_scores)
        results: Dict[str, Dict[str, float]] = {}

        for threshold in thresholds:
            y_pred = [1 if m >= threshold else 0 for m in msci_scores]
            tp, fp, tn, fn = self._confusion_counts(y_true, y_pred)

            accuracy = (tp + tn) / len(y_true) if y_true else 0
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

            results[f"{threshold:.3f}"] = {
                "accuracy": accuracy,
                "precision": precision,
                "recall": recall,
                "f1": f1,
                "true_positives": tp,
                "true_negatives": tn,
                "false_positives": fp,
                "false_negatives": fn,
            }

        return results

    def generate_report(
        self,
        calibration_result: CalibrationResult,
        output_path: Optional[Path] = None,
    ) -> Dict[str, Any]:
        """
        Generate calibration report.

        Args:
            calibration_result: Result from calibrate()
            output_path: Optional path to save report

        Returns:
            Complete calibration report
        """
        report = {
            "analysis_type": "MSCI Threshold Calibration",
            "purpose": "Find optimal MSCI threshold for coherence classification",
            "method": "ROC analysis with Youden's J optimization",
            "human_threshold": self.human_threshold,
            "results": calibration_result.to_dict(),
        }

        # Conventional AUC interpretation bands.
        auc = calibration_result.auc
        if auc >= 0.9:
            auc_interp = "Excellent discrimination"
        elif auc >= 0.8:
            auc_interp = "Good discrimination"
        elif auc >= 0.7:
            auc_interp = "Acceptable discrimination"
        elif auc >= 0.6:
            auc_interp = "Poor discrimination"
        else:
            auc_interp = "Failed discrimination (no better than chance)"

        report["interpretation"] = {
            "auc_interpretation": auc_interp,
            "optimal_threshold": calibration_result.optimal_threshold,
            "threshold_usage": (
                f"Samples with MSCI >= {calibration_result.optimal_threshold:.3f} "
                f"should be classified as 'coherent'"
            ),
            "expected_performance": {
                "sensitivity": f"{calibration_result.sensitivity_at_optimal:.1%} of coherent samples correctly identified",
                "specificity": f"{calibration_result.specificity_at_optimal:.1%} of incoherent samples correctly rejected",
                "precision": f"{calibration_result.precision_at_optimal:.1%} of 'coherent' predictions are correct",
            },
        }

        if auc >= 0.7:
            report["recommendations"] = [
                f"Use MSCI threshold of {calibration_result.optimal_threshold:.3f} for binary classification",
                "MSCI provides meaningful discrimination between coherent and incoherent samples",
            ]
        else:
            report["recommendations"] = [
                "MSCI alone may not reliably distinguish coherent from incoherent samples",
                "Consider combining MSCI with other metrics",
                "Human evaluation may be necessary for borderline cases",
            ]

        if output_path:
            # Drop the bulky ROC point arrays from the saved file while
            # keeping them in the returned report.
            report_to_save = report.copy()
            if "roc_curve" in report_to_save.get("results", {}):
                report_to_save["results"] = report_to_save["results"].copy()
                # Report the actual point count instead of a hard-coded 100
                # (n_thresholds is configurable in compute_roc_curve).
                n_points = len(
                    report_to_save["results"]["roc_curve"].get("thresholds", [])
                )
                del report_to_save["results"]["roc_curve"]
                report_to_save["results"]["roc_curve_note"] = (
                    f"Excluded from file ({n_points} points)"
                )

            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with output_path.open("w", encoding="utf-8") as f:
                json.dump(report_to_save, f, indent=2, ensure_ascii=False)

        return report
|