Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Evaluation queue for the Gradio Space: FIFO jobs, bounded memory. | |
| Evaluations dispatch to **Hugging Face Hub Jobs** when ``FFASR_REMOTE_JOBS=1`` (see ``remote_jobs``). | |
| The Space does not import torch or run ASR locally. | |
| Optional moderation: when FFASR_MODERATION=1 and FFASR_MODERATOR_SECRET is set, new jobs stay in | |
| pending_moderation until the moderator approves (then they enter the work queue). | |
| Job state is persisted to the Hub bucket (results/jobs_state.csv) so moderation and queue survive | |
| Space restarts. | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import hmac | |
| import io | |
| import json | |
| import os | |
| import time | |
| import queue | |
| import threading | |
| import uuid | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from enum import Enum | |
| from typing import Any | |
# --- Moderation (Space secrets / env) ---
# Configure in the HF Space UI: Settings → Variables and secrets → Secrets
#   FFASR_MODERATION=1
#   FFASR_MODERATOR_SECRET=<long random string>
# Both are read once, at import time.
MODERATION_ENABLED = (
    os.environ.get("FFASR_MODERATION", "").strip().lower()
    in {"1", "true", "yes", "on"}
)
MODERATOR_SECRET = os.environ.get("FFASR_MODERATOR_SECRET", "").strip()
class JobStatus(str, Enum):
    """Lifecycle state of an evaluation job; ``.value`` is what gets persisted."""

    # Waiting for a moderator to approve before entering the work queue.
    pending_moderation = "pending_moderation"
    # Waiting in the FIFO work queue.
    queued = "queued"
    # Claimed by the worker.
    running = "running"
    # Remote Hub Job lifecycle: submitting, executing, fetching the artifact.
    dispatching = "dispatching"
    remote_running = "remote_running"
    collecting = "collecting"
    # Terminal states (eligible for pruning).
    done = "done"
    failed = "failed"
@dataclass
class Job:
    """One evaluation submission tracked by the Space queue.

    Must be a dataclass: callers construct it with keyword arguments
    (``Job(id=..., model_id=..., ...)``), and the non-defaulted fields only
    exist as attributes once ``__init__`` assigns them.

    ``result``, the ``progress_*`` counters and ``replace_leaderboard`` live
    only in memory; the other fields are written to jobs_state.csv.
    """

    id: str
    model_id: str
    family_id: str
    status: JobStatus
    created_at: str
    updated_at: str = ""
    error: str | None = None
    result: dict[str, Any] | None = None
    submission_notes: str = ""
    contact_email: str = ""
    extra_requirements: str = ""
    setup_script: str = ""
    custom_script: str = ""
    recipe_id: str = ""
    is_gated: bool = False
    run_custom_script: bool = False
    # Progress (not persisted; only meaningful while running)
    progress_done: int = 0
    progress_total: int = 0
    progress_condition: str = ""
    # Hub Job id and bucket path of its JSON artifact once dispatched remotely.
    hf_remote_job_id: str | None = None
    remote_artifact_path: str | None = None
    # Set by moderator retry: skip duplicate guard and replace CSV row on success.
    replace_leaderboard: bool = False
    # Subset of PACKED_FILES keys; None means evaluate all conditions.
    eval_conditions: tuple[str, ...] | None = None
# --- In-memory registry and worker state ---
_jobs: dict[str, Job] = {}
_jobs_lock = threading.Lock()  # held while reading/mutating _jobs and its Job objects
_jobs_file_lock = threading.Lock()  # serializes uploads of the jobs state CSV
_work_queue: queue.Queue[str] = queue.Queue()  # FIFO of Space job ids
_running_job_id: str | None = None
_worker_started = False
_worker_lock = threading.Lock()
_jobs_loaded = False
# Monotonic deadline per Space job id for remote Hub Job polling (never persisted).
_remote_deadlines: dict[str, float] = {}

# --- Limits ---
_MAX_JOBS_TRACKED = 400
_MAX_QUEUE_BACKLOG = 32
_MAX_PENDING_MODERATION = 64
_DEFAULT_REMOTE_MAX_CONCURRENT = 4
_MAX_CUSTOM_SCRIPT_BYTES = 32_768  # 32 KiB
_MAX_REQUIREMENT_LINE_LEN = 200
_MAX_REQUIREMENT_LINES = 50

# Column order of the persisted jobs state CSV.
_JOBS_CSV_FIELDS = [
    "job_id", "model_id", "family_id", "status",
    "created_at", "updated_at", "error",
    "submission_notes", "contact_email", "extra_requirements",
    "setup_script_b64", "custom_script_b64", "recipe_id",
    "is_gated", "run_custom_script",
    "hf_remote_job_id", "remote_artifact_path", "eval_conditions",
]
def parse_requirements_lines(text: str) -> list[str]:
    """Split requirements.txt-style text into package specs, one per line.

    Lines are stripped; blank lines and ``#`` comment lines are dropped. Each
    kept line is cut to ``_MAX_REQUIREMENT_LINE_LEN`` characters, and at most
    ``_MAX_REQUIREMENT_LINES`` specs are returned.
    """
    specs: list[str] = []
    for line in map(str.strip, (text or "").splitlines()):
        if not line or line.startswith("#"):
            continue
        specs.append(line[:_MAX_REQUIREMENT_LINE_LEN])
        if len(specs) >= _MAX_REQUIREMENT_LINES:
            break
    return specs
def sanitize_custom_script(text: str) -> str:
    """Normalize a user custom evaluation script for storage.

    The text is stripped, passed through ``normalize_custom_script_compat``,
    and, if its UTF-8 encoding exceeds ``_MAX_CUSTOM_SCRIPT_BYTES``, cut at that
    byte limit (a multi-byte character split by the cut is dropped).
    """
    from backends.custom_eval import normalize_custom_script_compat

    body = normalize_custom_script_compat((text or "").strip())
    if not body:
        return ""
    raw = body.encode("utf-8")
    if len(raw) <= _MAX_CUSTOM_SCRIPT_BYTES:
        return body
    return raw[:_MAX_CUSTOM_SCRIPT_BYTES].decode("utf-8", errors="ignore")
def sanitize_contact_email(text: str) -> str:
    """Trim and validate the submitter's (required) contact email address.

    This is a light structural check, not full RFC 5322 validation:
    exactly one ``@``, a non-empty local part, no whitespace, and a dotted
    domain with no empty labels. Addresses longer than 254 characters are
    rejected instead of being truncated, because a truncated address would
    silently point somewhere else.

    Returns:
        The stripped address.

    Raises:
        ValueError: if the address is empty or malformed.
    """
    s = (text or "").strip()
    if not s:
        raise ValueError("Contact email is required.")
    if len(s) > 254 or s.count("@") != 1 or any(ch.isspace() for ch in s):
        raise ValueError("Invalid email address.")
    local, domain = s.split("@", 1)
    labels = domain.split(".")
    # Requires at least one dot and rejects "a@.com", "a@b..com", "a@com.".
    if not local or len(labels) < 2 or not all(labels):
        raise ValueError("Invalid email address.")
    return s
def sanitize_setup_script(text: str) -> str:
    """Normalize the optional one-time setup script (shell or Python) for storage.

    Surrounding whitespace is stripped; if the UTF-8 encoding exceeds
    ``_MAX_CUSTOM_SCRIPT_BYTES`` it is cut at that byte limit, dropping any
    multi-byte character split by the cut.
    """
    body = (text or "").strip()
    raw = body.encode("utf-8")
    if len(raw) <= _MAX_CUSTOM_SCRIPT_BYTES:
        return body
    return raw[:_MAX_CUSTOM_SCRIPT_BYTES].decode("utf-8", errors="ignore")
| def custom_script_deprecated_api_warning(text: str) -> str | None: | |
| """Return a short warning if the script uses kwargs removed from current HF Hub.""" | |
| s = (text or "").strip() | |
| if not s: | |
| return None | |
| if "use_auth_token" in s or "authentication_token" in s: | |
| return ( | |
| "Your script uses <code>use_auth_token</code> (deprecated). " | |
| "It will be rewritten to <code>token=</code> automatically; prefer " | |
| "<code>token=os.environ['HF_TOKEN']</code> for gated models." | |
| ) | |
| return None | |
| def custom_script_argparse_warning(text: str) -> str | None: | |
| """Warn when a script references ``args.`` without defining argparse (common clone mistake).""" | |
| s = (text or "").strip() | |
| if not s or "args." not in s: | |
| return None | |
| try: | |
| import ast | |
| tree = ast.parse(s) | |
| except SyntaxError: | |
| return None | |
| defines_args = False | |
| calls_parse_args = False | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Assign): | |
| for t in node.targets: | |
| if isinstance(t, ast.Name) and t.id == "args": | |
| defines_args = True | |
| if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name): | |
| if node.target.id == "args": | |
| defines_args = True | |
| if isinstance(node, ast.Call): | |
| func = node.func | |
| if isinstance(func, ast.Attribute) and func.attr == "parse_args": | |
| calls_parse_args = True | |
| if isinstance(func, ast.Name) and func.id == "parse_args": | |
| calls_parse_args = True | |
| if defines_args or calls_parse_args: | |
| return None | |
| return ( | |
| "Your script uses <code>args.something</code> but FFASR does not run " | |
| "<code>argparse</code> — replace with literal values (e.g. " | |
| "<code>routing_enabled=True</code>) or call <code>parse_args()</code> " | |
| "before using <code>args</code>. For Mega-ASR, use the recipe evaluator " | |
| "in <code>recipes/mega_asr/evaluate.py</code>." | |
| ) | |
def custom_script_defines_evaluate(text: str) -> bool:
    """Heuristically confirm the script has a module-level ``def evaluate``.

    An empty script passes (nothing to check); one that fails to parse does
    not. Only plain (non-async) ``def`` statements directly in the module body
    count.
    """
    source = (text or "").strip()
    if not source:
        return True
    try:
        import ast
        module = ast.parse(source)
    except SyntaxError:
        return False
    for stmt in module.body:
        if isinstance(stmt, ast.FunctionDef) and stmt.name == "evaluate":
            return True
    return False
def _bool_to_csv(flag: bool) -> str:
    """Encode a truthy/falsy flag as the CSV cell ``"1"`` or ``"0"``."""
    if flag:
        return "1"
    return "0"
def _bool_from_csv(raw: str) -> bool:
    """Decode a CSV flag cell: 1/true/yes/on (case-insensitive, trimmed) → True."""
    token = (raw or "").strip().lower()
    return token in {"1", "true", "yes", "on"}
def _encode_script_b64(script: str) -> str:
    """Base64-encode a script's UTF-8 bytes for a single CSV cell (empty → "")."""
    if script:
        import base64
        return base64.b64encode(script.encode("utf-8")).decode("ascii")
    return ""
def _decode_script_b64(raw: str) -> str:
    """Decode a base64 CSV cell back into UTF-8 script text.

    Blank cells and undecodable content return "" instead of raising.
    Undecodable content includes bad padding, non-ASCII cell text, and
    decoded bytes that are not valid UTF-8. This way one corrupt row cannot
    abort restoring the job state file.
    """
    raw = (raw or "").strip()
    if not raw:
        return ""
    import base64
    try:
        return base64.b64decode(raw.encode("ascii")).decode("utf-8")
    except ValueError:
        # binascii.Error, UnicodeEncodeError and UnicodeDecodeError are all
        # ValueError subclasses; anything else is a real bug and should surface.
        return ""
