Spaces:
Running on Zero
Running on Zero
"""Space-side smoke validation runner.

Automates the smokes documented in docs/space_smoke.md so a Space operator
can run one command and get a JSON report of which smokes passed, which
were skipped (missing deps), and which failed (with diagnostic context).

Usage:
    # Run all smokes that have their deps installed:
    python -m scripts.run_space_smoke --output ./space_smoke_report.json

    # Run only a subset:
    python -m scripts.run_space_smoke --smoke lexical --smoke ablation

    # Force-fail on skipped smokes (CI-style strict mode):
    python -m scripts.run_space_smoke --strict

The runner does NOT install missing dependencies — that's deliberately the
operator's job (each smoke's deps add Space build time and download cost).
A skipped smoke reports the exact `pip install` line you'd need (the
`install_hint` field of the JSON report).

Smokes mirror docs/space_smoke.md:
    lexical    - model-free benchmark on a synthetic markdown corpus
    ablation   - per-parser ablation runner (text vs pymupdf)
    embedding  - sentence-transformers / jina-embeddings-v3 retrieval
    gpu_repair - dry-run GPU-repair wiring check against a malformed table
                 (no live Qwen2.5-VL invocation; weights are not downloaded)
    marker     - checks that a marker CLI is on PATH and that the registry
                 adapter reports available (no PDF is parsed here)
"""
| from __future__ import annotations | |
| import argparse | |
| import importlib.util | |
| import json | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Callable | |
| REPO_ROOT = Path(__file__).resolve().parents[1] | |
@dataclass
class SmokeResult:
    """Outcome of a single smoke check.

    The class must be a dataclass: every smoke builds it with keyword
    arguments and ``detail`` relies on ``field(default_factory=dict)`` so
    that instances never share one mutable dict.
    """

    # Registry key of the smoke that produced this result.
    name: str
    # One of "pass" | "fail" | "skip" | "error".
    status: str
    # Wall-clock duration of the smoke, in seconds.
    elapsed_seconds: float = 0.0
    # Free-form diagnostic payload (metrics, exception text, notes).
    detail: dict[str, Any] = field(default_factory=dict)
    # Why the smoke was skipped; empty unless status == "skip".
    skip_reason: str = ""
    # Install command that would enable a skipped smoke; may be empty.
    install_hint: str = ""
@dataclass
class SmokeReport:
    """Ordered collection of smoke results plus JSON-friendly aggregation."""

    # Results in the order the smokes were run.
    smokes: list[SmokeResult] = field(default_factory=list)

    @property
    def passed(self) -> bool:
        """True when every result is "pass" or "skip" (vacuously true if empty).

        Exposed as a property because callers read ``report.passed`` as a
        plain attribute; a bare method object would always be truthy.
        """
        return all(item.status in {"pass", "skip"} for item in self.smokes)

    def to_dict(self) -> dict[str, Any]:
        """Return the report as plain dicts/lists suitable for ``json.dumps``.

        The result has two keys: ``"smokes"`` (one dict per result, with
        ``elapsed_seconds`` rounded to 3 decimals) and ``"summary"`` (the
        total plus per-status counts).
        """
        tally = {"pass": 0, "fail": 0, "error": 0, "skip": 0}
        rows: list[dict[str, Any]] = []
        for item in self.smokes:
            # Statuses outside the four known values count only toward "total".
            if item.status in tally:
                tally[item.status] += 1
            rows.append(
                {
                    "name": item.name,
                    "status": item.status,
                    "elapsed_seconds": round(item.elapsed_seconds, 3),
                    "detail": item.detail,
                    "skip_reason": item.skip_reason,
                    "install_hint": item.install_hint,
                }
            )
        return {
            "smokes": rows,
            "summary": {
                "total": len(rows),
                "passed": tally["pass"],
                "failed": tally["fail"],
                "errored": tally["error"],
                "skipped": tally["skip"],
            },
        }
