ffasr / job_queue.py
Shivam
see email for all
21eb3ea
Raw
History Blame Contribute Delete
74.3 kB
"""
Evaluation queue for the Gradio Space: FIFO jobs, bounded memory.
Evaluations are dispatched to **Hugging Face Hub Jobs**; ``FFASR_REMOTE_JOBS=1`` is required (see ``remote_jobs``).
The Space does not import torch or run ASR locally.
Optional moderation: when FFASR_MODERATION=1 and FFASR_MODERATOR_SECRET is set, new jobs stay in
pending_moderation until the moderator approves (then they enter the work queue).
Job state is persisted to the Hub bucket (results/jobs_state.csv) so moderation and queue survive
Space restarts.
"""
from __future__ import annotations
import csv
import hmac
import io
import json
import os
import time
import queue
import threading
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any
# --- Moderation (Space secrets / env) ---
# Configure in the HF Space under Settings → Variables and secrets → Secrets:
#   FFASR_MODERATION=1
#   FFASR_MODERATOR_SECRET=<long random string>
# FFASR_MODERATION is case-insensitive and accepts 1 / true / yes / on.
MODERATION_ENABLED = (
    os.environ.get("FFASR_MODERATION", "").strip().lower()
    in {"1", "true", "yes", "on"}
)
# Shared secret a moderator must present; surrounding whitespace is ignored.
MODERATOR_SECRET = os.environ.get("FFASR_MODERATOR_SECRET", "").strip()
class JobStatus(str, Enum):
    """Lifecycle states of a queued evaluation job (values are persisted to CSV)."""

    # Submitted while moderation is active; waits for moderator approval.
    pending_moderation = "pending_moderation"
    # Waiting in _work_queue for a remote slot.
    queued = "queued"
    # Claimed by the worker; leaderboard duplicate check runs before dispatch.
    running = "running"
    # Hub Job submission in progress.
    dispatching = "dispatching"
    # Hub Job submitted; polled by the worker until it finishes.
    remote_running = "remote_running"
    # Hub Job finished; the result artifact is being downloaded.
    collecting = "collecting"
    # Terminal states.
    done = "done"
    failed = "failed"
@dataclass
class Job:
    """One evaluation submission tracked by the Space-side queue.

    ``result``, the ``progress_*`` fields and ``replace_leaderboard`` live in
    memory only. Every other field is round-tripped through the jobs-state CSV
    (see ``_JOBS_CSV_FIELDS``).
    """

    # Short job id (``enqueue`` uses the first 8 characters of a uuid4).
    id: str
    model_id: str
    family_id: str
    status: JobStatus
    # ISO-8601 UTC timestamps produced by ``_now_iso``.
    created_at: str
    updated_at: str = ""
    # Failure message; also carries "Re-queued after Space restart" notes.
    error: str | None = None
    # Evaluation result dict stored on success (not persisted).
    result: dict[str, Any] | None = None
    submission_notes: str = ""
    contact_email: str = ""
    # Newline-separated requirement specs (see ``parse_requirements_lines``).
    extra_requirements: str = ""
    # Scripts are stored base64-encoded in the CSV.
    setup_script: str = ""
    custom_script: str = ""
    recipe_id: str = ""
    # NOTE(review): presumably marks a gated model repo — confirm against remote_jobs.
    is_gated: bool = False
    run_custom_script: bool = False
    # Progress (not persisted; only meaningful while running)
    progress_done: int = 0
    progress_total: int = 0
    progress_condition: str = ""
    # Id of the submitted Hugging Face Hub Job, once dispatched.
    hf_remote_job_id: str | None = None
    # Bucket path where the remote worker uploads its result JSON.
    remote_artifact_path: str | None = None
    # Set by moderator retry: skip duplicate guard and replace CSV row on success.
    replace_leaderboard: bool = False
    # Subset of PACKED_FILES keys; None means evaluate all conditions.
    eval_conditions: tuple[str, ...] | None = None
# --- Process-local queue state ---
# job id -> Job; read and mutated under _jobs_lock.
_jobs: dict[str, Job] = {}
_jobs_lock = threading.Lock()
# Serializes uploads of the jobs-state CSV in _persist_jobs.
_jobs_file_lock = threading.Lock()
# FIFO of job ids waiting for a remote slot.
_work_queue: queue.Queue[str] = queue.Queue()
# Set while _remote_start_queued_job dispatches a job; None otherwise.
_running_job_id: str | None = None
# True once ensure_worker_started has launched the worker thread.
_worker_started = False
_worker_lock = threading.Lock()
# True once a restore from the bucket has been attempted.
_jobs_loaded = False
# Per Space job id: monotonic deadline for remote Hub Job polling (not persisted).
_remote_deadlines: dict[str, float] = {}
# Above this many tracked jobs, _prune_jobs drops the oldest terminal
# (done/failed) jobs, aiming for half this number.
_MAX_JOBS_TRACKED = 400
# Queue depth at which enqueue rejects new work (when moderation is off).
_MAX_QUEUE_BACKLOG = 32
_MAX_PENDING_MODERATION = 64
_DEFAULT_REMOTE_MAX_CONCURRENT = 4
# UTF-8 byte cap for stored custom / setup scripts.
_MAX_CUSTOM_SCRIPT_BYTES = 32 * 1024
_MAX_REQUIREMENT_LINE_LEN = 200
_MAX_REQUIREMENT_LINES = 50
# Column order of the jobs-state CSV written by _persist_jobs.
_JOBS_CSV_FIELDS = [
    "job_id",
    "model_id",
    "family_id",
    "status",
    "created_at",
    "updated_at",
    "error",
    "submission_notes",
    "contact_email",
    "extra_requirements",
    "setup_script_b64",
    "custom_script_b64",
    "recipe_id",
    "is_gated",
    "run_custom_script",
    "hf_remote_job_id",
    "remote_artifact_path",
    "eval_conditions",
]
def parse_requirements_lines(text: str) -> list[str]:
    """Split requirements.txt-style text into one package spec per entry.

    Blank lines and ``#`` comment lines are dropped. Each kept line is
    stripped and clipped to ``_MAX_REQUIREMENT_LINE_LEN`` characters. At most
    ``_MAX_REQUIREMENT_LINES`` specs are returned.
    """
    specs: list[str] = []
    for candidate in (line.strip() for line in (text or "").splitlines()):
        if not candidate or candidate.startswith("#"):
            continue
        specs.append(candidate[:_MAX_REQUIREMENT_LINE_LEN])
        if len(specs) >= _MAX_REQUIREMENT_LINES:
            break
    return specs
def sanitize_custom_script(text: str) -> str:
    """Normalize a user custom script and cap it for storage.

    Strips the input and runs it through ``normalize_custom_script_compat``.
    If the UTF-8 encoding exceeds ``_MAX_CUSTOM_SCRIPT_BYTES``, it is cut at
    that byte count, and any partial trailing character is dropped.
    """
    from backends.custom_eval import normalize_custom_script_compat

    body = normalize_custom_script_compat((text or "").strip())
    if not body:
        return ""
    raw = body.encode("utf-8")
    if len(raw) <= _MAX_CUSTOM_SCRIPT_BYTES:
        return body
    return raw[:_MAX_CUSTOM_SCRIPT_BYTES].decode("utf-8", errors="ignore")
def sanitize_contact_email(text: str) -> str:
    """Trim and validate a required submitter email address.

    This is a lightweight sanity check, not a full RFC 5322 parser.

    Returns:
        The stripped address.

    Raises:
        ValueError: "Contact email is required." for empty input, or
            "Invalid email address." when the address is longer than 254
            characters, contains whitespace, does not have exactly one ``@``,
            or has an empty local part or a domain without at least two
            non-empty dot-separated labels.
    """
    s = (text or "").strip()
    if not s:
        raise ValueError("Contact email is required.")
    # Reject (rather than truncate) overlong input: truncating first could
    # silently accept a different, still well-formed address.
    if len(s) > 254 or s.count("@") != 1 or any(ch.isspace() for ch in s):
        raise ValueError("Invalid email address.")
    local, domain = s.split("@", 1)
    if not local or not domain or "." not in domain:
        raise ValueError("Invalid email address.")
    # "a@.com", "a@b..com" and "a@b." have an empty domain label.
    if any(not label for label in domain.split(".")):
        raise ValueError("Invalid email address.")
    return s
def sanitize_setup_script(text: str) -> str:
    """Strip a one-time setup script (shell or Python) and cap it for storage.

    Input longer than ``_MAX_CUSTOM_SCRIPT_BYTES`` UTF-8 bytes is cut at that
    byte count, and any partial trailing character is dropped.
    """
    body = (text or "").strip()
    if not body:
        return ""
    raw = body.encode("utf-8")
    if len(raw) <= _MAX_CUSTOM_SCRIPT_BYTES:
        return body
    return raw[:_MAX_CUSTOM_SCRIPT_BYTES].decode("utf-8", errors="ignore")
def custom_script_deprecated_api_warning(text: str) -> str | None:
"""Return a short warning if the script uses kwargs removed from current HF Hub."""
s = (text or "").strip()
if not s:
return None
if "use_auth_token" in s or "authentication_token" in s:
return (
"Your script uses <code>use_auth_token</code> (deprecated). "
"It will be rewritten to <code>token=</code> automatically; prefer "
"<code>token=os.environ['HF_TOKEN']</code> for gated models."
)
return None
def custom_script_argparse_warning(text: str) -> str | None:
"""Warn when a script references ``args.`` without defining argparse (common clone mistake)."""
s = (text or "").strip()
if not s or "args." not in s:
return None
try:
import ast
tree = ast.parse(s)
except SyntaxError:
return None
defines_args = False
calls_parse_args = False
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for t in node.targets:
if isinstance(t, ast.Name) and t.id == "args":
defines_args = True
if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name):
if node.target.id == "args":
defines_args = True
if isinstance(node, ast.Call):
func = node.func
if isinstance(func, ast.Attribute) and func.attr == "parse_args":
calls_parse_args = True
if isinstance(func, ast.Name) and func.id == "parse_args":
calls_parse_args = True
if defines_args or calls_parse_args:
return None
return (
"Your script uses <code>args.something</code> but FFASR does not run "
"<code>argparse</code> — replace with literal values (e.g. "
"<code>routing_enabled=True</code>) or call <code>parse_args()</code> "
"before using <code>args</code>. For Mega-ASR, use the recipe evaluator "
"in <code>recipes/mega_asr/evaluate.py</code>."
)
def custom_script_defines_evaluate(text: str) -> bool:
    """Best-effort check for a top-level ``def evaluate(...)`` in the script.

    An empty script passes (there is nothing to check). A script that fails to
    parse does not pass. Only plain (non-async) module-level functions count;
    methods and nested functions are ignored.
    """
    import ast

    source = (text or "").strip()
    if not source:
        return True
    try:
        module = ast.parse(source)
    except SyntaxError:
        return False
    for stmt in module.body:
        if isinstance(stmt, ast.FunctionDef) and stmt.name == "evaluate":
            return True
    return False
def _bool_to_csv(flag: bool) -> str:
    """Encode a truthy/falsy value as the CSV strings ``"1"`` / ``"0"``."""
    if not flag:
        return "0"
    return "1"
def _bool_from_csv(raw: str) -> bool:
    """Decode a CSV flag. 1/true/yes/on (any case, surrounding spaces ok) are True."""
    normalized = (raw or "").strip().lower()
    return normalized in {"1", "true", "yes", "on"}
def _encode_script_b64(script: str) -> str:
    """Base64-encode a script's UTF-8 bytes for single-cell CSV storage ("" stays "")."""
    import base64

    if not script:
        return ""
    encoded = base64.b64encode(script.encode("utf-8"))
    return encoded.decode("ascii")
