Michael Rabinovich Cursor commited on
Commit
293bc3b
·
1 Parent(s): 8eb8954

submit: shard large submissions across HF Jobs + merge (UC3)

Browse files

Submissions over SHARD_THRESHOLD (12) fixtures now fan out into
12-fixture shards dispatched at once, polled to terminal with ERROR-only
retry, then merged into one run via write_run_summary (regenerating the
report + gallery from the merged whole) before the row flips. At/under
the threshold a submission stays a single job, so the original path is
unchanged. Eval is CPU-bound, so more shards is the throughput lever.

Co-authored-by: Cursor <cursoragent@cursor.com>

Files changed (1) hide show
  1. submit.py +452 -10
submit.py CHANGED
@@ -108,6 +108,7 @@ from huggingface_hub import (
108
  hf_hub_download,
109
  inspect_job,
110
  run_job,
 
111
  )
112
  from huggingface_hub.errors import EntryNotFoundError
113
 
@@ -127,6 +128,12 @@ SUBMISSION_ID_SLUG_MAX = 40
127
  RESULTS_FILENAME = "results.jsonl"
128
  SUBMISSIONS_DIR = "submissions"
129
  REPORTS_DIR = "reports"
 
 
 
 
 
 
130
  DATA_REV_SHORT_LEN = 12
131
  FAILURE_REASON_MAX_CHARS = 200
132
  SHA256_BLOCK_SIZE = 64 * 1024
@@ -160,6 +167,31 @@ JOB_POLL_DEADLINE_SECONDS = 35 * 60
160
  JOB_LOG_TAIL_LINES = 30
161
  JOB_POLL_MAX_CONSECUTIVE_ERRORS = 5
162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
163
  # One HfApi client per process. HF_TOKEN is picked up from the env at
164
  # construction time and reused for every call.
165
  _HF_API = HfApi()