| # --- Individual smokes ------------------------------------------------------- | |
| def _make_distinctive_corpus(root: Path) -> Path: | |
| """Build a small corpus with three sentences distinct enough that the | |
| synthetic-QA generator picks one query per chunk.""" | |
| src = root / "in" | |
| src.mkdir() | |
| (src / "doc.md").write_text( | |
| "# Sample Doc\n\n" | |
| "Apples grow on trees in the orchard during autumn harvest season.\n\n" | |
| "Submarines navigate beneath the ocean using sonar pulses across waters.\n\n" | |
| "Mountains rise above the clouds in the distant horizon line.\n", | |
| encoding="utf-8", | |
| ) | |
| return src | |
def smoke_lexical() -> SmokeResult:
    """Run the parser benchmark on the synthetic corpus and check its scores.

    Passes when ``mean_quality_score`` is at least 0.85 and
    ``mean_retrieval_recall_at_1`` is at least 0.7. An exception raised by
    the benchmark call is reported as an "error" result.
    """
    t0 = time.perf_counter()
    from zsgdp.benchmarks.parser_quality import run_parser_benchmark

    with tempfile.TemporaryDirectory() as scratch:
        workdir = Path(scratch)
        corpus_dir = _make_distinctive_corpus(workdir)
        output_dir = workdir / "out"
        try:
            metrics = run_parser_benchmark(corpus_dir, output_dir, dataset_name="custom_folder")
        except Exception as exc:
            return SmokeResult(
                name="lexical",
                status="error",
                elapsed_seconds=time.perf_counter() - t0,
                detail={"exception": str(exc)},
            )

        quality_score = float(metrics.get("mean_quality_score", 0.0))
        recall_at_1 = float(metrics.get("mean_retrieval_recall_at_1", 0.0))
        if quality_score >= 0.85 and recall_at_1 >= 0.7:
            verdict = "pass"
        else:
            verdict = "fail"

        return SmokeResult(
            name="lexical",
            status=verdict,
            elapsed_seconds=time.perf_counter() - t0,
            detail={
                "mean_quality_score": quality_score,
                "mean_retrieval_recall_at_1": recall_at_1,
                "documents_evaluated": metrics.get("document_count"),
            },
        )
def smoke_ablation() -> SmokeResult:
    """Run the parser ablation on the synthetic corpus and check its outputs.

    Passes when the comparison reports exactly three arms named ``text``,
    ``pymupdf`` and ``merged`` and ``ablation_comparison.csv`` exists in the
    output directory. An exception raised by the ablation call is reported
    as an "error" result.
    """
    t0 = time.perf_counter()
    from zsgdp.benchmarks.ablation_runner import run_parser_ablations

    with tempfile.TemporaryDirectory() as scratch:
        workdir = Path(scratch)
        corpus_dir = _make_distinctive_corpus(workdir)
        output_dir = workdir / "out"
        try:
            comparison = run_parser_ablations(
                corpus_dir,
                output_dir,
                parsers=["text", "pymupdf"],
                dataset_name="custom_folder",
            )
        except Exception as exc:
            return SmokeResult(
                name="ablation",
                status="error",
                elapsed_seconds=time.perf_counter() - t0,
                detail={"exception": str(exc)},
            )

        # The CSV must be checked while the temporary directory still exists.
        csv_emitted = (output_dir / "ablation_comparison.csv").exists()
        arm_names = [row["arm"] for row in comparison["rows"]]
        arm_count = comparison["arm_count"]

        checks = (
            arm_count == 3,
            set(arm_names) == {"text", "pymupdf", "merged"},
            csv_emitted,
        )
        verdict = "pass" if all(checks) else "fail"

        return SmokeResult(
            name="ablation",
            status=verdict,
            elapsed_seconds=time.perf_counter() - t0,
            detail={
                "arm_count": arm_count,
                "arms": arm_names,
                "comparison_csv_emitted": csv_emitted,
            },
        )
