loginowskid's picture
Sync from simready-oem-library-pm@d1ecea89
12e13e9 verified
"""SimReady Validator — Gradio UI for the HuggingFace Space.
Two surfaces, same engine:
- **/run** (the on-screen button) — streams log lines to the UI for
interactive use by an operator in the browser.
- **/run_api** (hidden, programmatic) — returns the full RunResult as
a JSON-serializable dict. This is what `tools/hf_watch/call_hf_space.py`
hits from the GitHub Actions runner so the workflow can patch
status.json and asset-status.json without scraping the UI's text.
Both go through `runner.run()`. The split is purely about output
shape (streaming text vs. one-shot dict).
The Space is internal-pilot scope: HF_TOKEN comes from the Space's
secrets, NOT from the requester. When a customer's dataset PR triggers
this (next milestone), the webhook payload identifies the dataset and
the Space's own token opens the verdict PR.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
import gradio as gr
from runner import (run as run_validator, progress_path_for, cancel_path_for,
run_token_path_for, CANCEL_DIR)
# Profiles offered in the UI dropdown. `_run_api` also checks incoming
# profile names against this list before handing them to the runner.
PROFILE_CHOICES = [
    # Props
    "Prop-Robotics-Neutral",
    "Prop-Robotics-Physx",
    "Prop-Robotics-Isaac",
    # Robot bodies
    "Robot-Body-Neutral",
    "Robot-Body-Runnable",
    "Robot-Body-Isaac",
    # Packages
    "Package",
    "Package-Candidate",
]

# Fallbacks used when a caller leaves the profile / version empty.
DEFAULT_PROFILE = "Prop-Robotics-Neutral"
DEFAULT_VERSION = "1.0.0"
def _run_api(dataset: str, profile: str, version: str, open_pr: bool,
             submission_id: str = "", force: bool = False,
             preliminary: bool = False, use_kit: bool = False) -> dict:
    """Programmatic endpoint: run the validator and return the result as a dict.

    Caller is typically `tools/hf_watch/call_hf_space.py` running from
    a GitHub Actions ubuntu-latest runner. The output shape must stay
    stable; bump `schema_version` if you change it. The receiver
    pattern-matches on the same field names `tools/hf_watch/validate.py`
    produces, so status.json patching is identical regardless of which
    backend ran the validation.

    Args:
        dataset: Dataset identifier; surrounding whitespace is stripped.
        profile: Profile name. Empty or whitespace-only falls back to
            `DEFAULT_PROFILE`. Must be one of `PROFILE_CHOICES` or
            "auto" (any letter case).
        version: Version string. Empty or whitespace-only falls back to
            `DEFAULT_VERSION`. Only word characters, "." and "-" are
            accepted, and it may not start with "-".
        open_pr: Forwarded to the runner as a bool.
        submission_id: Optional. When set, the validator writes
            per-asset progress to /tmp/sr-progress/<id>.json, which the
            get_progress endpoint serves to the dashboard.
        force: Forwarded to the runner as a bool.
        preliminary: Switches the runner to a structure-only sweep:
            zip-bundled datasets are scanned (instead of failing
            PKG.NO-ARCHIVES at the listing stage) and per-asset
            validation is sliced to the first asset only. Used by the
            dashboard's Preliminary scan tab.
        use_kit: Forwarded to the runner as a bool.

    Returns:
        A JSON-serializable dict with `schema_version`, `dataset`,
        `profile`, `version`, `status`, `summary`, `results_json`
        (sanitized of filesystem paths) and `pr_url`.

    Raises:
        ValueError: If `profile` or `version` fails validation.
    """
    import re

    print(f"[run_api] preliminary={preliminary!r} force={force!r} "
          f"use_kit={use_kit!r} submission_id={submission_id!r}", flush=True)

    # Untrusted callers can hit /run_api directly, and profile/version flow
    # into the validator's argv, so validate them before use. Empty (and
    # whitespace-only) values fall back to the defaults.
    profile = (profile or "").strip() or DEFAULT_PROFILE
    if profile not in PROFILE_CHOICES and profile.lower() != "auto":
        raise ValueError(f"invalid profile: {profile!r}")

    version = (version or "").strip() or DEFAULT_VERSION
    # A leading "-" would satisfy the character class but could be parsed as
    # a command-line option once it reaches argv, so reject it explicitly.
    if version.startswith("-") or not re.fullmatch(r"[\w.\-]+", version):
        raise ValueError(f"invalid version: {version!r}")

    result = run_validator(
        dataset=(dataset or "").strip(),
        profile=profile,
        version=version,
        open_pr=bool(open_pr),
        submission_id=(submission_id or "").strip(),
        force=bool(force),
        preliminary=bool(preliminary),
        use_kit=bool(use_kit),
    )
    return {
        "schema_version": 1,
        "dataset": result.dataset,
        "profile": result.profile,
        "version": result.version,
        "status": result.status,
        "summary": result.summary,
        "results_json": _sanitize_results_json(result.results_json),
        "pr_url": result.pr_url,
    }
def _list_profiles() -> dict:
    """List the profiles that actually load on this Space.

    The dashboard polls this to populate its dropdown so operators
    can't pick a profile that would fatally fail at registration time.

    Uses --use-plugin since the default CLI loader has known
    registration mismatches against the current foundation pin; the
    plugin path is what runner.py's streaming-zip flow falls back to
    and is the source of truth for "actually usable" here.

    The validator prints one `PROFILE: <id> v<version>` line per
    profile; only the `<id>` part is kept.

    Returns:
        `{"profiles": [...], "schema_version": 1, "rc": <exit code>}`,
        plus `stderr_tail` / `stdout_tail` when no profile was parsed.
        On timeout or any other failure: `{"profiles": [], "error": ...}`.
    """
    import subprocess
    import sys
    from runner import VALIDATOR

    prefix = "PROFILE:"

    def _tail(text) -> str:
        # Last 20 lines, capped at 2000 chars, so the JSON response stays small.
        return "\n".join((text or "").splitlines()[-20:])[:2000]

    try:
        # --list-profiles only ENUMERATES registered profiles from the
        # spec/plugin registry (--use-plugin); it runs no validation rules,
        # so it never needs Kit. Force --no-use-kit: on a Kit-enabled image
        # the validator auto-enables --use-kit for the PhysX-bearing default
        # profile and boots the full Isaac Sim runtime (~5 min) just to print
        # the list, blowing the 300s timeout below. Actual validation
        # (runner.py) still uses Kit.
        completed = subprocess.run(
            [sys.executable, str(VALIDATOR), "--list-profiles", "--use-plugin", "--no-use-kit"],
            capture_output=True, text=True, timeout=300,
        )

        # "PROFILE: <id> v<version>" is the only authoritative shape;
        # every other line is treated as noise.
        found = []
        for raw_line in (completed.stdout or "").splitlines():
            stripped = raw_line.strip()
            if not stripped.startswith(prefix):
                continue
            tokens = stripped[len(prefix):].split()
            if tokens:
                found.append(tokens[0])

        # dict.fromkeys drops duplicates while keeping first-seen order.
        profiles = list(dict.fromkeys(found))

        payload: dict = {"profiles": profiles, "schema_version": 1, "rc": completed.returncode}
        if not profiles:
            # Nothing parsed: surface the process output so the dashboard
            # can show something useful.
            payload["stderr_tail"] = _tail(completed.stderr)
            payload["stdout_tail"] = _tail(completed.stdout)
        return payload
    except subprocess.TimeoutExpired:
        return {"profiles": [], "error": "timeout after 300s (spec load >5 min)"}
    except Exception as e:
        return {"profiles": [], "error": f"{type(e).__name__}: {e}"}
def _cancel_run(submission_id: str, run_token: str = "") -> dict:
    """Write the cancel-signal file for a submission.

    The streaming-zip loop in runner.py checks for this file between
    zips and aborts when present. Calling this repeatedly simply
    rewrites the same file; the consuming runner.py deletes it.

    `run_token` is the per-run token the dashboard read from
    get_progress. It becomes the flag's content so
    runner._is_cancelled only honors it for the exact run it was
    issued against; a flag left over from a prior run of this
    submission can never abort a fresh one.

    Returns:
        `{"state": "no_id"}` when the id is empty or has no cancel path,
        `{"state": "signaled", "path": ...}` on success, or
        `{"state": "error", "error": ...}` when the write fails.
    """
    sid = (submission_id or "").strip()
    flag = cancel_path_for(sid) if sid else None
    if flag is None:
        return {"state": "no_id"}

    try:
        CANCEL_DIR.mkdir(parents=True, exist_ok=True)
        flag.write_text((run_token or "").strip(), encoding="utf-8")
    except OSError as exc:
        return {"state": "error", "error": f"{type(exc).__name__}: {exc}"}
    return {"state": "signaled", "path": str(flag)}
def _get_progress(submission_id: str) -> dict:
    """Read the validator's per-asset progress file for this submission.

    Polled by the dashboard ~every 3 s while a Validate-now click is
    in-flight, so the "Validate now" button can fill up as the
    validator works through the asset list.

    Possible results:
      - {"state": "no_id"}: the submission id was empty.
      - {"state": "not_found", "run_token": ...}: no progress file
        (Space restarted, or the dashboard is polling a Space-run that
        never happened).
      - {"state": "transient", "run_token": ...}: the file could not be
        read or parsed (likely mid-write); poll again.
      - otherwise the parsed file content. Per the writers this is
        either {"state": "starting"} seeded by runner.py, or live
        {processed, total, current, started_at, updated_at} progress
        written by validate.py._emit_progress. When a run token exists
        and the content is a dict, `run_token` is added to it.

    The run token comes from a sidecar file (see
    runner.run_token_path_for). The dashboard echoes it back to
    cancel_run so a cancel only aborts the run it was issued against,
    never a later one that reused the submission_id.

    Caller treats anything with total > 0 as "show the fill bar".
    """
    sid = (submission_id or "").strip()
    if not sid:
        return {"state": "no_id"}

    # Best-effort read of the per-run cancel token; an unreadable sidecar
    # just leaves the token empty.
    run_token = ""
    token_file = run_token_path_for(sid)
    if token_file and token_file.is_file():
        try:
            run_token = token_file.read_text(encoding="utf-8").strip()
        except OSError:
            pass

    progress_file = progress_path_for(sid)
    if progress_file is None or not progress_file.is_file():
        return {"state": "not_found", "run_token": run_token}

    try:
        payload = json.loads(progress_file.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        # Mid-write: the caller will poll again in a few seconds.
        return {"state": "transient", "run_token": run_token}

    if run_token and isinstance(payload, dict):
        payload["run_token"] = run_token
    return payload
def _sanitize_results_json(raw: dict) -> dict:
    """Return a copy of results_json without absolute filesystem paths.

    Gradio's JSON serializer treats string fields that resolve to files
    on the Space's filesystem as downloadable references and tries to
    serve them through `/gradio_api/file=...`. The validator's
    results.json contains absolute paths (target dir + per-asset
    `path`) which point into the Space's ephemeral tempdir and are NOT
    exposed through gradio's allowed_paths; gradio_client then fails
    with 403 trying to auto-fetch them after a successful run.

    Callers don't need filesystem paths anyway: only `rel_path`
    (dataset-relative), `passed`, and `issues` are used downstream.
    The rest of the report (profile_coverage, summary, layout_findings,
    etc.) is kept intact.

    Removed: top-level `target`, `specs_docs_dir`, `dashboard_docs_dir`,
    and the `path` key of every dict in a `results` list. Non-dict
    entries of that list are dropped. A non-dict `raw` is returned
    unchanged, and `raw` itself is never mutated.
    """
    if not isinstance(raw, dict):
        return raw

    # `target` is the validation dir; the two docs dirs are local to the
    # Space and useless to the caller.
    dropped_top_level = ("target", "specs_docs_dir", "dashboard_docs_dir")
    cleaned = {
        key: value
        for key, value in raw.items()
        if key not in dropped_top_level
    }

    assets = cleaned.get("results")
    if isinstance(assets, list):
        pruned = []
        for entry in assets:
            if not isinstance(entry, dict):
                continue
            pruned.append(
                {field: value for field, value in entry.items() if field != "path"}
            )
        cleaned["results"] = pruned

    return cleaned
def _run_streaming(dataset: str, profile: str, version: str, open_pr: bool):
    """Run the validator and stream its log to the UI while it works.

    This is a generator; Gradio forwards every yielded 4-tuple
    `(log_text, verdict_markdown, summary, report_file)` to the
    connected outputs.

    The validator call is blocking, so it runs on a worker thread that
    pushes each log line onto a queue. This generator drains the queue
    and yields an updated log whenever new lines arrive. (Calling the
    validator inline would block the generator, and no log line could
    reach the UI until the run had already finished.)

    Args:
        dataset: Dataset identifier; surrounding whitespace is stripped.
        profile: Profile name, forwarded to the runner unchanged.
        version: Version string. Empty or whitespace-only falls back to
            `DEFAULT_VERSION`. It flows into the validator's argv, so it
            gets the same check as the programmatic endpoint: word
            characters, "." and "-" only, and no leading "-".
        open_pr: Forwarded to the runner unchanged.

    Yields:
        First an empty log with a "(running…)" summary, then a log
        snapshot per batch of new lines, and finally either the verdict
        (status badge, summary, report path or None) or an error tuple.
    """
    import queue
    import re
    import threading

    lines: list[str] = []

    version = (version or "").strip() or DEFAULT_VERSION
    if version.startswith("-") or not re.fullmatch(r"[\w.\-]+", version):
        problem = f"invalid version: {version!r}"
        lines.append(f"\nERROR: ValueError: {problem}")
        yield "\n".join(lines), "", f"error: {problem}", None
        return

    yield "\n".join(lines), "", "(running…)", None

    events: queue.Queue = queue.Queue()
    finished = object()  # sentinel: the worker has returned or raised
    outcome: dict = {}

    def log(line: str) -> None:
        # Called on the worker thread; `lines` is only ever touched by
        # the generator, so the queue is the sole shared structure.
        events.put(line)

    def worker() -> None:
        try:
            outcome["result"] = run_validator(
                dataset=(dataset or "").strip(),
                profile=profile,
                version=version,
                open_pr=open_pr,
                log=log,
            )
        except Exception as e:
            outcome["error"] = e
        finally:
            # Always wake the generator, even when the run failed.
            events.put(finished)

    threading.Thread(target=worker, name="run-validator", daemon=True).start()

    done = False
    while not done:
        item = events.get()
        # Drain everything already queued so a burst of log lines
        # produces one UI update instead of one per line.
        while True:
            if item is finished:
                done = True
                break
            lines.append(item)
            try:
                item = events.get_nowait()
            except queue.Empty:
                break
        if not done:
            yield "\n".join(lines), "", "(running…)", None

    error = outcome.get("error")
    if error is not None:
        lines.append(f"\nERROR: {type(error).__name__}: {error}")
        yield "\n".join(lines), "", f"error: {error}", None
        return

    result = outcome["result"]
    status_badge = f"**{result.status.upper()}** — {result.summary}"
    if result.pr_url:
        status_badge += f"\n\nPR: {result.pr_url}"

    # Only offer the report for download when it was actually written.
    report_index = result.report_path / "index.html"
    report_url = str(report_index) if report_index.is_file() else None
    yield (
        "\n".join(lines),
        status_badge,
        result.summary,
        report_url,
    )
def _read_md(name: str) -> str:
    """Return the markdown file `name` without its YAML front matter.

    `name` is resolved relative to this file's directory. A leading
    front-matter block (an opening `---` line up to the next `---`
    line) is removed, along with leading whitespace of the remaining
    body. Files without front matter, or with an unterminated block,
    are returned unchanged.

    Falls back to a friendly stub when the file is missing, which
    keeps the Space bootable even before the space-deploy workflow has
    synced the assembled docs into the container.
    """
    path = Path(__file__).resolve().parent / name
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return f"_{name} not yet synced into this Space — check back after the next deploy._"

    fence = "---"
    # Require the fence to be a whole line, so a "----" horizontal rule at
    # the top of a document is not mistaken for front matter.
    if not text.startswith(fence + "\n"):
        return text

    closing = "\n" + fence + "\n"
    # Start at the newline that ends the opening fence, so an empty
    # front-matter block ("---\n---\n") is matched as well.
    end = text.find(closing, len(fence))
    if end != -1:
        return text[end + len(closing):].lstrip()

    # Closing fence is the last line and has no trailing newline: the whole
    # file is front matter and the body is empty.
    if text.endswith("\n" + fence):
        return ""
    return text
# Build the Gradio app: three visible tabs (Overview, Validator, Partner
# walkthrough) plus hidden components that back the programmatic endpoints
# (run_api, get_progress, list_profiles, cancel_run).
with gr.Blocks(title="SimReady Validator") as demo:
with gr.Tabs():
# Overview tab renders README.md (or a stub when the file is missing).
with gr.Tab("Overview"):
gr.Markdown(_read_md("README.md"))
# Validator tab: the interactive form wired to _run_streaming below.
with gr.Tab("Validator"):
gr.Markdown(
"Submit a HuggingFace dataset to validate against a SimReady "
"profile. With **Open PR** enabled, the verdict is uploaded "
"back to the dataset as a `validation/` pull request."
)
with gr.Row():
dataset = gr.Textbox(
label="Dataset",
placeholder="org/dataset (e.g. imagineio/PhysicalAI-SimReady-Kitchens-v1)",
)
with gr.Row():
profile = gr.Dropdown(
choices=PROFILE_CHOICES, value=DEFAULT_PROFILE, label="Profile",
)
version = gr.Textbox(label="Version", value=DEFAULT_VERSION)
open_pr = gr.Checkbox(label="Open PR on dataset with verdict", value=False)
run_btn = gr.Button("Validate", variant="primary")
# Output widgets; their order in `outputs=` below must match the tuple
# order yielded by _run_streaming (log, verdict, summary, report).
status_md = gr.Markdown(label="Verdict")
summary_box = gr.Textbox(label="Summary", interactive=False)
log_box = gr.Textbox(label="Log", lines=20, interactive=False)
report_link = gr.File(label="HTML report (download)", interactive=False)
# Partner walkthrough tab renders VALIDATE.md (or a stub when missing).
with gr.Tab("Partner walkthrough"):
gr.Markdown(_read_md("VALIDATE.md"))
# Interactive endpoint: the on-screen button, also exposed under the
# api_name "run".
run_btn.click(
fn=_run_streaming,
inputs=[dataset, profile, version, open_pr],
outputs=[log_box, status_md, summary_box, report_link],
api_name="run",
)
# Programmatic endpoint — bound to invisible components so the UI
# doesn't render anything extra, but the Gradio queue still exposes
# an `/api/predict/run_api` route the gradio_client can hit. The
# outputs[0] is the JSON return; api_name turns it into a stable
# path the GitHub Actions caller depends on.
# The input order below must match _run_api's positional parameters.
api_dataset = gr.Textbox(visible=False)
api_profile = gr.Textbox(visible=False)
api_version = gr.Textbox(visible=False)
api_open_pr = gr.Checkbox(visible=False)
api_submission_id = gr.Textbox(visible=False)
api_force = gr.Checkbox(visible=False)
api_preliminary = gr.Checkbox(visible=False)
api_use_kit = gr.Checkbox(visible=False)
api_output = gr.JSON(visible=False)
api_button = gr.Button(visible=False)
api_button.click(
fn=_run_api,
inputs=[api_dataset, api_profile, api_version, api_open_pr,
api_submission_id, api_force, api_preliminary, api_use_kit],
outputs=api_output,
api_name="run_api",
)
# Progress endpoint — polled by the dashboard while a row is
# validating. CORS is open on /gradio_api/* by default, so the
# browser can fetch this from github.io directly without any
# GitHub-Actions side polling/commit churn.
prog_in = gr.Textbox(visible=False)
prog_out = gr.JSON(visible=False)
prog_button = gr.Button(visible=False)
prog_button.click(
fn=_get_progress,
inputs=[prog_in],
outputs=prog_out,
api_name="get_progress",
)
# Profile-listing endpoint — polled by the dashboard at startup
# so its dropdown reflects what's actually loadable on this Space
# right now (foundation+validator pin determines which profiles
# register). Stops the operator from picking something that
# would fatal at runtime.
profiles_out = gr.JSON(visible=False)
profiles_button = gr.Button(visible=False)
profiles_button.click(
fn=_list_profiles,
inputs=None,
outputs=profiles_out,
api_name="list_profiles",
)
# Cancel endpoint — the dashboard's Cancel button calls this AFTER
# cancelling the GH Action so the in-flight server-side gradio call
# actually stops (cancelling the Action alone only kills the
# gradio_client wrapper, the Space's loop keeps going).
# Inputs are (submission_id, run_token), matching _cancel_run's parameters.
cancel_in = gr.Textbox(visible=False)
cancel_token = gr.Textbox(visible=False)
cancel_out = gr.JSON(visible=False)
cancel_button = gr.Button(visible=False)
cancel_button.click(
fn=_cancel_run,
inputs=[cancel_in, cancel_token],
outputs=cancel_out,
api_name="cancel_run",
)
if __name__ == "__main__":
    # Bind address and port can be overridden through the environment;
    # the defaults listen on all interfaces, port 7860.
    host = os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0")
    port = int(os.environ.get("GRADIO_SERVER_PORT", "7860"))
    demo.queue().launch(server_name=host, server_port=port)