def _decode_script_b64(raw: str) -> str:
    """Inverse of ``_encode_script_b64``; any malformed input decodes to ``""``."""
    import base64

    payload = (raw or "").strip()
    if not payload:
        return ""
    try:
        decoded = base64.b64decode(payload.encode("ascii"))
        return decoded.decode("utf-8")
    except Exception:
        return ""
def _now_iso() -> str:
    """Current time as a timezone-aware UTC ISO-8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
def _touch(job: Job) -> None:
    """Stamp ``job.updated_at`` with the current UTC time."""
    stamp = _now_iso()
    job.updated_at = stamp
def moderation_active() -> bool:
    """Whether new jobs must wait for approval (flag on AND secret configured)."""
    if not MODERATION_ENABLED:
        return False
    return bool(MODERATOR_SECRET)
def moderation_misconfigured() -> bool:
    """Flag is on but no moderator secret is set; submissions should be rejected."""
    if not MODERATION_ENABLED:
        return False
    return not MODERATOR_SECRET
def _check_moderator_secret(provided: str) -> bool:
    """Return True iff ``provided`` (whitespace-stripped) equals ``MODERATOR_SECRET``.

    Always False when no secret is configured or ``provided`` is None. Both
    values are hashed to fixed-size SHA-256 digests before the constant-time
    comparison. An early length check, or ``compare_digest`` on inputs of
    differing length, could otherwise reveal the secret's length through
    timing.
    """
    import hashlib

    if not MODERATOR_SECRET or provided is None:
        return False
    a = hashlib.sha256(provided.strip().encode("utf-8")).digest()
    b = hashlib.sha256(MODERATOR_SECRET.encode("utf-8")).digest()
    return hmac.compare_digest(a, b)
def _prune_jobs() -> None:
    """Trim ``_jobs`` once it exceeds ``_MAX_JOBS_TRACKED`` entries.

    Only terminal (done/failed) jobs are dropped, oldest ``created_at`` first,
    aiming for ``_MAX_JOBS_TRACKED // 2`` tracked jobs. Callers hold
    ``_jobs_lock``.
    """
    if len(_jobs) - _MAX_JOBS_TRACKED <= 0:
        return
    finished = sorted(
        (
            jid
            for jid, job in _jobs.items()
            if job.status in (JobStatus.done, JobStatus.failed)
        ),
        key=lambda jid: _jobs[jid].created_at,
    )
    drop_count = max(0, len(_jobs) - _MAX_JOBS_TRACKED // 2)
    for jid in finished[:drop_count]:
        _jobs.pop(jid, None)
def _job_to_csv_row(j: Job) -> dict[str, str]:
    """Serialize one job into a ``_JOBS_CSV_FIELDS`` row (result/progress are not persisted)."""
    return {
        "job_id": j.id,
        "model_id": j.model_id,
        "family_id": j.family_id,
        "status": j.status.value,
        "created_at": j.created_at,
        "updated_at": j.updated_at or j.created_at,
        "error": (j.error or "").replace("\n", " ")[:2000],
        "submission_notes": (j.submission_notes or "").replace("\n", " ")[:4000],
        "contact_email": (j.contact_email or "").replace("\n", " ")[:254],
        "extra_requirements": (j.extra_requirements or "").replace("\r\n", "\n")[:8000],
        "setup_script_b64": _encode_script_b64(j.setup_script or ""),
        "custom_script_b64": _encode_script_b64(j.custom_script or ""),
        "recipe_id": (j.recipe_id or "").strip()[:64],
        "is_gated": _bool_to_csv(j.is_gated),
        "run_custom_script": _bool_to_csv(j.run_custom_script),
        "hf_remote_job_id": (j.hf_remote_job_id or "").strip(),
        "remote_artifact_path": (j.remote_artifact_path or "").strip(),
        "eval_conditions": _encode_eval_conditions_csv(j.eval_conditions),
    }


def _persist_jobs() -> None:
    """Write all tracked jobs to the Hub bucket CSV (best-effort).

    No-op unless the storage backend is ``hf_bucket``. Must not be called
    while holding ``_jobs_lock`` (it acquires that lock itself). Upload
    failures are logged, not raised; every call rewrites the full state, so
    the next successful call catches up.
    """
    import logging

    from storage import (
        HF_BUCKET_ID,
        HF_TOKEN,
        JOBS_STATE_PATH,
        STORAGE_BACKEND,
        batch_bucket_files,
        upload_to_bucket,
    )

    if STORAGE_BACKEND != "hf_bucket" or batch_bucket_files is None:
        return
    # Hold the file lock across snapshot *and* upload. Otherwise a writer with
    # an older snapshot could upload after a newer one and roll state back.
    # Lock order is always _jobs_file_lock -> _jobs_lock.
    with _jobs_file_lock:
        with _jobs_lock:
            rows = [_job_to_csv_row(j) for j in _jobs.values()]
        rows.sort(key=lambda r: r["created_at"])
        buf = io.StringIO()
        w = csv.DictWriter(buf, fieldnames=_JOBS_CSV_FIELDS, extrasaction="ignore")
        w.writeheader()
        w.writerows(rows)
        content = buf.getvalue().encode("utf-8")
        try:
            upload_to_bucket(
                HF_BUCKET_ID,
                add=[(content, JOBS_STATE_PATH)],
                token=HF_TOKEN,
            )
        except Exception:
            logging.getLogger(__name__).warning(
                "Could not persist job state to %s", JOBS_STATE_PATH, exc_info=True
            )
# Serializes the one-time restore. Concurrent first callers must not both
# replay the CSV, which would put every restored queued job on _work_queue twice.
_jobs_load_lock = threading.Lock()


def _read_persisted_job_rows() -> list[dict[str, str]]:
    """Download and parse the jobs-state CSV.

    Returns ``[]`` when the backend is not ``hf_bucket`` or the file cannot
    be downloaded or read.
    """
    from storage import STORAGE_BACKEND, download_bucket_file, JOBS_STATE_PATH

    if STORAGE_BACKEND != "hf_bucket":
        return []
    try:
        local_path = download_bucket_file(JOBS_STATE_PATH)
    except Exception:
        return []
    try:
        with open(local_path, "r", encoding="utf-8") as f:
            return list(csv.DictReader(f))
    except Exception:
        return []
    finally:
        # Temp-file cleanup must not throw away rows we already parsed.
        try:
            os.unlink(local_path)
        except OSError:
            pass


def _restore_persisted_jobs() -> None:
    """Rebuild ``_jobs`` from the persisted CSV and re-queue unfinished work."""
    disk_rows = _read_persisted_job_rows()
    if not disk_rows:
        return
    # Last row wins if duplicate job_id
    by_id: dict[str, dict] = {}
    for row in disk_rows:
        jid = (row.get("job_id") or "").strip()
        if jid:
            by_id[jid] = row
    to_queue: list[tuple[str, str]] = []
    with _jobs_lock:
        for jid, row in by_id.items():
            try:
                st = JobStatus((row.get("status") or "").strip())
            except ValueError:
                continue
            mid = (row.get("model_id") or "").strip()
            if not mid:
                continue
            err = (row.get("error") or "").strip() or None
            created = (row.get("created_at") or _now_iso()).strip()
            updated = (row.get("updated_at") or created).strip()
            # Work interrupted by the restart goes back to the queue. Remote jobs
            # keep hf_remote_job_id, so dispatch resumes polling instead of resubmitting.
            if st == JobStatus.running:
                note = "Re-queued after Space restart (was running)."
            elif st in (JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting):
                note = "Re-queued after Space restart (remote job resume)."
            else:
                note = None
            if note:
                st = JobStatus.queued
                err = f"{err}; {note}" if err else note
            _jobs[jid] = Job(
                id=jid,
                model_id=mid,
                family_id=(row.get("family_id") or "").strip(),
                status=st,
                created_at=created,
                updated_at=updated,
                error=err,
                submission_notes=(row.get("submission_notes") or "").strip(),
                contact_email=(row.get("contact_email") or "").strip()[:254],
                extra_requirements=(row.get("extra_requirements") or "").strip(),
                setup_script=_decode_script_b64(row.get("setup_script_b64") or ""),
                custom_script=_decode_script_b64(row.get("custom_script_b64") or ""),
                recipe_id=(row.get("recipe_id") or "").strip(),
                is_gated=_bool_from_csv(row.get("is_gated") or ""),
                run_custom_script=_bool_from_csv(row.get("run_custom_script") or ""),
                hf_remote_job_id=(row.get("hf_remote_job_id") or "").strip() or None,
                remote_artifact_path=(row.get("remote_artifact_path") or "").strip() or None,
                eval_conditions=_parse_eval_conditions_csv(row.get("eval_conditions") or ""),
            )
            if st == JobStatus.queued:
                to_queue.append((created, jid))
    # Re-queue in original submission order (FIFO).
    to_queue.sort(key=lambda x: x[0])
    for _, jid in to_queue:
        _work_queue.put(jid)


def _load_persisted_jobs_once() -> None:
    """Restore jobs from bucket and re-queue work that was queued or interrupted.

    Runs at most once per process; a failed restore is not retried. Callers
    that arrive while a restore is in progress block until it finishes, so
    they see the restored queue.
    """
    global _jobs_loaded
    if _jobs_loaded:
        return
    with _jobs_load_lock:
        if _jobs_loaded:
            return
        _jobs_loaded = True
        _restore_persisted_jobs()
def _progress_update(job_id: str, done: int, total: int, condition: str) -> None:
    """Record in-memory progress for a job; unknown ids are ignored.

    Not persisted. The UI timer decides how often to read it.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is not None:
            job.progress_done = int(done)
            job.progress_total = int(total)
            job.progress_condition = condition or ""
def _leaderboard_sort_rows_inplace(rows: list[dict]) -> None:
    """Order leaderboard rows in place by delegating to ``analytics``.

    The ordering is Pareto layer ascending, then FFAS score descending.
    """
    from analytics import sort_leaderboard_rows_inplace as _sort_rows

    _sort_rows(rows)
def _remote_poll_interval_s() -> float:
    """Seconds between worker polls, from ``FFASR_REMOTE_JOB_POLL_S`` (default 10).

    Unset, empty, unparsable, non-finite or non-positive values fall back to
    10. The value feeds ``time.sleep`` and ``Queue.get(timeout=...)`` in the
    worker loop. There, a negative or NaN value raises, which kills the worker
    thread, ``inf`` overflows, and 0 busy-spins.
    """
    import math

    raw = os.environ.get("FFASR_REMOTE_JOB_POLL_S", "10").strip()
    try:
        value = float(raw) if raw else 10.0
    except ValueError:
        return 10.0
    if not math.isfinite(value) or value <= 0:
        return 10.0
    return value
# Default Space-side wait budget per Hub Job, in seconds; overridable via
# FFASR_REMOTE_JOB_MAX_WAIT_S (see _remote_max_wait_s).
_DEFAULT_REMOTE_JOB_MAX_WAIT_S = 86400.0  # 24 hours; previously 8h / 4h.
# Extra slack past the Space-side deadline before we declare a timeout.
# Absorbs the brief gap between a worker script exiting cleanly (artifact
# uploaded) and the Hub flipping the job stage to COMPLETED.
_REMOTE_DEADLINE_GRACE_S = 90.0
def _remote_max_wait_s() -> float:
    """Space-side wait budget (seconds) per Hub Job, from ``FFASR_REMOTE_JOB_MAX_WAIT_S``.

    Falls back to ``_DEFAULT_REMOTE_JOB_MAX_WAIT_S`` when the value is unset,
    empty, unparsable, NaN or not positive. NaN would make every deadline
    comparison False, so jobs would never time out. A value <= 0 would fail
    every still-running job as soon as the grace period elapsed.
    ``inf`` is accepted and means "never time out on the Space side".
    """
    import math

    raw = os.environ.get(
        "FFASR_REMOTE_JOB_MAX_WAIT_S", str(_DEFAULT_REMOTE_JOB_MAX_WAIT_S)
    ).strip()
    try:
        value = float(raw) if raw else _DEFAULT_REMOTE_JOB_MAX_WAIT_S
    except ValueError:
        return _DEFAULT_REMOTE_JOB_MAX_WAIT_S
    if math.isnan(value) or value <= 0:
        return _DEFAULT_REMOTE_JOB_MAX_WAIT_S
    return value
