Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Data loading, results persistence, and ASR evaluation orchestration. | |
| Heavy ML imports stay inside `evaluation` and `backends` so the Gradio UI starts quickly. | |
| """ | |
| import csv | |
| import io | |
| import os | |
| import threading | |
| import pandas as pd | |
| from storage import ( | |
| HF_BUCKET_ID, | |
| HF_TOKEN, | |
| HISTORY_PATH, | |
| RESULTS_PATH, | |
| STORAGE_BACKEND, | |
| upload_to_bucket, | |
| download_bucket_file, | |
| list_bucket_tree, | |
| ) | |
| from metrics_config import LIVE_SCENARIO_KEYS, SCENARIO_KEYS | |
| from utils_display import ( | |
| AVG_WER_CORE_KEYS, | |
| AutoEvalColumn, | |
| fields, | |
| format_wer_percent, | |
| make_clickable_model, | |
| ) | |
# Re-export for callers that imported from init.
# ``__all__`` only controls what ``from <this module> import *`` exposes; every other
# name defined in this module is still importable by explicit name.
__all__ = [
    "csv_lock",
    "is_model_on_hub",
    "load_raw_results",
    "load_results",
    "save_raw_results",
    "list_audio_files",
    "load_audio_file",
    "load_transcript_file",
    "normalize_legacy_csv_row",
    "list_leaderboard_versions",
    "raw_rows_for_version",
    "LATEST_VERSION",
]
# Guards reading and writing of the leaderboard CSV (taken by ``load_raw_results`` and
# ``save_raw_results``). ``threading.Lock`` is non-reentrant, so it must never be
# acquired again by code that already holds it.
csv_lock = threading.Lock()
# Serializes read-modify-write of the append-only history file (separate from the
# leaderboard CSV lock so history bookkeeping can never deadlock a result save).
_history_lock = threading.Lock()
# Names of the ``AutoEvalColumn`` fields as returned by ``fields``; used as the column
# set of the empty display DataFrame.
LEADERBOARD_COLUMNS = [c.name for c in fields(AutoEvalColumn)]
# Column order of the persisted leaderboard CSV (this list is the ``DictWriter`` header).
# The scenario WER columns from ``SCENARIO_KEYS`` are spliced in after ``eval_family``.
CSV_FIELDS = [
    "model_id",
    "eval_family",
    *SCENARIO_KEYS,
    "num_samples",
    "eval_audio_seconds",
    "eval_wall_time_s",
    "eval_rtf",
    "num_params",
    "submission_notes",
    "contact_email",
    "submitted_at",
]
# Sentinel for "show the live leaderboard" (no historical reconstruction).
LATEST_VERSION = ""
# History rows are leaderboard rows plus version bookkeeping. Stored append-only:
# each leaderboard change appends only the model row(s) that changed, tagged with a
# version id (timestamp) so any past state can be reconstructed without full copies.
HISTORY_FIELDS = [*CSV_FIELDS, "version", "recorded_at", "version_label"]
# Legacy CSV / evaluation wire-format keys -> canonical ``SCENARIO_KEYS`` column names.
# Each entry is ``(legacy_key, canonical_key)``. Several legacy keys may map to the same
# canonical column; ``normalize_legacy_csv_row`` walks the pairs in this order and only
# fills a canonical column that is still empty, so the earliest matching pair wins.
_LEGACY_WER_TO_CANONICAL: tuple[tuple[str, str], ...] = (
    ("wer_clean", "wer_anechoic_speech"),
    ("wer_measured", "wer_lab_measured"),
    ("wer_sim", "wer_lab_simulated"),
    ("wer_high_snr", "wer_realistic_high_snr"),
    ("wer_low_snr", "wer_realistic_low_snr"),
    # Misaligned eval builds used packed condition names in result keys.
    ("wer_sim_high_snr", "wer_realistic_high_snr"),
    ("wer_sim_low_snr", "wer_realistic_low_snr"),
    ("wer_moving", "wer_moving_sources"),
)
| def _cell_empty(v) -> bool: | |
| return v is None or (isinstance(v, str) and v.strip() == "") | |
def normalize_legacy_csv_row(row: dict) -> dict:
    """Back-fill canonical scenario columns from legacy leaderboard CSV keys.

    The row is modified in place and also returned. A canonical column is only
    written when it is currently empty and the legacy column has a value; existing
    canonical values are never overwritten.
    """
    for legacy_key, canonical_key in _LEGACY_WER_TO_CANONICAL:
        legacy_value = row.get(legacy_key)
        if _cell_empty(legacy_value) or not _cell_empty(row.get(canonical_key)):
            continue
        row[canonical_key] = str(legacy_value).strip()

    # Older CSVs used ``wer_high_snr`` as a standalone column.
    high_snr = row.get("wer_high_snr")
    if _cell_empty(row.get("wer_realistic_high_snr")) and not _cell_empty(high_snr):
        as_text = str(high_snr)
        row["wer_realistic_high_snr"] = (
            as_text.strip() if isinstance(high_snr, str) else as_text
        )
    return row
def leaderboard_row_from_eval_result(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str = "",
    contact_email: str = "",
) -> dict:
    """Build a complete CSV row from an evaluation result.

    Every ``CSV_FIELDS`` column is present in the returned dict; scenario columns the
    evaluation did not report stay as empty strings (room for future metrics).
    Notes are truncated to 4000 characters and the contact e-mail to 254.
    """
    # Evaluation pipeline emits legacy wer_* keys on the wire (see
    # benchmark.dataset.wer_result_key). Mapping: canonical CSV column -> wire key.
    canonical_to_wire = {
        "wer_anechoic_speech": "wer_clean",
        "wer_lab_measured": "wer_measured",
        "wer_lab_simulated": "wer_sim",
        "wer_realistic_high_snr": "wer_high",
        "wer_realistic_mid_snr": "wer_mid",
        "wer_realistic_low_snr": "wer_low",
        "wer_moving_low": "wer_moving_low",
        "wer_moving_mid": "wer_moving_mid",
        "wer_moving_high": "wer_moving_high",
    }
    # Alternative spellings tried, in order, when the primary wire key has no value.
    wire_aliases: dict[str, tuple[str, ...]] = {
        "wer_high": ("wer_high_snr", "wer_sim_high_snr"),
        "wer_mid": ("wer_mid_snr",),
        "wer_low": ("wer_low_snr", "wer_sim_low_snr"),
        "wer_moving_low": ("wer_moving_low",),
        "wer_moving_mid": ("wer_moving_mid",),
        "wer_moving_high": ("wer_moving_high",),
    }

    def _present(value) -> bool:
        return value is not None and value != ""

    def _lookup_wer(wire_key: str):
        # First usable value among the primary key and its aliases, else None.
        for candidate in (wire_key, *wire_aliases.get(wire_key, ())):
            value = result.get(candidate)
            if _present(value):
                return value
        return None

    row = dict.fromkeys(CSV_FIELDS, "")
    row["model_id"] = str(result.get("model_id", ""))
    row["eval_family"] = str(result.get("eval_family", ""))

    for canonical_key, wire_key in canonical_to_wire.items():
        wer_value = _lookup_wer(wire_key)
        if _present(wer_value):
            row[canonical_key] = str(wer_value)

    row["num_samples"] = str(result.get("num_samples", ""))
    for field in ("eval_audio_seconds", "eval_wall_time_s", "eval_rtf", "num_params"):
        value = result.get(field)
        if _present(value):
            row[field] = str(value)

    row["submission_notes"] = (submission_notes or "").strip()[:4000]
    row["contact_email"] = (contact_email or "").strip()[:254]
    row["submitted_at"] = submitted_at_iso
    return row
def merge_leaderboard_row_from_eval_result(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str = "",
    contact_email: str = "",
) -> None:
    """
    Fold an evaluation result into the persisted leaderboard (partial re-eval).

    For a model that is already listed, only the scenario WER columns plus
    ``eval_family`` / ``num_params`` that ``result`` actually provides are replaced;
    notes and contact e-mail are replaced only when non-blank values are passed, and
    every other column (timing, ``submitted_at``, ...) is left as stored.
    A model that is not listed yet gets a brand-new row.

    Raises ``ValueError`` when ``result`` carries no ``model_id``.
    """
    from metrics_config import SCENARIO_KEYS

    model_id = str(result.get("model_id", "")).strip()
    if not model_id:
        raise ValueError("merge_leaderboard_row_from_eval_result requires model_id")

    rows = load_raw_results()
    match_idx = next(
        (
            idx
            for idx, existing in enumerate(rows)
            if (existing.get("model_id") or "").strip() == model_id
        ),
        None,
    )
    patch = leaderboard_row_from_eval_result(
        result, submitted_at_iso, submission_notes=submission_notes, contact_email=contact_email
    )

    if match_idx is not None:
        merged = dict(rows[match_idx])
        # Only scenarios that were actually re-evaluated overwrite stored values.
        for key in SCENARIO_KEYS:
            value = patch.get(key)
            if value is None or not str(value).strip():
                continue
            merged[key] = value
        for key in ("eval_family", "num_params"):
            if patch.get(key):
                merged[key] = patch[key]
        for key, supplied in (
            ("submission_notes", submission_notes),
            ("contact_email", contact_email),
        ):
            if (supplied or "").strip():
                merged[key] = patch[key]
        normalize_legacy_csv_row(merged)
        rows[match_idx] = merged
    else:
        normalize_legacy_csv_row(patch)
        rows.append(patch)

    from analytics import sort_leaderboard_rows_inplace

    sort_leaderboard_rows_inplace(rows)
    save_raw_results(rows)
    invalidate_results_cache()
| # --------------------------------------------------------------------------- | |
| # Storage helpers | |
| # --------------------------------------------------------------------------- | |
def list_audio_files(prefix: str, suffix: str) -> list[str]:
    """Return the sorted paths of all files under ``prefix`` whose path ends with ``suffix``.

    Only the ``hf_bucket`` storage backend is supported; any other backend yields ``[]``.
    """
    if STORAGE_BACKEND != "hf_bucket":
        return []
    entries = list_bucket_tree(HF_BUCKET_ID, prefix=prefix, recursive=True, token=HF_TOKEN)
    return sorted(
        entry.path
        for entry in entries
        if entry.type == "file" and entry.path.endswith(suffix)
    )
def load_audio_file(path: str):
    """Download an audio file and return ``(waveform, sample_rate)``.

    The file is fetched to a local temporary path via ``download_bucket_file``, decoded
    with ``torchaudio.load`` and the local copy is removed afterwards, including when
    decoding fails.

    Raises:
        RuntimeError: if ``STORAGE_BACKEND`` is not ``"hf_bucket"``.
    """
    # Imported lazily so the heavy dependency is only loaded when audio is needed.
    import torchaudio

    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("load_audio_file requires STORAGE_BACKEND='hf_bucket'.")

    local_path = download_bucket_file(path)
    try:
        waveform, sample_rate = torchaudio.load(local_path)
    finally:
        # Always delete the downloaded copy so a corrupt file does not leak disk space.
        os.unlink(local_path)
    return waveform, sample_rate
def load_transcript_file(path: str) -> str:
    """Download a transcript text file and return its content, stripped of outer whitespace.

    The local temporary copy is removed afterwards, including when reading fails.

    Raises:
        RuntimeError: if ``STORAGE_BACKEND`` is not ``"hf_bucket"``.
    """
    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("load_transcript_file requires STORAGE_BACKEND='hf_bucket'.")

    local_path = download_bucket_file(path)
    try:
        # Explicit UTF-8 so decoding does not depend on the host's locale settings.
        with open(local_path, "r", encoding="utf-8") as f:
            return f.read().strip()
    finally:
        os.unlink(local_path)
| # --------------------------------------------------------------------------- | |
| # Results persistence | |
| # --------------------------------------------------------------------------- | |
| _raw_results_cache: list[dict] | None = None | |
| def invalidate_results_cache() -> None: | |
| global _raw_results_cache | |
| _raw_results_cache = None | |
def load_raw_results(*, refresh: bool = False) -> list[dict]:
    """Load the existing leaderboard CSV rows (cached in-process until ``refresh=True``).

    Returns a list of fresh dict copies, so callers may mutate the result without
    touching the cache. Every row is padded to contain all ``CSV_FIELDS`` columns, a
    missing ``eval_family`` becomes ``"N/A"`` and legacy WER columns are folded into
    their canonical names.

    Any failure (download, parse, ...) is logged and yields ``[]``; that empty result
    is cached too, so pass ``refresh=True`` to retry. With a storage backend other
    than ``hf_bucket`` the result is always ``[]``.
    """
    global _raw_results_cache
    if not refresh and _raw_results_cache is not None:
        return [dict(r) for r in _raw_results_cache]

    with csv_lock:
        results: list[dict] = []
        try:
            if STORAGE_BACKEND == "hf_bucket":
                local_path = download_bucket_file(RESULTS_PATH)
                try:
                    # newline="" lets the csv module handle line breaks inside quoted
                    # fields (e.g. multi-line notes); UTF-8 matches how the file is
                    # written by save_raw_results.
                    with open(local_path, "r", newline="", encoding="utf-8") as f:
                        results = list(csv.DictReader(f))
                finally:
                    # Remove the downloaded copy even if parsing failed.
                    os.unlink(local_path)
                for row in results:
                    for k in CSV_FIELDS:
                        row.setdefault(k, "")
                    if not row.get("eval_family"):
                        row["eval_family"] = "N/A"
                    normalize_legacy_csv_row(row)
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "Could not load leaderboard results; treating the board as empty.",
                exc_info=True,
            )
            results = []
        _raw_results_cache = results
        return [dict(r) for r in results]
def save_raw_results(rows: list[dict], *, record_version: bool = True):
    """Write the leaderboard CSV back to storage.

    Rows are projected onto ``CSV_FIELDS`` (missing columns become ``""``, extra keys
    are dropped) before being serialized. An empty ``rows`` list is a no-op, so the
    stored leaderboard is never replaced by an empty file. The upload only happens
    for the ``hf_bucket`` storage backend.

    When ``record_version`` is set (default), the model row(s) that changed relative to
    the previously persisted state are appended to the history file so the new state is
    selectable from the leaderboard version dropdown. A failure while recording history
    is logged but never raised.
    """
    if not rows:
        return
    # Snapshot the previously persisted state BEFORE acquiring csv_lock (load_raw_results
    # also takes csv_lock, which is non-reentrant) so we can diff for versioning.
    prev_rows = load_raw_results() if record_version else None
    with csv_lock:
        normalized = [{k: row.get(k, "") for k in CSV_FIELDS} for row in rows]
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=CSV_FIELDS)
        writer.writeheader()
        writer.writerows(normalized)
        content = output.getvalue()
        if STORAGE_BACKEND == "hf_bucket":
            upload_to_bucket(
                HF_BUCKET_ID,
                add=[(content.encode("utf-8"), RESULTS_PATH)],
                token=HF_TOKEN,
            )
        invalidate_results_cache()
    if record_version:
        # Never let history bookkeeping break a successful result save, but leave a
        # trace in the logs so a broken history file does not go unnoticed.
        try:
            _record_history_from_diff(prev_rows or [], normalized)
        except Exception:
            import logging

            logging.getLogger(__name__).exception(
                "Leaderboard saved, but recording the version history failed."
            )
| # --------------------------------------------------------------------------- | |
| # Leaderboard version history (append-only, delta-based) | |
| # --------------------------------------------------------------------------- | |
| def _now_iso() -> str: | |
| """UTC timestamp with microseconds so successive versions sort and stay unique.""" | |
| from datetime import datetime, timezone | |
| return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") | |
def load_history_rows() -> list[dict]:
    """All rows of the append-only history file (chronological), or ``[]`` if absent.

    ``[]`` is also returned when the storage backend is not ``hf_bucket``, when the
    download fails for any reason, or when the file cannot be parsed. The downloaded
    temporary copy is always removed (best effort).
    """
    if STORAGE_BACKEND != "hf_bucket":
        return []
    try:
        local_path = download_bucket_file(HISTORY_PATH)
    except Exception:
        return []
    try:
        # newline="" is what the csv module expects (quoted fields may contain line
        # breaks); UTF-8 matches the encoding used when the history file is uploaded.
        with open(local_path, "r", newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
    except Exception:
        rows = []
    finally:
        try:
            os.unlink(local_path)
        except Exception:
            pass
    return rows
def _append_history_rows(new_rows: list[dict]) -> None:
    """Append ``new_rows`` to the history file by rewriting the whole file.

    No-op when there is nothing to append or the backend is not ``hf_bucket``.
    """
    if not new_rows or STORAGE_BACKEND != "hf_bucket":
        return
    with _history_lock:
        # NOTE(review): load_history_rows() yields [] on any download/parse failure, in
        # which case the upload below replaces the stored history with just new_rows.
        combined = load_history_rows() + new_rows
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=HISTORY_FIELDS, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(
            {field: entry.get(field, "") for field in HISTORY_FIELDS}
            for entry in combined
        )
        payload = buffer.getvalue().encode("utf-8")
        upload_to_bucket(
            HF_BUCKET_ID,
            add=[(payload, HISTORY_PATH)],
            token=HF_TOKEN,
        )
| def _history_label(changed: list[dict]) -> str: | |
| if len(changed) == 1: | |
| mid = (changed[0].get("model_id") or "").strip() | |
| return mid or "leaderboard updated" | |
| return f"{len(changed)} models updated" | |
def _record_history_from_diff(prev_rows: list[dict], curr_rows: list[dict]) -> None:
    """Record which model rows differ between ``prev_rows`` and ``curr_rows``.

    When the history file has no rows yet, the whole of ``curr_rows`` is stored as a
    full snapshot labelled "Initial release", so the earliest selectable version can
    rebuild the complete board. Every later call stores only the rows whose
    ``CSV_FIELDS`` values changed (or whose model is new); rows without a model id are
    skipped, and nothing is written when nothing changed.
    """
    now = _now_iso()

    def _fields_of(entry: dict) -> dict:
        return {k: entry.get(k, "") for k in CSV_FIELDS}

    def _model_id_of(entry: dict) -> str:
        return (entry.get("model_id") or "").strip()

    def _versioned(entry: dict, label: str) -> dict:
        return {
            **_fields_of(entry),
            "version": now,
            "recorded_at": now,
            "version_label": label,
        }

    if not load_history_rows():
        _append_history_rows([_versioned(entry, "Initial release") for entry in curr_rows])
        return

    previous = {_model_id_of(entry): _fields_of(entry) for entry in prev_rows}
    changed: list[dict] = [
        entry
        for entry in curr_rows
        if _model_id_of(entry) and previous.get(_model_id_of(entry)) != _fields_of(entry)
    ]
    if not changed:
        return

    label = _history_label(changed)
    _append_history_rows([_versioned(entry, label) for entry in changed])
def list_leaderboard_versions() -> list[dict]:
    """Distinct versions, newest first: ``[{version, label, recorded_at}, ...]``.

    History rows without a version id are ignored. When several rows share a version,
    the label / timestamp of the last such row is used; both fall back to the version
    id itself when blank.
    """
    versions: dict[str, dict] = {}
    for entry in load_history_rows():
        version_id = (entry.get("version") or "").strip()
        if version_id:
            versions[version_id] = {
                "version": version_id,
                "label": (entry.get("version_label") or version_id),
                "recorded_at": (entry.get("recorded_at") or version_id),
            }
    return sorted(versions.values(), key=lambda item: item["recorded_at"], reverse=True)
def raw_rows_for_version(version: str) -> list[dict]:
    """Rebuild the leaderboard (``CSV_FIELDS`` rows) as it stood at ``version``.

    All history entries recorded at or before the selected version are replayed and
    the latest entry per model is kept. The live results are returned instead when
    ``version`` is empty, the history is empty, or the version is unknown.
    """
    if not version:
        return load_raw_results()
    history = load_history_rows()
    if not history:
        return load_raw_results()

    # Latest ``recorded_at`` among the rows that carry the requested version id.
    stamps = [
        entry.get("recorded_at") or ""
        for entry in history
        if (entry.get("version") or "").strip() == version
    ]
    if not stamps:
        return load_raw_results()
    cutoff = max(stamps)

    # File order is chronological, so the last entry per model wins.
    latest_by_model: dict[str, dict] = {}
    for entry in history:
        if (entry.get("recorded_at") or "") > cutoff:
            continue
        model_id = (entry.get("model_id") or "").strip()
        if model_id:
            latest_by_model[model_id] = entry

    reconstructed: list[dict] = []
    for entry in latest_by_model.values():
        row = {k: entry.get(k, "") for k in CSV_FIELDS}
        if not row.get("eval_family"):
            row["eval_family"] = "N/A"
        row.setdefault("submission_notes", "")
        row.setdefault("contact_email", "")
        normalize_legacy_csv_row(row)
        reconstructed.append(row)
    return reconstructed
# NOTE: a second, line-for-line identical copy of the "Leaderboard version history"
# section (_now_iso, load_history_rows, _append_history_rows, _history_label,
# _record_history_from_diff, list_leaderboard_versions, raw_rows_for_version) was
# repeated at this point. It only re-bound the same names to identical functions, so
# it has been dropped; the definitions earlier in this module are the only ones.
def load_results(version: str | None = None) -> pd.DataFrame:
    """Load results from storage and return a display-ready DataFrame.

    An empty / ``None`` ``version`` selects the live leaderboard; any other value
    reconstructs the board from the append-only history as of that version.

    The leaderboard intentionally does NOT show eval_family, wall time, or submitted_at.
    Those fields remain in the CSV (for moderators and analytics) but are omitted from the
    public table.
    """
    raw_rows = raw_rows_for_version(version) if version else load_raw_results()
    return _display_df_from_raw_rows(raw_rows)
def _display_df_from_raw_rows(raw_rows: list[dict]) -> pd.DataFrame:
    """Transform raw CSV rows into the public, sorted, display-formatted DataFrame.

    Rows are ordered by the mean of the ``AVG_WER_CORE_KEYS`` columns (ascending, rows
    without any core value last). WER columns are rendered through
    ``format_wer_percent``; missing columns or values are shown as ``"NA"``. With no
    input rows an empty frame with ``LEADERBOARD_COLUMNS`` is returned.
    """
    # NOTE(review): nothing from ``analytics`` is used below; the import is kept only
    # in case it has side effects. Confirm and remove.
    import analytics

    if not raw_rows:
        return pd.DataFrame(columns=LEADERBOARD_COLUMNS)
    raw_df = pd.DataFrame(raw_rows)

    # Coerce live-condition AND core-average columns to numeric so scoring / sorting
    # behave sensibly. CSV values arrive as strings; averaging the core columns without
    # coercion would fail for any core key that is not also a live scenario key.
    for c in dict.fromkeys([*LIVE_SCENARIO_KEYS, *AVG_WER_CORE_KEYS]):
        if c in raw_df.columns:
            raw_df[c] = pd.to_numeric(raw_df[c], errors="coerce")

    core_cols = [c for c in AVG_WER_CORE_KEYS if c in raw_df.columns]
    raw_df["_avg_wer"] = (
        raw_df[core_cols].mean(axis=1, skipna=True) if core_cols else pd.NA
    )
    raw_df = raw_df.sort_values(
        by="_avg_wer", ascending=True, na_position="last"
    ).reset_index(drop=True)

    def _wer_pct_or_na(series, ndigits: int = 2):
        return series.apply(lambda v: format_wer_percent(v, ndigits=ndigits))

    display_df = pd.DataFrame()
    display_df[AutoEvalColumn.model.name] = raw_df["model_id"].apply(make_clickable_model)
    if core_cols:
        display_df[AutoEvalColumn.avg_wer_core.name] = _wer_pct_or_na(raw_df["_avg_wer"])
    else:
        display_df[AutoEvalColumn.avg_wer_core.name] = "NA"

    # Map: scenario CSV key -> display column name (defined in AutoEvalColumn).
    scenario_col_map = {
        "wer_anechoic_speech": AutoEvalColumn.wer_anechoic.name,
        "wer_lab_measured": AutoEvalColumn.wer_lab_measured.name,
        "wer_lab_simulated": AutoEvalColumn.wer_lab_simulated.name,
        "wer_realistic_high_snr": AutoEvalColumn.wer_realistic_high_snr.name,
        "wer_realistic_mid_snr": AutoEvalColumn.wer_realistic_mid_snr.name,
        "wer_realistic_low_snr": AutoEvalColumn.wer_realistic_low_snr.name,
        "wer_moving_low": AutoEvalColumn.wer_moving_low.name,
        "wer_moving_mid": AutoEvalColumn.wer_moving_mid.name,
        "wer_moving_high": AutoEvalColumn.wer_moving_high.name,
    }
    for csv_key, display_name in scenario_col_map.items():
        if csv_key in raw_df.columns:
            display_df[display_name] = _wer_pct_or_na(
                pd.to_numeric(raw_df[csv_key], errors="coerce")
            )
        else:
            display_df[display_name] = "NA"

    def _num_or_na(series, rnd=4):
        return series.apply(
            lambda v: round(float(v), rnd) if pd.notna(v) and str(v).strip() != "" else "NA"
        )

    display_df[AutoEvalColumn.eval_rtf.name] = (
        _num_or_na(pd.to_numeric(raw_df["eval_rtf"], errors="coerce"), 4)
        if "eval_rtf" in raw_df.columns
        else "NA"
    )

    if "num_params" in raw_df.columns:
        params = pd.to_numeric(raw_df["num_params"], errors="coerce")
        # NOTE(review): the column attribute is called ``params_m`` but the value is
        # divided by 1e9 (billions) - confirm this matches the displayed unit.
        display_df[AutoEvalColumn.params_m.name] = params.apply(
            lambda v: round(float(v) / 1e9, 3) if pd.notna(v) and float(v) > 0 else "NA"
        )
    else:
        display_df[AutoEvalColumn.params_m.name] = "NA"
    return display_df
| def _format_timestamp(ts: str) -> str: | |
| from datetime import datetime | |
| try: | |
| dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) | |
| return dt.strftime("%Y-%m-%d %H:%M UTC") | |
| except Exception: | |
| return str(ts) | |
| # --------------------------------------------------------------------------- | |
| # Model validation | |
| # --------------------------------------------------------------------------- | |
| def is_model_on_hub(model_name: str) -> tuple[bool, str | None]: | |
| """Check whether a model exists on the HuggingFace Hub.""" | |
| try: | |
| model_name = model_name.strip().replace(" ", "") | |
| parts = model_name.split("/") | |
| if len(parts) != 2 or not parts[0] or not parts[1]: | |
| return False, "is not a valid model name. Please use the format `author/model_name`." | |
| except Exception: | |
| return False, "is not a valid model name. Please use the format `author/model_name`." | |
| try: | |
| from huggingface_hub import HfApi | |
| hf_api = HfApi() | |
| models = list(hf_api.list_models(author=parts[0], search=parts[1])) | |
| matched = [m for m in models if m.modelId == model_name] | |
| if len(matched) != 1: | |
| return False, "was not found on the hub!" | |
| return True, None | |
| except Exception: | |
| return False, "was not found on the hub!" | |