"""
Data loading, results persistence, and ASR evaluation orchestration.

Heavy ML imports stay inside `evaluation` and `backends` so the Gradio UI starts quickly.
"""

import csv
import io
import logging
import os
import threading

import pandas as pd

from storage import (
    HF_BUCKET_ID,
    HF_TOKEN,
    HISTORY_PATH,
    RESULTS_PATH,
    STORAGE_BACKEND,
    upload_to_bucket,
    download_bucket_file,
    list_bucket_tree,
)
from metrics_config import LIVE_SCENARIO_KEYS, SCENARIO_KEYS
from utils_display import (
    AVG_WER_CORE_KEYS,
    AutoEvalColumn,
    fields,
    format_wer_percent,
    make_clickable_model,
)

logger = logging.getLogger(__name__)

# Re-export for callers that imported from init
__all__ = [
    "csv_lock",
    "is_model_on_hub",
    "load_raw_results",
    "load_results",
    "save_raw_results",
    "list_audio_files",
    "load_audio_file",
    "load_transcript_file",
    "normalize_legacy_csv_row",
    "list_leaderboard_versions",
    "raw_rows_for_version",
    "LATEST_VERSION",
]

# Guards download/upload of the leaderboard CSV. It is a plain (non-reentrant) Lock,
# so code holding it must not call anything that acquires it again.
csv_lock = threading.Lock()

# Serializes read-modify-write of the append-only history file (separate from the
# leaderboard CSV lock so history bookkeeping can never deadlock a result save).
_history_lock = threading.Lock()

LEADERBOARD_COLUMNS = [c.name for c in fields(AutoEvalColumn)]

CSV_FIELDS = [
    "model_id",
    "eval_family",
    *SCENARIO_KEYS,
    "num_samples",
    "eval_audio_seconds",
    "eval_wall_time_s",
    "eval_rtf",
    "num_params",
    "submission_notes",
    "contact_email",
    "submitted_at",
]

# Sentinel for "show the live leaderboard" (no historical reconstruction).
LATEST_VERSION = ""

# History rows are leaderboard rows plus version bookkeeping. Stored append-only:
# each leaderboard change appends only the model row(s) that changed, tagged with a
# version id (timestamp) so any past state can be reconstructed without full copies.
HISTORY_FIELDS = [*CSV_FIELDS, "version", "recorded_at", "version_label"]

# Legacy CSV / evaluation wire-format keys -> canonical ``SCENARIO_KEYS`` column names.
# Order matters: for a given canonical column the first non-empty legacy key wins.
# NOTE(review): "wer_moving_sources" has no entry in the display column map further
# down this file; confirm it is still a member of SCENARIO_KEYS.
_LEGACY_WER_TO_CANONICAL: tuple[tuple[str, str], ...] = (
    ("wer_clean", "wer_anechoic_speech"),
    ("wer_measured", "wer_lab_measured"),
    ("wer_sim", "wer_lab_simulated"),
    # Older CSVs used ``wer_high_snr`` as a standalone column.
    ("wer_high_snr", "wer_realistic_high_snr"),
    ("wer_low_snr", "wer_realistic_low_snr"),
    # Misaligned eval builds used packed condition names in result keys.
    ("wer_sim_high_snr", "wer_realistic_high_snr"),
    ("wer_sim_low_snr", "wer_realistic_low_snr"),
    ("wer_moving", "wer_moving_sources"),
)

# Canonical leaderboard column -> key the evaluation pipeline emits on the wire
# (see benchmark.dataset.wer_result_key).
_CANONICAL_TO_WIRE_KEY: dict[str, str] = {
    "wer_anechoic_speech": "wer_clean",
    "wer_lab_measured": "wer_measured",
    "wer_lab_simulated": "wer_sim",
    "wer_realistic_high_snr": "wer_high",
    "wer_realistic_mid_snr": "wer_mid",
    "wer_realistic_low_snr": "wer_low",
    "wer_moving_low": "wer_moving_low",
    "wer_moving_mid": "wer_moving_mid",
    "wer_moving_high": "wer_moving_high",
}

# Alternative spellings tried, in order, when the primary wire key is absent or empty.
_WIRE_KEY_ALIASES: dict[str, tuple[str, ...]] = {
    "wer_high": ("wer_high_snr", "wer_sim_high_snr"),
    "wer_mid": ("wer_mid_snr",),
    "wer_low": ("wer_low_snr", "wer_sim_low_snr"),
}