def smoke_embedding() -> SmokeResult:
    """Validate the embedding-retriever wiring on a real Space.

    Skipped when ``sentence_transformers`` is not importable. Otherwise the
    parser benchmark runs against the synthetic corpus with an inline config
    that selects the embedding backend under ``benchmarks.retriever``, and
    the smoke passes when ``mean_retrieval_recall_at_5`` is at least 0.7.

    Set ZSGDP_SMOKE_EMBEDDING_MODEL_ID to override the default model_id —
    useful when the configured default (jinaai/jina-embeddings-v3) has
    transformers-version compat issues with the running container. A
    common safe fallback is `sentence-transformers/all-MiniLM-L6-v2`,
    which has no custom remote modeling code and works with any
    transformers version.
    """
    started = time.perf_counter()
    if importlib.util.find_spec("sentence_transformers") is None:
        return SmokeResult(
            name="embedding",
            status="skip",
            elapsed_seconds=time.perf_counter() - started,
            skip_reason="sentence-transformers not installed",
            install_hint="python -m pip install 'zero-shot-gpu-doc-parser[embedding]'",
        )

    import os

    from zsgdp.benchmarks.parser_quality import run_parser_benchmark

    # An empty environment value is treated the same as an unset one.
    override_model_id = os.environ.get("ZSGDP_SMOKE_EMBEDDING_MODEL_ID") or None

    # NOTE: do not eagerly instantiate SentenceTransformer in the main
    # process. On ZeroGPU Spaces, the main process runs in CUDA-emulation
    # mode and any CUDA call outside a @spaces.GPU-decorated function raises
    # "Low-level CUDA init reached". The actual benchmark below routes
    # through _gpu_encode_batch (decorated), which is the supported path.
    # Errors inside the benchmark surface through the `error` status branch.
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)
        src = _make_distinctive_corpus(tmp_path)
        out = tmp_path / "out"
        config_path = tmp_path / "config.yaml"

        # Inline config write — keeps the smoke self-contained. Honours the
        # env-var model override so the operator can swap models without
        # editing this script. YAML nesting is indentation-sensitive, so
        # `backend` / `model_id` must sit one level deeper than `retriever`
        # to land under benchmarks.retriever.
        config_lines = ["benchmarks:", "  retriever:", "    backend: embedding"]
        if override_model_id:
            config_lines.append(f"    model_id: {override_model_id}")
        config_path.write_text("\n".join(config_lines) + "\n", encoding="utf-8")

        try:
            summary = run_parser_benchmark(src, out, config_path=config_path, dataset_name="custom_folder")
        except Exception as exc:
            return SmokeResult(
                name="embedding",
                status="error",
                elapsed_seconds=time.perf_counter() - started,
                detail={"exception": str(exc)},
            )

        recall_5 = float(summary.get("mean_retrieval_recall_at_5", 0.0))
        passed = recall_5 >= 0.7
        return SmokeResult(
            name="embedding",
            status="pass" if passed else "fail",
            elapsed_seconds=time.perf_counter() - started,
            detail={
                "mean_retrieval_recall_at_5": recall_5,
                "mean_retrieval_recall_at_1": float(summary.get("mean_retrieval_recall_at_1", 0.0)),
                "documents_evaluated": summary.get("document_count"),
            },
        )
