Spaces:
Sleeping
Sleeping
| """Orchestrator: dataset name → validator → verdict PR on the dataset. | |
| Phase-3 of the PRD. Same flow the DGXC `tools/hf_watch/validate.py` | |
| performs, minus: | |
| - the Claude Code subprocess wrapper (we call the validator directly, | |
| keeping the agentic-decision layer for our internal coordinator path) | |
| - the status.json patching (the HF Space is per-dataset; the | |
| coordinator dashboard polls verdicts back via its existing watcher) | |
| - the GitHub commit step (we open an HF Dataset PR instead) | |
| The validator engine itself is unchanged — it's the same simready-report | |
| skill that runs on Windows, on DGXC, and now here. | |
| """ | |
| from __future__ import annotations | |
| import dataclasses | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Iterator | |
| from huggingface_hub import HfApi, snapshot_download | |
# Location of the bundled simready-report validator entry script, resolved
# relative to the directory that contains this module.
VALIDATOR = Path(__file__).resolve().parent.joinpath(
    "tools", "validation", "plugins", "simready-report",
    "skills", "simready-report", "validate.py",
)

# Zip-bomb guard for untrusted uploaded archives. USD datasets are large
# (textures, meshes), so the cap is generous — 20 GiB uncompressed total
# and 100k members is well past any legitimate single dataset zip.
MAX_EXTRACT_BYTES = 20 * (1 << 30)
MAX_EXTRACT_MEMBERS = 100 * 1000
def _safe_extract_zip(zf, extract_dir) -> None:
    """Unpack an untrusted zip into extract_dir behind three guards.

    Zip-bomb: the archive is rejected outright (ValueError) when its
    member count exceeds MAX_EXTRACT_MEMBERS or the sum of the declared
    uncompressed sizes exceeds MAX_EXTRACT_BYTES.
    Symlink / path-traversal: offending members are skipped silently
    while the remaining members are still extracted.
    """
    import stat

    members = zf.infolist()
    member_count = len(members)
    if member_count > MAX_EXTRACT_MEMBERS:
        raise ValueError(f"zip rejected: {member_count} members exceeds cap {MAX_EXTRACT_MEMBERS}")

    uncompressed = 0
    for member in members:
        uncompressed += member.file_size
    if uncompressed > MAX_EXTRACT_BYTES:
        raise ValueError(f"zip rejected: {uncompressed} uncompressed bytes exceeds cap {MAX_EXTRACT_BYTES}")

    root = os.path.realpath(extract_dir)
    inside_prefix = root + os.sep
    for member in members:
        # The upper 16 bits of external_attr carry the Unix mode bits.
        unix_mode = member.external_attr >> 16
        if stat.S_ISLNK(unix_mode):
            continue  # skip symlink members
        target = os.path.realpath(os.path.join(extract_dir, member.filename))
        if target != root and not target.startswith(inside_prefix):
            continue  # skip path-traversal members
        zf.extract(member, extract_dir)
| def _zip_single_root(zf) -> str | None: | |
| """Name of the single top-level directory every entry lives under, or | |
| None if the archive has loose files / multiple top-level entries at | |
| its root. Used by the asset-per-zip path to avoid double-wrapping a | |
| zip that already nests its contents in one <asset>/ folder.""" | |
| roots = set() | |
| for n in zf.namelist(): | |
| n = n.strip("/") | |
| if not n: | |
| continue | |
| head, sep, _ = n.partition("/") | |
| if not sep: | |
| return None # a loose file at the root → not single-rooted | |
| roots.add(head) | |
| if len(roots) > 1: | |
| return None | |
| return next(iter(roots)) if len(roots) == 1 else None | |
| def _kit_available() -> bool: | |
| """Detect whether Isaac Sim's kit-python is reachable on this host. | |
| The validator's --use-kit path re-execs the spec rules inside Kit | |
| so PhysX / MDL rules that need the Kit runtime can actually fire. | |
| Without Kit installed, the validator must run with --no-use-kit | |
| (those rules are silently dropped). This Space detects Kit at | |
| runtime so an Isaac-Sim-enabled image automatically gets full | |
| rule coverage without a code change. | |
| Checks (any one passes → True): | |
| * `kit` binary on PATH (a non-Isaac-Sim Kit also satisfies) | |
| * ISAAC_SIM_PATH or KIT_PATH env var pointing at an existing dir | |
| * Common Isaac Sim Docker install paths | |
| """ | |
| if shutil.which("kit"): | |
| return True | |
| for env_var in ("ISAAC_SIM_PATH", "KIT_PATH"): | |
| p = os.environ.get(env_var) | |
| if p and Path(p).exists(): | |
| return True | |
| for candidate in ( | |
| "/isaac-sim/python.sh", | |
| "/opt/isaac-sim/python.sh", | |
| "/isaac-sim/kit/kit", | |
| ): | |
| if Path(candidate).is_file(): | |
| return True | |
| return False | |
# Validator flag picked once, at import time, from the _kit_available() probe.
_KIT_FLAG = "--use-kit" if _kit_available() else "--no-use-kit"

# Same exclude set the DGXC path uses. HF datasets ship a lot of bulk the
# validator doesn't open; skipping shrinks downloads from tens-of-GB to
# hundreds-of-MB on assets like nvidia/PhysicalAI-SimReady-Warehouse-01.
HF_DOWNLOAD_EXCLUDES = (
    # thumbnail / preview / render folders
    ".thumbs/*",
    "images/*",
    "*_renders/*",
    "renders/*",
    # video
    "*.mp4",
    "*.mov",
    "*.webm",
    # still images
    "*.jpg",
    "*.jpeg",
    "*.png",
    "*.gif",
    "*.tiff",
    "*.tif",
    # archives
    "*.zip",
    "*.tar",
    "*.tgz",
)

# File extensions treated as USD files.
USD_EXTS = (
    ".usd",
    ".usda",
    ".usdc",
    ".usdz",
)
| class RunResult: | |
| dataset: str | |
| profile: str | |
| version: str | |
| status: str # "pass" | "warn" | "fail" | "error" | |
| summary: str # one-line human-readable digest | |
| results_json: dict # the validator's results.json contents | |
| report_path: Path # local path to the HTML report tree | |
| pr_url: str | None # discussion URL when --open-pr was used | |
| def _now() -> str: | |
| return datetime.now(timezone.utc).isoformat(timespec="seconds") | |
| def _wrap_layout_for_validator(downloaded: Path, work: Path) -> Path: | |
| """Pass-through. The validator's discover_assets recurses, so we no | |
| longer need to wrap the download to fit a one-level-deep expectation. | |
| Kept as a hook so runner.run() doesn't churn if we re-add adaptation | |
| later (e.g. for zip-bundled datasets that need extraction first). | |
| """ | |
| return downloaded | |
| def _list_dataset_zips(api: HfApi, dataset: str, token: str | None) -> list[tuple[str, str | None]]: | |
| """Enumerate `(rel_path, content_sha)` for `*.zip` files in the dataset | |
| without downloading anything. content_sha is the per-zip cache key — | |
| prefer the LFS sha256 (large-file pointer) when present, fall back | |
| to the git blob_id. Returns empty list if the listing fails.""" | |
| try: | |
| info = api.repo_info(repo_id=dataset, repo_type="dataset", | |
| files_metadata=True, token=token) | |
| except Exception: | |
| return [] | |
| out_list: list[tuple[str, str | None]] = [] | |
| for sib in (info.siblings or []): | |
| name = getattr(sib, "rfilename", "") or "" | |
| if not name.lower().endswith(".zip"): | |
| continue | |
| sha = None | |
| lfs = getattr(sib, "lfs", None) | |
| if lfs: | |
| sha = (lfs.get("sha256") if isinstance(lfs, dict) | |
| else getattr(lfs, "sha256", None)) | |
| if not sha: | |
| sha = getattr(sib, "blob_id", None) | |
| out_list.append((name, sha)) | |
| return out_list | |
| def _zip_cache_key(zip_sha: str, profile: str, validator_version: str, | |
| foundation_sha: str) -> str: | |
| """Per-zip cache key — sha256 of every input that affects the | |
| validator's verdict for this archive. zip_sha covers content | |
| changes; the surrounding tuple covers rule-source changes. Runner | |
| wrapper-code changes are NOT in the key — re-validating a zip | |
| just because runner.py formatting changed wastes compute for an | |
| identical verdict. Operator forces a fresh run with Shift+Click, | |
| which clears this dataset's per-zip cache before re-streaming.""" | |
| import hashlib | |
| blob = f"{zip_sha}|{profile}|{validator_version}|{foundation_sha}" | |
| return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16] | |
| def _safe_name(name: str) -> str: | |
| """Sanitize an untrusted name (dataset / submission_id) for use as a | |
| filesystem path component. Allowlist keeps `.` for legit names, so | |
| we must also collapse any `..` run — otherwise `..` survives and | |
| enables path traversal out of the cache/report/tmp roots.""" | |
| safe = "".join(c if c.isalnum() or c in "-_." else "_" for c in name) | |
| while ".." in safe: | |
| safe = safe.replace("..", "_") | |
| return safe | |
def _zip_cache_path(dataset: str, key: str) -> Path:
    """Path of the JSON cache entry for one zip: a `<key>.json` file in the
    `zips` folder under the dataset's sanitized cache directory."""
    dataset_dir = CACHE_DIR / _safe_name(dataset)
    return dataset_dir / "zips" / f"{key}.json"