def _cell_empty(v) -> bool:
    """Return True when a CSV cell is ``None`` or a blank / whitespace-only string."""
    return v is None or (isinstance(v, str) and v.strip() == "")


def _as_text(value) -> str:
    """Stringify ``value``, mapping ``None`` to ``""`` instead of the text ``"None"``."""
    return "" if value is None else str(value)


def _remove_temp_file(path) -> None:
    """Best-effort removal of a downloaded local copy; never raises on failure."""
    try:
        os.unlink(path)
    except (OSError, TypeError):
        logger.debug("Could not remove temporary file %r", path, exc_info=True)


def _apply_row_defaults(row: dict) -> dict:
    """Fill the ``eval_family`` placeholder and legacy scenario columns (in place)."""
    if not row.get("eval_family"):
        row["eval_family"] = "N/A"
    return normalize_legacy_csv_row(row)


def normalize_legacy_csv_row(row: dict) -> dict:
    """Fill canonical scenario columns from legacy leaderboard CSV keys when needed.

    For every ``(legacy, canonical)`` pair in ``_LEGACY_WER_TO_CANONICAL`` the
    canonical cell is populated from the legacy cell only if the canonical cell
    is empty and the legacy cell is not. Values are stored as stripped strings.

    ``row`` is modified in place and also returned for convenience.
    """
    for legacy_key, canonical_key in _LEGACY_WER_TO_CANONICAL:
        legacy_value = row.get(legacy_key)
        if _cell_empty(row.get(canonical_key)) and not _cell_empty(legacy_value):
            row[canonical_key] = str(legacy_value).strip()
    return row


def _lookup_wire_wer(result: dict, wire_key: str):
    """Return the first non-empty value among ``wire_key`` and its aliases, else ``None``."""
    for key in (wire_key, *_WIRE_KEY_ALIASES.get(wire_key, ())):
        value = result.get(key)
        if value is not None and value != "":
            return value
    return None


def leaderboard_row_from_eval_result(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str = "",
    contact_email: str = "",
) -> dict:
    """Build a full CSV row; scenario keys missing from `result` are left empty (future metrics).

    Args:
        result: Evaluation output. WER values are read under the wire keys listed
            in ``_CANONICAL_TO_WIRE_KEY`` (plus aliases) and stored under the
            canonical column names.
        submitted_at_iso: Timestamp stored verbatim in ``submitted_at``.
        submission_notes: Free text, stripped and truncated to 4000 characters.
        contact_email: Stripped and truncated to 254 characters.

    Returns:
        A dict with every key of ``CSV_FIELDS``; all values are strings.
    """
    row = {k: "" for k in CSV_FIELDS}
    # ``None`` is treated as "missing" so it is not persisted as the text "None".
    row["model_id"] = _as_text(result.get("model_id"))
    row["eval_family"] = _as_text(result.get("eval_family"))

    for canonical_key, wire_key in _CANONICAL_TO_WIRE_KEY.items():
        value = _lookup_wire_wer(result, wire_key)
        if value is not None:
            row[canonical_key] = str(value)

    row["num_samples"] = _as_text(result.get("num_samples"))
    for k in ("eval_audio_seconds", "eval_wall_time_s", "eval_rtf", "num_params"):
        value = result.get(k)
        if value is not None and value != "":
            row[k] = str(value)

    row["submission_notes"] = (submission_notes or "").strip()[:4000]
    row["contact_email"] = (contact_email or "").strip()[:254]
    row["submitted_at"] = submitted_at_iso
    return row


