Spaces:
Sleeping
Sleeping
Commit ·
2409b81
1
Parent(s): a83d5d2
HF API dedup @7912416
Browse files- tools/hf_space/runner.py +52 -35
tools/hf_space/runner.py
CHANGED
|
@@ -400,7 +400,10 @@ def _validate_zip_streaming(*, api: HfApi, dataset: str, token: str | None,
|
|
| 400 |
progress_file: Path | None, out,
|
| 401 |
force: bool = False,
|
| 402 |
submission_id: str = "",
|
| 403 |
-
flat_target: Path | None = None
|
|
|
|
|
|
|
|
|
|
| 404 |
"""Validate a zip-bundled dataset by streaming one archive at a time.
|
| 405 |
|
| 406 |
Flow per zip: hf_hub_download → extract → validate.py → capture
|
|
@@ -419,22 +422,29 @@ def _validate_zip_streaming(*, api: HfApi, dataset: str, token: str | None,
|
|
| 419 |
from huggingface_hub import hf_hub_download
|
| 420 |
import zipfile
|
| 421 |
|
| 422 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 423 |
# Unified path: if the dataset has no zip files, synthesize a SINGLE
|
| 424 |
# "unit" representing the whole dataset. snapshot_download has
|
| 425 |
# already (or will) materialize the contents into flat_target;
|
| 426 |
# downstream daemon-pool validation treats it the same as one zip.
|
| 427 |
-
# This removes the previously-separate flat-download branch in
|
| 428 |
-
# run() and gives flat datasets the same: daemon-pool speed,
|
| 429 |
-
# cancel signaling, live progress, per-unit caching.
|
| 430 |
is_flat = not zip_entries
|
| 431 |
if is_flat:
|
| 432 |
if flat_target is None or not flat_target.is_dir():
|
| 433 |
return None # caller must provide the materialized dir
|
| 434 |
-
|
| 435 |
-
|
| 436 |
-
|
| 437 |
-
|
|
|
|
|
|
|
| 438 |
zip_entries = [(dataset, head)]
|
| 439 |
out(f" flat dataset: snapshot at {flat_target}; validator will discover assets")
|
| 440 |
if force:
|
|
@@ -1206,7 +1216,8 @@ def progress_path_for(submission_id: str) -> Path:
|
|
| 1206 |
def _finalize_run(*, dataset: str, profile: str, version: str,
|
| 1207 |
results_json: dict, status: str, summary: str,
|
| 1208 |
out_dir: Path, api: HfApi, token: str | None,
|
| 1209 |
-
open_pr: bool, results_path: Path, out
|
|
|
|
| 1210 |
"""Shared tail-end of run(): file issues, optionally open PR on
|
| 1211 |
dataset, persist report, write cache, return RunResult."""
|
| 1212 |
try:
|
|
@@ -1254,7 +1265,10 @@ def _finalize_run(*, dataset: str, profile: str, version: str,
|
|
| 1254 |
f"({'cancelled' if is_cancelled else 'partial: ' + str(results_json.get('streaming_processed')) + '/' + str(results_json.get('streaming_zips'))})")
|
| 1255 |
else:
|
| 1256 |
try:
|
| 1257 |
-
|
|
|
|
|
|
|
|
|
|
| 1258 |
key = _cache_key(head, profile, _validator_version(), _foundation_sha())
|
| 1259 |
_write_cache(dataset, key, {
|
| 1260 |
"schema_version": 1,
|
|
@@ -1306,30 +1320,30 @@ def run(
|
|
| 1306 |
token = hf_token or os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
|
| 1307 |
api = HfApi(token=token)
|
| 1308 |
|
| 1309 |
-
#
|
| 1310 |
-
#
|
| 1311 |
-
#
|
| 1312 |
-
#
|
| 1313 |
-
|
| 1314 |
-
|
| 1315 |
-
|
| 1316 |
-
|
| 1317 |
-
|
| 1318 |
-
|
| 1319 |
-
|
| 1320 |
-
|
| 1321 |
-
|
| 1322 |
-
|
| 1323 |
-
|
| 1324 |
-
|
| 1325 |
-
|
| 1326 |
-
|
| 1327 |
-
|
| 1328 |
-
|
| 1329 |
-
|
| 1330 |
-
|
| 1331 |
-
|
| 1332 |
-
|
| 1333 |
|
| 1334 |
with tempfile.TemporaryDirectory(prefix=f"hfsp-{dataset.replace('/', '_')}-") as td:
|
| 1335 |
work = Path(td)
|
|
@@ -1387,6 +1401,8 @@ def run(
|
|
| 1387 |
progress_file=prog_path, out=out, force=force,
|
| 1388 |
submission_id=submission_id,
|
| 1389 |
flat_target=flat_target,
|
|
|
|
|
|
|
| 1390 |
)
|
| 1391 |
|
| 1392 |
out_dir = work / "out"
|
|
@@ -1427,4 +1443,5 @@ def run(
|
|
| 1427 |
results_json=results_json, status=status, summary=summary,
|
| 1428 |
out_dir=out_dir, api=api, token=token, open_pr=open_pr,
|
| 1429 |
results_path=results_path, out=out,
|
|
|
|
| 1430 |
)
|
|
|
|
| 400 |
progress_file: Path | None, out,
|
| 401 |
force: bool = False,
|
| 402 |
submission_id: str = "",
|
| 403 |
+
flat_target: Path | None = None,
|
| 404 |
+
prefetched_zip_entries: list | None = None,
|
| 405 |
+
prefetched_dataset_head: str | None = None,
|
| 406 |
+
) -> dict | None:
|
| 407 |
"""Validate a zip-bundled dataset by streaming one archive at a time.
|
| 408 |
|
| 409 |
Flow per zip: hf_hub_download → extract → validate.py → capture
|
|
|
|
| 422 |
from huggingface_hub import hf_hub_download
|
| 423 |
import zipfile
|
| 424 |
|
| 425 |
+
# Use the caller's pre-fetched zip listing + dataset HEAD when
|
| 426 |
+
# available. run() already calls _list_dataset_zips() and
|
| 427 |
+
# repo_info() to decide flat vs zip + populate the dataset-level
|
| 428 |
+
# cache; calling them again here doubled the HF API request count
|
| 429 |
+
# per validation for no value.
|
| 430 |
+
if prefetched_zip_entries is not None:
|
| 431 |
+
zip_entries = prefetched_zip_entries
|
| 432 |
+
else:
|
| 433 |
+
zip_entries = _list_dataset_zips(api, dataset, token)
|
| 434 |
# Unified path: if the dataset has no zip files, synthesize a SINGLE
|
| 435 |
# "unit" representing the whole dataset. snapshot_download has
|
| 436 |
# already (or will) materialize the contents into flat_target;
|
| 437 |
# downstream daemon-pool validation treats it the same as one zip.
|
|
|
|
|
|
|
|
|
|
| 438 |
is_flat = not zip_entries
|
| 439 |
if is_flat:
|
| 440 |
if flat_target is None or not flat_target.is_dir():
|
| 441 |
return None # caller must provide the materialized dir
|
| 442 |
+
head = prefetched_dataset_head
|
| 443 |
+
if head is None:
|
| 444 |
+
try:
|
| 445 |
+
head = api.repo_info(dataset, repo_type="dataset").sha
|
| 446 |
+
except Exception:
|
| 447 |
+
head = ""
|
| 448 |
zip_entries = [(dataset, head)]
|
| 449 |
out(f" flat dataset: snapshot at {flat_target}; validator will discover assets")
|
| 450 |
if force:
|
|
|
|
| 1216 |
def _finalize_run(*, dataset: str, profile: str, version: str,
|
| 1217 |
results_json: dict, status: str, summary: str,
|
| 1218 |
out_dir: Path, api: HfApi, token: str | None,
|
| 1219 |
+
open_pr: bool, results_path: Path, out,
|
| 1220 |
+
dataset_head: str | None = None) -> RunResult:
|
| 1221 |
"""Shared tail-end of run(): file issues, optionally open PR on
|
| 1222 |
dataset, persist report, write cache, return RunResult."""
|
| 1223 |
try:
|
|
|
|
| 1265 |
f"({'cancelled' if is_cancelled else 'partial: ' + str(results_json.get('streaming_processed')) + '/' + str(results_json.get('streaming_zips'))})")
|
| 1266 |
else:
|
| 1267 |
try:
|
| 1268 |
+
# Reuse the pre-resolved HEAD when run() already fetched it.
|
| 1269 |
+
# Falls back to a fresh API call only if the caller didn't
|
| 1270 |
+
# pass one (e.g. legacy call sites).
|
| 1271 |
+
head = dataset_head if dataset_head is not None else api.repo_info(dataset, repo_type="dataset").sha
|
| 1272 |
key = _cache_key(head, profile, _validator_version(), _foundation_sha())
|
| 1273 |
_write_cache(dataset, key, {
|
| 1274 |
"schema_version": 1,
|
|
|
|
| 1320 |
token = hf_token or os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
|
| 1321 |
api = HfApi(token=token)
|
| 1322 |
|
| 1323 |
+
# Resolve the dataset HEAD ONCE up front. Used for: (a) the
|
| 1324 |
+
# dataset-level cache key, (b) the per-unit cache key in the flat
|
| 1325 |
+
# path, (c) the streaming function's "synthetic zip sha" for the
|
| 1326 |
+
# flat unit. Without this, the same metadata was re-fetched from
|
| 1327 |
+
# HF up to 4 times per validation.
|
| 1328 |
+
dataset_head: str | None = None
|
| 1329 |
+
try:
|
| 1330 |
+
dataset_head = api.repo_info(dataset, repo_type="dataset").sha
|
| 1331 |
+
except Exception as e:
|
| 1332 |
+
out(f" ! could not resolve dataset HEAD ({type(e).__name__}: {e}); cache + drift checks skipped")
|
| 1333 |
+
if not force and dataset_head:
|
| 1334 |
+
key = _cache_key(dataset_head, profile, _validator_version(), _foundation_sha())
|
| 1335 |
+
cached = _read_cache(dataset, key)
|
| 1336 |
+
if cached:
|
| 1337 |
+
out(f" cache hit (key={key}, head={dataset_head[:8]}, "
|
| 1338 |
+
f"cached_at={cached.get('cached_at')}); returning without re-running")
|
| 1339 |
+
return RunResult(
|
| 1340 |
+
dataset=dataset, profile=profile, version=version,
|
| 1341 |
+
status=cached["status"], summary=cached["summary"],
|
| 1342 |
+
results_json=cached["results_json"],
|
| 1343 |
+
report_path=Path(cached.get("report_path") or "/tmp"),
|
| 1344 |
+
pr_url=None,
|
| 1345 |
+
)
|
| 1346 |
+
out(f" cache miss (key={key}, head={dataset_head[:8]}); running validator")
|
| 1347 |
|
| 1348 |
with tempfile.TemporaryDirectory(prefix=f"hfsp-{dataset.replace('/', '_')}-") as td:
|
| 1349 |
work = Path(td)
|
|
|
|
| 1401 |
progress_file=prog_path, out=out, force=force,
|
| 1402 |
submission_id=submission_id,
|
| 1403 |
flat_target=flat_target,
|
| 1404 |
+
prefetched_zip_entries=probe_zip_entries,
|
| 1405 |
+
prefetched_dataset_head=dataset_head,
|
| 1406 |
)
|
| 1407 |
|
| 1408 |
out_dir = work / "out"
|
|
|
|
| 1443 |
results_json=results_json, status=status, summary=summary,
|
| 1444 |
out_dir=out_dir, api=api, token=token, open_pr=open_pr,
|
| 1445 |
results_path=results_path, out=out,
|
| 1446 |
+
dataset_head=dataset_head,
|
| 1447 |
)
|