Michael Rabinovich committed on
Commit
6ffd043
·
1 Parent(s): 1004adf

stage shard artifacts through HF bucket

Browse files
Files changed (3) hide show
  1. requirements.txt +4 -4
  2. submit.py +100 -30
  3. tests/test_submit.py +90 -0
requirements.txt CHANGED
@@ -12,9 +12,9 @@
12
  gradio[oauth]==5.50.0
13
  gradio-leaderboard==0.0.14
14
  pandas>=2.0
15
- # huggingface_hub >=1.1 for the Jobs Python API (run_job, inspect_job,
16
- # fetch_job_logs). Used by submit.py to dispatch + poll per-submission
17
- # GPU evals on HF Jobs (Step 10, space-setup/jobs-migration.md).
18
- huggingface_hub>=1.1.0
19
  datasets>=3.0
20
  requests>=2.31
 
12
  gradio[oauth]==5.50.0
13
  gradio-leaderboard==0.0.14
14
  pandas>=2.0
15
+ # huggingface_hub >=1.8 for the Jobs Python API plus bucket volume
16
+ # mounts. Used by submit.py to dispatch + poll per-submission GPU evals
17
+ # and stage sharded artifacts through HF Buckets.
18
+ huggingface_hub>=1.8.0
19
  datasets>=3.0
20
  requests>=2.31
submit.py CHANGED
@@ -115,6 +115,11 @@ from huggingface_hub import (
115
  )
116
  from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError
117
 
 
 
 
 
 
118
  import progress
119
  from leaderboard import HF_DATA_REPO, HF_ORG, HF_SUBMISSIONS_REPO
120
 
@@ -199,16 +204,25 @@ JOB_POLL_MAX_CONSECUTIVE_ERRORS = 5
199
  # fixtures fans out across several jobs of SHARD_CHUNK_SIZE fixtures
200
  # each, dispatched all at once (HF queues any overflow past the
201
  # account's ~8 concurrent slots; queueing is a speed variable, never a
202
- # failure). Each shard uploads its per-fixture dirs under
203
- # ``reports/<id>/shards/<shard_id>/``; the Space merges them into one
204
- # run dir, recomputes the aggregate run_summary + report + gallery, and
205
- # deletes the shards tree. Eval is CPU-bound (tessellation + Manifold
206
- # booleans), so more machines is the throughput lever. At/under the
207
- # threshold a submission stays a single job (the original path), so the
208
- # extra dispatch/merge machinery only kicks in when it pays off.
 
 
209
  SHARD_THRESHOLD = 12
210
  SHARD_CHUNK_SIZE = 12
211
  SHARDS_SUBDIR = "shards"
 
 
 
 
 
 
 
212
  # ERROR-only retries per shard before the whole submission fails. A
213
  # shard re-run is idempotent (it re-evals its own fixture slice and
214
  # overwrites its upload prefix), so one cheap retry absorbs a transient
@@ -255,6 +269,35 @@ def _retry_after_seconds(error: HfHubHTTPError) -> float | None:
255
  return None
256
 
257
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
258
  def _with_hub_retries(fn, *, what: str):
259
  """Run *fn* (a Hub commit) retrying transient HTTP errors with backoff.
260
 
@@ -1138,6 +1181,26 @@ def _dispatch_eval_command(
1138
  value = os.environ.get(key)
1139
  if value:
1140
  env[key] = value
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1141
  job = run_job(
1142
  image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
1143
  command=[
@@ -1150,6 +1213,7 @@ def _dispatch_eval_command(
1150
  secrets={"HF_TOKEN": token},
1151
  timeout=EVAL_JOB_TIMEOUT,
1152
  token=token,
 
1153
  )
1154
  return job.id
1155
 
@@ -1165,7 +1229,7 @@ def _dispatch_shard(
1165
  Mutates *state* in place: sets ``job_id``, bumps ``attempts``, and
1166
  clears the prior ``stage``/``message`` so a retried shard is polled
1167
  fresh. The shard re-evals its own fixture slice and overwrites its
1168
- ``reports/<id>/shards/<shard_id>/`` prefix, so a retry is idempotent.
1169
  """
