michaelr27 HF Staff commited on
Commit
fd61157
·
verified ·
1 Parent(s): ed2a486

step 10 / jobs-migration: submit.py -> dispatch eval to HF Jobs

Browse files
Files changed (1) hide show
  1. submit.py +220 -162
submit.py CHANGED
@@ -1,17 +1,22 @@
1
  """Submit-tab handler for the CADGenBench leaderboard Space.
2
 
3
- Step 6 (E) chunks 2 + 3 + 4 + 6: cheap-sync validation + pending-row
4
- write + zip upload + background-thread eval + boot-time stuck-pending
5
- sweep. The handler validates the upload, uploads the zip to
 
6
  ``submissions/<id>.zip``, appends a ``status: pending`` row to
7
  ``results.jsonl`` (under a process-wide lock), spawns a daemon thread
8
- to run ``cadgenbench evaluate`` + ``cadgenbench report single``, and
9
- returns immediately. The worker uploads ``reports/<id>.{html,json}``
10
- and flips the row ``pending -> completed`` (or ``failed`` with a
11
- ``failure_reason``). At module import a one-shot daemon sweep flips
12
- any ``pending`` row whose ``submitted_at`` is older than 30 min to
13
- ``failed`` with a "Space restart" reason, so rows stranded by a deploy
14
- / OOM / crash don't sit pending forever.
 
 
 
 
15
 
16
  Validation gates, in order:
17
 
@@ -46,18 +51,21 @@ rejection; an orphan-zip sweep is a future-chunk concern.
46
 
47
  Background worker, per submission:
48
 
49
- 1. ``cadgenbench evaluate <run_dir>`` (subprocess; runs
50
- per-fixture eval in parallel via the CLI's ProcessPoolExecutor;
51
- writes ``run_summary.json`` at the run-dir root).
52
- 2. ``cadgenbench report single <run_dir> -o <report.html>``
53
- (subprocess; self-contained HTML with embedded renders).
54
- 3. Upload ``reports/<id>.html`` + ``reports/<id>.json``. The JSON
55
- bundles ``run_summary.json`` + every per-fixture ``result.json``.
56
- 4. Read ``run_summary.json``; under ``_HUB_LOCK`` flip the row's
57
- ``status`` to ``"completed"`` and merge the score fields.
58
- 5. On any worker-side exception, flip the row to ``"failed"`` with
59
- a short ``failure_reason``. Tempdir cleanup runs in ``finally``
60
- either way.
 
 
 
61
  """
62
  from __future__ import annotations
63
 
@@ -67,10 +75,9 @@ import logging
67
  import os
68
  import re
69
  import shutil
70
- import subprocess
71
- import sys
72
  import tempfile
73
  import threading
 
74
  import zipfile
75
  from concurrent.futures import ThreadPoolExecutor
76
  from datetime import datetime, timedelta, timezone
@@ -81,7 +88,13 @@ import cadgenbench
81
  import gradio as gr
82
  from cadgenbench.common.paths import data_inputs_dir
83
  from cadgenbench.common.validity import parse_step
84
- from huggingface_hub import HfApi
 
 
 
 
 
 
85
  from huggingface_hub.errors import EntryNotFoundError
86
 
87
  from leaderboard import HF_DATA_REPO, HF_SUBMISSIONS_REPO
@@ -102,22 +115,33 @@ SUBMISSIONS_DIR = "submissions"
102
  REPORTS_DIR = "reports"
103
  DATA_REV_SHORT_LEN = 12
104
  FAILURE_REASON_MAX_CHARS = 200
105
- EVAL_TIMEOUT_SECONDS = 15 * 60
106
- REPORT_TIMEOUT_SECONDS = 2 * 60
107
- # Per-fixture eval workers. Set to "1" (sequential) because 4 parallel
108
- # workers OOM-kill on the Space's cpu-upgrade tier: cadquery-ocp +
109
- # manifold3d + VTK osmesa state per worker, times 4, exceeds the box's
110
- # RAM and the kernel sends a SIGKILL that surfaces in the parent as
111
- # concurrent.futures.process.BrokenProcessPool after ~5 min of render.
112
- # Revisit once we have a hardware tier with more RAM, or once the
113
- # per-worker footprint shrinks (e.g. lazier OCC imports).
114
- EVAL_WORKER_COUNT = "1"
115
  SHA256_BLOCK_SIZE = 64 * 1024