def remote_max_concurrent_jobs() -> int:
    """Max Hub Jobs in flight at once when ``FFASR_REMOTE_JOBS=1`` (default 4).

    Read from ``FFASR_REMOTE_MAX_CONCURRENT_JOBS``. Invalid values use the
    default, and the result is clamped to ``[1, 32]``.
    """
    default = _DEFAULT_REMOTE_MAX_CONCURRENT
    raw = os.environ.get("FFASR_REMOTE_MAX_CONCURRENT_JOBS", str(default)).strip()
    limit = default
    if raw:
        try:
            limit = int(raw)
        except ValueError:
            limit = default
    return min(32, max(1, limit))
def _remote_in_flight_statuses() -> tuple[JobStatus, ...]:
    """Statuses that occupy a remote Hub Job slot."""
    in_flight = [JobStatus.dispatching, JobStatus.remote_running, JobStatus.collecting]
    return tuple(in_flight)
def _count_remote_in_flight() -> int:
    """Number of tracked jobs currently holding a remote slot."""
    in_flight = _remote_in_flight_statuses()
    with _jobs_lock:
        return len([job for job in _jobs.values() if job.status in in_flight])
def _list_remote_in_flight_ids() -> list[str]:
    """Ids of jobs currently holding a remote slot (snapshot taken under the lock)."""
    in_flight = _remote_in_flight_statuses()
    ids: list[str] = []
    with _jobs_lock:
        for job in _jobs.values():
            if job.status in in_flight:
                ids.append(job.id)
    return ids
def _list_active_eval_jobs() -> list[Job]:
    """Jobs occupying an evaluation slot: ``running`` or any remote in-flight status."""
    busy = (JobStatus.running, *_remote_in_flight_statuses())
    with _jobs_lock:
        return [job for job in _jobs.values() if job.status in busy]
def _fail_job(job_id: str, error: str, *, finish_queue: bool = True) -> None:
    """Mark a job failed, drop its polling deadline, persist, and ack the queue.

    Args:
        job_id: Space job id. Unknown ids are tolerated; the pruning,
            persistence and queue ack still happen.
        error: Failure text, capped at 8000 characters ("Unknown error" if empty).
        finish_queue: When True, call ``_work_queue.task_done()``. An extra ack
            (ValueError) is ignored.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job:
            job.status = JobStatus.failed
            job.error = (error or "Unknown error")[:8000]
            _touch(job)
        _prune_jobs()
        _remote_deadlines.pop(job_id, None)
    _persist_jobs()
    if not finish_queue:
        return
    try:
        _work_queue.task_done()
    except ValueError:
        pass
def _succeed_job(job_id: str, result: dict) -> None:
    """Mark a job done with ``result``, drop its deadline, persist, and ack the queue.

    Clears any previous error and the progress label. Unknown ids are
    tolerated. An extra queue ack (ValueError) is ignored.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job:
            job.status = JobStatus.done
            job.result = result
            job.error = None
            job.progress_condition = ""
            _touch(job)
        _prune_jobs()
        _remote_deadlines.pop(job_id, None)
    _persist_jobs()
    try:
        _work_queue.task_done()
    except ValueError:
        pass
def _merge_eval_result_to_leaderboard(
    result: dict,
    submitted_at_iso: str,
    submission_notes: str,
    contact_email: str = "",
    *,
    replace_existing: bool = False,
    merge_partial: bool = False,
) -> None:
    """Write one evaluation result into the leaderboard CSV.

    Args:
        result: Eval result dict; ``model_id`` identifies the row.
        submitted_at_iso: Timestamp recorded on the row.
        submission_notes, contact_email: Copied onto the row.
        replace_existing: Drop existing rows with the same ``model_id`` first
            (only used when ``merge_partial`` is False).
        merge_partial: Delegate to ``merge_leaderboard_row_from_eval_result``
            (presumably merges a condition subset into an existing row —
            confirm in ``init``) and return.
    """
    from init import (
        invalidate_results_cache,
        leaderboard_row_from_eval_result,
        load_raw_results,
        merge_leaderboard_row_from_eval_result,
        normalize_legacy_csv_row,
        save_raw_results,
    )

    if merge_partial:
        merge_leaderboard_row_from_eval_result(
            result,
            submitted_at_iso,
            submission_notes=submission_notes,
            contact_email=contact_email,
        )
        return
    target_model = str(result.get("model_id", "")).strip()
    board = load_raw_results()
    if replace_existing and target_model:
        # Filter in place so the list object returned by load_raw_results is
        # mutated exactly as before.
        board[:] = [
            row for row in board if (row.get("model_id") or "").strip() != target_model
        ]
    fresh_row = leaderboard_row_from_eval_result(
        result,
        submitted_at_iso,
        submission_notes=submission_notes,
        contact_email=contact_email,
    )
    normalize_legacy_csv_row(fresh_row)
    board.append(fresh_row)
    _leaderboard_sort_rows_inplace(board)
    save_raw_results(board)
    invalidate_results_cache()
def _parse_eval_conditions_csv(raw: str) -> tuple[str, ...] | None:
    """Parse the persisted comma-separated ``eval_conditions`` cell.

    Returns ``None`` ("evaluate all conditions") for an empty cell, when
    resolution yields no keys, or when the keys cover every ``PACKED_FILES``
    entry. Otherwise returns the resolved keys as a tuple, matching the
    annotation and ``Job.eval_conditions``.
    """
    from benchmark.dataset import PACKED_FILES, resolve_condition_keys

    parts = [p.strip() for p in (raw or "").split(",") if p.strip()]
    if not parts:
        return None
    keys = tuple(resolve_condition_keys(parts))
    # An empty selection already behaved as "all" downstream (falsy
    # eval_conditions); normalize it to the canonical None.
    if not keys or set(keys) == set(PACKED_FILES.keys()):
        return None
    return keys
def _encode_eval_conditions_csv(keys: tuple[str, ...] | None) -> str:
    """Comma-join condition keys for the CSV; ``None``/empty becomes ``""`` (all)."""
    return ",".join(keys) if keys else ""
def normalize_moderator_eval_conditions(
    selected: list[str] | None,
) -> tuple[str, ...] | None:
    """Turn a moderator's dataset selection into stored ``eval_conditions``.

    Returns ``None`` (all packed conditions) for no selection or a full
    selection. Otherwise returns the resolved subset.

    Raises:
        ValueError: the selection resolves to no known condition.
    """
    from benchmark.dataset import PACKED_FILES, resolve_condition_keys

    if not selected:
        return None
    resolved = resolve_condition_keys(selected)
    if not resolved:
        raise ValueError("Select at least one dataset to evaluate.")
    covers_everything = set(resolved) == set(PACKED_FILES.keys())
    return None if covers_everything else resolved
def eval_condition_checkbox_defaults() -> tuple[list[str], list[str]]:
    """Return ``(labels, values)`` for a Gradio CheckboxGroup with everything selected."""
    from benchmark.dataset import CONDITION_UI_CHOICES

    ui_labels = [label for label, _key in CONDITION_UI_CHOICES]
    ui_values = [key for _label, key in CONDITION_UI_CHOICES]
    return ui_labels, ui_values
def _job_is_partial_eval(job: Job | None) -> bool:
    """True when the job restricts evaluation to a non-empty subset of conditions."""
    if job is None:
        return False
    return bool(job.eval_conditions)
