# Uploaded by pratik-250620 using huggingface_hub (commit 358d3bc, verified).
"""
Distribution Normalization for cMSCI.
Scores from different embedding spaces (CLIP vs CLAP) and different
pairwise channels (st_i, st_a, gram_volume) have different natural
distributions. Z-score normalization makes them comparable.
The ReferenceDistribution class fits mean/std from existing experiment
data and normalizes new scores to z-scores or percentile ranks.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
from scipy import stats as sp_stats
logger = logging.getLogger(__name__)
class ReferenceDistribution:
    """
    Stores mean/std for a single score channel and provides normalization.

    The empirical samples used for exact percentile ranks are kept in memory
    only; they are not written by ``to_dict``/``save``. A distribution
    restored from disk therefore answers ``percentile`` with a Gaussian
    approximation built from the stored mean/std.

    Usage:
        ref = ReferenceDistribution()
        ref.fit(list_of_scores)
        z = ref.normalize(new_score)   # z-score
        p = ref.percentile(new_score)  # percentile rank [0, 1]
    """

    # Standard deviations below this are treated as degenerate and replaced
    # by 1.0 so that normalize() never divides by (near) zero.
    _MIN_STD = 1e-10

    def __init__(self, name: str = ""):
        self.name = name
        self.mean: float = 0.0
        self.std: float = 1.0
        self.n: int = 0
        # Sorted copy of the fitted scores; None until fit() sees data.
        self._sorted_values: Optional[np.ndarray] = None

    def fit(self, scores: List[float]) -> None:
        """
        Fit the distribution from a list of observed scores.

        An empty input resets the object to the identity transform
        (mean 0, std 1, n 0) instead of storing a NaN mean.
        """
        arr = np.array(scores, dtype=np.float64)
        self.n = len(arr)
        if self.n == 0:
            # np.mean([]) would yield NaN and poison every later z-score.
            self.mean = 0.0
            self.std = 1.0
            self._sorted_values = None
            return
        self.mean = float(np.mean(arr))
        # Sample std (ddof=1) needs at least two observations.
        self.std = float(np.std(arr, ddof=1)) if self.n > 1 else 1.0
        if self.std < self._MIN_STD:
            self.std = 1.0
        self._sorted_values = np.sort(arr)

    def normalize(self, score: float) -> float:
        """Z-score normalization: (score - mean) / std."""
        return float((score - self.mean) / self.std)

    def percentile(self, score: float) -> float:
        """
        Percentile rank of score within the reference distribution.

        Returns a value in [0, 1]. With fitted samples available this is the
        empirical fraction of reference scores <= ``score``. When only
        mean/std are known (object restored via ``from_dict``/``load`` with
        n > 0) the normal CDF of the z-score is used as an approximation.
        An unfitted distribution returns 0.5.
        """
        if self._sorted_values is not None and len(self._sorted_values) > 0:
            rank = np.searchsorted(self._sorted_values, score, side="right")
            return float(rank / len(self._sorted_values))
        if self.n > 0:
            # Samples were not persisted; fall back to a Gaussian assumption.
            return float(sp_stats.norm.cdf(self.normalize(score)))
        return 0.5

    def to_dict(self) -> Dict:
        """Return the JSON-serializable summary (name, mean, std, n)."""
        return {
            "name": self.name,
            "mean": self.mean,
            "std": self.std,
            "n": self.n,
        }

    @classmethod
    def from_dict(cls, d: Dict) -> "ReferenceDistribution":
        """
        Rebuild a distribution from a ``to_dict`` payload.

        ``mean`` and ``std`` are required keys; ``name`` and ``n`` are
        optional. A degenerate std is replaced by 1.0, as in ``fit``.
        """
        obj = cls(name=d.get("name", ""))
        obj.mean = float(d["mean"])
        obj.std = float(d["std"])
        if obj.std < cls._MIN_STD:
            obj.std = 1.0
        obj.n = int(d.get("n", 0))
        return obj

    def save(self, path: str) -> None:
        """Write the summary statistics to ``path`` as JSON."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)

    @classmethod
    def load(cls, path: str) -> "ReferenceDistribution":
        """Read summary statistics previously written by ``save``."""
        with open(path, encoding="utf-8") as f:
            return cls.from_dict(json.load(f))
class CalibrationStore:
    """
    Registry that maps channel names to fitted ReferenceDistribution objects.

    Channels can be fitted, queried for normalization, and the whole set can
    be written to or read from a single JSON file.
    """

    def __init__(self):
        # channel name -> reference distribution for that channel
        self.distributions: Dict[str, ReferenceDistribution] = {}

    def add(self, name: str, scores: List[float]) -> ReferenceDistribution:
        """Fit a distribution on ``scores``, register it as ``name``, return it."""
        fitted = ReferenceDistribution(name=name)
        fitted.fit(scores)
        self.distributions[name] = fitted
        logger.info(
            "Calibration[%s]: mean=%.4f, std=%.4f, n=%d",
            name, fitted.mean, fitted.std, fitted.n,
        )
        return fitted

    def normalize(self, name: str, score: float) -> float:
        """Z-score ``score`` against channel ``name``; unknown channels pass through."""
        try:
            channel = self.distributions[name]
        except KeyError:
            # No calibration for this channel: hand the raw score back.
            return score
        return channel.normalize(score)

    def percentile(self, name: str, score: float) -> float:
        """Percentile rank of ``score`` in channel ``name``; 0.5 if unknown."""
        try:
            channel = self.distributions[name]
        except KeyError:
            return 0.5
        return channel.percentile(score)

    def save(self, path: str) -> None:
        """Serialize every channel's summary to one JSON file at ``path``."""
        payload = {}
        for channel_name, channel in self.distributions.items():
            payload[channel_name] = channel.to_dict()
        # Create missing parent directories before writing.
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as handle:
            json.dump(payload, handle, indent=2)
        logger.info("Calibration saved to %s", path)

    @classmethod
    def load(cls, path: str) -> "CalibrationStore":
        """Build a store from a JSON file previously produced by ``save``."""
        store = cls()
        with open(path) as handle:
            payload = json.load(handle)
        store.distributions.update(
            (channel_name, ReferenceDistribution.from_dict(entry))
            for channel_name, entry in payload.items()
        )
        logger.info("Calibration loaded from %s (%d channels)", path, len(store.distributions))
        return store
def has_channel(store: CalibrationStore, name: str) -> bool:
    """Return True when ``name`` is a registered channel of ``store``."""
    registered = store.distributions
    return name in registered
def extend_calibration_with_exmcr(
    store: CalibrationStore,
    gram_coh_ia_scores: List[float],
    gram_coh_tia_scores: Optional[List[float]] = None,
) -> CalibrationStore:
    """
    Register the ExMCR-derived gram coherence channels on ``store``.

    Args:
        store: CalibrationStore to extend.
        gram_coh_ia_scores: Gram coherence of (image_clip, ExMCR(audio_clap)) pairs;
            fitted as channel "gram_coh_ia_exmcr".
        gram_coh_tia_scores: Optional 3-way gram coherence of
            (text, image, ExMCR(audio)); fitted as channel "gram_coh_tia".

    Returns:
        The same ``store`` object, modified in place. Channels whose score
        list is empty or None are skipped.
    """
    candidate_channels = (
        ("gram_coh_ia_exmcr", gram_coh_ia_scores),
        ("gram_coh_tia", gram_coh_tia_scores),
    )
    for channel_name, channel_scores in candidate_channels:
        if not channel_scores:
            continue
        store.add(channel_name, channel_scores)
    return store
def extend_calibration_with_uncertainty(
    store: CalibrationStore,
    uncertainty_ti_scores: List[float],
    uncertainty_ta_scores: Optional[List[float]] = None,
) -> CalibrationStore:
    """
    Extend calibration store with ProbVLM uncertainty channels.

    Args:
        store: Existing CalibrationStore to extend.
        uncertainty_ti_scores: Per-sample mean uncertainty for text-image (CLIP adapter).
        uncertainty_ta_scores: Per-sample mean uncertainty for text-audio (CLAP adapter).

    Returns:
        Extended CalibrationStore (same object, modified in place).

    Channels "uncertainty_ti" and "uncertainty_ta" are fitted from the
    respective non-empty lists. When both are non-empty, "uncertainty_mean"
    is fitted from the element-wise average of the two. If the lists differ
    in length, only the leading samples common to both are paired and a
    warning is logged, because the surplus samples are ignored.
    """
    if uncertainty_ti_scores:
        store.add("uncertainty_ti", uncertainty_ti_scores)
    if uncertainty_ta_scores:
        store.add("uncertainty_ta", uncertainty_ta_scores)

    # Combined uncertainty channel
    if uncertainty_ti_scores and uncertainty_ta_scores:
        n_ti = len(uncertainty_ti_scores)
        n_ta = len(uncertainty_ta_scores)
        if n_ti != n_ta:
            # zip() below truncates to the shorter list; make that visible.
            logging.getLogger(__name__).warning(
                "uncertainty_mean: text-image (%d) and text-audio (%d) score "
                "counts differ; pairing only the first %d samples",
                n_ti, n_ta, min(n_ti, n_ta),
            )
        combined = [
            (ti + ta) / 2.0
            for ti, ta in zip(uncertainty_ti_scores, uncertainty_ta_scores)
        ]
        store.add("uncertainty_mean", combined)
    return store
def build_reference_distributions(
    rq1_results_path: str,
) -> CalibrationStore:
    """
    Fit reference distributions from an RQ1 results file.

    Only entries of ``data["results"]`` whose "condition" is "baseline" are
    used. From those, the non-null "st_i", "st_a" and "msci" values are
    collected and each non-empty channel is fitted. Gram coherence channels
    "gram_coh_ti" / "gram_coh_ta" are then derived from the collected
    st_i / st_a values.

    Args:
        rq1_results_path: Path to rq1_results.json

    Returns:
        CalibrationStore holding every channel that had at least one score.
    """

    def _gram_coherence(cos_sim):
        # gram_volume = sqrt(1 - cos^2), so gram_coherence = 1 - sqrt(1 - cos^2);
        # the max() clamps tiny negative values before the square root.
        return 1.0 - np.sqrt(max(0, 1 - cos_sim**2))

    with open(rq1_results_path) as handle:
        payload = json.load(handle)

    direct_channels = ("st_i", "st_a", "msci")
    collected = {channel: [] for channel in direct_channels}
    for record in payload["results"]:
        if record.get("condition") == "baseline":
            for channel in direct_channels:
                value = record.get(channel)
                if value is not None:
                    collected[channel].append(value)

    store = CalibrationStore()
    for channel in direct_channels:
        if collected[channel]:
            store.add(channel, collected[channel])

    # GRAM coherence distributions (1 - gram_volume) for gram calibration mode
    derived_channels = (("st_i", "gram_coh_ti"), ("st_a", "gram_coh_ta"))
    for source_channel, derived_name in derived_channels:
        source_scores = collected[source_channel]
        if source_scores:
            store.add(derived_name, [_gram_coherence(s) for s in source_scores])
    return store