def _now_iso() -> str:
    """Current UTC time as an ISO-8601 string carrying a ``+00:00`` offset."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
def _touch(job: Job) -> None:
    """Stamp ``job.updated_at`` with the current UTC time (ISO-8601)."""
    stamp = _now_iso()
    job.updated_at = stamp
def moderation_active() -> bool:
    """True when moderation is switched on and a moderator secret is configured."""
    if not MODERATION_ENABLED:
        return False
    return bool(MODERATOR_SECRET)
def moderation_misconfigured() -> bool:
    """True when moderation is switched on but no secret is set (reject submissions)."""
    if not MODERATION_ENABLED:
        return False
    return not MODERATOR_SECRET
def _check_moderator_secret(provided: str) -> bool:
    """Return True iff ``provided`` (whitespace-stripped) equals ``MODERATOR_SECRET``.

    Always False when no secret is configured or ``provided`` is None. The
    byte comparison uses ``hmac.compare_digest`` so timing does not reveal
    the secret's contents.
    """
    if provided is None or not MODERATOR_SECRET:
        return False
    candidate = provided.strip().encode("utf-8")
    expected = MODERATOR_SECRET.encode("utf-8")
    return len(candidate) == len(expected) and hmac.compare_digest(candidate, expected)
def _prune_jobs() -> None:
    """Forget the oldest finished jobs once more than ``_MAX_JOBS_TRACKED`` are tracked.

    Only ``done``/``failed`` jobs are removed, oldest ``created_at`` first. At most
    ``len(_jobs) - _MAX_JOBS_TRACKED // 2`` are removed, so the registry shrinks
    toward half the cap. Visible callers invoke this while holding ``_jobs_lock``.
    """
    total = len(_jobs)
    if total <= _MAX_JOBS_TRACKED:
        return
    excess = max(0, total - _MAX_JOBS_TRACKED // 2)
    finished = sorted(
        (
            jid
            for jid, job in _jobs.items()
            if job.status in (JobStatus.done, JobStatus.failed)
        ),
        key=lambda jid: _jobs[jid].created_at,
    )
    for jid in finished[:excess]:
        _jobs.pop(jid, None)
def _job_to_csv_row(j: Job) -> dict[str, str]:
    """Serialize one Job into a jobs-state CSV row keyed by ``_JOBS_CSV_FIELDS`` names.

    Free-text fields are flattened to one line and length-capped; scripts are
    base64-encoded so multi-line bodies survive the CSV round trip.
    """
    return {
        "job_id": j.id,
        "model_id": j.model_id,
        "family_id": j.family_id,
        "status": j.status.value,
        "created_at": j.created_at,
        "updated_at": j.updated_at or j.created_at,
        "error": (j.error or "").replace("\n", " ")[:2000],
        "submission_notes": (j.submission_notes or "").replace("\n", " ")[:4000],
        "contact_email": (j.contact_email or "").replace("\n", " ")[:254],
        "extra_requirements": (j.extra_requirements or "").replace("\r\n", "\n")[:8000],
        "setup_script_b64": _encode_script_b64(j.setup_script or ""),
        "custom_script_b64": _encode_script_b64(j.custom_script or ""),
        "recipe_id": (j.recipe_id or "").strip()[:64],
        "is_gated": _bool_to_csv(j.is_gated),
        "run_custom_script": _bool_to_csv(j.run_custom_script),
        "hf_remote_job_id": (j.hf_remote_job_id or "").strip(),
        "remote_artifact_path": (j.remote_artifact_path or "").strip(),
        "eval_conditions": _encode_eval_conditions_csv(j.eval_conditions),
    }


def _persist_jobs() -> None:
    """Write all tracked jobs to the Hub bucket CSV (best-effort).

    No-op unless storage is the ``hf_bucket`` backend and ``batch_bucket_files``
    is available. Both the snapshot and the upload happen while holding
    ``_jobs_file_lock``. Concurrent callers therefore upload in snapshot order,
    and an older snapshot can never overwrite a newer one.
    Lock order is ``_jobs_file_lock`` → ``_jobs_lock``; callers must not hold
    ``_jobs_lock``. Upload failures are logged and otherwise ignored.
    """
    from storage import (
        HF_BUCKET_ID,
        HF_TOKEN,
        JOBS_STATE_PATH,
        STORAGE_BACKEND,
        batch_bucket_files,
        upload_to_bucket,
    )

    if STORAGE_BACKEND != "hf_bucket" or batch_bucket_files is None:
        return
    with _jobs_file_lock:
        with _jobs_lock:
            rows = [_job_to_csv_row(j) for j in _jobs.values()]
        rows.sort(key=lambda r: r["created_at"])
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=_JOBS_CSV_FIELDS, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)
        content = buf.getvalue().encode("utf-8")
        try:
            upload_to_bucket(
                HF_BUCKET_ID,
                add=[(content, JOBS_STATE_PATH)],
                token=HF_TOKEN,
            )
        except Exception:
            # Best-effort: in-memory state stays authoritative; the next
            # successful persist will catch the bucket up.
            import logging

            logging.getLogger(__name__).warning(
                "Failed to persist job state to %s", JOBS_STATE_PATH, exc_info=True
            )
def _load_persisted_jobs_once() -> None:
    """Restore tracked jobs from the bucket's jobs state CSV (first call only).

    Only active with the ``hf_bucket`` storage backend. If the file cannot be
    fetched or parsed, nothing is restored. Duplicate ``job_id`` rows resolve
    to the last one; rows with an unknown status or empty ``model_id`` are
    skipped. Jobs that were ``running`` or in a remote in-flight state are reset
    to ``queued`` with a note appended to ``error``; any recorded Hub Job id is
    kept so dispatch can resume it. All queued jobs are then re-enqueued on the
    work queue in ``created_at`` order.
    """
    global _jobs_loaded
    if _jobs_loaded:
        return
    _jobs_loaded = True
    from storage import STORAGE_BACKEND, download_bucket_file, JOBS_STATE_PATH

    if STORAGE_BACKEND != "hf_bucket":
        return
    try:
        local_path = download_bucket_file(JOBS_STATE_PATH)
    except Exception:
        # Nothing to restore if the state file cannot be fetched.
        return
    try:
        with open(local_path, "r", encoding="utf-8") as f:
            disk_rows = list(csv.DictReader(f))
    except Exception:
        return
    finally:
        # Always remove the downloaded temp copy, even when parsing fails; a
        # failed cleanup must not discard rows that were read successfully.
        try:
            os.unlink(local_path)
        except OSError:
            pass
    if not disk_rows:
        return

    # Last row wins if duplicate job_id
    by_id: dict[str, dict] = {}
    for row in disk_rows:
        jid = (row.get("job_id") or "").strip()
        if jid:
            by_id[jid] = row

    remote_in_flight = (JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting)
    to_queue: list[tuple[str, str]] = []
    with _jobs_lock:
        for jid, row in by_id.items():
            try:
                st = JobStatus((row.get("status") or "").strip())
            except ValueError:
                continue
            mid = (row.get("model_id") or "").strip()
            if not mid:
                continue
            err = (row.get("error") or "").strip() or None
            requeue_note = None
            if st == JobStatus.running:
                requeue_note = "Re-queued after Space restart (was running)."
            elif st in remote_in_flight:
                requeue_note = "Re-queued after Space restart (remote job resume)."
            if requeue_note:
                st = JobStatus.queued
                err = f"{err}; {requeue_note}" if err else requeue_note
            created = (row.get("created_at") or _now_iso()).strip()
            job = Job(
                id=jid,
                model_id=mid,
                family_id=(row.get("family_id") or "").strip(),
                status=st,
                created_at=created,
                updated_at=(row.get("updated_at") or created).strip(),
                error=err,
                submission_notes=(row.get("submission_notes") or "").strip(),
                contact_email=(row.get("contact_email") or "").strip()[:254],
                extra_requirements=(row.get("extra_requirements") or "").strip(),
                setup_script=_decode_script_b64(row.get("setup_script_b64") or ""),
                custom_script=_decode_script_b64(row.get("custom_script_b64") or ""),
                recipe_id=(row.get("recipe_id") or "").strip(),
                is_gated=_bool_from_csv(row.get("is_gated") or ""),
                run_custom_script=_bool_from_csv(row.get("run_custom_script") or ""),
                hf_remote_job_id=(row.get("hf_remote_job_id") or "").strip() or None,
                remote_artifact_path=(row.get("remote_artifact_path") or "").strip() or None,
                eval_conditions=_parse_eval_conditions_csv(row.get("eval_conditions") or ""),
            )
            _jobs[jid] = job
            if st == JobStatus.queued:
                to_queue.append((created, jid))
    to_queue.sort(key=lambda x: x[0])
    for _, jid in to_queue:
        _work_queue.put(jid)
def _progress_update(job_id: str, done: int, total: int, condition: str) -> None:
    """Record in-memory progress counters for ``job_id``; untracked ids are ignored.

    Nothing is persisted here; the UI timer decides how often progress is read.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is not None:
            job.progress_done = int(done)
            job.progress_total = int(total)
            job.progress_condition = condition or ""
def _leaderboard_sort_rows_inplace(rows: list[dict]) -> None:
    """Order leaderboard CSV rows in place: Pareto layer ascending, then FFAS score descending.

    Thin delegate to ``analytics.sort_leaderboard_rows_inplace``.
    """
    import analytics

    analytics.sort_leaderboard_rows_inplace(rows)
def _remote_poll_interval_s() -> float:
    """Seconds between remote Hub Job polls, from ``FFASR_REMOTE_JOB_POLL_S``.

    Falls back to 10.0 when the variable is unset, blank, or not a number.
    """
    value = os.environ.get("FFASR_REMOTE_JOB_POLL_S", "10").strip()
    if not value:
        return 10.0
    try:
        return float(value)
    except ValueError:
        return 10.0
# Default ceiling on how long the Space waits for one remote Hub Job: 24 hours.
_DEFAULT_REMOTE_JOB_MAX_WAIT_S = 24 * 60 * 60.0
# Slack added past the Space-side deadline before declaring a timeout. It
# covers the short window between a worker script exiting cleanly (artifact
# uploaded) and the Hub reporting the job stage as COMPLETED.
_REMOTE_DEADLINE_GRACE_S = 90.0
def _remote_max_wait_s() -> float:
    """Per-job wait budget in seconds, from ``FFASR_REMOTE_JOB_MAX_WAIT_S``.

    Falls back to ``_DEFAULT_REMOTE_JOB_MAX_WAIT_S`` when the variable is unset,
    blank, or not a number.
    """
    default = _DEFAULT_REMOTE_JOB_MAX_WAIT_S
    value = os.environ.get("FFASR_REMOTE_JOB_MAX_WAIT_S", str(default)).strip()
    if not value:
        return default
    try:
        return float(value)
    except ValueError:
        return default
