#!/usr/bin/env python3 """Fan out one HF Job per model in the live Convex roster. Runs as a weekly scheduled HF Job (Sunday 02:00 UTC). Replaces the GitHub Actions matrix in .github/workflows/benchmark-evals.yml. Flow: 1. Resolve roster: `run_eval.py --from-convex --max-cost --list-models` writes the cost-filtered roster to /tmp/roster.json. Each entry has `name` (RAW Convex `models.name`, alias-stripped at the discovery layer) and `model_id` (canonical provider ID). 2. Spawn one HF Job per roster entry. The dispatcher computes `slug(name__model_id)` for each entry and passes it as the child's `--models `. The composite slug is the canonical eval identifier — same string is the HF Job `model` label and the HF dataset upload folder. Two providers exposing the same `model_id` produce different slugs because their `name` fields differ, so they get distinct children, distinct upload folders, and don't collide on storage. 3. Fire-and-forget. No wait loop. Convex `benchmarkAlerts` cron handles pass/fail aggregation by polling HF Jobs API the next morning. Identifier conventions: * `model_id` canonical provider ID (e.g. "openai/gpt-5-nano"). Used in the actual provider API call inside the child and stored in JSONL row metadata for DB attribution. * `name` RAW `models.name` from Convex (NOT alias). Combined with `model_id` to form the routing identifier. * `slug(name__model_id)` The canonical eval-pipeline identifier. One per roster entry. * Alias Convex `models.alias` is a UI-only field for blind testing. The eval pipeline NEVER reads it — the `models:listForEvals` query returns raw `name`. * Label keys `model` = slug(name__model_id) (the routing slug). `model_display` = slug(name) (Slack-friendly). Required environment: CONVEX_URL Roster discovery OPENROUTER_API_KEY Default API key for OpenRouter models FUNCTIONARY_API_KEY Per-model override (Functionary endpoints) MEETKAI_GATEWAY_API_KEY Per-model override (MeetKai gateway) HF_TOKEN Result upload to meetkai/modelchorus-eval-results EVAL_IMAGE HF image URI (default: hf.co/spaces/meetkai/modelchorus-evals) EVAL_NAMESPACE HF namespace for child jobs (default: meetkai) EVAL_FLAVOR HF hardware flavor (default: cpu-upgrade) EVAL_TIMEOUT Per-model timeout (default: 6h) EVAL_MAX_COST Cost filter in USD (default: 20.00) Each child job inherits the same image + secrets and is labeled `purpose=modelchorus-eval` so the alerts cron can find the run later. """ from __future__ import annotations import json import os import re import subprocess import sys from datetime import datetime, timedelta, timezone from huggingface_hub import run_job ROSTER_PATH = "/tmp/roster.json" # Mirror of `_redactErrorBody` in convex/benchmarkAlerts.ts. Applied to # exception text before printing into HF Job logs (visible to anyone with # namespace read scope) so a misbehaving proxy that echoes the request # Authorization header in an error body can't leak the runner token. _REDACT_PATTERNS = [ (re.compile(r"Bearer\s+[A-Za-z0-9_\-.]+"), "Bearer [REDACTED]"), (re.compile(r"hf_[A-Za-z0-9]{20,}"), "hf_[REDACTED]"), ] _PRINT_BODY_CAP = 500 def _redact_for_print(text: str) -> str: for pat, repl in _REDACT_PATTERNS: text = pat.sub(repl, text) if len(text) > _PRINT_BODY_CAP: text = text[:_PRINT_BODY_CAP] + "…[truncated]" return text # HF Jobs has TWO regexes for label-shaped fields and we need to conform # to the strictest. Documented label values match `^[a-zA-Z0-9._-]*$` # (dots allowed), but the auto-derived "tags" the backend builds from # labels match `^[a-zA-Z0-9_\-=]+$` (no dots). A model_id like # "minimax/minimax-m2.5" passes the label check but trips the tag check # at POST /api/jobs validation, silently dropping that child. Excluding # dots from the slug keeps both paths happy. Must stay in sync with # `_hf_path_segment` in run_eval.py. _LABEL_INVALID = re.compile(r"[^a-zA-Z0-9_-]+") def _slugify_label(value: str) -> str: """Replace any chars HF labels reject (or any other strict path/label boundary) with `_`. Empty input returns `unknown` to keep the label populated for downstream filters.""" s = _LABEL_INVALID.sub("_", (value or "").strip()).strip("_-") return s or "unknown" def _model_slug(name: str, model_id: str, provider_name: str = "") -> str: """Canonical eval-pipeline identifier: slug(`name__model_id__provider_name`) when provider is known, slug(`name__model_id`) otherwise. MUST match `_model_slug` in run_eval.py so the dispatcher's `--models` arg is matched by the child's filter. Provider is included so two Convex rows that share BOTH name and model_id (one OpenRouter, one direct, both displayed as the same UI label) produce distinct slugs and distinct children rather than tripping the duplicate-slug guard. """ parts = [name, model_id] provider = (provider_name or "").strip() if provider: parts.append(provider) return _slugify_label("__".join(parts)) def _env(name: str, default: str | None = None, *, required: bool = False) -> str: val = os.environ.get(name, default) if required and not val: sys.exit(f"[dispatch] Missing required env var: {name}") return val or "" def _most_recent_sunday_utc(now: datetime) -> str: """Return the most-recent Sunday at-or-before `now` (UTC) as YYYY-MM-DD. Snapping to Sunday — rather than using today's literal date — keeps the `run_date` label aligned with `lastSundayUtc` in convex/benchmarkAlerts.ts even when HF's scheduler fires the dispatcher late (e.g., Sun 02:00 UTC job slipping into Mon 00:0x UTC). Without snapping, late dispatch would label children with Monday's date, the alerts cron would look up Sunday's, and a successful run would be reported as "no jobs found". CONTRACT: this function MUST agree with `lastSundayUtc` in convex/benchmarkAlerts.ts for all moments where dispatch and the alert cron pair up. The two are intentionally different on Sundays themselves (this returns today, the JS function returns last Sunday) but converge by Mon 02:00 UTC — which is when the alert cron looks up the dispatcher's labels. If you change either, mentally trace: * Sun 02:00 UTC dispatch → run_date = today's Sunday * Sun 23:59 UTC dispatch → run_date = same Sunday (still in flight) * Mon 00:01 UTC dispatch → run_date = previous Sunday (yesterday) * Mon 02:00 UTC alert → looks up previous Sunday All four MUST agree on the same calendar Sunday. The JS test file `tests/convex/benchmarkAlerts.test.ts` pins these for the JS side. """ # weekday(): Mon=0..Sun=6. days_back = 0 on Sun, 1 on Mon, ..., 6 on Sat. days_back = (now.weekday() + 1) % 7 sunday = now - timedelta(days=days_back) return sunday.strftime("%Y-%m-%d") def resolve_roster(max_cost: str) -> list[dict]: """Run run_eval.py --list-models to get the cost-filtered active roster. Inherits the dispatcher's env so --from-convex sees CONVEX_URL. """ cmd = [ "python", "run_eval.py", "--from-convex", "--max-cost", max_cost, "--list-models", ROSTER_PATH, ] print(f"[dispatch] Resolving roster: {' '.join(cmd)}", flush=True) subprocess.run(cmd, check=True, cwd=os.path.dirname(os.path.abspath(__file__))) with open(ROSTER_PATH) as f: roster = json.load(f) if not isinstance(roster, list): sys.exit(f"[dispatch] Roster JSON must be a list, got {type(roster).__name__}") return roster def spawn_one( name: str, model_id: str, provider_name: str, image: str, namespace: str, flavor: str, timeout: str, run_date: str, roster_size: int, secrets: dict[str, str], ) -> str: """Fire one child Job. Returns the job id. Children deliberately run WITHOUT --max-cost. The dispatcher already applied the cost filter at roster resolution time; reapplying it inside the child re-evaluates against (potentially newer) pricing — and if the pricing for the model has crossed the threshold between dispatch and child execution, run_eval.py's `--max-cost empty-roster early-exit` branch would silently exit 0 with no results uploaded, which the alerts cron would then report as a healthy job. Worst case without the filter: we evaluate a model whose cost has spiked, paying a few cents more than the dispatcher expected. That's strictly preferable to silent data loss. `name` is the RAW Convex `models.name` (alias-stripped at discovery). Combined with `model_id` to form the routing slug — same string is the `--models` arg, the HF `model` label, and the upload folder. """ routing_slug = _model_slug(name, model_id, provider_name) job = run_job( image=image, command=[ "python", "run_eval.py", "--from-convex", "--models", routing_slug, ], namespace=namespace, flavor=flavor, timeout=timeout, secrets=secrets, env={ # Non-secret config — visible in the HF Jobs UI for debugging. # MODEL_ID, MODEL_NAME, PROVIDER_NAME are the raw values (not # slugged) for human inspection in the HF dashboard. # MODEL_SLUG is the routing identifier the child filter # matches against. "MODEL_ID": model_id, "MODEL_NAME": name, "PROVIDER_NAME": provider_name, "MODEL_SLUG": routing_slug, "DISPATCH_RUN_DATE": run_date, }, # Labels are how `benchmarkAlerts` Convex cron locates this run later. # * `model` slug(name__model_id) — canonical routing identifier # * `model_display` slug(name) — human-readable, Slack-friendly # * `roster_size` detects partial spawn failures # (jobs.length < roster_size ⇒ some never spawned) labels={ "purpose": "modelchorus-eval", "model": routing_slug, "model_display": _slugify_label(name), "run_date": run_date, "roster_size": str(roster_size), }, ) return job.id def main() -> int: image = _env("EVAL_IMAGE", "hf.co/spaces/meetkai/modelchorus-evals") namespace = _env("EVAL_NAMESPACE", "meetkai") flavor = _env("EVAL_FLAVOR", "cpu-upgrade") timeout = _env("EVAL_TIMEOUT", "6h") max_cost = _env("EVAL_MAX_COST", "20.00") # Pass through to children. Required for run_eval.py to function. # EVAL_DISCOVERY_TOKEN is the shared secret for `models:listForEvals` # (the alias-stripped roster query); each child calls --from-convex # which re-reads this env var on startup. secrets = { k: _env(k, required=True) for k in ( "CONVEX_URL", "OPENROUTER_API_KEY", "HF_TOKEN", "EVAL_DISCOVERY_TOKEN", ) } # Optional per-model overrides — only forward if set so children don't # see empty-string keys for models that legitimately don't need them. for k in ("FUNCTIONARY_API_KEY", "MEETKAI_GATEWAY_API_KEY", "JUDGE_API_KEY", "JUDGE_BASE_URL", "JUDGE_MODEL"): v = os.environ.get(k, "").strip() if v: secrets[k] = v # Snap run_date to the most-recent Sunday in UTC so the label aligns # with the alerts cron's `lastSundayUtc` even when HF fires the # dispatcher late (e.g., Sun 02:00 slipping into Mon 00:0x). See # _most_recent_sunday_utc for full reasoning. run_date = _most_recent_sunday_utc(datetime.now(timezone.utc)) roster = resolve_roster(max_cost) if not roster: # benchmarkAlerts cron will see total=0 and post the empty-roster alert. print("[dispatch] Roster is empty; nothing to spawn.", flush=True) return 0 # Pre-filter so roster_size reflects actual fan-out attempts. Children # carry this number as a label; if alerts sees fewer jobs than this, # it's a real partial-spawn failure (not a skipped entry). Both # `name` and `model_id` must be present and non-whitespace — they're # combined into the routing slug, and either being empty would # produce a non-unique or malformed identifier. spawnable: list[dict] = [] skipped: list[str] = [] for e in roster: name = (e.get("name") or "").strip() mid = (e.get("model_id") or "").strip() if name and mid: spawnable.append(e) else: # Log a hint of which entry was bad — name OR model_id might be # set even if both aren't, so include whichever is non-empty. hint = name or mid or "" skipped.append(hint) if skipped: print( f"[dispatch] Skipping {len(skipped)} roster entries missing name/model_id: {skipped}", flush=True, ) if not spawnable: print("[dispatch] No spawnable roster entries.", flush=True) return 0 # Defense-in-depth: refuse to spawn if two entries would produce the # same routing slug. The slug includes name + model_id + provider, so # this only trips when ALL THREE are identical — i.e. genuine # duplicate roster rows. Loud failure beats silently spawning two # children that overwrite each other. seen_slugs: dict[str, dict] = {} for entry in spawnable: slug = _model_slug(entry["name"], entry["model_id"], entry.get("provider_name", "")) if slug in seen_slugs: other = seen_slugs[slug] print( f"[dispatch] FATAL: roster has two entries collapsing to slug={slug!r}: " f"{entry!r} and {other!r}. Fix Convex data and re-run.", flush=True, ) return 2 seen_slugs[slug] = entry roster_size = len(spawnable) print(f"[dispatch] Spawning {roster_size} child jobs for run_date={run_date}", flush=True) spawned: list[tuple[str, str]] = [] failures: list[tuple[str, str]] = [] for entry in spawnable: name: str = entry["name"] model_id: str = entry["model_id"] provider_name: str = entry.get("provider_name", "") or "" try: job_id = spawn_one( name=name, model_id=model_id, provider_name=provider_name, image=image, namespace=namespace, flavor=flavor, timeout=timeout, run_date=run_date, roster_size=roster_size, secrets=secrets, ) spawned.append((model_id, job_id)) print(f"[dispatch] spawned {name} ({model_id} via {provider_name or '?'}) → {job_id}", flush=True) # Spawn failure shouldn't poison the rest. Each child carries the # roster_size label, so the alerts cron can detect partial spawn # failure (children_seen < roster_size) even when this dispatcher # process exits cleanly. except Exception as exc: # noqa: BLE001 redacted = _redact_for_print(str(exc)) failures.append((model_id, redacted)) print( f"[dispatch] FAILED to spawn {name} ({model_id} via {provider_name or '?'}): {redacted}", flush=True, ) print(f"[dispatch] Done. spawned={len(spawned)} failed={len(failures)}", flush=True) if failures: # Non-zero exit so HF marks this dispatcher run as ERROR in its # dashboard — purely a UI signal, not the source of truth for # alerts. The Convex `benchmarkAlerts` cron derives partial-spawn # status from `roster_size` labels on the children that DID spawn: # if jobs.length < roster_size, that's a real partial-fanout # failure regardless of this exit code. return 1 return 0 if __name__ == "__main__": sys.exit(main())