""" Evaluation queue for the Gradio Space: FIFO jobs, bounded memory. Evaluations dispatch to **Hugging Face Hub Jobs** when ``FFASR_REMOTE_JOBS=1`` (see ``remote_jobs``). The Space does not import torch or run ASR locally. Optional moderation: when FFASR_MODERATION=1 and FFASR_MODERATOR_SECRET is set, new jobs stay in pending_moderation until the moderator approves (then they enter the work queue). Job state is persisted to the Hub bucket (results/jobs_state.csv) so moderation and queue survive Space restarts. """ from __future__ import annotations import csv import hmac import io import json import os import time import queue import threading import uuid from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum from typing import Any # --- Moderation (Space secrets / env) --- # In HF Space: Settings → Variables and secrets → Secrets # FFASR_MODERATION=1 # FFASR_MODERATOR_SECRET= MODERATION_ENABLED = os.environ.get("FFASR_MODERATION", "").strip().lower() in ( "1", "true", "yes", "on", ) MODERATOR_SECRET = os.environ.get("FFASR_MODERATOR_SECRET", "").strip() class JobStatus(str, Enum): pending_moderation = "pending_moderation" queued = "queued" running = "running" dispatching = "dispatching" remote_running = "remote_running" collecting = "collecting" done = "done" failed = "failed" @dataclass class Job: id: str model_id: str family_id: str status: JobStatus created_at: str updated_at: str = "" error: str | None = None result: dict[str, Any] | None = None submission_notes: str = "" contact_email: str = "" extra_requirements: str = "" setup_script: str = "" custom_script: str = "" recipe_id: str = "" is_gated: bool = False run_custom_script: bool = False # Progress (not persisted; only meaningful while running) progress_done: int = 0 progress_total: int = 0 progress_condition: str = "" hf_remote_job_id: str | None = None remote_artifact_path: str | None = None # Set by moderator retry: skip duplicate guard and replace CSV row on success. replace_leaderboard: bool = False # Subset of PACKED_FILES keys; None means evaluate all conditions. eval_conditions: tuple[str, ...] | None = None _jobs: dict[str, Job] = {} _jobs_lock = threading.Lock() _jobs_file_lock = threading.Lock() _work_queue: queue.Queue[str] = queue.Queue() _running_job_id: str | None = None _worker_started = False _worker_lock = threading.Lock() _jobs_loaded = False # Per Space job id: monotonic deadline for remote Hub Job polling (not persisted). _remote_deadlines: dict[str, float] = {} _MAX_JOBS_TRACKED = 400 _MAX_QUEUE_BACKLOG = 32 _MAX_PENDING_MODERATION = 64 _DEFAULT_REMOTE_MAX_CONCURRENT = 4 _MAX_CUSTOM_SCRIPT_BYTES = 32 * 1024 _MAX_REQUIREMENT_LINE_LEN = 200 _MAX_REQUIREMENT_LINES = 50 _JOBS_CSV_FIELDS = [ "job_id", "model_id", "family_id", "status", "created_at", "updated_at", "error", "submission_notes", "contact_email", "extra_requirements", "setup_script_b64", "custom_script_b64", "recipe_id", "is_gated", "run_custom_script", "hf_remote_job_id", "remote_artifact_path", "eval_conditions", ] def parse_requirements_lines(text: str) -> list[str]: """One package spec per line (requirements.txt style); ignores blanks and # comments.""" out: list[str] = [] for raw in (text or "").splitlines(): line = raw.strip() if not line or line.startswith("#"): continue if len(line) > _MAX_REQUIREMENT_LINE_LEN: line = line[:_MAX_REQUIREMENT_LINE_LEN] out.append(line) if len(out) >= _MAX_REQUIREMENT_LINES: break return out def sanitize_custom_script(text: str) -> str: """Trim and cap custom script body for storage.""" from backends.custom_eval import normalize_custom_script_compat s = normalize_custom_script_compat((text or "").strip()) if not s: return "" enc = s.encode("utf-8") if len(enc) > _MAX_CUSTOM_SCRIPT_BYTES: enc = enc[:_MAX_CUSTOM_SCRIPT_BYTES] s = enc.decode("utf-8", errors="ignore") return s def sanitize_contact_email(text: str) -> str: """Trim and validate a required submitter email address.""" s = (text or "").strip()[:254] if not s: raise ValueError("Contact email is required.") if s.count("@") != 1: raise ValueError("Invalid email address.") local, domain = s.split("@", 1) if not local or not domain or "." not in domain: raise ValueError("Invalid email address.") return s def sanitize_setup_script(text: str) -> str: """Trim and cap one-time setup script (shell or Python) for storage.""" s = (text or "").strip() if not s: return "" enc = s.encode("utf-8") if len(enc) > _MAX_CUSTOM_SCRIPT_BYTES: enc = enc[:_MAX_CUSTOM_SCRIPT_BYTES] s = enc.decode("utf-8", errors="ignore") return s def custom_script_deprecated_api_warning(text: str) -> str | None: """Return a short warning if the script uses kwargs removed from current HF Hub.""" s = (text or "").strip() if not s: return None if "use_auth_token" in s or "authentication_token" in s: return ( "Your script uses use_auth_token (deprecated). " "It will be rewritten to token= automatically; prefer " "token=os.environ['HF_TOKEN'] for gated models." ) return None def custom_script_argparse_warning(text: str) -> str | None: """Warn when a script references ``args.`` without defining argparse (common clone mistake).""" s = (text or "").strip() if not s or "args." not in s: return None try: import ast tree = ast.parse(s) except SyntaxError: return None defines_args = False calls_parse_args = False for node in ast.walk(tree): if isinstance(node, ast.Assign): for t in node.targets: if isinstance(t, ast.Name) and t.id == "args": defines_args = True if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name): if node.target.id == "args": defines_args = True if isinstance(node, ast.Call): func = node.func if isinstance(func, ast.Attribute) and func.attr == "parse_args": calls_parse_args = True if isinstance(func, ast.Name) and func.id == "parse_args": calls_parse_args = True if defines_args or calls_parse_args: return None return ( "Your script uses args.something but FFASR does not run " "argparse — replace with literal values (e.g. " "routing_enabled=True) or call parse_args() " "before using args. For Mega-ASR, use the recipe evaluator " "in recipes/mega_asr/evaluate.py." ) def custom_script_defines_evaluate(text: str) -> bool: """Best-effort check that the script defines a top-level ``evaluate`` function.""" s = (text or "").strip() if not s: return True try: import ast tree = ast.parse(s) except SyntaxError: return False return any( isinstance(node, ast.FunctionDef) and node.name == "evaluate" for node in tree.body ) def _bool_to_csv(flag: bool) -> str: return "1" if flag else "0" def _bool_from_csv(raw: str) -> bool: return (raw or "").strip().lower() in ("1", "true", "yes", "on") def _encode_script_b64(script: str) -> str: if not script: return "" import base64 return base64.b64encode(script.encode("utf-8")).decode("ascii") def _decode_script_b64(raw: str) -> str: raw = (raw or "").strip() if not raw: return "" import base64 try: return base64.b64decode(raw.encode("ascii")).decode("utf-8") except Exception: return "" def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _touch(job: Job) -> None: job.updated_at = _now_iso() def moderation_active() -> bool: """True when moderation is enabled and a secret is configured.""" return MODERATION_ENABLED and bool(MODERATOR_SECRET) def moderation_misconfigured() -> bool: """Enabled in UI but secret missing; submissions should be rejected.""" return MODERATION_ENABLED and not MODERATOR_SECRET def _check_moderator_secret(provided: str) -> bool: if not MODERATOR_SECRET or provided is None: return False a = provided.strip().encode("utf-8") b = MODERATOR_SECRET.encode("utf-8") if len(a) != len(b): return False return hmac.compare_digest(a, b) def _prune_jobs() -> None: if len(_jobs) <= _MAX_JOBS_TRACKED: return terminal = [ jid for jid, j in _jobs.items() if j.status in (JobStatus.done, JobStatus.failed) ] terminal.sort(key=lambda jid: _jobs[jid].created_at) for jid in terminal[: max(0, len(_jobs) - _MAX_JOBS_TRACKED // 2)]: _jobs.pop(jid, None) def _persist_jobs() -> None: """Write all tracked jobs to Hub bucket CSV (best-effort).""" from storage import ( HF_BUCKET_ID, HF_TOKEN, JOBS_STATE_PATH, STORAGE_BACKEND, batch_bucket_files, upload_to_bucket, ) if STORAGE_BACKEND != "hf_bucket" or batch_bucket_files is None: return with _jobs_lock: rows: list[dict[str, str]] = [] for jid, j in _jobs.items(): rows.append( { "job_id": j.id, "model_id": j.model_id, "family_id": j.family_id, "status": j.status.value, "created_at": j.created_at, "updated_at": j.updated_at or j.created_at, "error": (j.error or "").replace("\n", " ")[:2000], "submission_notes": (j.submission_notes or "").replace("\n", " ")[:4000], "contact_email": (j.contact_email or "").replace("\n", " ")[:254], "extra_requirements": (j.extra_requirements or "").replace("\r\n", "\n")[:8000], "setup_script_b64": _encode_script_b64(j.setup_script or ""), "custom_script_b64": _encode_script_b64(j.custom_script or ""), "recipe_id": (j.recipe_id or "").strip()[:64], "is_gated": _bool_to_csv(j.is_gated), "run_custom_script": _bool_to_csv(j.run_custom_script), "hf_remote_job_id": (j.hf_remote_job_id or "").strip(), "remote_artifact_path": (j.remote_artifact_path or "").strip(), "eval_conditions": ",".join(j.eval_conditions) if j.eval_conditions else "", } ) rows.sort(key=lambda r: r["created_at"]) buf = io.StringIO() w = csv.DictWriter(buf, fieldnames=_JOBS_CSV_FIELDS, extrasaction="ignore") w.writeheader() w.writerows(rows) content = buf.getvalue().encode("utf-8") try: with _jobs_file_lock: upload_to_bucket( HF_BUCKET_ID, add=[(content, JOBS_STATE_PATH)], token=HF_TOKEN, ) except Exception: pass def _load_persisted_jobs_once() -> None: """Restore jobs from bucket and re-queue work that was queued or interrupted.""" global _jobs_loaded if _jobs_loaded: return _jobs_loaded = True from storage import STORAGE_BACKEND, download_bucket_file, JOBS_STATE_PATH if STORAGE_BACKEND != "hf_bucket": return try: import os as _os local_path = download_bucket_file(JOBS_STATE_PATH) with open(local_path, "r", encoding="utf-8") as f: disk_rows = list(csv.DictReader(f)) _os.unlink(local_path) except Exception: return if not disk_rows: return # Last row wins if duplicate job_id by_id: dict[str, dict] = {} for row in disk_rows: jid = (row.get("job_id") or "").strip() if jid: by_id[jid] = row to_queue: list[tuple[str, str]] = [] with _jobs_lock: for jid, row in by_id.items(): st_raw = (row.get("status") or "").strip() try: st = JobStatus(st_raw) except ValueError: continue err = (row.get("error") or "").strip() or None notes = (row.get("submission_notes") or "").strip() email = (row.get("contact_email") or "").strip()[:254] extra_req = (row.get("extra_requirements") or "").strip() setup = _decode_script_b64(row.get("setup_script_b64") or "") script = _decode_script_b64(row.get("custom_script_b64") or "") recipe_id = (row.get("recipe_id") or "").strip() is_gated = _bool_from_csv(row.get("is_gated") or "") run_custom = _bool_from_csv(row.get("run_custom_script") or "") created = (row.get("created_at") or _now_iso()).strip() updated = (row.get("updated_at") or created).strip() mid = (row.get("model_id") or "").strip() fid = (row.get("family_id") or "").strip() hf_rid = (row.get("hf_remote_job_id") or "").strip() or None art_path = (row.get("remote_artifact_path") or "").strip() or None eval_conds = _parse_eval_conditions_csv(row.get("eval_conditions") or "") if not mid: continue if st == JobStatus.running: st = JobStatus.queued extra = "Re-queued after Space restart (was running)." err = f"{err}; {extra}" if err else extra if st in (JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting): st = JobStatus.queued extra = "Re-queued after Space restart (remote job resume)." err = f"{err}; {extra}" if err else extra job = Job( id=jid, model_id=mid, family_id=fid, status=st, created_at=created, updated_at=updated, error=err, submission_notes=notes, contact_email=email, extra_requirements=extra_req, setup_script=setup, custom_script=script, recipe_id=recipe_id, is_gated=is_gated, run_custom_script=run_custom, hf_remote_job_id=hf_rid, remote_artifact_path=art_path, eval_conditions=eval_conds, ) _jobs[jid] = job if st == JobStatus.queued: to_queue.append((created, jid)) to_queue.sort(key=lambda x: x[0]) for _, jid in to_queue: _work_queue.put(jid) def _progress_update(job_id: str, done: int, total: int, condition: str) -> None: """Cheap in-memory progress update; throttling is done by the UI timer.""" with _jobs_lock: j = _jobs.get(job_id) if j is None: return j.progress_done = int(done) j.progress_total = int(total) j.progress_condition = condition or "" def _leaderboard_sort_rows_inplace(rows: list[dict]) -> None: """Sort leaderboard CSV rows by Pareto layer (asc), then FFAS score (desc).""" from analytics import sort_leaderboard_rows_inplace sort_leaderboard_rows_inplace(rows) def _remote_poll_interval_s() -> float: raw = os.environ.get("FFASR_REMOTE_JOB_POLL_S", "10").strip() try: return float(raw) if raw else 10.0 except ValueError: return 10.0 _DEFAULT_REMOTE_JOB_MAX_WAIT_S = 86400.0 # 24 hours; previously 8h / 4h. # Extra slack past the Space-side deadline before we declare a timeout. # Absorbs the brief gap between a worker script exiting cleanly (artifact # uploaded) and the Hub flipping the job stage to COMPLETED. _REMOTE_DEADLINE_GRACE_S = 90.0 def _remote_max_wait_s() -> float: raw = os.environ.get( "FFASR_REMOTE_JOB_MAX_WAIT_S", str(_DEFAULT_REMOTE_JOB_MAX_WAIT_S) ).strip() try: return float(raw) if raw else _DEFAULT_REMOTE_JOB_MAX_WAIT_S except ValueError: return _DEFAULT_REMOTE_JOB_MAX_WAIT_S def remote_max_concurrent_jobs() -> int: """Max Hub Jobs in flight at once when ``FFASR_REMOTE_JOBS=1`` (default 4).""" raw = os.environ.get( "FFASR_REMOTE_MAX_CONCURRENT_JOBS", str(_DEFAULT_REMOTE_MAX_CONCURRENT) ).strip() try: n = int(raw) if raw else _DEFAULT_REMOTE_MAX_CONCURRENT except ValueError: n = _DEFAULT_REMOTE_MAX_CONCURRENT return max(1, min(n, 32)) def _remote_in_flight_statuses() -> tuple[JobStatus, ...]: return (JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting) def _count_remote_in_flight() -> int: with _jobs_lock: return sum(1 for j in _jobs.values() if j.status in _remote_in_flight_statuses()) def _list_remote_in_flight_ids() -> list[str]: with _jobs_lock: return [ j.id for j in _jobs.values() if j.status in _remote_in_flight_statuses() ] def _list_active_eval_jobs() -> list[Job]: """Jobs currently using a remote or in-process eval slot.""" with _jobs_lock: return [ j for j in _jobs.values() if j.status in ( JobStatus.running, *_remote_in_flight_statuses(), ) ] def _fail_job(job_id: str, error: str, *, finish_queue: bool = True) -> None: with _jobs_lock: j = _jobs.get(job_id) if j: j.status = JobStatus.failed j.error = (error or "Unknown error")[:8000] _touch(j) _prune_jobs() _remote_deadlines.pop(job_id, None) _persist_jobs() if finish_queue: try: _work_queue.task_done() except ValueError: pass def _succeed_job(job_id: str, result: dict) -> None: with _jobs_lock: j = _jobs.get(job_id) if j: j.status = JobStatus.done j.result = result j.error = None j.progress_condition = "" _touch(j) _prune_jobs() _remote_deadlines.pop(job_id, None) _persist_jobs() try: _work_queue.task_done() except ValueError: pass def _merge_eval_result_to_leaderboard( result: dict, submitted_at_iso: str, submission_notes: str, contact_email: str = "", *, replace_existing: bool = False, merge_partial: bool = False, ) -> None: from init import ( invalidate_results_cache, leaderboard_row_from_eval_result, load_raw_results, merge_leaderboard_row_from_eval_result, normalize_legacy_csv_row, save_raw_results, ) if merge_partial: merge_leaderboard_row_from_eval_result( result, submitted_at_iso, submission_notes=submission_notes, contact_email=contact_email, ) return model_id = str(result.get("model_id", "")).strip() rows = load_raw_results() if replace_existing and model_id: existing = [i for i, r in enumerate(rows) if (r.get("model_id") or "").strip() == model_id] for i in sorted(existing, reverse=True): rows.pop(i) new_row = leaderboard_row_from_eval_result( result, submitted_at_iso, submission_notes=submission_notes, contact_email=contact_email, ) normalize_legacy_csv_row(new_row) rows.append(new_row) _leaderboard_sort_rows_inplace(rows) save_raw_results(rows) invalidate_results_cache() def _parse_eval_conditions_csv(raw: str) -> tuple[str, ...] | None: from benchmark.dataset import resolve_condition_keys parts = [p.strip() for p in (raw or "").split(",") if p.strip()] if not parts: return None keys = resolve_condition_keys(parts) from benchmark.dataset import PACKED_FILES if set(keys) == set(PACKED_FILES.keys()): return None return keys def _encode_eval_conditions_csv(keys: tuple[str, ...] | None) -> str: if not keys: return "" return ",".join(keys) def normalize_moderator_eval_conditions( selected: list[str] | None, ) -> tuple[str, ...] | None: """``None`` = all packed conditions; otherwise a validated non-empty subset.""" from benchmark.dataset import PACKED_FILES, resolve_condition_keys if not selected: return None keys = resolve_condition_keys(selected) if not keys: raise ValueError("Select at least one dataset to evaluate.") if set(keys) == set(PACKED_FILES.keys()): return None return keys def eval_condition_checkbox_defaults() -> tuple[list[str], list[str]]: """(labels, values) for Gradio CheckboxGroup defaults (all selected).""" from benchmark.dataset import CONDITION_UI_CHOICES labels = [lbl for lbl, _ in CONDITION_UI_CHOICES] values = [key for _, key in CONDITION_UI_CHOICES] return labels, values def _job_is_partial_eval(job: Job | None) -> bool: return bool(job and job.eval_conditions) def _load_artifact_json_from_bucket(artifact_path: str) -> dict: """Download and parse a remote eval JSON artifact from the Hub bucket.""" from storage import STORAGE_BACKEND, download_bucket_file if STORAGE_BACKEND != "hf_bucket": raise RuntimeError("Artifact import requires STORAGE_BACKEND='hf_bucket'.") local_path = download_bucket_file(artifact_path) try: with open(local_path, "r", encoding="utf-8") as f: return json.load(f) finally: try: os.unlink(local_path) except Exception: pass def _submission_fields_for_artifact_import( data: dict, artifact_path: str, notes_override: str, email_override: str ) -> tuple[str, str]: notes = (notes_override or "").strip()[:4000] email = (email_override or "").strip()[:254] if notes and email: return notes, email space_job_id = str(data.get("job_id") or "").strip() with _jobs_lock: for j in _jobs.values(): if space_job_id and j.id == space_job_id: if not notes: notes = (j.submission_notes or "").strip()[:4000] if not email: email = (j.contact_email or "").strip()[:254] return notes, email if (j.remote_artifact_path or "").strip() == artifact_path: if not notes: notes = (j.submission_notes or "").strip()[:4000] if not email: email = (j.contact_email or "").strip()[:254] return notes, email return notes, email def _submission_notes_for_artifact_import( data: dict, artifact_path: str, override: str ) -> str: notes, _ = _submission_fields_for_artifact_import(data, artifact_path, override, "") return notes def _mark_job_done_for_artifact( data: dict, artifact_path: str, result: dict ) -> str | None: """If a tracked queue job matches, mark it done. Returns matched job id or None.""" space_job_id = str(data.get("job_id") or "").strip() matched: Job | None = None with _jobs_lock: for j in _jobs.values(): if space_job_id and j.id == space_job_id: matched = j break if (j.remote_artifact_path or "").strip() == artifact_path: matched = j break if matched is not None: matched.status = JobStatus.done matched.result = result matched.error = None if not (matched.remote_artifact_path or "").strip(): matched.remote_artifact_path = artifact_path _touch(matched) if matched is not None: _persist_jobs() return matched.id return None def import_artifact_to_leaderboard( artifact_ref: str, secret: str, *, replace_existing: bool = False, submission_notes: str = "", ) -> tuple[bool, str]: """ Moderator-only: load a bucket JSON artifact and merge its result into leaderboard.csv. ``artifact_ref`` may be a file name (``abc123.json``), job id (``abc123``), or full bucket path. """ ok, msg = _moderator_secret_ok(secret) if not ok: return False, msg from evaluation.remote_artifact import extract_result_or_raise, normalize_artifact_bucket_path from init import ( invalidate_results_cache, leaderboard_row_from_eval_result, load_raw_results, normalize_legacy_csv_row, save_raw_results, ) import analytics try: artifact_path = normalize_artifact_bucket_path(artifact_ref) except ValueError as e: return False, str(e) try: data = _load_artifact_json_from_bucket(artifact_path) except Exception as e: return False, f"Could not load artifact {_escape_html(artifact_path)}: {e}" try: result = extract_result_or_raise(data) except Exception as e: return False, f"Artifact is not a successful evaluation: {e}" model_id = str(result.get("model_id", "")).strip() if not model_id: return False, "Artifact result is missing model_id." notes, email = _submission_fields_for_artifact_import( data, artifact_path, submission_notes, "" ) rows = load_raw_results() existing = [i for i, r in enumerate(rows) if (r.get("model_id") or "").strip() == model_id] if existing and not replace_existing: row = dict(rows[existing[0]]) normalize_legacy_csv_row(row) avg = analytics._avg_wer_for_row(row) avg_txt = f"{avg * 100:.2f}%" if avg != float("inf") else "N/A" return False, ( f"Model {_escape_html(model_id)} is already on the leaderboard " f"(Avg WER {avg_txt}). Enable " f"Replace existing row to overwrite." ) if existing and replace_existing: for i in sorted(existing, reverse=True): rows.pop(i) submitted_at = _now_iso() new_row = leaderboard_row_from_eval_result( result, submitted_at, submission_notes=notes, contact_email=email ) normalize_legacy_csv_row(new_row) rows.append(new_row) _leaderboard_sort_rows_inplace(rows) save_raw_results(rows) invalidate_results_cache() avg = analytics._avg_wer_for_row(new_row) avg_txt = f"{avg * 100:.2f}%" if avg != float("inf") else "N/A" action = "Replaced" if existing else "Added" matched_job = _mark_job_done_for_artifact(data, artifact_path, result) job_bit = f" Matched queue job {_escape_html(matched_job)} marked done." if matched_job else "" return True, ( f"{action} {_escape_html(model_id)} from " f"{_escape_html(artifact_path)}. " f"Average WER: {avg_txt}.{job_bit}" ) def _remote_collect_result(job_id: str, hf_id: str, jobs_token: str) -> dict: from evaluation.remote_artifact import extract_result_or_raise from storage import download_bucket_file with _jobs_lock: j = _jobs.get(job_id) if not j: raise RuntimeError("internal: job not found") artifact_path = str(j.remote_artifact_path or "").strip() if not artifact_path: raise RuntimeError("internal: missing remote artifact path") with _jobs_lock: j2 = _jobs.get(job_id) if j2: j2.status = JobStatus.collecting _touch(j2) _persist_jobs() # HF dataset bucket uploads are sometimes briefly not readable right after # the worker uploaded the JSON (especially with Xet). Retry a few times # with light backoff so a single 404 doesn't permanently fail an otherwise # successful run. last_err: Exception | None = None local_path: str | None = None for attempt in range(5): try: local_path = download_bucket_file(artifact_path) break except Exception as e: last_err = e msg = str(e).lower() transient = ( "404" in msg or "not found" in msg or "entrynotfound" in msg or "no such" in msg or "timed out" in msg or "timeout" in msg ) if not transient or attempt == 4: raise time.sleep(2.0 * (2 ** attempt)) if local_path is None: # Shouldn't reach here, but keep mypy / runtime safe. raise last_err or RuntimeError("artifact download failed") try: with open(local_path, "r", encoding="utf-8") as f: data = json.load(f) finally: try: os.unlink(local_path) except Exception: pass return extract_result_or_raise(data) def _remote_dispatch_job(job_id: str, mid: str, fid: str, jobs_token: str) -> str: """Submit Hub Job if needed; return ``hf_remote_job_id``.""" import remote_jobs from evaluation.remote_artifact import default_remote_artifact_path with _jobs_lock: j = _jobs.get(job_id) if not j: raise RuntimeError("internal: job not found") if not (j.remote_artifact_path or "").strip(): j.remote_artifact_path = default_remote_artifact_path(job_id) _touch(j) artifact_path = str(j.remote_artifact_path).strip() existing_hf = (j.hf_remote_job_id or "").strip() if existing_hf: with _jobs_lock: j2 = _jobs.get(job_id) if j2 and j2.status != JobStatus.remote_running: j2.status = JobStatus.remote_running _touch(j2) _persist_jobs() _remote_deadlines[job_id] = time.monotonic() + _remote_max_wait_s() return existing_hf with _jobs_lock: j3 = _jobs.get(job_id) if j3: j3.status = JobStatus.dispatching _touch(j3) _persist_jobs() with _jobs_lock: j_dispatch = _jobs.get(job_id) extra_req = (j_dispatch.extra_requirements or "") if j_dispatch else "" setup_script = (j_dispatch.setup_script or "") if j_dispatch else "" custom_script = (j_dispatch.custom_script or "") if j_dispatch else "" recipe_id = (j_dispatch.recipe_id or "") if j_dispatch else "" run_custom = bool(j_dispatch and j_dispatch.run_custom_script) eval_conds = j_dispatch.eval_conditions if j_dispatch else None info = remote_jobs.submit_eval_job( model_id=mid, family_id=fid, space_job_id=job_id, artifact_path=artifact_path, token=jobs_token, extra_requirements=extra_req, setup_script=setup_script, custom_script=custom_script, recipe_id=recipe_id, run_custom_script=run_custom, eval_conditions=eval_conds, ) hf_id = info.id with _jobs_lock: j4 = _jobs.get(job_id) if j4: j4.hf_remote_job_id = hf_id j4.status = JobStatus.remote_running _touch(j4) _persist_jobs() _remote_deadlines[job_id] = time.monotonic() + _remote_max_wait_s() return hf_id def _remote_start_queued_job(job_id: str, jobs_token: str) -> None: """Move a queued job onto Hub Jobs (duplicate check + dispatch).""" from init import load_raw_results with _jobs_lock: j = _jobs.get(job_id) if j is None or j.status != JobStatus.queued: try: _work_queue.task_done() except ValueError: pass return mid = j.model_id fid = j.family_id notes = j.submission_notes or "" email = j.contact_email or "" j.status = JobStatus.running j.progress_done = 0 j.progress_total = 0 j.progress_condition = "" _touch(j) _persist_jobs() rows = load_raw_results() with _jobs_lock: j_chk = _jobs.get(job_id) replace_lb = bool(j_chk and j_chk.replace_leaderboard) partial = _job_is_partial_eval(j_chk) if not replace_lb and not partial and any(r["model_id"] == mid for r in rows): _fail_job(job_id, "Model already on leaderboard (skipped duplicate race).") return global _running_job_id _running_job_id = job_id try: _remote_dispatch_job(job_id, mid, fid, jobs_token) except Exception as e: _fail_job(job_id, str(e)) finally: with _jobs_lock: if _running_job_id == job_id: _running_job_id = None def _remote_tick_job(job_id: str, jobs_token: str) -> None: """Poll one remote job once; on completion merge CSV or mark failed.""" import remote_jobs from huggingface_hub._jobs_api import JobStage with _jobs_lock: j = _jobs.get(job_id) if j is None or j.status not in _remote_in_flight_statuses(): return hf_id = (j.hf_remote_job_id or "").strip() if not hf_id: return mid = j.model_id fid = j.family_id notes = j.submission_notes or "" email = j.contact_email or "" # Grace window absorbs the brief gap between the worker script returning # rc=0 (and uploading the JSON artifact) and the Hub flipping the job's # stage to COMPLETED. Without this, a tick that arrives ε seconds past # the deadline would fail an already-successful run with a misleading # "exceeded max wait" message. deadline = _remote_deadlines.get(job_id) deadline_hit = ( deadline is not None and time.monotonic() > deadline + _REMOTE_DEADLINE_GRACE_S ) try: info = remote_jobs.inspect_job_once(hf_id, token=jobs_token) except Exception: # If the Hub API is unavailable AND we're already past the wait # budget, give up cleanly. Otherwise wait for the next tick. if deadline_hit: _fail_job( job_id, f"Remote Hub Job exceeded max wait ({_remote_max_wait_s():.0f}s); " "Hub inspect_job unavailable.", ) return stage = info.status.stage if info.status else None # 1) Always honor a terminal Hub state, even if the Space-side deadline # already elapsed -- a completed job's artifact should be collected, # not thrown away. if stage == JobStage.COMPLETED: try: result = _remote_collect_result(job_id, hf_id, jobs_token) with _jobs_lock: j_done = _jobs.get(job_id) replace_lb = bool(j_done and j_done.replace_leaderboard) partial = _job_is_partial_eval(j_done) _merge_eval_result_to_leaderboard( result, _now_iso(), notes, email, replace_existing=replace_lb and not partial, merge_partial=partial, ) with _jobs_lock: j_clr = _jobs.get(job_id) if j_clr: j_clr.replace_leaderboard = False _succeed_job(job_id, result) except Exception as e: _fail_job(job_id, str(e)) return if stage in (JobStage.ERROR, JobStage.CANCELED, JobStage.DELETED): _fail_job(job_id, remote_jobs.describe_job_failure(info, token=jobs_token)) return # 2) Job is genuinely still running. Only now enforce the Space-side # deadline (the Hub has its own FFASR_REMOTE_JOB_TIMEOUT as a backstop). if deadline_hit: try: remote_jobs.cancel_remote_job(hf_id, token=jobs_token) except Exception: pass _fail_job(job_id, f"Remote Hub Job exceeded max wait ({_remote_max_wait_s():.0f}s).") return def _remote_parallel_worker_loop() -> None: """Dispatch up to N Hub Jobs and poll them concurrently (N = remote_max_concurrent_jobs).""" import remote_jobs from storage import HF_TOKEN, require_token_for_ffasr_jobs jobs_token = require_token_for_ffasr_jobs() if not HF_TOKEN: raise RuntimeError("HF_TOKEN is required when FFASR_REMOTE_JOBS=1.") poll_s = _remote_poll_interval_s() max_slots = remote_max_concurrent_jobs() while True: while _count_remote_in_flight() < max_slots: try: job_id = _work_queue.get_nowait() except queue.Empty: break _remote_start_queued_job(job_id, jobs_token) active = _list_remote_in_flight_ids() if active: for jid in active: _remote_tick_job(jid, jobs_token) time.sleep(poll_s) continue if _work_queue.qsize() == 0: try: job_id = _work_queue.get(timeout=poll_s) except queue.Empty: continue _remote_start_queued_job(job_id, jobs_token) else: time.sleep(min(poll_s, 2.0)) def ensure_worker_started() -> None: """Start background Hub Jobs worker after restoring persisted queue (once per process).""" global _worker_started _load_persisted_jobs_once() with _worker_lock: if _worker_started: return import remote_jobs as _rj if not _rj.remote_jobs_enabled(): raise RuntimeError( "FFASR_REMOTE_JOBS=1 is required. This Space dispatches evaluations to " "Hugging Face Hub Jobs and does not run ASR models locally." ) t = threading.Thread(target=_remote_parallel_worker_loop, name="ffasr-remote-worker", daemon=True) t.start() _worker_started = True def _ensure_worker() -> None: """Alias used by queue UI helpers.""" ensure_worker_started() def _worker_unavailable_html(exc: Exception) -> str: return ( "