def merge_leaderboard_row_from_eval_result(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str = "",
    contact_email: str = "",
) -> None:
    """
    Merge evaluation output into an existing leaderboard row (partial re-eval).

    Only scenario WER columns and ``eval_family`` / ``num_params`` present in ``result``
    are overwritten; other metrics and timing fields are kept. Submission notes and
    contact email are replaced only when a non-blank value is passed. Creates a new
    row if the model is not on the leaderboard yet.

    The load -> modify -> save sequence is not performed under a single lock, so
    two concurrent merges may overwrite each other's changes.

    Raises:
        ValueError: if ``result`` carries no (non-blank) ``model_id``.
    """
    model_id = _as_text(result.get("model_id")).strip()
    if not model_id:
        raise ValueError("merge_leaderboard_row_from_eval_result requires model_id")

    rows = load_raw_results()
    match_idx = next(
        (i for i, r in enumerate(rows) if (r.get("model_id") or "").strip() == model_id),
        None,
    )

    patch = leaderboard_row_from_eval_result(
        result, submitted_at_iso, submission_notes=submission_notes, contact_email=contact_email
    )

    if match_idx is None:
        normalize_legacy_csv_row(patch)
        rows.append(patch)
    else:
        row = dict(rows[match_idx])
        for k in SCENARIO_KEYS:
            v = patch.get(k)
            if v is not None and str(v).strip() != "":
                row[k] = v
        for k in ("eval_family", "num_params"):
            if patch.get(k):
                row[k] = patch[k]
        if (submission_notes or "").strip():
            row["submission_notes"] = patch["submission_notes"]
        if (contact_email or "").strip():
            row["contact_email"] = patch["contact_email"]
        normalize_legacy_csv_row(row)
        rows[match_idx] = row

    # Imported lazily, like the other non-essential imports in this module.
    from analytics import sort_leaderboard_rows_inplace

    sort_leaderboard_rows_inplace(rows)
    save_raw_results(rows)
    invalidate_results_cache()


# ---------------------------------------------------------------------------
# Storage helpers
# ---------------------------------------------------------------------------

def list_audio_files(prefix: str, suffix: str) -> list[str]:
    """List all file paths under a prefix with a given suffix (sorted).

    Returns ``[]`` for any storage backend other than ``"hf_bucket"``.
    """
    if STORAGE_BACKEND != "hf_bucket":
        return []
    entries = list_bucket_tree(HF_BUCKET_ID, prefix=prefix, recursive=True, token=HF_TOKEN)
    return sorted(
        entry.path
        for entry in entries
        if entry.type == "file" and entry.path.endswith(suffix)
    )


def load_audio_file(path: str):
    """Download an audio file and return (waveform, sample_rate).

    Raises:
        RuntimeError: if the storage backend is not ``"hf_bucket"``.
    """
    import torchaudio

    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("load_audio_file requires STORAGE_BACKEND='hf_bucket'.")

    local_path = download_bucket_file(path)
    try:
        waveform, sample_rate = torchaudio.load(local_path)
    finally:
        # Remove the local copy even when decoding fails.
        _remove_temp_file(local_path)
    return waveform, sample_rate


def load_transcript_file(path: str) -> str:
    """Download a transcript txt file and return its content (whitespace-stripped).

    Raises:
        RuntimeError: if the storage backend is not ``"hf_bucket"``.
    """
    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("load_transcript_file requires STORAGE_BACKEND='hf_bucket'.")

    local_path = download_bucket_file(path)
    try:
        # NOTE(review): assumes transcripts are stored as UTF-8; confirm against the dataset.
        with open(local_path, "r", encoding="utf-8") as f:
            return f.read().strip()
    finally:
        _remove_temp_file(local_path)


# ---------------------------------------------------------------------------
# Results persistence
# ---------------------------------------------------------------------------

_raw_results_cache: list[dict] | None = None


def invalidate_results_cache() -> None:
    """Drop the in-process cache so the next ``load_raw_results`` re-reads storage."""
    global _raw_results_cache
    _raw_results_cache = None


def load_raw_results(*, refresh: bool = False) -> list[dict]:
    """Load the existing leaderboard CSV rows (cached in-process until ``refresh=True``).

    Every returned row is a fresh dict, so callers may mutate the result freely.
    Rows are padded to ``CSV_FIELDS``, get ``eval_family="N/A"`` when blank, and
    have legacy WER columns mapped to their canonical names.

    Any failure to download or parse the file yields ``[]``. Such a failure is
    logged and deliberately NOT cached, so a transient storage error cannot pin
    an empty leaderboard for the rest of the process lifetime.
    """
    global _raw_results_cache

    # Work from a snapshot: another thread may invalidate the cache between
    # the ``is not None`` check and the iteration.
    cached = _raw_results_cache
    if not refresh and cached is not None:
        return [dict(r) for r in cached]

    with csv_lock:
        if STORAGE_BACKEND != "hf_bucket":
            _raw_results_cache = []
            return []
        try:
            local_path = download_bucket_file(RESULTS_PATH)
            try:
                # The file is uploaded as UTF-8; ``newline=""`` is what the csv module expects.
                with open(local_path, "r", encoding="utf-8", newline="") as f:
                    results = list(csv.DictReader(f))
            finally:
                _remove_temp_file(local_path)
            for row in results:
                for k in CSV_FIELDS:
                    row.setdefault(k, "")
                _apply_row_defaults(row)
        except Exception:
            logger.warning(
                "Could not load leaderboard results from %r", RESULTS_PATH, exc_info=True
            )
            return []
        _raw_results_cache = results
        return [dict(r) for r in results]


