ffasr / init.py
whojavumusic's picture
added email section
a5fbf6b
Raw
History Blame Contribute Delete
27.1 kB
"""
Data loading, results persistence, and ASR evaluation orchestration.
Heavy ML imports stay inside `evaluation` and `backends` so the Gradio UI starts quickly.
"""
import csv
import io
import os
import threading
import pandas as pd
from storage import (
HF_BUCKET_ID,
HF_TOKEN,
HISTORY_PATH,
RESULTS_PATH,
STORAGE_BACKEND,
upload_to_bucket,
download_bucket_file,
list_bucket_tree,
)
from metrics_config import LIVE_SCENARIO_KEYS, SCENARIO_KEYS
from utils_display import (
AVG_WER_CORE_KEYS,
AutoEvalColumn,
fields,
format_wer_percent,
make_clickable_model,
)
# Re-export for callers that imported from init.
# This list is what ``from <this module> import *`` exposes; names defined in this
# file but not listed here must be imported explicitly.
__all__ = [
    "csv_lock",
    "is_model_on_hub",
    "load_raw_results",
    "load_results",
    "save_raw_results",
    "list_audio_files",
    "load_audio_file",
    "load_transcript_file",
    "normalize_legacy_csv_row",
    "list_leaderboard_versions",
    "raw_rows_for_version",
    "LATEST_VERSION",
]
# Held by ``load_raw_results`` and ``save_raw_results`` while they read or write the
# leaderboard CSV. ``threading.Lock`` is not reentrant.
csv_lock = threading.Lock()
# Serializes read-modify-write of the append-only history file (separate from the
# leaderboard CSV lock so history bookkeeping can never deadlock a result save).
_history_lock = threading.Lock()
# The ``name`` of every entry returned by ``fields(AutoEvalColumn)``; used as the
# column set of the empty display DataFrame.
LEADERBOARD_COLUMNS = [c.name for c in fields(AutoEvalColumn)]
# Column order of the persisted leaderboard CSV. The scenario columns from
# ``SCENARIO_KEYS`` are spliced in between the identity and bookkeeping columns.
CSV_FIELDS = [
    "model_id",
    "eval_family",
    *SCENARIO_KEYS,
    "num_samples",
    "eval_audio_seconds",
    "eval_wall_time_s",
    "eval_rtf",
    "num_params",
    "submission_notes",
    "contact_email",
    "submitted_at",
]
# Sentinel for "show the live leaderboard" (no historical reconstruction).
LATEST_VERSION = ""
# History rows are leaderboard rows plus version bookkeeping. Stored append-only:
# each leaderboard change appends only the model row(s) that changed, tagged with a
# version id (timestamp) so any past state can be reconstructed without full copies.
HISTORY_FIELDS = [*CSV_FIELDS, "version", "recorded_at", "version_label"]
# Legacy CSV / evaluation wire-format keys -> canonical ``SCENARIO_KEYS`` column names.
# Each pair is ``(legacy_key, canonical_key)``; several legacy keys may map to the
# same canonical column, and pairs are applied in the order listed.
_LEGACY_WER_TO_CANONICAL: tuple[tuple[str, str], ...] = (
    ("wer_clean", "wer_anechoic_speech"),
    ("wer_measured", "wer_lab_measured"),
    ("wer_sim", "wer_lab_simulated"),
    ("wer_high_snr", "wer_realistic_high_snr"),
    ("wer_low_snr", "wer_realistic_low_snr"),
    # Misaligned eval builds used packed condition names in result keys.
    ("wer_sim_high_snr", "wer_realistic_high_snr"),
    ("wer_sim_low_snr", "wer_realistic_low_snr"),
    ("wer_moving", "wer_moving_sources"),
)
def _cell_empty(v) -> bool:
return v is None or (isinstance(v, str) and v.strip() == "")
def normalize_legacy_csv_row(row: dict) -> dict:
    """Backfill canonical scenario columns from legacy column names.

    For every ``(legacy, canonical)`` pair in ``_LEGACY_WER_TO_CANONICAL`` the
    canonical cell is filled from the legacy cell when the canonical one is empty
    and the legacy one is not. Existing canonical values are never overwritten.

    Args:
        row: Leaderboard CSV row; modified in place.

    Returns:
        The same ``row`` object, for call chaining.
    """
    for legacy_key, canonical_key in _LEGACY_WER_TO_CANONICAL:
        legacy_value = row.get(legacy_key)
        if _cell_empty(legacy_value) or not _cell_empty(row.get(canonical_key)):
            continue
        if isinstance(legacy_value, str):
            row[canonical_key] = legacy_value.strip()
        else:
            row[canonical_key] = str(legacy_value).strip()

    # Older CSVs used ``wer_high_snr`` as a standalone column. The mapping table
    # already lists this pair; the explicit fallback is kept as a safety net.
    standalone_high = row.get("wer_high_snr")
    if not _cell_empty(standalone_high) and _cell_empty(row.get("wer_realistic_high_snr")):
        if isinstance(standalone_high, str):
            row["wer_realistic_high_snr"] = str(standalone_high).strip()
        else:
            row["wer_realistic_high_snr"] = str(standalone_high)
    return row
def leaderboard_row_from_eval_result(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str = "",
    contact_email: str = "",
) -> dict:
    """Build a full leaderboard CSV row from one evaluation result.

    Every ``CSV_FIELDS`` column is present in the returned row; columns that
    ``result`` carries no value for are left as ``""`` (future metrics).

    Args:
        result: Evaluation output. Read keys are ``model_id``, ``eval_family``,
            ``num_samples``, the timing / size fields, and the ``wer_*`` wire keys
            listed in the lookup table below.
        submitted_at_iso: Stored verbatim in ``submitted_at``.
        submission_notes: Free text; stripped and truncated to 4000 characters.
        contact_email: Stripped and truncated to 254 characters.

    Returns:
        A new dict of string cells keyed by CSV column name.
    """

    def _text(key: str) -> str:
        # A key that is present but ``None`` must become an empty cell, not the
        # literal string "None".
        value = result.get(key)
        return "" if value is None else str(value)

    def _first_present(keys):
        # First value among ``keys`` that is neither ``None`` nor "".
        for key in keys:
            value = result.get(key)
            if value is not None and value != "":
                return value
        return None

    # Evaluation pipeline emits legacy wer_* keys on the wire (see
    # benchmark.dataset.wer_result_key). Each canonical CSV column lists the wire
    # keys to try, primary key first, then aliases used by older eval builds.
    wire_keys_by_column: dict[str, tuple[str, ...]] = {
        "wer_anechoic_speech": ("wer_clean",),
        "wer_lab_measured": ("wer_measured",),
        "wer_lab_simulated": ("wer_sim",),
        "wer_realistic_high_snr": ("wer_high", "wer_high_snr", "wer_sim_high_snr"),
        "wer_realistic_mid_snr": ("wer_mid", "wer_mid_snr"),
        "wer_realistic_low_snr": ("wer_low", "wer_low_snr", "wer_sim_low_snr"),
        "wer_moving_low": ("wer_moving_low",),
        "wer_moving_mid": ("wer_moving_mid",),
        "wer_moving_high": ("wer_moving_high",),
    }

    row = {k: "" for k in CSV_FIELDS}
    row["model_id"] = _text("model_id")
    row["eval_family"] = _text("eval_family")

    for column, wire_keys in wire_keys_by_column.items():
        value = _first_present(wire_keys)
        if value is not None:
            row[column] = str(value)

    row["num_samples"] = _text("num_samples")
    for key in ("eval_audio_seconds", "eval_wall_time_s", "eval_rtf", "num_params"):
        value = _first_present((key,))
        if value is not None:
            row[key] = str(value)

    row["submission_notes"] = (submission_notes or "").strip()[:4000]
    row["contact_email"] = (contact_email or "").strip()[:254]
    row["submitted_at"] = submitted_at_iso
    return row
def merge_leaderboard_row_from_eval_result(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str = "",
    contact_email: str = "",
) -> None:
    """Fold an evaluation result into the stored leaderboard (partial re-eval).

    When the model already has a row, only the non-empty scenario WER columns,
    ``eval_family`` and ``num_params`` from ``result`` overwrite it, plus the notes
    and e-mail when the caller passed non-blank ones; every other column of the
    existing row (timing fields, ``submitted_at``) is kept. When the model has no
    row yet, a complete new row is appended. The board is then re-sorted, saved and
    the in-process cache invalidated.

    Raises:
        ValueError: ``result`` has no usable ``model_id``.
    """
    from metrics_config import SCENARIO_KEYS

    target_id = str(result.get("model_id", "")).strip()
    if not target_id:
        raise ValueError("merge_leaderboard_row_from_eval_result requires model_id")

    board = load_raw_results()
    # Index of the first row for this model, or None when it is new.
    existing_at = next(
        (
            idx
            for idx, entry in enumerate(board)
            if (entry.get("model_id") or "").strip() == target_id
        ),
        None,
    )

    fresh = leaderboard_row_from_eval_result(
        result, submitted_at_iso, submission_notes=submission_notes, contact_email=contact_email
    )

    if existing_at is None:
        normalize_legacy_csv_row(fresh)
        board.append(fresh)
    else:
        merged = dict(board[existing_at])
        for key in SCENARIO_KEYS:
            candidate = fresh.get(key)
            if candidate is not None and str(candidate).strip() != "":
                merged[key] = candidate
        for key in ("eval_family", "num_params"):
            if fresh.get(key):
                merged[key] = fresh[key]
        # Notes / e-mail are replaced only when the caller supplied new text.
        if (submission_notes or "").strip():
            merged["submission_notes"] = fresh["submission_notes"]
        if (contact_email or "").strip():
            merged["contact_email"] = fresh["contact_email"]
        normalize_legacy_csv_row(merged)
        board[existing_at] = merged

    from analytics import sort_leaderboard_rows_inplace

    sort_leaderboard_rows_inplace(board)
    save_raw_results(board)
    invalidate_results_cache()
# ---------------------------------------------------------------------------
# Storage helpers
# ---------------------------------------------------------------------------
def list_audio_files(prefix: str, suffix: str) -> list[str]:
    """Return the sorted bucket paths under ``prefix`` that end with ``suffix``.

    Only entries whose ``type`` is ``"file"`` are considered. Any storage backend
    other than ``"hf_bucket"`` yields an empty list.
    """
    if STORAGE_BACKEND != "hf_bucket":
        return []
    tree = list_bucket_tree(HF_BUCKET_ID, prefix=prefix, recursive=True, token=HF_TOKEN)
    return sorted(
        node.path
        for node in tree
        if node.type == "file" and node.path.endswith(suffix)
    )
def load_audio_file(path: str):
    """Download an audio file and return ``(waveform, sample_rate)``.

    Args:
        path: Bucket path of the audio file.

    Returns:
        The two values produced by ``torchaudio.load`` for the downloaded file.

    Raises:
        RuntimeError: ``STORAGE_BACKEND`` is not ``"hf_bucket"``.
    """
    # Check the backend before paying for the heavy torchaudio import.
    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("load_audio_file requires STORAGE_BACKEND='hf_bucket'.")

    import torchaudio

    local_path = download_bucket_file(path)
    try:
        waveform, sample_rate = torchaudio.load(local_path)
    finally:
        # Remove the downloaded copy even when decoding fails.
        os.unlink(local_path)
    return waveform, sample_rate
def load_transcript_file(path: str) -> str:
    """Download a transcript text file and return its stripped content.

    Args:
        path: Bucket path of the transcript.

    Returns:
        The file's text with leading and trailing whitespace removed.

    Raises:
        RuntimeError: ``STORAGE_BACKEND`` is not ``"hf_bucket"``.
    """
    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("load_transcript_file requires STORAGE_BACKEND='hf_bucket'.")

    local_path = download_bucket_file(path)
    try:
        with open(local_path, "r") as f:
            return f.read().strip()
    finally:
        # Remove the downloaded copy even when reading fails.
        os.unlink(local_path)
# ---------------------------------------------------------------------------
# Results persistence
# ---------------------------------------------------------------------------
# In-process cache of the parsed leaderboard rows; ``None`` means nothing is cached.
_raw_results_cache: list[dict] | None = None


def invalidate_results_cache() -> None:
    """Clear the in-process leaderboard cache.

    After this call ``load_raw_results`` finds no cached rows and reads from
    storage again.
    """
    global _raw_results_cache
    _raw_results_cache = None
def load_raw_results(*, refresh: bool = False) -> list[dict]:
    """Load the leaderboard CSV rows, cached in-process until ``refresh=True``.

    Each returned row has every ``CSV_FIELDS`` column (missing or ``None`` cells
    become ``""``), an ``eval_family`` of ``"N/A"`` when none is stored, and legacy
    scenario columns mapped onto their canonical names.

    Args:
        refresh: Bypass the cache and re-read storage.

    Returns:
        A list of row dicts. Callers receive copies, so mutating them does not
        affect the cache. An empty list is returned when the backend is not
        ``"hf_bucket"`` or when the file cannot be downloaded or parsed; this
        function does not raise for storage errors.
    """
    global _raw_results_cache
    if not refresh and _raw_results_cache is not None:
        return [dict(r) for r in _raw_results_cache]
    with csv_lock:
        if STORAGE_BACKEND != "hf_bucket":
            _raw_results_cache = []
            return []
        try:
            local_path = download_bucket_file(RESULTS_PATH)
            try:
                # The file is written as UTF-8; ``newline=""`` lets the csv module
                # handle line breaks embedded in quoted cells itself.
                with open(local_path, "r", encoding="utf-8", newline="") as f:
                    results = list(csv.DictReader(f))
            finally:
                # Remove the downloaded copy even when parsing fails.
                os.unlink(local_path)
            for row in results:
                for k in CSV_FIELDS:
                    # DictReader fills cells missing from short rows with None.
                    if row.get(k) is None:
                        row[k] = ""
                if not row.get("eval_family"):
                    row["eval_family"] = "N/A"
                normalize_legacy_csv_row(row)
        except Exception:
            import logging

            # Do not cache the failure: a transient storage error would otherwise
            # keep the leaderboard empty until something invalidates the cache.
            logging.getLogger(__name__).warning(
                "Could not load leaderboard results; returning no rows.", exc_info=True
            )
            return []
        _raw_results_cache = results
        return [dict(r) for r in results]
def save_raw_results(rows: list[dict], *, record_version: bool = True) -> None:
    """Write the leaderboard CSV back to storage.

    Rows are projected onto ``CSV_FIELDS`` (missing columns become ``""``) before
    being written. An empty ``rows`` list is ignored, so the stored board is never
    replaced by an empty file.

    When ``record_version`` is set (default), the model row(s) that changed relative
    to the previously persisted state are appended to the history file so the new
    state is selectable from the leaderboard version dropdown.

    Args:
        rows: Complete leaderboard state to persist.
        record_version: Also record the change in the history file.
    """
    if not rows:
        return
    # Snapshot the previously persisted state BEFORE acquiring csv_lock (load_raw_results
    # also takes csv_lock, which is non-reentrant) so we can diff for versioning.
    prev_rows = load_raw_results() if record_version else None
    with csv_lock:
        normalized = [{k: row.get(k, "") for k in CSV_FIELDS} for row in rows]
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=CSV_FIELDS)
        writer.writeheader()
        writer.writerows(normalized)
        content = output.getvalue()
        if STORAGE_BACKEND == "hf_bucket":
            upload_to_bucket(
                HF_BUCKET_ID,
                add=[(content.encode("utf-8"), RESULTS_PATH)],
                token=HF_TOKEN,
            )
        invalidate_results_cache()
    if record_version:
        # Never let history bookkeeping break a successful result save, but leave a
        # trace so a silently missing version can be diagnosed.
        try:
            _record_history_from_diff(prev_rows or [], normalized)
        except Exception:
            import logging

            logging.getLogger(__name__).exception(
                "Leaderboard saved, but recording the history version failed."
            )
# ---------------------------------------------------------------------------
# Leaderboard version history (append-only, delta-based)
# ---------------------------------------------------------------------------
def _now_iso() -> str:
"""UTC timestamp with microseconds so successive versions sort and stay unique."""
from datetime import datetime, timezone
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def load_history_rows() -> list[dict]:
    """Return all rows of the append-only history file, in file order.

    Returns:
        The parsed rows, or ``[]`` when the backend is not ``"hf_bucket"``, the
        file cannot be downloaded, or it cannot be parsed. A failed download and
        a missing file are indistinguishable to callers.
    """
    if STORAGE_BACKEND != "hf_bucket":
        return []
    try:
        local_path = download_bucket_file(HISTORY_PATH)
    except Exception:
        return []
    try:
        # The file is written as UTF-8; ``newline=""`` lets the csv module handle
        # line breaks embedded in quoted cells itself.
        with open(local_path, "r", encoding="utf-8", newline="") as f:
            rows = list(csv.DictReader(f))
    except Exception:
        rows = []
    finally:
        # Best-effort removal of the downloaded copy.
        try:
            os.unlink(local_path)
        except Exception:
            pass
    return rows
def _append_history_rows(new_rows: list[dict]) -> None:
    """Append ``new_rows`` to the history file.

    The file is re-read, extended and uploaded as a whole while ``_history_lock``
    is held. Nothing happens when ``new_rows`` is empty or the backend is not
    ``"hf_bucket"``.
    """
    if not new_rows or STORAGE_BACKEND != "hf_bucket":
        return
    with _history_lock:
        combined = load_history_rows() + new_rows
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=HISTORY_FIELDS, extrasaction="ignore")
        writer.writeheader()
        # Project every row onto HISTORY_FIELDS so absent columns become "".
        writer.writerows(
            {field: entry.get(field, "") for field in HISTORY_FIELDS}
            for entry in combined
        )
        payload = buffer.getvalue().encode("utf-8")
        upload_to_bucket(
            HF_BUCKET_ID,
            add=[(payload, HISTORY_PATH)],
            token=HF_TOKEN,
        )
def _history_label(changed: list[dict]) -> str:
if len(changed) == 1:
mid = (changed[0].get("model_id") or "").strip()
return mid or "leaderboard updated"
return f"{len(changed)} models updated"
def _record_history_from_diff(prev_rows: list[dict], curr_rows: list[dict]) -> None:
    """Append to history the rows of ``curr_rows`` that differ from ``prev_rows``.

    When the history file has no rows yet, all of ``curr_rows`` is stored as one
    version labelled "Initial release", so the earliest selectable version
    reconstructs the complete board. Afterwards only rows whose ``CSV_FIELDS``
    projection changed (or whose model is new) are stored. Rows without a
    ``model_id`` are skipped in the delta, and models present only in
    ``prev_rows`` produce no entry.
    """
    stamp = _now_iso()

    def _project(entry: dict) -> dict:
        return {k: entry.get(k, "") for k in CSV_FIELDS}

    def _tagged(entries: list[dict], label: str) -> list[dict]:
        # All rows of one version share the same version id and timestamp.
        return [
            {**_project(entry), "version": stamp, "recorded_at": stamp, "version_label": label}
            for entry in entries
        ]

    if not load_history_rows():
        _append_history_rows(_tagged(curr_rows, "Initial release"))
        return

    previous: dict[str, dict] = {}
    for entry in prev_rows:
        previous[(entry.get("model_id") or "").strip()] = _project(entry)

    changed: list[dict] = []
    for entry in curr_rows:
        model_id = (entry.get("model_id") or "").strip()
        if model_id and previous.get(model_id) != _project(entry):
            changed.append(entry)

    if changed:
        _append_history_rows(_tagged(changed, _history_label(changed)))
def list_leaderboard_versions() -> list[dict]:
    """List the distinct history versions, newest first.

    Returns:
        One ``{"version", "label", "recorded_at"}`` dict per version id, sorted by
        ``recorded_at`` descending. When a version id occurs on several history
        rows, the label and timestamp of the last such row are used; a blank label
        or timestamp falls back to the version id itself.
    """
    versions: dict[str, dict] = {}
    for entry in load_history_rows():
        version_id = (entry.get("version") or "").strip()
        if version_id:
            versions[version_id] = {
                "version": version_id,
                "label": entry.get("version_label") or version_id,
                "recorded_at": entry.get("recorded_at") or version_id,
            }
    return sorted(versions.values(), key=lambda item: item["recorded_at"], reverse=True)
def raw_rows_for_version(version: str) -> list[dict]:
    """Rebuild the leaderboard rows (``CSV_FIELDS`` columns) as of ``version``.

    Every history entry recorded at or before the chosen version is replayed and
    the last entry per model is kept. The live results are returned instead when
    ``version`` is empty, the history has no rows, or the version id is unknown.
    """
    if not version:
        return load_raw_results()
    history = load_history_rows()
    if not history:
        return load_raw_results()

    # Latest timestamp carried by any row of the requested version.
    stamps = [
        entry.get("recorded_at") or ""
        for entry in history
        if (entry.get("version") or "").strip() == version
    ]
    if not stamps:
        return load_raw_results()
    cutoff = max(stamps)

    # File order is chronological, so the last entry per model wins.
    latest_by_model: dict[str, dict] = {}
    for entry in history:
        if (entry.get("recorded_at") or "") > cutoff:
            continue
        model_id = (entry.get("model_id") or "").strip()
        if model_id:
            latest_by_model[model_id] = entry

    rebuilt: list[dict] = []
    for entry in latest_by_model.values():
        row = {k: entry.get(k, "") for k in CSV_FIELDS}
        if not row.get("eval_family"):
            row["eval_family"] = "N/A"
        row.setdefault("submission_notes", "")
        row.setdefault("contact_email", "")
        normalize_legacy_csv_row(row)
        rebuilt.append(row)
    return rebuilt
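# ---------------------------------------------------------------------------
# Leaderboard version history (append-only, delta-based)
# NOTE(review): this section repeats the section of the same name earlier in this
# file (``_now_iso`` through ``raw_rows_for_version``). Python binds the later
# definition of each name, so the copies below are the ones in effect; one of the
# two copies should be removed.
# ---------------------------------------------------------------------------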
def _now_iso() -> str:
"""UTC timestamp with microseconds so successive versions sort and stay unique."""
from datetime import datetime, timezone
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def load_history_rows() -> list[dict]:
    """Return all rows of the append-only history file, in file order.

    Returns:
        The parsed rows, or ``[]`` when the backend is not ``"hf_bucket"``, the
        file cannot be downloaded, or it cannot be parsed. A failed download and
        a missing file are indistinguishable to callers.
    """
    if STORAGE_BACKEND != "hf_bucket":
        return []
    try:
        local_path = download_bucket_file(HISTORY_PATH)
    except Exception:
        return []
    try:
        # The file is written as UTF-8; ``newline=""`` lets the csv module handle
        # line breaks embedded in quoted cells itself.
        with open(local_path, "r", encoding="utf-8", newline="") as f:
            rows = list(csv.DictReader(f))
    except Exception:
        rows = []
    finally:
        # Best-effort removal of the downloaded copy.
        try:
            os.unlink(local_path)
        except Exception:
            pass
    return rows
def _append_history_rows(new_rows: list[dict]) -> None:
    """Append ``new_rows`` to the history file.

    The file is re-read, extended and uploaded as a whole while ``_history_lock``
    is held. Nothing happens when ``new_rows`` is empty or the backend is not
    ``"hf_bucket"``.
    """
    if not new_rows or STORAGE_BACKEND != "hf_bucket":
        return
    with _history_lock:
        combined = load_history_rows() + new_rows
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=HISTORY_FIELDS, extrasaction="ignore")
        writer.writeheader()
        # Project every row onto HISTORY_FIELDS so absent columns become "".
        writer.writerows(
            {field: entry.get(field, "") for field in HISTORY_FIELDS}
            for entry in combined
        )
        payload = buffer.getvalue().encode("utf-8")
        upload_to_bucket(
            HF_BUCKET_ID,
            add=[(payload, HISTORY_PATH)],
            token=HF_TOKEN,
        )
def _history_label(changed: list[dict]) -> str:
if len(changed) == 1:
mid = (changed[0].get("model_id") or "").strip()
return mid or "leaderboard updated"
return f"{len(changed)} models updated"
def _record_history_from_diff(prev_rows: list[dict], curr_rows: list[dict]) -> None:
    """Append to history the rows of ``curr_rows`` that differ from ``prev_rows``.

    When the history file has no rows yet, all of ``curr_rows`` is stored as one
    version labelled "Initial release", so the earliest selectable version
    reconstructs the complete board. Afterwards only rows whose ``CSV_FIELDS``
    projection changed (or whose model is new) are stored. Rows without a
    ``model_id`` are skipped in the delta, and models present only in
    ``prev_rows`` produce no entry.
    """
    stamp = _now_iso()

    def _project(entry: dict) -> dict:
        return {k: entry.get(k, "") for k in CSV_FIELDS}

    def _tagged(entries: list[dict], label: str) -> list[dict]:
        # All rows of one version share the same version id and timestamp.
        return [
            {**_project(entry), "version": stamp, "recorded_at": stamp, "version_label": label}
            for entry in entries
        ]

    if not load_history_rows():
        _append_history_rows(_tagged(curr_rows, "Initial release"))
        return

    previous: dict[str, dict] = {}
    for entry in prev_rows:
        previous[(entry.get("model_id") or "").strip()] = _project(entry)

    changed: list[dict] = []
    for entry in curr_rows:
        model_id = (entry.get("model_id") or "").strip()
        if model_id and previous.get(model_id) != _project(entry):
            changed.append(entry)

    if changed:
        _append_history_rows(_tagged(changed, _history_label(changed)))
def list_leaderboard_versions() -> list[dict]:
    """List the distinct history versions, newest first.

    Returns:
        One ``{"version", "label", "recorded_at"}`` dict per version id, sorted by
        ``recorded_at`` descending. When a version id occurs on several history
        rows, the label and timestamp of the last such row are used; a blank label
        or timestamp falls back to the version id itself.
    """
    versions: dict[str, dict] = {}
    for entry in load_history_rows():
        version_id = (entry.get("version") or "").strip()
        if version_id:
            versions[version_id] = {
                "version": version_id,
                "label": entry.get("version_label") or version_id,
                "recorded_at": entry.get("recorded_at") or version_id,
            }
    return sorted(versions.values(), key=lambda item: item["recorded_at"], reverse=True)
def raw_rows_for_version(version: str) -> list[dict]:
    """Rebuild the leaderboard rows (``CSV_FIELDS`` columns) as of ``version``.

    Every history entry recorded at or before the chosen version is replayed and
    the last entry per model is kept. The live results are returned instead when
    ``version`` is empty, the history has no rows, or the version id is unknown.
    """
    if not version:
        return load_raw_results()
    history = load_history_rows()
    if not history:
        return load_raw_results()

    # Latest timestamp carried by any row of the requested version.
    stamps = [
        entry.get("recorded_at") or ""
        for entry in history
        if (entry.get("version") or "").strip() == version
    ]
    if not stamps:
        return load_raw_results()
    cutoff = max(stamps)

    # File order is chronological, so the last entry per model wins.
    latest_by_model: dict[str, dict] = {}
    for entry in history:
        if (entry.get("recorded_at") or "") > cutoff:
            continue
        model_id = (entry.get("model_id") or "").strip()
        if model_id:
            latest_by_model[model_id] = entry

    rebuilt: list[dict] = []
    for entry in latest_by_model.values():
        row = {k: entry.get(k, "") for k in CSV_FIELDS}
        if not row.get("eval_family"):
            row["eval_family"] = "N/A"
        row.setdefault("submission_notes", "")
        row.setdefault("contact_email", "")
        normalize_legacy_csv_row(row)
        rebuilt.append(row)
    return rebuilt
def load_results(version: str | None = None) -> pd.DataFrame:
    """Return the display-ready leaderboard DataFrame.

    An empty or ``None`` ``version`` selects the live leaderboard; any other value
    reconstructs the board from the append-only history as of that version.

    The leaderboard intentionally does NOT show eval_family, wall time, or
    submitted_at. Those fields remain in the CSV (for moderators and analytics)
    but are omitted from the public table.
    """
    source_rows = raw_rows_for_version(version) if version else load_raw_results()
    return _display_df_from_raw_rows(source_rows)
def _display_df_from_raw_rows(raw_rows: list[dict]) -> pd.DataFrame:
    """Turn raw CSV rows into the public, sorted, display-formatted DataFrame.

    Rows are ordered by the mean of the ``AVG_WER_CORE_KEYS`` columns (ascending,
    rows without a mean last). WER cells are rendered through
    ``format_wer_percent``; values that are missing are shown as ``"NA"``. An
    empty input yields an empty frame with the ``LEADERBOARD_COLUMNS`` columns.
    """
    import analytics  # imported by the original implementation; not referenced below

    if not raw_rows:
        return pd.DataFrame(columns=LEADERBOARD_COLUMNS)

    frame = pd.DataFrame(raw_rows)

    # Coerce live-condition columns to numeric so scoring / sorting behave sensibly.
    for column in LIVE_SCENARIO_KEYS:
        if column in frame.columns:
            frame[column] = pd.to_numeric(frame[column], errors="coerce")

    averaged = [column for column in AVG_WER_CORE_KEYS if column in frame.columns]
    if averaged:
        frame["_avg_wer"] = frame[averaged].mean(axis=1, skipna=True)
    else:
        frame["_avg_wer"] = pd.NA
    frame = frame.sort_values(by="_avg_wer", ascending=True, na_position="last")
    frame = frame.reset_index(drop=True)

    def _as_percent(series, ndigits: int = 2):
        return series.apply(lambda value: format_wer_percent(value, ndigits=ndigits))

    def _rounded_or_na(series, rnd=4):
        def _one(value):
            if pd.notna(value) and str(value).strip() != "":
                return round(float(value), rnd)
            return "NA"

        return series.apply(_one)

    table = pd.DataFrame()
    table[AutoEvalColumn.model.name] = frame["model_id"].apply(make_clickable_model)
    table[AutoEvalColumn.avg_wer_core.name] = (
        _as_percent(frame["_avg_wer"]) if averaged else "NA"
    )

    # Scenario CSV key -> display column name (defined in AutoEvalColumn), in
    # display order.
    scenario_columns = (
        ("wer_anechoic_speech", AutoEvalColumn.wer_anechoic.name),
        ("wer_lab_measured", AutoEvalColumn.wer_lab_measured.name),
        ("wer_lab_simulated", AutoEvalColumn.wer_lab_simulated.name),
        ("wer_realistic_high_snr", AutoEvalColumn.wer_realistic_high_snr.name),
        ("wer_realistic_mid_snr", AutoEvalColumn.wer_realistic_mid_snr.name),
        ("wer_realistic_low_snr", AutoEvalColumn.wer_realistic_low_snr.name),
        ("wer_moving_low", AutoEvalColumn.wer_moving_low.name),
        ("wer_moving_mid", AutoEvalColumn.wer_moving_mid.name),
        ("wer_moving_high", AutoEvalColumn.wer_moving_high.name),
    )
    for csv_key, display_name in scenario_columns:
        if csv_key not in frame.columns:
            table[display_name] = "NA"
            continue
        numeric = pd.to_numeric(frame[csv_key], errors="coerce")
        table[display_name] = _as_percent(numeric)

    if "eval_rtf" in frame.columns:
        rtf = pd.to_numeric(frame["eval_rtf"], errors="coerce")
        table[AutoEvalColumn.eval_rtf.name] = _rounded_or_na(rtf, 4)
    else:
        table[AutoEvalColumn.eval_rtf.name] = "NA"

    if "num_params" in frame.columns:
        param_counts = pd.to_numeric(frame["num_params"], errors="coerce")
        # NOTE(review): the column attribute is called ``params_m`` but the value
        # is num_params / 1e9 -- confirm the intended unit.
        table[AutoEvalColumn.params_m.name] = param_counts.apply(
            lambda v: round(float(v) / 1e9, 3) if pd.notna(v) and float(v) > 0 else "NA"
        )
    else:
        table[AutoEvalColumn.params_m.name] = "NA"

    return table
def _format_timestamp(ts: str) -> str:
from datetime import datetime
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M UTC")
except Exception:
return str(ts)
# ---------------------------------------------------------------------------
# Model validation
# ---------------------------------------------------------------------------
def is_model_on_hub(model_name: str) -> tuple[bool, str | None]:
"""Check whether a model exists on the HuggingFace Hub."""
try:
model_name = model_name.strip().replace(" ", "")
parts = model_name.split("/")
if len(parts) != 2 or not parts[0] or not parts[1]:
return False, "is not a valid model name. Please use the format `author/model_name`."
except Exception:
return False, "is not a valid model name. Please use the format `author/model_name`."
try:
from huggingface_hub import HfApi
hf_api = HfApi()
models = list(hf_api.list_models(author=parts[0], search=parts[1]))
matched = [m for m in models if m.modelId == model_name]
if len(matched) != 1:
return False, "was not found on the hub!"
return True, None
except Exception:
return False, "was not found on the hub!"