def remote_max_concurrent_jobs() -> int:
    """Max Hub Jobs in flight at once when ``FFASR_REMOTE_JOBS=1`` (default 4).

    Read from ``FFASR_REMOTE_MAX_CONCURRENT_JOBS``; blank or non-integer values
    use the default, and the result is clamped to the range 1..32.
    """
    default = _DEFAULT_REMOTE_MAX_CONCURRENT
    value = os.environ.get("FFASR_REMOTE_MAX_CONCURRENT_JOBS", str(default)).strip()
    try:
        requested = int(value) if value else default
    except ValueError:
        requested = default
    return min(32, max(1, requested))
def _remote_in_flight_statuses() -> tuple[JobStatus, ...]:
    """Statuses of a job whose Hub Job is being submitted, running, or collected."""
    in_flight = (
        JobStatus.dispatching,
        JobStatus.remote_running,
        JobStatus.collecting,
    )
    return in_flight
def _count_remote_in_flight() -> int:
    """Number of tracked jobs currently occupying a remote Hub Job slot."""
    in_flight = _remote_in_flight_statuses()
    with _jobs_lock:
        return len([job for job in _jobs.values() if job.status in in_flight])
def _list_remote_in_flight_ids() -> list[str]:
    """Ids of tracked jobs currently occupying a remote Hub Job slot, in registry order."""
    in_flight = _remote_in_flight_statuses()
    ids: list[str] = []
    with _jobs_lock:
        for job in _jobs.values():
            if job.status in in_flight:
                ids.append(job.id)
    return ids
def _list_active_eval_jobs() -> list[Job]:
    """Jobs currently using a remote or in-process eval slot (live Job objects)."""
    active = (JobStatus.running, *_remote_in_flight_statuses())
    with _jobs_lock:
        return [job for job in _jobs.values() if job.status in active]
def _fail_job(job_id: str, error: str, *, finish_queue: bool = True) -> None:
    """Mark ``job_id`` failed, persist state, and optionally acknowledge the work queue.

    The stored error is capped at 8000 characters ("Unknown error" when empty)
    and the job's remote polling deadline is discarded. With ``finish_queue``
    (the default) one ``task_done()`` is issued on the work queue; a ValueError
    from an unbalanced call is ignored.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job:
            job.status = JobStatus.failed
            job.error = (error or "Unknown error")[:8000]
            _touch(job)
        _prune_jobs()
    _remote_deadlines.pop(job_id, None)
    _persist_jobs()
    if not finish_queue:
        return
    try:
        _work_queue.task_done()
    except ValueError:
        pass
def _succeed_job(job_id: str, result: dict) -> None:
    """Mark ``job_id`` done with ``result``, persist state, and acknowledge the work queue.

    Clears the error and progress label, discards the remote polling deadline,
    and issues one ``task_done()`` (a ValueError from an unbalanced call is
    ignored).
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job:
            job.status = JobStatus.done
            job.result = result
            job.error = None
            job.progress_condition = ""
            _touch(job)
        _prune_jobs()
    _remote_deadlines.pop(job_id, None)
    _persist_jobs()
    try:
        _work_queue.task_done()
    except ValueError:
        pass
def _merge_eval_result_to_leaderboard(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str,
    contact_email: str = "",
    *,
    replace_existing: bool = False,
    merge_partial: bool = False,
) -> None:
    """Write an evaluation result into the leaderboard results.

    With ``merge_partial`` the update is delegated entirely to
    ``init.merge_leaderboard_row_from_eval_result``. Otherwise a fresh row is
    built from ``result`` and appended. If ``replace_existing`` is set, rows
    with the same (stripped) ``model_id`` are dropped first. The rows are then
    re-sorted and saved, and the results cache is invalidated.
    """
    from init import (
        invalidate_results_cache,
        leaderboard_row_from_eval_result,
        load_raw_results,
        merge_leaderboard_row_from_eval_result,
        normalize_legacy_csv_row,
        save_raw_results,
    )

    if merge_partial:
        merge_leaderboard_row_from_eval_result(
            result,
            submitted_at_iso,
            submission_notes=submission_notes,
            contact_email=contact_email,
        )
        return

    model_id = str(result.get("model_id", "")).strip()
    rows = load_raw_results()
    if replace_existing and model_id:
        # In-place, so the list object returned by load_raw_results is the one mutated.
        rows[:] = [r for r in rows if (r.get("model_id") or "").strip() != model_id]
    fresh = leaderboard_row_from_eval_result(
        result,
        submitted_at_iso,
        submission_notes=submission_notes,
        contact_email=contact_email,
    )
    normalize_legacy_csv_row(fresh)
    rows.append(fresh)
    _leaderboard_sort_rows_inplace(rows)
    save_raw_results(rows)
    invalidate_results_cache()
def _parse_eval_conditions_csv(raw: str) -> tuple[str, ...] | None:
    """Parse the persisted comma-separated ``eval_conditions`` cell.

    Returns None ("evaluate all packed conditions") in three cases: an empty
    cell, a selection covering every packed condition, and a cell none of
    whose entries resolve. The last case avoids handing an empty tuple
    downstream. Elsewhere that value is treated as "all" (persistence, the
    partial-eval check) but would otherwise be passed verbatim to dispatch.
    Otherwise the resolved keys are returned as a tuple.
    """
    from benchmark.dataset import PACKED_FILES, resolve_condition_keys

    parts = [p.strip() for p in (raw or "").split(",") if p.strip()]
    if not parts:
        return None
    keys = resolve_condition_keys(parts)
    if not keys or set(keys) == set(PACKED_FILES.keys()):
        return None
    return tuple(keys)
def _encode_eval_conditions_csv(keys: tuple[str, ...] | None) -> str:
    """Join condition keys with commas for the CSV cell; None/empty → ""."""
    return ",".join(keys) if keys else ""
def normalize_moderator_eval_conditions(
    selected: list[str] | None,
) -> tuple[str, ...] | None:
    """Turn a moderator's dataset selection into stored ``eval_conditions``.

    Returns None (meaning all packed conditions) for an empty selection or one
    that resolves to every packed condition; otherwise the resolved keys.

    Raises:
        ValueError: if a non-empty selection resolves to no known condition.
    """
    from benchmark.dataset import PACKED_FILES, resolve_condition_keys

    if not selected:
        return None
    keys = resolve_condition_keys(selected)
    if not keys:
        raise ValueError("Select at least one dataset to evaluate.")
    covers_everything = set(keys) == set(PACKED_FILES.keys())
    return None if covers_everything else keys
def eval_condition_checkbox_defaults() -> tuple[list[str], list[str]]:
    """Return ``(labels, values)`` from ``CONDITION_UI_CHOICES`` for a CheckboxGroup with everything selected."""
    from benchmark.dataset import CONDITION_UI_CHOICES

    labels = [label for label, _key in CONDITION_UI_CHOICES]
    values = [key for _label, key in CONDITION_UI_CHOICES]
    return labels, values
def _job_is_partial_eval(job: Job | None) -> bool:
    """True when ``job`` exists and is restricted to a non-empty subset of conditions."""
    if job is None:
        return False
    return bool(job.eval_conditions)