def _read_zip_cache(dataset: str, key: str) -> dict | None:
    """Load the cached per-zip payload, or None on any kind of cache miss.

    A miss covers: no cache file, an unreadable file, bytes that are not
    valid UTF-8, text that is not valid JSON, and JSON whose top level is
    not an object. Treating all of these as a miss keeps a corrupt cache
    entry from raising; the caller gets None and re-validates.
    """
    p = _zip_cache_path(dataset, key)
    if not p.is_file():
        return None
    try:
        cached = json.loads(p.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised by read_text on invalid UTF-8.
        return None
    # Callers use dict methods on the payload; reject any other JSON type.
    return cached if isinstance(cached, dict) else None
def _write_zip_cache(dataset: str, key: str, payload: dict) -> None:
    """Persist one zip's payload as JSON in the per-zip cache.

    The JSON is written to a sibling `.tmp` file first and then moved
    over the final path with os.replace, so the final file is never
    written in place. Filesystem errors are swallowed: the cache is
    best-effort.
    """
    target = _zip_cache_path(dataset, key)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        staging = target.with_name(target.name + ".tmp")
        serialized = json.dumps(payload)
        staging.write_text(serialized, encoding="utf-8")
        os.replace(staging, target)
    except OSError:
        return
def _clear_zip_cache(dataset: str, out) -> None:
    """Delete the whole per-zip cache directory of a dataset.

    Called from the streaming path when force=True so a forced run
    actually re-validates every zip instead of consulting cached
    entries. Reports the number of `*.json` entries removed through
    `out`; does nothing when the directory does not exist.
    """
    cache_root = CACHE_DIR / _safe_name(dataset) / "zips"
    if cache_root.is_dir():
        try:
            entry_count = len(list(cache_root.glob("*.json")))
            shutil.rmtree(cache_root, ignore_errors=True)
            out(f" cleared {entry_count} zip cache entries (force)")
        except OSError as e:
            out(f" ! zip cache clear failed: {type(e).__name__}: {e}")
| def _accumulated_progress_rows(merged_results: list) -> list: | |
| """Snapshot merged_results into the live-progress row shape the | |
| dashboard's applyLiveAssetStatus expects: {rel_path, passed, | |
| status, issues_count}. list() is atomic in CPython so we can | |
| snapshot without a lock — a slightly stale snapshot is fine.""" | |
| rows = [] | |
| for asset in list(merged_results): | |
| if not isinstance(asset, dict): | |
| continue | |
| sevs = {(iss.get("severity") or "").lower() for iss in (asset.get("issues") or [])} | |
| if asset.get("passed") and not (sevs & {"error", "failure"}): | |
| status = "warn" if "warning" in sevs else "pass" | |
| else: | |
| status = "fail" | |
| rows.append({ | |
| "rel_path": asset.get("rel_path") or asset.get("name") or "", | |
| "passed": bool(asset.get("passed")), | |
| "status": status, | |
| "issues_count": len(asset.get("issues") or []), | |
| }) | |
| return rows | |
| def _write_streaming_progress(path: Path | None, *, processed: int, total: int, | |
| current: str | None, started_at: str, | |
| state: str = "", | |
| stage: str | None = None, | |
| results: list | None = None) -> None: | |
| """Streaming-mode progress emitter. Same JSON shape as the validator's | |
| per-asset progress so the dashboard's poller reads both transparently; | |
| the counter is at the zip level here instead of the asset level. | |
| `results` is the cumulative per-asset list aggregated across all | |
| zips finished so far (dashboard uses it for the Files expander | |
| pass/fail overlay).""" | |
| if not path: | |
| return | |
| import tempfile as _tf | |
| payload = { | |
| "processed": processed, "total": total, "current": current, | |
| "started_at": started_at, "state": state, | |
| "updated_at": _now(), | |
| "results": (results or [])[-1000:], | |
| } | |
| if stage is not None: | |
| payload["stage"] = stage | |
| try: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| fd, tmp = _tf.mkstemp(prefix=".progress-", dir=str(path.parent)) | |
| with os.fdopen(fd, "w", encoding="utf-8") as f: | |
| json.dump(payload, f) | |
| os.replace(tmp, str(path)) | |
| except OSError: | |
| pass | |
| def _update_progress_stage(path: Path | None, stage: str) -> None: | |
| """Stage-only update — read the existing payload, overwrite the | |
| `stage` and `updated_at` fields, write back atomically. Used to | |
| surface what the validator is currently doing (last stdout line, | |
| current phase, etc.) without disturbing processed/total counters. | |
| Best-effort: silently skips if the file doesn't exist yet, is | |
| mid-write, or can't be parsed.""" | |
| if not path: | |
| return | |
| try: | |
| existing = json.loads(path.read_text(encoding="utf-8")) | |
| except (OSError, json.JSONDecodeError): | |
| return | |
| existing["stage"] = stage | |
| existing["updated_at"] = _now() | |
| import tempfile as _tf | |
| try: | |
| fd, tmp = _tf.mkstemp(prefix=".progress-", dir=str(path.parent)) | |
| with os.fdopen(fd, "w", encoding="utf-8") as f: | |
| json.dump(existing, f) | |
| os.replace(tmp, str(path)) | |
| except OSError: | |
| pass | |
| def _find_representative_usd(extract_dir: Path) -> Path | None: | |
| """Pick a single USD file likely to be the asset's entry point. | |
| Heuristic priority (lower sort-key = better): | |
| 1. stem matches parent dir name (SimReady bundle convention) | |
| 2. shallower depth | |
| 3. NOT a generic catch-all name like "model.usda" / "main.usd" | |
| (those are often runtime-generated wrappers, not bundle roots) | |
| """ | |
| candidates: list[Path] = [] | |
| for ext in (".usd", ".usda", ".usdc", ".usdz"): | |
| candidates.extend(extract_dir.rglob(f"*{ext}")) | |
| if not candidates: | |
| return None | |
| GENERIC = {"model", "main", "scene", "root", "stage"} | |
| def key(p: Path): | |
| depth = len(p.relative_to(extract_dir).parts) | |
| bundle_match = 0 if p.stem.lower() == p.parent.name.lower() else 1 | |
| is_generic = 1 if p.stem.lower() in GENERIC else 0 | |
| # Order: bundle-name matches first (best signal we're looking | |
| # at an authored asset root), then non-generic names, then | |
| # shallowness. Pure depth as last tiebreaker. | |
| return (bundle_match, is_generic, depth, str(p)) | |
| candidates.sort(key=key) | |
| return candidates[0] | |
| def _detect_profile_from_usd(usd_path: Path, out) -> str | None: | |
| """Open a USD file, look at applied schemas + structure, classify | |
| into the closest existing profile. Returns None on parse failure | |
| (caller falls back to caller-supplied profile). | |
| Detection logic, in priority order: | |
| - Has BOM/Package-* schemas → Package-Candidate | |
| - Has PhysicsArticulationRootAPI → Robot-Body-{Isaac|Runnable|Neutral} | |
| - Single-asset content → Prop-Robotics-{Isaac|Physx|Neutral} | |
| - Many top-level Xforms / sublayers → Package-Candidate (multi-asset bundle) | |
| The Isaac/Physx/Neutral suffix is picked from applied schemas | |
| (Isaac > Physx > Neutral). Note: foundation currently has no | |
| "Scene" profile, so multi-asset scenes route to Package-Candidate | |
| as the closest existing match — operator should request a Scene | |
| profile from the foundation team for proper validation.""" | |
| try: | |
| from pxr import Usd | |
| except ImportError: | |
| out(" (usd-core not available; skipping content detection)") | |
| return None | |
| try: | |
| stage = Usd.Stage.Open(str(usd_path)) | |
| except Exception as e: | |
| out(f" (couldn't open {usd_path.name} for detection: {type(e).__name__}: {e})") | |
| return None | |
| if not stage: | |
| return None | |
| has_articulation = False | |
| has_rigidbody = False | |
| has_isaac = False | |
| has_physx = False | |
| has_bom = False | |
| top_level_xforms = 0 | |
| sublayer_count = len(stage.GetRootLayer().subLayerPaths) | |
| for prim in stage.Traverse(): | |
| if prim.GetPath().pathElementCount == 1 and prim.IsA(Usd.Typed): | |
| type_name = str(prim.GetTypeName()) | |
| if type_name == "Xform": | |
| top_level_xforms += 1 | |
| schemas = list(prim.GetAppliedSchemas()) | |
| for s in schemas: | |
| sl = s.lower() | |
| if "articulationroot" in sl: | |
| has_articulation = True | |
| elif "rigidbody" in sl: | |
| has_rigidbody = True | |
| elif "physx" in sl: | |
| has_physx = True | |
| elif "isaac" in sl: | |
| has_isaac = True | |
| elif "bom" in sl or "packageinfo" in sl: | |
| has_bom = True | |
| # Classify | |
| if has_bom or top_level_xforms >= 5 or sublayer_count >= 3: | |
| # Multi-asset bundle or scene — closest existing profile. | |
| return "Package-Candidate" | |
| if has_articulation: | |
| if has_isaac: return "Robot-Body-Isaac" | |
| if has_physx: return "Robot-Body-Runnable" | |
| return "Robot-Body-Neutral" | |
| # Single-asset prop content | |
| if has_isaac: return "Prop-Robotics-Isaac" | |
| if has_physx or has_rigidbody: return "Prop-Robotics-Physx" | |
| return "Prop-Robotics-Neutral" | |
| def _is_profile_registration_failure(log_file: Path) -> bool: | |
| """Detect the validator's "profile not registered" signature in its | |
| log. The CLI-loader failure mode (omniverse-usd-profiles can't | |
| register because foundation specs reference unknown requirement | |
| codes) IS recoverable by retrying with --use-plugin. The plugin- | |
| discovery failure mode (SimReadyPlugin entry point missing) is | |
| NOT — both code paths go through the same plugin discovery, so | |
| retrying gains nothing and just doubles compute. | |
| Returns True only for the recoverable case.""" | |
| if not log_file.is_file(): | |
| return False | |
| try: | |
| text = log_file.read_text(encoding="utf-8", errors="replace") | |
| except OSError: | |
| return False | |
| # Plugin discovery is the broken layer? Don't retry — both default | |
| # and plugin paths share the same plugin loader. | |
| if ("Discovered plugins: {'omni.asset_validator:DefaultPlugin'}" in text | |
| and "SimReadyPlugin" in text): | |
| return False | |
| if "[CLI-loader] loaded: profiles=0" in text: | |
| return True | |
| if "FATAL: profile " in text and "not registered" in text: | |
| return True | |
| return False | |
| def _is_unrecoverable_plugin_miss(log_file: Path) -> bool: | |
| """The 'SimReadyPlugin entry point not installed' shape. Once we see | |
| this signature, every subsequent zip will fail identically — abort | |
| the streaming loop early instead of running through 800 doomed | |
| validates.""" | |
| if not log_file.is_file(): | |
| return False | |
| try: | |
| text = log_file.read_text(encoding="utf-8", errors="replace") | |
| except OSError: | |
| return False | |
| return ("Plugin allow-list:" in text | |
| and "SimReadyPlugin" in text | |
| and "Discovered plugins: {'omni.asset_validator:DefaultPlugin'}" in text) | |
| def _file_registration_issue(dataset: str, profile: str, val_ver: str, | |
| found_sha: str, log_file: Path, out) -> None: | |
| """File a single GitHub issue documenting the foundation/validator | |
| registration mismatch that triggered the --use-plugin retry. | |
| Best-effort, deduplicated by title via github_issues helpers.""" | |
| try: | |
| from github_issues import _gh_token, _find_issue, _create_issue, _add_comment, scrub_secrets | |
| except Exception as e: | |
| out(f" (issue-filing import failed: {type(e).__name__}: {e})") | |
| return | |
| if not _gh_token(): | |
| out(f" (no GH token; skipping issue filing)") | |
| return | |
| title = f"[validator-internal] CLI loader emits 0 profiles for foundation {found_sha[:8]} + simready-validate {val_ver}" | |
| try: | |
| tail = "" | |
| try: | |
| tail = scrub_secrets("\n".join(log_file.read_text(encoding="utf-8", errors="replace") | |
| .splitlines()[-25:])[:3000]) | |
| except OSError: | |
| pass | |
| body = ( | |
| "**Validator-internal bug** — surfaced by the HF Space streaming-zip path.\n\n" | |
| "The default CLI loader loads 0 profiles when run against the pinned\n" | |
| "foundation specs + simready-validate combination, because features\n" | |
| "reference requirement codes the validator package doesn't have\n" | |
| "registered. Recoverable at runtime via `--use-plugin`, but the\n" | |
| "underlying mismatch should be fixed in either the foundation pin\n" | |
| "or the validator package version.\n\n" | |
| f"| Field | Value |\n|---|---|\n" | |
| f"| Dataset (first hit) | `{dataset}` |\n" | |
| f"| Profile | `{profile}` |\n" | |
| f"| simready-validate | `{val_ver}` |\n" | |
| f"| foundation sha | `{found_sha}` |\n" | |
| f"| Workaround in effect | `--use-plugin` for this run |\n\n" | |
| f"**Loader log tail:**\n\n```\n{tail}\n```\n" | |
| ) | |
| existing = _find_issue(title) | |
| if existing: | |
| _add_comment(existing["number"], | |
| f"Re-hit during validation of `{dataset}`. --use-plugin recovery engaged.") | |
| out(f" internal-issue #{existing['number']}: comment added") | |
| else: | |
| num = _create_issue(title, body, ["validator-internal", "process"]) | |
| out(f" internal-issue #{num}: opened") | |
| except Exception as e: | |
| out(f" (issue filing failed: {type(e).__name__}: {e})") | |
| def _validate_zip_streaming(*, api: HfApi, dataset: str, token: str | None, | |
| work: Path, profile: str, version: str, | |
| progress_file: Path | None, out, | |
| force: bool = False, | |
| submission_id: str = "", | |
| run_token: str = "", | |
| kit_flag: str = _KIT_FLAG, | |
| asset_per_zip: bool = False, | |
| flat_target: Path | None = None, | |
| prefetched_zip_entries: list | None = None, | |
| prefetched_dataset_head: str | None = None, | |
| continue_on_preliminary: bool = False, | |
| ) -> dict | None: | |
| """Validate a zip-bundled dataset by streaming one archive at a time. | |
| DEPRECATED — zip-bundled datasets are not allowed per the foundation | |
| AA.002 spec (allowlist is USD/image/audio only — no .zip) or the | |
| SDK packaging spec (describes unpacked layout only). run() now | |
| fails such datasets at the preliminary check stage before any | |
| download happens. | |
| This function is intentionally retained for the case where the | |
| spec is amended to accept zips as a transport mechanism. The flat- | |
| path code in run() also reuses this function (passing flat_target) | |
| for the unified daemon-pool + cache + cancel + progress machinery | |
| — that path is NOT deprecated. | |
| Flow per zip (deprecated path): hf_hub_download → extract → | |
| validate.py → capture results.json → delete archive + extracted | |
| tree → next. Never holds more than one zip's worth of data on | |
| disk, so works on datasets whose total size doesn't fit on the | |
| Space's ephemeral /tmp. | |
| Returns a results.json-shaped dict aggregating every per-zip run. | |
| The `results` list has each asset's `rel_path` prefixed with its | |
| source zip so dashboard rows can show e.g. | |
| `kitchen_03.zip/kitchen_03/scene.usd`. | |
| Returns None when the dataset has no zips at all AND no flat_target | |
| is provided — caller misconfigured (the strict pre-check in run() | |
| should never let this happen). | |
| """ | |
| from huggingface_hub import hf_hub_download | |
| import zipfile | |
| # Use the caller's pre-fetched zip listing + dataset HEAD when | |
| # available. run() already calls _list_dataset_zips() and | |
| # repo_info() to decide flat vs zip + populate the dataset-level | |
| # cache; calling them again here doubled the HF API request count | |
| # per validation for no value. | |
| if prefetched_zip_entries is not None: | |
| zip_entries = prefetched_zip_entries | |
| else: | |
| zip_entries = _list_dataset_zips(api, dataset, token) | |
| # Unified path: if the dataset has no zip files, synthesize a SINGLE | |
| # "unit" representing the whole dataset. snapshot_download has | |
| # already (or will) materialize the contents into flat_target; | |
| # downstream daemon-pool validation treats it the same as one zip. | |
| is_flat = not zip_entries | |
| if is_flat: | |
| if flat_target is None or not flat_target.is_dir(): | |
| return None # caller must provide the materialized dir | |
| head = prefetched_dataset_head | |
| if head is None: | |
| try: | |
| head = api.repo_info(dataset, repo_type="dataset").sha | |
| except Exception: | |
| head = "" | |
| zip_entries = [(dataset, head)] | |
| out(f" flat dataset: snapshot at {flat_target}; validator will discover assets") | |
| if force: | |
| _clear_zip_cache(dataset, out) | |
| if not is_flat: | |
| out(f" zip-bundled dataset: {len(zip_entries)} zip(s); streaming one at a time" | |
| + (" (force)" if force else "")) | |
| started_at = _now() | |
| # Flat mode: the daemon owns the progress file (writes per-asset | |
| # progress). Streaming-loop's unit-level writes would clobber the | |
| # validator's "k of N assets" with a useless "0/1" / "1/1" counter. | |
| def _emit_unit_progress(**kw): | |
| if is_flat: | |
| return | |
| _write_streaming_progress(progress_file, **kw) | |
| _emit_unit_progress(processed=0, total=len(zip_entries), | |
| current=None, started_at=started_at, state="starting") | |
| merged_results: list[dict] = [] | |
| merged_layout: list[dict] = [] | |
| merged_preliminary: list[dict] = [] | |
| # Set when ANY processed unit's results.json carries | |
| # preliminary_check_failed=true (the validator's strict pre-check fired). | |
| # Propagated into the final dict so the dashboard sees the flag | |
| # and renders the layout-failed banner instead of generic counts. | |
| any_preliminary_check_failed = False | |
| workers = os.environ.get("SR_WORKERS", "4").strip() or "4" | |
| cache_hits = 0 | |
| val_ver = _validator_version() | |
| found_sha = _foundation_sha() | |
| # Always use --use-plugin in the streaming path. With the patched | |
| # foundation wheel installed, SimReadyPlugin's on_startup() loads | |
| # profiles via simready.validate.impl.loader at omni.asset_validator | |
| # import time — fully populated ProfileRegistry before validate.py | |
| # main() runs. The default CLI-loader path still attempts a | |
| # second `load_validation_implementation` call which races with | |
| # the plugin's registration and ends up failing in subtle ways | |
| # (we've observed FATAL: profile X not registered even though | |
| # the plugin succeeded — different code path, different bug). | |
| # Skip the doomed-first-attempt entirely. | |
| use_plugin_default = True | |
| issue_filed_for_registration_bug = False | |
| # Once an issue-filing attempt 404s (token can't reach the repo, | |
| # repo wrong, permissions missing), don't try again this run — | |
| # avoids spamming 30+ 404s in the log. | |
| issue_filing_disabled = False | |
| # Profile auto-detect state — runs once on the first zip's extracted | |
| # tree. If content-detection disagrees with the caller-supplied | |
| # profile, override for all remaining zips in this run. | |
| profile_autodetect_done = False | |
| # Abort after this many consecutive unrecoverable failures so we | |
| # don't burn 800 zips' worth of compute on a known-broken Space. | |
| consecutive_unrecoverable = 0 | |
| UNRECOVERABLE_ABORT_AT = 3 | |
| was_cancelled = False | |
| zips_processed = 0 | |
| # Parallelism strategy — total concurrency = SR_WORKERS in both cases: | |
| # - Zip path (N units, 1 asset/unit after scene-root reduction): | |
| # N daemons × 1 internal worker each. Each daemon processes one | |
| # zip end-to-end; the streaming loop fans out via ThreadPoolExecutor. | |
| # - Flat path (1 unit with many assets): | |
| # 1 daemon × N internal workers. Single daemon's fork pool handles | |
| # the M assets discovered in the snapshot. | |
| # The N²-over-subscription case (N daemons × N workers) is avoided. | |
| if is_flat: | |
| n_daemons = 1 | |
| daemon_workers = workers # SR_WORKERS | |
| else: | |
| # Cap daemon count at the actual number of zips — spawning 8 | |
| # daemons for 1 zip (preliminary mode's sliced workload) wastes | |
| # ~10s of per-daemon spec-load before the first zip even starts. | |
| max_daemons = max(1, int(workers) if str(workers).isdigit() else 1) | |
| n_daemons = min(max_daemons, len(zip_entries)) | |
| daemon_workers = "1" | |
| daemon_pool: list[subprocess.Popen] = [] | |
| daemon_locks: list = [] # threading.Lock per daemon for IO safety | |
| import threading as _threading | |
| import queue as _queue | |
| daemon_cmd = [ | |
| sys.executable, str(VALIDATOR), "--daemon", | |
| "--use-plugin", kit_flag, "--workers", daemon_workers, | |
| "--profile", profile, "--version", version, | |
| ] | |
| # When using Kit, the validator needs the explicit path to Kit's | |
| # Python. The Dockerfile sets SIMREADY_KIT_PYTHON=/isaac-sim/python.sh | |
| # globally; pass it on the command line too so the daemon process | |
| # picks it up reliably regardless of env propagation. | |
| if kit_flag == "--use-kit": | |
| kit_py = os.environ.get("SIMREADY_KIT_PYTHON", "/isaac-sim/python.sh") | |
| if Path(kit_py).exists(): | |
| daemon_cmd += ["--kit-python", kit_py] | |
| if continue_on_preliminary: | |
| daemon_cmd.append("--continue-on-preliminary") | |
| out(f" spawning {n_daemons} validator daemon(s) (spec load happens once each)…") | |
| _update_progress_stage(progress_file, "Loading validator specs") | |
| import time as _t_spawn | |
| for di in range(n_daemons): | |
| try: | |
| proc = subprocess.Popen( | |
| daemon_cmd, | |
| stdin=subprocess.PIPE, stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, text=True, bufsize=1, | |
| ) | |
| # Wait for __DAEMON_READY__ on stdout (spec load can take 60s+). | |
| # Stream the validator's startup log lines into progress.stage | |
| # (throttled) so the dashboard isn't a black hole while specs | |
| # load. Otherwise the daemon goes silent for a minute and the | |
| # button just sits on "Loading validator specs" with no | |
| # confirmation that anything's happening. | |
| ready = False | |
| last_spawn_update = 0.0 | |
| for _ in range(300): | |
| line = proc.stdout.readline() | |
| if not line: break | |
| if "__DAEMON_READY__" in line: | |
| ready = True | |
| break | |
| stripped = line.strip() | |
| if stripped and not stripped.startswith("__DAEMON_"): | |
| now = _t_spawn.time() | |
| if now - last_spawn_update >= 2.0: | |
| _update_progress_stage(progress_file, stripped[:120]) | |
| last_spawn_update = now | |
| if not ready: | |
| proc.kill() | |
| out(f" daemon[{di}] never reported ready; skipping") | |
| continue | |
| daemon_pool.append(proc) | |
| daemon_locks.append(_threading.Lock()) | |
| out(f" daemon[{di}] ready ({len(daemon_pool)}/{n_daemons})") | |
| _update_progress_stage(progress_file, | |
| f"Validator ready ({len(daemon_pool)}/{n_daemons})") | |
| except Exception as e: | |
| out(f" daemon[{di}] spawn failed ({type(e).__name__}: {e})") | |
| if not daemon_pool: | |
| out(f" no daemons spawned; falling back to per-zip subprocess.call") | |
| # Queue of available daemon indices. Workers check out one, | |
| # validate, return. | |
| available_daemons: _queue.Queue = _queue.Queue() | |
| for idx in range(len(daemon_pool)): | |
| available_daemons.put(idx) | |
| # Shared-state locks for the parallel per-zip workers below. | |
| _state_lock = _threading.Lock() | |
| _stop_event = _threading.Event() | |
| def _process_zip(i: int, zip_rel: str, zip_sha) -> None: | |
| nonlocal cache_hits, zips_processed, profile_autodetect_done | |
| nonlocal profile, consecutive_unrecoverable, was_cancelled | |
| nonlocal use_plugin_default, issue_filed_for_registration_bug | |
| nonlocal issue_filing_disabled, any_preliminary_check_failed | |
| # Honor early abort (cancel or unrecoverable failure) — tasks | |
| # queued before the stop signal still get scheduled and have | |
| # to no-op themselves. | |
| if _stop_event.is_set(): | |
| return | |
| # Cancel check: dashboard's Cancel button POSTs to the Space's | |
| # cancel_run endpoint which creates /tmp/sr-cancel/<id>. Stop | |
| # accepting new work so the in-flight gradio call returns | |
| # promptly with whatever partial results we have instead of | |
| # grinding through hundreds more zips. | |
| if submission_id and _is_cancelled(submission_id, run_token): | |
| with _state_lock: | |
| if not was_cancelled: | |
| out(f" CANCEL signal received — stopping (in-flight tasks finish)") | |
| try: | |
| p = cancel_path_for(submission_id) | |
| if p and p.exists(): | |
| p.unlink() | |
| except OSError: | |
| pass | |
| was_cancelled = True | |
| _stop_event.set() | |
| return | |
| _emit_unit_progress(processed=i, total=len(zip_entries), | |
| current=zip_rel, started_at=started_at, state="zip", | |
| results=_accumulated_progress_rows(merged_results)) | |
| # Cache lookup. Skip when zip_sha is None (HF didn't surface a | |
| # blob sha — rare, defensive). force=True already cleared the | |
| # cache above so the read here will miss. | |
| cache_key = None | |
| if zip_sha: | |
| cache_key = _zip_cache_key(zip_sha, profile, val_ver, found_sha) | |
| cached = _read_zip_cache(dataset, cache_key) if not force else None | |
| if cached: | |
| merged_results.extend(cached.get("results", [])) | |
| merged_layout.extend(cached.get("layout_findings") or []) | |
| if cached.get("preliminary_check_failed"): | |
| any_preliminary_check_failed = True | |
| cache_hits += 1 | |
| out(f" [{i+1}/{len(zip_entries)}] cache hit: {zip_rel} " | |
| f"({len(cached.get('results', []))} asset(s))") | |
| _emit_unit_progress(processed=i + 1, total=len(zip_entries), | |
| current=zip_rel, | |
| started_at=started_at, state="zip", | |
| results=_accumulated_progress_rows(merged_results)) | |
| return | |
| per_zip = work / f"zip_{i:04d}" | |
| per_zip.mkdir(parents=True, exist_ok=True) | |
| if is_flat: | |
| # Flat dataset: caller already materialized the contents in | |
| # flat_target. Treat it as the sole pre-extracted "unit". | |
| # No download, no zip extraction step — just point the | |
| # validator at the snapshot dir directly. | |
| extract_dir = flat_target | |
| else: | |
| extract_dir = per_zip / "extracted" | |
| out_dir = per_zip / "out" | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| zip_results: list[dict] = [] | |
| zip_layout: list[dict] = [] | |
| try: | |
| if not is_flat: | |
| out(f" [{i+1}/{len(zip_entries)}] download: {zip_rel}") | |
| _update_progress_stage(progress_file, "Downloading") | |
| downloaded_str = hf_hub_download( | |
| repo_id=dataset, repo_type="dataset", | |
| filename=zip_rel, local_dir=str(per_zip), | |
| token=token, | |
| ) | |
| downloaded = Path(downloaded_str) | |
| if not downloaded.is_file(): | |
| out(f" ! download produced no file at {downloaded}") | |
| return | |
| out(f" [{i+1}/{len(zip_entries)}] extract") | |
| _update_progress_stage(progress_file, "Extracting") | |
| extract_dir.mkdir(parents=True, exist_ok=True) | |
| with zipfile.ZipFile(downloaded) as zf: | |
| # asset-per-zip: this archive IS one asset, so its | |
| # contents must be validated as a single <asset>/ bundle | |
| # — otherwise the entry USD reads as a stray root-USD and | |
| # the asset's own payload subdirs (textures/, geometries/ | |
| # …) read as separate assets missing manifests. Wrap the | |
| # contents in a dir named after the zip, UNLESS the zip | |
| # already nests everything under one root dir (then it's | |
| # its own bundle — extracting flat avoids double-nesting). | |
| if asset_per_zip and _zip_single_root(zf) is None: | |
| unpack_to = extract_dir / (_safe_name(Path(zip_rel).stem) or "asset") | |
| else: | |
| unpack_to = extract_dir | |
| unpack_to.mkdir(parents=True, exist_ok=True) | |
| _safe_extract_zip(zf, unpack_to) | |
| try: downloaded.unlink() | |
| except OSError: pass | |
| # Profile auto-detect — only when the caller explicitly | |
| # asked for "Auto" (the dashboard's default). Any specific | |
| # profile bypasses detection entirely: operator override | |
| # is respected as the source of truth. | |
| if not profile_autodetect_done and profile.lower() == "auto": | |
| _update_progress_stage(progress_file, "Detecting profile") | |
| sample = _find_representative_usd(extract_dir) | |
| if sample is not None: | |
| detected = _detect_profile_from_usd(sample, out) | |
| if detected: | |
| out(f" profile auto-detect: 'Auto' → '{detected}' " | |
| f"(sampled {sample.relative_to(extract_dir)})") | |
| profile = detected | |
| if zip_sha: | |
| cache_key = _zip_cache_key(zip_sha, profile, val_ver, found_sha) | |
| else: | |
| # Detection failed — fall back to a permissive default. | |
| out(f" profile auto-detect: could not classify; falling back to Prop-Robotics-Neutral") | |
| profile = "Prop-Robotics-Neutral" | |
| if zip_sha: | |
| cache_key = _zip_cache_key(zip_sha, profile, val_ver, found_sha) | |
| profile_autodetect_done = True | |
| def _run_validator(use_plugin: bool) -> int: | |
| # Check out a daemon from the pool, send the request, | |
| # read response, return daemon to pool. The pool's | |
| # blocking get() naturally limits concurrent validates | |
| # to len(daemon_pool). Falls back to one-shot | |
| # subprocess.call when no daemons are available. | |
| if daemon_pool: | |
| daemon_idx = available_daemons.get() | |
| try: | |
| proc = daemon_pool[daemon_idx] | |
| if proc.poll() is not None: | |
| # This daemon died — skip pool entry and fall through. | |
| raise RuntimeError(f"daemon[{daemon_idx}] dead") | |
| req = { | |
| "target": str(extract_dir), | |
| "output": str(out_dir), | |
| "profile": profile, | |
| "version": version, | |
| "use_kit": kit_flag == "--use-kit", | |
| } | |
| # Progress-file ownership: | |
| # - Zip path (many units, 1 asset each after | |
| # scene-root reduction): streaming loop owns | |
| # the progress file, emits "k of N zips". Do | |
| # NOT pass progress_file to the daemon — it | |
| # would overwrite the zip counter with "1 of 1". | |
| # - Flat path (1 unit with many assets): unit | |
| # counter is useless ("0/1" / "1/1"). Hand the | |
| # progress file to the daemon so the validator | |
| # emits per-asset progress. Streaming loop | |
| # skips its own writes (see is_flat checks). | |
| if is_flat and progress_file is not None: | |
| req["progress_file"] = str(progress_file) | |
| with daemon_locks[daemon_idx]: | |
| proc.stdin.write(json.dumps(req) + "\n") | |
| proc.stdin.flush() | |
| rc = 99 | |
| import time as _t | |
| last_stage_update = 0.0 | |
| with log_file.open("w", encoding="utf-8") as logf: | |
| for line in proc.stdout: | |
| line = line.rstrip("\n") | |
| if line.startswith("__DAEMON_RESPONSE__"): | |
| try: | |
| payload = json.loads(line[len("__DAEMON_RESPONSE__"):].strip()) | |
| rc = int(payload.get("rc", 99)) | |
| except Exception: | |
| pass | |
| break | |
| logf.write(line + "\n") | |
| # Surface the latest validator output to | |
| # the progress file so the dashboard can | |
| # show what's happening during the long | |
| # opaque phase between profile-detect and | |
| # per-asset progress emission (Kit boot, | |
| # foundation specs load, profile register, | |
| # USD parse). Throttle to once per ~2 s so | |
| # we don't churn the progress file. | |
| stripped = line.strip() | |
| if not stripped or stripped.startswith("__DAEMON_"): | |
| continue | |
| now = _t.time() | |
| if now - last_stage_update >= 2.0: | |
| # Keep payload small; full line is | |
| # already in validator.log if anyone | |
| # needs it. CSS ellipsis-truncates | |
| # the visible button to ~14 chars | |
| # anyway; hover shows what fits. | |
| _update_progress_stage( | |
| progress_file, | |
| stripped[:120], | |
| ) | |
| last_stage_update = now | |
| return rc | |
| except Exception as e: | |
| out(f" daemon[{daemon_idx}] failed ({type(e).__name__}: {e}); falling back to subprocess") | |
| finally: | |
| available_daemons.put(daemon_idx) | |
| # Fallback: one-shot subprocess (original path). | |
| cmd = [ | |
| sys.executable, str(VALIDATOR), str(extract_dir), | |
| "--profile", profile, "--version", version, | |
| "--output", str(out_dir), kit_flag, | |
| "--workers", workers, | |
| ] | |
| if use_plugin: | |
| cmd.append("--use-plugin") | |
| with log_file.open("wb") as logf: | |
| return subprocess.call(cmd, stdout=logf, stderr=subprocess.STDOUT) | |
| log_file = out_dir / "validator.log" | |
| out(f" [{i+1}/{len(zip_entries)}] validate" | |
| + (" (--use-plugin)" if use_plugin_default else "")) | |
| _update_progress_stage(progress_file, "Booting validator") | |
| rc = _run_validator(use_plugin=use_plugin_default) | |
| results_path = out_dir / "results.json" | |
| # Deterministic recovery: if the default loader produced | |
| # the "profile not registered" signature, retry the same | |
| # zip with --use-plugin. If that works, promote it to the | |
| # default for every remaining zip. | |
| if (not results_path.is_file() and not use_plugin_default | |
| and _is_profile_registration_failure(log_file)): | |
| out(f" detected loader-registration failure; retrying with --use-plugin") | |
| if not issue_filed_for_registration_bug and not issue_filing_disabled: | |
| try: | |
| _file_registration_issue(dataset, profile, val_ver, found_sha, | |
| log_file, out) | |
| issue_filed_for_registration_bug = True | |
| except Exception as e: | |
| if "404" in str(e): | |
| out(f" issue filing 404'd; disabling for the rest of this run") | |
| issue_filing_disabled = True | |
| rc = _run_validator(use_plugin=True) | |
| if results_path.is_file(): | |
| out(f" --use-plugin recovered; switching default for remaining zips") | |
| use_plugin_default = True | |
| # Unrecoverable: SimReadyPlugin entry point not installed. | |
| # Both loader paths go through the same plugin discovery, | |
| # so retrying won't help. Track consecutive failures and | |
| # abort the loop after N to avoid wasting compute. | |
| if not results_path.is_file() and _is_unrecoverable_plugin_miss(log_file): | |
| consecutive_unrecoverable += 1 | |
| if consecutive_unrecoverable >= UNRECOVERABLE_ABORT_AT: | |
| out(f" ABORTING: {consecutive_unrecoverable} consecutive failures with " | |
| f"'SimReadyPlugin not discovered' — the foundation entry point isn't " | |
| f"installed on this Space, validator cannot proceed regardless of " | |
| f"how many zips we try") | |
| shutil.rmtree(per_zip, ignore_errors=True) | |
| # Signal all other in-flight tasks to stop early. | |
| _stop_event.set() | |
| return | |
| elif results_path.is_file(): | |
| consecutive_unrecoverable = 0 | |
| zips_processed += 1 | |
| if results_path.is_file(): | |
| try: | |
| rj = json.loads(results_path.read_text(encoding="utf-8")) | |
| except json.JSONDecodeError: | |
| rj = {} | |
| for asset in rj.get("results", []): | |
| asset_rel = (asset.get("rel_path") or "").lstrip("./") | |
| if is_flat or asset_per_zip: | |
| # Both already carry a short, asset-relative path: flat | |
| # datasets give "<asset>/<file>", and asset-per-zip | |
| # extracted under a <stem>/ wrap dir named after the | |
| # asset, so the validator's rel is "<asset>/<file>" too | |
| # (e.g. "var_galley_0137d9bc/model.usda"). Use it as-is. | |
| # Prefixing the zip's dataset location would duplicate | |
| # the asset name (scenes/<asset>/<asset>/…). | |
| asset["rel_path"] = asset_rel | |
| else: | |
| # Single archive bundling many assets: prefix the zip's | |
| # path (minus the ".zip", which otherwise reads like a | |
| # naming-convention violation) so each asset shows which | |
| # archive it came from. | |
| zip_disp = zip_rel[:-4] if zip_rel[-4:].lower() == ".zip" else zip_rel | |
| asset["rel_path"] = f"{zip_disp}/{asset_rel}".replace("//", "/") | |
| zip_results = rj.get("results", []) | |
| zip_layout = rj.get("layout_findings") or [] | |
| zip_preliminary = rj.get("preliminary_findings") or [] | |
| merged_results.extend(zip_results) | |
| merged_layout.extend(zip_layout) | |
| merged_preliminary.extend(zip_preliminary) | |
| if rj.get("preliminary_check_failed"): | |
| any_preliminary_check_failed = True | |
| out(f" {len(zip_results)} asset(s); rc={rc}") | |
| # Emit a progress write so the dashboard sees the | |
| # updated zip-count + per-asset rows immediately | |
| # (next poll picks them up). Without this the chip | |
| # only updates on the NEXT zip's "zip" state write. | |
| _emit_unit_progress(processed=i + 1, total=len(zip_entries), | |
| current=zip_rel, | |
| started_at=started_at, state="zip", | |
| results=_accumulated_progress_rows(merged_results)) | |
| # Write per-zip cache entry on successful validation. We | |
| # cache even when rc!=0 IF results.json was produced — | |
| # the validator may exit 1 to signal failures-present | |
| # while still having emitted a valid report. | |
| if cache_key and rj: | |
| _write_zip_cache(dataset, cache_key, { | |
| "schema_version": 1, | |
| "zip_rel": zip_rel, | |
| "zip_sha": zip_sha, | |
| "results": zip_results, | |
| "layout_findings": zip_layout, | |
| "preliminary_check_failed": bool(rj.get("preliminary_check_failed")), | |
| "validator_version": val_ver, | |
| "foundation_sha": found_sha, | |
| "profile": profile, | |
| "cached_at": _now(), | |
| }) | |
| else: | |
| # Diagnostic: dump the validator's own log tail into | |
| # the Space log so we can see WHY the zip failed. | |
| # Without this we just see "rc=N" lines forever and | |
| # have no idea what the validator was complaining about. | |
| tail_lines: list[str] = [] | |
| if log_file.is_file(): | |
| try: | |
| text = log_file.read_text(encoding="utf-8", errors="replace") | |
| tail_lines = text.splitlines()[-20:] | |
| except OSError: | |
| pass | |
| # Also list what's actually in the extracted tree to | |
| # diagnose "extracted but no USDs found" cases — common | |
| # if the zip has USDs nested deeper than discover_assets | |
| # walks, or uses an extension we don't recognize. | |
| tree_sample: list[str] = [] | |
| try: | |
| files = sorted(p for p in extract_dir.rglob("*") if p.is_file()) | |
| for p in files[:8]: | |
| rel = p.relative_to(extract_dir) | |
| tree_sample.append(f" {rel}") | |
| if len(files) > 8: | |
| tree_sample.append(f" ... and {len(files) - 8} more") | |
| except OSError: | |
| pass | |
| out(f" ! no results.json (rc={rc})") | |
| if tree_sample: | |
| out(f" extracted tree ({sum(1 for _ in extract_dir.rglob('*') if _.is_file())} files):") | |
| for line in tree_sample: | |
| out(line) | |
| if tail_lines: | |
| out(f" validator log tail:") | |
| for line in tail_lines: | |
| out(f" {line[:240]}") | |
| except Exception as e: | |
| out(f" ! [{i+1}/{len(zip_entries)}] {type(e).__name__}: {e}") | |
| finally: | |
| shutil.rmtree(per_zip, ignore_errors=True) | |
| # Dispatch all zips to the daemon pool via a thread pool. Concurrency | |
| # is bounded by max_workers (= number of live daemons). Each thread | |
| # runs _process_zip for one zip end-to-end (download + extract + | |
| # validate). Cancel signal causes pending tasks to no-op via the | |
| # _stop_event check at function entry. | |
| import concurrent.futures as _futures | |
| n_parallel = max(1, len(daemon_pool)) | |
| out(f" dispatching {len(zip_entries)} zip(s) across {n_parallel} parallel worker(s)") | |
| with _futures.ThreadPoolExecutor(max_workers=n_parallel) as _ex: | |
| _all_futures = [ | |
| _ex.submit(_process_zip, i, zr, zs) | |
| for i, (zr, zs) in enumerate(zip_entries) | |
| ] | |
| for _fut in _futures.as_completed(_all_futures): | |
| try: | |
| _fut.result() | |
| except Exception as _e: | |
| out(f" ! task crashed: {type(_e).__name__}: {_e}") | |
| # Teardown daemon pool. Close stdins so daemons exit cleanly; | |
| # short wait then kill to bound shutdown time. | |
| for proc in daemon_pool: | |
| try: | |
| proc.stdin.close() | |
| except Exception: | |
| pass | |
| for proc in daemon_pool: | |
| try: | |
| proc.wait(timeout=10) | |
| except subprocess.TimeoutExpired: | |
| proc.kill() | |
| except Exception: | |
| pass | |
| _emit_unit_progress(processed=len(zip_entries), total=len(zip_entries), | |
| current=None, started_at=started_at, state="done", | |
| results=_accumulated_progress_rows(merged_results)) | |
| out(f" zip-streaming done: {cache_hits} cached, " | |
| f"{zips_processed} freshly validated" | |
| + (f", CANCELLED after {zips_processed + cache_hits} of {len(zip_entries)}" if was_cancelled else "")) | |
| return { | |
| "schema_version": 1, | |
| "results": merged_results, | |
| "layout_findings": merged_layout, | |
| "preliminary_findings": merged_preliminary, | |
| "preliminary_check_failed": any_preliminary_check_failed, | |
| "profile_coverage": {}, | |
| "streaming_zips": len(zip_entries), | |
| "streaming_cache_hits": cache_hits, | |
| "streaming_processed": zips_processed + cache_hits, | |
| "cancelled": was_cancelled, | |
| } | |
def _summarize(results_json: dict) -> tuple[str, str]:
    """Return (status, one-line summary) for a merged results payload.

    ``status`` is ``"pass"``, ``"warn"`` or ``"fail"``:

    - a truthy ``preliminary_check_failed`` always yields ``"fail"``, with a
      summary naming the (up to three) most frequent issue codes;
    - otherwise ``"fail"`` when any issue has severity ``error`` or
      ``failure``, ``"warn"`` when any has ``warning`` or when there are no
      assets at all, else ``"pass"``.

    Keys present with a ``None`` value (``results``, ``issues``, ...) are
    treated the same as missing keys on both paths.
    """
    results = results_json.get("results") or []

    # Preliminary-check failures short-circuit the normal
    # "M/N assets passed" framing — the dataset didn't get to USD
    # validation because filesystem-only foundation checks already
    # flagged issues. The summary names the phase so the operator
    # knows what to do (forward the report to the partner; address
    # these before re-validating to surface deeper USD findings).
    if results_json.get("preliminary_check_failed"):
        # Per-code breakdown for the chip text — the partner-facing
        # summary is more useful when it names the failing rules.
        code_counts: dict[str, int] = {}
        for asset in results:
            for issue in (asset.get("issues") or []):
                code = issue.get("code") or "UNKNOWN"
                code_counts[code] = code_counts.get(code, 0) + 1
        top_codes = sorted(code_counts.items(), key=lambda kv: -kv[1])[:3]
        if top_codes:
            codes_text = ", ".join(f"{c} ×{n}" for c, n in top_codes)
        else:
            # No per-asset issues to name: fall back to the sidecar
            # lists so the summary still reports how many findings
            # the preliminary phase produced.
            sidecar = (results_json.get("preliminary_findings")
                       or results_json.get("layout_findings") or [])
            codes_text = f"{len(sidecar)} issue(s)" if sidecar else "0 issues"
        files_affected = len(results)
        return "fail", (f"PRELIMINARY CHECK FAILED — {codes_text} "
                        f"({files_affected} file(s) affected). Address these "
                        f"before deeper validation runs.")

    counts = {"error": 0, "failure": 0, "warning": 0}
    total = len(results)
    failed = 0
    for asset in results:
        if not asset.get("passed"):
            failed += 1
        for issue in (asset.get("issues") or []):
            sev = str(issue.get("severity") or "").lower()
            if sev in counts:
                counts[sev] += 1

    if counts["error"] or counts["failure"]:
        status = "fail"
    elif counts["warning"]:
        status = "warn"
    elif total > 0:
        status = "pass"
    else:
        # Nothing was validated at all — never report that as a pass.
        status = "warn"

    parts = [f"{total - failed}/{total} assets passed"]
    parts += [f"{k}={v}" for k, v in counts.items() if v]
    coverage = results_json.get("profile_coverage") or {}
    if coverage.get("missing"):
        parts.append(f"coverage {coverage.get('loaded')}/{coverage.get('declared')} features")
    return status, " · ".join(parts)
def _open_verdict_pr(
    api: HfApi, dataset: str, results_path: Path, report_dir: Path,
    profile: str, version: str, status: str, summary: str,
) -> str | None:
    """Upload `validation/results.json` + `validation/report/` to the dataset
    as a PR. Returns the discussion URL, or None when the commit info
    exposes neither `pr_url` nor `discussion_url`.

    Why PR rather than commit-to-main: the dataset owner reviews the
    verdict like any other change. The HF Hub PR flow is exactly the
    surface the production end-state assumes — see PRD §3.

    Args:
        api: Hub client used to create the commit.
        dataset: Target dataset repo id.
        results_path: Local results.json, uploaded as
            `validation/results.json`.
        report_dir: Local report directory; every file below it is
            uploaded under `validation/report/`, whatever the local
            directory is called.
        profile, version, status, summary: Rendered into the commit
            message and the PR description.

    Errors from reading the local files or from the Hub call propagate
    to the caller.
    """
    import io

    from huggingface_hub import CommitOperationAdd

    # PR description shown to the dataset owner. It points at
    # `validation/report/index.html`, which is why the upload paths below
    # are pinned to `validation/report/`.
    body_md = (
        f"### SimReady validation\n\n"
        f"- **Profile**: `{profile}` v{version}\n"
        f"- **Status**: **{status.upper()}**\n"
        f"- **Summary**: {summary}\n"
        f"- **Generated**: {_now()}\n\n"
        f"Run by the SimReady Validator HF Space. The full HTML report "
        f"is in `validation/report/index.html`; machine-readable "
        f"results in `validation/results.json`.\n"
    )

    # Stage everything that should land in the dataset under a single
    # tree we can iterate. `validation/results.json` plus the entire
    # `validation/report/` directory.
    additions: list[tuple[str, bytes]] = [
        ("validation/results.json", results_path.read_bytes()),
    ]
    for path in sorted(report_dir.rglob("*")):
        if path.is_file():
            # Relative to report_dir itself so the in-repo location does
            # not depend on what the local directory happens to be named.
            rel = path.relative_to(report_dir)
            additions.append((f"validation/report/{rel.as_posix()}", path.read_bytes()))

    operations = [
        CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=io.BytesIO(blob))
        for repo_path, blob in additions
    ]
    commit = api.create_commit(
        repo_id=dataset, repo_type="dataset",
        operations=operations,
        commit_message=f"simready-validate: {profile} v{version} → {status}",
        commit_description=body_md,
        create_pr=True,
    )
    # `create_pr=True` returns the PR's revision; the discussion URL is
    # derivable from it. HfApi exposes the field but its key name has
    # varied across versions — fall back gracefully.
    return getattr(commit, "pr_url", None) or getattr(commit, "discussion_url", None)