@@ -270,7 +302,7 @@ def handle_submit(
270
  except _HubWriteError as e:
271
  raise gr.Error(f"Submission rejected: {e}")
272
 
273
- _spawn_worker(submission_id, blob_url)
274
  gr.Info(
275
  f"Submission {submission_id} queued for evaluation "
276
  f"({len(fixture_names)} fixtures). The eval runs on an "
@@ -688,31 +720,48 @@ def _resolve_data_revision() -> str:
688
  # ---------------------------------------------------------------------------
689
 
690
 
691
- def _spawn_worker(submission_id: str, submission_blob_url: str) -> None:
 
 
 
 
692
  """Start the dispatch+poll worker thread.
693
 
694
  Fire-and-forget; daemon=True so a Space restart doesn't block on
695
  in-flight workers (the boot-time sweep below flips any rows their
696
  workers didn't finish to failed). The worker no longer owns any
697
- Space-local files; the Job downloads the zip itself from the Hub.
 
 
698
  """
699
  t = threading.Thread(
700
  target=_run_worker,
701
- args=(submission_id, submission_blob_url),
702
  name=f"cgb-worker-{submission_id}",
703
  daemon=True,
704
  )
705
  t.start()
706
 
707
 
708
- def _run_worker(submission_id: str, submission_blob_url: str) -> None:
709
- """Dispatch the eval Job, poll to completion, flip the row.
 
 
 
 
710
 
711
- Any exception (dispatch, poll, fetch_run_summary, flip) maps to a
712
- ``failed`` row with a short ``failure_reason`` (full traceback goes
713
- to the Space's runtime logs).
 
 
714
  """
715
  try:
 
 
 
 
 
716
  job_id = _dispatch_eval_job(submission_id, submission_blob_url)
717
  logger.info("Dispatched eval job %s for %s", job_id, submission_id)
718
  stage, status_message = _poll_until_done(job_id, submission_id)
@@ -741,15 +790,88 @@ def _run_worker(submission_id: str, submission_blob_url: str) -> None:
741
  )
742
 
743
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
744
  def _dispatch_eval_job(
745
  submission_id: str, submission_blob_url: str,
746
  ) -> str:
747
- """Dispatch the per-submission eval Job and return its id.
 
 
 
 
 
 
 
 
 
748
 
749
  Passes through every env var ``eval_job.py`` needs to resolve the
750
  Hub data + GT repos and the target submissions repo; the Job's
751
  HF_TOKEN secret comes from the Space's own HF_TOKEN env (which
752
  needs Jobs + repo R/W scopes, see space-setup/jobs-migration.md).
 
 
753
  """
754
  token = os.environ.get("HF_TOKEN")
755
  if not token:
@@ -768,6 +890,7 @@ def _dispatch_eval_job(
768
  image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
769
  command=[
770
  "python", "/opt/eval_job.py", submission_id, submission_blob_url,
 
771
  ],
772
  flavor=EVAL_JOB_FLAVOR,
773
  namespace=EVAL_JOB_NAMESPACE,
@@ -779,6 +902,117 @@ def _dispatch_eval_job(
779
  return job.id
780
 
781
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
782
  def _poll_until_done(
783
  job_id: str, submission_id: str,
784
  ) -> tuple[str, str | None]:
@@ -868,6 +1102,214 @@ def _fetch_run_summary_from_report(submission_id: str) -> dict[str, Any]:
868
  return summary
869
 
870
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
871
  def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None:
872
  """Merge ``run_summary.json`` fields into the pending row."""
873
  updates: dict[str, Any] = {
 
108
  hf_hub_download,
109
  inspect_job,
110
  run_job,
111
+ snapshot_download,
112
  )
113
  from huggingface_hub.errors import EntryNotFoundError
114
 
 
128
  RESULTS_FILENAME = "results.jsonl"
129
  SUBMISSIONS_DIR = "submissions"
130
  REPORTS_DIR = "reports"
131
+ RENDERS_DIR = "renders"
132
+ # Single canonical view staged per fixture for the leaderboard gallery
133
+ # thumbnail (matches eval_job.py's GALLERY_THUMB_VIEW). The merged-shard
134
+ # path stages these from the merged run dir, exactly as the single-job
135
+ # eval_job does from its in-job run dir.
136
+ GALLERY_THUMB_VIEW = "iso"
137
  DATA_REV_SHORT_LEN = 12
138
  FAILURE_REASON_MAX_CHARS = 200
139
  SHA256_BLOCK_SIZE = 64 * 1024
 
167
  JOB_LOG_TAIL_LINES = 30
168
  JOB_POLL_MAX_CONSECUTIVE_ERRORS = 5
169
 
170
+ # Sharded eval (UC3). A submission with more than SHARD_THRESHOLD
171
+ # fixtures fans out across several jobs of SHARD_CHUNK_SIZE fixtures
172
+ # each, dispatched all at once (HF queues any overflow past the
173
+ # account's ~8 concurrent slots; queueing is a speed variable, never a
174
+ # failure). Each shard uploads its per-fixture dirs under
175
+ # ``reports/<id>/shards/<shard_id>/``; the Space merges them into one
176
+ # run dir, recomputes the aggregate run_summary + report + gallery, and
177
+ # deletes the shards tree. Eval is CPU-bound (tessellation + Manifold
178
+ # booleans), so more machines is the throughput lever. At/under the
179
+ # threshold a submission stays a single job (the original path), so the
180
+ # extra dispatch/merge machinery only kicks in when it pays off.
181
+ SHARD_THRESHOLD = 12
182
+ SHARD_CHUNK_SIZE = 12
183
+ SHARDS_SUBDIR = "shards"
184
+ # ERROR-only retries per shard before the whole submission fails. A
185
+ # shard re-run is idempotent (it re-evals its own fixture slice and
186
+ # overwrites its upload prefix), so one cheap retry absorbs a transient
187
+ # job/runtime blip without re-running the shards that already passed.
188
+ SHARD_MAX_RETRIES = 1
189
+ # Whole-fan-out poll deadline. Each shard job carries its own
190
+ # ``EVAL_JOB_TIMEOUT``; this guards the Space-side poll loop. Generous
191
+ # vs. the per-shard ceiling because queued shards (past the ~8
192
+ # concurrent slots) wait their turn before their own timeout starts.
193
+ SHARD_POLL_DEADLINE_SECONDS = 45 * 60
194
+
195
  # One HfApi client per process. HF_TOKEN is picked up from the env at
196
  # construction time and reused for every call.
197
  _HF_API = HfApi()
 
302
  except _HubWriteError as e:
303
  raise gr.Error(f"Submission rejected: {e}")
304
 
305
+ _spawn_worker(submission_id, blob_url, sorted(fixture_names))
306
  gr.Info(
307
  f"Submission {submission_id} queued for evaluation "
308
  f"({len(fixture_names)} fixtures). The eval runs on an "
 
720
  # ---------------------------------------------------------------------------
721
 
722
 
723
+ def _spawn_worker(
724
+ submission_id: str,
725
+ submission_blob_url: str,
726
+ fixture_names: list[str],
727
+ ) -> None:
728
  """Start the dispatch+poll worker thread.
729
 
730
  Fire-and-forget; daemon=True so a Space restart doesn't block on
731
  in-flight workers (the boot-time sweep below flips any rows their
732
  workers didn't finish to failed). The worker no longer owns any
733
+ Space-local files; the Job(s) download the zip themselves from the
734
+ Hub. *fixture_names* (the validated, dataset-matched set) decides
735
+ single-job vs. sharded dispatch and drives the shard split.
736
  """
737
  t = threading.Thread(
738
  target=_run_worker,
739
+ args=(submission_id, submission_blob_url, fixture_names),
740
  name=f"cgb-worker-{submission_id}",
741
  daemon=True,
742
  )
743
  t.start()
744
 
745
 
746
+ def _run_worker(
747
+ submission_id: str,
748
+ submission_blob_url: str,
749
+ fixture_names: list[str],
750
+ ) -> None:
751
+ """Dispatch the eval Job(s), poll to completion, flip the row.
752
 
753
+ Submissions at/under :data:`SHARD_THRESHOLD` fixtures run as a
754
+ single job (the original path); larger ones fan out across shards
755
+ and merge. Any exception (dispatch, poll, fetch/merge, flip) maps to
756
+ a ``failed`` row with a short ``failure_reason`` (full traceback
757
+ goes to the Space's runtime logs).
758
  """
759
  try:
760
+ if len(fixture_names) > SHARD_THRESHOLD:
761
+ _run_worker_sharded(
762
+ submission_id, submission_blob_url, fixture_names,
763
+ )
764
+ return
765
  job_id = _dispatch_eval_job(submission_id, submission_blob_url)
766
  logger.info("Dispatched eval job %s for %s", job_id, submission_id)
767
  stage, status_message = _poll_until_done(job_id, submission_id)
 
790
  )
791
 
792
 
793
+ def _run_worker_sharded(
794
+ submission_id: str,
795
+ submission_blob_url: str,
796
+ fixture_names: list[str],
797
+ ) -> None:
798
+ """Fan a large submission across shard jobs, then merge + flip.
799
+
800
+ Dispatches every shard at once (HF queues overflow past the
801
+ account's concurrent-job cap), polls all to terminal retrying only
802
+ ERROR shards, then merges each shard's per-fixture dirs into one run
803
+ dir, recomputes the aggregate ``run_summary`` + report + gallery,
804
+ flips the row to ``completed``, and deletes the shards tree. If any
805
+ shard is still ERROR after its retries the row flips to ``failed``
806
+ and the partial shard artifacts are left for a maintainer to
807
+ inspect. Raised exceptions propagate to :func:`_run_worker`'s
808
+ handler, which maps them to a failed row.
809
+ """
810
+ chunks = _chunk_fixtures(fixture_names, SHARD_CHUNK_SIZE)
811
+ shards: dict[str, dict[str, Any]] = {
812
+ f"shard_{i:03d}": {
813
+ "fixtures": chunk,
814
+ "job_id": None,
815
+ "attempts": 0,
816
+ "stage": None,
817
+ "message": None,
818
+ }
819
+ for i, chunk in enumerate(chunks)
820
+ }
821
+ logger.info(
822
+ "Sharded eval for %s: %d fixtures -> %d shard(s)",
823
+ submission_id, len(fixture_names), len(shards),
824
+ )
825
+ for shard_id, st in shards.items():
826
+ _dispatch_shard(submission_id, submission_blob_url, shard_id, st)
827
+
828
+ failures = _poll_shards_until_done(
829
+ submission_id, submission_blob_url, shards,
830
+ )
831
+ if failures:
832
+ reason = ("sharded eval failed: " + "; ".join(failures))[
833
+ :FAILURE_REASON_MAX_CHARS
834
+ ]
835
+ _flip_row_to_failed(submission_id, reason)
836
+ logger.warning("Sharded eval for %s failed: %s", submission_id, reason)
837
+ return
838
+
839
+ summary = _merge_shards_and_publish(
840
+ submission_id, list(shards.keys()), fixture_names,
841
+ )
842
+ _flip_row_to_completed(submission_id, summary)
843
+ logger.info("Sharded worker completed for %s", submission_id)
844
+ _cleanup_shard_artifacts(submission_id)
845
+
846
+
847
+ def _chunk_fixtures(fixtures: list[str], chunk_size: int) -> list[list[str]]:
848
+ """Split *fixtures* into contiguous chunks of at most *chunk_size*."""
849
+ return [
850
+ fixtures[i:i + chunk_size]
851
+ for i in range(0, len(fixtures), chunk_size)
852
+ ]
853
+
854
+
855
  def _dispatch_eval_job(
856
  submission_id: str, submission_blob_url: str,
857
  ) -> str:
858
+ """Dispatch the whole-submission eval Job and return its id."""
859
+ return _dispatch_eval_command(submission_id, submission_blob_url, [])
860
+
861
+
862
+ def _dispatch_eval_command(
863
+ submission_id: str,
864
+ submission_blob_url: str,
865
+ extra_args: list[str],
866
+ ) -> str:
867
+ """Dispatch an eval Job (whole-submission or one shard) and return its id.
868
 
869
  Passes through every env var ``eval_job.py`` needs to resolve the
870
  Hub data + GT repos and the target submissions repo; the Job's
871
  HF_TOKEN secret comes from the Space's own HF_TOKEN env (which
872
  needs Jobs + repo R/W scopes, see space-setup/jobs-migration.md).
873
+ *extra_args* are appended to the entrypoint argv; empty for the
874
+ whole-submission path, ``--shard-id ... --fixtures ...`` for a shard.
875
  """
876
  token = os.environ.get("HF_TOKEN")
877
  if not token:
 
890
  image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
891
  command=[
892
  "python", "/opt/eval_job.py", submission_id, submission_blob_url,
893
+ *extra_args,
894
  ],
895
  flavor=EVAL_JOB_FLAVOR,
896
  namespace=EVAL_JOB_NAMESPACE,
 
902
  return job.id
903
 
904
 
905
+ def _dispatch_shard(
906
+ submission_id: str,
907
+ submission_blob_url: str,
908
+ shard_id: str,
909
+ state: dict[str, Any],
910
+ ) -> None:
911
+ """Dispatch (or re-dispatch) one shard job and record it in *state*.
912
+
913
+ Mutates *state* in place: sets ``job_id``, bumps ``attempts``, and
914
+ clears the prior ``stage``/``message`` so a retried shard is polled
915
+ fresh. The shard re-evals its own fixture slice and overwrites its
916
+ ``reports/<id>/shards/<shard_id>/`` prefix, so a retry is idempotent.
917
+ """
918
+ job_id = _dispatch_eval_command(
919
+ submission_id,
920
+ submission_blob_url,
921
+ ["--shard-id", shard_id, "--fixtures", ",".join(state["fixtures"])],
922
+ )
923
+ state["job_id"] = job_id
924
+ state["attempts"] += 1
925
+ state["stage"] = None
926
+ state["message"] = None
927
+ logger.info(
928
+ "Dispatched shard %s for %s (attempt %d, job %s, %d fixtures)",
929
+ shard_id, submission_id, state["attempts"], job_id,
930
+ len(state["fixtures"]),
931
+ )
932
+
933
+
934
+ def _poll_shards_until_done(
935
+ submission_id: str,
936
+ submission_blob_url: str,
937
+ shards: dict[str, dict[str, Any]],
938
+ ) -> list[str]:
939
+ """Poll every shard to terminal, retrying only ERROR shards.
940
+
941
+ Mirrors the orchestrator's eval poll loop: a single thread sweeps
942
+ all running shards each tick (``inspect_job`` calls are cheap), an
943
+ ERROR shard re-dispatches up to :data:`SHARD_MAX_RETRIES` times,
944
+ and a non-terminal stage just waits. Returns a list of
945
+ ``"<shard_id>: <reason>"`` strings for shards that stayed ERROR
946
+ after their retries (empty list means every shard COMPLETED).
947
+ Transient ``inspect_job`` failures retry up to
948
+ :data:`JOB_POLL_MAX_CONSECUTIVE_ERRORS` before raising.
949
+ """
950
+ deadline = time.monotonic() + SHARD_POLL_DEADLINE_SECONDS
951
+ consecutive_errors = 0
952
+ while True:
953
+ running = [
954
+ sid for sid, st in shards.items()
955
+ if st["stage"] not in ("COMPLETED", "FAILED")
956
+ ]
957
+ if not running:
958
+ break
959
+ for shard_id in running:
960
+ st = shards[shard_id]
961
+ try:
962
+ info = inspect_job(job_id=st["job_id"])
963
+ consecutive_errors = 0
964
+ except Exception as e: # noqa: BLE001 - retry transient API errors
965
+ consecutive_errors += 1
966
+ logger.warning(
967
+ "inspect_job(%s) for shard %s failed (%d/%d): %s",
968
+ st["job_id"], shard_id, consecutive_errors,
969
+ JOB_POLL_MAX_CONSECUTIVE_ERRORS, e,
970
+ )
971
+ if consecutive_errors >= JOB_POLL_MAX_CONSECUTIVE_ERRORS:
972
+ raise
973
+ break # stop this sweep; sleep then retry
974
+
975
+ stage = info.status.stage
976
+ if stage == "COMPLETED":
977
+ st["stage"] = "COMPLETED"
978
+ logger.info("Shard %s COMPLETED for %s", shard_id, submission_id)
979
+ elif stage == "ERROR":
980
+ if st["attempts"] <= SHARD_MAX_RETRIES:
981
+ logger.warning(
982
+ "Shard %s ERROR; retry %d/%d",
983
+ shard_id, st["attempts"], SHARD_MAX_RETRIES,
984
+ )
985
+ _dispatch_shard(
986
+ submission_id, submission_blob_url, shard_id, st,
987
+ )
988
+ else:
989
+ st["stage"] = "FAILED"
990
+ st["message"] = _job_failure_reason(
991
+ st["job_id"], stage, info.status.message,
992
+ )
993
+ logger.warning(
994
+ "Shard %s FAILED after %d attempt(s): %s",
995
+ shard_id, st["attempts"], st["message"],
996
+ )
997
+
998
+ if time.monotonic() >= deadline:
999
+ for shard_id, st in shards.items():
1000
+ if st["stage"] not in ("COMPLETED", "FAILED"):
1001
+ st["stage"] = "FAILED"
1002
+ st["message"] = (
1003
+ f"Space-side poll deadline exceeded "
1004
+ f"({SHARD_POLL_DEADLINE_SECONDS}s)"
1005
+ )
1006
+ break
1007
+ time.sleep(JOB_POLL_INTERVAL_SECONDS)
1008
+
1009
+ return [
1010
+ f"{sid}: {st['message']}"
1011
+ for sid, st in shards.items()
1012
+ if st["stage"] == "FAILED"
1013
+ ]
1014
+
1015
+
1016
  def _poll_until_done(
1017
  job_id: str, submission_id: str,
1018
  ) -> tuple[str, str | None]:
 
1102
  return summary
1103
 
1104
 
1105
+ def _merge_shards_and_publish(
1106
+ submission_id: str,
1107
+ shard_ids: list[str],
1108
+ fixture_names: list[str],
1109
+ ) -> dict[str, Any]:
1110
+ """Merge every shard's per-fixture dirs into one run + publish results.
1111
+
1112
+ Downloads ``reports/<id>/shards/**`` from the submissions dataset,
1113
+ copies each shard's ``<fixture>/`` dir (``result.json`` + renders)
1114
+ into a single merged run dir, then recomputes the aggregate exactly
1115
+ as a single-job run would: ``write_run_summary`` over the union
1116
+ (the proven merge primitive, importable from the Space's own
1117
+ ``cadgenbench`` install -- no private-repo dependency), a
1118
+ ``report.json`` bundle, an HTML report via the same ``report
1119
+ single`` renderer the job uses, and one ``iso`` gallery thumbnail
1120
+ per fixture. Uploads ``reports/<id>.{html,json}`` + the gallery
1121
+ renders, and returns the merged ``run_summary`` for the row flip.
1122
+
1123
+ Raises if a shard's tree is missing, a fixture appears in two shards,
1124
+ or the merged set doesn't cover every expected fixture -- any of
1125
+ which means the fan-out lost or duplicated work and the row should
1126
+ fail loudly rather than publish a partial aggregate.
1127
+ """
1128
+ # Imported from the Space's own cadgenbench install (the same
1129
+ # package submit.py imports at module load); these are public eval
1130
+ # APIs, not the private orchestrator repo.
1131
+ from cadgenbench.eval.report.single_run import discover_run, generate_html
1132
+ from cadgenbench.eval.run_summary import write_run_summary
1133
+
1134
+ tmp = Path(tempfile.mkdtemp(prefix=f"cgb-merge-{submission_id}-"))
1135
+ try:
1136
+ download_root = Path(
1137
+ snapshot_download(
1138
+ repo_id=HF_SUBMISSIONS_REPO,
1139
+ repo_type="dataset",
1140
+ allow_patterns=[
1141
+ f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}/**"
1142
+ ],
1143
+ local_dir=str(tmp / "dl"),
1144
+ )
1145
+ )
1146
+ shards_root = (
1147
+ download_root / REPORTS_DIR / submission_id / SHARDS_SUBDIR
1148
+ )
1149
+ if not shards_root.is_dir():
1150
+ raise RuntimeError(
1151
+ f"No shard artifacts found under {shards_root} after download."
1152
+ )
1153
+
1154
+ merged_run = tmp / "run"
1155
+ merged_run.mkdir()
1156
+ seen: set[str] = set()
1157
+ for shard_dir in sorted(p for p in shards_root.iterdir() if p.is_dir()):
1158
+ for fixture_dir in sorted(
1159
+ p for p in shard_dir.iterdir() if p.is_dir()
1160
+ ):
1161
+ # Only real fixture dirs carry result.json; skip anything
1162
+ # else the shard upload swept in (e.g. a stray run_summary
1163
+ # subdir would not, but be defensive).
1164
+ if not (fixture_dir / "result.json").is_file():
1165
+ continue
1166
+ name = fixture_dir.name
1167
+ if name in seen:
1168
+ raise RuntimeError(
1169
+ f"Fixture {name!r} present in more than one shard."
1170
+ )
1171
+ seen.add(name)
1172
+ shutil.copytree(fixture_dir, merged_run / name)
1173
+
1174
+ missing = set(fixture_names) - seen
1175
+ if missing:
1176
+ raise RuntimeError(
1177
+ f"Merged run missing {len(missing)} fixture(s) after shard "
1178
+ f"merge: {', '.join(sorted(missing)[:5])}"
1179
+ + ("..." if len(missing) > 5 else "")
1180
+ )
1181
+
1182
+ write_run_summary(merged_run)
1183
+ report_json = _build_report_json(merged_run)
1184
+
1185
+ run_data = discover_run(merged_run)
1186
+ html = generate_html(run_data)
1187
+ html_path = tmp / f"{submission_id}.html"
1188
+ html_path.write_text(html, encoding="utf-8")
1189
+
1190
+ _upload_reports(submission_id, html_path, report_json)
1191
+ _upload_gallery_renders_from_dir(submission_id, merged_run)
1192
+ return report_json["run_summary"]
1193
+ finally:
1194
+ shutil.rmtree(tmp, ignore_errors=True)
1195
+
1196
+
1197
+ def _build_report_json(run_dir: Path) -> dict[str, Any]:
1198
+ """Bundle ``run_summary.json`` + every per-fixture ``result.json``.
1199
+
1200
+ Identical shape to ``eval_job.py``'s ``_build_report_json`` so the
1201
+ merged report matches a single-job report: the row flip reads
1202
+ ``run_summary`` out of this and the bundle is what gets uploaded as
1203
+ ``reports/<id>.json``.
1204
+ """
1205
+ summary_path = run_dir / "run_summary.json"
1206
+ if not summary_path.is_file():
1207
+ raise RuntimeError(
1208
+ f"run_summary.json not produced under {run_dir} (merge issue?)"
1209
+ )
1210
+ summary = json.loads(summary_path.read_text(encoding="utf-8"))
1211
+ per_fixture: dict[str, dict[str, Any]] = {}
1212
+ for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
1213
+ rp = fixture_dir / "result.json"
1214
+ if rp.is_file():
1215
+ per_fixture[fixture_dir.name] = json.loads(
1216
+ rp.read_text(encoding="utf-8")
1217
+ )
1218
+ return {"run_summary": summary, "per_fixture_results": per_fixture}
1219
+
1220
+
1221
+ def _upload_reports(
1222
+ submission_id: str,
1223
+ html_path: Path,
1224
+ report_json: dict[str, Any],
1225
+ ) -> None:
1226
+ """Upload ``reports/<id>.html`` + ``reports/<id>.json`` to the Hub.
1227
+
1228
+ Mirrors ``eval_job.py``'s ``_upload_reports`` so the merged-shard
1229
+ artifacts land at the exact paths the leaderboard + the row-flip
1230
+ expect. Uses the process ``HfApi`` (Space HF_TOKEN env).
1231
+ """
1232
+ _HF_API.upload_file(
1233
+ path_or_fileobj=str(html_path),
1234
+ path_in_repo=f"{REPORTS_DIR}/{submission_id}.html",
1235
+ repo_id=HF_SUBMISSIONS_REPO,
1236
+ repo_type="dataset",
1237
+ commit_message=f"add merged HTML report for {submission_id}",
1238
+ )
1239
+ _HF_API.upload_file(
1240
+ path_or_fileobj=json.dumps(
1241
+ report_json, ensure_ascii=False, indent=2,
1242
+ ).encode("utf-8"),
1243
+ path_in_repo=f"{REPORTS_DIR}/{submission_id}.json",
1244
+ repo_id=HF_SUBMISSIONS_REPO,
1245
+ repo_type="dataset",
1246
+ commit_message=f"add merged JSON report for {submission_id}",
1247
+ )
1248
+ logger.info("Uploaded merged reports/%s.{html,json}", submission_id)
1249
+
1250
+
1251
+ def _upload_gallery_renders_from_dir(
1252
+ submission_id: str,
1253
+ run_dir: Path,
1254
+ ) -> None:
1255
+ """Stage one ``iso`` thumbnail per fixture for the leaderboard gallery.
1256
+
1257
+ Mirrors ``eval_job.py``'s ``_upload_gallery_renders`` but reads from
1258
+ the merged run dir: every ``<run_dir>/<fixture>/renders/iso.png``
1259
+ becomes ``renders/<id>/<fixture>.png``. A fixture with no ``iso.png``
1260
+ (missing output / render that never ran) is skipped, matching the
1261
+ single-job behaviour; the gallery draws the dashed "invalid" cell
1262
+ from the row, so an absent thumbnail is not an error.
1263
+ """
1264
+ staged: list[tuple[Path, str]] = []
1265
+ for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
1266
+ iso_png = fixture_dir / "renders" / f"{GALLERY_THUMB_VIEW}.png"
1267
+ if iso_png.is_file():
1268
+ staged.append((iso_png, fixture_dir.name))
1269
+ if not staged:
1270
+ logger.info("No gallery renders to upload for %s", submission_id)
1271
+ return
1272
+ for iso_png, fixture_name in staged:
1273
+ _HF_API.upload_file(
1274
+ path_or_fileobj=str(iso_png),
1275
+ path_in_repo=(
1276
+ f"{RENDERS_DIR}/{submission_id}/{fixture_name}.png"
1277
+ ),
1278
+ repo_id=HF_SUBMISSIONS_REPO,
1279
+ repo_type="dataset",
1280
+ commit_message=(
1281
+ f"add gallery render {fixture_name} for {submission_id}"
1282
+ ),
1283
+ )
1284
+ logger.info(
1285
+ "Uploaded %d gallery render(s) under %s/%s/",
1286
+ len(staged), RENDERS_DIR, submission_id,
1287
+ )
1288
+
1289
+
1290
+ def _cleanup_shard_artifacts(submission_id: str) -> None:
1291
+ """Delete ``reports/<id>/shards/`` after a successful merge.
1292
+
1293
+ Best-effort: the merged ``reports/<id>.{html,json}`` + gallery are
1294
+ the durable artifacts, so a failed cleanup only leaves recoverable
1295
+ scratch behind and must never fail an otherwise-completed
1296
+ submission.
1297
+ """
1298
+ try:
1299
+ _HF_API.delete_folder(
1300
+ path_in_repo=f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}",
1301
+ repo_id=HF_SUBMISSIONS_REPO,
1302
+ repo_type="dataset",
1303
+ commit_message=f"clean up eval shards for {submission_id}",
1304
+ )
1305
+ logger.info("Cleaned up shard artifacts for %s", submission_id)
1306
+ except Exception as e: # noqa: BLE001 - cleanup is best-effort
1307
+ logger.warning(
1308
+ "Shard-artifact cleanup failed for %s (%s: %s); leaving scratch",
1309
+ submission_id, type(e).__name__, e,
1310
+ )
1311
+
1312
+
1313
  def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None:
1314
  """Merge ``run_summary.json`` fields into the pending row."""
1315
  updates: dict[str, Any] = {