def _load_artifact_json_from_bucket(artifact_path: str) -> dict:
    """Download a remote eval JSON artifact from the Hub bucket and parse it.

    The downloaded temp file is always removed; cleanup errors are ignored.

    Raises:
        RuntimeError: if the storage backend is not ``hf_bucket``.
    """
    from storage import STORAGE_BACKEND, download_bucket_file

    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("Artifact import requires STORAGE_BACKEND='hf_bucket'.")
    local_path = download_bucket_file(artifact_path)
    try:
        with open(local_path, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
    finally:
        try:
            os.unlink(local_path)
        except Exception:
            pass
    return payload
def _submission_fields_for_artifact_import(
    data: dict, artifact_path: str, notes_override: str, email_override: str
) -> tuple[str, str]:
    """Resolve ``(notes, email)`` for an artifact import.

    Explicit overrides (trimmed and capped at 4000/254 chars) win. Any missing
    value is filled from the first tracked job whose id equals the artifact's
    ``job_id`` or whose ``remote_artifact_path`` equals ``artifact_path``.
    """
    notes = (notes_override or "").strip()[:4000]
    email = (email_override or "").strip()[:254]
    if notes and email:
        return notes, email
    space_job_id = str(data.get("job_id") or "").strip()
    with _jobs_lock:
        source = next(
            (
                j
                for j in _jobs.values()
                if (space_job_id and j.id == space_job_id)
                or (j.remote_artifact_path or "").strip() == artifact_path
            ),
            None,
        )
        if source is not None:
            notes = notes or (source.submission_notes or "").strip()[:4000]
            email = email or (source.contact_email or "").strip()[:254]
    return notes, email
def _submission_notes_for_artifact_import(
    data: dict, artifact_path: str, override: str
) -> str:
    """Notes-only view of ``_submission_fields_for_artifact_import`` (no email override)."""
    return _submission_fields_for_artifact_import(data, artifact_path, override, "")[0]
def _mark_job_done_for_artifact(
    data: dict, artifact_path: str, result: dict
) -> str | None:
    """Mark the tracked job matching an imported artifact as done.

    A job matches when its id equals the artifact's ``job_id`` field or its
    ``remote_artifact_path`` equals ``artifact_path``; the first match in
    registry order wins. The matched job gets the same bookkeeping as
    ``_succeed_job``:

    - the result is stored;
    - the error and progress label are cleared;
    - the remote polling deadline is dropped and the registry is pruned;
    - state is persisted.

    NOTE(review): unlike ``_succeed_job`` no ``task_done()`` is issued here;
    confirm how a still-in-flight job's queue slot is released.

    Returns:
        The matched job id, or None if no tracked job matched.
    """
    space_job_id = str(data.get("job_id") or "").strip()
    with _jobs_lock:
        matched = next(
            (
                j
                for j in _jobs.values()
                if (space_job_id and j.id == space_job_id)
                or (j.remote_artifact_path or "").strip() == artifact_path
            ),
            None,
        )
        if matched is None:
            return None
        matched.status = JobStatus.done
        matched.result = result
        matched.error = None
        matched.progress_condition = ""
        if not (matched.remote_artifact_path or "").strip():
            matched.remote_artifact_path = artifact_path
        _touch(matched)
        matched_id = matched.id
        _prune_jobs()
    _remote_deadlines.pop(matched_id, None)
    _persist_jobs()
    return matched_id
def import_artifact_to_leaderboard(
    artifact_ref: str,
    secret: str,
    *,
    replace_existing: bool = False,
    submission_notes: str = "",
) -> tuple[bool, str]:
    """
    Moderator-only: load a bucket JSON artifact and merge its result into leaderboard.csv.

    ``artifact_ref`` may be a file name (``abc123.json``), job id (``abc123``),
    or full bucket path. Missing submission notes / contact email are filled
    from a matching tracked job. An existing row for the same ``model_id``
    blocks the import unless ``replace_existing`` is set. On success, a
    matching tracked job (if any) is marked done.

    Returns ``(ok, html_message)``. Exception text is HTML-escaped before it
    is interpolated: it can echo content of the artifact, which is produced
    by the remote evaluation run.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    from evaluation.remote_artifact import extract_result_or_raise, normalize_artifact_bucket_path
    from init import (
        invalidate_results_cache,
        leaderboard_row_from_eval_result,
        load_raw_results,
        normalize_legacy_csv_row,
        save_raw_results,
    )
    import analytics

    def _avg_wer_text(row: dict) -> str:
        # Average WER as a percentage, or "N/A" when analytics reports infinity.
        avg = analytics._avg_wer_for_row(row)
        return f"{avg * 100:.2f}%" if avg != float("inf") else "N/A"

    try:
        artifact_path = normalize_artifact_bucket_path(artifact_ref)
    except ValueError as e:
        return False, str(e)
    try:
        data = _load_artifact_json_from_bucket(artifact_path)
    except Exception as e:
        return False, (
            f"Could not load artifact <code>{_escape_html(artifact_path)}</code>: "
            f"{_escape_html(str(e))}"
        )
    try:
        result = extract_result_or_raise(data)
    except Exception as e:
        return False, f"Artifact is not a successful evaluation: {_escape_html(str(e))}"
    model_id = str(result.get("model_id", "")).strip()
    if not model_id:
        return False, "Artifact result is missing model_id."
    notes, email = _submission_fields_for_artifact_import(
        data, artifact_path, submission_notes, ""
    )
    rows = load_raw_results()
    existing = [i for i, r in enumerate(rows) if (r.get("model_id") or "").strip() == model_id]
    if existing and not replace_existing:
        row = dict(rows[existing[0]])
        normalize_legacy_csv_row(row)
        return False, (
            f"Model <code>{_escape_html(model_id)}</code> is already on the leaderboard "
            f"(Avg WER {_avg_wer_text(row)}). Enable "
            f"<strong>Replace existing row</strong> to overwrite."
        )
    # Reaching here with ``existing`` non-empty implies replace_existing=True.
    for i in sorted(existing, reverse=True):
        rows.pop(i)
    submitted_at = _now_iso()
    new_row = leaderboard_row_from_eval_result(
        result, submitted_at, submission_notes=notes, contact_email=email
    )
    normalize_legacy_csv_row(new_row)
    rows.append(new_row)
    _leaderboard_sort_rows_inplace(rows)
    save_raw_results(rows)
    invalidate_results_cache()
    action = "Replaced" if existing else "Added"
    matched_job = _mark_job_done_for_artifact(data, artifact_path, result)
    job_bit = (
        f" Matched queue job <code>{_escape_html(matched_job)}</code> marked done."
        if matched_job
        else ""
    )
    return True, (
        f"{action} <strong>{_escape_html(model_id)}</strong> from "
        f"<code>{_escape_html(artifact_path)}</code>. "
        f"Average WER: <strong>{_avg_wer_text(new_row)}</strong>.{job_bit}"
    )
def _remote_collect_result(job_id: str, hf_id: str, jobs_token: str) -> dict:
    """Fetch a finished remote job's JSON artifact and return its evaluation result.

    The job is switched to ``collecting`` (and persisted) before downloading.
    ``hf_id`` and ``jobs_token`` are part of the signature but unused here.

    Raises:
        RuntimeError: if the job is untracked or has no artifact path.
        Exception: the download error when it does not look transient or is
            still failing after 5 attempts; whatever ``extract_result_or_raise``
            raises for an unsuccessful artifact.
    """
    from evaluation.remote_artifact import extract_result_or_raise
    from storage import download_bucket_file

    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            raise RuntimeError("internal: job not found")
        artifact_path = str(job.remote_artifact_path or "").strip()
    if not artifact_path:
        raise RuntimeError("internal: missing remote artifact path")

    with _jobs_lock:
        job = _jobs.get(job_id)
        if job:
            job.status = JobStatus.collecting
            _touch(job)
    _persist_jobs()

    # A JSON the worker just uploaded can briefly be unreadable (notably with
    # Xet). Errors that look transient are retried with exponential backoff
    # (2 s, 4 s, 8 s, 16 s), so one 404 cannot sink a successful run.
    transient_markers = ("404", "not found", "entrynotfound", "no such", "timed out", "timeout")
    max_attempts = 5
    last_error: Exception | None = None
    local_path: str | None = None
    for attempt in range(max_attempts):
        try:
            local_path = download_bucket_file(artifact_path)
        except Exception as exc:
            last_error = exc
            text = str(exc).lower()
            looks_transient = any(marker in text for marker in transient_markers)
            if attempt == max_attempts - 1 or not looks_transient:
                raise
            time.sleep(2.0 * (2 ** attempt))
        else:
            break
    if local_path is None:
        # Defensive only: the loop either breaks with a path or re-raises.
        raise last_error or RuntimeError("artifact download failed")

    try:
        with open(local_path, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
    finally:
        try:
            os.unlink(local_path)
        except Exception:
            pass
    return extract_result_or_raise(payload)
def _remote_dispatch_job(job_id: str, mid: str, fid: str, jobs_token: str) -> str:
    """Submit Hub Job if needed; return ``hf_remote_job_id``.

    Assigns a default artifact path if the job lacks one. If a Hub Job id is
    already recorded (e.g. a job re-queued after a Space restart), that id is
    reused and nothing is submitted. Otherwise the job passes through
    ``dispatching`` and ``remote_jobs.submit_eval_job`` is called. Either way
    the job ends in ``remote_running``, with a polling deadline of
    ``_remote_max_wait_s()`` seconds from now.

    Raises:
        RuntimeError: if ``job_id`` is not tracked.
    """
    import remote_jobs
    from evaluation.remote_artifact import default_remote_artifact_path

    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            raise RuntimeError("internal: job not found")
        if not (job.remote_artifact_path or "").strip():
            job.remote_artifact_path = default_remote_artifact_path(job_id)
            _touch(job)
        artifact_path = str(job.remote_artifact_path).strip()
        known_hf_id = (job.hf_remote_job_id or "").strip()

    if known_hf_id:
        # Resume path: a Hub Job was already submitted for this Space job.
        with _jobs_lock:
            current = _jobs.get(job_id)
            if current and current.status != JobStatus.remote_running:
                current.status = JobStatus.remote_running
                _touch(current)
        _persist_jobs()
        _remote_deadlines[job_id] = time.monotonic() + _remote_max_wait_s()
        return known_hf_id

    with _jobs_lock:
        current = _jobs.get(job_id)
        if current:
            current.status = JobStatus.dispatching
            _touch(current)
    _persist_jobs()

    with _jobs_lock:
        current = _jobs.get(job_id)
        if current:
            submit_options = dict(
                extra_requirements=current.extra_requirements or "",
                setup_script=current.setup_script or "",
                custom_script=current.custom_script or "",
                recipe_id=current.recipe_id or "",
                run_custom_script=bool(current.run_custom_script),
                eval_conditions=current.eval_conditions,
            )
        else:
            submit_options = dict(
                extra_requirements="",
                setup_script="",
                custom_script="",
                recipe_id="",
                run_custom_script=False,
                eval_conditions=None,
            )
    info = remote_jobs.submit_eval_job(
        model_id=mid,
        family_id=fid,
        space_job_id=job_id,
        artifact_path=artifact_path,
        token=jobs_token,
        **submit_options,
    )
    hf_id = info.id

    with _jobs_lock:
        current = _jobs.get(job_id)
        if current:
            current.hf_remote_job_id = hf_id
            current.status = JobStatus.remote_running
            _touch(current)
    _persist_jobs()
    _remote_deadlines[job_id] = time.monotonic() + _remote_max_wait_s()
    return hf_id
def _remote_start_queued_job(job_id: str, jobs_token: str) -> None:
    """Move a queued job onto Hub Jobs (duplicate check + dispatch).

    Stale queue entries, where the job is gone or no longer ``queued``, are
    acknowledged with ``task_done()`` and ignored. Otherwise the job is claimed
    as ``running``.

    A model already on the leaderboard fails the job, unless it is a moderator
    retry (``replace_leaderboard``) or a partial (subset-of-conditions)
    evaluation. An error while loading the leaderboard, or any dispatch error,
    also marks the job failed instead of leaving it stuck in ``running``.

    On successful dispatch the queue item is deliberately left unfinished here.
    NOTE(review): presumably released later by ``_succeed_job`` / ``_fail_job``,
    which both call ``task_done()``.
    """
    global _running_job_id
    from init import load_raw_results

    with _jobs_lock:
        j = _jobs.get(job_id)
        if j is None or j.status != JobStatus.queued:
            try:
                _work_queue.task_done()
            except ValueError:
                pass
            return
        mid = j.model_id
        fid = j.family_id
        replace_lb = bool(j.replace_leaderboard)
        partial = _job_is_partial_eval(j)
        j.status = JobStatus.running
        j.progress_done = 0
        j.progress_total = 0
        j.progress_condition = ""
        _touch(j)
    _persist_jobs()

    if not replace_lb and not partial:
        try:
            rows = load_raw_results()
        except Exception as e:
            _fail_job(job_id, f"Could not load leaderboard for duplicate check: {e}")
            return
        mid_key = (mid or "").strip()
        # .get(): tolerate rows without a model_id column (a KeyError here
        # would otherwise strand the job in ``running``).
        if any((r.get("model_id") or "").strip() == mid_key for r in rows):
            _fail_job(job_id, "Model already on leaderboard (skipped duplicate race).")
            return

    with _jobs_lock:
        _running_job_id = job_id
    try:
        _remote_dispatch_job(job_id, mid, fid, jobs_token)
    except Exception as e:
        _fail_job(job_id, str(e) or type(e).__name__)
    finally:
        with _jobs_lock:
            if _running_job_id == job_id:
                _running_job_id = None
def _remote_tick_job(job_id: str, jobs_token: str) -> None:
    """Inspect one in-flight Hub Job once and settle it if it has finished.

    Jobs that are unknown, no longer in an in-flight status, or have no Hub job
    id are ignored. Otherwise, by Hub stage:

    * ``COMPLETED``: collect the artifact, merge it into the leaderboard and
      mark the Space job done. Any error on that path marks the job failed.
    * ``ERROR`` / ``CANCELED`` / ``DELETED``: mark the job failed with the
      Hub's failure description.
    * Anything else: if the Space-side deadline (plus grace) has passed,
      request a best-effort cancel on the Hub and mark the job failed.
      Otherwise leave it for the next tick.
    """
    import remote_jobs
    from huggingface_hub._jobs_api import JobStage

    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is None or job.status not in _remote_in_flight_statuses():
            return
        remote_id = (job.hf_remote_job_id or "").strip()
        if not remote_id:
            return
        notes = job.submission_notes or ""
        email = job.contact_email or ""

    # The grace window covers the short gap between the worker script exiting
    # successfully (after uploading its JSON artifact) and the Hub reporting
    # COMPLETED. Without it, a tick landing just past the deadline could fail
    # a run that actually succeeded.
    limit = _remote_deadlines.get(job_id)
    past_deadline = limit is not None and time.monotonic() > limit + _REMOTE_DEADLINE_GRACE_S

    try:
        info = remote_jobs.inspect_job_once(remote_id, token=jobs_token)
    except Exception:
        # Hub API unavailable: give up only once the wait budget is spent;
        # otherwise try again on the next tick.
        if past_deadline:
            _fail_job(
                job_id,
                f"Remote Hub Job exceeded max wait ({_remote_max_wait_s():.0f}s); "
                "Hub inspect_job unavailable.",
            )
        return

    stage = info.status.stage if info.status else None

    # A terminal Hub state wins over the Space-side deadline: a completed
    # job's artifact is collected even if the deadline already elapsed.
    if stage == JobStage.COMPLETED:
        try:
            result = _remote_collect_result(job_id, remote_id, jobs_token)
            with _jobs_lock:
                current = _jobs.get(job_id)
                replace_lb = bool(current and current.replace_leaderboard)
                partial = _job_is_partial_eval(current)
            _merge_eval_result_to_leaderboard(
                result,
                _now_iso(),
                notes,
                email,
                replace_existing=replace_lb and not partial,
                merge_partial=partial,
            )
            with _jobs_lock:
                current = _jobs.get(job_id)
                if current:
                    current.replace_leaderboard = False
            _succeed_job(job_id, result)
        except Exception as e:
            _fail_job(job_id, str(e))
        return

    failed_stages = (JobStage.ERROR, JobStage.CANCELED, JobStage.DELETED)
    if stage in failed_stages:
        _fail_job(job_id, remote_jobs.describe_job_failure(info, token=jobs_token))
        return

    # Still running: enforce the Space-side deadline only now.
    if not past_deadline:
        return
    try:
        remote_jobs.cancel_remote_job(remote_id, token=jobs_token)
    except Exception:
        pass
    _fail_job(job_id, f"Remote Hub Job exceeded max wait ({_remote_max_wait_s():.0f}s).")
def _remote_parallel_worker_loop() -> None:
    """Dispatch up to N Hub Jobs and poll them concurrently (N = remote_max_concurrent_jobs).

    Runs forever on the background worker thread. Each iteration does the
    following:

    * Fill free slots from ``_work_queue`` without blocking.
    * If any jobs are in flight, poll each one once and sleep ``poll_s``.
    * Otherwise, block on the queue for up to ``poll_s``.

    An exception from starting or polling a single job is printed to stderr
    and the loop continues. ``ensure_worker_started`` never starts a second
    worker once ``_worker_started`` is set, so letting such an exception
    escape would stop all processing for the rest of the process.

    Raises:
        RuntimeError: at startup if ``HF_TOKEN`` is not configured.
    """
    import sys
    import traceback

    import remote_jobs  # noqa: F401
    from storage import HF_TOKEN, require_token_for_ffasr_jobs

    jobs_token = require_token_for_ffasr_jobs()
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is required when FFASR_REMOTE_JOBS=1.")
    poll_s = _remote_poll_interval_s()
    max_slots = remote_max_concurrent_jobs()

    def _run_step(step, job_id: str) -> None:
        # Isolate per-job failures so the loop (and every other job) keeps going.
        try:
            step(job_id, jobs_token)
        except Exception:
            print(
                f"[ffasr-remote-worker] {step.__name__} failed for job {job_id}",
                file=sys.stderr,
                flush=True,
            )
            traceback.print_exc()

    while True:
        # Start as many queued jobs as there are free parallel slots.
        while _count_remote_in_flight() < max_slots:
            try:
                job_id = _work_queue.get_nowait()
            except queue.Empty:
                break
            _run_step(_remote_start_queued_job, job_id)
        active = _list_remote_in_flight_ids()
        if active:
            for jid in active:
                _run_step(_remote_tick_job, jid)
            time.sleep(poll_s)
            continue
        if _work_queue.qsize() == 0:
            # Idle: wait for the next submission instead of spinning.
            try:
                job_id = _work_queue.get(timeout=poll_s)
            except queue.Empty:
                continue
            _run_step(_remote_start_queued_job, job_id)
        else:
            time.sleep(min(poll_s, 2.0))
def ensure_worker_started() -> None:
    """Restore persisted jobs, then launch the Hub Jobs worker thread once per process.

    Persisted job state is restored on every call (via
    ``_load_persisted_jobs_once``). The daemon thread ``ffasr-remote-worker``
    is created only on the first successful call; later calls return after
    checking ``_worker_started``.

    Raises:
        RuntimeError: if ``remote_jobs.remote_jobs_enabled()`` is false, since
            this Space never evaluates models locally.
    """
    global _worker_started
    _load_persisted_jobs_once()
    with _worker_lock:
        if _worker_started:
            return
        import remote_jobs as _rj

        if not _rj.remote_jobs_enabled():
            raise RuntimeError(
                "FFASR_REMOTE_JOBS=1 is required. This Space dispatches evaluations to "
                "Hugging Face Hub Jobs and does not run ASR models locally."
            )
        worker = threading.Thread(
            target=_remote_parallel_worker_loop,
            name="ffasr-remote-worker",
            daemon=True,
        )
        worker.start()
        _worker_started = True
def _ensure_worker() -> None:
    """Start the background worker if needed (short alias used by the queue UI helpers)."""
    return ensure_worker_started()
def _worker_unavailable_html(exc: Exception) -> str:
    """Render an orange "Queue unavailable" notice carrying the HTML-escaped text of *exc*."""
    message = _escape_html(str(exc))
    return f"<p style='color:orange'><strong>Queue unavailable:</strong> {message}</p>"
def _model_in_flight(model_id: str) -> bool:
    """Return True if any tracked job for *model_id* is pending, queued, or executing."""
    unfinished = (
        JobStatus.pending_moderation,
        JobStatus.queued,
        JobStatus.running,
        JobStatus.dispatching,
        JobStatus.remote_running,
        JobStatus.collecting,
    )
    with _jobs_lock:
        return any(
            job.model_id == model_id and job.status in unfinished
            for job in _jobs.values()
        )
def _pending_moderation_count() -> int:
    """Count tracked jobs whose status is ``pending_moderation``."""
    with _jobs_lock:
        statuses = [job.status for job in _jobs.values()]
    return statuses.count(JobStatus.pending_moderation)
def enqueue(
    model_id: str,
    family_id: str,
    submission_notes: str = "",
    *,
    contact_email: str = "",
    extra_requirements: str = "",
    setup_script: str = "",
    custom_script: str = "",
    recipe_id: str = "",
    is_gated: bool = False,
) -> tuple[str, int, str | None, bool]:
    """
    Enqueue an evaluation job.

    Returns (job_id, position_or_count, error_message, awaiting_moderation).

    When awaiting_moderation is True, the job is not in the execution queue yet
    and position_or_count is the number of jobs pending moderation. Otherwise
    it is the execution-queue size right after this job was added.

    On rejection, job_id is "" and error_message is one of these codes:
    "remote_jobs_required", "moderation_misconfigured", "queue_full",
    "already_in_csv", "pending_moderation_full". It can also be a
    human-readable message (model already in flight, or invalid submission
    fields).

    The job is recorded in ``_jobs`` and persisted only after every check
    passed. A queue-full rejection therefore leaves no entry in the persisted
    job state.
    """
    import remote_jobs

    if not remote_jobs.remote_jobs_enabled():
        return "", 0, "remote_jobs_required", False
    _ensure_worker()
    from init import load_raw_results

    if moderation_misconfigured():
        return "", 0, "moderation_misconfigured", False
    if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG and not moderation_active():
        return "", 0, "queue_full", False
    if _model_in_flight(model_id):
        return "", 0, f"Model '{model_id}' is already submitted, queued, or running.", False
    if any(row["model_id"] == model_id for row in load_raw_results()):
        return "", 0, "already_in_csv", False
    if moderation_active() and _pending_moderation_count() >= _MAX_PENDING_MODERATION:
        return "", 0, "pending_moderation_full", False

    job_id = str(uuid.uuid4())[:8]
    created = _now_iso()
    notes_clean = (submission_notes or "").strip()[:4000]
    try:
        email_clean = sanitize_contact_email(contact_email)
        from recipes.registry import apply_recipe_to_submission

        extra_req_clean = "\n".join(parse_requirements_lines(extra_requirements))
        setup_clean = sanitize_setup_script(setup_script)
        script_clean = sanitize_custom_script(custom_script)
        extra_req_clean, setup_clean, script_clean, resolved_recipe = apply_recipe_to_submission(
            model_id,
            recipe_id or None,
            extra_req_clean,
            setup_clean,
            script_clean,
        )
        recipe_clean = (resolved_recipe or recipe_id or "").strip().lower()[:64]
    except Exception as e:
        return "", 0, f"Invalid submission fields: {e}", False

    awaiting = moderation_active()
    if not awaiting and _work_queue.qsize() >= _MAX_QUEUE_BACKLOG:
        # Re-check capacity after the checks above, before anything is
        # recorded. Before this fix, the job was inserted and persisted first
        # and then only popped from memory on rejection.
        return "", 0, "queue_full", False

    status = JobStatus.pending_moderation if awaiting else JobStatus.queued
    uses_custom_stack = bool(
        script_clean.strip() or setup_clean.strip() or recipe_clean
    )
    job = Job(
        id=job_id,
        model_id=model_id,
        family_id=family_id,
        status=status,
        created_at=created,
        updated_at=created,
        submission_notes=notes_clean,
        contact_email=email_clean,
        extra_requirements=extra_req_clean,
        setup_script=setup_clean,
        custom_script=script_clean,
        recipe_id=recipe_clean,
        is_gated=bool(is_gated),
        run_custom_script=uses_custom_stack,
    )
    with _jobs_lock:
        _jobs[job_id] = job
    _persist_jobs()

    if awaiting:
        return job_id, _pending_moderation_count(), None, True
    _work_queue.put(job_id)
    return job_id, _work_queue.qsize(), None, False
def approve_job(
    job_id: str,
    secret: str,
    *,
    run_custom_script: bool = False,
    eval_conditions: list[str] | None = None,
) -> tuple[bool, str]:
    """Move a pending job into the execution queue (moderator only).

    ``run_custom_script`` is accepted for API compatibility but is not used.
    The flag stored on the job is recomputed from whether a custom script,
    setup script, or recipe is present. ``eval_conditions`` is normalized by
    ``normalize_moderator_eval_conditions``; a ValueError from it is returned
    as the failure message.

    Returns:
        ``(ok, message)`` for display in the moderator UI.
    """
    if not moderation_active():
        return False, "Moderation is not active on this Space."
    if not _check_moderator_secret(secret):
        return False, "Invalid moderator secret."
    job_id = job_id.strip()
    if not job_id:
        return False, "Enter a job ID."
    try:
        conds = normalize_moderator_eval_conditions(eval_conditions)
    except ValueError as e:
        return False, str(e)

    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job or job.status != JobStatus.pending_moderation:
            return False, "Job not found or not awaiting approval."
        if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG:
            return False, "Execution queue is full; try again in a moment."
        job.status = JobStatus.queued
        # Use the custom stack whenever an evaluate/setup script or recipe exists.
        job.run_custom_script = any(
            (value or "").strip()
            for value in (job.custom_script, job.setup_script, job.recipe_id)
        )
        job.eval_conditions = conds
        job.replace_leaderboard = False
        _touch(job)
        _work_queue.put(job_id)
    _persist_jobs()
    _ensure_worker()

    max_n = remote_max_concurrent_jobs()
    with _jobs_lock:
        current = _jobs.get(job_id)
        has_script = bool(
            current and current.run_custom_script and (current.custom_script or "").strip()
        )
        chosen = current.eval_conditions if current else None
        custom_note = " Custom script will run on the Hub Job." if has_script else ""
        cond_note = (
            f" Evaluating: {', '.join(chosen)} (others unchanged on success)."
            if chosen
            else ""
        )
    return True, (
        f"Approved job {job_id}. Up to {max_n} Hub Jobs may run in parallel; "
        f"this job starts when a slot is free.{custom_note}{cond_note}"
    )
def reject_job(job_id: str, secret: str) -> tuple[bool, str]:
    """Reject a job that is awaiting moderation (moderator only).

    The job is marked ``failed`` with error "Rejected by moderator.". Old jobs
    are then pruned and the job state is persisted.

    Returns:
        ``(ok, message)`` for display in the moderator UI.
    """
    if not moderation_active():
        return False, "Moderation is not active on this Space."
    if not _check_moderator_secret(secret):
        return False, "Invalid moderator secret."
    job_id = job_id.strip()
    if not job_id:
        return False, "Enter a job ID."
    with _jobs_lock:
        target = _jobs.get(job_id)
        awaiting = target is not None and target.status == JobStatus.pending_moderation
        if not awaiting:
            return False, "Job not found or not awaiting approval."
        target.status = JobStatus.failed
        target.error = "Rejected by moderator."
        _touch(target)
        _prune_jobs()
    _persist_jobs()
    return True, f"Rejected job {job_id}."
def verify_moderator_secret(secret: str) -> tuple[bool, str]:
    """Check *secret* against the configured moderator secret.

    Returns ``(True, "")`` on success. Otherwise returns ``(False, reason)``,
    where reason says the secret is unconfigured or the value is invalid.
    """
    if not MODERATOR_SECRET:
        problem = "Moderator secret is not configured (set FFASR_MODERATOR_SECRET)."
    elif not _check_moderator_secret(secret):
        problem = "Invalid moderator secret."
    else:
        return True, ""
    return False, problem
def _moderator_secret_ok(secret: str) -> tuple[bool, str]:
    """Gate destructive / queue actions on the moderator secret (FFASR_MODERATOR_SECRET)."""
    ok, message = verify_moderator_secret(secret)
    return ok, message
def moderation_locked_placeholder_html() -> str:
    """Return the neutral notice shown until the moderator secret unlocks the panel."""
    hint = "Moderator tools are locked. Enter the secret above and click Unlock."
    return f"<p><em>{hint}</em></p>"
def _job_can_retry(status: JobStatus) -> bool:
    """Return True when a moderator may re-run a job in *status*.

    Only ``failed``, ``done`` and ``queued`` jobs qualify. Jobs that are
    executing on the Hub (running / dispatching / remote_running / collecting)
    or awaiting moderation do not.
    """
    return status in (JobStatus.failed, JobStatus.done, JobStatus.queued)
def retry_failed_job(
    job_id: str,
    secret: str,
    *,
    eval_conditions: list[str] | None = None,
) -> tuple[bool, str]:
    """Re-queue a job for another evaluation run (moderator only).

    Allowed for failed, done, and queued jobs (see ``_job_can_retry``). Any
    previous error, result, Hub job id, artifact path and progress are cleared.

    With ``eval_conditions`` left as None, this is a full re-run: on success
    the job replaces the existing leaderboard row for the model. Otherwise
    only the listed conditions are evaluated.

    A job that was already ``queued`` is not pushed onto the work queue a
    second time.

    Returns:
        ``(ok, message)`` for display in the moderator UI.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    job_id = job_id.strip()
    if not job_id:
        return False, "Select a job."
    try:
        conds = normalize_moderator_eval_conditions(eval_conditions)
    except ValueError as e:
        return False, str(e)

    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            return False, "Job not found."
        if not _job_can_retry(job.status):
            return (
                False,
                "Cannot retry while the job is running or awaiting moderation. "
                "Wait for it to finish, or approve/reject pending jobs first.",
            )
        if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG:
            return False, "Execution queue is full; try again later."
        needs_enqueue = job.status != JobStatus.queued
        job.status = JobStatus.queued
        # Wipe everything left over from the previous attempt.
        job.error = None
        job.result = None
        job.hf_remote_job_id = None
        job.remote_artifact_path = None
        job.progress_done = 0
        job.progress_total = 0
        job.progress_condition = ""
        job.eval_conditions = conds
        job.replace_leaderboard = conds is None
        _touch(job)
        if needs_enqueue:
            _work_queue.put(job_id)
    _persist_jobs()
    _ensure_worker()

    with _jobs_lock:
        current = _jobs.get(job_id)
        if current is None:
            cond_note = ""
        elif current.eval_conditions:
            cond_note = (
                f" Only {', '.join(current.eval_conditions)} will run; "
                "existing leaderboard WER columns for other datasets are kept."
            )
        elif current.replace_leaderboard:
            cond_note = " Full re-eval; the leaderboard row will be replaced on success."
        else:
            cond_note = ""
    return (
        True,
        f"Re-queued job {job_id}; it will run after jobs ahead of it.{cond_note}"
    )
def retry_all_eligible_jobs(secret: str) -> tuple[bool, str]:
    """Re-queue every retry-eligible job against the full benchmark (all datasets).

    "Retry-eligible" matches :func:`_job_can_retry`, i.e. ``failed``, ``done``
    or ``queued``. Jobs awaiting moderation and jobs that are executing are
    left untouched and counted separately in the message.

    Each re-queued job gets ``eval_conditions = None`` and
    ``replace_leaderboard = True``, so a successful run replaces the model's
    leaderboard row. Jobs are visited oldest-updated first.

    Stops early (with a partial-success message) once the work queue would
    reach ``_MAX_QUEUE_BACKLOG``, so retries are never silently dropped.

    Returns:
        ``(any_requeued, message)``.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg

    requeued: list[str] = []
    skipped_running: list[str] = []
    skipped_pending: list[str] = []
    to_enqueue: list[str] = []
    capacity_hit = False

    with _jobs_lock:
        candidates = sorted(_jobs.values(), key=lambda j: j.updated_at or j.created_at)
        for job in candidates:
            if job.status == JobStatus.pending_moderation:
                skipped_pending.append(job.id)
                continue
            if not _job_can_retry(job.status):
                skipped_running.append(job.id)
                continue
            # Count retries we are about to push while still holding the lock,
            # so a burst of retries cannot overshoot the backlog cap.
            if _work_queue.qsize() + len(to_enqueue) >= _MAX_QUEUE_BACKLOG:
                capacity_hit = True
                break
            needs_enqueue = job.status != JobStatus.queued
            job.status = JobStatus.queued
            job.error = None
            job.result = None
            job.hf_remote_job_id = None
            job.remote_artifact_path = None
            job.progress_done = 0
            job.progress_total = 0
            job.progress_condition = ""
            job.eval_conditions = None
            job.replace_leaderboard = True
            _touch(job)
            requeued.append(job.id)
            if needs_enqueue:
                to_enqueue.append(job.id)

    for jid in to_enqueue:
        _work_queue.put(jid)
    if requeued:
        _persist_jobs()
        _ensure_worker()

    parts: list[str] = [
        (
            f"Re-queued {len(requeued)} job(s) against the full benchmark; "
            "on success each leaderboard row will be replaced."
        )
        if requeued
        else "No retry-eligible jobs were found."
    ]
    if skipped_pending:
        parts.append(
            f"Skipped {len(skipped_pending)} job(s) awaiting moderation "
            "(approve or reject those manually)."
        )
    if skipped_running:
        parts.append(
            f"Skipped {len(skipped_running)} job(s) currently running or dispatching."
        )
    if capacity_hit:
        parts.append(
            f"Stopped early: the execution queue hit its backlog cap "
            f"({_MAX_QUEUE_BACKLOG}); re-click to enqueue the rest once it drains."
        )
    return bool(requeued), " ".join(parts)
def remove_job_entry(job_id: str, secret: str) -> tuple[bool, str]:
    """Remove a job from tracking (moderator only); refused while it is ``running``.

    NOTE(review): only ``JobStatus.running`` is blocked. Jobs in
    ``dispatching`` / ``remote_running`` / ``collecting`` can be removed, and
    any Hub Job they started is not cancelled here. Confirm this escape hatch
    for stuck jobs is intended.

    Returns:
        ``(ok, message)`` for display in the moderator UI.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    job_id = job_id.strip()
    if not job_id:
        return False, "Select a job."
    with _jobs_lock:
        target = _jobs.get(job_id)
        if target is None or not target:
            return False, "Job not found."
        if target.status == JobStatus.running:
            return False, "Cannot remove a running job; wait for it to finish."
        del _jobs[job_id]
        _prune_jobs()
    _persist_jobs()
    return True, f"Removed job {job_id} from the list."
def list_pending_moderation_jobs() -> list[Job]:
    """Return a new list of the jobs currently awaiting moderator approval (insertion order)."""
    wanted = JobStatus.pending_moderation
    with _jobs_lock:
        return list(filter(lambda job: job.status == wanted, _jobs.values()))
# Statuses of a job that is actively executing; its submission fields are
# read-only while it is in one of these.
_JOB_NON_EDITABLE_STATUSES = tuple(
    JobStatus(value)
    for value in ("running", "dispatching", "remote_running", "collecting")
)
def update_job_script_and_requirements(
    job_id: str,
    secret: str,
    *,
    extra_requirements: str,
    setup_script: str = "",
    custom_script: str,
    recipe_id: str = "",
) -> tuple[bool, str]:
    """Update stored script/requirements for a job (moderator only).

    New values go through the same sanitization as ``enqueue``:

    * requirements are normalized with ``parse_requirements_lines``;
    * setup and evaluation scripts go through ``sanitize_setup_script`` /
      ``sanitize_custom_script``;
    * the optional recipe is applied via ``apply_recipe_to_submission``.

    Any error from these is returned as "Invalid submission fields: ...".
    Jobs in ``_JOB_NON_EDITABLE_STATUSES`` (actively executing) cannot be
    edited.

    Returns:
        ``(ok, message)`` for display in the moderator UI.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    job_id = job_id.strip()
    if not job_id:
        return False, "No job selected."
    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            return False, "Job not found."
        model_id = job.model_id
    try:
        from recipes.registry import apply_recipe_to_submission

        setup = sanitize_setup_script(setup_script)
        script = sanitize_custom_script(custom_script)
        # Normalize like ``enqueue`` so the stored value has the same shape as a
        # fresh submission (it is later re-parsed by the moderation views and
        # passed to the Hub Job). Previously the raw text was stored.
        reqs = "\n".join(parse_requirements_lines(extra_requirements or ""))
        reqs, setup, script, resolved_recipe = apply_recipe_to_submission(
            model_id,
            recipe_id or None,
            reqs,
            setup,
            script,
        )
        recipe_clean = (resolved_recipe or recipe_id or "").strip().lower()[:64]
    except Exception as e:
        return False, f"Invalid submission fields: {e}"

    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            return False, "Job not found."
        if job.status in _JOB_NON_EDITABLE_STATUSES:
            return (
                False,
                "Cannot edit submission while the job is running or dispatching.",
            )
        job.extra_requirements = reqs
        job.setup_script = setup
        job.custom_script = script
        job.recipe_id = recipe_clean
        job.run_custom_script = bool(
            script.strip() or setup.strip() or recipe_clean
        )
        _touch(job)
    _persist_jobs()
    return True, f"Saved submission details for job {job_id}."
def _ensure_jobs_loaded_for_display() -> None:
    """Restore persisted job state for read-only moderator views.

    Unlike ``ensure_worker_started``, this never starts the worker thread.
    """
    _load_persisted_jobs_once()
def pending_job_dropdown_choices() -> list[tuple[str, str]]:
    """Gradio ``(label, value)`` pairs for pending job IDs, oldest submission first.

    Returns a single placeholder pair with an empty value when the queue cannot
    be loaded or nothing is pending.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return [("(Queue unavailable)", "")]
    pending = sorted(list_pending_moderation_jobs(), key=lambda j: j.created_at)
    if not pending:
        return [("(No pending jobs)", "")]
    return [(f"{j.id}: {j.model_id} ({j.family_id})", j.id) for j in pending]
def moderation_action_job_choices() -> list[tuple[str, str]]:
    """Gradio ``(label, value)`` pairs for the retry/remove dropdown.

    Covers the 60 most recently updated jobs. Each label shows the job id, the
    last path segment of the model id (max 32 chars) and the status.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return [("(Queue unavailable)", "")]
    with _jobs_lock:
        snapshot = list(_jobs.values())
    latest = sorted(snapshot, key=lambda j: j.updated_at or j.created_at, reverse=True)[:60]
    if not latest:
        return [("(No jobs)", "")]
    choices: list[tuple[str, str]] = []
    for j in latest:
        model_tail = j.model_id.rpartition("/")[2][:32]
        choices.append((f"{j.id}: {model_tail} ({j.status.value})", j.id))
    return choices
| def _escape_html(s: str) -> str: | |
| return ( | |
| s.replace("&", "&") | |
| .replace("<", "<") | |
| .replace(">", ">") | |
| .replace('"', """) | |
| ) | |
| def _hub_job_link_html(hf_job_id: str | None) -> str: | |
| if not (hf_job_id or "").strip(): | |
| return "" | |
| try: | |
| import remote_jobs as rj | |
| url = rj.hub_job_page_url(hf_job_id.strip()) | |
| except Exception: | |
| return "" | |
| return ( | |
| f"<a href=\"{_escape_html(url)}\" target=\"_blank\" rel=\"noopener noreferrer\">" | |
| f"Open Hub Job logs</a>" | |
| ) | |
| def _bucket_artifacts_link_html() -> str: | |
| try: | |
| import remote_jobs as rj | |
| url = rj.bucket_artifact_browser_url() | |
| except Exception: | |
| return "" | |
| return ( | |
| f"<a href=\"{_escape_html(url)}\" target=\"_blank\" rel=\"noopener noreferrer\">" | |
| f"remote_artifacts folder</a>" | |
| ) | |
def moderation_list_html() -> str:
    """Render the jobs awaiting approval as an HTML table, oldest submission first.

    Columns: job id, model (with a ``[gated]`` badge when applicable), family,
    extra-requirement count, a collapsible view of the custom script (capped
    at 12,000 characters), submitter notes (capped at 200 characters) and the
    submission time. All job-provided text goes through ``_escape_html``.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception as e:
        return _worker_unavailable_html(e)
    pending = sorted(list_pending_moderation_jobs(), key=lambda j: j.created_at)
    if not pending:
        return "<p><em>No jobs awaiting approval.</em></p>"

    script_limit = 12000
    row_html: list[str] = []
    for j in pending:
        note = (j.submission_notes or "").strip()
        if note:
            ellipsis = "…" if len(note) > 200 else ""
            note_cell = _escape_html(note[:200] + ellipsis)
        else:
            note_cell = "N/A"
        gated = " <span style='color:#b45309'>[gated]</span>" if j.is_gated else ""
        req_count = len(parse_requirements_lines(j.extra_requirements or ""))
        extras = f"{req_count} extra req(s)" if req_count else "—"

        script_text = (j.custom_script or "").strip()
        if not script_text:
            script_cell = "—"
        else:
            shown = script_text[:script_limit]
            if len(script_text) > script_limit:
                shown += "\n… (truncated)"
            script_cell = (
                "<details><summary>View script</summary>"
                "<pre style='max-height:240px;overflow:auto;font-size:0.75em;"
                f"white-space:pre-wrap;margin:0.25rem 0'>{_escape_html(shown)}</pre>"
                "</details>"
            )

        row_html.append(
            f"<tr><td><code>{_escape_html(j.id)}</code></td>"
            f"<td><code>{_escape_html(j.model_id)}</code>{gated}</td>"
            f"<td><code>{_escape_html(j.family_id)}</code></td>"
            f"<td style='max-width:140px;font-size:0.85em'>{_escape_html(extras)}</td>"
            f"<td style='max-width:320px;font-size:0.8em;vertical-align:top'>{script_cell}</td>"
            f"<td style='max-width:280px;font-size:0.9em'>{note_cell}</td>"
            f"<td>{_escape_html(j.created_at)}</td></tr>"
        )

    return (
        "<table style='width:100%;border-collapse:collapse;font-size:0.95em'>"
        "<thead><tr><th>Job ID</th><th>Model</th><th>Family</th><th>Extras</th>"
        "<th>Script</th><th>Notes</th><th>Submitted (UTC)</th></tr></thead>"
        f"<tbody>{''.join(row_html)}</tbody></table>"
    )
def next_up_html(limit: int = 5) -> str:
    """Ordered list of models next in the approved execution queue (Submit tab).

    Shows up to ``max(1, limit)`` jobs in ``queued`` status, oldest submission
    first. Starts the worker if needed; if that raises, an "unavailable"
    notice is returned instead.
    """
    try:
        _ensure_worker()
    except Exception as e:
        return _worker_unavailable_html(e)
    with _jobs_lock:
        waiting = [j for j in _jobs.values() if j.status == JobStatus.queued]
    shown = sorted(waiting, key=lambda j: j.created_at)[: max(1, int(limit))]
    if not shown:
        return (
            "<div class='next-up-panel' style='font-size:0.9em;opacity:0.85'>"
            "<p><em>No models are waiting in the evaluation queue.</em></p></div>"
        )
    entries = [
        f"<li><code>{_escape_html(j.model_id)}</code> "
        f"<span style='opacity:0.75'>(job {_escape_html(j.id)})</span></li>"
        for j in shown
    ]
    return (
        "<div class='next-up-panel' style='font-size:0.9em'>"
        f"<p><strong>Next models to evaluate</strong> ({len(shown)} shown):</p>"
        f"<ol style='margin:0.25rem 0 0 1.1rem'>{''.join(entries)}</ol></div>"
    )
| _JOB_ROW_STATUS_CLASS: dict[str, str] = { | |
| "pending_moderation": "ffasr-job-status-pending", | |
| "queued": "ffasr-job-status-queued", | |
| "running": "ffasr-job-status-active", | |
| "dispatching": "ffasr-job-status-active", | |
| "remote_running": "ffasr-job-status-active", | |
| "collecting": "ffasr-job-status-active", | |
| "done": "ffasr-job-status-done", | |
| "failed": "ffasr-job-status-failed", | |
| } | |
def job_row_elem_classes(status: str) -> str:
    """CSS classes for a moderator job row: the layout class plus a per-status class.

    *status* is matched case-insensitively after stripping whitespace. Unknown
    or empty values map to ``ffasr-job-status-unknown``.
    """
    normalized = (status or "").strip().lower()
    status_class = _JOB_ROW_STATUS_CLASS.get(normalized, "ffasr-job-status-unknown")
    return " ".join(("ffasr-job-row", status_class))
| def _job_uses_custom_stack(job: Job) -> bool: | |
| return bool( | |
| (job.custom_script or "").strip() | |
| or (job.setup_script or "").strip() | |
| or (job.recipe_id or "").strip() | |
| ) | |
def pending_jobs_for_render(limit: int = 60) -> list[dict[str, Any]]:
    """Structured pending jobs for Gradio @gr.render rows (Approve / Reject / Check).

    Returns up to ``max(1, limit)`` dicts, oldest submission first. Returns an
    empty list if persisted jobs cannot be loaded. Notes are previewed to 120
    characters and timestamps truncated to 19 characters.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return []
    pending = sorted(list_pending_moderation_jobs(), key=lambda j: j.created_at)
    cap = max(1, int(limit))
    rows: list[dict[str, Any]] = []
    for j in pending[:cap]:
        note = (j.submission_notes or "").strip()
        preview = ""
        if note:
            preview = note[:120] + ("…" if len(note) > 120 else "")
        rows.append(
            {
                "id": j.id,
                "model_id": j.model_id,
                "family_id": j.family_id,
                "created_at": (j.created_at or "")[:19],
                "contact_email": (j.contact_email or "").strip(),
                "is_gated": j.is_gated,
                "req_count": len(parse_requirements_lines(j.extra_requirements or "")),
                "notes_preview": preview,
                "has_custom_script": _job_uses_custom_stack(j),
                "status": JobStatus.pending_moderation.value,
            }
        )
    return rows