def smoke_gpu_repair() -> SmokeResult:
    """Check the GPU-repair wiring via a dry-run plan; no model is invoked.

    Skipped when ``transformers`` is not importable. Otherwise a markdown
    file with a malformed table is parsed, and the smoke passes when the
    provenance records at least one planned GPU task and at least one
    repair iteration, and the client class imported successfully.
    """
    t0 = time.perf_counter()
    if importlib.util.find_spec("transformers") is None:
        return SmokeResult(
            name="gpu_repair",
            status="skip",
            elapsed_seconds=time.perf_counter() - t0,
            skip_reason="transformers not installed",
            install_hint="python -m pip install 'zero-shot-gpu-doc-parser[gpu_repair]'",
        )

    # The transformers pipeline is deliberately not instantiated: doing so
    # would download multi-GB Qwen2.5-VL weights even for a dry probe. Only
    # the wiring is exercised here (a dry-run task plan plus an import of
    # the client class). For a real model invocation, operators should use
    # `run-gpu-tasks --execute` against a parsed output directory; the
    # result lands in repair.gpu_escalation.results.
    from zsgdp.gpu.transformers_client import TransformersClient
    from zsgdp.pipeline import parse_document

    # Header row has 2 columns while the data row has 3; this mismatch is
    # meant to make the repair loop plan a table_vlm_repair task.
    malformed_markdown = "# Report\n\n| A | B |\n| --- | --- |\n| 1 | 2 | 3 |\n"

    with tempfile.TemporaryDirectory() as scratch:
        workdir = Path(scratch)
        report_md = workdir / "report.md"
        report_md.write_text(malformed_markdown, encoding="utf-8")
        output_dir = workdir / "out"
        try:
            parsed = parse_document(report_md, output_dir)
        except Exception as exc:
            return SmokeResult(
                name="gpu_repair",
                status="error",
                elapsed_seconds=time.perf_counter() - t0,
                detail={"exception": str(exc)},
            )

        repair_info = parsed.provenance.get("repair", {})
        escalation = repair_info.get("gpu_escalation") or {}
        planned_tasks = int(escalation.get("task_count") or 0)
        repair_rounds = parsed.provenance.get("repair_iterations") or []

        # Three things are confirmed here:
        #   * the dry-run plan produced at least one task,
        #   * the repair loop iterated at least once,
        #   * the TransformersClient class is importable for live execution.
        client_importable = TransformersClient is not None
        round_count = len(repair_rounds)
        if planned_tasks >= 1 and round_count >= 1 and client_importable:
            verdict = "pass"
        else:
            verdict = "fail"

        return SmokeResult(
            name="gpu_repair",
            status=verdict,
            elapsed_seconds=time.perf_counter() - t0,
            detail={
                "dry_run_task_count": planned_tasks,
                "repair_iterations": round_count,
                "transformers_client_importable": client_importable,
                "note": "This smoke verifies wiring only. To verify model invocation "
                "end-to-end, set repair.execute_gpu_escalations=true in config "
                "and run zsgdp run-gpu-tasks --execute against a parsed dir.",
            },
        )
def smoke_marker() -> SmokeResult:
    """Check that the Marker parser adapter is registered and available.

    Skipped when neither ``marker_single`` nor ``marker`` is on PATH. A
    ``KeyError`` from the registry lookup is reported as an "error" result;
    otherwise the status follows the adapter's ``available()`` answer. No
    document is parsed here.
    """
    t0 = time.perf_counter()
    cli_found = any(
        shutil.which(executable) is not None
        for executable in ("marker_single", "marker")
    )
    if not cli_found:
        return SmokeResult(
            name="marker",
            status="skip",
            elapsed_seconds=time.perf_counter() - t0,
            skip_reason="neither `marker_single` nor `marker` found on PATH",
            install_hint="python -m pip install marker-pdf",
        )

    # Marker is heavy: even a probe call can take 30+s on first invocation
    # (model load). Only the registry adapter's availability is confirmed
    # here; a full parse is left as a manual follow-up on the smoke checklist.
    from zsgdp.parsers.registry import get_parser

    try:
        adapter = get_parser("marker")
    except KeyError as exc:
        return SmokeResult(
            name="marker",
            status="error",
            elapsed_seconds=time.perf_counter() - t0,
            detail={"exception": str(exc)},
        )

    is_available = bool(adapter.available())
    verdict = "pass" if is_available else "fail"
    return SmokeResult(
        name="marker",
        status=verdict,
        elapsed_seconds=time.perf_counter() - t0,
        detail={
            "adapter_reports_available": is_available,
            "note": "End-to-end Marker parse is intentionally not run here "
            "(cold-load is heavy). See docs/space_smoke.md Smoke 5 "
            "for the manual upload-and-parse procedure.",
        },
    )
