"""Hugging Face Spaces entrypoint for zeroshotGPU.""" from __future__ import annotations import os import shutil import tempfile import zipfile from pathlib import Path from typing import Any, Iterable try: import gradio as gr except ImportError as exc: # pragma: no cover - only used when launching the Space UI. raise RuntimeError("Gradio is required for the Spaces UI. Install with `python -m pip install -r requirements.txt`.") from exc from zsgdp.artifacts import validate_artifact_manifest from zsgdp.config import load_config, load_env_file from zsgdp.gpu import collect_gpu_runtime_status from zsgdp.logging_config import configure_logging, get_logger from zsgdp.pipeline import parse_document from zsgdp.profiling import profile_document # Load .env first so any keys it sets (HF_TOKEN, ZSGDP_LOG_LEVEL, etc.) are # visible before we read environment defaults below. Pre-set Space variables # always win — load_env_file does not override existing env entries. load_env_file() # On a ZeroGPU Space, explicitly seed huggingface_hub's auth context so # subsequent @spaces.GPU calls see Pro-tier quota. Setting HF_TOKEN as an # env var alone isn't always enough — the spaces SDK in some versions # reads the auth from huggingface_hub's cached login state, which # huggingface_hub.login() establishes. def _seed_hf_login() -> None: token = ( os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN") or os.environ.get("HF_ACCESS_TOKEN") ) if not token: return try: from huggingface_hub import login # type: ignore login(token=token, add_to_git_credential=False) except Exception: # Auth seeding is best-effort. If huggingface_hub isn't importable # or login fails, the Space still functions — just on whatever # quota the bare HF_TOKEN env var unlocks. pass _seed_hf_login() # Default to JSON logs on the Space so the HF Spaces logs page is greppable. # Override locally with `ZSGDP_LOG_JSON=0` for human-readable text output. os.environ.setdefault("ZSGDP_LOG_LEVEL", "INFO") os.environ.setdefault("ZSGDP_LOG_JSON", "1" if os.environ.get("SPACE_ID") else "0") # Use a transformers-compat-friendly default for the embedding smoke. Jina-v3 # has known issues with newer transformers' remote-modules loader; the # all-MiniLM-L6-v2 default has no custom modeling code and works everywhere. # Override via Space settings → Variables and secrets if you want jina-v3. os.environ.setdefault("ZSGDP_SMOKE_EMBEDDING_MODEL_ID", "sentence-transformers/all-MiniLM-L6-v2") configure_logging() _logger = get_logger(__name__) ROOT = Path(__file__).resolve().parent DOCLING_CONFIG = ROOT / "configs" / "docling.yaml" LIVE_GPU_CONFIG = ROOT / "configs" / "live_gpu_repair.yaml" # Abuse guards. Override at deployment time via env vars to relax for trusted # Spaces or tighten further for public ones. MAX_UPLOAD_BYTES = int(os.environ.get("ZSGDP_MAX_UPLOAD_BYTES", str(50 * 1024 * 1024))) # 50 MB MAX_PAGE_COUNT = int(os.environ.get("ZSGDP_MAX_PAGE_COUNT", "200")) # Cap on docs extracted from a single zip so a malicious archive can't # fan out into thousands of parses. Each doc still goes through the # per-file MAX_UPLOAD_BYTES / MAX_PAGE_COUNT guards. MAX_BATCH_DOCS = int(os.environ.get("ZSGDP_MAX_BATCH_DOCS", "20")) SUPPORTED_PARSE_EXTS = (".pdf", ".md", ".txt", ".html", ".htm") class UploadRejected(Exception): """Raised when an upload exceeds an abuse-guard limit.""" def _validate_upload(path: Path) -> None: """Reject oversized uploads or PDFs with too many pages before parsing. Cheap to compute (file stat + profiler page count) and avoids spending GPU/CPU minutes on inputs the Space wasn't sized for. """ if not path.exists(): raise UploadRejected("Uploaded file is missing on disk.") size = path.stat().st_size if size > MAX_UPLOAD_BYTES: raise UploadRejected( f"Upload is {size / 1024 / 1024:.1f} MB; the Space limit is " f"{MAX_UPLOAD_BYTES / 1024 / 1024:.0f} MB. Set ZSGDP_MAX_UPLOAD_BYTES to override." ) try: profile = profile_document(path) except Exception: # pragma: no cover - profiler is robust; this is belt-and-braces. return if profile.page_count > MAX_PAGE_COUNT: raise UploadRejected( f"Document has {profile.page_count} pages; the Space limit is " f"{MAX_PAGE_COUNT}. Set ZSGDP_MAX_PAGE_COUNT to override." ) # Top-level artifact files surfaced as individual downloads. Nested # directories like assets/ stay bundled in the zip only — they can be # large for multi-page PDFs and would clutter the per-artifact list. _INDIVIDUAL_ARTIFACT_NAMES = ( "parsed_document.json", "document.md", "elements.jsonl", "tables.jsonl", "figures.jsonl", "chunks.jsonl", "chunking_plan.json", "parser_metrics.json", "quality_report.json", "routing_report.json", "profile.json", "gpu_runtime.json", "gpu_tasks.jsonl", "gpu_task_report.json", "artifact_manifest.json", "conflict_report.json", ) def _collect_artifact_files(output_dir: Path) -> list[str]: """Return absolute paths for the top-level artifacts the Space surfaces. Order matches _INDIVIDUAL_ARTIFACT_NAMES so the UI listing is stable. Missing files are silently skipped (different parse runs emit different subsets — e.g. conflict_report.json only when multiple parsers ran). """ paths: list[str] = [] for name in _INDIVIDUAL_ARTIFACT_NAMES: candidate = output_dir / name if candidate.exists(): paths.append(str(candidate)) return paths def _empty_outputs(reason: str, source: Path | None, *, rejected: bool, runtime: dict) -> tuple: """Return-shape used for every error path. Centralised so the tuple width can't drift between the success path and the four error paths.""" summary: dict[str, Any] = {"error": reason} if source is not None: summary["source"] = str(source) if rejected: summary["rejected"] = True return ("", summary, {}, {}, {}, runtime, [], {}, {}, None, []) def _build_chunk_detail(parsed) -> dict[str, Any]: """Produce a richer chunking summary than the bare chunking_plan. Surfaces strategy counts, token-count distribution, sample chunks per strategy (truncated to keep the payload UI-friendly), and counts of tables / figures / parent / child chunks. Companion to the `chunking_plan` JSON which only describes the strategy ladder. """ chunks = parsed.chunks by_strategy: dict[str, list] = {} for chunk in chunks: by_strategy.setdefault(chunk.strategy, []).append(chunk) strategy_breakdown: dict[str, dict[str, Any]] = {} for strategy, items in sorted(by_strategy.items()): token_counts = sorted(item.token_count for item in items) sample_chunks = [] for item in items[:3]: preview = item.text.strip() if len(preview) > 240: preview = preview[:237] + "..." sample_chunks.append( { "chunk_id": item.chunk_id, "page_start": item.page_start, "page_end": item.page_end, "section_path": item.section_path, "boundary_reason": item.boundary_reason, "token_count": item.token_count, "source_parser": item.source_parser, "preview": preview, } ) strategy_breakdown[strategy] = { "count": len(items), "token_count_min": token_counts[0] if token_counts else 0, "token_count_median": token_counts[len(token_counts) // 2] if token_counts else 0, "token_count_max": token_counts[-1] if token_counts else 0, "samples": sample_chunks, } parent_count = sum(1 for c in chunks if c.content_type == "parent") child_count = sum(1 for c in chunks if c.parent_chunk_id) table_chunks = sum(1 for c in chunks if c.table_ids) figure_chunks = sum(1 for c in chunks if c.figure_ids) visual_context = sum(1 for c in chunks if c.requires_visual_context) return { "total_chunks": len(chunks), "parent_chunks": parent_count, "child_chunks": child_count, "table_linked_chunks": table_chunks, "figure_linked_chunks": figure_chunks, "visual_context_required": visual_context, "strategies": strategy_breakdown, "plan": parsed.provenance.get("chunking", {}), } def _extract_uploads_to_parse(uploads: Iterable[Path], work_dir: Path) -> list[Path]: """Resolve a set of uploaded files (possibly zips) into individual docs. Each input is either: - A supported document file (.pdf, .md, .txt, .html) — kept as-is. - A .zip archive — extracted; supported files inside are added to the list. Nested zips are skipped (no recursive extraction; one level only). Other extensions are silently dropped. The total number of resolved docs is capped at MAX_BATCH_DOCS to bound the worst-case parse time per request. """ resolved: list[Path] = [] for upload in uploads: ext = upload.suffix.lower() if ext == ".zip": extract_dir = Path(tempfile.mkdtemp(prefix="zsgdp_zip_", dir=work_dir)) try: with zipfile.ZipFile(upload) as zf: # Skip directories and nested zips. for member in zf.namelist(): if member.endswith("/"): continue member_lower = member.lower() if not member_lower.endswith(SUPPORTED_PARSE_EXTS): continue if "__MACOSX" in member or member_lower.startswith("."): continue # Path traversal guard. target = (extract_dir / member).resolve() if not str(target).startswith(str(extract_dir.resolve())): continue target.parent.mkdir(parents=True, exist_ok=True) with zf.open(member) as source, open(target, "wb") as out: shutil.copyfileobj(source, out) resolved.append(target) except zipfile.BadZipFile: _logger.warning("space_zip_corrupt", extra={"path": str(upload)}) continue elif ext in SUPPORTED_PARSE_EXTS: resolved.append(upload) else: _logger.info("space_upload_skipped", extra={"path": str(upload), "reason": "unsupported_extension"}) if len(resolved) >= MAX_BATCH_DOCS: break return resolved[:MAX_BATCH_DOCS] def _parse_one_doc( source: Path, output_dir: Path, pipeline_mode: str, ) -> dict[str, Any]: """Parse a single doc and return a per-doc result block. Raises on parse failure so the batch driver can record the error and continue with remaining docs instead of aborting the whole request. """ config_path = _config_path_for_mode(pipeline_mode) parsed = parse_document(source, output_dir, config_path=config_path) artifact_validation = validate_artifact_manifest(output_dir) individual_files = _collect_artifact_files(output_dir) return { "source_path": str(source), "doc_id": parsed.doc_id, "file_type": parsed.file_type, "elements": len(parsed.elements), "tables": len(parsed.tables), "figures": len(parsed.figures), "chunks": len(parsed.chunks), "quality_score": parsed.quality_report.score, "blocking": parsed.quality_report.has_blocking_failures, "artifact_manifest_valid": artifact_validation.get("valid"), "individual_artifact_count": len(individual_files), "_parsed": parsed, "_artifact_validation": artifact_validation, "_individual_files": individual_files, "_output_dir": str(output_dir), } def parse_uploaded_document(file_obj: Any, pipeline_mode: str, progress: Any = None): """Parse one or more documents into Markdown, structured JSON, and chunks. Accepts either a single file or a list of files (Gradio's `file_count="multiple"` semantics). `.zip` uploads are extracted on the server side and each supported file inside is parsed; total docs are capped at MAX_BATCH_DOCS (default 20) to bound the worst-case work per request. For multi-doc inputs the Markdown tab shows the first document's output; the Summary tab includes a `batch` block listing every doc's headline metrics; the Artifacts zip contains every per-doc directory. Use when a user supplies one or many documents and wants either (a) the text reconstructed cleanly, (b) structured elements + tables + figures with bounding boxes, (c) chunks for downstream RAG, or (d) an audit trail showing which parsers ran and how the merger resolved conflicts. Args: file_obj: Uploaded file(s). Single `.pdf` / `.md` / `.txt` / `.html`, or a `.zip` of those, or a list of any of the above. Per-file caps of 50 MB and 200 pages apply (configurable via ZSGDP_MAX_UPLOAD_BYTES / ZSGDP_MAX_PAGE_COUNT). pipeline_mode: "Docling + PyMuPDF" / "Default lightweight" / "Live GPU repair". The third dispatches malformed-table, OCR-coverage, figure, and reading-order issues to the configured GPU backend (Qwen2.5-VL by default). progress: optional Gradio Progress object (auto-injected by the Gradio click handler — leave None for direct API calls). """ if progress is None: # When called via /gradio_api/call, no progress is wired; use a no-op # so the function signature stays consistent. def progress(value, *, desc=""): # type: ignore[no-redef] return None if file_obj is None: return _empty_outputs("Upload a document first.", None, rejected=False, runtime={}) progress(0.0, desc="Validating uploads...") # Normalise to a list of Path. Gradio passes a single FileData when # file_count='single' and a list when 'multiple'. if isinstance(file_obj, list): upload_paths = [Path(item.name if hasattr(item, "name") else item) for item in file_obj if item is not None] elif hasattr(file_obj, "name"): upload_paths = [Path(file_obj.name)] else: upload_paths = [Path(str(file_obj))] if not upload_paths: return _empty_outputs("Upload a document first.", None, rejected=False, runtime={}) work_dir = Path(tempfile.mkdtemp(prefix="zeroshotgpu_")) docs_to_parse = _extract_uploads_to_parse(upload_paths, work_dir) if not docs_to_parse: runtime = runtime_status_for_mode(pipeline_mode) return _empty_outputs( "No supported documents found in the upload (accepted: pdf/md/txt/html, optionally inside a zip).", upload_paths[0], rejected=True, runtime=runtime, ) # Per-file abuse guard. for doc in docs_to_parse: try: _validate_upload(doc) except UploadRejected as exc: _logger.warning( "space_upload_rejected", extra={"source_path": str(doc), "reason": str(exc)}, ) runtime = runtime_status_for_mode(pipeline_mode) return _empty_outputs(str(exc), doc, rejected=True, runtime=runtime) progress(0.05, desc=f"Parsing {len(docs_to_parse)} document(s)...") output_root = work_dir / "parsed" output_root.mkdir(parents=True, exist_ok=True) per_doc_results: list[dict[str, Any]] = [] used_names: set[str] = set() for index, doc in enumerate(docs_to_parse, start=1): # Stable per-doc subdir. stem = doc.stem or f"doc_{index}" candidate = stem suffix = 2 while candidate in used_names: candidate = f"{stem}_{suffix}" suffix += 1 used_names.add(candidate) doc_out = output_root / candidate progress( 0.05 + 0.85 * (index - 1) / max(1, len(docs_to_parse)), desc=f"Parsing {index}/{len(docs_to_parse)}: {doc.name}", ) try: result = _parse_one_doc(doc, doc_out, pipeline_mode) per_doc_results.append(result) except Exception as exc: # pragma: no cover - surfaced in UI _logger.warning( "space_parse_failed", extra={"source_path": str(doc), "error": str(exc)}, ) per_doc_results.append( { "source_path": str(doc), "error": str(exc), "doc_id": None, "_parsed": None, } ) progress(0.92, desc="Bundling artifacts...") # Pick the first successful parse as the primary doc shown in the UI. successful = [r for r in per_doc_results if r.get("_parsed") is not None] if not successful: runtime = runtime_status_for_mode(pipeline_mode) first_error = next((r.get("error") for r in per_doc_results if r.get("error")), "All parses failed.") return _empty_outputs(first_error, upload_paths[0], rejected=False, runtime=runtime) primary = successful[0] parsed = primary["_parsed"] artifact_validation = primary["_artifact_validation"] individual_files = primary["_individual_files"] # If batch, the archive bundles the whole output_root; otherwise just the # single doc's dir. Always returns a single zip path. if len(per_doc_results) > 1: archive_path = shutil.make_archive(str(output_root), "zip", output_root) else: archive_path = shutil.make_archive(str(Path(primary["_output_dir"])), "zip", primary["_output_dir"]) runtime = parsed.provenance.get("gpu_runtime", {}) summary = { "doc_id": parsed.doc_id, "file_type": parsed.file_type, "elements": len(parsed.elements), "tables": len(parsed.tables), "figures": len(parsed.figures), "chunks": len(parsed.chunks), "quality_score": parsed.quality_report.score, "blocking": parsed.quality_report.has_blocking_failures, "deployment": parsed.provenance.get("config_deployment", {}), "runtime_device": runtime.get("device"), "running_on_huggingface_space": runtime.get("running_on_huggingface_space"), "artifact_manifest_valid": artifact_validation.get("valid"), "artifact_count": artifact_validation.get("artifact_count"), "artifact_checked_count": artifact_validation.get("checked_count"), "individual_artifact_count": len(individual_files), } if len(per_doc_results) > 1: successful_count = sum(1 for r in per_doc_results if r.get("_parsed") is not None) summary["batch"] = { "input_count": len(docs_to_parse), "successful_count": successful_count, "failed_count": len(per_doc_results) - successful_count, "documents": [ {key: value for key, value in record.items() if not key.startswith("_")} for record in per_doc_results ], "aggregate": { "total_elements": sum(r.get("elements", 0) for r in per_doc_results if r.get("elements") is not None), "total_tables": sum(r.get("tables", 0) for r in per_doc_results if r.get("tables") is not None), "total_figures": sum(r.get("figures", 0) for r in per_doc_results if r.get("figures") is not None), "total_chunks": sum(r.get("chunks", 0) for r in per_doc_results if r.get("chunks") is not None), "mean_quality_score": ( sum(r.get("quality_score", 0.0) for r in per_doc_results if r.get("quality_score") is not None) / max(1, successful_count) ), }, } chunking_payload = { "plan": parsed.provenance.get("chunking", {}), "detail": _build_chunk_detail(parsed), } progress(1.0, desc="Done") return ( parsed.to_markdown(), summary, parsed.quality_report.to_dict(), parsed.provenance.get("parser_metrics", {}), chunking_payload, runtime, parsed.provenance.get("gpu_tasks", []), parsed.provenance.get("gpu_task_report", {}), artifact_validation, archive_path, individual_files, ) def _config_path_for_mode(pipeline_mode: str) -> Path | None: env_config = os.environ.get("ZSGDP_CONFIG_PATH") if env_config: return Path(env_config) if pipeline_mode == "Live GPU repair" and LIVE_GPU_CONFIG.exists(): return LIVE_GPU_CONFIG if pipeline_mode == "Docling + PyMuPDF" and DOCLING_CONFIG.exists(): return DOCLING_CONFIG return None def runtime_status_for_mode(pipeline_mode: str) -> dict: return collect_gpu_runtime_status(load_config(_config_path_for_mode(pipeline_mode))).to_dict() def diagnose_runtime() -> dict: """Report env-var presence (not values) so we can confirm HF_TOKEN is loaded. Returns booleans for which token-related env vars are present, plus their lengths (to confirm a non-empty value), plus whether the spaces SDK can detect authentication. NEVER returns actual token values. """ import os token_vars = ("HF_TOKEN", "HUGGING_FACE_HUB_TOKEN", "HUGGINGFACE_TOKEN", "HF_ACCESS_TOKEN") info: dict[str, Any] = { "space_id": os.environ.get("SPACE_ID"), "space_host": os.environ.get("SPACE_HOST"), } for var in token_vars: value = os.environ.get(var) info[f"{var}_set"] = bool(value) info[f"{var}_length"] = len(value) if value else 0 # Try to import spaces SDK and see what it reports. try: import spaces # type: ignore info["spaces_sdk_available"] = True except ImportError: info["spaces_sdk_available"] = False # Authenticate the token against HF Hub to see which user it resolves to # and whether Pro is recognized. This is the actual auth ZeroGPU does. token_value = next((os.environ.get(v) for v in token_vars if os.environ.get(v)), None) if token_value: import urllib.request, json as _json try: req = urllib.request.Request( "https://huggingface.co/api/whoami-v2", headers={"Authorization": f"Bearer {token_value}"}, ) with urllib.request.urlopen(req, timeout=15) as resp: whoami = _json.loads(resp.read().decode("utf-8")) # Cherry-pick non-sensitive fields. info["whoami_name"] = whoami.get("name") info["whoami_type"] = whoami.get("type") info["whoami_isPro"] = whoami.get("isPro") info["whoami_canPay"] = whoami.get("canPay") info["whoami_periodEnd"] = whoami.get("periodEnd") info["whoami_auth_type"] = (whoami.get("auth") or {}).get("type") info["whoami_auth_role"] = (whoami.get("auth") or {}).get("accessToken", {}).get("role") except Exception as exc: info["whoami_error"] = str(exc) return info def run_smokes_in_space() -> dict: """Run scripts/run_space_smoke.py inside the Space and return the JSON report. Exposes the in-process smoke runner as a Gradio endpoint so it's callable from the UI tab AND from `/gradio_api/call/run_smokes_in_space` remotely. Same code path as the terminal `python -m scripts.run_space_smoke` — just triggered through Gradio instead of an SSH session. Returns the same dict shape as SmokeReport.to_dict(): per-smoke results with status / elapsed / detail / skip_reason / install_hint, plus an aggregate summary count block. """ from scripts.run_space_smoke import run_smokes _logger.info("space_smokes_requested", extra={"trigger": "gradio_endpoint"}) report = run_smokes() payload = report.to_dict() _logger.info( "space_smokes_complete", extra={ "passed": payload["summary"]["passed"], "failed": payload["summary"]["failed"], "skipped": payload["summary"]["skipped"], "errored": payload["summary"]["errored"], }, ) return payload def run_benchmark_on_upload(file_obj: Any) -> dict: """Run the parser benchmark against a user-supplied corpus. Accepts the same upload shapes as `parse_uploaded_document`: a single document, a list, or a `.zip` of documents. Per-file caps and batch cap apply identically. Returns the benchmark headline metrics plus a `documents` list with per-doc records. For real §29 numbers against labelled datasets, use the `omnidocbench` or `doclaynet` loader from a Pro-tier Dev Mode terminal — those add layout F1 / table structure / formula CER which require ground-truth annotations not available from a raw upload. """ if file_obj is None: return {"error": "Upload at least one document to benchmark."} import tempfile from zsgdp.benchmarks.parser_quality import run_parser_benchmark if isinstance(file_obj, list): upload_paths = [Path(item.name if hasattr(item, "name") else item) for item in file_obj if item is not None] elif hasattr(file_obj, "name"): upload_paths = [Path(file_obj.name)] else: upload_paths = [Path(str(file_obj))] if not upload_paths: return {"error": "Upload at least one document to benchmark."} work_dir = Path(tempfile.mkdtemp(prefix="zsgdp_bench_upload_")) docs = _extract_uploads_to_parse(upload_paths, work_dir) if not docs: return { "error": "No supported documents found in the upload (accepted: pdf/md/txt/html, optionally inside a zip).", "input_count": len(upload_paths), } # Per-file abuse guards. for doc in docs: try: _validate_upload(doc) except UploadRejected as exc: return {"error": str(exc), "rejected": True, "source_path": str(doc)} bench_input = work_dir / "input" bench_input.mkdir() for doc in docs: target = bench_input / doc.name # Avoid name collisions (different paths, same filename inside zips). suffix = 2 while target.exists(): target = bench_input / f"{doc.stem}_{suffix}{doc.suffix}" suffix += 1 shutil.copy2(doc, target) out = work_dir / "out" _logger.info( "space_benchmark_upload_requested", extra={"input_count": len(upload_paths), "docs_found": len(docs)}, ) summary = run_parser_benchmark(bench_input, out, dataset_name="custom_folder") headline = { "dataset_name": summary.get("dataset_name"), "document_count": summary.get("document_count"), "mean_quality_score": summary.get("mean_quality_score"), "mean_retrieval_recall_at_1": summary.get("mean_retrieval_recall_at_1"), "mean_retrieval_recall_at_5": summary.get("mean_retrieval_recall_at_5"), "mean_retrieval_mrr": summary.get("mean_retrieval_mrr"), "mean_parser_disagreement_rate": summary.get("mean_parser_disagreement_rate"), "mean_repair_resolution_rate": summary.get("mean_repair_resolution_rate"), "mean_repair_regression_rate": summary.get("mean_repair_regression_rate"), "retrieval_evaluated_count": summary.get("retrieval_evaluated_count"), "documents": [ { "doc_id": doc.get("doc_id"), "file_type": doc.get("file_type"), "quality_score": doc.get("quality_score"), "elements": doc.get("element_count"), "tables": doc.get("table_count"), "figures": doc.get("figure_count"), "chunks": doc.get("chunk_count"), "parser_disagreement_rate": doc.get("parser_disagreement_rate"), "repair_resolution_rate": doc.get("repair_resolution_rate"), "elapsed_seconds": doc.get("elapsed_seconds"), } for doc in summary.get("documents") or [] ], "note": ( "GT-comparison metrics (layout F1, table structure, formula CER) " "are unavailable for arbitrary uploads — they need labelled datasets " "(omnidocbench / doclaynet)." ), } _logger.info( "space_benchmark_upload_complete", extra={k: v for k, v in headline.items() if k != "documents" and not isinstance(v, list)}, ) return headline def run_benchmark_in_space() -> dict: """Run a benchmark against tests/regression/fixtures and return the headline numbers. Triggered from the UI / API. The fixture corpus is committed to the repo so the benchmark is reproducible without uploading any data. For real corpora, drop documents into a Space-side directory and modify the input path here, or run zsgdp benchmark from a Dev Mode terminal. Filters fixture input to `*.input.*` files (the seed documents) so the paired `*.expected.json` snapshot files don't get misparsed as docs. """ import tempfile from zsgdp.benchmarks.parser_quality import run_parser_benchmark fixtures = ROOT / "tests" / "regression" / "fixtures" _logger.info("space_benchmark_requested", extra={"input_dir": str(fixtures)}) with tempfile.TemporaryDirectory(prefix="zsgdp_bench_") as tmp: # Copy only the actual document inputs (skip the .expected.json snapshots). bench_input = Path(tmp) / "input" bench_input.mkdir() copied = 0 for source in sorted(fixtures.glob("*.input.*")): shutil.copy2(source, bench_input / source.name) copied += 1 out = Path(tmp) / "out" summary = run_parser_benchmark(bench_input, out, dataset_name="custom_folder") headline = { "dataset_name": summary.get("dataset_name"), "document_count": summary.get("document_count"), "mean_quality_score": summary.get("mean_quality_score"), "mean_layout_f1": summary.get("mean_layout_f1"), "mean_table_structure_score": summary.get("mean_table_structure_score"), "mean_formula_cer": summary.get("mean_formula_cer"), "mean_retrieval_recall_at_1": summary.get("mean_retrieval_recall_at_1"), "mean_retrieval_recall_at_5": summary.get("mean_retrieval_recall_at_5"), "mean_retrieval_mrr": summary.get("mean_retrieval_mrr"), "mean_parser_disagreement_rate": summary.get("mean_parser_disagreement_rate"), "mean_repair_resolution_rate": summary.get("mean_repair_resolution_rate"), "mean_repair_regression_rate": summary.get("mean_repair_regression_rate"), "retrieval_evaluated_count": summary.get("retrieval_evaluated_count"), "layout_evaluated_count": summary.get("layout_evaluated_count"), } _logger.info("space_benchmark_complete", extra=headline) return headline _HELP_MARKDOWN = f""" ## What this is **zeroshotGPU** is an agentic document-parsing control plane. It does not rely on a single extraction engine — it profiles each document, routes pages to the best parser expert (Docling, PyMuPDF, optionally Marker / MinerU / olmOCR / PaddleOCR / Unstructured), normalizes outputs into a canonical schema, verifies quality, repairs weak regions through a bounded verify/repair loop (with optional GPU escalation), and emits retrieval-ready chunks with provenance. ## How to use this Space **1. Pick a pipeline mode.** | Mode | What it does | |---|---| | `Docling + PyMuPDF` | Default. Runs both parsers so the parser-disagreement metric has a comparison surface. Good for general-purpose parsing. | | `Default lightweight` | Text + PyMuPDF only. Fastest. Use when you just need clean text extraction. | | `Live GPU repair` | Enables `repair.execute_gpu_escalations=true`. Verification failures (invalid tables, OCR coverage gaps, reading-order issues, missing figure captions) are dispatched to Qwen2.5-VL-3B on the GPU. Slower; requires the GPU path to actually be hit (deterministic repair handles markdown tables before this fires). | **2. Upload one or more documents.** Accepts `.pdf`, `.md`, `.txt`, `.html`, or a `.zip` of any of those. Multi-file selection works. Per-file cap: {MAX_UPLOAD_BYTES // (1024 * 1024)} MB / {MAX_PAGE_COUNT} pages. Batch cap: {MAX_BATCH_DOCS} docs per request. **3. Click Parse.** Watch the progress bar; first call may take longer if a model has to download. ## What each tab shows - **Markdown** — canonical reconstruction of the parsed document. For batch uploads, this shows the first document; the full set is in the artifacts zip. - **Run** — summary, quality report, parser metrics, and artifact manifest validation. For batch uploads, `Summary.batch` lists every document parsed in the request with its headline metrics + an aggregate block. - **Chunks** — per-strategy chunk breakdown: total / parent / child / table-linked / figure-linked / visual-context counts, plus per-strategy blocks with token count distribution (min/median/max) and 3 sample chunks per strategy with 240-char previews. - **Artifacts** — each top-level artifact (`parsed_document.json`, `chunks.jsonl`, `quality_report.json`, etc.) downloadable individually. Nested asset crops (page renders, table images) stay bundled in the zip above. - **Runtime** — detected GPU runtime, planned GPU tasks, preflight report. - **Smokes** — runs the project's smoke validation suite in-Space; reports per-smoke pass/fail/skip + detail. API: `/gradio_api/call/run_smokes_in_space`. - **Benchmark** — two modes: against committed regression fixtures, OR against an uploaded corpus you supply. Returns headline metrics (quality score, retrieval recall, repair resolution rate, etc.) plus a per-doc breakdown. API: `/gradio_api/call/run_benchmark_in_space` and `/gradio_api/call/run_benchmark_on_upload`. ## API surface Every button is also a Gradio API endpoint, so AI agents and downstream tooling can invoke them programmatically. Discovery: `agents.md` at the Space root returns the calling instructions; `/gradio_api/info` returns the full schema. ```bash # Parse a doc: curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/parse_uploaded_document \\ -H "Content-Type: application/json" \\ -d '{{"data": [{{file_data}}, "Default lightweight"]}}' # Run smokes: curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/run_smokes_in_space \\ -H "Content-Type: application/json" -d '{{"data": []}}' # Benchmark: curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/run_benchmark_in_space \\ -H "Content-Type: application/json" -d '{{"data": []}}' ``` ## Configuration Defaults work out of the box. To change behavior, set Space variables: - `ZSGDP_CONFIG_PATH` — point at one of `configs/default.yaml`, `configs/docling.yaml`, `configs/live_gpu_repair.yaml`, or your own committed YAML. - `ZSGDP_LOG_LEVEL` — `INFO` (default on Spaces), `DEBUG`, `WARNING`, etc. - `ZSGDP_LOG_JSON` — `1` (default on Spaces) for one-line JSON log records. - `ZSGDP_MAX_UPLOAD_BYTES` / `ZSGDP_MAX_PAGE_COUNT` / `ZSGDP_MAX_BATCH_DOCS` — abuse guards. - `HF_TOKEN` — required for gated models (jina-embeddings-v3 may need it). ## Known limits - **ZeroGPU duration cap.** Each `@spaces.GPU`-decorated call runs in a 60s GPU slot. First-call cold-start for big models (Qwen2.5-VL-3B is ~6 GB) exceeds this on a clean cache. Subsequent calls reuse the cached weights and fit comfortably. - **Live GPU repair** only fires when the deterministic repair path can't resolve an issue. For markdown tables, the deterministic normalizer handles most malformations before GPU dispatch is needed. - **GT-comparison metrics** (layout F1, table structure score, formula CER) require labelled datasets (`omnidocbench`, `doclaynet`). Uploaded custom corpora produce all the GT-free metrics but those three. ## Source [![View source on Hugging Face](https://img.shields.io/badge/HF%20Space-arjun10g%2FzeroshotGPU-blue)](https://huggingface.co/spaces/arjun10g/zeroshotGPU) The full project source — including the multi-step spec, contributor docs, and 250+ unit tests — is at the link above. The `Files` tab on the Space page shows the live deploy. """ with gr.Blocks(title="zeroshotGPU") as demo: gr.Markdown( "# zeroshotGPU\n\n" "Self-hosted agentic document parser. Upload a single document, multiple " "documents, or a `.zip` of documents (PDF / Markdown / plaintext / HTML). " "Each parse emits canonical markdown, structured JSON, retrieval-ready " "chunks (multi-strategy), a quality report with GT-comparison metrics " "where applicable, and a SHA-256-checksummed artifact manifest. " f"Per-file caps: {MAX_UPLOAD_BYTES // (1024 * 1024)} MB / " f"{MAX_PAGE_COUNT} pages. Batch cap: {MAX_BATCH_DOCS} docs per request. " "**See the [Help] tab for full instructions.**\n\n" "[Source on Hugging Face](https://huggingface.co/spaces/arjun10g/zeroshotGPU)" ) with gr.Row(): upload = gr.File( label="Document(s) — single file, multi-select, or .zip", file_types=[".pdf", ".md", ".txt", ".html", ".htm", ".zip"], file_count="multiple", ) with gr.Column(): pipeline = gr.Dropdown( choices=["Docling + PyMuPDF", "Default lightweight", "Live GPU repair"], value="Docling + PyMuPDF", label="Pipeline", info="`Docling + PyMuPDF` runs both for the disagreement signal. `Default lightweight` is text + PyMuPDF only. `Live GPU repair` enables repair.execute_gpu_escalations=true and dispatches malformed-table / OCR / figure / reading-order issues to Qwen2.5-VL.", ) parse_button = gr.Button("Parse", variant="primary") archive = gr.File(label="Artifacts (zip)") with gr.Tabs(): with gr.Tab("Help"): gr.Markdown(_HELP_MARKDOWN) with gr.Tab("Markdown"): gr.Markdown( "_Canonical markdown reconstruction of the parsed document. " "For batch uploads, this shows the first document; the full " "set is in the artifacts zip._" ) markdown = gr.Markdown(label="Canonical Markdown") with gr.Tab("Run"): gr.Markdown( "_Summary, quality report, parser metrics, and artifact " "validation. For batch uploads, `Summary.batch` lists every " "document parsed in the request._" ) summary = gr.JSON(label="Summary") quality = gr.JSON(label="Quality Report") parser_metrics = gr.JSON(label="Parser Metrics") artifact_validation = gr.JSON(label="Artifact Manifest Validation") with gr.Tab("Chunks"): gr.Markdown( "_Per-strategy chunk breakdown: counts, token-count " "distribution (min / median / max), and three sample chunks " "with previews per strategy. The full chunks.jsonl is in the " "Artifacts tab and inside the zip._\n\n" "Strategies emitted by default: `fixed_token_baseline`, " "`recursive_structure`, `parent_child` (with linked parent / " "child IDs), `page_level`, plus `table` / `figure` chunks " "with provenance. `semantic`, `late`, `vision_guided`, and " "`agentic_proposition` are config-gated stubs that emit " "deterministic candidates marked for backend replacement." ) chunking = gr.JSON(label="Chunking plan + per-strategy detail") with gr.Tab("Artifacts"): gr.Markdown( "Each top-level artifact is downloadable individually. " "Nested assets (page renders, table/figure crops) stay bundled " "in the zip above." ) individual_artifacts = gr.Files(label="Individual artifacts") with gr.Tab("Runtime"): runtime = gr.JSON(label="GPU Runtime", value=runtime_status_for_mode("Docling + PyMuPDF")) gpu_tasks = gr.JSON(label="Planned GPU Tasks") gpu_task_report = gr.JSON(label="GPU Task Preflight") with gr.Tab("Smokes"): gr.Markdown( "Runs the same smokes as `python -m scripts.run_space_smoke`, " "in-process. Each call is also exposed via the Gradio API at " "`/gradio_api/call/run_smokes_in_space` for remote validation." ) smoke_button = gr.Button("Run all smokes", variant="primary") smoke_output = gr.JSON(label="Smoke report") with gr.Tab("Benchmark"): gr.Markdown( "**Two benchmark modes:**\n" "- **Run on regression fixtures** — uses the committed seed " "documents (`tests/regression/fixtures/`); reproducible without " "any upload. API: `/gradio_api/call/run_benchmark_in_space`.\n" "- **Run on uploaded corpus** — accepts a `.zip` of documents " "(or a list of files). Returns headline metrics plus a per-doc " "breakdown. GT-comparison metrics (layout F1, table structure, " "formula CER) are NOT computed — those require labelled " "datasets (`omnidocbench` / `doclaynet`) which can be loaded " "via the CLI from a Pro-tier Dev Mode terminal. API: " "`/gradio_api/call/run_benchmark_on_upload`." ) with gr.Row(): benchmark_button = gr.Button("Run on regression fixtures", variant="primary") benchmark_upload_button = gr.Button("Run on uploaded corpus") benchmark_corpus = gr.File( label="Optional upload — used only when 'Run on uploaded corpus' is clicked", file_types=[".pdf", ".md", ".txt", ".html", ".htm", ".zip"], file_count="multiple", ) benchmark_output = gr.JSON(label="Benchmark headline metrics") parse_button.click( parse_uploaded_document, inputs=[upload, pipeline], outputs=[ markdown, summary, quality, parser_metrics, chunking, runtime, gpu_tasks, gpu_task_report, artifact_validation, archive, individual_artifacts, ], ) smoke_button.click(run_smokes_in_space, inputs=[], outputs=smoke_output, api_name="run_smokes_in_space") benchmark_button.click(run_benchmark_in_space, inputs=[], outputs=benchmark_output, api_name="run_benchmark_in_space") benchmark_upload_button.click( run_benchmark_on_upload, inputs=[benchmark_corpus], outputs=benchmark_output, api_name="run_benchmark_on_upload", ) # Hidden diagnostic endpoint — reachable via /gradio_api/call/diagnose_runtime # but no UI button. Reports env-var presence (not values) for debugging # secrets / token / spaces SDK plumbing on the Space. diag_dummy = gr.Button("diag", visible=False) diag_output = gr.JSON(visible=False) diag_dummy.click(diagnose_runtime, inputs=[], outputs=diag_output, api_name="diagnose_runtime") if __name__ == "__main__": demo.launch()