# Directory for per-submission runtime files: the progress file
# (`<id>.json`, see progress_path_for) and the run-token sidecar
# (`<id>.token`, see run_token_path_for).
PROGRESS_DIR = Path("/tmp/sr-progress")
# Cancel signal directory. The streaming-zip workers look for
# /tmp/sr-cancel/<submission_id> at the start of each zip task; a flag
# that targets the current run means abort (_is_cancelled compares the
# flag's content with the run token, and falls back to presence-only
# when no token is supplied). Set by the Space's cancel_run gradio
# endpoint (called from the dashboard when the operator clicks Cancel)
# — the GH Action cancel alone doesn't stop the in-flight gradio call
# server-side.
CANCEL_DIR = Path("/tmp/sr-cancel")
| def cancel_path_for(submission_id: str) -> Path | None: | |
| if not submission_id: | |
| return None | |
| return CANCEL_DIR / _safe_name(submission_id) | |
def _is_cancelled(submission_id: str, run_token: str = "") -> bool:
    """Report whether a cancel flag exists AND is aimed at THIS run.

    The flag file's content is the run_token the dashboard read from the
    progress endpoint and echoed back to cancel_run. When that content
    differs from the token of the run currently executing, the flag is
    stale — a leftover from an earlier run of the same submission that
    finished before consuming it — and is ignored. This is the "never
    stale" guarantee: a cancel only aborts the exact run it was issued
    against, so a leftover flag can never kill a fresh force-run (the
    bug this fixes).

    With no run_token supplied (legacy caller / a run with no token) the
    check degrades to presence-only so cancel still works. An unreadable
    flag (OSError) counts as "not cancelled"."""
    flag = cancel_path_for(submission_id)
    if flag is None or not flag.is_file():
        return False
    if run_token:
        try:
            flag_token = flag.read_text(encoding="utf-8").strip()
        except OSError:
            return False
        return flag_token == run_token
    # No token to compare against: the flag's presence is enough.
    return True
