#!/usr/bin/env python3
"""Fan out one HF Job per model in the live Convex roster.

Runs as a weekly scheduled HF Job (Sunday 02:00 UTC). Replaces the GitHub
Actions matrix in .github/workflows/benchmark-evals.yml.

Flow:
  1. Resolve roster: `run_eval.py --from-convex --max-cost <EVAL_MAX_COST>
     --list-models /tmp/roster.json` writes the cost-filtered roster to
     /tmp/roster.json. Each entry has `name` (RAW Convex `models.name`,
     alias-stripped at the discovery layer), `model_id` (canonical provider
     ID), and optionally `provider_name`.
  2. Spawn one HF Job per roster entry. The dispatcher computes the routing
     slug `slug(name__model_id[__provider_name])` for each entry and passes
     it as the child's `--models <slug>`. The slug is the canonical eval
     identifier: the same string is the HF Job `model` label and the HF
     dataset upload folder. Two providers exposing the same `model_id`
     produce different slugs because their `name` (or `provider_name`)
     fields differ, so they get distinct children and distinct upload
     folders, and don't collide on storage.
  3. Fire-and-forget. No wait loop. The Convex `benchmarkAlerts` cron
     handles pass/fail aggregation by polling the HF Jobs API the next
     morning.

Identifier conventions:
  * `model_id`     canonical provider ID (e.g. "openai/gpt-5-nano"). Used
                   in the actual provider API call inside the child and
                   stored in JSONL row metadata for DB attribution.
  * `name`         RAW `models.name` from Convex (NOT alias). Combined
                   with `model_id` (and `provider_name`, when known) to
                   form the routing identifier.
  * `slug(name__model_id[__provider_name])`
                   The canonical eval-pipeline identifier. One per
                   roster entry. See `_model_slug`.
  * Alias          Convex `models.alias` is a UI-only field for blind
                   testing. The eval pipeline NEVER reads it; the
                   `models:listForEvals` query returns raw `name`.
  * Label keys     `model` = the routing slug above.
                   `model_display` = slug(name) (Slack-friendly).

Required environment:
  CONVEX_URL                       Roster discovery
  EVAL_DISCOVERY_TOKEN             Shared secret for `models:listForEvals`
  OPENROUTER_API_KEY               Default API key for OpenRouter models
  HF_TOKEN                         Result upload to meetkai/modelchorus-eval-results

Optional environment (forwarded to children only when set):
  FUNCTIONARY_API_KEY              Per-model override (Functionary endpoints)
  MEETKAI_GATEWAY_API_KEY          Per-model override (MeetKai gateway)
  JUDGE_API_KEY / JUDGE_BASE_URL / JUDGE_MODEL
                                   Judge settings, passed through unchanged

Dispatcher settings (optional, with defaults):
  EVAL_IMAGE                       HF image URI (default: hf.co/spaces/meetkai/modelchorus-evals)
  EVAL_NAMESPACE                   HF namespace for child jobs (default: meetkai)
  EVAL_FLAVOR                      HF hardware flavor (default: cpu-upgrade)
  EVAL_TIMEOUT                     Per-model timeout (default: 6h)
  EVAL_MAX_COST                    Cost filter in USD (default: 20.00)

Each child job inherits the same image + secrets and is labeled
`purpose=modelchorus-eval` so the alerts cron can find the run later.
"""

from __future__ import annotations

import json
import os
import re
import subprocess
import sys
from datetime import datetime, timedelta, timezone

from huggingface_hub import run_job

ROSTER_PATH = "/tmp/roster.json"

# Mirror of `_redactErrorBody` in convex/benchmarkAlerts.ts. Applied to
# exception text before printing into HF Job logs (visible to anyone with
# namespace read scope) so a misbehaving proxy that echoes the request
# Authorization header in an error body can't leak the runner token.
_REDACT_PATTERNS = [
    (re.compile(r"Bearer\s+[A-Za-z0-9_\-.]+"), "Bearer [REDACTED]"),
    (re.compile(r"hf_[A-Za-z0-9]{20,}"), "hf_[REDACTED]"),
]
_PRINT_BODY_CAP = 500


def _redact_for_print(text: str) -> str:
    for pat, repl in _REDACT_PATTERNS:
        text = pat.sub(repl, text)
    if len(text) > _PRINT_BODY_CAP:
        text = text[:_PRINT_BODY_CAP] + "…[truncated]"
    return text
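

# Illustrative only (not called by the dispatcher): a self-contained sketch
# of what `_redact_for_print` does to a hypothetical error body. The
# patterns and the 500-char cap are copied from above; the token strings
# are made up.
def _example_redaction() -> tuple[str, int]:
    import re as _re

    patterns = [
        (_re.compile(r"Bearer\s+[A-Za-z0-9_\-.]+"), "Bearer [REDACTED]"),
        (_re.compile(r"hf_[A-Za-z0-9]{20,}"), "hf_[REDACTED]"),
    ]
    cap = 500

    def redact(text: str) -> str:
        for pat, repl in patterns:
            text = pat.sub(repl, text)
        if len(text) > cap:
            text = text[:cap] + "…[truncated]"
        return text

    # A misbehaving proxy echoing the Authorization header plus an HF token.
    echoed = "401 from proxy: Authorization: Bearer abc.DEF-123 token=hf_" + "x" * 24
    short = redact(echoed)
    # Oversized bodies are capped at 500 chars plus the truncation marker.
    long_len = len(redact("y" * 2000))
    return short, long_len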


# HF Jobs has TWO regexes for label-shaped fields and we need to conform
# to the stricter one. Documented label values match `^[a-zA-Z0-9._-]*$`
# (dots allowed), but the auto-derived "tags" the backend builds from
# labels match `^[a-zA-Z0-9_\-=]+$` (no dots). A model_id like
# "minimax/minimax-m2.5", once its `/` is replaced, still carries a dot:
# it passes the label check but trips the tag check at POST /api/jobs
# validation, silently dropping that child. Excluding dots from the slug
# keeps both paths happy. Must stay in sync with `_hf_path_segment` in
# run_eval.py.
_LABEL_INVALID = re.compile(r"[^a-zA-Z0-9_-]+")


def _slugify_label(value: str) -> str:
    """Replace every run of characters outside `[a-zA-Z0-9_-]` (anything an
    HF label, HF tag, or upload path segment might reject) with `_`, then
    trim leading/trailing `_`/`-`. Empty input returns `unknown` to keep the
    label populated for downstream filters."""
    s = _LABEL_INVALID.sub("_", (value or "").strip()).strip("_-")
    return s or "unknown"
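

# Illustrative only (not called by the dispatcher): why dots are excluded
# from slugs. The two HF patterns are copied from the comment above
# `_LABEL_INVALID`; HF's actual validation code is not reproduced here, so
# treat this as a sketch of the documented behaviour, not of the backend.
def _example_label_vs_tag() -> dict[str, bool]:
    import re as _re

    label_ok = _re.compile(r"^[a-zA-Z0-9._-]*$")  # documented label values
    tag_ok = _re.compile(r"^[a-zA-Z0-9_\-=]+$")  # auto-derived tags
    invalid = _re.compile(r"[^a-zA-Z0-9_-]+")  # same as _LABEL_INVALID

    dotted = "minimax_minimax-m2.5"  # "/" replaced, dot kept
    slugged = invalid.sub("_", "minimax/minimax-m2.5").strip("_-")
    return {
        "dotted_label_ok": bool(label_ok.match(dotted)),
        "dotted_tag_ok": bool(tag_ok.match(dotted)),
        "slug_label_ok": bool(label_ok.match(slugged)),
        "slug_tag_ok": bool(tag_ok.match(slugged)),
    }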


def _model_slug(name: str, model_id: str, provider_name: str = "") -> str:
    """Canonical eval-pipeline identifier:
        slug(`name__model_id__provider_name`) when provider is known,
        slug(`name__model_id`)                otherwise.

    MUST match `_model_slug` in run_eval.py so the dispatcher's `--models`
    arg is matched by the child's filter. Provider is included so two
    Convex rows that share BOTH name and model_id (one OpenRouter, one
    direct, both displayed as the same UI label) produce distinct slugs
    and distinct children rather than tripping the duplicate-slug guard.
    """
    parts = [name, model_id]
    provider = (provider_name or "").strip()
    if provider:
        parts.append(provider)
    return _slugify_label("__".join(parts))
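

# Illustrative only (not called by the dispatcher): how the routing slug
# separates two hypothetical Convex rows that share `name` and `model_id`
# but differ in provider. The slug rule is reimplemented inline so this
# sketch runs on its own; the row values are made up.
def _example_provider_disambiguation() -> list[str]:
    import re as _re

    invalid = _re.compile(r"[^a-zA-Z0-9_-]+")

    def slug(name: str, model_id: str, provider_name: str = "") -> str:
        parts = [name, model_id]
        if provider_name.strip():
            parts.append(provider_name.strip())
        s = invalid.sub("_", "__".join(parts).strip()).strip("_-")
        return s or "unknown"

    rows = [
        ("GPT-5 Nano", "openai/gpt-5-nano", "OpenRouter"),
        ("GPT-5 Nano", "openai/gpt-5-nano", "OpenAI"),
        ("GPT-5 Nano", "openai/gpt-5-nano", ""),  # provider unknown
    ]
    return [slug(*row) for row in rows]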


def _env(name: str, default: str | None = None, *, required: bool = False) -> str:
    val = os.environ.get(name, default)
    if required and not val:
        sys.exit(f"[dispatch] Missing required env var: {name}")
    return val or ""


def _most_recent_sunday_utc(now: datetime) -> str:
    """Return the most-recent Sunday at-or-before `now` (UTC) as YYYY-MM-DD.

    Snapping to Sunday, rather than using today's literal date, keeps the
    `run_date` label aligned with `lastSundayUtc` in convex/benchmarkAlerts.ts
    even when HF's scheduler fires the dispatcher late (e.g., a Sun 02:00 UTC
    job slipping into Mon 00:0x UTC). Without snapping, late dispatch would
    label children with Monday's date, the alerts cron would look up
    Sunday's, and a successful run would be reported as "no jobs found".

    CONTRACT: this function MUST agree with `lastSundayUtc` in
    convex/benchmarkAlerts.ts for all moments where dispatch and the
    alert cron pair up. The two intentionally differ on Sundays
    themselves (this returns today, the JS function returns last Sunday)
    but converge by Mon 02:00 UTC, which is when the alert cron looks
    up the dispatcher's labels. If you change either, mentally trace:
      * Sun 02:00 UTC dispatch → run_date = today's Sunday
      * Sun 23:59 UTC dispatch → run_date = same Sunday (still in flight)
      * Mon 00:01 UTC dispatch → run_date = previous Sunday (yesterday)
      * Mon 02:00 UTC alert    → looks up previous Sunday
    All four MUST agree on the same calendar Sunday. The JS test file
    `tests/convex/benchmarkAlerts.test.ts` pins these for the JS side.
    """
    # weekday(): Mon=0..Sun=6. days_back = 0 on Sun, 1 on Mon, ..., 6 on Sat.
    days_back = (now.weekday() + 1) % 7
    sunday = now - timedelta(days=days_back)
    return sunday.strftime("%Y-%m-%d")
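

# Illustrative only (not called by the dispatcher): the CONTRACT trace in
# the docstring above, evaluated with the same weekday arithmetic for the
# week of Sunday 2024-06-02 (an arbitrary example week).
def _example_sunday_snapping() -> dict[str, str]:
    from datetime import datetime as _dt, timedelta as _td, timezone as _tz

    def snap(now: _dt) -> str:
        days_back = (now.weekday() + 1) % 7  # Sun -> 0, Mon -> 1, ..., Sat -> 6
        return (now - _td(days=days_back)).strftime("%Y-%m-%d")

    moments = {
        "Sun 02:00": _dt(2024, 6, 2, 2, 0, tzinfo=_tz.utc),
        "Sun 23:59": _dt(2024, 6, 2, 23, 59, tzinfo=_tz.utc),
        "Mon 00:01": _dt(2024, 6, 3, 0, 1, tzinfo=_tz.utc),
        "Sat 12:00": _dt(2024, 6, 8, 12, 0, tzinfo=_tz.utc),
    }
    # Every moment in that week snaps to the same calendar Sunday.
    return {label: snap(now) for label, now in moments.items()}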


def resolve_roster(max_cost: str) -> list[dict]:
    """Run run_eval.py --list-models to get the cost-filtered active roster.

    Inherits the dispatcher's env so --from-convex sees CONVEX_URL.
    """
    cmd = [
        "python",
        "run_eval.py",
        "--from-convex",
        "--max-cost",
        max_cost,
        "--list-models",
        ROSTER_PATH,
    ]
    print(f"[dispatch] Resolving roster: {' '.join(cmd)}", flush=True)
    subprocess.run(cmd, check=True, cwd=os.path.dirname(os.path.abspath(__file__)))
    with open(ROSTER_PATH) as f:
        roster = json.load(f)
    if not isinstance(roster, list):
        sys.exit(f"[dispatch] Roster JSON must be a list, got {type(roster).__name__}")
    return roster
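

# Illustrative only (not called by the dispatcher): a condensed sketch of
# the pre-filter and duplicate-slug guard that `main` applies to the list
# returned by `resolve_roster`. The roster is hypothetical; the entry keys
# (`name`, `model_id`, optional `provider_name`) match what `main` reads.
# Unlike `main`, which returns 2 at the first collision, this collects them.
def _example_partition_roster() -> tuple[int, list[str], list[str]]:
    import re as _re

    invalid = _re.compile(r"[^a-zA-Z0-9_-]+")

    def slug(e: dict) -> str:
        parts = [e["name"], e["model_id"]]
        provider = (e.get("provider_name") or "").strip()
        if provider:
            parts.append(provider)
        return invalid.sub("_", "__".join(parts).strip()).strip("_-") or "unknown"

    roster = [
        {"name": "Model A", "model_id": "vendor/model-a", "provider_name": "OpenRouter"},
        {"name": "Model A", "model_id": "vendor/model-a", "provider_name": "Direct"},
        {"name": "  ", "model_id": "vendor/model-b"},  # skipped: blank name
        {"name": "Model C", "model_id": "vendor/model-c.1"},
        {"name": "Model C", "model_id": "vendor/model-c_1"},  # collides after slugging
    ]
    spawnable, skipped = [], []
    for e in roster:
        name = (e.get("name") or "").strip()
        mid = (e.get("model_id") or "").strip()
        if name and mid:
            spawnable.append(e)
        else:
            skipped.append(name or mid or "<both empty>")

    seen: set[str] = set()
    collisions: list[str] = []
    for e in spawnable:
        s = slug(e)
        if s in seen:
            collisions.append(s)
        seen.add(s)
    # roster_size counts spawnable entries, not the raw roster length.
    return len(spawnable), skipped, collisions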


def spawn_one(
    name: str,
    model_id: str,
    provider_name: str,
    image: str,
    namespace: str,
    flavor: str,
    timeout: str,
    run_date: str,
    roster_size: int,
    secrets: dict[str, str],
) -> str:
    """Fire one child Job. Returns the job id.

    Children deliberately run WITHOUT --max-cost. The dispatcher already
    applied the cost filter at roster resolution time; reapplying it inside
    the child would re-evaluate against (potentially newer) pricing. If the
    model's price crossed the threshold between dispatch and child
    execution, run_eval.py's empty-roster early-exit branch under
    `--max-cost` would silently exit 0 with no results uploaded, which the
    alerts cron would then report as a healthy job. Worst case without the
    filter: we evaluate a model whose cost has spiked, paying a few cents
    more than the dispatcher expected. That's strictly preferable to silent
    data loss.

    `name` is the RAW Convex `models.name` (alias-stripped at discovery).
    Combined with `model_id` (and `provider_name`, when non-empty) to form
    the routing slug; the same string is the `--models` arg, the HF `model`
    label, and the upload folder.
    """
    routing_slug = _model_slug(name, model_id, provider_name)
    job = run_job(
        image=image,
        command=[
            "python",
            "run_eval.py",
            "--from-convex",
            "--models",
            routing_slug,
        ],
        namespace=namespace,
        flavor=flavor,
        timeout=timeout,
        secrets=secrets,
        env={
            # Non-secret config: visible in the HF Jobs UI for debugging.
            # MODEL_ID, MODEL_NAME, PROVIDER_NAME are the raw values (not
            # slugged) for human inspection in the HF dashboard.
            # MODEL_SLUG is the routing identifier the child filter
            # matches against.
            "MODEL_ID": model_id,
            "MODEL_NAME": name,
            "PROVIDER_NAME": provider_name,
            "MODEL_SLUG": routing_slug,
            "DISPATCH_RUN_DATE": run_date,
        },
        # Labels are how `benchmarkAlerts` Convex cron locates this run later.
        # * `model`         routing slug (name__model_id[__provider_name])
        # * `model_display` slug(name), human-readable and Slack-friendly
        # * `roster_size`   detects partial spawn failures
        #                   (jobs.length < roster_size ⇒ some never spawned)
        labels={
            "purpose": "modelchorus-eval",
            "model": routing_slug,
            "model_display": _slugify_label(name),
            "run_date": run_date,
            "roster_size": str(roster_size),
        },
    )
    return job.id
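

# Illustrative only (not called by the dispatcher): the label set that
# `spawn_one` attaches, checked against the stricter HF tag pattern quoted
# near `_LABEL_INVALID`, plus a Python sketch of the partial-spawn rule the
# alerts cron applies (jobs.length < roster_size). The real check lives in
# convex/benchmarkAlerts.ts; this mirror and its inputs are hypothetical.
def _example_labels_and_partial_spawn() -> tuple[bool, bool]:
    import re as _re

    tag_ok = _re.compile(r"^[a-zA-Z0-9_\-=]+$")
    roster_size = 3
    labels = {
        "purpose": "modelchorus-eval",
        "model": "Model_A__vendor_model-a__OpenRouter",
        "model_display": "Model_A",
        "run_date": "2024-06-02",
        "roster_size": str(roster_size),
    }
    all_tag_safe = all(tag_ok.match(v) for v in labels.values())

    # Suppose only two of the three children were created.
    jobs_seen = [labels, dict(labels, model="Model_B__vendor_model-b")]
    partial = len(jobs_seen) < int(jobs_seen[0]["roster_size"])
    return all_tag_safe, partial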


def main() -> int:
    image = _env("EVAL_IMAGE", "hf.co/spaces/meetkai/modelchorus-evals")
    namespace = _env("EVAL_NAMESPACE", "meetkai")
    flavor = _env("EVAL_FLAVOR", "cpu-upgrade")
    timeout = _env("EVAL_TIMEOUT", "6h")
    max_cost = _env("EVAL_MAX_COST", "20.00")

    # Pass through to children. Required for run_eval.py to function.
    # EVAL_DISCOVERY_TOKEN is the shared secret for `models:listForEvals`
    # (the alias-stripped roster query); each child calls --from-convex
    # which re-reads this env var on startup.
    secrets = {
        k: _env(k, required=True)
        for k in (
            "CONVEX_URL",
            "OPENROUTER_API_KEY",
            "HF_TOKEN",
            "EVAL_DISCOVERY_TOKEN",
        )
    }
    # Optional keys: per-model API overrides plus the JUDGE_* settings. Only
    # forward them if set so children don't see empty-string keys for models
    # that legitimately don't need them.
    for k in (
        "FUNCTIONARY_API_KEY",
        "MEETKAI_GATEWAY_API_KEY",
        "JUDGE_API_KEY",
        "JUDGE_BASE_URL",
        "JUDGE_MODEL",
    ):
        v = os.environ.get(k, "").strip()
        if v:
            secrets[k] = v

    # Snap run_date to the most-recent Sunday in UTC so the label aligns
    # with the alerts cron's `lastSundayUtc` even when HF fires the
    # dispatcher late (e.g., Sun 02:00 slipping into Mon 00:0x). See
    # _most_recent_sunday_utc for full reasoning.
    run_date = _most_recent_sunday_utc(datetime.now(timezone.utc))

    roster = resolve_roster(max_cost)
    if not roster:
        # benchmarkAlerts cron will see total=0 and post the empty-roster alert.
        print("[dispatch] Roster is empty; nothing to spawn.", flush=True)
        return 0

    # Pre-filter so roster_size reflects actual fan-out attempts. Children
    # carry this number as a label; if alerts sees fewer jobs than this,
    # it's a real partial-spawn failure (not a skipped entry). Both
    # `name` and `model_id` must be present and non-whitespace; they're
    # combined into the routing slug, and either being empty would
    # produce a non-unique or malformed identifier.
    spawnable: list[dict] = []
    skipped: list[str] = []
    for e in roster:
        name = (e.get("name") or "").strip()
        mid = (e.get("model_id") or "").strip()
        if name and mid:
            spawnable.append(e)
        else:
            # Log a hint of which entry was bad: name OR model_id might be
            # set even if both aren't, so include whichever is non-empty.
            hint = name or mid or "<both empty>"
            skipped.append(hint)
    if skipped:
        print(
            f"[dispatch] Skipping {len(skipped)} roster entries missing name/model_id: {skipped}",
            flush=True,
        )
    if not spawnable:
        print("[dispatch] No spawnable roster entries.", flush=True)
        return 0

    # Defense-in-depth: refuse to spawn if two entries would produce the
    # same routing slug. The slug includes name + model_id + provider, so
    # this normally trips only when all three are identical, i.e. genuine
    # duplicate roster rows. It can also trip when distinct raw values
    # collapse under slugification (e.g. "." and "/" both become "_").
    # Loud failure beats silently spawning two children that overwrite
    # each other.
    seen_slugs: dict[str, dict] = {}
    for entry in spawnable:
        slug = _model_slug(entry["name"], entry["model_id"], entry.get("provider_name", ""))
        if slug in seen_slugs:
            other = seen_slugs[slug]
            print(
                f"[dispatch] FATAL: roster has two entries collapsing to slug={slug!r}: "
                f"{entry!r} and {other!r}. Fix Convex data and re-run.",
                flush=True,
            )
            return 2
        seen_slugs[slug] = entry

    roster_size = len(spawnable)
    print(f"[dispatch] Spawning {roster_size} child jobs for run_date={run_date}", flush=True)

    spawned: list[tuple[str, str]] = []
    failures: list[tuple[str, str]] = []
    for entry in spawnable:
        name: str = entry["name"]
        model_id: str = entry["model_id"]
        provider_name: str = entry.get("provider_name", "") or ""
        try:
            job_id = spawn_one(
                name=name,
                model_id=model_id,
                provider_name=provider_name,
                image=image,
                namespace=namespace,
                flavor=flavor,
                timeout=timeout,
                run_date=run_date,
                roster_size=roster_size,
                secrets=secrets,
            )
            spawned.append((model_id, job_id))
            print(f"[dispatch]   spawned {name} ({model_id} via {provider_name or '?'}) → {job_id}", flush=True)
        # Spawn failure shouldn't poison the rest. Each child carries the
        # roster_size label, so the alerts cron can detect partial spawn
        # failure (children_seen < roster_size) even when this dispatcher
        # process exits cleanly.
        except Exception as exc:  # noqa: BLE001
            redacted = _redact_for_print(str(exc))
            failures.append((model_id, redacted))
            print(
                f"[dispatch]   FAILED to spawn {name} ({model_id} via {provider_name or '?'}): {redacted}",
                flush=True,
            )

    print(f"[dispatch] Done. spawned={len(spawned)} failed={len(failures)}", flush=True)
    if failures:
        # Non-zero exit so HF marks this dispatcher run as ERROR in its
        # dashboard. That is purely a UI signal, not the source of truth for
        # alerts: the Convex `benchmarkAlerts` cron derives partial-spawn
        # status from the `roster_size` labels on the children that DID
        # spawn. If jobs.length < roster_size, that's a real partial-fanout
        # failure regardless of this exit code.
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())