def recent_jobs_for_render(limit: int = 30) -> list[dict[str, Any]]:
    """Structured recent jobs for Gradio @gr.render rows (Retry / Remove).

    Returns up to ``max(1, limit)`` dicts, most recently updated first.
    Returns an empty list if persisted jobs cannot be loaded. Errors are
    previewed to 200 characters. ``can_remove`` is false only for
    ``running`` jobs.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return []
    with _jobs_lock:
        snapshot = list(_jobs.values())
    recent = sorted(snapshot, key=lambda j: j.updated_at or j.created_at, reverse=True)
    cap = max(1, int(limit))
    rows: list[dict[str, Any]] = []
    for j in recent[:cap]:
        err = (j.error or "").strip()
        try:
            hub = _hub_job_link_html(j.hf_remote_job_id)
        except Exception:
            hub = ""
        rows.append(
            {
                "id": j.id,
                "model_id": j.model_id,
                "status": j.status.value,
                "contact_email": (j.contact_email or "").strip(),
                "error": err[:200] + ("…" if len(err) > 200 else ""),
                "hub_link_html": hub,
                "updated_at": (j.updated_at or j.created_at)[:19],
                "can_retry": _job_can_retry(j.status),
                "can_remove": j.status != JobStatus.running,
                "has_custom_script": _job_uses_custom_stack(j),
            }
        )
    return rows
def recent_jobs_html(limit: int = 25, *, with_heading: bool = True) -> str:
    """HTML table of the most recently updated jobs (for spotting stuck or failed runs).

    Shows up to *limit* jobs, newest first, with their error (previewed to
    120 characters) and links. A ``done`` job that recorded an artifact path
    also gets an "artifact" link. With ``with_heading`` the table is preceded
    by a short caption.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception as e:
        return _worker_unavailable_html(e)
    with _jobs_lock:
        snapshot = list(_jobs.values())
    newest = sorted(snapshot, key=lambda j: j.updated_at or j.created_at, reverse=True)[:limit]
    if not newest:
        return (
            "<p><em>No job history loaded yet. Open <strong>Submit</strong> (queue status refreshes automatically) "
            "or click <strong>Refresh</strong> here to load persisted jobs from storage.</em></p>"
        )

    row_html: list[str] = []
    for j in newest:
        err = (j.error or "").strip()
        if err:
            err_cell = _escape_html(err[:120] + ("…" if len(err) > 120 else ""))
        else:
            err_cell = "N/A"
        hub_link = _hub_job_link_html(j.hf_remote_job_id)
        links = hub_link or "N/A"
        artifact = (j.remote_artifact_path or "").strip()
        if j.status == JobStatus.done and artifact:
            try:
                from storage import HF_BUCKET_ID

                art_url = f"https://huggingface.co/datasets/{HF_BUCKET_ID}/resolve/main/{artifact}"
                anchor = (
                    f"<a href=\"{_escape_html(art_url)}\" target=\"_blank\" "
                    f"rel=\"noopener noreferrer\">artifact</a>"
                )
                links = f"{hub_link} · {anchor}" if hub_link else anchor
            except Exception:
                pass
        row_html.append(
            f"<tr><td><code>{_escape_html(j.id)}</code></td>"
            f"<td><code>{_escape_html(j.model_id)}</code></td>"
            f"<td>{_escape_html(j.status.value)}</td>"
            f"<td style='max-width:240px;font-size:0.85em'>{err_cell}</td>"
            f"<td style='font-size:0.85em'>{links}</td>"
            f"<td style='font-size:0.85em'>{_escape_html((j.updated_at or j.created_at)[:19])}</td></tr>"
        )

    table = (
        "<table style='width:100%;border-collapse:collapse;font-size:0.9em'>"
        "<thead><tr><th>Job</th><th>Model</th><th>Status</th><th>Error / detail</th>"
        "<th>Hub / bucket</th><th>Updated (UTC)</th></tr></thead>"
        f"<tbody>{''.join(row_html)}</tbody></table>"
    )
    if not with_heading:
        return table
    return (
        "<p style='font-size:0.9em;opacity:0.9'><strong>Recent jobs</strong> "
        "(newest first; persisted across restarts)</p>" + table
    )