# Persistent volume mounted on the Space — survives container restarts.
# See space_info().runtime.raw["volumes"]: nvidia/simready-validator-storage
# is mounted at /data. We keep results.json + the summary keyed by the
# four-tuple that determines "would this run produce the same answer?":
# dataset HEAD, profile, validator version and foundation SHA (see
# _cache_key). When the next call matches all four, we serve the cached
# result instead of paying ~5 min for an identical re-run.
CACHE_DIR = Path("/data/sr-cache")
def _cache_key(dataset_head: str, profile: str, validator_version: str,
               foundation_sha: str) -> str:
    """Derive a stable 16-hex-char key from every input that determines
    the verdict: the four arguments joined with "|" and hashed with
    SHA-256.

    Runner wrapper-code changes are deliberately left out of the key —
    they alter how a result is shaped on the way out, not which assets
    passed or failed. Shift+Click remains the operator's escape valve
    for forcing a genuinely fresh run."""
    from hashlib import sha256

    fields = (dataset_head, profile, validator_version, foundation_sha)
    material = "|".join(f"{field}" for field in fields)
    digest = sha256(material.encode("utf-8")).hexdigest()
    return digest[:16]
def _cache_path_for(dataset: str, key: str) -> Path:
    """Locate the cache file for a dataset+key pair: one `<key>.json`
    per entry, inside a per-dataset directory so an operator can browse
    the cache by partner."""
    dataset_dir = CACHE_DIR / _safe_name(dataset)
    return dataset_dir / f"{key}.json"