def _load_artifact_json_from_bucket(artifact_path: str) -> dict:
    """Fetch a remote eval JSON artifact from the Hub bucket and parse it.

    The downloaded temp file is always removed (best-effort).

    Raises:
        RuntimeError: storage backend is not ``hf_bucket``.
    """
    from storage import STORAGE_BACKEND, download_bucket_file

    if STORAGE_BACKEND != "hf_bucket":
        raise RuntimeError("Artifact import requires STORAGE_BACKEND='hf_bucket'.")
    downloaded = download_bucket_file(artifact_path)
    try:
        with open(downloaded, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
    finally:
        try:
            os.unlink(downloaded)
        except Exception:
            pass
    return payload
def _submission_fields_for_artifact_import(
    data: dict, artifact_path: str, notes_override: str, email_override: str
) -> tuple[str, str]:
    """Pick ``(notes, email)`` for an imported artifact.

    Non-empty overrides win. Missing values are filled from the first tracked
    job whose id equals the artifact's ``job_id`` or whose
    ``remote_artifact_path`` equals ``artifact_path``. Notes are capped at
    4000 characters and emails at 254.
    """
    notes = (notes_override or "").strip()[:4000]
    email = (email_override or "").strip()[:254]
    if notes and email:
        return notes, email
    space_job_id = str(data.get("job_id") or "").strip()
    with _jobs_lock:
        source = next(
            (
                job
                for job in _jobs.values()
                if (space_job_id and job.id == space_job_id)
                or (job.remote_artifact_path or "").strip() == artifact_path
            ),
            None,
        )
        if source is not None:
            notes = notes or (source.submission_notes or "").strip()[:4000]
            email = email or (source.contact_email or "").strip()[:254]
    return notes, email
def _submission_notes_for_artifact_import(
    data: dict, artifact_path: str, override: str
) -> str:
    """Notes-only view of ``_submission_fields_for_artifact_import`` (no email override)."""
    fields = _submission_fields_for_artifact_import(data, artifact_path, override, "")
    return fields[0]
def _mark_job_done_for_artifact(
    data: dict, artifact_path: str, result: dict
) -> str | None:
    """If a tracked queue job matches, mark it done. Returns matched job id or None.

    A job matches when its id equals the artifact's ``job_id`` or its
    ``remote_artifact_path`` equals ``artifact_path``; the first such job in
    ``_jobs`` wins. Like ``_succeed_job``, this clears the error and progress
    label and drops the job's polling deadline, so no stale entry is left in
    ``_remote_deadlines``. It records ``artifact_path`` if the job had none,
    then persists. It does not ack ``_work_queue`` and does not cancel a
    still-running Hub Job.
    """
    space_job_id = str(data.get("job_id") or "").strip()
    matched_id: str | None = None
    with _jobs_lock:
        matched = next(
            (
                j
                for j in _jobs.values()
                if (space_job_id and j.id == space_job_id)
                or (j.remote_artifact_path or "").strip() == artifact_path
            ),
            None,
        )
        if matched is not None:
            matched.status = JobStatus.done
            matched.result = result
            matched.error = None
            matched.progress_condition = ""
            if not (matched.remote_artifact_path or "").strip():
                matched.remote_artifact_path = artifact_path
            _touch(matched)
            _remote_deadlines.pop(matched.id, None)
            matched_id = matched.id
    if matched_id is None:
        return None
    _persist_jobs()
    return matched_id
def import_artifact_to_leaderboard(
    artifact_ref: str,
    secret: str,
    *,
    replace_existing: bool = False,
    submission_notes: str = "",
) -> tuple[bool, str]:
    """
    Moderator-only: load a bucket JSON artifact and merge its result into leaderboard.csv.
    ``artifact_ref`` may be a file name (``abc123.json``), job id (``abc123``), or full bucket path.

    Returns ``(ok, html_message)``. Secret, path, download, parse and
    duplicate-model problems are reported as ``(False, msg)``; errors from the
    leaderboard load/save helpers are not caught and propagate. On success, a
    matching tracked queue job (if any) is also marked done.
    """
    # NOTE(review): _moderator_secret_ok / _escape_html are defined elsewhere in
    # this module (not visible in this view).
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    from evaluation.remote_artifact import extract_result_or_raise, normalize_artifact_bucket_path
    from init import (
        invalidate_results_cache,
        leaderboard_row_from_eval_result,
        load_raw_results,
        normalize_legacy_csv_row,
        save_raw_results,
    )
    import analytics
    try:
        artifact_path = normalize_artifact_bucket_path(artifact_ref)
    except ValueError as e:
        return False, str(e)
    try:
        data = _load_artifact_json_from_bucket(artifact_path)
    except Exception as e:
        return False, f"Could not load artifact <code>{_escape_html(artifact_path)}</code>: {e}"
    try:
        result = extract_result_or_raise(data)
    except Exception as e:
        return False, f"Artifact is not a successful evaluation: {e}"
    model_id = str(result.get("model_id", "")).strip()
    if not model_id:
        return False, "Artifact result is missing model_id."
    # Notes: explicit override, else copied from the matching tracked job.
    notes, email = _submission_fields_for_artifact_import(
        data, artifact_path, submission_notes, ""
    )
    rows = load_raw_results()
    existing = [i for i, r in enumerate(rows) if (r.get("model_id") or "").strip() == model_id]
    if existing and not replace_existing:
        # Refuse silently overwriting; report the current row's average WER.
        row = dict(rows[existing[0]])
        normalize_legacy_csv_row(row)
        avg = analytics._avg_wer_for_row(row)
        avg_txt = f"{avg * 100:.2f}%" if avg != float("inf") else "N/A"
        return False, (
            f"Model <code>{_escape_html(model_id)}</code> is already on the leaderboard "
            f"(Avg WER {avg_txt}). Enable "
            f"<strong>Replace existing row</strong> to overwrite."
        )
    if existing and replace_existing:
        # Pop from the end so earlier indices stay valid.
        for i in sorted(existing, reverse=True):
            rows.pop(i)
    submitted_at = _now_iso()
    new_row = leaderboard_row_from_eval_result(
        result, submitted_at, submission_notes=notes, contact_email=email
    )
    normalize_legacy_csv_row(new_row)
    rows.append(new_row)
    _leaderboard_sort_rows_inplace(rows)
    save_raw_results(rows)
    invalidate_results_cache()
    avg = analytics._avg_wer_for_row(new_row)
    avg_txt = f"{avg * 100:.2f}%" if avg != float("inf") else "N/A"
    action = "Replaced" if existing else "Added"
    # Only after the leaderboard is saved: reflect the import in the job table.
    matched_job = _mark_job_done_for_artifact(data, artifact_path, result)
    job_bit = f" Matched queue job <code>{_escape_html(matched_job)}</code> marked done." if matched_job else ""
    return True, (
        f"{action} <strong>{_escape_html(model_id)}</strong> from "
        f"<code>{_escape_html(artifact_path)}</code>. "
        f"Average WER: <strong>{avg_txt}</strong>.{job_bit}"
    )
def _remote_collect_result(job_id: str, hf_id: str, jobs_token: str) -> dict:
    """Download a finished job's JSON artifact from the bucket and return its result.

    Marks the job ``collecting`` (and persists) before downloading.
    ``hf_id`` and ``jobs_token`` are accepted but not used in this body.

    Raises:
        RuntimeError: the job is unknown or has no ``remote_artifact_path``.
        Exception: a non-transient download error, or the last error after
            5 attempts; JSON decode errors; whatever
            ``extract_result_or_raise`` raises for an unsuccessful artifact.
    """
    from evaluation.remote_artifact import extract_result_or_raise
    from storage import download_bucket_file
    with _jobs_lock:
        j = _jobs.get(job_id)
        if not j:
            raise RuntimeError("internal: job not found")
        artifact_path = str(j.remote_artifact_path or "").strip()
    if not artifact_path:
        raise RuntimeError("internal: missing remote artifact path")
    with _jobs_lock:
        j2 = _jobs.get(job_id)
        if j2:
            j2.status = JobStatus.collecting
            _touch(j2)
    _persist_jobs()
    # HF dataset bucket uploads are sometimes briefly not readable right after
    # the worker uploaded the JSON (especially with Xet). Retry a few times
    # with light backoff so a single 404 doesn't permanently fail an otherwise
    # successful run.
    # Sleeps 2, 4, 8, 16 s between the 5 attempts (~30 s worst case); this
    # blocks the calling worker thread for that long.
    last_err: Exception | None = None
    local_path: str | None = None
    for attempt in range(5):
        try:
            local_path = download_bucket_file(artifact_path)
            break
        except Exception as e:
            last_err = e
            msg = str(e).lower()
            # Heuristic: classify by error text (not-found / timeout look transient).
            transient = (
                "404" in msg
                or "not found" in msg
                or "entrynotfound" in msg
                or "no such" in msg
                or "timed out" in msg
                or "timeout" in msg
            )
            if not transient or attempt == 4:
                raise
            time.sleep(2.0 * (2 ** attempt))
    if local_path is None:
        # Shouldn't reach here, but keep mypy / runtime safe.
        raise last_err or RuntimeError("artifact download failed")
    try:
        with open(local_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    finally:
        try:
            os.unlink(local_path)
        except Exception:
            pass
    return extract_result_or_raise(data)
def _remote_dispatch_job(job_id: str, mid: str, fid: str, jobs_token: str) -> str:
    """Submit Hub Job if needed; return ``hf_remote_job_id``.

    Ensures the job has a ``remote_artifact_path`` (the default is derived
    from the job id). If a Hub Job id is already recorded, for example on a job
    restored after a restart, that id is reused: the job is switched to
    ``remote_running`` without resubmitting. Otherwise the job goes
    ``dispatching`` -> ``remote_jobs.submit_eval_job`` -> ``remote_running``.
    In both cases a polling deadline of ``_remote_max_wait_s()`` seconds
    from now is registered.

    Raises:
        RuntimeError: the job id is not tracked.
        Exception: whatever ``remote_jobs.submit_eval_job`` raises.
    """
    import remote_jobs
    from evaluation.remote_artifact import default_remote_artifact_path
    with _jobs_lock:
        j = _jobs.get(job_id)
        if not j:
            raise RuntimeError("internal: job not found")
        if not (j.remote_artifact_path or "").strip():
            j.remote_artifact_path = default_remote_artifact_path(job_id)
            _touch(j)
        artifact_path = str(j.remote_artifact_path).strip()
        existing_hf = (j.hf_remote_job_id or "").strip()
    if existing_hf:
        # Resume: the Hub Job already exists, just start polling it again.
        with _jobs_lock:
            j2 = _jobs.get(job_id)
            if j2 and j2.status != JobStatus.remote_running:
                j2.status = JobStatus.remote_running
                _touch(j2)
        _persist_jobs()
        _remote_deadlines[job_id] = time.monotonic() + _remote_max_wait_s()
        return existing_hf
    with _jobs_lock:
        j3 = _jobs.get(job_id)
        if j3:
            j3.status = JobStatus.dispatching
            _touch(j3)
    _persist_jobs()
    # Snapshot the submission payload under the lock; submit outside it.
    with _jobs_lock:
        j_dispatch = _jobs.get(job_id)
        extra_req = (j_dispatch.extra_requirements or "") if j_dispatch else ""
        setup_script = (j_dispatch.setup_script or "") if j_dispatch else ""
        custom_script = (j_dispatch.custom_script or "") if j_dispatch else ""
        recipe_id = (j_dispatch.recipe_id or "") if j_dispatch else ""
        run_custom = bool(j_dispatch and j_dispatch.run_custom_script)
        eval_conds = j_dispatch.eval_conditions if j_dispatch else None
    info = remote_jobs.submit_eval_job(
        model_id=mid,
        family_id=fid,
        space_job_id=job_id,
        artifact_path=artifact_path,
        token=jobs_token,
        extra_requirements=extra_req,
        setup_script=setup_script,
        custom_script=custom_script,
        recipe_id=recipe_id,
        run_custom_script=run_custom,
        eval_conditions=eval_conds,
    )
    hf_id = info.id
    with _jobs_lock:
        j4 = _jobs.get(job_id)
        if j4:
            j4.hf_remote_job_id = hf_id
            j4.status = JobStatus.remote_running
            _touch(j4)
    _persist_jobs()
    _remote_deadlines[job_id] = time.monotonic() + _remote_max_wait_s()
    return hf_id
def _remote_start_queued_job(job_id: str, jobs_token: str) -> None:
    """Move a queued job onto Hub Jobs (duplicate check + dispatch).

    Called for every id the worker takes off ``_work_queue``. Stale ids (job
    pruned or no longer ``queued``) are acked with ``task_done`` and ignored.
    Otherwise the job is marked ``running``, checked against the leaderboard
    for duplicates, and handed to ``_remote_dispatch_job``. Every failure on
    that path ends in ``_fail_job``, so the job never stays stuck in
    ``running``; a stuck job would also block resubmission of the model.
    """
    global _running_job_id
    from init import load_raw_results

    with _jobs_lock:
        j = _jobs.get(job_id)
        if j is None or j.status != JobStatus.queued:
            try:
                _work_queue.task_done()
            except ValueError:
                pass
            return
        mid = j.model_id
        fid = j.family_id
        j.status = JobStatus.running
        j.progress_done = 0
        j.progress_total = 0
        j.progress_condition = ""
        _touch(j)
    _persist_jobs()
    try:
        rows = load_raw_results()
    except Exception as e:
        _fail_job(job_id, f"Could not load leaderboard for duplicate check: {e}")
        return
    with _jobs_lock:
        j_chk = _jobs.get(job_id)
        replace_lb = bool(j_chk and j_chk.replace_leaderboard)
        partial = _job_is_partial_eval(j_chk)
    # Moderator retries (replace_lb) and partial evals may target a model
    # that is already listed. Compare model ids the same tolerant way as
    # the other leaderboard lookups (.get + strip), so a malformed row cannot
    # raise KeyError here.
    target = mid.strip()
    if not replace_lb and not partial and any(
        (r.get("model_id") or "").strip() == target for r in rows
    ):
        _fail_job(job_id, "Model already on leaderboard (skipped duplicate race).")
        return
    _running_job_id = job_id
    try:
        _remote_dispatch_job(job_id, mid, fid, jobs_token)
    except Exception as e:
        _fail_job(job_id, str(e))
    finally:
        with _jobs_lock:
            if _running_job_id == job_id:
                _running_job_id = None
def _remote_tick_job(job_id: str, jobs_token: str) -> None:
    """Poll one remote job once; on completion merge CSV or mark failed.

    No-op when the job is unknown, not in a remote in-flight status, or has
    no Hub Job id yet. Otherwise it inspects the Hub Job and:

    * COMPLETED: collect the artifact, merge it into the leaderboard (partial
      evals use ``merge_partial``; moderator retries use ``replace_existing``),
      then ``_succeed_job``. Any error leads to ``_fail_job``.
    * ERROR / CANCELED / DELETED: ``_fail_job`` with the Hub's failure text.
    * Any other stage: keep waiting. If the Space-side deadline plus
      ``_REMOTE_DEADLINE_GRACE_S`` has passed, the Hub Job is cancelled
      (best-effort) and the job is failed.
    """
    import remote_jobs
    from huggingface_hub._jobs_api import JobStage
    with _jobs_lock:
        j = _jobs.get(job_id)
        if j is None or j.status not in _remote_in_flight_statuses():
            return
        hf_id = (j.hf_remote_job_id or "").strip()
        if not hf_id:
            return
        # mid / fid are read but not used below.
        mid = j.model_id
        fid = j.family_id
        notes = j.submission_notes or ""
        email = j.contact_email or ""
    # Grace window absorbs the brief gap between the worker script returning
    # rc=0 (and uploading the JSON artifact) and the Hub flipping the job's
    # stage to COMPLETED. Without this, a tick that arrives ε seconds past
    # the deadline would fail an already-successful run with a misleading
    # "exceeded max wait" message.
    deadline = _remote_deadlines.get(job_id)
    deadline_hit = (
        deadline is not None
        and time.monotonic() > deadline + _REMOTE_DEADLINE_GRACE_S
    )
    try:
        info = remote_jobs.inspect_job_once(hf_id, token=jobs_token)
    except Exception:
        # If the Hub API is unavailable AND we're already past the wait
        # budget, give up cleanly. Otherwise wait for the next tick.
        if deadline_hit:
            _fail_job(
                job_id,
                f"Remote Hub Job exceeded max wait ({_remote_max_wait_s():.0f}s); "
                "Hub inspect_job unavailable.",
            )
        return
    stage = info.status.stage if info.status else None
    # 1) Always honor a terminal Hub state, even if the Space-side deadline
    # already elapsed -- a completed job's artifact should be collected,
    # not thrown away.
    if stage == JobStage.COMPLETED:
        try:
            result = _remote_collect_result(job_id, hf_id, jobs_token)
            with _jobs_lock:
                j_done = _jobs.get(job_id)
                replace_lb = bool(j_done and j_done.replace_leaderboard)
                partial = _job_is_partial_eval(j_done)
            _merge_eval_result_to_leaderboard(
                result,
                _now_iso(),
                notes,
                email,
                replace_existing=replace_lb and not partial,
                merge_partial=partial,
            )
            # The replace flag is one-shot: clear it once the merge succeeded.
            with _jobs_lock:
                j_clr = _jobs.get(job_id)
                if j_clr:
                    j_clr.replace_leaderboard = False
            _succeed_job(job_id, result)
        except Exception as e:
            _fail_job(job_id, str(e))
        return
    if stage in (JobStage.ERROR, JobStage.CANCELED, JobStage.DELETED):
        _fail_job(job_id, remote_jobs.describe_job_failure(info, token=jobs_token))
        return
    # 2) Job is genuinely still running. Only now enforce the Space-side
    # deadline (the Hub has its own FFASR_REMOTE_JOB_TIMEOUT as a backstop).
    if deadline_hit:
        try:
            remote_jobs.cancel_remote_job(hf_id, token=jobs_token)
        except Exception:
            pass
        _fail_job(job_id, f"Remote Hub Job exceeded max wait ({_remote_max_wait_s():.0f}s).")
        return
def _remote_parallel_worker_loop() -> None:
    """Dispatch up to N Hub Jobs and poll them concurrently (N = remote_max_concurrent_jobs).

    Runs forever on the daemon thread started by ``ensure_worker_started``.
    Each pass fills free slots from ``_work_queue``, polls every in-flight job
    once, then sleeps ``_remote_poll_interval_s()``. Exceptions from starting
    or polling a single job are logged and contained. Without that, one bad
    job would kill this thread and silently stall the whole queue.

    Raises:
        RuntimeError: at startup, when no HF token is configured.
    """
    import logging
    import remote_jobs  # noqa: F401  -- fail fast at startup if the module is missing
    from storage import HF_TOKEN, require_token_for_ffasr_jobs

    log = logging.getLogger(__name__)
    jobs_token = require_token_for_ffasr_jobs()
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is required when FFASR_REMOTE_JOBS=1.")
    poll_s = _remote_poll_interval_s()
    max_slots = remote_max_concurrent_jobs()

    def start(job_id: str) -> None:
        """Start one dequeued job; on an internal error, fail it instead of leaving it stuck."""
        try:
            _remote_start_queued_job(job_id, jobs_token)
            return
        except Exception as exc:
            log.exception("FFASR worker: starting job %s failed", job_id)
            error = f"Internal error while starting job: {exc}"
        try:
            # Only jobs not yet handed to the Hub are failed here; in-flight
            # ones are still tracked (and eventually resolved) by polling.
            with _jobs_lock:
                job = _jobs.get(job_id)
                stuck = job is not None and job.status in (JobStatus.queued, JobStatus.running)
            if stuck:
                _fail_job(job_id, error)
        except Exception:
            log.exception("FFASR worker: could not mark job %s failed", job_id)

    def tick(job_id: str) -> None:
        """Poll one in-flight job; errors are logged and retried on the next pass."""
        try:
            _remote_tick_job(job_id, jobs_token)
        except Exception:
            log.exception("FFASR worker: polling job %s failed", job_id)

    while True:
        while _count_remote_in_flight() < max_slots:
            try:
                job_id = _work_queue.get_nowait()
            except queue.Empty:
                break
            start(job_id)
        active = _list_remote_in_flight_ids()
        if active:
            for jid in active:
                tick(jid)
            time.sleep(poll_s)
            continue
        if _work_queue.qsize() == 0:
            # Idle: block until new work arrives (or the poll interval passes).
            try:
                job_id = _work_queue.get(timeout=poll_s)
            except queue.Empty:
                continue
            start(job_id)
        else:
            time.sleep(min(poll_s, 2.0))
def ensure_worker_started() -> None:
    """Restore persisted jobs, then launch the Hub Jobs worker thread once per process.

    Raises:
        RuntimeError: remote Hub Jobs are not enabled (``FFASR_REMOTE_JOBS=1``).
    """
    global _worker_started
    _load_persisted_jobs_once()
    with _worker_lock:
        if not _worker_started:
            import remote_jobs as _rj

            if not _rj.remote_jobs_enabled():
                raise RuntimeError(
                    "FFASR_REMOTE_JOBS=1 is required. This Space dispatches evaluations to "
                    "Hugging Face Hub Jobs and does not run ASR models locally."
                )
            worker = threading.Thread(
                target=_remote_parallel_worker_loop,
                name="ffasr-remote-worker",
                daemon=True,
            )
            worker.start()
            _worker_started = True
def _ensure_worker() -> None:
    """Short alias for ``ensure_worker_started`` used by the queue UI helpers."""
    return ensure_worker_started()
def _worker_unavailable_html(exc: Exception) -> str:
    """HTML snippet (orange text) telling the user the queue cannot start."""
    detail = _escape_html(str(exc))
    return f"<p style='color:orange'><strong>Queue unavailable:</strong> {detail}</p>"
def _model_in_flight(model_id: str) -> bool:
    """True if some tracked job for ``model_id`` is not yet done or failed."""
    unfinished = (
        JobStatus.pending_moderation,
        JobStatus.queued,
        JobStatus.running,
        JobStatus.dispatching,
        JobStatus.remote_running,
        JobStatus.collecting,
    )
    with _jobs_lock:
        return any(
            job.model_id == model_id and job.status in unfinished
            for job in _jobs.values()
        )
def _pending_moderation_count() -> int:
    """Number of jobs currently waiting for moderator approval."""
    waiting = 0
    with _jobs_lock:
        for job in _jobs.values():
            if job.status == JobStatus.pending_moderation:
                waiting += 1
    return waiting
def enqueue(
    model_id: str,
    family_id: str,
    submission_notes: str = "",
    *,
    contact_email: str = "",
    extra_requirements: str = "",
    setup_script: str = "",
    custom_script: str = "",
    recipe_id: str = "",
    is_gated: bool = False,
) -> tuple[str, int, str | None, bool]:
    """
    Enqueue an evaluation job.

    Returns (job_id, position_or_count, error_message, awaiting_moderation).
    When awaiting_moderation is True, the job is not in the execution queue yet
    and position_or_count is the number of jobs pending moderation; otherwise it
    is the execution-queue size right after this job was added.

    On failure job_id is "" and error_message is either a short code
    ("remote_jobs_required", "moderation_misconfigured", "queue_full",
    "already_in_csv", "pending_moderation_full") or a human-readable reason.
    """
    import remote_jobs

    if not remote_jobs.remote_jobs_enabled():
        return "", 0, "remote_jobs_required", False
    _ensure_worker()
    from init import load_raw_results

    if moderation_misconfigured():
        return "", 0, "moderation_misconfigured", False
    # With moderation on, new jobs wait in pending_moderation instead of the work
    # queue; approve_job() enforces the backlog cap at approval time.
    if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG and not moderation_active():
        return "", 0, "queue_full", False
    if _model_in_flight(model_id):
        return "", 0, f"Model '{model_id}' is already submitted, queued, or running.", False
    existing = load_raw_results()
    for row in existing:
        if row["model_id"] == model_id:
            return "", 0, "already_in_csv", False
    if moderation_active() and _pending_moderation_count() >= _MAX_PENDING_MODERATION:
        return "", 0, "pending_moderation_full", False
    job_id = str(uuid.uuid4())[:8]
    created = _now_iso()
    notes_clean = (submission_notes or "").strip()[:4000]
    try:
        email_clean = sanitize_contact_email(contact_email)
        from recipes.registry import apply_recipe_to_submission

        req_lines = parse_requirements_lines(extra_requirements)
        extra_req_clean = "\n".join(req_lines)
        setup_clean = sanitize_setup_script(setup_script)
        script_clean = sanitize_custom_script(custom_script)
        extra_req_clean, setup_clean, script_clean, resolved_recipe = apply_recipe_to_submission(
            model_id,
            recipe_id or None,
            extra_req_clean,
            setup_clean,
            script_clean,
        )
        recipe_clean = (resolved_recipe or recipe_id or "").strip().lower()[:64]
    except Exception as e:
        return "", 0, f"Invalid submission fields: {e}", False
    awaiting = moderation_active()
    # Re-check capacity (other requests may have filled the queue meanwhile) BEFORE
    # the job is registered and persisted, so a refused submission never lands in
    # the persisted job state where a Space restart could resurrect it.
    if not awaiting and _work_queue.qsize() >= _MAX_QUEUE_BACKLOG:
        return "", 0, "queue_full", False
    status = JobStatus.pending_moderation if awaiting else JobStatus.queued
    uses_custom_stack = bool(
        script_clean.strip() or setup_clean.strip() or recipe_clean
    )
    job = Job(
        id=job_id,
        model_id=model_id,
        family_id=family_id,
        status=status,
        created_at=created,
        updated_at=created,
        submission_notes=notes_clean,
        contact_email=email_clean,
        extra_requirements=extra_req_clean,
        setup_script=setup_clean,
        custom_script=script_clean,
        recipe_id=recipe_clean,
        is_gated=bool(is_gated),
        run_custom_script=uses_custom_stack,
    )
    with _jobs_lock:
        _jobs[job_id] = job
    _persist_jobs()
    if awaiting:
        pos = _pending_moderation_count()
        return job_id, pos, None, True
    _work_queue.put(job_id)
    position = _work_queue.qsize()
    return job_id, position, None, False
def approve_job(
    job_id: str,
    secret: str,
    *,
    run_custom_script: bool = False,
    eval_conditions: list[str] | None = None,
) -> tuple[bool, str]:
    """Promote a ``pending_moderation`` job into the execution queue (moderator only).

    Args:
        job_id: ID of the pending job; surrounding whitespace is ignored.
        secret: moderator secret, checked with ``_check_moderator_secret``.
        run_custom_script: accepted for backward compatibility but not consulted;
            the job's flag is derived from whether it has a custom script, a setup
            script, or a recipe.
        eval_conditions: optional condition subset, normalized by
            ``normalize_moderator_eval_conditions`` (a ValueError becomes a
            ``(False, message)`` result).

    Returns:
        ``(ok, message)`` for display in the moderator UI.
    """
    if not moderation_active():
        return False, "Moderation is not active on this Space."
    if not _check_moderator_secret(secret):
        return False, "Invalid moderator secret."
    target_id = job_id.strip()
    if not target_id:
        return False, "Enter a job ID."
    try:
        conds = normalize_moderator_eval_conditions(eval_conditions)
    except ValueError as err:
        return False, str(err)

    with _jobs_lock:
        job = _jobs.get(target_id)
        if not job or job.status != JobStatus.pending_moderation:
            return False, "Job not found or not awaiting approval."
        if _work_queue.qsize() >= _MAX_QUEUE_BACKLOG:
            return False, "Execution queue is full; try again in a moment."
        job.status = JobStatus.queued
        # Custom stack whenever an evaluate script, setup script, or recipe exists.
        job.run_custom_script = _job_uses_custom_stack(job)
        job.eval_conditions = conds
        job.replace_leaderboard = False
        _touch(job)
        _work_queue.put(target_id)
    _persist_jobs()
    _ensure_worker()

    max_n = remote_max_concurrent_jobs()
    custom_note = ""
    cond_note = ""
    with _jobs_lock:
        approved = _jobs.get(target_id)
        if approved:
            if approved.run_custom_script and (approved.custom_script or "").strip():
                custom_note = " Custom script will run on the Hub Job."
            if approved.eval_conditions:
                cond_note = f" Evaluating: {', '.join(approved.eval_conditions)} (others unchanged on success)."
    return True, (
        f"Approved job {target_id}. Up to {max_n} Hub Jobs may run in parallel; "
        f"this job starts when a slot is free.{custom_note}{cond_note}"
    )
def reject_job(job_id: str, secret: str) -> tuple[bool, str]:
    """Mark a pending-moderation job as failed with a rejection note (moderator only).

    Returns ``(ok, message)`` for the moderator UI.
    """
    if not moderation_active():
        return False, "Moderation is not active on this Space."
    if not _check_moderator_secret(secret):
        return False, "Invalid moderator secret."
    target_id = job_id.strip()
    if not target_id:
        return False, "Enter a job ID."
    with _jobs_lock:
        job = _jobs.get(target_id)
        is_pending = bool(job) and job.status == JobStatus.pending_moderation
        if not is_pending:
            return False, "Job not found or not awaiting approval."
        job.status = JobStatus.failed
        job.error = "Rejected by moderator."
        _touch(job)
        _prune_jobs()
    _persist_jobs()
    return True, f"Rejected job {target_id}."
def verify_moderator_secret(secret: str) -> tuple[bool, str]:
    """Validate ``secret`` for unlocking the moderator UI and privileged actions.

    Returns ``(True, "")`` on success, otherwise ``(False, reason)``.
    """
    if MODERATOR_SECRET:
        if _check_moderator_secret(secret):
            return True, ""
        return False, "Invalid moderator secret."
    return False, "Moderator secret is not configured (set FFASR_MODERATOR_SECRET)."
def _moderator_secret_ok(secret: str) -> tuple[bool, str]:
    """Gate for destructive/queue actions; same contract as ``verify_moderator_secret``."""
    ok, reason = verify_moderator_secret(secret)
    return ok, reason
def moderation_locked_placeholder_html() -> str:
    """Return the neutral notice shown until the moderator secret unlocks the panel."""
    notice = "Moderator tools are locked. Enter the secret above and click Unlock."
    return f"<p><em>{notice}</em></p>"
def _job_can_retry(status: JobStatus) -> bool:
    """Return True when a moderator may re-run a job in ``status``.

    Only finished (``failed``/``done``) or not-yet-started (``queued``) jobs
    qualify. Executing statuses and ``pending_moderation`` never overlap this
    set, so they are rejected implicitly.
    """
    return status in (JobStatus.failed, JobStatus.done, JobStatus.queued)
def retry_failed_job(
    job_id: str,
    secret: str,
    *,
    eval_conditions: list[str] | None = None,
) -> tuple[bool, str]:
    """Re-queue a job for another evaluation run (moderator only).

    Allowed for failed, done, and queued jobs. With ``eval_conditions`` None
    (full re-eval), a successful re-run replaces the existing leaderboard row for
    that model when one is present. With a subset, only those conditions run and
    other leaderboard WER columns are kept.

    A job that is already ``queued`` keeps its existing work-queue entry. It is
    only reset, so the backlog cap applies only when a new entry is needed.

    Returns ``(ok, message)`` for the moderator UI.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    job_id = job_id.strip()
    if not job_id:
        return False, "Select a job."
    try:
        conds = normalize_moderator_eval_conditions(eval_conditions)
    except ValueError as e:
        return False, str(e)
    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            return False, "Job not found."
        if not _job_can_retry(job.status):
            return (
                False,
                "Cannot retry while the job is running or awaiting moderation. "
                "Wait for it to finish, or approve/reject pending jobs first.",
            )
        was_queued = job.status == JobStatus.queued
        # Already-queued jobs occupy no extra slot, so a full queue must not block them.
        if not was_queued and _work_queue.qsize() >= _MAX_QUEUE_BACKLOG:
            return False, "Execution queue is full; try again later."
        job.status = JobStatus.queued
        job.error = None
        job.result = None
        job.hf_remote_job_id = None
        job.remote_artifact_path = None
        job.progress_done = 0
        job.progress_total = 0
        job.progress_condition = ""
        job.eval_conditions = conds
        job.replace_leaderboard = conds is None
        _touch(job)
        if not was_queued:
            _work_queue.put(job_id)
    _persist_jobs()
    _ensure_worker()
    cond_note = ""
    with _jobs_lock:
        j = _jobs.get(job_id)
        if j and j.eval_conditions:
            cond_note = (
                f" Only {', '.join(j.eval_conditions)} will run; "
                "existing leaderboard WER columns for other datasets are kept."
            )
        elif j and j.replace_leaderboard:
            cond_note = " Full re-eval; the leaderboard row will be replaced on success."
    return (
        True,
        f"Re-queued job {job_id}; it will run after jobs ahead of it.{cond_note}"
    )
def retry_all_eligible_jobs(secret: str) -> tuple[bool, str]:
    """Re-queue every retry-eligible job against the full benchmark (all datasets).

    "Retry-eligible" matches :func:`_job_can_retry`, i.e. ``failed``, ``done``, or
    ``queued``. Jobs that are running, dispatching, awaiting moderation, etc. are left
    untouched. ``eval_conditions`` is forced to ``None`` (full benchmark) so that on
    success the existing leaderboard row is replaced. This matches the semantics
    described in the moderator panel for a full re-run.

    Jobs are visited oldest-update first. Already-``queued`` jobs reuse their existing
    work-queue entry and need no new slot. Only jobs that need a new entry count
    against ``_MAX_QUEUE_BACKLOG``. The loop stops early (with a partial-success
    message) when one of those no longer fits, so retries are never silently dropped.

    Returns ``(any_requeued, message)``.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    requeued: list[str] = []
    skipped_running: list[str] = []
    skipped_pending: list[str] = []
    capacity_hit = False
    job_ids_to_enqueue: list[str] = []
    with _jobs_lock:
        snapshot = list(_jobs.values())
        snapshot.sort(key=lambda j: j.updated_at or j.created_at)
        for job in snapshot:
            if job.status == JobStatus.pending_moderation:
                skipped_pending.append(job.id)
                continue
            if not _job_can_retry(job.status):
                skipped_running.append(job.id)
                continue
            was_queued = job.status == JobStatus.queued
            # Approximate the backlog (current size + entries we are about to add)
            # while still holding the lock; skip the check for jobs that add no entry.
            if not was_queued and _work_queue.qsize() + len(job_ids_to_enqueue) >= _MAX_QUEUE_BACKLOG:
                capacity_hit = True
                break
            job.status = JobStatus.queued
            job.error = None
            job.result = None
            job.hf_remote_job_id = None
            job.remote_artifact_path = None
            job.progress_done = 0
            job.progress_total = 0
            job.progress_condition = ""
            job.eval_conditions = None
            job.replace_leaderboard = True
            _touch(job)
            requeued.append(job.id)
            if not was_queued:
                job_ids_to_enqueue.append(job.id)
    for jid in job_ids_to_enqueue:
        _work_queue.put(jid)
    if requeued:
        _persist_jobs()
        _ensure_worker()
    parts: list[str] = []
    if requeued:
        parts.append(
            f"Re-queued {len(requeued)} job(s) against the full benchmark; "
            "on success each leaderboard row will be replaced."
        )
    else:
        parts.append("No retry-eligible jobs were found.")
    if skipped_pending:
        parts.append(
            f"Skipped {len(skipped_pending)} job(s) awaiting moderation "
            "(approve or reject those manually)."
        )
    if skipped_running:
        parts.append(
            f"Skipped {len(skipped_running)} job(s) currently running or dispatching."
        )
    if capacity_hit:
        parts.append(
            f"Stopped early: the execution queue hit its backlog cap "
            f"({_MAX_QUEUE_BACKLOG}); re-click to enqueue the rest once it drains."
        )
    return bool(requeued), " ".join(parts)
def remove_job_entry(job_id: str, secret: str) -> tuple[bool, str]:
    """Drop a job from tracking (moderator only); refused while its status is ``running``.

    Returns ``(ok, message)`` for the moderator UI.
    """
    ok, reason = _moderator_secret_ok(secret)
    if not ok:
        return False, reason
    target_id = job_id.strip()
    if not target_id:
        return False, "Select a job."
    with _jobs_lock:
        job = _jobs.get(target_id)
        if not job:
            return False, "Job not found."
        # NOTE(review): only JobStatus.running is blocked here; dispatching /
        # remote_running / collecting jobs can still be removed. Confirm this is
        # the intended escape hatch for stuck Hub Jobs.
        if job.status == JobStatus.running:
            return False, "Cannot remove a running job; wait for it to finish."
        _jobs.pop(target_id, None)
        _prune_jobs()
    _persist_jobs()
    return True, f"Removed job {target_id} from the list."
def list_pending_moderation_jobs() -> list[Job]:
    """Snapshot the jobs awaiting moderator approval, in ``_jobs`` iteration order."""
    with _jobs_lock:
        return list(
            filter(lambda job: job.status == JobStatus.pending_moderation, _jobs.values())
        )
# Statuses in which a job is actively executing; moderator edits to the stored
# submission (script / requirements / recipe) are refused while in any of these.
_JOB_NON_EDITABLE_STATUSES = tuple(
    JobStatus(name) for name in ("running", "dispatching", "remote_running", "collecting")
)
def update_job_script_and_requirements(
    job_id: str,
    secret: str,
    *,
    extra_requirements: str,
    setup_script: str = "",
    custom_script: str,
    recipe_id: str = "",
) -> tuple[bool, str]:
    """Update stored script/requirements for a job (moderator only).

    Inputs are cleaned the same way :func:`enqueue` cleans a new submission.
    Requirements go through ``parse_requirements_lines`` and are re-joined one per
    line. The scripts go through their sanitizers, and the result goes through
    ``apply_recipe_to_submission``. Any exception raised there is reported as
    ``(False, "Invalid submission fields: ...")``.

    Edits are refused while the job is in ``_JOB_NON_EDITABLE_STATUSES``.

    Returns ``(ok, message)`` for the moderator UI.
    """
    ok, msg = _moderator_secret_ok(secret)
    if not ok:
        return False, msg
    job_id = job_id.strip()
    if not job_id:
        return False, "No job selected."
    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            return False, "Job not found."
        model_id = job.model_id
    # Sanitizing / recipe resolution happens outside the lock; the job is
    # re-fetched afterwards in case it changed or was removed meanwhile.
    try:
        from recipes.registry import apply_recipe_to_submission

        setup = sanitize_setup_script(setup_script)
        script = sanitize_custom_script(custom_script)
        # Same requirement normalization as enqueue(), so edited jobs store the
        # same shape as freshly submitted ones.
        reqs = "\n".join(parse_requirements_lines(extra_requirements or ""))
        reqs, setup, script, resolved_recipe = apply_recipe_to_submission(
            model_id,
            recipe_id or None,
            reqs,
            setup,
            script,
        )
        recipe_clean = (resolved_recipe or recipe_id or "").strip().lower()[:64]
    except Exception as e:
        return False, f"Invalid submission fields: {e}"
    with _jobs_lock:
        job = _jobs.get(job_id)
        if not job:
            return False, "Job not found."
        if job.status in _JOB_NON_EDITABLE_STATUSES:
            return (
                False,
                "Cannot edit submission while the job is running or dispatching.",
            )
        job.extra_requirements = reqs
        job.setup_script = setup
        job.custom_script = script
        job.recipe_id = recipe_clean
        job.run_custom_script = bool(
            script.strip() or setup.strip() or recipe_clean
        )
        _touch(job)
    _persist_jobs()
    return True, f"Saved submission details for job {job_id}."
def _ensure_jobs_loaded_for_display() -> None:
    """Restore persisted job state for read-only views without starting the worker."""
    # Deliberately not _ensure_worker(): display callers must not spawn the Hub Jobs thread.
    _load_persisted_jobs_once()
def pending_job_dropdown_choices() -> list[tuple[str, str]]:
    """Build Gradio ``(label, value)`` choices for pending jobs, oldest first.

    Returns a single placeholder entry with an empty value when the persisted
    state cannot be loaded or nothing is pending.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return [("(Queue unavailable)", "")]
    pending = sorted(list_pending_moderation_jobs(), key=lambda job: job.created_at)
    if not pending:
        return [("(No pending jobs)", "")]
    return [(f"{job.id}: {job.model_id} ({job.family_id})", job.id) for job in pending]
def moderation_action_job_choices() -> list[tuple[str, str]]:
    """Build ``(label, value)`` choices for the retry/remove dropdown.

    Up to 60 jobs, most recently updated first; labels show the job ID, the last
    path segment of the model ID (at most 32 chars), and the status.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return [("(Queue unavailable)", "")]
    with _jobs_lock:
        snapshot = list(_jobs.values())
    newest = sorted(snapshot, key=lambda job: job.updated_at or job.created_at, reverse=True)[:60]
    if not newest:
        return [("(No jobs)", "")]
    choices: list[tuple[str, str]] = []
    for job in newest:
        model_tail = job.model_id.split("/")[-1][:32]
        choices.append((f"{job.id}: {model_tail} ({job.status.value})", job.id))
    return choices
# Single-pass replacement table: translate() never re-scans inserted text, so "&"
# needs no special ordering relative to the other characters.
_HTML_ESCAPE_TABLE = str.maketrans(
    {"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;"}
)


def _escape_html(s: str) -> str:
    """Escape ``& < > "`` for HTML text and double-quoted attributes (``'`` is left as-is)."""
    return s.translate(_HTML_ESCAPE_TABLE)
def _hub_job_link_html(hf_job_id: str | None) -> str:
    """Return an anchor to the Hub Job's log page, or "" if there is no ID or no URL."""
    job_ref = (hf_job_id or "").strip()
    if not job_ref:
        return ""
    try:
        import remote_jobs as rj

        url = rj.hub_job_page_url(job_ref)
    except Exception:
        # Best effort: a missing module or URL helper failure just hides the link.
        return ""
    href = _escape_html(url)
    return (
        f"<a href=\"{href}\" target=\"_blank\" rel=\"noopener noreferrer\">"
        "Open Hub Job logs</a>"
    )
def _bucket_artifacts_link_html() -> str:
    """Return an anchor to the bucket's remote_artifacts browser, or "" if unavailable."""
    try:
        import remote_jobs as rj

        browse_url = rj.bucket_artifact_browser_url()
    except Exception:
        return ""
    href = _escape_html(browse_url)
    return (
        f"<a href=\"{href}\" target=\"_blank\" rel=\"noopener noreferrer\">"
        "remote_artifacts folder</a>"
    )
def moderation_list_html() -> str:
    """Render jobs awaiting approval as an HTML table, oldest submission first.

    Notes are previewed up to 200 characters. Custom scripts are shown in a
    collapsible block capped at 12000 characters. Returns a short message when
    nothing is pending, or a warning if persisted state cannot be loaded.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception as e:
        return _worker_unavailable_html(e)
    pending = sorted(list_pending_moderation_jobs(), key=lambda job: job.created_at)
    if not pending:
        return "<p><em>No jobs awaiting approval.</em></p>"

    script_limit = 12000

    def _script_cell(raw: str) -> str:
        # Collapsible, size-capped preview of the submitted evaluate script.
        if not raw:
            return "—"
        shown = raw[:script_limit]
        if len(raw) > script_limit:
            shown += "\n… (truncated)"
        return (
            "<details><summary>View script</summary>"
            "<pre style='max-height:240px;overflow:auto;font-size:0.75em;"
            f"white-space:pre-wrap;margin:0.25rem 0'>{_escape_html(shown)}</pre>"
            "</details>"
        )

    row_html: list[str] = []
    for job in pending:
        note = (job.submission_notes or "").strip()
        if note:
            note_cell = _escape_html(note[:200] + ("…" if len(note) > 200 else ""))
        else:
            note_cell = "N/A"
        gated = " <span style='color:#b45309'>[gated]</span>" if job.is_gated else ""
        req_count = len(parse_requirements_lines(job.extra_requirements or ""))
        extras = f"{req_count} extra req(s)" if req_count else "—"
        script_cell = _script_cell((job.custom_script or "").strip())
        row_html.append(
            f"<tr><td><code>{_escape_html(job.id)}</code></td>"
            f"<td><code>{_escape_html(job.model_id)}</code>{gated}</td>"
            f"<td><code>{_escape_html(job.family_id)}</code></td>"
            f"<td style='max-width:140px;font-size:0.85em'>{_escape_html(extras)}</td>"
            f"<td style='max-width:320px;font-size:0.8em;vertical-align:top'>{script_cell}</td>"
            f"<td style='max-width:280px;font-size:0.9em'>{note_cell}</td>"
            f"<td>{_escape_html(job.created_at)}</td></tr>"
        )
    return (
        "<table style='width:100%;border-collapse:collapse;font-size:0.95em'>"
        "<thead><tr><th>Job ID</th><th>Model</th><th>Family</th><th>Extras</th>"
        "<th>Script</th><th>Notes</th><th>Submitted (UTC)</th></tr></thead>"
        f"<tbody>{''.join(row_html)}</tbody></table>"
    )
def next_up_html(limit: int = 5) -> str:
    """Render the next approved jobs for the Submit tab (oldest first, at least one shown).

    Starts the worker as a side effect. If that fails, returns a warning paragraph.
    """
    try:
        _ensure_worker()
    except Exception as e:
        return _worker_unavailable_html(e)
    with _jobs_lock:
        waiting = [job for job in _jobs.values() if job.status == JobStatus.queued]
    upcoming = sorted(waiting, key=lambda job: job.created_at)[: max(1, int(limit))]
    if not upcoming:
        return (
            "<div class='next-up-panel' style='font-size:0.9em;opacity:0.85'>"
            "<p><em>No models are waiting in the evaluation queue.</em></p></div>"
        )
    list_items = "".join(
        f"<li><code>{_escape_html(job.model_id)}</code> "
        f"<span style='opacity:0.75'>(job {_escape_html(job.id)})</span></li>"
        for job in upcoming
    )
    return (
        "<div class='next-up-panel' style='font-size:0.9em'>"
        f"<p><strong>Next models to evaluate</strong> ({len(upcoming)} shown):</p>"
        f"<ol style='margin:0.25rem 0 0 1.1rem'>{list_items}</ol></div>"
    )
# JobStatus value -> CSS class for the moderator job row background; every
# executing status shares the "active" style.
_JOB_ROW_STATUS_CLASS: dict[str, str] = {
    "pending_moderation": "ffasr-job-status-pending",
    "queued": "ffasr-job-status-queued",
    **dict.fromkeys(
        ("running", "dispatching", "remote_running", "collecting"),
        "ffasr-job-status-active",
    ),
    "done": "ffasr-job-status-done",
    "failed": "ffasr-job-status-failed",
}


def job_row_elem_classes(status: str) -> str:
    """Return the space-separated CSS classes for a moderator job row.

    Always starts with ``ffasr-job-row``. The second class reflects ``status``
    (matched case-insensitively after stripping whitespace), or is
    ``ffasr-job-status-unknown`` for unrecognized or empty input.
    """
    normalized = (status or "").strip().lower()
    status_class = _JOB_ROW_STATUS_CLASS.get(normalized, "ffasr-job-status-unknown")
    return " ".join(("ffasr-job-row", status_class))
def _job_uses_custom_stack(job: Job) -> bool:
    """True when the job has a non-blank custom script, setup script, or recipe ID."""
    # getattr inside the generator keeps the original left-to-right short-circuit.
    return any(
        (getattr(job, field_name) or "").strip()
        for field_name in ("custom_script", "setup_script", "recipe_id")
    )
def pending_jobs_for_render(limit: int = 60) -> list[dict[str, Any]]:
    """Describe pending jobs as dicts for Gradio ``@gr.render`` rows (Approve / Reject / Check).

    Oldest first, at most ``max(1, limit)`` entries. Returns [] when persisted
    state cannot be loaded.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return []
    ordered = sorted(list_pending_moderation_jobs(), key=lambda job: job.created_at)
    rendered: list[dict[str, Any]] = []
    for job in ordered[: max(1, int(limit))]:
        note = (job.submission_notes or "").strip()
        if note:
            preview = note[:120] + ("…" if len(note) > 120 else "")
        else:
            preview = ""
        rendered.append(
            {
                "id": job.id,
                "model_id": job.model_id,
                "family_id": job.family_id,
                "created_at": (job.created_at or "")[:19],
                "contact_email": (job.contact_email or "").strip(),
                "is_gated": job.is_gated,
                "req_count": len(parse_requirements_lines(job.extra_requirements or "")),
                "notes_preview": preview,
                "has_custom_script": _job_uses_custom_stack(job),
                "status": JobStatus.pending_moderation.value,
            }
        )
    return rendered
def recent_jobs_for_render(limit: int = 30) -> list[dict[str, Any]]:
    """Describe recent jobs as dicts for Gradio ``@gr.render`` rows (Retry / Remove).

    Most recently updated first, at most ``max(1, limit)`` entries. Returns []
    when persisted state cannot be loaded.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        return []
    with _jobs_lock:
        snapshot = list(_jobs.values())
    newest = sorted(snapshot, key=lambda job: job.updated_at or job.created_at, reverse=True)
    rendered: list[dict[str, Any]] = []
    for job in newest[: max(1, int(limit))]:
        error_text = (job.error or "").strip()
        try:
            hub_html = _hub_job_link_html(job.hf_remote_job_id)
        except Exception:
            hub_html = ""
        rendered.append(
            {
                "id": job.id,
                "model_id": job.model_id,
                "status": job.status.value,
                "contact_email": (job.contact_email or "").strip(),
                "error": error_text[:200] + ("…" if len(error_text) > 200 else ""),
                "hub_link_html": hub_html,
                "updated_at": (job.updated_at or job.created_at)[:19],
                "can_retry": _job_can_retry(job.status),
                # NOTE(review): mirrors remove_job_entry, which only blocks `running`.
                "can_remove": job.status != JobStatus.running,
                "has_custom_script": _job_uses_custom_stack(job),
            }
        )
    return rendered
def recent_jobs_html(limit: int = 25, *, with_heading: bool = True) -> str:
    """Render the most recently updated jobs as an HTML table (for spotting stuck/failed runs).

    Args:
        limit: maximum number of rows, newest ``updated_at`` (else ``created_at``) first.
        with_heading: prepend a short "Recent jobs" caption when True.

    Returns the table, an empty-state message when no jobs are loaded, or a
    warning paragraph if persisted state cannot be loaded.
    """
    try:
        _ensure_jobs_loaded_for_display()
    except Exception as e:
        return _worker_unavailable_html(e)
    with _jobs_lock:
        snapshot = list(_jobs.values())
    newest = sorted(snapshot, key=lambda job: job.updated_at or job.created_at, reverse=True)[:limit]
    if not newest:
        return (
            "<p><em>No job history loaded yet. Open <strong>Submit</strong> (queue status refreshes automatically) "
            "or click <strong>Refresh</strong> here to load persisted jobs from storage.</em></p>"
        )

    row_html: list[str] = []
    for job in newest:
        error_text = (job.error or "").strip()
        if error_text:
            err_cell = _escape_html(error_text[:120] + ("…" if len(error_text) > 120 else ""))
        else:
            err_cell = "N/A"
        hub_link = _hub_job_link_html(job.hf_remote_job_id)
        links = hub_link or "N/A"
        if job.status == JobStatus.done and (job.remote_artifact_path or "").strip():
            artifact = (job.remote_artifact_path or "").strip()
            try:
                from storage import HF_BUCKET_ID

                art_url = f"https://huggingface.co/datasets/{HF_BUCKET_ID}/resolve/main/{artifact}"
                anchor = (
                    f"<a href=\"{_escape_html(art_url)}\" target=\"_blank\" "
                    f"rel=\"noopener noreferrer\">artifact</a>"
                )
                links = f"{hub_link} · {anchor}" if hub_link else anchor
            except Exception:
                # Best effort: keep the Hub link (or "N/A") if the bucket ID is unavailable.
                pass
        stamp = _escape_html((job.updated_at or job.created_at)[:19])
        row_html.append(
            f"<tr><td><code>{_escape_html(job.id)}</code></td>"
            f"<td><code>{_escape_html(job.model_id)}</code></td>"
            f"<td>{_escape_html(job.status.value)}</td>"
            f"<td style='max-width:240px;font-size:0.85em'>{err_cell}</td>"
            f"<td style='font-size:0.85em'>{links}</td>"
            f"<td style='font-size:0.85em'>{stamp}</td></tr>"
        )
    table = (
        "<table style='width:100%;border-collapse:collapse;font-size:0.9em'>"
        "<thead><tr><th>Job</th><th>Model</th><th>Status</th><th>Error / detail</th>"
        "<th>Hub / bucket</th><th>Updated (UTC)</th></tr></thead>"
        f"<tbody>{''.join(row_html)}</tbody></table>"
    )
    if not with_heading:
        return table
    return (
        "<p style='font-size:0.9em;opacity:0.9'><strong>Recent jobs</strong> "
        "(newest first; persisted across restarts)</p>" + table
    )
def progress_html() -> str:
    """Live progress HTML for the current evaluation(s) (an idle notice when none run).

    With several active jobs, shows a list of up to 10, most recently updated
    first. Otherwise shows the job tracked by ``_running_job_id`` (falling back to
    the single active job) with a progress bar. The bar is indeterminate until
    ``progress_total`` is known. In remote mode, Hub Job and bucket links are added.
    """
    try:
        _ensure_worker()
    except Exception as e:
        return _worker_unavailable_html(e)
    active = _list_active_eval_jobs()
    if len(active) > 1:
        max_n = remote_max_concurrent_jobs()
        lines = [
            "<div style='padding:0.5rem 0;font-size:0.9em'>"
            f"<p><strong>{len(active)} evaluations in progress</strong> "
            f"(up to {max_n} parallel Hub Jobs):</p><ul style='margin:0.25rem 0 0.5rem 1.1rem'>"
        ]
        for j in sorted(active, key=lambda x: x.updated_at or x.created_at, reverse=True)[:10]:
            hub = _hub_job_link_html(j.hf_remote_job_id)
            hub_bit = f" · {hub}" if hub else ""
            lines.append(
                f"<li><code>{_escape_html(j.model_id)}</code> "
                f"(job <code>{_escape_html(j.id)}</code>, {j.status.value}){hub_bit}</li>"
            )
        lines.append("</ul></div>")
        return "".join(lines)
    with _jobs_lock:
        jid = _running_job_id
        j = _jobs.get(jid) if jid else None
        if j is None and active:
            j = active[0]
        if j is None:
            return (
                "<div style='padding:0.5rem 0;font-size:0.9em;opacity:0.85'>"
                "<em>No evaluation is currently running.</em></div>"
            )
        # Copy the fields needed for rendering while the lock is held.
        done = int(j.progress_done or 0)
        total = int(j.progress_total or 0)
        model = j.model_id
        fam = j.family_id
        cond = j.progress_condition or ""
        job_label = j.id
        status = j.status
        hf_rid = (j.hf_remote_job_id or "").strip() or None
        art_path = (j.remote_artifact_path or "").strip() or None
    remote_extra = ""
    try:
        import remote_jobs as rj

        remote_mode = rj.remote_jobs_enabled()
    except Exception:
        remote_mode = False
    if remote_mode and status in (
        JobStatus.running,
        JobStatus.dispatching,
        JobStatus.remote_running,
        JobStatus.collecting,
    ):
        hub = _hub_job_link_html(hf_rid)
        # Built once and reused; "" when the bucket browser URL is unavailable.
        bucket = _bucket_artifacts_link_html()
        remote_extra = (
            "<p style='margin:0.35rem 0 0 0;font-size:0.9em'>"
            f"<strong>Remote eval:</strong> status <code>{_escape_html(status.value)}</code>"
        )
        if hub:
            remote_extra += f" · {hub}"
        elif status == JobStatus.dispatching:
            remote_extra += " · submitting Hub Job…"
        if art_path:
            remote_extra += (
                f" · expected artifact <code>{_escape_html(art_path)}</code> "
                "(uploaded only on success"
            )
            # Only offer "browse …" when a link exists (avoids a dangling "browse )").
            remote_extra += f"; browse {bucket})" if bucket else ")"
        elif bucket:
            remote_extra += f" · bucket {bucket}"
        remote_extra += "</p>"
    if total <= 0:
        inner = (
            "<div style='width:40%;height:100%;background:#38BFA1;"
            "background-image:linear-gradient(90deg,#38BFA1,#3DFFA3,#38BFA1);"
            "background-size:200% 100%;animation:ffasr-indet 1.6s linear infinite;'></div>"
        )
        pct_label = "preparing…"
    else:
        pct = max(0.0, min(100.0, 100.0 * done / total))
        inner = (
            f"<div style='height:100%;width:{pct:.1f}%;background:#38BFA1;"
            "transition:width 0.4s ease-out'></div>"
        )
        cond_txt = f"; current condition: <code>{_escape_html(cond)}</code>" if cond else ""
        pct_label = f"{done}/{total} samples ({pct:.1f}%){cond_txt}"
    return (
        "<style>@keyframes ffasr-indet{0%{background-position:200% 0}100%{background-position:-200% 0}}</style>"
        "<div style='padding:0.25rem 0'>"
        f"<p style='margin:0 0 0.4rem 0'><strong>Running:</strong> "
        f"<code>{_escape_html(model)}</code> "
        f"(family <code>{_escape_html(fam)}</code>, job <code>{_escape_html(job_label)}</code>): {pct_label}</p>"
        f"{remote_extra}"
        "<div style='width:100%;background:#e6e6e6;border:1px solid #d0d0d0;border-radius:6px;"
        "overflow:hidden;height:14px'>"
        f"{inner}"
        "</div></div>"
    )
def status_html(*, start_worker: bool = True) -> str:
    """Short HTML snippet for the Submit tab (queue + current runner + recent activity).

    Args:
        start_worker: when True, start the background worker first; if that fails,
            an inline warning is returned instead of the status.

    User-submitted values (model, family, and job IDs) are HTML-escaped before
    being interpolated.
    """
    if start_worker:
        try:
            _ensure_worker()
        except Exception as e:
            return (
                "<div class='queue-status'><p style='color:orange'>"
                f"<strong>Queue unavailable:</strong> {_escape_html(str(e))}</p></div>"
            )
    active = _list_active_eval_jobs()
    with _jobs_lock:
        waiting = _work_queue.qsize()
        pending_mod = sum(1 for j in _jobs.values() if j.status == JobStatus.pending_moderation)
        # Do not call _count_remote_in_flight() while holding _jobs_lock; that helper
        # also acquires the lock and would deadlock Gradio callbacks/timers.
        in_flight = sum(1 for j in _jobs.values() if j.status in _remote_in_flight_statuses())
    max_n = remote_max_concurrent_jobs()
    if len(active) == 0:
        run_line = "<p><strong>Running:</strong> none</p>"
    elif len(active) == 1:
        running = active[0]
        done = int(running.progress_done or 0)
        total = int(running.progress_total or 0)
        progress_txt = (
            f"; progress <strong>{done}/{total}</strong> ({(100.0 * done / total):.1f}%)"
            if total > 0
            else "; preparing…"
        )
        # model_id / family_id come from user submissions: escape before embedding.
        run_line = (
            f"<p><strong>Running:</strong> <code>{_escape_html(running.model_id)}</code> "
            f"(family: <code>{_escape_html(running.family_id)}</code>, "
            f"job <code>{_escape_html(running.id)}</code>)"
            f"{progress_txt}</p>"
        )
    else:
        models = ", ".join(f"<code>{_escape_html(j.model_id)}</code>" for j in active[:4])
        if len(active) > 4:
            models += f" … (+{len(active) - 4} more)"
        run_line = (
            f"<p><strong>Running:</strong> {len(active)} evaluations "
            f"({in_flight} on Hub Jobs, max {max_n} parallel): {models}</p>"
        )
    wait_line = f"<p><strong>Waiting to run (approved queue):</strong> {waiting}</p>"
    mod_line = ""
    if moderation_active():
        mod_line = (
            f"<p><strong>Awaiting moderator approval:</strong> {pending_mod}</p>"
            "<p style='font-size:0.9em;opacity:0.85'>New submissions stay pending until approved on the <strong>Moderate</strong> tab.</p>"
        )
    elif MODERATION_ENABLED and not MODERATOR_SECRET:
        mod_line = (
            "<p style='color:orange'><strong>Moderation misconfigured:</strong> "
            "set secret <code>FFASR_MODERATOR_SECRET</code> in Space settings.</p>"
        )
    hint = (
        "<p style='font-size:0.9em;opacity:0.85'>Evaluations run as "
        "<strong>Hugging Face Hub Jobs</strong> "
        "(<code>FFASR_REMOTE_JOBS=1</code>) with results written to the bucket. "
        "Set Space secrets <code>token_for_ffasr_jobs</code> (Jobs billing) and "
        "<code>HF_TOKEN</code> (bucket). Up to "
        f"<strong>{max_n}</strong> models evaluate in parallel "
        "(<code>FFASR_REMOTE_MAX_CONCURRENT_JOBS</code>). The Space does not run ASR locally.</p>"
    )
    recent = recent_jobs_html(20)
    return f"<div class='queue-status'>{run_line}{wait_line}{mod_line}{hint}{recent}</div>"
def peek_job(job_id: str) -> Job | None:
    """Return the tracked job with ``job_id``, or None; loading persisted state is best-effort."""
    try:
        _ensure_jobs_loaded_for_display()
    except Exception:
        # Best effort: fall back to whatever is already in memory.
        pass
    with _jobs_lock:
        found = _jobs.get(job_id)
    return found