1170
  job_id = _dispatch_eval_command(
1171
  submission_id,
@@ -1408,22 +1472,25 @@ def _merge_shards_and_publish(
1408
 
1409
  tmp = Path(tempfile.mkdtemp(prefix=f"cgb-merge-{submission_id}-"))
1410
  try:
1411
- download_root = Path(
1412
- snapshot_download(
1413
- repo_id=HF_SUBMISSIONS_REPO,
1414
- repo_type="dataset",
1415
- allow_patterns=[
1416
- f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}/**"
1417
- ],
1418
- local_dir=str(tmp / "dl"),
 
 
 
 
 
 
 
1419
  )
1420
- )
1421
- shards_root = (
1422
- download_root / REPORTS_DIR / submission_id / SHARDS_SUBDIR
1423
- )
1424
  if not shards_root.is_dir():
1425
  raise RuntimeError(
1426
- f"No shard artifacts found under {shards_root} after download."
1427
  )
1428
 
1429
  merged_run = tmp / "run"
@@ -1570,15 +1637,18 @@ def _cleanup_shard_artifacts(submission_id: str) -> None:
1570
  submission.
1571
  """
1572
  try:
1573
- _with_hub_retries(
1574
- lambda: _HF_API.delete_folder(
1575
- path_in_repo=f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}",
1576
- repo_id=HF_SUBMISSIONS_REPO,
1577
- repo_type="dataset",
1578
- commit_message=f"clean up eval shards for {submission_id}",
1579
- ),
1580
- what="shard cleanup",
1581
- )
 
 
 
1582
  logger.info("Cleaned up shard artifacts for %s", submission_id)
1583
  except Exception as e: # noqa: BLE001 - cleanup is best-effort
1584
  logger.warning(
 
115
  )
116
  from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError
117
 
118
+ try:
119
+ from huggingface_hub import Volume
120
+ except ImportError: # pragma: no cover - exercised only on old deploy images
121
+ Volume = None # type: ignore[assignment]
122
+
123
  import progress
124
  from leaderboard import HF_DATA_REPO, HF_ORG, HF_SUBMISSIONS_REPO
125
 
 
204
  # fixtures fans out across several jobs of SHARD_CHUNK_SIZE fixtures
205
  # each, dispatched all at once (HF queues any overflow past the
206
  # account's ~8 concurrent slots; queueing is a speed variable, never a
207
+ # failure). Each shard stages its per-fixture dirs into a mounted bucket
208
+ # when CADGENBENCH_SHARD_BUCKET is set, or under
209
+ # ``reports/<id>/shards/<shard_id>/`` in the submissions dataset
210
+ # otherwise; the Space merges them into one run dir, recomputes the
211
+ # aggregate run_summary + report + gallery, and deletes the shards tree.
212
+ # Eval is CPU-bound (tessellation + Manifold booleans), so more machines
213
+ # is the throughput lever. At/under the threshold a submission stays a
214
+ # single job (the original path), so the extra dispatch/merge machinery
215
+ # only kicks in when it pays off.
216
  SHARD_THRESHOLD = 12
217
  SHARD_CHUNK_SIZE = 12
218
  SHARDS_SUBDIR = "shards"
219
+ SHARD_BUCKET = os.getenv("CADGENBENCH_SHARD_BUCKET", "").strip()
220
+ SHARD_BUCKET_MOUNT = os.getenv(
221
+ "CADGENBENCH_SHARD_BUCKET_MOUNT", "/mnt/cadgenbench-shards",
222
+ ).strip()
223
+ SHARD_BUCKET_PREFIX = os.getenv(
224
+ "CADGENBENCH_SHARD_BUCKET_PREFIX", SUBMISSIONS_DIR,
225
+ ).strip("/")
226
  # ERROR-only retries per shard before the whole submission fails. A
227
  # shard re-run is idempotent (it re-evals its own fixture slice and
228
  # overwrites its upload prefix), so one cheap retry absorbs a transient
 
269
  return None
270
 
271
 
272
+ def _shard_bucket_enabled() -> bool:
273
+ """Whether shard scratch should be staged through a mounted bucket."""
274
+ return bool(SHARD_BUCKET)
275
+
276
+
277
+ def _shard_bucket_source() -> str:
278
+ """Return the bucket id accepted by ``huggingface_hub.Volume``."""
279
+ source = SHARD_BUCKET
280
+ if source.startswith("hf://buckets/"):
281
+ source = source[len("hf://buckets/"):]
282
+ return source.rstrip("/")
283
+
284
+
285
+ def _shard_bucket_relative_root(submission_id: str) -> Path:
286
+ """Relative bucket path containing one directory per shard."""
287
+ parts = [p for p in SHARD_BUCKET_PREFIX.split("/") if p]
288
+ return Path(*parts, submission_id, SHARDS_SUBDIR)
289
+
290
+
291
+ def _shard_bucket_root(submission_id: str) -> Path:
292
+ """Mounted bucket path containing staged shard artifacts."""
293
+ if not SHARD_BUCKET_MOUNT:
294
+ raise RuntimeError(
295
+ "CADGENBENCH_SHARD_BUCKET is set but "
296
+ "CADGENBENCH_SHARD_BUCKET_MOUNT is empty."
297
+ )
298
+ return Path(SHARD_BUCKET_MOUNT) / _shard_bucket_relative_root(submission_id)
299
+
300
+
301
  def _with_hub_retries(fn, *, what: str):
302
  """Run *fn* (a Hub commit) retrying transient HTTP errors with backoff.
303
 
 
1181
  value = os.environ.get(key)
1182
  if value:
1183
  env[key] = value
1184
+ run_kwargs: dict[str, Any] = {}
1185
+ if _shard_bucket_enabled() and "--shard-id" in extra_args:
1186
+ if Volume is None:
1187
+ raise RuntimeError(
1188
+ "CADGENBENCH_SHARD_BUCKET requires huggingface_hub>=1.8.0 "
1189
+ "for HF Jobs volume mounts."
1190
+ )
1191
+ env.update(
1192
+ {
1193
+ "CADGENBENCH_SHARD_BUCKET_MOUNT": SHARD_BUCKET_MOUNT,
1194
+ "CADGENBENCH_SHARD_BUCKET_PREFIX": SHARD_BUCKET_PREFIX,
1195
+ }
1196
+ )
1197
+ run_kwargs["volumes"] = [
1198
+ Volume(
1199
+ type="bucket",
1200
+ source=_shard_bucket_source(),
1201
+ mount_path=SHARD_BUCKET_MOUNT,
1202
+ )
1203
+ ]
1204
  job = run_job(
1205
  image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
1206
  command=[
 
1213
  secrets={"HF_TOKEN": token},
1214
  timeout=EVAL_JOB_TIMEOUT,
1215
  token=token,
1216
+ **run_kwargs,
1217
  )
1218
  return job.id
1219
 
 
1229
  Mutates *state* in place: sets ``job_id``, bumps ``attempts``, and
1230
  clears the prior ``stage``/``message`` so a retried shard is polled
1231
  fresh. The shard re-evals its own fixture slice and overwrites its
1232
+ configured shard-staging prefix, so a retry is idempotent.
1233
  """
1234
  job_id = _dispatch_eval_command(
1235
  submission_id,
 
1472
 
1473
  tmp = Path(tempfile.mkdtemp(prefix=f"cgb-merge-{submission_id}-"))
1474
  try:
1475
+ if _shard_bucket_enabled():
1476
+ shards_root = _shard_bucket_root(submission_id)
1477
+ else:
1478
+ download_root = Path(
1479
+ snapshot_download(
1480
+ repo_id=HF_SUBMISSIONS_REPO,
1481
+ repo_type="dataset",
1482
+ allow_patterns=[
1483
+ f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}/**"
1484
+ ],
1485
+ local_dir=str(tmp / "dl"),
1486
+ )
1487
+ )
1488
+ shards_root = (
1489
+ download_root / REPORTS_DIR / submission_id / SHARDS_SUBDIR
1490
  )
 
 
 
 
1491
  if not shards_root.is_dir():
1492
  raise RuntimeError(
1493
+ f"No shard artifacts found under {shards_root}."
1494
  )
1495
 
1496
  merged_run = tmp / "run"
 
1637
  submission.
1638
  """
1639
  try:
1640
+ if _shard_bucket_enabled():
1641
+ shutil.rmtree(_shard_bucket_root(submission_id), ignore_errors=True)
1642
+ else:
1643
+ _with_hub_retries(
1644
+ lambda: _HF_API.delete_folder(
1645
+ path_in_repo=f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}",
1646
+ repo_id=HF_SUBMISSIONS_REPO,
1647
+ repo_type="dataset",
1648
+ commit_message=f"clean up eval shards for {submission_id}",
1649
+ ),
1650
+ what="shard cleanup",
1651
+ )
1652
  logger.info("Cleaned up shard artifacts for %s", submission_id)
1653
  except Exception as e: # noqa: BLE001 - cleanup is best-effort
1654
  logger.warning(
tests/test_submit.py CHANGED
@@ -11,6 +11,7 @@ network traffic.
11
  """
12
  from __future__ import annotations
13
 
 
14
  from pathlib import Path
15
  from types import SimpleNamespace
16
 
@@ -104,6 +105,95 @@ def test_retry_after_header_is_honored(monkeypatch):
104
  assert slept and slept[0] >= 7.0
105
 
106
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  def _stub_meta() -> dict:
108
  """Minimum meta.json shape that survives ``_load_and_validate_meta``."""
109
  return {
 
11
  """
12
  from __future__ import annotations
13
 
14
+ import importlib.util
15
  from pathlib import Path
16
  from types import SimpleNamespace
17
 
 
105
  assert slept and slept[0] >= 7.0
106
 
107
 
108
+ def test_dispatch_shard_mounts_configured_bucket(monkeypatch):
109
+ """Bucket-configured shard jobs get a read/write bucket volume."""
110
+ captured: dict = {}
111
+
112
+ class FakeVolume:
113
+ def __init__(self, **kwargs):
114
+ self.kwargs = kwargs
115
+
116
+ def fake_run_job(**kwargs):
117
+ captured.update(kwargs)
118
+ return SimpleNamespace(id="job-123")
119
+
120
+ monkeypatch.setenv("HF_TOKEN", "hf_test")
121
+ monkeypatch.setattr(
122
+ submit, "SHARD_BUCKET",
123
+ "hf://buckets/HuggingAI4Engineering/cadgenbench-eval-staging",
124
+ )
125
+ monkeypatch.setattr(submit, "SHARD_BUCKET_MOUNT", "/mnt/cgb-shards")
126
+ monkeypatch.setattr(submit, "SHARD_BUCKET_PREFIX", "submissions")
127
+ monkeypatch.setattr(submit, "Volume", FakeVolume)
128
+ monkeypatch.setattr(submit, "run_job", fake_run_job)
129
+
130
+ job_id = submit._dispatch_eval_command(
131
+ "sub-1", "https://example.test/sub-1.zip",
132
+ ["--shard-id", "shard_000", "--fixtures", "101,102"],
133
+ )
134
+
135
+ assert job_id == "job-123"
136
+ assert captured["env"]["CADGENBENCH_SHARD_BUCKET_MOUNT"] == "/mnt/cgb-shards"
137
+ assert captured["env"]["CADGENBENCH_SHARD_BUCKET_PREFIX"] == "submissions"
138
+ volume = captured["volumes"][0]
139
+ assert volume.kwargs == {
140
+ "type": "bucket",
141
+ "source": "HuggingAI4Engineering/cadgenbench-eval-staging",
142
+ "mount_path": "/mnt/cgb-shards",
143
+ }
144
+
145
+
146
+ def test_dispatch_whole_submission_does_not_mount_bucket(monkeypatch):
147
+ """Configured bucket staging is only for sharded eval jobs."""
148
+ captured: dict = {}
149
+
150
+ def fake_run_job(**kwargs):
151
+ captured.update(kwargs)
152
+ return SimpleNamespace(id="job-456")
153
+
154
+ monkeypatch.setenv("HF_TOKEN", "hf_test")
155
+ monkeypatch.setattr(submit, "SHARD_BUCKET", "org/bucket")
156
+ monkeypatch.setattr(submit, "run_job", fake_run_job)
157
+
158
+ job_id = submit._dispatch_eval_command(
159
+ "sub-1", "https://example.test/sub-1.zip", [],
160
+ )
161
+
162
+ assert job_id == "job-456"
163
+ assert "volumes" not in captured
164
+ assert "CADGENBENCH_SHARD_BUCKET_MOUNT" not in captured["env"]
165
+
166
+
167
+ def test_eval_job_stages_shard_to_mounted_bucket(tmp_path: Path, monkeypatch):
168
+ """In bucket mode the eval job copies shard outputs to the mount."""
169
+ eval_job_path = (
170
+ Path(__file__).resolve().parents[2]
171
+ / "cadgenbench-eval-gpu"
172
+ / "eval_job.py"
173
+ )
174
+ spec = importlib.util.spec_from_file_location("eval_job_for_test", eval_job_path)
175
+ assert spec and spec.loader
176
+ eval_job = importlib.util.module_from_spec(spec)
177
+ spec.loader.exec_module(eval_job)
178
+
179
+ run_dir = tmp_path / "run"
180
+ fixture_dir = run_dir / "101"
181
+ fixture_dir.mkdir(parents=True)
182
+ (fixture_dir / "result.json").write_text("{}", encoding="utf-8")
183
+ bucket_mount = tmp_path / "bucket"
184
+ bucket_mount.mkdir()
185
+
186
+ monkeypatch.setenv(eval_job.SHARD_BUCKET_MOUNT_ENV, str(bucket_mount))
187
+ monkeypatch.setenv(eval_job.SHARD_BUCKET_PREFIX_ENV, "submissions")
188
+
189
+ eval_job._upload_shard_artifacts(
190
+ "sub-1", "shard_000", run_dir, "ignored/submissions", "ignored-token",
191
+ )
192
+
193
+ staged = bucket_mount / "submissions" / "sub-1" / "shards" / "shard_000"
194
+ assert (staged / "101" / "result.json").read_text(encoding="utf-8") == "{}"
195
+
196
+
197
  def _stub_meta() -> dict:
198
  """Minimum meta.json shape that survives ``_load_and_validate_meta``."""
199
  return {