def _foundation_sha() -> str:
    """Commit of NVIDIA/simready-foundation the Space was built against,
    read from the SIMREADY_FOUNDATIONS_COMMIT environment variable (set
    by the Dockerfile). Yields "unpinned" when the variable is absent."""
    return os.getenv("SIMREADY_FOUNDATIONS_COMMIT", "unpinned")
# Path to the spec-sync state file. The state file declares which
# foundation commit our hardcoded validator rules were last aligned
# against. The Space's checkout includes the file at this relative
# path (tools/spec_sync/state.json in the repo).
# NOTE(review): this resolves `tools/` two directories above this
# module (`parents[2]`), whereas VALIDATOR resolves `tools/` next to
# this module (`.parent`) — confirm the layout on the Space. A missing
# file is soft-handled by _check_foundation_spec_drift ("state file
# missing"), so a wrong path would silently disable drift detection.
_SPEC_SYNC_STATE_FILE = Path(__file__).resolve().parents[2] / "tools" / "spec_sync" / "state.json"
def _check_foundation_spec_drift() -> dict:
    """Check whether NVIDIA/simready-foundation has new commits to its
    spec dir since we last synced our hardcoded rules.

    Cheap event-driven drift detection (one GitHub API call per run,
    soft-fails on errors). The validator surfaces drift in results.json
    so the dashboard can warn operators that hardcoded rules may be
    stale relative to the source of truth. Auto-update is the
    follow-up step (spec-sync workflow opens a PR to refresh rules).

    Returns a dict the dashboard renders; never raises. On any failure
    the dict is ``{"checked": False, "reason": "..."}``; on success it
    carries ``checked=True``, ``drifted`` and the current / last-synced
    commit details.
    """
    result: dict = {"checked": False}

    # Step 1: load the sync state (which commit our rules were aligned to).
    try:
        if not _SPEC_SYNC_STATE_FILE.is_file():
            result["reason"] = "state file missing"
            return result
        state = json.loads(_SPEC_SYNC_STATE_FILE.read_text(encoding="utf-8"))
        last_sync_sha = state.get("foundation_commit_sha") or ""
        last_sync_at = state.get("synced_at") or ""
        watched_path = state.get("watched_path") or "nv_core/sr_specs/docs"
        repo = state.get("foundation_repo") or "NVIDIA/simready-foundation"
    except Exception as e:
        result["reason"] = f"could not read state: {type(e).__name__}: {e}"
        return result

    # Step 2: ask GitHub for the newest commit touching the watched path.
    try:
        import urllib.parse
        import urllib.request

        # URL-encode the state-file values so spaces or reserved
        # characters in them cannot produce a malformed request.
        query = urllib.parse.urlencode({"path": watched_path, "per_page": 1})
        url = (f"https://api.github.com/repos/"
               f"{urllib.parse.quote(str(repo), safe='/')}/commits?{query}")
        req = urllib.request.Request(url)
        req.add_header("Accept", "application/vnd.github.v3+json")
        token = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_VALIDATOR_TOKEN")
        if token:
            req.add_header("Authorization", f"Bearer {token}")
        with urllib.request.urlopen(req, timeout=8) as resp:
            data = json.loads(resp.read())
    except Exception as e:
        result["reason"] = f"github api: {type(e).__name__}: {e}"
        return result

    if not isinstance(data, list) or not data:
        result["reason"] = "no commit data"
        return result

    # Step 3: compare. Every level of the response is type-checked so an
    # unexpected shape (non-dict entry, null `commit` / `committer`)
    # degrades to empty strings instead of raising.
    current = data[0] if isinstance(data[0], dict) else {}
    current_sha = current.get("sha") or ""
    commit_meta = current.get("commit")
    committer = commit_meta.get("committer") if isinstance(commit_meta, dict) else None
    current_at = (committer.get("date") if isinstance(committer, dict) else "") or ""
    drifted = bool(current_sha) and current_sha != last_sync_sha
    return {
        "checked": True,
        "drifted": drifted,
        "current_sha": current_sha,
        "current_at": current_at,
        "last_sync_sha": last_sync_sha,
        "last_sync_at": last_sync_at,
        "repo": repo,
        "watched_path": watched_path,
    }
