"""Hugging Face Spaces entrypoint for zeroshotGPU."""
from __future__ import annotations
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import Any, Iterable
try:
import gradio as gr
except ImportError as exc: # pragma: no cover - only used when launching the Space UI.
raise RuntimeError("Gradio is required for the Spaces UI. Install with `python -m pip install -r requirements.txt`.") from exc
from zsgdp.artifacts import validate_artifact_manifest
from zsgdp.config import load_config, load_env_file
from zsgdp.gpu import collect_gpu_runtime_status
from zsgdp.logging_config import configure_logging, get_logger
from zsgdp.pipeline import parse_document
from zsgdp.profiling import profile_document
# Load .env first so any keys it sets (HF_TOKEN, ZSGDP_LOG_LEVEL, etc.) are
# visible before we read environment defaults below. Pre-set Space variables
always win; load_env_file does not override existing env entries.
load_env_file()
# On a ZeroGPU Space, explicitly seed huggingface_hub's auth context so
# subsequent @spaces.GPU calls see Pro-tier quota. Setting HF_TOKEN as an
env var alone isn't always enough; the spaces SDK in some versions
# reads the auth from huggingface_hub's cached login state, which
# huggingface_hub.login() establishes.
def _seed_hf_login() -> None:
token = (
os.environ.get("HF_TOKEN")
or os.environ.get("HUGGING_FACE_HUB_TOKEN")
or os.environ.get("HUGGINGFACE_TOKEN")
or os.environ.get("HF_ACCESS_TOKEN")
)
if not token:
return
try:
from huggingface_hub import login # type: ignore
login(token=token, add_to_git_credential=False)
except Exception:
# Auth seeding is best-effort. If huggingface_hub isn't importable
# or login fails, the Space still functions, just on whatever
# quota the bare HF_TOKEN env var unlocks.
pass
_seed_hf_login()
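# A minimal, hypothetical sanity check for the seeded login (run from a Dev
# Mode terminal; huggingface_hub.whoami() is a real helper, though the exact
# fields in its payload vary by account type):
#
#     from huggingface_hub import whoami
#     info = whoami()  # raises if no auth is cached anywhere
#     print(info.get("name"), info.get("isPro"))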
# Default to JSON logs on the Space (SPACE_ID set) so the HF Spaces logs page
# is greppable; elsewhere the default is human-readable text. Set
# ZSGDP_LOG_JSON explicitly to force either format.
os.environ.setdefault("ZSGDP_LOG_LEVEL", "INFO")
os.environ.setdefault("ZSGDP_LOG_JSON", "1" if os.environ.get("SPACE_ID") else "0")
# Use a transformers-compat-friendly default for the embedding smoke. Jina-v3
# has known issues with newer transformers' remote-modules loader; the
# all-MiniLM-L6-v2 default has no custom modeling code and works everywhere.
# Override via Space settings -> Variables and secrets if you want jina-v3.
os.environ.setdefault("ZSGDP_SMOKE_EMBEDDING_MODEL_ID", "sentence-transformers/all-MiniLM-L6-v2")
configure_logging()
_logger = get_logger(__name__)
ROOT = Path(__file__).resolve().parent
DOCLING_CONFIG = ROOT / "configs" / "docling.yaml"
LIVE_GPU_CONFIG = ROOT / "configs" / "live_gpu_repair.yaml"
# Abuse guards. Override at deployment time via env vars to relax for trusted
# Spaces or tighten further for public ones.
MAX_UPLOAD_BYTES = int(os.environ.get("ZSGDP_MAX_UPLOAD_BYTES", str(50 * 1024 * 1024))) # 50 MB
MAX_PAGE_COUNT = int(os.environ.get("ZSGDP_MAX_PAGE_COUNT", "200"))
# Cap on docs extracted from a single zip so a malicious archive can't
# fan out into thousands of parses. Each doc still goes through the
# per-file MAX_UPLOAD_BYTES / MAX_PAGE_COUNT guards.
MAX_BATCH_DOCS = int(os.environ.get("ZSGDP_MAX_BATCH_DOCS", "20"))
SUPPORTED_PARSE_EXTS = (".pdf", ".md", ".txt", ".html", ".htm")
class UploadRejected(Exception):
"""Raised when an upload exceeds an abuse-guard limit."""
def _validate_upload(path: Path) -> None:
"""Reject oversized uploads or PDFs with too many pages before parsing.
Cheap to compute (file stat + profiler page count) and avoids spending
GPU/CPU minutes on inputs the Space wasn't sized for.
"""
if not path.exists():
raise UploadRejected("Uploaded file is missing on disk.")
size = path.stat().st_size
if size > MAX_UPLOAD_BYTES:
raise UploadRejected(
f"Upload is {size / 1024 / 1024:.1f} MB; the Space limit is "
f"{MAX_UPLOAD_BYTES / 1024 / 1024:.0f} MB. Set ZSGDP_MAX_UPLOAD_BYTES to override."
)
try:
profile = profile_document(path)
except Exception: # pragma: no cover - profiler is robust; this is belt-and-braces.
return
if profile.page_count > MAX_PAGE_COUNT:
raise UploadRejected(
f"Document has {profile.page_count} pages; the Space limit is "
f"{MAX_PAGE_COUNT}. Set ZSGDP_MAX_PAGE_COUNT to override."
)
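# Both limits are plain env-var overrides, so e.g. a trusted private Space
# could deploy with ZSGDP_MAX_UPLOAD_BYTES=209715200 (200 MB) and
# ZSGDP_MAX_PAGE_COUNT=1000 (values illustrative).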
# Top-level artifact files surfaced as individual downloads. Nested
# directories like assets/ stay bundled in the zip only; they can be
# large for multi-page PDFs and would clutter the per-artifact list.
_INDIVIDUAL_ARTIFACT_NAMES = (
"parsed_document.json",
"document.md",
"elements.jsonl",
"tables.jsonl",
"figures.jsonl",
"chunks.jsonl",
"chunking_plan.json",
"parser_metrics.json",
"quality_report.json",
"routing_report.json",
"profile.json",
"gpu_runtime.json",
"gpu_tasks.jsonl",
"gpu_task_report.json",
"artifact_manifest.json",
"conflict_report.json",
)
def _collect_artifact_files(output_dir: Path) -> list[str]:
"""Return absolute paths for the top-level artifacts the Space surfaces.
Order matches _INDIVIDUAL_ARTIFACT_NAMES so the UI listing is stable.
Missing files are silently skipped (different parse runs emit different
subsets, e.g. conflict_report.json only when multiple parsers ran).
"""
paths: list[str] = []
for name in _INDIVIDUAL_ARTIFACT_NAMES:
candidate = output_dir / name
if candidate.exists():
paths.append(str(candidate))
return paths
def _empty_outputs(reason: str, source: Path | None, *, rejected: bool, runtime: dict) -> tuple:
"""Return-shape used for every error path. Centralised so the tuple width
can't drift between the success path and the four error paths."""
summary: dict[str, Any] = {"error": reason}
if source is not None:
summary["source"] = str(source)
if rejected:
summary["rejected"] = True
return ("", summary, {}, {}, {}, runtime, [], {}, {}, None, [])
def _build_chunk_detail(parsed) -> dict[str, Any]:
"""Produce a richer chunking summary than the bare chunking_plan.
Surfaces strategy counts, token-count distribution, sample chunks per
strategy (truncated to keep the payload UI-friendly), and counts of
tables / figures / parent / child chunks. Companion to the
`chunking_plan` JSON which only describes the strategy ladder.
"""
chunks = parsed.chunks
by_strategy: dict[str, list] = {}
for chunk in chunks:
by_strategy.setdefault(chunk.strategy, []).append(chunk)
strategy_breakdown: dict[str, dict[str, Any]] = {}
for strategy, items in sorted(by_strategy.items()):
token_counts = sorted(item.token_count for item in items)
sample_chunks = []
for item in items[:3]:
preview = item.text.strip()
if len(preview) > 240:
preview = preview[:237] + "..."
sample_chunks.append(
{
"chunk_id": item.chunk_id,
"page_start": item.page_start,
"page_end": item.page_end,
"section_path": item.section_path,
"boundary_reason": item.boundary_reason,
"token_count": item.token_count,
"source_parser": item.source_parser,
"preview": preview,
}
)
strategy_breakdown[strategy] = {
"count": len(items),
"token_count_min": token_counts[0] if token_counts else 0,
"token_count_median": token_counts[len(token_counts) // 2] if token_counts else 0,
"token_count_max": token_counts[-1] if token_counts else 0,
"samples": sample_chunks,
}
parent_count = sum(1 for c in chunks if c.content_type == "parent")
child_count = sum(1 for c in chunks if c.parent_chunk_id)
table_chunks = sum(1 for c in chunks if c.table_ids)
figure_chunks = sum(1 for c in chunks if c.figure_ids)
visual_context = sum(1 for c in chunks if c.requires_visual_context)
return {
"total_chunks": len(chunks),
"parent_chunks": parent_count,
"child_chunks": child_count,
"table_linked_chunks": table_chunks,
"figure_linked_chunks": figure_chunks,
"visual_context_required": visual_context,
"strategies": strategy_breakdown,
"plan": parsed.provenance.get("chunking", {}),
}
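# Illustrative shape of the returned payload (all values invented):
#
#     {
#         "total_chunks": 42,
#         "parent_chunks": 6,
#         "child_chunks": 30,
#         "table_linked_chunks": 4,
#         "figure_linked_chunks": 2,
#         "visual_context_required": 2,
#         "strategies": {
#             "recursive_structure": {
#                 "count": 18,
#                 "token_count_min": 55,
#                 "token_count_median": 210,
#                 "token_count_max": 480,
#                 "samples": [{"chunk_id": "...", "preview": "..."}],
#             },
#         },
#         "plan": {...},
#     }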
def _extract_uploads_to_parse(uploads: Iterable[Path], work_dir: Path) -> list[Path]:
"""Resolve a set of uploaded files (possibly zips) into individual docs.
Each input is either:
- A supported document file (.pdf, .md, .txt, .html, .htm), kept as-is.
- A .zip archive: extracted; supported files inside are added to the
list. Nested zips are skipped (no recursive extraction; one level only).
Other extensions are silently dropped.
The total number of resolved docs is capped at MAX_BATCH_DOCS to bound
the worst-case parse time per request.
"""
resolved: list[Path] = []
for upload in uploads:
ext = upload.suffix.lower()
if ext == ".zip":
extract_dir = Path(tempfile.mkdtemp(prefix="zsgdp_zip_", dir=work_dir))
try:
with zipfile.ZipFile(upload) as zf:
# Skip directories, macOS metadata, hidden files, and anything
# without a supported extension (this also drops nested zips).
for member in zf.namelist():
if member.endswith("/"):
continue
member_lower = member.lower()
if not member_lower.endswith(SUPPORTED_PARSE_EXTS):
continue
if "__MACOSX" in member or member_lower.startswith("."):
continue
# Path traversal guard: resolve the target and require it to stay
# inside the extraction dir. A plain string-prefix check would let
# a sibling such as "<extract_dir>_evil" slip through.
target = (extract_dir / member).resolve()
if not target.is_relative_to(extract_dir.resolve()):
    continue
target.parent.mkdir(parents=True, exist_ok=True)
with zf.open(member) as source, open(target, "wb") as out:
shutil.copyfileobj(source, out)
resolved.append(target)
except zipfile.BadZipFile:
_logger.warning("space_zip_corrupt", extra={"path": str(upload)})
continue
elif ext in SUPPORTED_PARSE_EXTS:
resolved.append(upload)
else:
_logger.info("space_upload_skipped", extra={"path": str(upload), "reason": "unsupported_extension"})
if len(resolved) >= MAX_BATCH_DOCS:
break
return resolved[:MAX_BATCH_DOCS]
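# Illustrative usage (paths hypothetical):
#
#     work = Path(tempfile.mkdtemp(prefix="zsgdp_demo_"))
#     docs = _extract_uploads_to_parse([Path("report.pdf"), Path("corpus.zip")], work)
#     # -> [Path("report.pdf"), Path(".../zsgdp_zip_.../a.pdf"), ...],
#     #    capped at MAX_BATCH_DOCS entries.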
def _parse_one_doc(
source: Path,
output_dir: Path,
pipeline_mode: str,
) -> dict[str, Any]:
"""Parse a single doc and return a per-doc result block.
Raises on parse failure so the batch driver can record the error and
continue with remaining docs instead of aborting the whole request.
"""
config_path = _config_path_for_mode(pipeline_mode)
parsed = parse_document(source, output_dir, config_path=config_path)
artifact_validation = validate_artifact_manifest(output_dir)
individual_files = _collect_artifact_files(output_dir)
return {
"source_path": str(source),
"doc_id": parsed.doc_id,
"file_type": parsed.file_type,
"elements": len(parsed.elements),
"tables": len(parsed.tables),
"figures": len(parsed.figures),
"chunks": len(parsed.chunks),
"quality_score": parsed.quality_report.score,
"blocking": parsed.quality_report.has_blocking_failures,
"artifact_manifest_valid": artifact_validation.get("valid"),
"individual_artifact_count": len(individual_files),
"_parsed": parsed,
"_artifact_validation": artifact_validation,
"_individual_files": individual_files,
"_output_dir": str(output_dir),
}
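# Keys prefixed with "_" carry heavy in-memory objects for the batch driver
# (the parsed document itself, artifact paths); they are stripped before a
# record lands in Summary.batch (see the key filter in parse_uploaded_document).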
def parse_uploaded_document(file_obj: Any, pipeline_mode: str, progress: Any = None):
"""Parse one or more documents into Markdown, structured JSON, and chunks.
Accepts either a single file or a list of files (Gradio's `file_count="multiple"`
semantics). `.zip` uploads are extracted on the server side and each
supported file inside is parsed; total docs are capped at
MAX_BATCH_DOCS (default 20) to bound the worst-case work per request.
For multi-doc inputs the Markdown tab shows the first document's
output; the Summary tab includes a `batch` block listing every doc's
headline metrics; the Artifacts zip contains every per-doc directory.
Use when a user supplies one or many documents and wants either
(a) the text reconstructed cleanly, (b) structured elements + tables
+ figures with bounding boxes, (c) chunks for downstream RAG, or
(d) an audit trail showing which parsers ran and how the merger
resolved conflicts.
Args:
file_obj: Uploaded file(s). Single `.pdf` / `.md` / `.txt` /
`.html`, or a `.zip` of those, or a list of any of the above.
Per-file caps of 50 MB and 200 pages apply (configurable via
ZSGDP_MAX_UPLOAD_BYTES / ZSGDP_MAX_PAGE_COUNT).
pipeline_mode: "Docling + PyMuPDF" / "Default lightweight" /
"Live GPU repair". The third dispatches malformed-table,
OCR-coverage, figure, and reading-order issues to the
configured GPU backend (Qwen2.5-VL by default).
progress: optional Gradio Progress object (auto-injected by the
Gradio click handler; leave None for direct API calls).
"""
if progress is None:
# When called via /gradio_api/call, no progress is wired; use a no-op
# so the function signature stays consistent.
def progress(value, *, desc=""): # type: ignore[no-redef]
return None
if file_obj is None:
return _empty_outputs("Upload a document first.", None, rejected=False, runtime={})
progress(0.0, desc="Validating uploads...")
# Normalise to a list of Path. Gradio passes a single FileData when
# file_count='single' and a list when 'multiple'.
if isinstance(file_obj, list):
upload_paths = [Path(item.name if hasattr(item, "name") else item) for item in file_obj if item is not None]
elif hasattr(file_obj, "name"):
upload_paths = [Path(file_obj.name)]
else:
upload_paths = [Path(str(file_obj))]
if not upload_paths:
return _empty_outputs("Upload a document first.", None, rejected=False, runtime={})
work_dir = Path(tempfile.mkdtemp(prefix="zeroshotgpu_"))
docs_to_parse = _extract_uploads_to_parse(upload_paths, work_dir)
if not docs_to_parse:
runtime = runtime_status_for_mode(pipeline_mode)
return _empty_outputs(
"No supported documents found in the upload (accepted: pdf/md/txt/html, optionally inside a zip).",
upload_paths[0],
rejected=True,
runtime=runtime,
)
# Per-file abuse guard.
for doc in docs_to_parse:
try:
_validate_upload(doc)
except UploadRejected as exc:
_logger.warning(
"space_upload_rejected",
extra={"source_path": str(doc), "reason": str(exc)},
)
runtime = runtime_status_for_mode(pipeline_mode)
return _empty_outputs(str(exc), doc, rejected=True, runtime=runtime)
progress(0.05, desc=f"Parsing {len(docs_to_parse)} document(s)...")
output_root = work_dir / "parsed"
output_root.mkdir(parents=True, exist_ok=True)
per_doc_results: list[dict[str, Any]] = []
used_names: set[str] = set()
for index, doc in enumerate(docs_to_parse, start=1):
# Stable per-doc subdir.
stem = doc.stem or f"doc_{index}"
candidate = stem
suffix = 2
while candidate in used_names:
candidate = f"{stem}_{suffix}"
suffix += 1
used_names.add(candidate)
doc_out = output_root / candidate
progress(
0.05 + 0.85 * (index - 1) / max(1, len(docs_to_parse)),
desc=f"Parsing {index}/{len(docs_to_parse)}: {doc.name}",
)
try:
result = _parse_one_doc(doc, doc_out, pipeline_mode)
per_doc_results.append(result)
except Exception as exc: # pragma: no cover - surfaced in UI
_logger.warning(
"space_parse_failed",
extra={"source_path": str(doc), "error": str(exc)},
)
per_doc_results.append(
{
"source_path": str(doc),
"error": str(exc),
"doc_id": None,
"_parsed": None,
}
)
progress(0.92, desc="Bundling artifacts...")
# Pick the first successful parse as the primary doc shown in the UI.
successful = [r for r in per_doc_results if r.get("_parsed") is not None]
if not successful:
runtime = runtime_status_for_mode(pipeline_mode)
first_error = next((r.get("error") for r in per_doc_results if r.get("error")), "All parses failed.")
return _empty_outputs(first_error, upload_paths[0], rejected=False, runtime=runtime)
primary = successful[0]
parsed = primary["_parsed"]
artifact_validation = primary["_artifact_validation"]
individual_files = primary["_individual_files"]
# If batch, the archive bundles the whole output_root; otherwise just the
# single doc's dir. Always returns a single zip path.
if len(per_doc_results) > 1:
archive_path = shutil.make_archive(str(output_root), "zip", output_root)
else:
archive_path = shutil.make_archive(str(Path(primary["_output_dir"])), "zip", primary["_output_dir"])
runtime = parsed.provenance.get("gpu_runtime", {})
summary = {
"doc_id": parsed.doc_id,
"file_type": parsed.file_type,
"elements": len(parsed.elements),
"tables": len(parsed.tables),
"figures": len(parsed.figures),
"chunks": len(parsed.chunks),
"quality_score": parsed.quality_report.score,
"blocking": parsed.quality_report.has_blocking_failures,
"deployment": parsed.provenance.get("config_deployment", {}),
"runtime_device": runtime.get("device"),
"running_on_huggingface_space": runtime.get("running_on_huggingface_space"),
"artifact_manifest_valid": artifact_validation.get("valid"),
"artifact_count": artifact_validation.get("artifact_count"),
"artifact_checked_count": artifact_validation.get("checked_count"),
"individual_artifact_count": len(individual_files),
}
if len(per_doc_results) > 1:
successful_count = sum(1 for r in per_doc_results if r.get("_parsed") is not None)
summary["batch"] = {
"input_count": len(docs_to_parse),
"successful_count": successful_count,
"failed_count": len(per_doc_results) - successful_count,
"documents": [
{key: value for key, value in record.items() if not key.startswith("_")}
for record in per_doc_results
],
"aggregate": {
"total_elements": sum(r.get("elements", 0) for r in per_doc_results if r.get("elements") is not None),
"total_tables": sum(r.get("tables", 0) for r in per_doc_results if r.get("tables") is not None),
"total_figures": sum(r.get("figures", 0) for r in per_doc_results if r.get("figures") is not None),
"total_chunks": sum(r.get("chunks", 0) for r in per_doc_results if r.get("chunks") is not None),
"mean_quality_score": (
sum(r.get("quality_score", 0.0) for r in per_doc_results if r.get("quality_score") is not None)
/ max(1, successful_count)
),
},
}
chunking_payload = {
"plan": parsed.provenance.get("chunking", {}),
"detail": _build_chunk_detail(parsed),
}
progress(1.0, desc="Done")
return (
parsed.to_markdown(),
summary,
parsed.quality_report.to_dict(),
parsed.provenance.get("parser_metrics", {}),
chunking_payload,
runtime,
parsed.provenance.get("gpu_tasks", []),
parsed.provenance.get("gpu_task_report", {}),
artifact_validation,
archive_path,
individual_files,
)
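# Sketch of a remote invocation with the official gradio_client package.
# Assumes the endpoint name Gradio derives from the function name and the
# Space URL advertised in the help text; the file path is hypothetical:
#
#     from gradio_client import Client, handle_file
#
#     client = Client("arjun10g/zeroshotGPU")
#     markdown, summary, *rest = client.predict(
#         [handle_file("paper.pdf")],
#         "Default lightweight",
#         api_name="/parse_uploaded_document",
#     )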
def _config_path_for_mode(pipeline_mode: str) -> Path | None:
env_config = os.environ.get("ZSGDP_CONFIG_PATH")
if env_config:
return Path(env_config)
if pipeline_mode == "Live GPU repair" and LIVE_GPU_CONFIG.exists():
return LIVE_GPU_CONFIG
if pipeline_mode == "Docling + PyMuPDF" and DOCLING_CONFIG.exists():
return DOCLING_CONFIG
return None
def runtime_status_for_mode(pipeline_mode: str) -> dict:
return collect_gpu_runtime_status(load_config(_config_path_for_mode(pipeline_mode))).to_dict()
def diagnose_runtime() -> dict:
"""Report env-var presence (not values) so we can confirm HF_TOKEN is loaded.
Returns booleans for which token-related env vars are present, plus their
lengths (to confirm a non-empty value), plus whether the spaces SDK can
detect authentication. NEVER returns actual token values.
"""
token_vars = ("HF_TOKEN", "HUGGING_FACE_HUB_TOKEN", "HUGGINGFACE_TOKEN", "HF_ACCESS_TOKEN")
info: dict[str, Any] = {
"space_id": os.environ.get("SPACE_ID"),
"space_host": os.environ.get("SPACE_HOST"),
}
for var in token_vars:
value = os.environ.get(var)
info[f"{var}_set"] = bool(value)
info[f"{var}_length"] = len(value) if value else 0
# Record whether the spaces SDK is importable at all.
try:
import spaces # type: ignore
info["spaces_sdk_available"] = True
except ImportError:
info["spaces_sdk_available"] = False
# Authenticate the token against HF Hub to see which user it resolves to and
# whether Pro is recognized; this mirrors the auth check ZeroGPU itself does.
token_value = next((os.environ.get(v) for v in token_vars if os.environ.get(v)), None)
if token_value:
import json as _json
import urllib.request
try:
req = urllib.request.Request(
"https://huggingface.co/api/whoami-v2",
headers={"Authorization": f"Bearer {token_value}"},
)
with urllib.request.urlopen(req, timeout=15) as resp:
whoami = _json.loads(resp.read().decode("utf-8"))
# Cherry-pick non-sensitive fields.
info["whoami_name"] = whoami.get("name")
info["whoami_type"] = whoami.get("type")
info["whoami_isPro"] = whoami.get("isPro")
info["whoami_canPay"] = whoami.get("canPay")
info["whoami_periodEnd"] = whoami.get("periodEnd")
info["whoami_auth_type"] = (whoami.get("auth") or {}).get("type")
info["whoami_auth_role"] = (whoami.get("auth") or {}).get("accessToken", {}).get("role")
except Exception as exc:
info["whoami_error"] = str(exc)
return info
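# Sketch of the remote two-step call Gradio's curl API uses for this hidden
# endpoint (recent Gradio versions; the event id is a placeholder):
#
#     curl -s -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/diagnose_runtime \
#         -H "Content-Type: application/json" -d '{"data": []}'
#     # -> {"event_id": "<id>"}; then stream the result:
#     curl -sN https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/diagnose_runtime/<id>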
def run_smokes_in_space() -> dict:
"""Run scripts/run_space_smoke.py inside the Space and return the JSON report.
Exposes the in-process smoke runner as a Gradio endpoint so it's callable
from the UI tab AND from `/gradio_api/call/run_smokes_in_space` remotely.
Same code path as the terminal `python -m scripts.run_space_smoke`, just
triggered through Gradio instead of an SSH session.
Returns the same dict shape as SmokeReport.to_dict(): per-smoke results
with status / elapsed / detail / skip_reason / install_hint, plus an
aggregate summary count block.
"""
from scripts.run_space_smoke import run_smokes
_logger.info("space_smokes_requested", extra={"trigger": "gradio_endpoint"})
report = run_smokes()
payload = report.to_dict()
_logger.info(
"space_smokes_complete",
extra={
"passed": payload["summary"]["passed"],
"failed": payload["summary"]["failed"],
"skipped": payload["summary"]["skipped"],
"errored": payload["summary"]["errored"],
},
)
return payload
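# Illustrative payload shape (counts invented; the per-smoke record layout
# follows SmokeReport.to_dict(), which lives outside this module):
#
#     {
#         "summary": {"passed": 7, "failed": 0, "skipped": 2, "errored": 0},
#         ...per-smoke records with status / elapsed / detail / skip_reason /
#         install_hint...
#     }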
def run_benchmark_on_upload(file_obj: Any) -> dict:
"""Run the parser benchmark against a user-supplied corpus.
Accepts the same upload shapes as `parse_uploaded_document`: a single
document, a list, or a `.zip` of documents. Per-file caps and batch
cap apply identically. Returns the benchmark headline metrics plus a
`documents` list with per-doc records.
For real §29 numbers against labelled datasets, use the
`omnidocbench` or `doclaynet` loader from a Pro-tier Dev Mode
terminal; those add layout F1 / table structure / formula CER, which
require ground-truth annotations not available from a raw upload.
"""
if file_obj is None:
return {"error": "Upload at least one document to benchmark."}
from zsgdp.benchmarks.parser_quality import run_parser_benchmark
if isinstance(file_obj, list):
upload_paths = [Path(item.name if hasattr(item, "name") else item) for item in file_obj if item is not None]
elif hasattr(file_obj, "name"):
upload_paths = [Path(file_obj.name)]
else:
upload_paths = [Path(str(file_obj))]
if not upload_paths:
return {"error": "Upload at least one document to benchmark."}
work_dir = Path(tempfile.mkdtemp(prefix="zsgdp_bench_upload_"))
docs = _extract_uploads_to_parse(upload_paths, work_dir)
if not docs:
return {
"error": "No supported documents found in the upload (accepted: pdf/md/txt/html, optionally inside a zip).",
"input_count": len(upload_paths),
}
# Per-file abuse guards.
for doc in docs:
try:
_validate_upload(doc)
except UploadRejected as exc:
return {"error": str(exc), "rejected": True, "source_path": str(doc)}
bench_input = work_dir / "input"
bench_input.mkdir()
for doc in docs:
target = bench_input / doc.name
# Avoid name collisions (different paths, same filename inside zips).
suffix = 2
while target.exists():
target = bench_input / f"{doc.stem}_{suffix}{doc.suffix}"
suffix += 1
shutil.copy2(doc, target)
out = work_dir / "out"
_logger.info(
"space_benchmark_upload_requested",
extra={"input_count": len(upload_paths), "docs_found": len(docs)},
)
summary = run_parser_benchmark(bench_input, out, dataset_name="custom_folder")
headline = {
"dataset_name": summary.get("dataset_name"),
"document_count": summary.get("document_count"),
"mean_quality_score": summary.get("mean_quality_score"),
"mean_retrieval_recall_at_1": summary.get("mean_retrieval_recall_at_1"),
"mean_retrieval_recall_at_5": summary.get("mean_retrieval_recall_at_5"),
"mean_retrieval_mrr": summary.get("mean_retrieval_mrr"),
"mean_parser_disagreement_rate": summary.get("mean_parser_disagreement_rate"),
"mean_repair_resolution_rate": summary.get("mean_repair_resolution_rate"),
"mean_repair_regression_rate": summary.get("mean_repair_regression_rate"),
"retrieval_evaluated_count": summary.get("retrieval_evaluated_count"),
"documents": [
{
"doc_id": doc.get("doc_id"),
"file_type": doc.get("file_type"),
"quality_score": doc.get("quality_score"),
"elements": doc.get("element_count"),
"tables": doc.get("table_count"),
"figures": doc.get("figure_count"),
"chunks": doc.get("chunk_count"),
"parser_disagreement_rate": doc.get("parser_disagreement_rate"),
"repair_resolution_rate": doc.get("repair_resolution_rate"),
"elapsed_seconds": doc.get("elapsed_seconds"),
}
for doc in summary.get("documents") or []
],
"note": (
"GT-comparison metrics (layout F1, table structure, formula CER) "
"are unavailable for arbitrary uploads β€” they need labelled datasets "
"(omnidocbench / doclaynet)."
),
}
_logger.info(
"space_benchmark_upload_complete",
extra={k: v for k, v in headline.items() if k != "documents" and not isinstance(v, list)},
)
return headline
def run_benchmark_in_space() -> dict:
"""Run a benchmark against tests/regression/fixtures and return the headline numbers.
Triggered from the UI / API. The fixture corpus is committed to the repo
so the benchmark is reproducible without uploading any data. For real
corpora, drop documents into a Space-side directory and modify the input
path here, or run zsgdp benchmark from a Dev Mode terminal.
Filters fixture input to `*.input.*` files (the seed documents) so the
paired `*.expected.json` snapshot files don't get misparsed as docs.
"""
from zsgdp.benchmarks.parser_quality import run_parser_benchmark
fixtures = ROOT / "tests" / "regression" / "fixtures"
_logger.info("space_benchmark_requested", extra={"input_dir": str(fixtures)})
with tempfile.TemporaryDirectory(prefix="zsgdp_bench_") as tmp:
# Copy only the actual document inputs (skip the .expected.json snapshots).
bench_input = Path(tmp) / "input"
bench_input.mkdir()
for source in sorted(fixtures.glob("*.input.*")):
    shutil.copy2(source, bench_input / source.name)
out = Path(tmp) / "out"
summary = run_parser_benchmark(bench_input, out, dataset_name="custom_folder")
headline = {
"dataset_name": summary.get("dataset_name"),
"document_count": summary.get("document_count"),
"mean_quality_score": summary.get("mean_quality_score"),
"mean_layout_f1": summary.get("mean_layout_f1"),
"mean_table_structure_score": summary.get("mean_table_structure_score"),
"mean_formula_cer": summary.get("mean_formula_cer"),
"mean_retrieval_recall_at_1": summary.get("mean_retrieval_recall_at_1"),
"mean_retrieval_recall_at_5": summary.get("mean_retrieval_recall_at_5"),
"mean_retrieval_mrr": summary.get("mean_retrieval_mrr"),
"mean_parser_disagreement_rate": summary.get("mean_parser_disagreement_rate"),
"mean_repair_resolution_rate": summary.get("mean_repair_resolution_rate"),
"mean_repair_regression_rate": summary.get("mean_repair_regression_rate"),
"retrieval_evaluated_count": summary.get("retrieval_evaluated_count"),
"layout_evaluated_count": summary.get("layout_evaluated_count"),
}
_logger.info("space_benchmark_complete", extra=headline)
return headline
_HELP_MARKDOWN = f"""
## What this is
**zeroshotGPU** is an agentic document-parsing control plane. It does not rely
on a single extraction engine: it profiles each document, routes pages to the
best parser expert (Docling, PyMuPDF, optionally Marker / MinerU / olmOCR /
PaddleOCR / Unstructured), normalizes outputs into a canonical schema, verifies
quality, repairs weak regions through a bounded verify/repair loop (with
optional GPU escalation), and emits retrieval-ready chunks with provenance.
## How to use this Space
**1. Pick a pipeline mode.**
| Mode | What it does |
|---|---|
| `Docling + PyMuPDF` | Default. Runs both parsers so the parser-disagreement metric has a comparison surface. Good for general-purpose parsing. |
| `Default lightweight` | Text + PyMuPDF only. Fastest. Use when you just need clean text extraction. |
| `Live GPU repair` | Enables `repair.execute_gpu_escalations=true`. Verification failures (invalid tables, OCR coverage gaps, reading-order issues, missing figure captions) are dispatched to Qwen2.5-VL-3B on the GPU. Slower; requires the GPU path to actually be hit (deterministic repair handles markdown tables before this fires). |
**2. Upload one or more documents.** Accepts `.pdf`, `.md`, `.txt`, `.html`,
or a `.zip` of any of those. Multi-file selection works. Per-file cap:
{MAX_UPLOAD_BYTES // (1024 * 1024)} MB / {MAX_PAGE_COUNT} pages. Batch cap:
{MAX_BATCH_DOCS} docs per request.
**3. Click Parse.** Watch the progress bar; first call may take longer if a
model has to download.
## What each tab shows
- **Markdown**: canonical reconstruction of the parsed document. For batch
uploads, this shows the first document; the full set is in the artifacts zip.
- **Run**: summary, quality report, parser metrics, and artifact manifest
validation. For batch uploads, `Summary.batch` lists every document parsed
in the request with its headline metrics + an aggregate block.
- **Chunks**: per-strategy chunk breakdown: total / parent / child / table-linked
/ figure-linked / visual-context counts, plus per-strategy blocks with token
count distribution (min/median/max) and 3 sample chunks per strategy with
240-char previews.
- **Artifacts**: each top-level artifact (`parsed_document.json`, `chunks.jsonl`,
`quality_report.json`, etc.) downloadable individually. Nested asset crops
(page renders, table images) stay bundled in the zip above.
- **Runtime**: detected GPU runtime, planned GPU tasks, preflight report.
- **Smokes**: runs the project's smoke validation suite in-Space; reports
per-smoke pass/fail/skip + detail. API: `/gradio_api/call/run_smokes_in_space`.
- **Benchmark**: two modes, against committed regression fixtures OR against
an uploaded corpus you supply. Returns headline metrics (quality score,
retrieval recall, repair resolution rate, etc.) plus a per-doc breakdown.
API: `/gradio_api/call/run_benchmark_in_space` and `/gradio_api/call/run_benchmark_on_upload`.
## API surface
Every button is also a Gradio API endpoint, so AI agents and downstream tooling
can invoke them programmatically. Discovery: `agents.md` at the Space root
returns the calling instructions; `/gradio_api/info` returns the full schema.
```bash
# Parse a doc:
curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/parse_uploaded_document \\
-H "Content-Type: application/json" \\
-d '{{"data": [{{file_data}}, "Default lightweight"]}}'
# Run smokes:
curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/run_smokes_in_space \\
-H "Content-Type: application/json" -d '{{"data": []}}'
# Benchmark:
curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/run_benchmark_in_space \\
-H "Content-Type: application/json" -d '{{"data": []}}'
```
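With recent Gradio versions each `POST` above returns an `{{"event_id": ...}}`
handle rather than the result itself; stream the actual payload with a
follow-up GET (the event id is a placeholder):
```bash
curl -N https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/run_smokes_in_space/$EVENT_ID
```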
## Configuration
Defaults work out of the box. To change behavior, set Space variables:
- `ZSGDP_CONFIG_PATH`: point at one of `configs/default.yaml`, `configs/docling.yaml`, `configs/live_gpu_repair.yaml`, or your own committed YAML.
- `ZSGDP_LOG_LEVEL`: `INFO` (default on Spaces), `DEBUG`, `WARNING`, etc.
- `ZSGDP_LOG_JSON`: `1` (default on Spaces) for one-line JSON log records.
- `ZSGDP_MAX_UPLOAD_BYTES` / `ZSGDP_MAX_PAGE_COUNT` / `ZSGDP_MAX_BATCH_DOCS`: abuse guards.
- `HF_TOKEN`: required for gated models (jina-embeddings-v3 may need it).
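A local run combining these overrides (values illustrative):
```bash
ZSGDP_LOG_LEVEL=DEBUG ZSGDP_LOG_JSON=0 ZSGDP_MAX_UPLOAD_BYTES=209715200 python app.py
```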
## Known limits
- **ZeroGPU duration cap.** Each `@spaces.GPU`-decorated call runs in a 60s
GPU slot. First-call cold-start for big models (Qwen2.5-VL-3B is ~6 GB)
exceeds this on a clean cache. Subsequent calls reuse the cached weights
and fit comfortably.
- **Live GPU repair** only fires when the deterministic repair path can't
resolve an issue. For markdown tables, the deterministic normalizer
handles most malformations before GPU dispatch is needed.
- **GT-comparison metrics** (layout F1, table structure score, formula CER)
require labelled datasets (`omnidocbench`, `doclaynet`). Uploaded
custom corpora produce all the GT-free metrics but those three.
## Source
[![View source on Hugging Face](https://img.shields.io/badge/HF%20Space-arjun10g%2FzeroshotGPU-blue)](https://huggingface.co/spaces/arjun10g/zeroshotGPU)
The full project source, including the multi-step spec, contributor docs,
and 250+ unit tests, is at the link above. The `Files` tab on the Space
page shows the live deploy.
"""
with gr.Blocks(title="zeroshotGPU") as demo:
gr.Markdown(
"# zeroshotGPU\n\n"
"Self-hosted agentic document parser. Upload a single document, multiple "
"documents, or a `.zip` of documents (PDF / Markdown / plaintext / HTML). "
"Each parse emits canonical markdown, structured JSON, retrieval-ready "
"chunks (multi-strategy), a quality report with GT-comparison metrics "
"where applicable, and a SHA-256-checksummed artifact manifest. "
f"Per-file caps: {MAX_UPLOAD_BYTES // (1024 * 1024)} MB / "
f"{MAX_PAGE_COUNT} pages. Batch cap: {MAX_BATCH_DOCS} docs per request. "
"**See the [Help] tab for full instructions.**\n\n"
"[Source on Hugging Face](https://huggingface.co/spaces/arjun10g/zeroshotGPU)"
)
with gr.Row():
upload = gr.File(
label="Document(s) β€” single file, multi-select, or .zip",
file_types=[".pdf", ".md", ".txt", ".html", ".htm", ".zip"],
file_count="multiple",
)
with gr.Column():
pipeline = gr.Dropdown(
choices=["Docling + PyMuPDF", "Default lightweight", "Live GPU repair"],
value="Docling + PyMuPDF",
label="Pipeline",
info="`Docling + PyMuPDF` runs both for the disagreement signal. `Default lightweight` is text + PyMuPDF only. `Live GPU repair` enables repair.execute_gpu_escalations=true and dispatches malformed-table / OCR / figure / reading-order issues to Qwen2.5-VL.",
)
parse_button = gr.Button("Parse", variant="primary")
archive = gr.File(label="Artifacts (zip)")
with gr.Tabs():
with gr.Tab("Help"):
gr.Markdown(_HELP_MARKDOWN)
with gr.Tab("Markdown"):
gr.Markdown(
"_Canonical markdown reconstruction of the parsed document. "
"For batch uploads, this shows the first document; the full "
"set is in the artifacts zip._"
)
markdown = gr.Markdown(label="Canonical Markdown")
with gr.Tab("Run"):
gr.Markdown(
"_Summary, quality report, parser metrics, and artifact "
"validation. For batch uploads, `Summary.batch` lists every "
"document parsed in the request._"
)
summary = gr.JSON(label="Summary")
quality = gr.JSON(label="Quality Report")
parser_metrics = gr.JSON(label="Parser Metrics")
artifact_validation = gr.JSON(label="Artifact Manifest Validation")
with gr.Tab("Chunks"):
gr.Markdown(
"_Per-strategy chunk breakdown: counts, token-count "
"distribution (min / median / max), and three sample chunks "
"with previews per strategy. The full chunks.jsonl is in the "
"Artifacts tab and inside the zip._\n\n"
"Strategies emitted by default: `fixed_token_baseline`, "
"`recursive_structure`, `parent_child` (with linked parent / "
"child IDs), `page_level`, plus `table` / `figure` chunks "
"with provenance. `semantic`, `late`, `vision_guided`, and "
"`agentic_proposition` are config-gated stubs that emit "
"deterministic candidates marked for backend replacement."
)
chunking = gr.JSON(label="Chunking plan + per-strategy detail")
with gr.Tab("Artifacts"):
gr.Markdown(
"Each top-level artifact is downloadable individually. "
"Nested assets (page renders, table/figure crops) stay bundled "
"in the zip above."
)
individual_artifacts = gr.Files(label="Individual artifacts")
with gr.Tab("Runtime"):
runtime = gr.JSON(label="GPU Runtime", value=runtime_status_for_mode("Docling + PyMuPDF"))
gpu_tasks = gr.JSON(label="Planned GPU Tasks")
gpu_task_report = gr.JSON(label="GPU Task Preflight")
with gr.Tab("Smokes"):
gr.Markdown(
"Runs the same smokes as `python -m scripts.run_space_smoke`, "
"in-process. Each call is also exposed via the Gradio API at "
"`/gradio_api/call/run_smokes_in_space` for remote validation."
)
smoke_button = gr.Button("Run all smokes", variant="primary")
smoke_output = gr.JSON(label="Smoke report")
with gr.Tab("Benchmark"):
gr.Markdown(
"**Two benchmark modes:**\n"
"- **Run on regression fixtures** β€” uses the committed seed "
"documents (`tests/regression/fixtures/`); reproducible without "
"any upload. API: `/gradio_api/call/run_benchmark_in_space`.\n"
"- **Run on uploaded corpus** β€” accepts a `.zip` of documents "
"(or a list of files). Returns headline metrics plus a per-doc "
"breakdown. GT-comparison metrics (layout F1, table structure, "
"formula CER) are NOT computed β€” those require labelled "
"datasets (`omnidocbench` / `doclaynet`) which can be loaded "
"via the CLI from a Pro-tier Dev Mode terminal. API: "
"`/gradio_api/call/run_benchmark_on_upload`."
)
with gr.Row():
benchmark_button = gr.Button("Run on regression fixtures", variant="primary")
benchmark_upload_button = gr.Button("Run on uploaded corpus")
benchmark_corpus = gr.File(
label="Optional upload β€” used only when 'Run on uploaded corpus' is clicked",
file_types=[".pdf", ".md", ".txt", ".html", ".htm", ".zip"],
file_count="multiple",
)
benchmark_output = gr.JSON(label="Benchmark headline metrics")
parse_button.click(
parse_uploaded_document,
inputs=[upload, pipeline],
outputs=[
markdown,
summary,
quality,
parser_metrics,
chunking,
runtime,
gpu_tasks,
gpu_task_report,
artifact_validation,
archive,
individual_artifacts,
],
)
smoke_button.click(run_smokes_in_space, inputs=[], outputs=smoke_output, api_name="run_smokes_in_space")
benchmark_button.click(run_benchmark_in_space, inputs=[], outputs=benchmark_output, api_name="run_benchmark_in_space")
benchmark_upload_button.click(
run_benchmark_on_upload,
inputs=[benchmark_corpus],
outputs=benchmark_output,
api_name="run_benchmark_on_upload",
)
# Hidden diagnostic endpoint: reachable via /gradio_api/call/diagnose_runtime
# but no UI button. Reports env-var presence (not values) for debugging
# secrets / token / spaces SDK plumbing on the Space.
diag_dummy = gr.Button("diag", visible=False)
diag_output = gr.JSON(visible=False)
diag_dummy.click(diagnose_runtime, inputs=[], outputs=diag_output, api_name="diagnose_runtime")
if __name__ == "__main__":
demo.launch()