Michael Rabinovich committed on
Commit
920b1b4
·
1 Parent(s): dd284f3

stage shards via bucket API instead of volume mount

Browse files

Avoids Space volume-mount issues on an existing Space: shard jobs sync
artifacts to the HF bucket via the bucket API, and the Space syncs them
back down to merge. Gated on CADGENBENCH_SHARD_BUCKET.

Files changed (3) hide show
  1. requirements.txt +5 -4
  2. submit.py +52 -48
  3. tests/test_submit.py +42 -24
requirements.txt CHANGED
@@ -12,9 +12,10 @@
12
  gradio[oauth]==5.50.0
13
  gradio-leaderboard==0.0.14
14
  pandas>=2.0
15
- # huggingface_hub >=1.8 for the Jobs Python API plus bucket volume
16
- # mounts. Used by submit.py to dispatch + poll per-submission GPU evals
17
- # and stage sharded artifacts through HF Buckets.
18
- huggingface_hub>=1.8.0
 
19
  datasets>=3.0
20
  requests>=2.31
 
12
  gradio[oauth]==5.50.0
13
  gradio-leaderboard==0.0.14
14
  pandas>=2.0
15
+ # huggingface_hub >=1.16 for the Jobs Python API plus the bucket API
16
+ # (sync_bucket / list_bucket_tree / batch_bucket_files). Used by submit.py
17
+ # to dispatch + poll per-submission GPU evals and to stage/merge sharded
18
+ # artifacts through an HF Storage Bucket (no volume mounts).
19
+ huggingface_hub>=1.16.0
20
  datasets>=3.0
21
  requests>=2.31
submit.py CHANGED
@@ -115,11 +115,6 @@ from huggingface_hub import (
115
  )
116
  from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError
117
 
118
- try:
119
- from huggingface_hub import Volume
120
- except ImportError: # pragma: no cover - exercised only on old deploy images
121
- Volume = None # type: ignore[assignment]
122
-
123
  import progress
124
  from leaderboard import HF_DATA_REPO, HF_ORG, HF_SUBMISSIONS_REPO
125
 
@@ -204,22 +199,23 @@ JOB_POLL_MAX_CONSECUTIVE_ERRORS = 5
204
  # fixtures fans out across several jobs of SHARD_CHUNK_SIZE fixtures
205
  # each, dispatched all at once (HF queues any overflow past the
206
  # account's ~8 concurrent slots; queueing is a speed variable, never a
207
- # failure). Each shard stages its per-fixture dirs into a mounted bucket
208
- # when CADGENBENCH_SHARD_BUCKET is set, or under
209
- # ``reports/<id>/shards/<shard_id>/`` in the submissions dataset
210
- # otherwise; the Space merges them into one run dir, recomputes the
211
- # aggregate run_summary + report + gallery, and deletes the shards tree.
212
- # Eval is CPU-bound (tessellation + Manifold booleans), so more machines
213
- # is the throughput lever. At/under the threshold a submission stays a
214
- # single job (the original path), so the extra dispatch/merge machinery
215
- # only kicks in when it pays off.
 
 
216
  SHARD_THRESHOLD = 12
217
  SHARD_CHUNK_SIZE = 12
218
  SHARDS_SUBDIR = "shards"
 
 
219
  SHARD_BUCKET = os.getenv("CADGENBENCH_SHARD_BUCKET", "").strip()
220
- SHARD_BUCKET_MOUNT = os.getenv(
221
- "CADGENBENCH_SHARD_BUCKET_MOUNT", "/mnt/cadgenbench-shards",
222
- ).strip()
223
  SHARD_BUCKET_PREFIX = os.getenv(
224
  "CADGENBENCH_SHARD_BUCKET_PREFIX", SUBMISSIONS_DIR,
225
  ).strip("/")
@@ -270,32 +266,30 @@ def _retry_after_seconds(error: HfHubHTTPError) -> float | None:
270
 
271
 
272
  def _shard_bucket_enabled() -> bool:
273
- """Whether shard scratch should be staged through a mounted bucket."""
274
  return bool(SHARD_BUCKET)
275
 
276
 
277
- def _shard_bucket_source() -> str:
278
- """Return the bucket id accepted by ``huggingface_hub.Volume``."""
279
  source = SHARD_BUCKET
280
  if source.startswith("hf://buckets/"):
281
  source = source[len("hf://buckets/"):]
282
  return source.rstrip("/")
283
 
284
 