def progress_html() -> str:
    """Return an HTML progress panel for in-flight evaluations.

    What is rendered depends on ``_list_active_eval_jobs()``:

    * more than one active job: a bulleted list (sorted newest first by
      ``updated_at``/``created_at``, at most 10 entries) with model id, job id,
      status and an optional Hub Job link;
    * otherwise: a progress bar for the job named by ``_running_job_id``
      (falling back to the single active job), or an "idle" note when there is
      no such job.

    If ``_ensure_worker()`` raises, the HTML from ``_worker_unavailable_html``
    is returned instead.
    """
    try:
        _ensure_worker()
    except Exception as e:
        return _worker_unavailable_html(e)

    active = _list_active_eval_jobs()
    if len(active) > 1:
        max_n = remote_max_concurrent_jobs()
        lines = [
            "<div style='padding:0.5rem 0;font-size:0.9em'>"
            f"<p><strong>{len(active)} evaluations in progress</strong> "
            f"(up to {max_n} parallel Hub Jobs):</p><ul style='margin:0.25rem 0 0.5rem 1.1rem'>"
        ]
        for j in sorted(active, key=lambda x: x.updated_at or x.created_at, reverse=True)[:10]:
            hub = _hub_job_link_html(j.hf_remote_job_id)
            hub_bit = f" · {hub}" if hub else ""
            lines.append(
                f"<li><code>{_escape_html(j.model_id)}</code> "
                f"(job <code>{_escape_html(j.id)}</code>, {j.status.value}){hub_bit}</li>"
            )
        lines.append("</ul></div>")
        return "".join(lines)

    # Copy every field we render while holding _jobs_lock so the values form one
    # snapshot (assumes the worker also mutates jobs under this lock — confirm).
    with _jobs_lock:
        jid = _running_job_id
        j = _jobs.get(jid) if jid else None
        if j is None and active:
            j = active[0]
        if j is None:
            return (
                "<div style='padding:0.5rem 0;font-size:0.9em;opacity:0.85'>"
                "<em>No evaluation is currently running.</em></div>"
            )
        done = int(j.progress_done or 0)
        total = int(j.progress_total or 0)
        model = j.model_id
        fam = j.family_id
        cond = j.progress_condition or ""
        job_label = j.id
        status = j.status
        hf_rid = (j.hf_remote_job_id or "").strip() or None
        art_path = (j.remote_artifact_path or "").strip() or None

    remote_extra = ""
    try:
        import remote_jobs as rj
        remote_mode = rj.remote_jobs_enabled()
    except Exception:
        # remote_jobs missing or misconfigured: just skip the remote details line.
        remote_mode = False
    if remote_mode and status in (
        JobStatus.running,
        JobStatus.dispatching,
        JobStatus.remote_running,
        JobStatus.collecting,
    ):
        hub = _hub_job_link_html(hf_rid)
        # Compute the bucket link once and reuse it below; it may be empty.
        bucket = _bucket_artifacts_link_html()
        parts = [
            "<p style='margin:0.35rem 0 0 0;font-size:0.9em'>"
            f"<strong>Remote eval:</strong> status <code>{_escape_html(status.value)}</code>"
        ]
        if hub:
            parts.append(f" · {hub}")
        elif status == JobStatus.dispatching:
            parts.append(" · submitting Hub Job…")
        if art_path:
            # Only mention browsing when there is a link to browse; otherwise the
            # text would end with a dangling "browse )".
            browse = f"; browse {bucket}" if bucket else ""
            parts.append(
                f" · expected artifact <code>{_escape_html(art_path)}</code> "
                f"(uploaded only on success{browse})"
            )
        elif bucket:
            parts.append(f" · bucket {bucket}")
        parts.append("</p>")
        remote_extra = "".join(parts)

    if total <= 0:
        # Total not known yet: show an animated indeterminate bar.
        inner = (
            "<div style='width:40%;height:100%;background:#38BFA1;"
            "background-image:linear-gradient(90deg,#38BFA1,#3DFFA3,#38BFA1);"
            "background-size:200% 100%;animation:ffasr-indet 1.6s linear infinite;'></div>"
        )
        pct_label = "preparing…"
    else:
        pct = max(0.0, min(100.0, 100.0 * done / total))
        inner = (
            f"<div style='height:100%;width:{pct:.1f}%;background:#38BFA1;"
            "transition:width 0.4s ease-out'></div>"
        )
        cond_txt = f"; current condition: <code>{_escape_html(cond)}</code>" if cond else ""
        pct_label = f"{done}/{total} samples ({pct:.1f}%){cond_txt}"
    return (
        "<style>@keyframes ffasr-indet{0%{background-position:200% 0}100%{background-position:-200% 0}}</style>"
        "<div style='padding:0.25rem 0'>"
        f"<p style='margin:0 0 0.4rem 0'><strong>Running:</strong> "
        f"<code>{_escape_html(model)}</code> "
        f"(family <code>{_escape_html(fam)}</code>, job <code>{_escape_html(job_label)}</code>): {pct_label}</p>"
        f"{remote_extra}"
        "<div style='width:100%;background:#e6e6e6;border:1px solid #d0d0d0;border-radius:6px;"
        "overflow:hidden;height:14px'>"
        f"{inner}"
        "</div></div>"
    )