Queue unavailable: " f"{_escape_html(str(exc))}

" ) def _model_in_flight(model_id: str) -> bool: with _jobs_lock: for j in _jobs.values(): if j.model_id != model_id: continue if j.status in ( JobStatus.pending_moderation, JobStatus.queued, JobStatus.running, JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting, ): return True return False def _pending_moderation_count() -> int: with _jobs_lock: return sum(1 for j in _jobs.values() if j.status == JobStatus.pending_moderation) def enqueue( model_id: str, family_id: str, submission_notes: str = "", *, contact_email: str = "", extra_requirements: str = "", setup_script: str = "", custom_script: str = "", recipe_id: str = "", is_gated: bool = False, ) -> tuple[str, int, str | None, bool]: """ Enqueue an evaluation job. Returns (job_id, position_or_count, error_message, awaiting_moderation). When awaiting_moderation is True, the job is not in the execution queue yet. """ import remote_jobs if not remote_jobs.remote_jobs_enabled(): return "", 0, "remote_jobs_required", False _ensure_worker() from init import load_raw_results if moderation_misconfigured(): return "", 0, "moderation_misconfigured", False if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG and not moderation_active(): return "", 0, "queue_full", False if _model_in_flight(model_id): return "", 0, f"Model '{model_id}' is already submitted, queued, or running.", False existing = load_raw_results() for row in existing: if row["model_id"] == model_id: return "", 0, "already_in_csv", False if moderation_active() and _pending_moderation_count() >= _MAX_PENDING_MODERATION: return "", 0, "pending_moderation_full", False job_id = str(uuid.uuid4())[:8] created = _now_iso() notes_clean = (submission_notes or "").strip()[:4000] try: email_clean = sanitize_contact_email(contact_email) from recipes.registry import apply_recipe_to_submission req_lines = parse_requirements_lines(extra_requirements) extra_req_clean = "\n".join(req_lines) setup_clean = sanitize_setup_script(setup_script) script_clean = sanitize_custom_script(custom_script) extra_req_clean, setup_clean, script_clean, resolved_recipe = apply_recipe_to_submission( model_id, recipe_id or None, extra_req_clean, setup_clean, script_clean, ) recipe_clean = (resolved_recipe or recipe_id or "").strip().lower()[:64] except Exception as e: return "", 0, f"Invalid submission fields: {e}", False awaiting = moderation_active() status = JobStatus.pending_moderation if awaiting else JobStatus.queued uses_custom_stack = bool( script_clean.strip() or setup_clean.strip() or recipe_clean ) job = Job( id=job_id, model_id=model_id, family_id=family_id, status=status, created_at=created, updated_at=created, submission_notes=notes_clean, contact_email=email_clean, extra_requirements=extra_req_clean, setup_script=setup_clean, custom_script=script_clean, recipe_id=recipe_clean, is_gated=bool(is_gated), run_custom_script=uses_custom_stack, ) with _jobs_lock: _jobs[job_id] = job _persist_jobs() if awaiting: pos = _pending_moderation_count() return job_id, pos, None, True if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG: with _jobs_lock: _jobs.pop(job_id, None) return "", 0, "queue_full", False _work_queue.put(job_id) position = _work_queue.qsize() return job_id, position, None, False def approve_job( job_id: str, secret: str, *, run_custom_script: bool = False, eval_conditions: list[str] | None = None, ) -> tuple[bool, str]: """Move a pending job into the execution queue (moderator only).""" if not moderation_active(): return False, "Moderation is not active on this Space." if not _check_moderator_secret(secret): return False, "Invalid moderator secret." job_id = job_id.strip() if not job_id: return False, "Enter a job ID." try: conds = normalize_moderator_eval_conditions(eval_conditions) except ValueError as e: return False, str(e) with _jobs_lock: job = _jobs.get(job_id) if not job or job.status != JobStatus.pending_moderation: return False, "Job not found or not awaiting approval." if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG: return False, "Execution queue is full; try again in a moment." job.status = JobStatus.queued # Use custom stack when evaluate/setup/recipe is provided. job.run_custom_script = bool( (job.custom_script or "").strip() or (job.setup_script or "").strip() or (job.recipe_id or "").strip() ) job.eval_conditions = conds job.replace_leaderboard = False _touch(job) _work_queue.put(job_id) _persist_jobs() _ensure_worker() max_n = remote_max_concurrent_jobs() custom_note = "" cond_note = "" with _jobs_lock: j = _jobs.get(job_id) if j and j.run_custom_script and (j.custom_script or "").strip(): custom_note = " Custom script will run on the Hub Job." if j and j.eval_conditions: cond_note = f" Evaluating: {', '.join(j.eval_conditions)} (others unchanged on success)." return True, ( f"Approved job {job_id}. Up to {max_n} Hub Jobs may run in parallel; " f"this job starts when a slot is free.{custom_note}{cond_note}" ) def reject_job(job_id: str, secret: str) -> tuple[bool, str]: """Reject a pending job (moderator only).""" if not moderation_active(): return False, "Moderation is not active on this Space." if not _check_moderator_secret(secret): return False, "Invalid moderator secret." job_id = job_id.strip() if not job_id: return False, "Enter a job ID." with _jobs_lock: job = _jobs.get(job_id) if not job or job.status != JobStatus.pending_moderation: return False, "Job not found or not awaiting approval." job.status = JobStatus.failed job.error = "Rejected by moderator." _touch(job) _prune_jobs() _persist_jobs() return True, f"Rejected job {job_id}." def verify_moderator_secret(secret: str) -> tuple[bool, str]: """Check moderator secret for UI unlock and privileged actions.""" if not MODERATOR_SECRET: return False, "Moderator secret is not configured (set FFASR_MODERATOR_SECRET)." if not _check_moderator_secret(secret): return False, "Invalid moderator secret." return True, "" def _moderator_secret_ok(secret: str) -> tuple[bool, str]: """Shared gate for destructive / queue actions (needs FFASR_MODERATOR_SECRET).""" return verify_moderator_secret(secret) def moderation_locked_placeholder_html() -> str: """Neutral HTML shown before the moderator secret unlocks the panel.""" return "

Moderator tools are locked. Enter the secret above and click Unlock.

" def _job_can_retry(status: JobStatus) -> bool: """Moderator may re-run jobs that are not actively executing on Hub.""" if status in ( JobStatus.running, JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting, JobStatus.pending_moderation, ): return False return status in (JobStatus.failed, JobStatus.done, JobStatus.queued) def retry_failed_job( job_id: str, secret: str, *, eval_conditions: list[str] | None = None, ) -> tuple[bool, str]: """Re-queue a job for another evaluation run (moderator only). Allowed for failed, done, and queued jobs. Successful re-runs replace the existing leaderboard row for that model when one is present. """ ok, msg = _moderator_secret_ok(secret) if not ok: return False, msg job_id = job_id.strip() if not job_id: return False, "Select a job." try: conds = normalize_moderator_eval_conditions(eval_conditions) except ValueError as e: return False, str(e) with _jobs_lock: job = _jobs.get(job_id) if not job: return False, "Job not found." if not _job_can_retry(job.status): return ( False, "Cannot retry while the job is running or awaiting moderation. " "Wait for it to finish, or approve/reject pending jobs first.", ) if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG: return False, "Execution queue is full; try again later." was_queued = job.status == JobStatus.queued job.status = JobStatus.queued job.error = None job.result = None job.hf_remote_job_id = None job.remote_artifact_path = None job.progress_done = 0 job.progress_total = 0 job.progress_condition = "" job.eval_conditions = conds job.replace_leaderboard = conds is None _touch(job) if not was_queued: _work_queue.put(job_id) _persist_jobs() _ensure_worker() cond_note = "" with _jobs_lock: j = _jobs.get(job_id) if j and j.eval_conditions: cond_note = ( f" Only {', '.join(j.eval_conditions)} will run; " "existing leaderboard WER columns for other datasets are kept." ) elif j and j.replace_leaderboard: cond_note = " Full re-eval; the leaderboard row will be replaced on success." return ( True, f"Re-queued job {job_id}; it will run after jobs ahead of it.{cond_note}" ) def retry_all_eligible_jobs(secret: str) -> tuple[bool, str]: """Re-queue every retry-eligible job against the full benchmark (all datasets). "Retry-eligible" matches :func:`_job_can_retry` — i.e. ``failed``, ``done``, or ``queued``. Jobs that are running, dispatching, awaiting moderation, etc. are left untouched. ``eval_conditions`` is forced to ``None`` (full benchmark) so that on success the existing leaderboard row is replaced — matching the semantics described in the moderator panel for a full re-run. Stops early (with a partial-success message) if the work queue fills up, so we never silently drop retries on the floor. """ ok, msg = _moderator_secret_ok(secret) if not ok: return False, msg requeued: list[str] = [] skipped_running: list[str] = [] skipped_pending: list[str] = [] capacity_hit = False job_ids_to_enqueue: list[str] = [] with _jobs_lock: snapshot = list(_jobs.values()) snapshot.sort(key=lambda j: j.updated_at or j.created_at) for job in snapshot: if job.status == JobStatus.pending_moderation: skipped_pending.append(job.id) continue if not _job_can_retry(job.status): skipped_running.append(job.id) continue # Approximate current backlog while we still hold the lock; this keeps # us within ``_MAX_QUEUE_BACKLOG`` even when several retries land at once. if _work_queue.qsize() + len(job_ids_to_enqueue) >= _MAX_QUEUE_BACKLOG: capacity_hit = True break was_queued = job.status == JobStatus.queued job.status = JobStatus.queued job.error = None job.result = None job.hf_remote_job_id = None job.remote_artifact_path = None job.progress_done = 0 job.progress_total = 0 job.progress_condition = "" job.eval_conditions = None job.replace_leaderboard = True _touch(job) requeued.append(job.id) if not was_queued: job_ids_to_enqueue.append(job.id) for jid in job_ids_to_enqueue: _work_queue.put(jid) if requeued: _persist_jobs() _ensure_worker() parts: list[str] = [] if requeued: parts.append( f"Re-queued {len(requeued)} job(s) against the full benchmark; " "on success each leaderboard row will be replaced." ) else: parts.append("No retry-eligible jobs were found.") if skipped_pending: parts.append( f"Skipped {len(skipped_pending)} job(s) awaiting moderation " "(approve or reject those manually)." ) if skipped_running: parts.append( f"Skipped {len(skipped_running)} job(s) currently running or dispatching." ) if capacity_hit: parts.append( f"Stopped early: the execution queue hit its backlog cap " f"({_MAX_QUEUE_BACKLOG}); re-click to enqueue the rest once it drains." ) return bool(requeued), " ".join(parts) def remove_job_entry(job_id: str, secret: str) -> tuple[bool, str]: """Remove a job from tracking (not allowed while running). Moderator only.""" ok, msg = _moderator_secret_ok(secret) if not ok: return False, msg job_id = job_id.strip() if not job_id: return False, "Select a job." with _jobs_lock: job = _jobs.get(job_id) if not job: return False, "Job not found." if job.status == JobStatus.running: return False, "Cannot remove a running job; wait for it to finish." _jobs.pop(job_id, None) _prune_jobs() _persist_jobs() return True, f"Removed job {job_id} from the list." def list_pending_moderation_jobs() -> list[Job]: with _jobs_lock: return [j for j in _jobs.values() if j.status == JobStatus.pending_moderation] _JOB_NON_EDITABLE_STATUSES = ( JobStatus.running, JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting, ) def update_job_script_and_requirements( job_id: str, secret: str, *, extra_requirements: str, setup_script: str = "", custom_script: str, recipe_id: str = "", ) -> tuple[bool, str]: """Update stored script/requirements for a job (moderator only).""" ok, msg = _moderator_secret_ok(secret) if not ok: return False, msg job_id = job_id.strip() if not job_id: return False, "No job selected." with _jobs_lock: job = _jobs.get(job_id) if not job: return False, "Job not found." try: from recipes.registry import apply_recipe_to_submission setup = sanitize_setup_script(setup_script) script = sanitize_custom_script(custom_script) reqs = extra_requirements or "" reqs, setup, script, resolved_recipe = apply_recipe_to_submission( job.model_id, recipe_id or None, reqs, setup, script, ) recipe_clean = (resolved_recipe or recipe_id or "").strip().lower()[:64] except Exception as e: return False, f"Invalid submission fields: {e}" with _jobs_lock: job = _jobs.get(job_id) if not job: return False, "Job not found." if job.status in _JOB_NON_EDITABLE_STATUSES: return ( False, "Cannot edit submission while the job is running or dispatching.", ) job.extra_requirements = reqs job.setup_script = setup job.custom_script = script job.recipe_id = recipe_clean job.run_custom_script = bool( script.strip() or setup.strip() or recipe_clean ) _touch(job) _persist_jobs() return True, f"Saved submission details for job {job_id}." def _ensure_jobs_loaded_for_display() -> None: """Load persisted job state without starting the remote worker (moderator read-only UI).""" _load_persisted_jobs_once() def pending_job_dropdown_choices() -> list[tuple[str, str]]: """Gradio (label, value) pairs for pending job IDs.""" try: _ensure_jobs_loaded_for_display() except Exception: return [("(Queue unavailable)", "")] jobs = list_pending_moderation_jobs() jobs.sort(key=lambda j: j.created_at) if not jobs: return [("(No pending jobs)", "")] out: list[tuple[str, str]] = [] for j in jobs: label = f"{j.id}: {j.model_id} ({j.family_id})" out.append((label, j.id)) return out def moderation_action_job_choices() -> list[tuple[str, str]]: """Recent jobs for retry/remove dropdown (label shows id, model, status).""" try: _ensure_jobs_loaded_for_display() except Exception: return [("(Queue unavailable)", "")] with _jobs_lock: items = list(_jobs.values()) items.sort(key=lambda j: j.updated_at or j.created_at, reverse=True) items = items[:60] if not items: return [("(No jobs)", "")] out: list[tuple[str, str]] = [] for j in items: short_model = j.model_id.split("/")[-1][:32] label = f"{j.id}: {short_model} ({j.status.value})" out.append((label, j.id)) return out def _escape_html(s: str) -> str: return ( s.replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) ) def _hub_job_link_html(hf_job_id: str | None) -> str: if not (hf_job_id or "").strip(): return "" try: import remote_jobs as rj url = rj.hub_job_page_url(hf_job_id.strip()) except Exception: return "" return ( f"" f"Open Hub Job logs" ) def _bucket_artifacts_link_html() -> str: try: import remote_jobs as rj url = rj.bucket_artifact_browser_url() except Exception: return "" return ( f"" f"remote_artifacts folder" ) def moderation_list_html() -> str: """HTML table of jobs awaiting approval.""" try: _ensure_jobs_loaded_for_display() except Exception as e: return _worker_unavailable_html(e) jobs = list_pending_moderation_jobs() jobs.sort(key=lambda j: j.created_at) if not jobs: return "

No jobs awaiting approval.

" rows = [] for j in jobs: note = (j.submission_notes or "").strip() note_cell = _escape_html(note[:200] + ("…" if len(note) > 200 else "")) if note else "N/A" gated = " [gated]" if j.is_gated else "" req_n = len(parse_requirements_lines(j.extra_requirements or "")) req_bit = f"{req_n} extra req(s)" if req_n else "" extras = req_bit if req_bit else "—" script_raw = (j.custom_script or "").strip() if script_raw: max_script = 12000 truncated = len(script_raw) > max_script script_body = script_raw[:max_script] if truncated: script_body += "\n… (truncated)" script_cell = ( "
View script" f"
{_escape_html(script_body)}
" "
" ) else: script_cell = "—" rows.append( f"{_escape_html(j.id)}" f"{_escape_html(j.model_id)}{gated}" f"{_escape_html(j.family_id)}" f"{_escape_html(extras)}" f"{script_cell}" f"{note_cell}" f"{_escape_html(j.created_at)}" ) body = "".join(rows) return ( "" "" "" f"{body}
Job IDModelFamilyExtrasScriptNotesSubmitted (UTC)
" ) def next_up_html(limit: int = 5) -> str: """Ordered list of models next in the approved execution queue (Submit tab).""" try: _ensure_worker() except Exception as e: return _worker_unavailable_html(e) with _jobs_lock: queued = [j for j in _jobs.values() if j.status == JobStatus.queued] queued.sort(key=lambda j: j.created_at) queued = queued[: max(1, int(limit))] if not queued: return ( "
" "

No models are waiting in the evaluation queue.

" ) items = "".join( f"
  • {_escape_html(j.model_id)} " f"(job {_escape_html(j.id)})
  • " for j in queued ) return ( "
    " f"

    Next models to evaluate ({len(queued)} shown):

    " f"
      {items}
    " ) _JOB_ROW_STATUS_CLASS: dict[str, str] = { "pending_moderation": "ffasr-job-status-pending", "queued": "ffasr-job-status-queued", "running": "ffasr-job-status-active", "dispatching": "ffasr-job-status-active", "remote_running": "ffasr-job-status-active", "collecting": "ffasr-job-status-active", "done": "ffasr-job-status-done", "failed": "ffasr-job-status-failed", } def job_row_elem_classes(status: str) -> str: """CSS classes for a moderator job row (status background + layout).""" key = (status or "").strip().lower() specific = _JOB_ROW_STATUS_CLASS.get(key, "ffasr-job-status-unknown") return f"ffasr-job-row {specific}" def _job_uses_custom_stack(job: Job) -> bool: return bool( (job.custom_script or "").strip() or (job.setup_script or "").strip() or (job.recipe_id or "").strip() ) def pending_jobs_for_render(limit: int = 60) -> list[dict[str, Any]]: """Structured pending jobs for Gradio @gr.render rows (Approve / Reject / Check).""" try: _ensure_jobs_loaded_for_display() except Exception: return [] jobs = list_pending_moderation_jobs() jobs.sort(key=lambda j: j.created_at) out: list[dict[str, Any]] = [] for j in jobs[: max(1, int(limit))]: note = (j.submission_notes or "").strip() out.append( { "id": j.id, "model_id": j.model_id, "family_id": j.family_id, "created_at": (j.created_at or "")[:19], "contact_email": (j.contact_email or "").strip(), "is_gated": j.is_gated, "req_count": len(parse_requirements_lines(j.extra_requirements or "")), "notes_preview": ( note[:120] + ("…" if len(note) > 120 else "") if note else "" ), "has_custom_script": _job_uses_custom_stack(j), "status": JobStatus.pending_moderation.value, } ) return out def recent_jobs_for_render(limit: int = 30) -> list[dict[str, Any]]: """Structured recent jobs for Gradio @gr.render rows (Retry / Remove).""" try: _ensure_jobs_loaded_for_display() except Exception: return [] with _jobs_lock: items = list(_jobs.values()) items.sort(key=lambda j: j.updated_at or j.created_at, reverse=True) out: list[dict[str, Any]] = [] for j in items[: max(1, int(limit))]: err = (j.error or "").strip() try: hub = _hub_job_link_html(j.hf_remote_job_id) except Exception: hub = "" out.append( { "id": j.id, "model_id": j.model_id, "status": j.status.value, "contact_email": (j.contact_email or "").strip(), "error": err[:200] + ("…" if len(err) > 200 else ""), "hub_link_html": hub, "updated_at": (j.updated_at or j.created_at)[:19], "can_retry": _job_can_retry(j.status), "can_remove": j.status != JobStatus.running, "has_custom_script": _job_uses_custom_stack(j), } ) return out def recent_jobs_html(limit: int = 25, *, with_heading: bool = True) -> str: """Recent jobs with status (for spotting stuck or failed runs).""" try: _ensure_jobs_loaded_for_display() except Exception as e: return _worker_unavailable_html(e) with _jobs_lock: items = list(_jobs.values()) items.sort(key=lambda j: j.updated_at or j.created_at, reverse=True) items = items[:limit] if not items: empty = ( "

    No job history loaded yet. Open Submit (queue status refreshes automatically) " "or click Refresh here to load persisted jobs from storage.

    " ) return empty rows = [] for j in items: err = (j.error or "").strip() err_cell = _escape_html(err[:120] + ("…" if len(err) > 120 else "")) if err else "N/A" hub_link = _hub_job_link_html(j.hf_remote_job_id) links = hub_link if hub_link else "N/A" if j.status == JobStatus.done and (j.remote_artifact_path or "").strip(): art = (j.remote_artifact_path or "").strip() try: from storage import HF_BUCKET_ID art_url = f"https://huggingface.co/datasets/{HF_BUCKET_ID}/resolve/main/{art}" links = ( f"{hub_link} · artifact" if hub_link else f"artifact" ) except Exception: pass rows.append( f"{_escape_html(j.id)}" f"{_escape_html(j.model_id)}" f"{_escape_html(j.status.value)}" f"{err_cell}" f"{links}" f"{_escape_html((j.updated_at or j.created_at)[:19])}" ) body = "".join(rows) table = ( "" "" "" f"{body}
    JobModelStatusError / detailHub / bucketUpdated (UTC)
    " ) if with_heading: return ( "

    Recent jobs " "(newest first; persisted across restarts)

    " + table ) return table def progress_html() -> str: """Live progress bar for the currently running job (empty when idle).""" try: _ensure_worker() except Exception as e: return _worker_unavailable_html(e) active = _list_active_eval_jobs() if len(active) > 1: max_n = remote_max_concurrent_jobs() lines = [ "
    " f"

    {len(active)} evaluations in progress " f"(up to {max_n} parallel Hub Jobs):

    ") return "".join(lines) with _jobs_lock: jid = _running_job_id j = _jobs.get(jid) if jid else None if j is None and active: j = active[0] if j is None: return ( "
    " "No evaluation is currently running.
    " ) done = int(j.progress_done or 0) total = int(j.progress_total or 0) model = j.model_id fam = j.family_id cond = j.progress_condition or "" job_label = j.id status = j.status hf_rid = (j.hf_remote_job_id or "").strip() or None art_path = (j.remote_artifact_path or "").strip() or None remote_extra = "" try: import remote_jobs as rj remote_mode = rj.remote_jobs_enabled() except Exception: remote_mode = False if remote_mode and status in ( JobStatus.running, JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting, ): hub = _hub_job_link_html(hf_rid) bucket = _bucket_artifacts_link_html() remote_extra = ( "

    " f"Remote eval: status {_escape_html(status.value)}" ) if hub: remote_extra += f" · {hub}" elif status == JobStatus.dispatching: remote_extra += " · submitting Hub Job…" if art_path: remote_extra += ( f" · expected artifact {_escape_html(art_path)} " f"(uploaded only on success; browse {_bucket_artifacts_link_html()})" ) elif bucket: remote_extra += f" · bucket {_bucket_artifacts_link_html()}" remote_extra += "

    " if total <= 0: inner = ( "
    " ) pct_label = "preparing…" else: pct = max(0.0, min(100.0, 100.0 * done / total)) inner = ( f"
    " ) cond_txt = f"; current condition: {_escape_html(cond)}" if cond else "" pct_label = f"{done}/{total} samples ({pct:.1f}%){cond_txt}" return ( "" "
    " f"

    Running: " f"{_escape_html(model)} " f"(family {_escape_html(fam)}, job {_escape_html(job_label)}): {pct_label}

    " f"{remote_extra}" "
    " f"{inner}" "
    " ) def status_html(*, start_worker: bool = True) -> str: """Short HTML snippet for the Submit tab (queue + current runner + recent activity).""" if start_worker: try: _ensure_worker() except Exception as e: return ( "

    " f"Queue unavailable: {_escape_html(str(e))}

    " ) active = _list_active_eval_jobs() with _jobs_lock: waiting = _work_queue.qsize() pending_mod = sum(1 for j in _jobs.values() if j.status == JobStatus.pending_moderation) # Do not call _count_remote_in_flight() while holding _jobs_lock; that helper # also acquires the lock and would deadlock Gradio callbacks/timers. in_flight = sum(1 for j in _jobs.values() if j.status in _remote_in_flight_statuses()) if len(active) == 0: run_line = "

    Running: none

    " elif len(active) == 1: running = active[0] done = int(running.progress_done or 0) total = int(running.progress_total or 0) progress_txt = ( f"; progress {done}/{total} ({(100.0 * done / total):.1f}%)" if total > 0 else "; preparing…" ) run_line = ( f"

    Running: {running.model_id} " f"(family: {running.family_id}, job {running.id})" f"{progress_txt}

    " ) else: max_n = remote_max_concurrent_jobs() models = ", ".join(f"{_escape_html(j.model_id)}" for j in active[:4]) if len(active) > 4: models += f" … (+{len(active) - 4} more)" run_line = ( f"

    Running: {len(active)} evaluations " f"({in_flight} on Hub Jobs, max {max_n} parallel): {models}

    " ) wait_line = f"

    Waiting to run (approved queue): {waiting}

    " mod_line = "" if moderation_active(): mod_line = ( f"

    Awaiting moderator approval: {pending_mod}

    " "

    New submissions stay pending until approved on the Moderate tab.

    " ) elif MODERATION_ENABLED and not MODERATOR_SECRET: mod_line = ( "

    Moderation misconfigured: " "set secret FFASR_MODERATOR_SECRET in Space settings.

    " ) max_n = remote_max_concurrent_jobs() hint = ( "

    Evaluations run as " "Hugging Face Hub Jobs " "(FFASR_REMOTE_JOBS=1) with results written to the bucket. " "Set Space secrets token_for_ffasr_jobs (Jobs billing) and " "HF_TOKEN (bucket). Up to " f"{max_n} models evaluate in parallel " "(FFASR_REMOTE_MAX_CONCURRENT_JOBS). The Space does not run ASR locally.

    " ) recent = recent_jobs_html(20) return f"
    {run_line}{wait_line}{mod_line}{hint}{recent}
    " def peek_job(job_id: str) -> Job | None: try: _ensure_jobs_loaded_for_display() except Exception: pass with _jobs_lock: return _jobs.get(job_id)