def save_raw_results(rows: list[dict], *, record_version: bool = True):
    """Write the leaderboard CSV back to storage.

    When ``record_version`` is set (default), the model row(s) that changed relative to
    the previously persisted state are appended to the history file so the new state is
    selectable from the leaderboard version dropdown.

    An empty ``rows`` is ignored (nothing is written). Only ``CSV_FIELDS`` columns
    are persisted. Upload errors propagate; history errors are logged and swallowed.
    """
    if not rows:
        return

    # Snapshot the previously persisted state BEFORE acquiring csv_lock (load_raw_results
    # also takes csv_lock, which is non-reentrant) so we can diff for versioning.
    prev_rows = load_raw_results() if record_version else None

    normalized = [{k: row.get(k, "") for k in CSV_FIELDS} for row in rows]
    with csv_lock:
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=CSV_FIELDS)
        writer.writeheader()
        writer.writerows(normalized)
        content = output.getvalue()

        if STORAGE_BACKEND == "hf_bucket":
            upload_to_bucket(
                HF_BUCKET_ID,
                add=[(content.encode("utf-8"), RESULTS_PATH)],
                token=HF_TOKEN,
            )

    invalidate_results_cache()

    if record_version:
        # Never let history bookkeeping break a successful result save.
        try:
            _record_history_from_diff(prev_rows or [], normalized)
        except Exception:
            logger.warning("Could not record leaderboard history", exc_info=True)


# ---------------------------------------------------------------------------
# Leaderboard version history (append-only, delta-based)
# ---------------------------------------------------------------------------

def _now_iso() -> str:
    """UTC timestamp with microseconds so successive versions sort and stay unique."""
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")


def load_history_rows() -> list[dict]:
    """All rows of the append-only history file (chronological), or ``[]`` if absent.

    Download and parse errors also yield ``[]``; callers cannot tell "no history
    yet" apart from "history temporarily unreadable".
    """
    if STORAGE_BACKEND != "hf_bucket":
        return []
    try:
        local_path = download_bucket_file(HISTORY_PATH)
    except Exception:
        logger.debug("History file %r not available", HISTORY_PATH, exc_info=True)
        return []
    try:
        with open(local_path, "r", encoding="utf-8", newline="") as f:
            return list(csv.DictReader(f))
    except Exception:
        logger.warning("Could not parse history file %r", HISTORY_PATH, exc_info=True)
        return []
    finally:
        _remove_temp_file(local_path)


def _append_history_rows(new_rows: list[dict]) -> None:
    """Append rows to the history file (read-modify-write of the whole file).

    No-op for non-bucket backends or when ``new_rows`` is empty. Only
    ``HISTORY_FIELDS`` columns are written.
    """
    if STORAGE_BACKEND != "hf_bucket" or not new_rows:
        return
    with _history_lock:
        all_rows = load_history_rows() + new_rows
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=HISTORY_FIELDS, extrasaction="ignore")
        writer.writeheader()
        writer.writerows({k: row.get(k, "") for k in HISTORY_FIELDS} for row in all_rows)
        upload_to_bucket(
            HF_BUCKET_ID,
            add=[(output.getvalue().encode("utf-8"), HISTORY_PATH)],
            token=HF_TOKEN,
        )


def _history_label(changed: list[dict]) -> str:
    """Human-readable label for a version: the model id for one change, else a count."""
    if len(changed) == 1:
        mid = (changed[0].get("model_id") or "").strip()
        return mid or "leaderboard updated"
    return f"{len(changed)} models updated"