116
  STUCK_PENDING_THRESHOLD_SECONDS = 30 * 60
117
  SUBMITTED_AT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
118
  STUCK_PENDING_REASON = "evaluation interrupted by Space restart"
119
  BOOT_SWEEP_ENV = "CADGENBENCH_DISABLE_BOOT_SWEEP"
120
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
  # One HfApi client per process. HF_TOKEN is picked up from the env at
122
  # construction time and reused for every call.
123
  _HF_API = HfApi()
@@ -178,10 +202,11 @@ def handle_submit(
178
 
179
  zip_path = Path(zip_file.name)
180
 
181
- # Manual tempdir lifecycle: cleaned up here on any rejection, but
182
- # ownership passes to the worker on a successful spawn (the worker
183
- # cleans up in its own finally). TemporaryDirectory's context
184
- # manager doesn't fit because the dir has to outlive this function.
 
185
  tmp = Path(tempfile.mkdtemp(prefix="cadgenbench-submit-"))
186
  run_dir = tmp / "run"
187
  run_dir.mkdir()
@@ -229,18 +254,16 @@ def handle_submit(
229
  except _HubWriteError as e:
230
  raise gr.Error(f"Submission rejected: {e}")
231
 
232
- _spawn_worker(submission_id, tmp, run_dir)
233
- tmp = None # ownership transferred; skip cleanup below
234
  progress(1.0, desc="Queued")
235
  gr.Info(
236
  f"Submission {submission_id} queued for evaluation "
237
- f"({len(fixture_names)} fixtures). Evaluation typically "
238
- f"takes 2-5 minutes; the row flips to completed when the "
239
- f"worker finishes."
240
  )
241
  finally:
242
- if tmp is not None:
243
- shutil.rmtree(tmp, ignore_errors=True)
244
 
245
 
246
  def _validate_form(zip_file) -> str | None:
@@ -646,153 +669,188 @@ def _resolve_data_revision() -> str:
646
 
647
 
648
  # ---------------------------------------------------------------------------
649
- # Background worker (eval + report + row flip)
650
  # ---------------------------------------------------------------------------
651
 
652
 
653
- def _spawn_worker(submission_id: str, tmp: Path, run_dir: Path) -> None:
654
- """Start the eval worker thread. Fire-and-forget; daemon=True so a
655
- Space restart doesn't block on in-flight workers (chunk 6's
656
- boot-time sweep flips any rows their workers didn't finish to
657
- failed).
 
 
658
  """
659
  t = threading.Thread(
660
  target=_run_worker,
661
- args=(submission_id, tmp, run_dir),
662
  name=f"cgb-worker-{submission_id}",
663
  daemon=True,
664
  )
665
  t.start()
666
 
667
 
668
- def _run_worker(submission_id: str, tmp: Path, run_dir: Path) -> None:
669
- """Top-level worker entry: run eval, build + upload reports, flip row.
670
 
671
- Any exception in the pipeline flips the row to ``failed`` with a
672
- short ``failure_reason`` (full traceback goes to the Space's
673
- runtime logs). The tempdir is always cleaned up.
674
  """
675
  try:
676
- try:
677
- _run_eval(run_dir)
678
- report_html = tmp / f"{submission_id}.html"
679
- _run_report(run_dir, report_html)
680
- report_json = _build_report_json(run_dir)
681
- _upload_reports(submission_id, report_html, report_json)
682
- summary = json.loads(
683
- (run_dir / "run_summary.json").read_text(encoding="utf-8")
684
- )
685
  _flip_row_to_completed(submission_id, summary)
686
  logger.info("Worker completed for %s", submission_id)
687
- except Exception as e: # noqa: BLE001 - broad on purpose; we map to row state
688
- logger.exception("Worker failed for %s", submission_id)
689
- reason = f"{type(e).__name__}: {str(e)}"[:FAILURE_REASON_MAX_CHARS]
690
- try:
691
- _flip_row_to_failed(submission_id, reason)
692
- except Exception:
693
- # If even the row-flip fails, the row stays pending.
694
- # Chunk 6's stuck-pending sweep will catch it on the
695
- # next Space boot.
696
- logger.exception(
697
- "Failed to flip row to failed for %s; row stays pending",
698
- submission_id,
699
- )
700
- finally:
701
- shutil.rmtree(tmp, ignore_errors=True)
 
 
 
 
702
 
703
 
704
- def _run_eval(run_dir: Path) -> None:
705
- """Invoke ``cadgenbench evaluate`` over the run_dir; raise on non-zero."""
706
- cmd = [
707
- sys.executable, "-m", "cadgenbench.cli", "evaluate", str(run_dir),
708
- "--workers", EVAL_WORKER_COUNT,
709
- ]
710
- logger.info("Running eval: %s", " ".join(cmd))
711
- proc = subprocess.run(
712
- cmd,
713
- capture_output=True,
714
- text=True,
715
- timeout=EVAL_TIMEOUT_SECONDS,
716
- env=os.environ.copy(),
717
- check=False,
718
- )
719
- if proc.returncode != 0:
720
- # Dump the full subprocess output to the container log so the
721
- # actual child crash (segfault, OSMesa init error, etc.) is
722
- # recoverable via the Space's run logs. The user-facing
723
- # failure_reason field stays short (200 chars cap downstream).
724
- logger.error(
725
- "cadgenbench evaluate exited %s\n--- STDERR ---\n%s\n--- STDOUT ---\n%s",
726
- proc.returncode, proc.stderr or "", proc.stdout or "",
727
- )
728
- tail = (proc.stderr or proc.stdout or "")[-500:].strip()
729
  raise RuntimeError(
730
- f"cadgenbench evaluate exited {proc.returncode}: {tail}"
731
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
732
 
733
 
734
- def _run_report(run_dir: Path, html_out: Path) -> None:
735
- """Invoke ``cadgenbench report single`` for the run_dir; raise on non-zero."""
736
- cmd = [
737
- sys.executable, "-m", "cadgenbench.cli", "report", "single",
738
- str(run_dir), "-o", str(html_out),
739
- ]
740
- logger.info("Running report: %s", " ".join(cmd))
741
- proc = subprocess.run(
742
- cmd,
743
- capture_output=True,
744
- text=True,
745
- timeout=REPORT_TIMEOUT_SECONDS,
746
- env=os.environ.copy(),
747
- check=False,
748
- )
749
- if proc.returncode != 0 or not html_out.is_file():
750
- logger.error(
751
- "cadgenbench report single exited %s\n--- STDERR ---\n%s\n--- STDOUT ---\n%s",
752
- proc.returncode, proc.stderr or "", proc.stdout or "",
753
- )
754
- tail = (proc.stderr or proc.stdout or "")[-500:].strip()
755
- raise RuntimeError(
756
- f"cadgenbench report single exited {proc.returncode}: {tail}"
757
- )
758
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
759
 
760
- def _build_report_json(run_dir: Path) -> dict[str, Any]:
761
- """Bundle ``run_summary.json`` + every per-fixture ``result.json``."""
762
- summary_path = run_dir / "run_summary.json"
763
- if not summary_path.is_file():
764
- raise RuntimeError(
765
- f"run_summary.json not produced under {run_dir} (eval issue?)"
766
- )
767
- summary = json.loads(summary_path.read_text(encoding="utf-8"))
768
- per_fixture: dict[str, dict[str, Any]] = {}
769
- for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
770
- rp = fixture_dir / "result.json"
771
- if rp.is_file():
772
- per_fixture[fixture_dir.name] = json.loads(
773
- rp.read_text(encoding="utf-8")
774
  )
775
- return {"run_summary": summary, "per_fixture_results": per_fixture}
776
 
777
 
778
- def _upload_reports(
779
- submission_id: str, html_path: Path, report_json: dict[str, Any],
780
- ) -> None:
781
- """Upload ``reports/<id>.html`` and ``reports/<id>.json`` to the Hub."""
782
- _HF_API.upload_file(
783
- path_or_fileobj=str(html_path),
784
- path_in_repo=f"{REPORTS_DIR}/{submission_id}.html",
785
- repo_id=HF_SUBMISSIONS_REPO,
786
- repo_type="dataset",
787
- commit_message=f"add HTML report for {submission_id}",
788
- )
789
- _HF_API.upload_file(
790
- path_or_fileobj=json.dumps(report_json, ensure_ascii=False, indent=2).encode("utf-8"),
791
- path_in_repo=f"{REPORTS_DIR}/{submission_id}.json",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
792
  repo_id=HF_SUBMISSIONS_REPO,
 
793
  repo_type="dataset",
794
- commit_message=f"add JSON report for {submission_id}",
795
  )
 
 
 
 
 
 
 
 
796
 
797
 
798
  def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None:
 
1
  """Submit-tab handler for the CADGenBench leaderboard Space.
2
 
3
+ Step 6 (E) chunks 2 + 3 + 4 + 6 + Step 10 (jobs migration): cheap-sync
4
+ validation + pending-row write + zip upload + background dispatch +
5
+ poll of an HF Jobs GPU eval + boot-time stuck-pending sweep. The
6
+ handler validates the upload, uploads the zip to
7
  ``submissions/<id>.zip``, appends a ``status: pending`` row to
8
  ``results.jsonl`` (under a process-wide lock), spawns a daemon thread
9
+ that dispatches a per-submission HF Job against the
10
+ ``HuggingAI4Engineering/cadgenbench-eval-gpu`` image and polls
11
+ ``inspect_job`` until the job's stage is terminal. On COMPLETED the
12
+ worker downloads ``reports/<id>.json`` (the Job already uploaded
13
+ ``reports/<id>.{html,json}`` to the submissions dataset), reads
14
+ ``run_summary`` out of it, and flips the row ``pending -> completed``.
15
+ On ERROR (or any dispatch / poll exception) the row flips to ``failed``
16
+ with a short ``failure_reason``. At module import a one-shot daemon
17
+ sweep flips any ``pending`` row whose ``submitted_at`` is older than
18
+ 30 min to ``failed`` with a "Space restart" reason, so rows stranded by
19
+ a deploy / OOM / crash / orphaned Job don't sit pending forever.
20
 
21
  Validation gates, in order:
22
 
 
51
 
52
  Background worker, per submission:
53
 
54
+ 1. ``huggingface_hub.run_job(...)`` dispatches an HF Job against
55
+ the ``cadgenbench-eval-gpu`` Space image on ``a10g-large``,
56
+ passing the submission_id + zip blob URL as command args and
57
+ ``HF_TOKEN`` as a secret.
58
+ 2. Poll ``inspect_job(job_id)`` every few seconds until the job's
59
+ stage is terminal (``COMPLETED`` or ``ERROR``). Outer deadline
60
+ guards against an unresponsive poll surface.
61
+ 3. On ``COMPLETED``: download ``reports/<id>.json`` from the
62
+ submissions dataset (the Job uploaded both
63
+ ``reports/<id>.{html,json}`` before exiting), read
64
+ ``run_summary`` out of the bundled payload, under ``_HUB_LOCK``
65
+ flip the row to ``"completed"`` and merge the score fields.
66
+ 4. On ``ERROR`` (or any dispatch / poll exception), flip the row to
67
+ ``"failed"`` with a short ``failure_reason`` (the job's
68
+ ``status.message`` plus the last N lines of ``fetch_job_logs``).
69
  """
70
  from __future__ import annotations
71
 
 
75
  import os
76
  import re
77
  import shutil
 
 
78
  import tempfile
79
  import threading
80
+ import time
81
  import zipfile
82
  from concurrent.futures import ThreadPoolExecutor
83
  from datetime import datetime, timedelta, timezone
 
88
  import gradio as gr
89
  from cadgenbench.common.paths import data_inputs_dir
90
  from cadgenbench.common.validity import parse_step
91
+ from huggingface_hub import (
92
+ HfApi,
93
+ fetch_job_logs,
94
+ hf_hub_download,
95
+ inspect_job,
96
+ run_job,
97
+ )
98
  from huggingface_hub.errors import EntryNotFoundError
99
 
100
  from leaderboard import HF_DATA_REPO, HF_SUBMISSIONS_REPO
 
115
  REPORTS_DIR = "reports"
116
  DATA_REV_SHORT_LEN = 12
117
  FAILURE_REASON_MAX_CHARS = 200
 
 
 
 
 
 
 
 
 
 
118
  SHA256_BLOCK_SIZE = 64 * 1024
119
  STUCK_PENDING_THRESHOLD_SECONDS = 30 * 60
120
  SUBMITTED_AT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
121
  STUCK_PENDING_REASON = "evaluation interrupted by Space restart"
122
  BOOT_SWEEP_ENV = "CADGENBENCH_DISABLE_BOOT_SWEEP"
123
 
124
+ # HF Jobs target. The eval-gpu image is hosted as a Docker Space
125
+ # (paused; image-only) at HuggingAI4Engineering/cadgenbench-eval-gpu.
126
+ # Jobs run under the personal `michaelr27` namespace (no-bill for
127
+ # HF employees per Round 6 of space-setup/leandro.md). a10g-large
128
+ # fits cadgenbench evaluate --workers 8 comfortably in 46 GB RAM.
129
+ EVAL_GPU_SPACE = "HuggingAI4Engineering/cadgenbench-eval-gpu"
130
+ EVAL_JOB_FLAVOR = "a10g-large"
131
+ EVAL_JOB_NAMESPACE = "michaelr27"
132
+ EVAL_JOB_TIMEOUT = "30m"
133
+ EVAL_JOB_WORKER_COUNT = "8"
134
+
135
+ # Poll cadence + outer deadline guarding inspect_job. 5 s is fast
136
+ # enough that a 60 s eval lands in <10 s of completion, slow enough
137
+ # that we don't hammer the API. Deadline matches the Job's own
138
+ # --timeout; the Job is the source of truth, this is just a belt
139
+ # for an unresponsive inspect_job surface.
140
+ JOB_POLL_INTERVAL_SECONDS = 5
141
+ JOB_POLL_DEADLINE_SECONDS = 35 * 60
142
+ JOB_LOG_TAIL_LINES = 30
143
+ JOB_POLL_MAX_CONSECUTIVE_ERRORS = 5
144
+
145
  # One HfApi client per process. HF_TOKEN is picked up from the env at
146
  # construction time and reused for every call.
147
  _HF_API = HfApi()
 
202
 
203
  zip_path = Path(zip_file.name)
204
 
205
+ # The tempdir lives only for the cheap-sync validation pass
206
+ # (unpack zip, validate meta + fixture set + STEP parseability).
207
+ # The Job downloads the zip itself from the Hub, so the
208
+ # Space-local unpack is throwaway and the tempdir gets cleaned
209
+ # up unconditionally in the outer finally.
210
  tmp = Path(tempfile.mkdtemp(prefix="cadgenbench-submit-"))
211
  run_dir = tmp / "run"
212
  run_dir.mkdir()
 
254
  except _HubWriteError as e:
255
  raise gr.Error(f"Submission rejected: {e}")
256
 
257
+ _spawn_worker(submission_id, blob_url)
 
258
  progress(1.0, desc="Queued")
259
  gr.Info(
260
  f"Submission {submission_id} queued for evaluation "
261
+ f"({len(fixture_names)} fixtures). The eval runs on an "
262
+ f"HF Jobs GPU; the row flips to completed when the job "
263
+ f"finishes (typically 1-3 minutes)."
264
  )
265
  finally:
266
+ shutil.rmtree(tmp, ignore_errors=True)
 
267
 
268
 
269
  def _validate_form(zip_file) -> str | None:
 
669
 
670
 
671
  # ---------------------------------------------------------------------------
672
+ # Background worker (dispatch eval to HF Jobs, poll, flip row)
673
  # ---------------------------------------------------------------------------
674
 
675
 
676
+ def _spawn_worker(submission_id: str, submission_blob_url: str) -> None:
677
+ """Start the dispatch+poll worker thread.
678
+
679
+ Fire-and-forget; daemon=True so a Space restart doesn't block on
680
+ in-flight workers (the boot-time sweep below flips any rows their
681
+ workers didn't finish to failed). The worker no longer owns any
682
+ Space-local files; the Job downloads the zip itself from the Hub.
683
  """
684
  t = threading.Thread(
685
  target=_run_worker,
686
+ args=(submission_id, submission_blob_url),
687
  name=f"cgb-worker-{submission_id}",
688
  daemon=True,
689
  )
690
  t.start()
691
 
692
 
693
+ def _run_worker(submission_id: str, submission_blob_url: str) -> None:
694
+ """Dispatch the eval Job, poll to completion, flip the row.
695
 
696
+ Any exception (dispatch, poll, fetch_run_summary, flip) maps to a
697
+ ``failed`` row with a short ``failure_reason`` (full traceback goes
698
+ to the Space's runtime logs).
699
  """
700
  try:
701
+ job_id = _dispatch_eval_job(submission_id, submission_blob_url)
702
+ logger.info("Dispatched eval job %s for %s", job_id, submission_id)
703
+ stage, status_message = _poll_until_done(job_id, submission_id)
704
+ if stage == "COMPLETED":
705
+ summary = _fetch_run_summary_from_report(submission_id)
 
 
 
 
706
  _flip_row_to_completed(submission_id, summary)
707
  logger.info("Worker completed for %s", submission_id)
708
+ return
709
+ reason = _job_failure_reason(job_id, stage, status_message)
710
+ _flip_row_to_failed(submission_id, reason)
711
+ logger.warning(
712
+ "Eval job %s for %s ended %s: %s",
713
+ job_id, submission_id, stage, reason,
714
+ )
715
+ except Exception as e: # noqa: BLE001 - broad on purpose; we map to row state
716
+ logger.exception("Worker failed for %s", submission_id)
717
+ reason = f"{type(e).__name__}: {str(e)}"[:FAILURE_REASON_MAX_CHARS]
718
+ try:
719
+ _flip_row_to_failed(submission_id, reason)
720
+ except Exception:
721
+ # If even the row-flip fails, the row stays pending. The
722
+ # stuck-pending sweep on the next Space boot will catch it.
723
+ logger.exception(
724
+ "Failed to flip row to failed for %s; row stays pending",
725
+ submission_id,
726
+ )
727
 
728
 
729
+ def _dispatch_eval_job(
730
+ submission_id: str, submission_blob_url: str,
731
+ ) -> str:
732
+ """Dispatch the per-submission eval Job and return its id.
733
+
734
+ Passes through every env var ``eval_job.py`` needs to resolve the
735
+ Hub data + GT repos and the target submissions repo; the Job's
736
+ HF_TOKEN secret comes from the Space's own HF_TOKEN env (which
737
+ needs Jobs + repo R/W scopes, see space-setup/jobs-migration.md).
738
+ """
739
+ token = os.environ.get("HF_TOKEN")
740
+ if not token:
 
 
 
 
 
 
 
 
 
 
 
 
 
741
  raise RuntimeError(
742
+ "HF_TOKEN is unset on the Space; cannot dispatch eval job."
743
  )
744
+ env: dict[str, str] = {
745
+ "HF_SUBMISSIONS_REPO": HF_SUBMISSIONS_REPO,
746
+ "EVAL_WORKER_COUNT": EVAL_JOB_WORKER_COUNT,
747
+ }
748
+ for key in ("CADGENBENCH_DATA_REPO", "CADGENBENCH_DATA_GT_REPO"):
749
+ value = os.environ.get(key)
750
+ if value:
751
+ env[key] = value
752
+ job = run_job(
753
+ image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
754
+ command=[
755
+ "python", "/opt/eval_job.py", submission_id, submission_blob_url,
756
+ ],
757
+ flavor=EVAL_JOB_FLAVOR,
758
+ namespace=EVAL_JOB_NAMESPACE,
759
+ env=env,
760
+ secrets={"HF_TOKEN": token},
761
+ timeout=EVAL_JOB_TIMEOUT,
762
+ token=token,
763
+ )
764
+ return job.id
765
 
766
 
767
+ def _poll_until_done(
768
+ job_id: str, submission_id: str,
769
+ ) -> tuple[str, str | None]:
770
+ """Poll ``inspect_job`` until terminal; return (stage, message).
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
771
 
772
+ Terminal stages: ``COMPLETED``, ``ERROR``. Anything else after the
773
+ outer deadline counts as a synthetic ``ERROR`` with a "deadline
774
+ exceeded" message; we do not try to cancel the Job from here (the
775
+ Job carries its own ``timeout`` and HF will reap it). Transient
776
+ ``inspect_job`` errors retry up to
777
+ ``JOB_POLL_MAX_CONSECUTIVE_ERRORS`` consecutive failures before
778
+ raising.
779
+ """
780
+ deadline = time.monotonic() + JOB_POLL_DEADLINE_SECONDS
781
+ consecutive_errors = 0
782
+ while True:
783
+ try:
784
+ info = inspect_job(job_id=job_id)
785
+ consecutive_errors = 0
786
+ except Exception as e: # noqa: BLE001 - retry transient API errors
787
+ consecutive_errors += 1
788
+ logger.warning(
789
+ "inspect_job(%s) failed (%d/%d): %s",
790
+ job_id, consecutive_errors,
791
+ JOB_POLL_MAX_CONSECUTIVE_ERRORS, e,
792
+ )
793
+ if consecutive_errors >= JOB_POLL_MAX_CONSECUTIVE_ERRORS:
794
+ raise
795
+ time.sleep(JOB_POLL_INTERVAL_SECONDS)
796
+ continue
797
 
798
+ stage = info.status.stage
799
+ message = info.status.message
800
+ if stage in ("COMPLETED", "ERROR"):
801
+ return stage, message
802
+ if time.monotonic() >= deadline:
803
+ return "ERROR", (
804
+ f"Space-side poll deadline exceeded "
805
+ f"({JOB_POLL_DEADLINE_SECONDS}s); last stage={stage}"
 
 
 
 
 
 
806
  )
807
+ time.sleep(JOB_POLL_INTERVAL_SECONDS)
808
 
809
 
810
+ def _job_failure_reason(
811
+ job_id: str, stage: str, status_message: str | None,
812
+ ) -> str:
813
+ """Build a short ``failure_reason`` for a non-completed Job.
814
+
815
+ Combines the job's own ``status.message`` (if any) with the last
816
+ ``JOB_LOG_TAIL_LINES`` of ``fetch_job_logs`` so the user sees
817
+ something actionable in the row. Log fetch is best-effort.
818
+ """
819
+ parts: list[str] = [f"eval job {stage.lower()}"]
820
+ if status_message:
821
+ parts.append(status_message)
822
+ try:
823
+ tail = list(fetch_job_logs(job_id=job_id))[-JOB_LOG_TAIL_LINES:]
824
+ if tail:
825
+ parts.append("logs: " + " | ".join(tail))
826
+ except Exception as e: # noqa: BLE001 - logs are best-effort
827
+ logger.warning("fetch_job_logs(%s) failed: %s", job_id, e)
828
+ return ": ".join(parts)[:FAILURE_REASON_MAX_CHARS]
829
+
830
+
831
+ def _fetch_run_summary_from_report(submission_id: str) -> dict[str, Any]:
832
+ """Download ``reports/<id>.json`` and return its ``run_summary`` dict.
833
+
834
+ The Job uploaded the report bundle before exiting; by the time
835
+ ``inspect_job`` returns COMPLETED the file is on the Hub. Raises
836
+ if the report or the ``run_summary`` key is missing (which would
837
+ indicate an eval that ran-but-broke contract; we want loud
838
+ failure rather than a silently-empty row).
839
+ """
840
+ path = hf_hub_download(
841
  repo_id=HF_SUBMISSIONS_REPO,
842
+ filename=f"{REPORTS_DIR}/{submission_id}.json",
843
  repo_type="dataset",
844
+ force_download=True,
845
  )
846
+ payload = json.loads(Path(path).read_text(encoding="utf-8"))
847
+ summary = payload.get("run_summary")
848
+ if not isinstance(summary, dict):
849
+ raise RuntimeError(
850
+ f"reports/{submission_id}.json missing or malformed "
851
+ f"`run_summary` block (got {type(summary).__name__})"
852
+ )
853
+ return summary
854
 
855
 
856
  def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None: