| """ |
| Distribution Normalization for cMSCI. |
| |
| Scores from different embedding spaces (CLIP vs CLAP) and different |
| pairwise channels (st_i, st_a, gram_volume) have different natural |
| distributions. Z-score normalization makes them comparable. |
| |
| The ReferenceDistribution class fits mean/std from existing experiment |
| data and normalizes new scores to z-scores or percentile ranks. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
| import numpy as np |
| from scipy import stats as sp_stats |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class ReferenceDistribution:
    """
    Stores mean/std for a single score channel and provides normalization.

    Usage:
        ref = ReferenceDistribution()
        ref.fit(list_of_scores)
        z = ref.normalize(new_score)   # z-score
        p = ref.percentile(new_score)  # percentile rank [0, 1]

    Note:
        to_dict()/save() persist only name/mean/std/n — not the raw fitted
        values — so percentile() on an instance restored via from_dict()/load()
        always falls back to 0.5. Z-score normalize() round-trips fully.
    """

    def __init__(self, name: str = ""):
        self.name = name
        self.mean: float = 0.0
        self.std: float = 1.0
        self.n: int = 0
        # Sorted copy of the fitted scores; used only by percentile().
        self._sorted_values: Optional[np.ndarray] = None

    def fit(self, scores: List[float]) -> None:
        """Fit the distribution from a list of observed scores.

        An empty input keeps the identity defaults (mean=0.0, std=1.0)
        rather than producing NaN statistics.
        """
        arr = np.asarray(scores, dtype=np.float64)
        self.n = len(arr)
        if self.n == 0:
            # np.mean([]) would emit a RuntimeWarning and yield NaN;
            # keep the identity transform instead.
            self._sorted_values = arr
            return
        self.mean = float(np.mean(arr))
        # Sample std (ddof=1) requires at least two observations.
        self.std = float(np.std(arr, ddof=1)) if self.n > 1 else 1.0
        if self.std < 1e-10:
            # Near-constant reference: avoid division by ~zero in normalize().
            self.std = 1.0
        self._sorted_values = np.sort(arr)

    def normalize(self, score: float) -> float:
        """Z-score normalization: (score - mean) / std."""
        return float((score - self.mean) / self.std)

    def percentile(self, score: float) -> float:
        """
        Percentile rank of score within the reference distribution.

        Returns a value in [0, 1] where 0.5 = median of reference.
        Falls back to 0.5 when no fitted values are available (unfitted,
        empty fit, or restored from serialized summary stats).
        """
        if self._sorted_values is None or len(self._sorted_values) == 0:
            return 0.5
        # side="right": fraction of reference values <= score.
        rank = np.searchsorted(self._sorted_values, score, side="right")
        return float(rank / len(self._sorted_values))

    def to_dict(self) -> Dict:
        """Serialize summary statistics (raw values are intentionally dropped)."""
        return {
            "name": self.name,
            "mean": self.mean,
            "std": self.std,
            "n": self.n,
        }

    @classmethod
    def from_dict(cls, d: Dict) -> "ReferenceDistribution":
        """Rebuild from to_dict() output; percentile() will return 0.5."""
        obj = cls(name=d.get("name", ""))
        obj.mean = d["mean"]
        obj.std = d["std"]
        obj.n = d.get("n", 0)
        return obj

    def save(self, path: str) -> None:
        """Write summary statistics to a JSON file."""
        with open(path, "w") as f:
            json.dump(self.to_dict(), f, indent=2)

    @classmethod
    def load(cls, path: str) -> "ReferenceDistribution":
        """Read summary statistics from a JSON file written by save()."""
        with open(path) as f:
            return cls.from_dict(json.load(f))
|
|
|
|
class CalibrationStore:
    """
    Collection of ReferenceDistributions, one per score channel.

    Offers fit-and-register (add), lookup-based normalization, and
    JSON persistence of the full calibration state.
    """

    def __init__(self):
        # Channel name -> fitted reference distribution.
        self.distributions: Dict[str, ReferenceDistribution] = {}

    def add(self, name: str, scores: List[float]) -> ReferenceDistribution:
        """Fit a new channel from observed scores and register it under *name*."""
        ref = ReferenceDistribution(name=name)
        ref.fit(scores)
        self.distributions[name] = ref
        logger.info(
            "Calibration[%s]: mean=%.4f, std=%.4f, n=%d",
            name, ref.mean, ref.std, ref.n,
        )
        return ref

    def normalize(self, name: str, score: float) -> float:
        """Z-score the value; unknown channels pass the score through unchanged."""
        ref = self.distributions.get(name)
        return score if ref is None else ref.normalize(score)

    def percentile(self, name: str, score: float) -> float:
        """Percentile rank of the value; unknown channels map to the 0.5 midpoint."""
        ref = self.distributions.get(name)
        return 0.5 if ref is None else ref.percentile(score)

    def save(self, path: str) -> None:
        """Serialize every channel's summary stats to a JSON file, creating parents."""
        payload = {key: ref.to_dict() for key, ref in self.distributions.items()}
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            json.dump(payload, f, indent=2)
        logger.info("Calibration saved to %s", path)

    @classmethod
    def load(cls, path: str) -> "CalibrationStore":
        """Rebuild a store from a JSON file produced by save()."""
        with open(path) as f:
            raw = json.load(f)
        store = cls()
        store.distributions = {
            key: ReferenceDistribution.from_dict(entry)
            for key, entry in raw.items()
        }
        logger.info("Calibration loaded from %s (%d channels)", path, len(store.distributions))
        return store
|
|
|
|
def has_channel(store: CalibrationStore, name: str) -> bool:
    """Return True when *name* is a registered calibration channel in *store*."""
    _missing = object()
    return store.distributions.get(name, _missing) is not _missing
|
|
|
|
def extend_calibration_with_exmcr(
    store: CalibrationStore,
    gram_coh_ia_scores: List[float],
    gram_coh_tia_scores: Optional[List[float]] = None,
) -> CalibrationStore:
    """
    Register ExMCR-derived channels on an existing calibration store.

    Args:
        store: Existing CalibrationStore to extend.
        gram_coh_ia_scores: Gram coherence of (image_clip, ExMCR(audio_clap)) pairs.
        gram_coh_tia_scores: Optional 3-way gram coherence of (text, image, ExMCR(audio)).

    Returns:
        The same CalibrationStore instance, extended in place. Empty or
        missing score lists are skipped.
    """
    candidate_channels = (
        ("gram_coh_ia_exmcr", gram_coh_ia_scores),
        ("gram_coh_tia", gram_coh_tia_scores),
    )
    for channel_name, channel_scores in candidate_channels:
        if channel_scores:
            store.add(channel_name, channel_scores)
    return store
|
|
|
|
def extend_calibration_with_uncertainty(
    store: CalibrationStore,
    uncertainty_ti_scores: List[float],
    uncertainty_ta_scores: Optional[List[float]] = None,
) -> CalibrationStore:
    """
    Register ProbVLM uncertainty channels on an existing calibration store.

    Args:
        store: Existing CalibrationStore to extend.
        uncertainty_ti_scores: Per-sample mean uncertainty for text-image (CLIP adapter).
        uncertainty_ta_scores: Per-sample mean uncertainty for text-audio (CLAP adapter).

    Returns:
        The same CalibrationStore instance, extended in place. When both
        channels are present, a per-sample pairwise-mean channel
        "uncertainty_mean" is added as well.
    """
    have_ti = bool(uncertainty_ti_scores)
    have_ta = bool(uncertainty_ta_scores)
    if have_ti:
        store.add("uncertainty_ti", uncertainty_ti_scores)
    if have_ta:
        store.add("uncertainty_ta", uncertainty_ta_scores)
    if have_ti and have_ta:
        # Element-wise mean of the two uncertainty series (truncates to the
        # shorter list, as zip does).
        paired = zip(uncertainty_ti_scores, uncertainty_ta_scores)
        store.add("uncertainty_mean", [(a + b) / 2.0 for a, b in paired])
    return store
|
|
|
|
def build_reference_distributions(
    rq1_results_path: str,
) -> CalibrationStore:
    """
    Build reference distributions from existing RQ1 baseline results.

    Extracts st_i, st_a, and msci scores from the baseline condition only
    (matched image + audio) and fits a distribution for each channel.
    Also derives gram-coherence channels (gram_coh_ti, gram_coh_ta) from
    the similarity scores via 1 - sqrt(1 - s^2), clamped at s^2 <= 1.

    Args:
        rq1_results_path: Path to rq1_results.json

    Returns:
        CalibrationStore with fitted distributions for st_i, st_a, msci
    """
    with open(rq1_results_path) as f:
        payload = json.load(f)

    # Collect per-channel score lists from baseline records only.
    collected: Dict[str, List[float]] = {"st_i": [], "st_a": [], "msci": []}
    for record in payload["results"]:
        if record.get("condition") != "baseline":
            continue
        for channel in collected:
            value = record.get(channel)
            if value is not None:
                collected[channel].append(value)

    store = CalibrationStore()
    # Insertion order of `collected` keeps the add order st_i, st_a, msci.
    for channel, values in collected.items():
        if values:
            store.add(channel, values)

    # Derived gram-coherence channels from the cosine-like similarities.
    if collected["st_i"]:
        store.add(
            "gram_coh_ti",
            [1.0 - np.sqrt(max(0, 1 - s**2)) for s in collected["st_i"]],
        )
    if collected["st_a"]:
        store.add(
            "gram_coh_ta",
            [1.0 - np.sqrt(max(0, 1 - s**2)) for s in collected["st_a"]],
        )

    return store
|
|