| | """ |
| | MSCI Sensitivity Analysis |
| | |
| | Tests whether MSCI is sensitive to controlled semantic perturbations. |
| | This addresses RQ1: "Is MSCI sensitive to controlled semantic perturbations?" |
| | |
| | Key tests: |
| | - Perturbation gradient: 0%, 25%, 50%, 75%, 100% semantic mismatch |
| | - Expected: monotonic MSCI decrease with increasing perturbation |
| | - If not monotonic: MSCI may be unreliable |
| | """ |
| |
|
from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
from scipy import stats


@dataclass
class PerturbationLevel:
    """A single perturbation level result."""
    level: float
    label: str
    msci_scores: List[float] = field(default_factory=list)

    @property
    def mean_msci(self) -> float:
        """Mean MSCI at this perturbation level."""
        return float(np.mean(self.msci_scores)) if self.msci_scores else 0.0

    @property
    def std_msci(self) -> float:
        """Sample standard deviation (ddof=1) of MSCI at this level."""
        return float(np.std(self.msci_scores, ddof=1)) if len(self.msci_scores) > 1 else 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "level": self.level,
            "label": self.label,
            "n_samples": len(self.msci_scores),
            "mean_msci": self.mean_msci,
            "std_msci": self.std_msci,
            "min_msci": min(self.msci_scores) if self.msci_scores else None,
            "max_msci": max(self.msci_scores) if self.msci_scores else None,
        }


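# Example record produced by PerturbationLevel.to_dict() (illustrative values):
#     {"level": 0.25, "label": "25% perturbation", "n_samples": 3,
#      "mean_msci": 0.82, "std_msci": 0.03, "min_msci": 0.79, "max_msci": 0.85}

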
@dataclass
class PerturbationGradient:
    """Results from a perturbation gradient analysis."""
    levels: List[PerturbationLevel]
    is_monotonic: bool
    spearman_correlation: float
    spearman_p: float
    linear_slope: float
    r_squared: float
    sensitivity_score: float

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "levels": [lvl.to_dict() for lvl in self.levels],
            "is_monotonic": self.is_monotonic,
            "spearman_correlation": self.spearman_correlation,
            "spearman_p": self.spearman_p,
            "linear_slope": self.linear_slope,
            "r_squared": self.r_squared,
            "sensitivity_score": self.sensitivity_score,
        }


class MSCISensitivityAnalyzer:
    """
    Analyzes MSCI sensitivity to semantic perturbations.

    RQ1: "Is MSCI sensitive to controlled semantic perturbations?"
    H0: MSCI(baseline) = MSCI(perturbed)
    H1: MSCI(baseline) > MSCI(perturbed)
    """

    def __init__(self):
        self.results: Dict[str, Any] = {}

    def analyze_perturbation_gradient(
        self,
        baseline_scores: List[float],
        perturbed_scores_by_level: Dict[float, List[float]],
    ) -> PerturbationGradient:
        """
        Analyze MSCI response to a perturbation gradient.

        Args:
            baseline_scores: MSCI scores for unperturbed samples
            perturbed_scores_by_level: Dict mapping perturbation level
                (in (0, 1]; level 0.0 is reserved for the baseline) to MSCI scores

        Returns:
            PerturbationGradient with analysis results
        """
        levels = [
            PerturbationLevel(
                level=0.0,
                label="baseline",
                msci_scores=baseline_scores,
            )
        ]

        for level, scores in sorted(perturbed_scores_by_level.items()):
            # Level 0.0 is occupied by the baseline; skip it to avoid
            # double-counting if the caller includes it in the dict.
            if level <= 0.0:
                continue
            levels.append(
                PerturbationLevel(
                    level=level,
                    label=f"{int(level * 100)}% perturbation",
                    msci_scores=scores,
                )
            )

        # Non-strict monotonicity: mean MSCI should never increase as the
        # perturbation level increases.
        means = [lvl.mean_msci for lvl in levels]
        is_monotonic = all(means[i] >= means[i + 1] for i in range(len(means) - 1))

        # Flatten to paired (level, score) observations for correlation and
        # regression across all samples.
        all_levels = []
        all_scores = []
        for lvl in levels:
            for score in lvl.msci_scores:
                all_levels.append(lvl.level)
                all_scores.append(score)

        # A meaningful trend needs at least 3 observations spread over more
        # than one level; with a single level, linregress raises and
        # spearmanr returns NaN.
        if len(all_scores) >= 3 and len(set(all_levels)) > 1:
            spearman_result = stats.spearmanr(all_levels, all_scores)
            spearman_rho = spearman_result.correlation
            spearman_p = spearman_result.pvalue

            lin = stats.linregress(all_levels, all_scores)
            slope = lin.slope
            r_squared = lin.rvalue ** 2
        else:
            spearman_rho = 0.0
            spearman_p = 1.0
            slope = 0.0
            r_squared = 0.0

        # Sensitivity heuristic: a negative Spearman rho means MSCI falls as
        # perturbation rises (the expected direction); use its magnitude,
        # halved when the decrease is not monotonic.
        sensitivity = 0.0
        if spearman_rho < 0:
            sensitivity = abs(spearman_rho)
            if not is_monotonic:
                sensitivity *= 0.5

        return PerturbationGradient(
            levels=levels,
            is_monotonic=is_monotonic,
            spearman_correlation=spearman_rho,
            spearman_p=spearman_p,
            linear_slope=slope,
            r_squared=r_squared,
            sensitivity_score=sensitivity,
        )

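    # Usage sketch (synthetic numbers, for illustration only):
    #     analyzer = MSCISensitivityAnalyzer()
    #     grad = analyzer.analyze_perturbation_gradient(
    #         baseline_scores=[0.90, 0.88, 0.91],
    #         perturbed_scores_by_level={0.25: [0.84, 0.81], 0.5: [0.70, 0.68]},
    #     )
    #     grad.is_monotonic       # True when mean MSCI never rises with level
    #     grad.sensitivity_score  # |Spearman rho|, halved if non-monotonic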
    def paired_sensitivity_test(
        self,
        baseline_scores: List[float],
        perturbed_scores: List[float],
        alpha: float = 0.05,
    ) -> Dict[str, Any]:
        """
        Perform a one-sided paired t-test for sensitivity.

        Tests H0: MSCI(baseline) = MSCI(perturbed)
        vs   H1: MSCI(baseline) > MSCI(perturbed)

        Args:
            baseline_scores: MSCI scores for baseline (same prompts, same order)
            perturbed_scores: MSCI scores for perturbed (same prompts, same order)
            alpha: Significance level

        Returns:
            Dictionary with test results
        """
        from src.experiments.statistical_analysis import paired_ttest, compute_effect_size

        if len(baseline_scores) != len(perturbed_scores):
            raise ValueError("Baseline and perturbed must have same length for paired test")

        # One-sided test in the expected direction: perturbation should
        # lower MSCI.
        result = paired_ttest(
            baseline_scores,
            perturbed_scores,
            alpha=alpha,
            alternative="greater",
        )

        effect_size = compute_effect_size(baseline_scores, perturbed_scores, paired=True)

        baseline_mean = float(np.mean(baseline_scores))
        perturbed_mean = float(np.mean(perturbed_scores))
        mean_drop = baseline_mean - perturbed_mean
        percent_drop = (mean_drop / baseline_mean * 100) if baseline_mean > 0 else 0.0

        return {
            "test": "paired_t_test_one_sided",
            "hypothesis": "H1: MSCI(baseline) > MSCI(perturbed)",
            "n": len(baseline_scores),
            "baseline_mean": baseline_mean,
            "perturbed_mean": perturbed_mean,
            "mean_drop": mean_drop,
            "percent_drop": percent_drop,
            "t_statistic": result.statistic,
            "p_value": result.p_value,
            "effect_size_d": effect_size,
            "significant": result.significant,
            "interpretation": self._interpret_sensitivity(
                result.significant, effect_size, percent_drop
            ),
        }

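    # Usage sketch (synthetic, paired-by-prompt scores; requires the project's
    # statistical_analysis helpers to be importable):
    #     analyzer.paired_sensitivity_test(
    #         baseline_scores=[0.91, 0.88, 0.90, 0.93],
    #         perturbed_scores=[0.72, 0.70, 0.75, 0.69],
    #     )
    #     # -> dict with t statistic, p-value, Cohen's d and interpretation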
    def _interpret_sensitivity(
        self,
        significant: bool,
        effect_size: float,
        percent_drop: float,
    ) -> str:
        """Generate interpretation of sensitivity test."""
        if not significant:
            return "MSCI is NOT significantly sensitive to this perturbation (H0 not rejected)"

        # Thresholds follow the usual Cohen's d conventions (0.5 medium, 0.8 large).
        if effect_size > 0.8:
            strength = "highly"
        elif effect_size > 0.5:
            strength = "moderately"
        else:
            strength = "weakly"

        return (
            f"MSCI is {strength} sensitive to perturbation "
            f"(d={effect_size:.2f}, {percent_drop:.1f}% drop)"
        )

    def analyze_from_experiment_results(
        self,
        results_path: Path,
    ) -> Dict[str, Any]:
        """
        Analyze sensitivity from experiment results JSON.

        Args:
            results_path: Path to experiment_results.json

        Returns:
            Sensitivity analysis results
        """
        with Path(results_path).open("r", encoding="utf-8") as f:
            data = json.load(f)

        # Group successful runs' MSCI scores by experimental condition.
        raw_results = data.get("raw_results", [])
        scores_by_condition: Dict[str, List[float]] = {}

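        # Expected shape of the results file (inferred from the fields read
        # below; the keys are exactly what this parser looks for, the values
        # are illustrative):
        #     {"raw_results": [{"success": true,
        #                       "condition": "chat_baseline",
        #                       "scores": {"msci": 0.91}}, ...]}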
        for result in raw_results:
            if not result.get("success"):
                continue
            condition = result.get("condition", "")
            msci = result.get("scores", {}).get("msci")
            if msci is not None and condition:
                scores_by_condition.setdefault(condition, []).append(msci)

        analyses = {}

        # Each "<mode>_baseline" condition anchors the comparisons for that mode.
        baseline_conditions = [c for c in scores_by_condition if "baseline" in c]

        for baseline_cond in baseline_conditions:
            mode = baseline_cond.replace("_baseline", "")
            baseline_scores = scores_by_condition[baseline_cond]

            for cond, scores in scores_by_condition.items():
                if cond == baseline_cond:
                    continue
                # Only compare conditions within the same mode; the trailing
                # underscore guards against accidental cross-mode prefix matches.
                if not cond.startswith(mode + "_"):
                    continue

                # Pair by truncating to the shorter list; this assumes both
                # conditions cover the same prompts in the same order.
                n = min(len(baseline_scores), len(scores))
                if n >= 3:
                    test_result = self.paired_sensitivity_test(
                        baseline_scores[:n], scores[:n]
                    )
                    analyses[f"{baseline_cond}_vs_{cond}"] = test_result

        return {
            "source": str(results_path),
            "conditions_analyzed": list(scores_by_condition.keys()),
            "sensitivity_tests": analyses,
            "summary": self._summarize_sensitivity(analyses),
        }

    def _summarize_sensitivity(self, analyses: Dict[str, Dict]) -> Dict[str, Any]:
        """Summarize sensitivity results."""
        if not analyses:
            return {"conclusion": "No sensitivity tests performed"}

        n_significant = sum(1 for a in analyses.values() if a.get("significant"))
        n_total = len(analyses)

        avg_effect = np.mean([
            a.get("effect_size_d", 0) for a in analyses.values()
        ])
        avg_drop = np.mean([
            a.get("percent_drop", 0) for a in analyses.values()
        ])

        if n_significant == n_total and avg_effect > 0.5:
            verdict = "STRONG SENSITIVITY: MSCI reliably detects perturbations"
        elif n_significant > n_total / 2:
            verdict = "MODERATE SENSITIVITY: MSCI detects most perturbations"
        elif n_significant > 0:
            verdict = "WEAK SENSITIVITY: MSCI detects some perturbations"
        else:
            verdict = "NO SENSITIVITY: MSCI fails to detect perturbations"

        return {
            "n_tests": n_total,
            "n_significant": n_significant,
            "sensitivity_rate": n_significant / n_total if n_total > 0 else 0,
            "average_effect_size": avg_effect,
            "average_percent_drop": avg_drop,
            "verdict": verdict,
        }

    def generate_report(
        self,
        gradient_result: Optional[PerturbationGradient] = None,
        sensitivity_tests: Optional[Dict[str, Dict]] = None,
        output_path: Optional[Path] = None,
    ) -> Dict[str, Any]:
        """
        Generate comprehensive sensitivity report.

        Args:
            gradient_result: Optional perturbation gradient analysis
            sensitivity_tests: Optional dict of sensitivity test results
            output_path: Optional path to save report

        Returns:
            Complete sensitivity report
        """
        report: Dict[str, Any] = {
            "analysis_type": "MSCI Sensitivity Analysis",
            "research_question": "RQ1: Is MSCI sensitive to controlled semantic perturbations?",
            "hypothesis": {
                "H0": "MSCI(baseline) = MSCI(perturbed)",
                "H1": "MSCI(baseline) > MSCI(perturbed)",
            },
        }

        if gradient_result:
            report["gradient_analysis"] = gradient_result.to_dict()
            report["gradient_verdict"] = (
                "PASS: MSCI shows monotonic decrease with perturbation"
                if gradient_result.is_monotonic and gradient_result.sensitivity_score > 0.5
                else "FAIL: MSCI does not reliably track perturbation level"
            )

        if sensitivity_tests:
            report["sensitivity_tests"] = sensitivity_tests
            report["summary"] = self._summarize_sensitivity(sensitivity_tests)

        # Combine whichever analyses were provided into an overall RQ1 verdict.
        verdicts = []
        if gradient_result:
            verdicts.append(gradient_result.sensitivity_score > 0.5)
        if sensitivity_tests:
            summary = report.get("summary", {})
            verdicts.append(summary.get("sensitivity_rate", 0) > 0.5)

        if verdicts:
            if all(verdicts):
                report["rq1_verdict"] = "SUPPORTED: MSCI is sensitive to semantic perturbations"
            elif any(verdicts):
                report["rq1_verdict"] = "PARTIALLY SUPPORTED: Mixed sensitivity results"
            else:
                report["rq1_verdict"] = "NOT SUPPORTED: MSCI is not reliably sensitive"

        if output_path:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with output_path.open("w", encoding="utf-8") as f:
                json.dump(report, f, indent=2, ensure_ascii=False)

        return report
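

if __name__ == "__main__":
    # Minimal smoke-test sketch using synthetic MSCI scores (made-up numbers,
    # for illustration only). Only the gradient path is exercised here; the
    # paired test depends on project-local statistical helpers.
    rng = np.random.default_rng(0)
    baseline = list(rng.normal(0.90, 0.02, size=10))
    perturbed = {
        lvl: list(rng.normal(0.90 - 0.4 * lvl, 0.02, size=10))
        for lvl in (0.25, 0.5, 0.75, 1.0)
    }

    analyzer = MSCISensitivityAnalyzer()
    gradient = analyzer.analyze_perturbation_gradient(baseline, perturbed)
    print(json.dumps(analyzer.generate_report(gradient_result=gradient), indent=2))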