# Maps each CLI-selectable smoke name to its zero-argument runner. Insertion
# order is the default execution order when no --smoke flag is given.
SMOKE_REGISTRY: dict[str, Callable[[], SmokeResult]] = dict(
    lexical=smoke_lexical,
    ablation=smoke_ablation,
    embedding=smoke_embedding,
    gpu_repair=smoke_gpu_repair,
    marker=smoke_marker,
)
| # --- Driver ------------------------------------------------------------------ | |
def run_smokes(names: list[str] | None = None) -> SmokeReport:
    """Run the selected smokes in order and collect their results.

    Args:
        names: Registry keys to run. ``None`` or an empty list runs every
            registered smoke in registry order.

    Returns:
        A report with one result per requested name. Unknown names and
        smokes that raise are recorded as "error" results instead of
        aborting the run.
    """
    selected = names or list(SMOKE_REGISTRY)
    report = SmokeReport()
    for name in selected:
        smoke = SMOKE_REGISTRY.get(name)
        if smoke is None:
            report.smokes.append(
                SmokeResult(
                    name=name,
                    status="error",
                    detail={"exception": f"unknown smoke: {name}"},
                )
            )
            continue

        # Time the call here as well: a smoke that raises never gets to
        # report its own duration, and a 0.0s entry would hide how long it
        # ran before failing.
        started = time.perf_counter()
        try:
            result = smoke()
        except Exception as exc:
            result = SmokeResult(
                name=name,
                status="error",
                elapsed_seconds=time.perf_counter() - started,
                detail={"exception": f"{type(exc).__name__}: {exc}"},
            )
        report.smokes.append(result)
    return report
def format_text_summary(report: SmokeReport, *, strict: bool = False) -> str:
    """Render a human-readable, line-per-smoke summary of ``report``.

    Each smoke gets one line with its status marker, name and duration.
    Skipped smokes add the skip reason and, when present, the install hint;
    failed and errored smokes add their detail payload as JSON. A final
    line gives the overall verdict and the per-status counts.

    Args:
        report: The report to render; its ``smokes`` and ``to_dict()`` are
            read.
        strict: When true, any skipped smoke turns the verdict into FAIL.

    Returns:
        The summary as a single newline-joined string.
    """
    status_labels = {
        "pass": "ok",
        "fail": "FAIL",
        "skip": "skip",
        "error": "ERROR",
    }
    lines: list[str] = []
    for item in report.smokes:
        marker = status_labels.get(item.status, item.status.upper())
        line = f" [{marker}] {item.name} ({item.elapsed_seconds:.2f}s)"
        if item.status == "skip":
            line += f" reason={item.skip_reason}"
            # Surface the install command so the operator can act on a skip
            # straight from the console output.
            if item.install_hint:
                line += f" install_hint={item.install_hint}"
        elif item.status in {"fail", "error"}:
            line += f" detail={json.dumps(item.detail, default=str)}"
        lines.append(line)

    summary = report.to_dict()["summary"]
    # Derive the verdict from the counts rather than from `report.passed`,
    # so it cannot be skewed by how that attribute is exposed. "Every smoke
    # passed or was skipped" is the same as passed + skipped == total.
    all_clean = summary["passed"] + summary["skipped"] == summary["total"]
    overall = "PASS" if all_clean and (not strict or summary["skipped"] == 0) else "FAIL"
    lines.append(
        f"smoke: {overall} passed={summary['passed']} failed={summary['failed']} "
        f"errored={summary['errored']} skipped={summary['skipped']}"
    )
    return "\n".join(lines)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: run smokes, print a summary, return a status.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        1 when any smoke failed or errored, or when ``--strict`` is set and
        any smoke was skipped; 0 otherwise.
    """
    cli = argparse.ArgumentParser(
        prog="run_space_smoke",
        description="Run zsgdp Space-side smoke validations.",
    )
    cli.add_argument(
        "--smoke",
        dest="smokes",
        action="append",
        choices=list(SMOKE_REGISTRY),
        help="Smoke to run. Repeat to run multiple. Default: all registered smokes.",
    )
    cli.add_argument("--output", help="Optional JSON report path.")
    cli.add_argument(
        "--strict",
        action="store_true",
        help="Treat skipped smokes as failures (useful in CI when all deps must be present).",
    )
    options = cli.parse_args(argv)

    report = run_smokes(options.smokes)
    print(format_text_summary(report, strict=options.strict))

    payload = report.to_dict()
    if options.output:
        serialized = json.dumps(payload, indent=2, ensure_ascii=False) + "\n"
        Path(options.output).write_text(serialized, encoding="utf-8")

    counts = payload["summary"]
    hard_failure = counts["failed"] or counts["errored"]
    strict_failure = options.strict and counts["skipped"]
    return 1 if (hard_failure or strict_failure) else 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())