285
- def _shard_bucket_relative_root(submission_id: str) -> Path:
286
- """Relative bucket path containing one directory per shard."""
287
  parts = [p for p in SHARD_BUCKET_PREFIX.split("/") if p]
288
- return Path(*parts, submission_id, SHARDS_SUBDIR)
289
 
290
 
291
- def _shard_bucket_root(submission_id: str) -> Path:
292
- """Mounted bucket path containing staged shard artifacts."""
293
- if not SHARD_BUCKET_MOUNT:
294
- raise RuntimeError(
295
- "CADGENBENCH_SHARD_BUCKET is set but "
296
- "CADGENBENCH_SHARD_BUCKET_MOUNT is empty."
297
- )
298
- return Path(SHARD_BUCKET_MOUNT) / _shard_bucket_relative_root(submission_id)
299
 
300
 
301
  def _jobs_token() -> str | None:
@@ -1186,26 +1180,15 @@ def _dispatch_eval_command(
1186
  value = os.environ.get(key)
1187
  if value:
1188
  env[key] = value
1189
- run_kwargs: dict[str, Any] = {}
1190
  if _shard_bucket_enabled() and "--shard-id" in extra_args:
1191
- if Volume is None:
1192
- raise RuntimeError(
1193
- "CADGENBENCH_SHARD_BUCKET requires huggingface_hub>=1.8.0 "
1194
- "for HF Jobs volume mounts."
1195
- )
1196
  env.update(
1197
  {
1198
- "CADGENBENCH_SHARD_BUCKET_MOUNT": SHARD_BUCKET_MOUNT,
1199
  "CADGENBENCH_SHARD_BUCKET_PREFIX": SHARD_BUCKET_PREFIX,
1200
  }
1201
  )
1202
- run_kwargs["volumes"] = [
1203
- Volume(
1204
- type="bucket",
1205
- source=_shard_bucket_source(),
1206
- mount_path=SHARD_BUCKET_MOUNT,
1207
- )
1208
- ]
1209
  job = run_job(
1210
  image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
1211
  command=[
@@ -1218,7 +1201,6 @@ def _dispatch_eval_command(
1218
  secrets={"HF_TOKEN": token},
1219
  timeout=EVAL_JOB_TIMEOUT,
1220
  token=token,
1221
- **run_kwargs,
1222
  )
1223
  return job.id
1224
 
@@ -1492,7 +1474,13 @@ def _merge_shards_and_publish(
1492
  tmp = Path(tempfile.mkdtemp(prefix=f"cgb-merge-{submission_id}-"))
1493
  try:
1494
  if _shard_bucket_enabled():
1495
- shards_root = _shard_bucket_root(submission_id)
 
 
 
 
 
 
1496
  else:
1497
  download_root = Path(
1498
  snapshot_download(
@@ -1657,7 +1645,7 @@ def _cleanup_shard_artifacts(submission_id: str) -> None:
1657
  """
1658
  try:
1659
  if _shard_bucket_enabled():
1660
- shutil.rmtree(_shard_bucket_root(submission_id), ignore_errors=True)
1661
  else:
1662
  _with_hub_retries(
1663
  lambda: _HF_API.delete_folder(
@@ -1676,6 +1664,22 @@ def _cleanup_shard_artifacts(submission_id: str) -> None:
1676
  )
1677
 
1678
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1679
  def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None:
1680
  """Merge ``run_summary.json`` fields into the pending row."""
1681
  updates: dict[str, Any] = {
 
115
  )
116
  from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError
117
 
 
 
 
 
 
118
  import progress
119
  from leaderboard import HF_DATA_REPO, HF_ORG, HF_SUBMISSIONS_REPO
120
 
 
199
  # fixtures fans out across several jobs of SHARD_CHUNK_SIZE fixtures
200
  # each, dispatched all at once (HF queues any overflow past the
201
  # account's ~8 concurrent slots; queueing is a speed variable, never a
202
+ # failure). When CADGENBENCH_SHARD_BUCKET is set, each shard job syncs its
203
+ # per-fixture dirs into that HF Storage Bucket (via the bucket API, no
204
+ # volume mount) and the Space syncs them back down to merge; otherwise the
205
+ # shard uploads under ``reports/<id>/shards/<shard_id>/`` in the
206
+ # submissions dataset. The bucket path avoids the dataset commit-queue
207
+ # 429s that strand concurrent shard commits. The Space merges into one run
208
+ # dir, recomputes the aggregate run_summary + report + gallery, then
209
+ # deletes the staged shards. Eval is CPU-bound (tessellation + Manifold
210
+ # booleans), so more machines is the throughput lever. At/under the
211
+ # threshold a submission stays a single job (the original path), so the
212
+ # extra dispatch/merge machinery only kicks in when it pays off.
213
  SHARD_THRESHOLD = 12
214
  SHARD_CHUNK_SIZE = 12
215
  SHARDS_SUBDIR = "shards"
216
+ # Bucket id (``namespace/bucket-name``, with or without an ``hf://buckets/``
217
+ # prefix). Empty disables bucket staging and keeps the dataset-repo path.
218
  SHARD_BUCKET = os.getenv("CADGENBENCH_SHARD_BUCKET", "").strip()
 
 
 
219
  SHARD_BUCKET_PREFIX = os.getenv(
220
  "CADGENBENCH_SHARD_BUCKET_PREFIX", SUBMISSIONS_DIR,
221
  ).strip("/")
 
266
 
267
 
268
  def _shard_bucket_enabled() -> bool:
269
+ """Whether shard scratch should be staged through an HF bucket."""
270
  return bool(SHARD_BUCKET)
271
 
272
 
273
+ def _shard_bucket_id() -> str:
274
+ """Return the bucket id (``namespace/bucket-name``), prefix stripped."""
275
  source = SHARD_BUCKET
276
  if source.startswith("hf://buckets/"):
277
  source = source[len("hf://buckets/"):]
278
  return source.rstrip("/")
279
 
280
 
281
+ def _shard_bucket_prefix_path(submission_id: str) -> str:
282
+ """Bucket-relative path holding one directory per shard for *submission_id*."""
283
  parts = [p for p in SHARD_BUCKET_PREFIX.split("/") if p]
284
+ return "/".join([*parts, submission_id, SHARDS_SUBDIR])
285
 
286
 
287
+ def _shard_bucket_uri(submission_id: str) -> str:
288
+ """``hf://buckets/...`` URI of the shards tree for *submission_id*."""
289
+ return (
290
+ f"hf://buckets/{_shard_bucket_id()}/"
291
+ f"{_shard_bucket_prefix_path(submission_id)}"
292
+ )
 
 
293
 
294
 
295
  def _jobs_token() -> str | None:
 
1180
  value = os.environ.get(key)
1181
  if value:
1182
  env[key] = value
 
1183
  if _shard_bucket_enabled() and "--shard-id" in extra_args:
1184
+ # The shard job syncs its artifacts straight to the bucket via the
1185
+ # bucket API (it already has HF_TOKEN); no volume mount is involved.
 
 
 
1186
  env.update(
1187
  {
1188
+ "CADGENBENCH_SHARD_BUCKET": _shard_bucket_id(),
1189
  "CADGENBENCH_SHARD_BUCKET_PREFIX": SHARD_BUCKET_PREFIX,
1190
  }
1191
  )
 
 
 
 
 
 
 
1192
  job = run_job(
1193
  image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
1194
  command=[
 
1201
  secrets={"HF_TOKEN": token},
1202
  timeout=EVAL_JOB_TIMEOUT,
1203
  token=token,
 
1204
  )
1205
  return job.id
1206
 
 
1474
  tmp = Path(tempfile.mkdtemp(prefix=f"cgb-merge-{submission_id}-"))
1475
  try:
1476
  if _shard_bucket_enabled():
1477
+ shards_root = tmp / "dl"
1478
+ shards_root.mkdir(parents=True, exist_ok=True)
1479
+ _HF_API.sync_bucket(
1480
+ source=_shard_bucket_uri(submission_id),
1481
+ dest=str(shards_root),
1482
+ token=_jobs_token(),
1483
+ )
1484
  else:
1485
  download_root = Path(
1486
  snapshot_download(
 
1645
  """
1646
  try:
1647
  if _shard_bucket_enabled():
1648
+ _delete_shard_bucket_prefix(submission_id)
1649
  else:
1650
  _with_hub_retries(
1651
  lambda: _HF_API.delete_folder(
 
1664
  )
1665
 
1666
 
1667
+ def _delete_shard_bucket_prefix(submission_id: str) -> None:
1668
+ """Remove every staged file under the submission's bucket shards prefix."""
1669
+ bucket_id = _shard_bucket_id()
1670
+ prefix = _shard_bucket_prefix_path(submission_id)
1671
+ token = _jobs_token()
1672
+ files = [
1673
+ item.path
1674
+ for item in _HF_API.list_bucket_tree(
1675
+ bucket_id, prefix=prefix, recursive=True, token=token,
1676
+ )
1677
+ if not getattr(item, "is_folder", False) and getattr(item, "path", None)
1678
+ ]
1679
+ if files:
1680
+ _HF_API.batch_bucket_files(bucket_id, delete=files, token=token)
1681
+
1682
+
1683
  def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None:
1684
  """Merge ``run_summary.json`` fields into the pending row."""
1685
  updates: dict[str, Any] = {
tests/test_submit.py CHANGED
@@ -105,14 +105,10 @@ def test_retry_after_header_is_honored(monkeypatch):
105
  assert slept and slept[0] >= 7.0
106
 
107
 
108
- def test_dispatch_shard_mounts_configured_bucket(monkeypatch):
109
- """Bucket-configured shard jobs get a read/write bucket volume."""
110
  captured: dict = {}
111
 
112
- class FakeVolume:
113
- def __init__(self, **kwargs):
114
- self.kwargs = kwargs
115
-
116
  def fake_run_job(**kwargs):
117
  captured.update(kwargs)
118
  return SimpleNamespace(id="job-123")
@@ -122,9 +118,7 @@ def test_dispatch_shard_mounts_configured_bucket(monkeypatch):
122
  submit, "SHARD_BUCKET",
123
  "hf://buckets/HuggingAI4Engineering/cadgenbench-eval-staging",
124
  )
125
- monkeypatch.setattr(submit, "SHARD_BUCKET_MOUNT", "/mnt/cgb-shards")
126
  monkeypatch.setattr(submit, "SHARD_BUCKET_PREFIX", "submissions")
127
- monkeypatch.setattr(submit, "Volume", FakeVolume)
128
  monkeypatch.setattr(submit, "run_job", fake_run_job)
129
 
130
  job_id = submit._dispatch_eval_command(
@@ -133,17 +127,15 @@ def test_dispatch_shard_mounts_configured_bucket(monkeypatch):
133
  )
134
 
135
  assert job_id == "job-123"
136
- assert captured["env"]["CADGENBENCH_SHARD_BUCKET_MOUNT"] == "/mnt/cgb-shards"
 
 
137
  assert captured["env"]["CADGENBENCH_SHARD_BUCKET_PREFIX"] == "submissions"
138
- volume = captured["volumes"][0]
139
- assert volume.kwargs == {
140
- "type": "bucket",
141
- "source": "HuggingAI4Engineering/cadgenbench-eval-staging",
142
- "mount_path": "/mnt/cgb-shards",
143
- }
144
 
145
 
146
- def test_dispatch_whole_submission_does_not_mount_bucket(monkeypatch):
147
  """Configured bucket staging is only for sharded eval jobs."""
148
  captured: dict = {}
149
 
@@ -161,11 +153,27 @@ def test_dispatch_whole_submission_does_not_mount_bucket(monkeypatch):
161
 
162
  assert job_id == "job-456"
163
  assert "volumes" not in captured
164
- assert "CADGENBENCH_SHARD_BUCKET_MOUNT" not in captured["env"]
165
 
166
 
167
- def test_eval_job_stages_shard_to_mounted_bucket(tmp_path: Path, monkeypatch):
168
- """In bucket mode the eval job copies shard outputs to the mount."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
169
  eval_job_path = (
170
  Path(__file__).resolve().parents[2]
171
  / "cadgenbench-eval-gpu"
@@ -180,18 +188,28 @@ def test_eval_job_stages_shard_to_mounted_bucket(tmp_path: Path, monkeypatch):
180
  fixture_dir = run_dir / "101"
181
  fixture_dir.mkdir(parents=True)
182
  (fixture_dir / "result.json").write_text("{}", encoding="utf-8")
183
- bucket_mount = tmp_path / "bucket"
184
- bucket_mount.mkdir()
185
 
186
- monkeypatch.setenv(eval_job.SHARD_BUCKET_MOUNT_ENV, str(bucket_mount))
 
 
 
 
 
 
 
 
 
187
  monkeypatch.setenv(eval_job.SHARD_BUCKET_PREFIX_ENV, "submissions")
188
 
189
  eval_job._upload_shard_artifacts(
190
  "sub-1", "shard_000", run_dir, "ignored/submissions", "ignored-token",
191
  )
192
 
193
- staged = bucket_mount / "submissions" / "sub-1" / "shards" / "shard_000"
194
- assert (staged / "101" / "result.json").read_text(encoding="utf-8") == "{}"
 
 
 
195
 
196
 
197
  def test_poll_until_done_uses_jobs_namespace_and_token(monkeypatch):
 
105
  assert slept and slept[0] >= 7.0
106
 
107
 
108
+ def test_dispatch_shard_passes_bucket_env(monkeypatch):
109
+ """Bucket-configured shard jobs get the bucket env, no volume mount."""
110
  captured: dict = {}
111
 
 
 
 
 
112
  def fake_run_job(**kwargs):
113
  captured.update(kwargs)
114
  return SimpleNamespace(id="job-123")
 
118
  submit, "SHARD_BUCKET",
119
  "hf://buckets/HuggingAI4Engineering/cadgenbench-eval-staging",
120
  )
 
121
  monkeypatch.setattr(submit, "SHARD_BUCKET_PREFIX", "submissions")
 
122
  monkeypatch.setattr(submit, "run_job", fake_run_job)
123
 
124
  job_id = submit._dispatch_eval_command(
 
127
  )
128
 
129
  assert job_id == "job-123"
130
+ assert captured["env"]["CADGENBENCH_SHARD_BUCKET"] == (
131
+ "HuggingAI4Engineering/cadgenbench-eval-staging"
132
+ )
133
  assert captured["env"]["CADGENBENCH_SHARD_BUCKET_PREFIX"] == "submissions"
134
+ # Mount-free: no volume is attached to the job.
135
+ assert "volumes" not in captured
 
 
 
 
136
 
137
 
138
+ def test_dispatch_whole_submission_no_bucket_env(monkeypatch):
139
  """Configured bucket staging is only for sharded eval jobs."""
140
  captured: dict = {}
141
 
 
153
 
154
  assert job_id == "job-456"
155
  assert "volumes" not in captured
156
+ assert "CADGENBENCH_SHARD_BUCKET" not in captured["env"]
157
 
158
 
159
+ def test_shard_bucket_uri_built_from_id_and_prefix(monkeypatch):
160
+ """The bucket URI strips any hf:// prefix and nests submission/shards."""
161
+ monkeypatch.setattr(
162
+ submit, "SHARD_BUCKET",
163
+ "hf://buckets/HuggingAI4Engineering/cadgenbench-eval-staging",
164
+ )
165
+ monkeypatch.setattr(submit, "SHARD_BUCKET_PREFIX", "submissions")
166
+ assert submit._shard_bucket_id() == (
167
+ "HuggingAI4Engineering/cadgenbench-eval-staging"
168
+ )
169
+ assert submit._shard_bucket_uri("sub-1") == (
170
+ "hf://buckets/HuggingAI4Engineering/cadgenbench-eval-staging/"
171
+ "submissions/sub-1/shards"
172
+ )
173
+
174
+
175
+ def test_eval_job_syncs_shard_to_bucket(tmp_path: Path, monkeypatch):
176
+ """In bucket mode the eval job syncs shard outputs to the bucket URI."""
177
  eval_job_path = (
178
  Path(__file__).resolve().parents[2]
179
  / "cadgenbench-eval-gpu"
 
188
  fixture_dir = run_dir / "101"
189
  fixture_dir.mkdir(parents=True)
190
  (fixture_dir / "result.json").write_text("{}", encoding="utf-8")
 
 
191
 
192
+ captured: dict = {}
193
+
194
+ def fake_sync_bucket(self, *, source, dest, token=None):
195
+ captured.update(source=source, dest=dest)
196
+
197
+ monkeypatch.setattr(eval_job.HfApi, "sync_bucket", fake_sync_bucket)
198
+ monkeypatch.setenv(
199
+ eval_job.SHARD_BUCKET_ENV,
200
+ "hf://buckets/HuggingAI4Engineering/cadgenbench-eval-staging",
201
+ )
202
  monkeypatch.setenv(eval_job.SHARD_BUCKET_PREFIX_ENV, "submissions")
203
 
204
  eval_job._upload_shard_artifacts(
205
  "sub-1", "shard_000", run_dir, "ignored/submissions", "ignored-token",
206
  )
207
 
208
+ assert captured["source"] == str(run_dir)
209
+ assert captured["dest"] == (
210
+ "hf://buckets/HuggingAI4Engineering/cadgenbench-eval-staging/"
211
+ "submissions/sub-1/shards/shard_000"
212
+ )
213
 
214
 
215
  def test_poll_until_done_uses_jobs_namespace_and_token(monkeypatch):