def _record_history_from_diff(prev_rows: list[dict], curr_rows: list[dict]) -> None:
    """Append the model rows that changed between ``prev_rows`` and ``curr_rows``.

    The first ever call seeds a full snapshot ("Initial release") so the earliest
    selectable version reconstructs the complete board; later calls store only deltas.

    Rows are compared on their ``CSV_FIELDS`` projection, keyed by stripped
    ``model_id``; rows without a model id are skipped.

    NOTE(review): models present in ``prev_rows`` but missing from ``curr_rows``
    produce no history entry, so reconstructed versions keep showing them.
    """
    now = _now_iso()

    def _history_entry(r: dict, label: str) -> dict:
        entry = {k: r.get(k, "") for k in CSV_FIELDS}
        entry.update(version=now, recorded_at=now, version_label=label)
        return entry

    if not load_history_rows():
        _append_history_rows([_history_entry(r, "Initial release") for r in curr_rows])
        return

    prev_by_mid = {
        (r.get("model_id") or "").strip(): {k: r.get(k, "") for k in CSV_FIELDS}
        for r in prev_rows
    }
    changed: list[dict] = []
    for r in curr_rows:
        mid = (r.get("model_id") or "").strip()
        if not mid:
            continue
        if prev_by_mid.get(mid) != {k: r.get(k, "") for k in CSV_FIELDS}:
            changed.append(r)
    if not changed:
        return

    label = _history_label(changed)
    _append_history_rows([_history_entry(r, label) for r in changed])


def list_leaderboard_versions() -> list[dict]:
    """Distinct versions, newest first: ``[{version, label, recorded_at}, ...]``.

    When several history rows share a version id, the values of the last such
    row in the file are reported.
    """
    by_version: dict[str, dict] = {}
    for r in load_history_rows():
        v = (r.get("version") or "").strip()
        if not v:
            continue
        by_version[v] = {
            "version": v,
            "label": (r.get("version_label") or v),
            "recorded_at": (r.get("recorded_at") or v),
        }
    return sorted(by_version.values(), key=lambda d: d["recorded_at"], reverse=True)


def raw_rows_for_version(version: str) -> list[dict]:
    """Reconstruct the leaderboard (CSV_FIELDS rows) as of ``version``.

    Accumulates every history entry recorded at or before the chosen version and keeps
    the most recent row per model. Falls back to the live results if history is missing
    or ``version`` is empty / unknown. ``recorded_at`` values are compared as strings.
    """
    if not version:
        return load_raw_results()
    rows = load_history_rows()
    if not rows:
        return load_raw_results()

    # Latest ``recorded_at`` among the rows tagged with the requested version.
    target = None
    for r in rows:
        if (r.get("version") or "").strip() == version:
            ra = r.get("recorded_at") or ""
            if target is None or ra > target:
                target = ra
    if target is None:
        return load_raw_results()

    # File order is chronological, so the last entry per model wins.
    by_mid: dict[str, dict] = {}
    for r in rows:
        if (r.get("recorded_at") or "") <= target:
            mid = (r.get("model_id") or "").strip()
            if mid:
                by_mid[mid] = r

    return [
        _apply_row_defaults({k: r.get(k, "") for k in CSV_FIELDS})
        for r in by_mid.values()
    ]


def load_results(version: str | None = None) -> pd.DataFrame:
    """Load results from storage and return a display-ready DataFrame.

    With ``version`` empty/``None`` the live leaderboard is returned. Otherwise the
    leaderboard is reconstructed from the append-only history as of that version.

    The leaderboard intentionally does NOT show eval_family, wall time, or
    submitted_at. Those fields remain in the CSV (for moderators and analytics)
    but are omitted from the public table.
    """
    raw_rows = raw_rows_for_version(version) if version else load_raw_results()
    return _display_df_from_raw_rows(raw_rows)