def status_html(*, start_worker: bool = True) -> str:
    """Return the queue-status HTML snippet shown on the Submit tab.

    The snippet contains, in order:

    * the running evaluation(s);
    * the size of the approved work queue;
    * moderation info, when moderation is active or misconfigured;
    * a note about the Hub Jobs setup;
    * the table from ``recent_jobs_html(20)``.

    Args:
        start_worker: when true, call ``_ensure_worker()`` first. If that
            raises, a "Queue unavailable" message is returned instead.

    Job fields (model id, family id, job id) are HTML-escaped before being
    interpolated, because they come from submissions.
    """
    if start_worker:
        try:
            _ensure_worker()
        except Exception as e:
            return (
                "<div class='queue-status'><p style='color:orange'>"
                f"<strong>Queue unavailable:</strong> {_escape_html(str(e))}</p></div>"
            )
    active = _list_active_eval_jobs()
    # Evaluate the status set once, outside the lock, instead of once per job.
    in_flight_statuses = _remote_in_flight_statuses()
    with _jobs_lock:
        waiting = _work_queue.qsize()
        pending_mod = sum(1 for j in _jobs.values() if j.status == JobStatus.pending_moderation)
        # Do not call _count_remote_in_flight() while holding _jobs_lock; that helper
        # also acquires the lock and would deadlock Gradio callbacks/timers.
        in_flight = sum(1 for j in _jobs.values() if j.status in in_flight_statuses)
    max_n = remote_max_concurrent_jobs()
    if not active:
        run_line = "<p><strong>Running:</strong> none</p>"
    elif len(active) == 1:
        running = active[0]
        done = int(running.progress_done or 0)
        total = int(running.progress_total or 0)
        progress_txt = (
            f"; progress <strong>{done}/{total}</strong> ({(100.0 * done / total):.1f}%)"
            if total > 0
            else "; preparing…"
        )
        # Escape submission-derived fields, as the multi-job branch already does.
        run_line = (
            f"<p><strong>Running:</strong> <code>{_escape_html(running.model_id)}</code> "
            f"(family: <code>{_escape_html(running.family_id)}</code>, "
            f"job <code>{_escape_html(running.id)}</code>)"
            f"{progress_txt}</p>"
        )
    else:
        models = ", ".join(f"<code>{_escape_html(j.model_id)}</code>" for j in active[:4])
        if len(active) > 4:
            models += f" … (+{len(active) - 4} more)"
        run_line = (
            f"<p><strong>Running:</strong> {len(active)} evaluations "
            f"({in_flight} on Hub Jobs, max {max_n} parallel): {models}</p>"
        )
    wait_line = f"<p><strong>Waiting to run (approved queue):</strong> {waiting}</p>"
    mod_line = ""
    if moderation_active():
        mod_line = (
            f"<p><strong>Awaiting moderator approval:</strong> {pending_mod}</p>"
            "<p style='font-size:0.9em;opacity:0.85'>New submissions stay pending until approved on the <strong>Moderate</strong> tab.</p>"
        )
    elif MODERATION_ENABLED and not MODERATOR_SECRET:
        # Moderation was requested but cannot be used without the secret.
        mod_line = (
            "<p style='color:orange'><strong>Moderation misconfigured:</strong> "
            "set secret <code>FFASR_MODERATOR_SECRET</code> in Space settings.</p>"
        )
    hint = (
        "<p style='font-size:0.9em;opacity:0.85'>Evaluations run as "
        "<strong>Hugging Face Hub Jobs</strong> "
        "(<code>FFASR_REMOTE_JOBS=1</code>) with results written to the bucket. "
        "Set Space secrets <code>token_for_ffasr_jobs</code> (Jobs billing) and "
        "<code>HF_TOKEN</code> (bucket). Up to "
        f"<strong>{max_n}</strong> models evaluate in parallel "
        "(<code>FFASR_REMOTE_MAX_CONCURRENT_JOBS</code>). The Space does not run ASR locally.</p>"
    )
    recent = recent_jobs_html(20)
    return f"<div class='queue-status'>{run_line}{wait_line}{mod_line}{hint}{recent}</div>"
def peek_job(job_id: str) -> Job | None:
    """Return the job registered under ``job_id``, or ``None`` if unknown.

    ``_ensure_jobs_loaded_for_display()`` is called first, presumably to pull
    persisted job state into memory. Any exception it raises is ignored, so
    the lookup still answers from whatever ``_jobs`` already holds.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        # Best effort only: a failed reload must not break a simple lookup.
        pass
    with _jobs_lock:
        found = _jobs.get(job_id)
    return found