def _validator_version() -> str:
    """Installed version of the simready-validate distribution shipped
    in this Space, or "unknown" when it cannot be determined."""
    try:
        from importlib import metadata
        return metadata.version("simready-validate")
    except Exception:
        # Not installed / metadata unreadable — the version is advisory.
        return "unknown"
def _read_cache(dataset: str, key: str) -> dict | None:
    """Read + sanity-check a cached dataset-level entry. Returns None
    for missing / stale / broken entries so the caller falls through to
    a real re-run instead of replaying garbage.

    Broken entries we reject:
    - unreadable file, undecodable bytes or invalid JSON;
    - a JSON document that is not an object.

    Stale signatures we reject:
    - results_json.results == [] with status pass/warn/fail
      (impossible from a correct run — validator emits status=error
      when it can't find any USDs, never pass/warn/fail with zero).
      Detects the broken-pre-streaming era where zips were excluded
      and the cached payload looks "successful" with zero work done.
    """
    p = _cache_path_for(dataset, key)
    if not p.is_file():
        return None
    try:
        payload = json.loads(p.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for a non-UTF-8 file.
        return None
    if not isinstance(payload, dict):
        # Valid JSON but not an entry we wrote — treat as a miss.
        return None
    rj = payload.get("results_json")
    results = (rj.get("results") if isinstance(rj, dict) else None) or []
    status = payload.get("status") or ""
    if not results and status in ("pass", "warn", "fail"):
        # Suspicious — looks like a stale entry from a code path that
        # didn't actually validate anything. Treat as miss.
        return None
    return payload
def _write_cache(dataset: str, key: str, payload: dict) -> None:
    """Persist `payload` as JSON at the cache location for dataset+key.

    Best-effort: any OSError while creating the directory, writing or
    renaming is swallowed, because the cache is advisory and must never
    block a real validation on disk hiccups."""
    target = _cache_path_for(dataset, key)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        # Write to a sibling "<name>.tmp" and rename over the target so
        # concurrent readers can't observe a half-written file.
        staging = target.with_name(target.name + ".tmp")
        staging.write_text(json.dumps(payload), encoding="utf-8")
        os.replace(staging, target)
    except OSError:
        pass
| def progress_path_for(submission_id: str) -> Path: | |
| """Where the validator writes per-asset progress for this submission. | |
| Read by the Space's get_progress endpoint to feed the dashboard's | |
| fill-up progress bar. Empty submission_id → None (caller skips).""" | |
| if not submission_id: | |
| return None # type: ignore[return-value] | |
| return PROGRESS_DIR / f"{_safe_name(submission_id)}.json" | |
| def run_token_path_for(submission_id: str) -> Path | None: | |
| """Sidecar file holding the current run's cancel-match token. | |
| Kept separate from the progress file because the progress file is | |
| rewritten constantly (and, in flat mode, owned by the validator | |
| daemon, which knows nothing about the token). get_progress reads | |
| this and surfaces `run_token` to the dashboard, which echoes it back | |
| on cancel so runner._is_cancelled can match it. See _is_cancelled.""" | |
| if not submission_id: | |
| return None | |
| return PROGRESS_DIR / f"{_safe_name(submission_id)}.token" | |
def _finalize_run(*, dataset: str, profile: str, version: str,
                  results_json: dict, status: str, summary: str,
                  out_dir: Path, api: HfApi, token: str | None,
                  open_pr: bool, results_path: Path, out,
                  dataset_head: str | None = None) -> RunResult:
    """Shared tail-end of run().

    In order: file internal issues (best-effort), optionally open the
    verdict PR on the dataset, copy the report to a stable /tmp
    location, write the dataset-level cache entry (complete runs only),
    and build the RunResult. Failures in issue filing, PR creation and
    the cache write are logged through `out` and do not abort the run.
    """
    # --- 1. Internal issue filing (never fatal) ----------------------
    try:
        from github_issues import ensure_internal_issues
        ensure_internal_issues(results_json, dataset=dataset, profile=profile, log_fn=out)
    except Exception as exc:
        out(f" ! issue-filing skipped: {type(exc).__name__}: {exc}")

    # --- 2. Verdict PR (only when requested and a token is present) --
    pr_url = None
    if open_pr and not token:
        out(" ! HF_TOKEN missing; cannot open PR")
    elif open_pr:
        try:
            pr_url = _open_verdict_pr(
                api=api, dataset=dataset,
                results_path=results_path, report_dir=out_dir,
                profile=profile, version=version,
                status=status, summary=summary,
            )
            out(f" PR opened: {pr_url}")
        except Exception as exc:
            out(f" ! PR creation failed: {type(exc).__name__}: {exc}")

    # --- 3. Persist the report, replacing any previous copy ----------
    flat_dataset_name = dataset.replace("/", "_")
    persisted = Path("/tmp") / f"hfsp-report-{flat_dataset_name}"
    if persisted.exists():
        shutil.rmtree(persisted)
    shutil.copytree(out_dir, persisted)

    # --- 4. Dataset-level cache --------------------------------------
    # Incomplete runs are not cached. Two cases:
    #   - Cancelled mid-streaming (operator clicked Cancel)
    #   - Unrecoverable plugin-miss abort
    # In both, merged_results cover only the zips handled before we
    # bailed, so caching them would replay a partial verdict on the next
    # click. Per-zip cache entries for the zips we DID process stay —
    # they're keyed on zip_sha, not dataset HEAD, and represent real
    # validation of those zips.
    cancelled = bool(results_json.get("cancelled"))
    total_zips = results_json.get("streaming_zips")
    partial = (
        total_zips is not None
        and results_json.get("streaming_processed", 0) < total_zips
    )
    if cancelled or partial:
        if cancelled:
            why = "cancelled"
        else:
            why = f"partial: {results_json.get('streaming_processed')}/{total_zips}"
        out(f" skipping dataset-level cache write ({why})")
    else:
        try:
            # Prefer the HEAD run() already resolved; only legacy call
            # sites that pass none pay for a fresh API lookup.
            if dataset_head is not None:
                head = dataset_head
            else:
                head = api.repo_info(dataset, repo_type="dataset").sha
            validator_ver = _validator_version()
            foundation = _foundation_sha()
            key = _cache_key(head, profile, validator_ver, foundation)
            entry = {
                "schema_version": 1,
                "dataset": dataset, "dataset_head": head,
                "profile": profile, "validator_version": validator_ver,
                "foundation_sha": foundation,
                "status": status, "summary": summary,
                "results_json": results_json,
                "report_path": str(persisted),
                "cached_at": _now(),
            }
            _write_cache(dataset, key, entry)
            out(f" cached result under key={key}")
        except Exception as exc:
            out(f" ! cache write failed ({type(exc).__name__}: {exc}); ignored")

    # --- 5. Result handed back to run() ------------------------------
    return RunResult(
        dataset=dataset, profile=profile, version=version,
        status=status, summary=summary,
        results_json=results_json,
        report_path=persisted, pr_url=pr_url,
    )
| def run( | |
| dataset: str, | |
| profile: str = "Robot-Body-Runnable", | |
| version: str = "1.0.0", | |
| open_pr: bool = False, | |
| hf_token: str | None = None, | |
| log: Iterator[str] | None = None, | |
| submission_id: str = "", | |
| force: bool = False, | |
| preliminary: bool = False, | |
| use_kit: bool = False, | |
| ) -> RunResult: | |
| """Validate a single HF dataset. Yields log lines via the `log` callable. | |
| The Space's Gradio UI passes a callable that streams lines to the | |
| output panel; the test harness can pass `print` directly. | |
| `force=True` bypasses the dataset-level cache — used by manual | |
| "Validate now" clicks from the dashboard so the operator gets a | |
| real re-run even if nothing relevant changed. Auto-triggered runs | |
| (PR webhooks, scheduled re-validation) leave force=False and get | |
| the cached result when the four-tuple matches. | |
| `preliminary=True` is a structure-only sweep used by the | |
| dashboard's Preliminary scan tab: | |
| - Zip-bundled datasets are scanned (skips the strict-spec | |
| PKG.NO-ARCHIVES pre-check). Only the first zip is processed. | |
| - Flat datasets are sliced to the first asset directory before | |
| validation, so per-asset checks run on one sample asset only. | |
| """ | |
| out = log or (lambda s: print(s, flush=True)) | |
| flags = [] | |
| if force: flags.append("force") | |
| if preliminary: flags.append("preliminary") | |
| flag_str = f" ({', '.join(flags)})" if flags else "" | |
| out(f"[{_now()}] validating dataset={dataset} profile={profile} v{version}{flag_str}") | |
| # PhysX/MDL rules run inside Isaac Sim Kit. Off by default (fast — no | |
| # Kit boot); the dashboard's PhysX toggle sets use_kit=True to opt in. | |
| # Honored only when Kit is actually present on the image (_KIT_FLAG | |
| # resolves to --use-kit), otherwise we fall back to --no-use-kit. | |
| kit_flag = _KIT_FLAG if use_kit else "--no-use-kit" | |
| if kit_flag == "--use-kit": | |
| out(" kit: --use-kit (PhysX/MDL rules covered)") | |
| elif use_kit: | |
| out(" kit: --no-use-kit (PhysX requested but Kit not available on this image)") | |
| else: | |
| out(" kit: --no-use-kit (PhysX off by default; toggle PhysX on to enable)") | |
| token = hf_token or os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") | |
| api = HfApi(token=token) | |
| # Resolve the dataset HEAD ONCE up front. Used for: (a) the | |
| # dataset-level cache key, (b) the per-unit cache key in the flat | |
| # path, (c) the streaming function's "synthetic zip sha" for the | |
| # flat unit. Without this, the same metadata was re-fetched from | |
| # HF up to 4 times per validation. | |
| dataset_head: str | None = None | |
| try: | |
| dataset_head = api.repo_info(dataset, repo_type="dataset").sha | |
| except Exception as e: | |
| out(f" ! could not resolve dataset HEAD ({type(e).__name__}: {e}); cache + drift checks skipped") | |
| if not force and dataset_head: | |
| key = _cache_key(dataset_head, profile, _validator_version(), _foundation_sha()) | |
| cached = _read_cache(dataset, key) | |
| if cached: | |
| out(f" cache hit (key={key}, head={dataset_head[:8]}, " | |
| f"cached_at={cached.get('cached_at')}); returning without re-running") | |
| return RunResult( | |
| dataset=dataset, profile=profile, version=version, | |
| status=cached["status"], summary=cached["summary"], | |
| results_json=cached["results_json"], | |
| report_path=Path(cached.get("report_path") or "/tmp"), | |
| pr_url=None, | |
| ) | |
| out(f" cache miss (key={key}, head={dataset_head[:8]}); running validator") | |
| with tempfile.TemporaryDirectory(prefix=f"hfsp-{dataset.replace('/', '_')}-") as td: | |
| work = Path(td) | |
| out(f" workdir: {work}") | |
| # Single validation path: every dataset (zip-bundled or flat) | |
| # goes through _validate_zip_streaming, which uses a persistent | |
| # daemon pool + per-unit cache + cancel signaling + live | |
| # progress. Flat datasets pre-materialize once via | |
| # snapshot_download and pass the dir as flat_target. | |
| # Per-run cancel token. Identifies THIS invocation so a cancel | |
| # can only ever abort the run it was issued against. Written to a | |
| # sidecar file get_progress surfaces to the dashboard; the | |
| # dashboard echoes it back on cancel and _is_cancelled matches it. | |
| import hashlib as _hashlib, secrets as _secrets | |
| run_token = _hashlib.sha256( | |
| f"{submission_id}|{dataset_head or ''}|{_now()}|{_secrets.token_hex(8)}" | |
| .encode("utf-8")).hexdigest()[:16] | |
| if submission_id: | |
| PROGRESS_DIR.mkdir(parents=True, exist_ok=True) | |
| # Clear any flag left over from a PRIOR run of this submission | |
| # that ended before consuming it — belt to the token's | |
| # suspenders so stale flags never accumulate either. | |
| stale = cancel_path_for(submission_id) | |
| if stale and stale.exists(): | |
| try: stale.unlink() | |
| except OSError: pass | |
| tok_path = run_token_path_for(submission_id) | |
| if tok_path: | |
| tok_path.write_text(run_token, encoding="utf-8") | |
| prog_path = progress_path_for(submission_id) if submission_id else None | |
| if prog_path: | |
| PROGRESS_DIR.mkdir(parents=True, exist_ok=True) | |
| prog_path.write_text(json.dumps({ | |
| "processed": 0, "total": 0, "current": None, | |
| "started_at": _now(), "updated_at": _now(), | |
| "state": "starting", | |
| })) | |
| # Pre-probe: ask the API which case we're in before downloading. | |
| flat_target: Path | None = None | |
| try: | |
| probe_zip_entries = _list_dataset_zips(api, dataset, token) | |
| except Exception as e: | |
| out(f" ! zip probe failed ({type(e).__name__}: {e}); assuming flat") | |
| probe_zip_entries = [] | |
| # STRICT PRE-CHECK at the dataset level: zips are not in any | |
| # spec's allowlist (foundation AA.002 lists only USD/image/ | |
| # audio extensions; SDK packaging-spec.md describes an unpacked | |
| # layout). Zip-bundled datasets fail PKG.NO-ARCHIVES at the | |
| # dataset listing stage — we never download anything. Partner | |
| # must repackage as unpacked. | |
| # | |
| # Exception: preliminary scan. The dashboard's Preliminary | |
| # scan tab wants a structure check on a sample asset even when | |
| # the dataset is zip-bundled, so the strict pre-check is | |
| # bypassed in that mode and the first zip is streamed. | |
| if probe_zip_entries and preliminary: | |
| # Preliminary scan + zip-bundled: stream just the first zip | |
| # through _validate_zip_streaming and return. Skips the | |
| # PKG.NO-ARCHIVES strict-fail block AND the flat | |
| # snapshot_download path entirely — we only want one zip's | |
| # worth of validator work. | |
| # Asset-per-zip heuristic: a dataset shipping MANY zips is | |
| # almost always one asset per archive (e.g. | |
| # scenes/<name>/<name>.zip); a SINGLE zip is more likely one | |
| # archive bundling many assets. Decided on the FULL count, | |
| # before we slice to the first zip for the preliminary sample. | |
| asset_per_zip = len(probe_zip_entries) > 1 | |
| probe_zip_entries = probe_zip_entries[:1] | |
| out(f" preliminary mode: streaming first zip only " | |
| f"({probe_zip_entries[0][0]}; asset_per_zip={asset_per_zip})") | |
| streamed = _validate_zip_streaming( | |
| api=api, dataset=dataset, token=token, work=work, | |
| profile=profile, version=version, | |
| progress_file=prog_path, out=out, force=force, | |
| submission_id=submission_id, run_token=run_token, | |
| kit_flag=kit_flag, asset_per_zip=asset_per_zip, | |
| flat_target=None, | |
| prefetched_zip_entries=probe_zip_entries, | |
| prefetched_dataset_head=dataset_head, | |
| continue_on_preliminary=True, | |
| ) | |
| out_dir = work / "out" | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| results_path = out_dir / "results.json" | |
| if streamed is None: | |
| return RunResult( | |
| dataset=dataset, profile=profile, version=version, | |
| status="error", | |
| summary="validator produced no result (preliminary zip path returned None)", | |
| results_json={}, report_path=out_dir, pr_url=None, | |
| ) | |
| results_path.write_text(json.dumps(streamed), encoding="utf-8") | |
| status, summary = _summarize(streamed) | |
| out(f" {status.upper()}: {summary}") | |
| return _finalize_run( | |
| dataset=dataset, profile=profile, version=version, | |
| results_json=streamed, status=status, summary=summary, | |
| out_dir=out_dir, api=api, token=token, open_pr=open_pr, | |
| results_path=results_path, out=out, | |
| dataset_head=dataset_head, | |
| ) | |
| if probe_zip_entries: | |
| out(f" PRELIMINARY FAILURE: dataset ships {len(probe_zip_entries)} " | |
| f"zip archive(s); zips are not in the spec's allowlist. " | |
| f"Skipping download + validation entirely.") | |
| zip_issues = [] | |
| for zip_rel, _zip_sha in probe_zip_entries: | |
| zip_issues.append({ | |
| "code": "PKG.NO-ARCHIVES", | |
| "severity": "failure", | |
| "path": zip_rel, | |
| "spec_url": ("https://github.com/NVIDIA-dev/" | |
| "simready-oem-library-pm/blob/main/" | |
| "dashboard/docs/sdk/packaging-spec.md" | |
| "#folder-structure"), | |
| "msg": ("SimReady datasets must be delivered as " | |
| "unpacked directories — neither foundation " | |
| "AA.002 nor the SDK packaging spec lists " | |
| ".zip as an accepted file type."), | |
| }) | |
| by_path: dict[str, list[dict]] = {} | |
| for f in zip_issues: | |
| by_path.setdefault(f["path"], []).append(f) | |
| results = [] | |
| for rel, issues_here in by_path.items(): | |
| results.append({ | |
| "asset_path": f"{dataset}/{rel}", | |
| "rel_path": rel, | |
| "validation_status": "fail", | |
| "profile": profile, | |
| "profile_version": version, | |
| "issues": issues_here, | |
| "passed": False, | |
| }) | |
| results_json = { | |
| "schema_version": 1, | |
| "profile": profile, | |
| "profile_version": version, | |
| "results": results, | |
| "preliminary_findings": zip_issues, | |
| "preliminary_check_failed": True, | |
| } | |
| out_dir = work / "out" | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| results_path = out_dir / "results.json" | |
| results_path.write_text(json.dumps(results_json, indent=2), | |
| encoding="utf-8") | |
| status, summary = _summarize(results_json) | |
| out(f" {status.upper()}: {summary}") | |
| return _finalize_run( | |
| dataset=dataset, profile=profile, version=version, | |
| results_json=results_json, status=status, summary=summary, | |
| out_dir=out_dir, api=api, token=token, open_pr=open_pr, | |
| results_path=results_path, out=out, | |
| dataset_head=dataset_head, | |
| ) | |
| # No zips: standard flat-dataset path. Materialize via | |
| # snapshot_download, then hand off to _validate_zip_streaming | |
| # (which treats the unpacked dir as a single "unit" and runs | |
| # the daemon-pool + per-unit cache + cancel + progress code). | |
| local = work / "raw" | |
| local.mkdir(parents=True, exist_ok=True) | |
| out(f" $ snapshot_download {dataset} ignore_patterns={list(HF_DOWNLOAD_EXCLUDES)}") | |
| snapshot_download( | |
| repo_id=dataset, | |
| repo_type="dataset", | |
| local_dir=str(local), | |
| ignore_patterns=list(HF_DOWNLOAD_EXCLUDES), | |
| token=token, | |
| ) | |
| flat_target = _wrap_layout_for_validator(local, work) | |
| out(f" validator target: {flat_target}") | |
| # Preliminary scan + flat dataset: slice flat_target down to its | |
| # first asset directory (one level deep, contains at least one | |
| # .usd/.usda/.usdc file) so per-asset validation only runs on | |
| # one sample asset. Preliminary structure checks (PKG.01, .06, | |
| # AA.002) still surface from that single asset's vantage; for a | |
| # fuller sweep the operator promotes the partner out of the | |
| # Preliminary scan tab. | |
| if preliminary: | |
| try: | |
| _USD_EXTS = (".usd", ".usda", ".usdc") | |
| first_asset_dir = None | |
| for child in sorted(flat_target.iterdir()): | |
| if not child.is_dir(): | |
| continue | |
| if any(p.suffix.lower() in _USD_EXTS for p in child.rglob("*") | |
| if p.is_file()): | |
| first_asset_dir = child | |
| break | |
| if first_asset_dir is not None: | |
| slim = work / "preliminary-sample" | |
| slim.mkdir(parents=True, exist_ok=True) | |
| target = slim / first_asset_dir.name | |
| if not target.exists(): | |
| import shutil | |
| shutil.copytree(first_asset_dir, target, symlinks=True) | |
| flat_target = slim | |
| out(f" preliminary mode: sliced flat target to " | |
| f"first asset dir '{first_asset_dir.name}'") | |
| else: | |
| out(" preliminary mode: no asset directory found to " | |
| "slice; running on full flat target") | |
| except Exception as e: | |
| out(f" ! preliminary slice failed ({type(e).__name__}: " | |
| f"{e}); running on full flat target") | |
| streamed = _validate_zip_streaming( | |
| api=api, dataset=dataset, token=token, work=work, | |
| profile=profile, version=version, | |
| progress_file=prog_path, out=out, force=force, | |
| submission_id=submission_id, run_token=run_token, | |
| kit_flag=kit_flag, | |
| flat_target=flat_target, | |
| prefetched_zip_entries=probe_zip_entries, | |
| prefetched_dataset_head=dataset_head, | |
| continue_on_preliminary=preliminary, | |
| ) | |
| out_dir = work / "out" | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| results_path = out_dir / "results.json" | |
| if streamed is None: | |
| # Should not happen — either zip path or flat path always | |
| # returns a dict. Defensive bail-out. | |
| return RunResult( | |
| dataset=dataset, profile=profile, version=version, | |
| status="error", | |
| summary="validator produced no result (unified streaming path returned None)", | |
| results_json={}, report_path=out_dir, pr_url=None, | |
| ) | |
| # Event-driven foundation spec drift check. One GitHub API | |
| # call per run; soft-fails so network hiccups don't block | |
| # validation. The dashboard renders a notice if drifted=true. | |
| drift = _check_foundation_spec_drift() | |
| if drift.get("checked"): | |
| if drift.get("drifted"): | |
| out(f" ⚠ spec drift: foundation HEAD={drift.get('current_sha', '')[:8]} " | |
| f"@ {drift.get('current_at', '')}, last synced " | |
| f"{drift.get('last_sync_sha', '')[:8]} @ {drift.get('last_sync_at', '')} " | |
| f"— hardcoded rules may be stale; run spec-sync to refresh") | |
| else: | |
| out(f" spec sync: in sync with foundation HEAD " | |
| f"{drift.get('current_sha', '')[:8]}") | |
| else: | |
| out(f" spec sync: skipped ({drift.get('reason', 'unknown')})") | |
| streamed["spec_drift"] = drift | |
| results_path.write_text(json.dumps(streamed), encoding="utf-8") | |
| results_json = streamed | |
| status, summary = _summarize(results_json) | |
| out(f" {status.upper()}: {summary}") | |
| return _finalize_run( | |
| dataset=dataset, profile=profile, version=version, | |
| results_json=results_json, status=status, summary=summary, | |
| out_dir=out_dir, api=api, token=token, open_pr=open_pr, | |
| results_path=results_path, out=out, | |
| dataset_head=dataset_head, | |
| ) | |