def _display_df_from_raw_rows(raw_rows: list[dict]) -> pd.DataFrame:
    """Transform raw CSV rows into the public, sorted, display-formatted DataFrame.

    Rows are ordered by the mean of the ``AVG_WER_CORE_KEYS`` columns (ascending,
    missing averages last). Missing values and absent columns are rendered as "NA".
    """
    # NOTE(review): ``analytics`` is not referenced below; the import is kept in
    # case its import-time side effects are relied on. Confirm and remove if not.
    import analytics  # noqa: F401

    if not raw_rows:
        return pd.DataFrame(columns=LEADERBOARD_COLUMNS)

    raw_df = pd.DataFrame(raw_rows)

    # Coerce live-condition AND core-average columns to numeric so scoring / sorting
    # behave sensibly: CSV cells arrive as strings, and averaging needs numbers.
    numeric_cols = [
        c
        for c in dict.fromkeys([*LIVE_SCENARIO_KEYS, *AVG_WER_CORE_KEYS])
        if c in raw_df.columns
    ]
    for c in numeric_cols:
        raw_df[c] = pd.to_numeric(raw_df[c], errors="coerce")

    core_cols = [c for c in AVG_WER_CORE_KEYS if c in raw_df.columns]
    raw_df["_avg_wer"] = (
        raw_df[core_cols].mean(axis=1, skipna=True) if core_cols else pd.NA
    )
    raw_df = raw_df.sort_values(
        by="_avg_wer", ascending=True, na_position="last"
    ).reset_index(drop=True)

    def _wer_pct_or_na(series, ndigits: int = 2):
        return series.apply(lambda v: format_wer_percent(v, ndigits=ndigits))

    def _num_or_na(series, rnd=4):
        return series.apply(lambda v: round(float(v), rnd) if pd.notna(v) else "NA")

    display_df = pd.DataFrame()
    display_df[AutoEvalColumn.model.name] = raw_df["model_id"].apply(make_clickable_model)

    if core_cols:
        display_df[AutoEvalColumn.avg_wer_core.name] = _wer_pct_or_na(raw_df["_avg_wer"])
    else:
        display_df[AutoEvalColumn.avg_wer_core.name] = "NA"

    # Map: scenario CSV key -> display column name (defined in AutoEvalColumn).
    scenario_col_map = {
        "wer_anechoic_speech": AutoEvalColumn.wer_anechoic.name,
        "wer_lab_measured": AutoEvalColumn.wer_lab_measured.name,
        "wer_lab_simulated": AutoEvalColumn.wer_lab_simulated.name,
        "wer_realistic_high_snr": AutoEvalColumn.wer_realistic_high_snr.name,
        "wer_realistic_mid_snr": AutoEvalColumn.wer_realistic_mid_snr.name,
        "wer_realistic_low_snr": AutoEvalColumn.wer_realistic_low_snr.name,
        "wer_moving_low": AutoEvalColumn.wer_moving_low.name,
        "wer_moving_mid": AutoEvalColumn.wer_moving_mid.name,
        "wer_moving_high": AutoEvalColumn.wer_moving_high.name,
    }
    for csv_key, display_name in scenario_col_map.items():
        if csv_key in raw_df.columns:
            display_df[display_name] = _wer_pct_or_na(
                pd.to_numeric(raw_df[csv_key], errors="coerce")
            )
        else:
            display_df[display_name] = "NA"

    display_df[AutoEvalColumn.eval_rtf.name] = (
        _num_or_na(pd.to_numeric(raw_df["eval_rtf"], errors="coerce"), 4)
        if "eval_rtf" in raw_df.columns
        else "NA"
    )

    if "num_params" in raw_df.columns:
        params = pd.to_numeric(raw_df["num_params"], errors="coerce")
        # NOTE(review): the column attribute is named ``params_m`` but the value is
        # divided by 1e9 (billions); confirm which unit the display header promises.
        display_df[AutoEvalColumn.params_m.name] = params.apply(
            lambda v: round(float(v) / 1e9, 3) if pd.notna(v) and float(v) > 0 else "NA"
        )
    else:
        display_df[AutoEvalColumn.params_m.name] = "NA"

    return display_df


def _format_timestamp(ts: str) -> str:
    """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM UTC``; echo unparseable input.

    The value is only reformatted, not converted between time zones.
    """
    from datetime import datetime

    try:
        dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        return dt.strftime("%Y-%m-%d %H:%M UTC")
    except Exception:
        return str(ts)


# ---------------------------------------------------------------------------
# Model validation
# ---------------------------------------------------------------------------

def is_model_on_hub(model_name: str) -> tuple[bool, str | None]:
    """Check whether a model exists on the HuggingFace Hub.

    Returns:
        ``(True, None)`` when exactly one Hub model matches ``author/model_name``;
        otherwise ``(False, message)`` where ``message`` is a sentence fragment
        intended to follow the model name. Hub/API errors are reported as "not found".
    """
    invalid = (
        False,
        "is not a valid model name. Please use the format `author/model_name`.",
    )
    try:
        model_name = model_name.strip().replace(" ", "")
        parts = model_name.split("/")
    except (AttributeError, TypeError):
        # Not a string at all (e.g. ``None`` from an empty form field).
        return invalid
    if len(parts) != 2 or not parts[0] or not parts[1]:
        return invalid

    try:
        from huggingface_hub import HfApi

        hf_api = HfApi()
        models = list(hf_api.list_models(author=parts[0], search=parts[1]))
        # ``search`` is a fuzzy filter, so require an exact id match.
        matched = [m for m in models if m.modelId == model_name]
        if len(matched) != 1:
            return False, "was not found on the hub!"
        return True, None
    except Exception:
        logger.debug("Hub lookup failed for %r", model_name, exc_info=True)
        return False, "was not found on the hub!"