| """Hugging Face Spaces entrypoint for zeroshotGPU.""" | |
| from __future__ import annotations | |
| import os | |
| import shutil | |
| import tempfile | |
| import zipfile | |
| from pathlib import Path | |
| from typing import Any, Iterable | |
| try: | |
| import gradio as gr | |
| except ImportError as exc: # pragma: no cover - only used when launching the Space UI. | |
| raise RuntimeError("Gradio is required for the Spaces UI. Install with `python -m pip install -r requirements.txt`.") from exc | |
| from zsgdp.artifacts import validate_artifact_manifest | |
| from zsgdp.config import load_config, load_env_file | |
| from zsgdp.gpu import collect_gpu_runtime_status | |
| from zsgdp.logging_config import configure_logging, get_logger | |
| from zsgdp.pipeline import parse_document | |
| from zsgdp.profiling import profile_document | |
| # Load .env first so any keys it sets (HF_TOKEN, ZSGDP_LOG_LEVEL, etc.) are | |
| # visible before we read environment defaults below. Pre-set Space variables | |
| # always win β load_env_file does not override existing env entries. | |
| load_env_file() | |
| # On a ZeroGPU Space, explicitly seed huggingface_hub's auth context so | |
| # subsequent @spaces.GPU calls see Pro-tier quota. Setting HF_TOKEN as an | |
| # env var alone isn't always enough β the spaces SDK in some versions | |
| # reads the auth from huggingface_hub's cached login state, which | |
| # huggingface_hub.login() establishes. | |
| def _seed_hf_login() -> None: | |
| token = ( | |
| os.environ.get("HF_TOKEN") | |
| or os.environ.get("HUGGING_FACE_HUB_TOKEN") | |
| or os.environ.get("HUGGINGFACE_TOKEN") | |
| or os.environ.get("HF_ACCESS_TOKEN") | |
| ) | |
| if not token: | |
| return | |
| try: | |
| from huggingface_hub import login # type: ignore | |
| login(token=token, add_to_git_credential=False) | |
| except Exception: | |
| # Auth seeding is best-effort. If huggingface_hub isn't importable | |
| # or login fails, the Space still functions β just on whatever | |
| # quota the bare HF_TOKEN env var unlocks. | |
| pass | |
| _seed_hf_login() | |
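
# A quick way to confirm the seeded auth context from a Dev Mode terminal
# (a sketch; not executed here because it hits the network):
#
#   from huggingface_hub import whoami
#   print(whoami()["name"])  # raises if no token/login context is visible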

# Default to JSON logs on the Space so the HF Spaces logs page is greppable.
# Override locally with `ZSGDP_LOG_JSON=0` for human-readable text output.
os.environ.setdefault("ZSGDP_LOG_LEVEL", "INFO")
os.environ.setdefault("ZSGDP_LOG_JSON", "1" if os.environ.get("SPACE_ID") else "0")
# Use a transformers-compat-friendly default for the embedding smoke. Jina-v3
# has known issues with newer transformers' remote-modules loader; the
# all-MiniLM-L6-v2 default has no custom modeling code and works everywhere.
# Override via Space Settings -> Variables and secrets if you want jina-v3.
os.environ.setdefault("ZSGDP_SMOKE_EMBEDDING_MODEL_ID", "sentence-transformers/all-MiniLM-L6-v2")
configure_logging()
_logger = get_logger(__name__)

ROOT = Path(__file__).resolve().parent
DOCLING_CONFIG = ROOT / "configs" / "docling.yaml"
LIVE_GPU_CONFIG = ROOT / "configs" / "live_gpu_repair.yaml"

# Abuse guards. Override at deployment time via env vars to relax for trusted
# Spaces or tighten further for public ones.
MAX_UPLOAD_BYTES = int(os.environ.get("ZSGDP_MAX_UPLOAD_BYTES", str(50 * 1024 * 1024)))  # 50 MB
MAX_PAGE_COUNT = int(os.environ.get("ZSGDP_MAX_PAGE_COUNT", "200"))
# Cap on docs extracted from a single zip so a malicious archive can't
# fan out into thousands of parses. Each doc still goes through the
# per-file MAX_UPLOAD_BYTES / MAX_PAGE_COUNT guards.
MAX_BATCH_DOCS = int(os.environ.get("ZSGDP_MAX_BATCH_DOCS", "20"))
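# Example: relaxing the guards for a trusted private deployment via the
# Space's Settings -> Variables and secrets (values illustrative):
#
#   ZSGDP_MAX_UPLOAD_BYTES=209715200   # 200 MB
#   ZSGDP_MAX_PAGE_COUNT=1000
#   ZSGDP_MAX_BATCH_DOCS=100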
SUPPORTED_PARSE_EXTS = (".pdf", ".md", ".txt", ".html", ".htm")


class UploadRejected(Exception):
    """Raised when an upload exceeds an abuse-guard limit."""


def _validate_upload(path: Path) -> None:
    """Reject oversized uploads or PDFs with too many pages before parsing.

    Cheap to compute (file stat + profiler page count) and avoids spending
    GPU/CPU minutes on inputs the Space wasn't sized for.
    """
    if not path.exists():
        raise UploadRejected("Uploaded file is missing on disk.")
    size = path.stat().st_size
    if size > MAX_UPLOAD_BYTES:
        raise UploadRejected(
            f"Upload is {size / 1024 / 1024:.1f} MB; the Space limit is "
            f"{MAX_UPLOAD_BYTES / 1024 / 1024:.0f} MB. Set ZSGDP_MAX_UPLOAD_BYTES to override."
        )
    try:
        profile = profile_document(path)
    except Exception:  # pragma: no cover - profiler is robust; this is belt-and-braces.
        return
    if profile.page_count > MAX_PAGE_COUNT:
        raise UploadRejected(
            f"Document has {profile.page_count} pages; the Space limit is "
            f"{MAX_PAGE_COUNT}. Set ZSGDP_MAX_PAGE_COUNT to override."
        )

# Top-level artifact files surfaced as individual downloads. Nested
# directories like assets/ stay bundled in the zip only; they can be
# large for multi-page PDFs and would clutter the per-artifact list.
_INDIVIDUAL_ARTIFACT_NAMES = (
    "parsed_document.json",
    "document.md",
    "elements.jsonl",
    "tables.jsonl",
    "figures.jsonl",
    "chunks.jsonl",
    "chunking_plan.json",
    "parser_metrics.json",
    "quality_report.json",
    "routing_report.json",
    "profile.json",
    "gpu_runtime.json",
    "gpu_tasks.jsonl",
    "gpu_task_report.json",
    "artifact_manifest.json",
    "conflict_report.json",
)


def _collect_artifact_files(output_dir: Path) -> list[str]:
    """Return absolute paths for the top-level artifacts the Space surfaces.

    Order matches _INDIVIDUAL_ARTIFACT_NAMES so the UI listing is stable.
    Missing files are silently skipped (different parse runs emit different
    subsets; e.g. conflict_report.json appears only when multiple parsers ran).
    """
    paths: list[str] = []
    for name in _INDIVIDUAL_ARTIFACT_NAMES:
        candidate = output_dir / name
        if candidate.exists():
            paths.append(str(candidate))
    return paths


def _empty_outputs(reason: str, source: Path | None, *, rejected: bool, runtime: dict) -> tuple:
    """Return-shape used for every error path. Centralised so the tuple width
    can't drift between the success path and the four error paths."""
    summary: dict[str, Any] = {"error": reason}
    if source is not None:
        summary["source"] = str(source)
    if rejected:
        summary["rejected"] = True
    return ("", summary, {}, {}, {}, runtime, [], {}, {}, None, [])

def _build_chunk_detail(parsed) -> dict[str, Any]:
    """Produce a richer chunking summary than the bare chunking_plan.

    Surfaces strategy counts, token-count distribution, sample chunks per
    strategy (truncated to keep the payload UI-friendly), and counts of
    tables / figures / parent / child chunks. Companion to the
    `chunking_plan` JSON, which only describes the strategy ladder.
    """
    chunks = parsed.chunks
    by_strategy: dict[str, list] = {}
    for chunk in chunks:
        by_strategy.setdefault(chunk.strategy, []).append(chunk)
    strategy_breakdown: dict[str, dict[str, Any]] = {}
    for strategy, items in sorted(by_strategy.items()):
        token_counts = sorted(item.token_count for item in items)
        sample_chunks = []
        for item in items[:3]:
            preview = item.text.strip()
            if len(preview) > 240:
                preview = preview[:237] + "..."
            sample_chunks.append(
                {
                    "chunk_id": item.chunk_id,
                    "page_start": item.page_start,
                    "page_end": item.page_end,
                    "section_path": item.section_path,
                    "boundary_reason": item.boundary_reason,
                    "token_count": item.token_count,
                    "source_parser": item.source_parser,
                    "preview": preview,
                }
            )
        strategy_breakdown[strategy] = {
            "count": len(items),
            "token_count_min": token_counts[0] if token_counts else 0,
            "token_count_median": token_counts[len(token_counts) // 2] if token_counts else 0,
            "token_count_max": token_counts[-1] if token_counts else 0,
            "samples": sample_chunks,
        }
    parent_count = sum(1 for c in chunks if c.content_type == "parent")
    child_count = sum(1 for c in chunks if c.parent_chunk_id)
    table_chunks = sum(1 for c in chunks if c.table_ids)
    figure_chunks = sum(1 for c in chunks if c.figure_ids)
    visual_context = sum(1 for c in chunks if c.requires_visual_context)
    return {
        "total_chunks": len(chunks),
        "parent_chunks": parent_count,
        "child_chunks": child_count,
        "table_linked_chunks": table_chunks,
        "figure_linked_chunks": figure_chunks,
        "visual_context_required": visual_context,
        "strategies": strategy_breakdown,
        "plan": parsed.provenance.get("chunking", {}),
    }

def _extract_uploads_to_parse(uploads: Iterable[Path], work_dir: Path) -> list[Path]:
    """Resolve a set of uploaded files (possibly zips) into individual docs.

    Each input is either:
    - A supported document file (.pdf, .md, .txt, .html, .htm): kept as-is.
    - A .zip archive: extracted; supported files inside are added to the
      list. Nested zips are skipped (no recursive extraction; one level only).

    Other extensions are silently dropped.

    The total number of resolved docs is capped at MAX_BATCH_DOCS to bound
    the worst-case parse time per request.
    """
    resolved: list[Path] = []
    for upload in uploads:
        ext = upload.suffix.lower()
        if ext == ".zip":
            extract_dir = Path(tempfile.mkdtemp(prefix="zsgdp_zip_", dir=work_dir)).resolve()
            try:
                with zipfile.ZipFile(upload) as zf:
                    # Skip directories and nested zips.
                    for member in zf.namelist():
                        if member.endswith("/"):
                            continue
                        member_lower = member.lower()
                        if not member_lower.endswith(SUPPORTED_PARSE_EXTS):
                            continue
                        if "__MACOSX" in member or member_lower.startswith("."):
                            continue
                        # Path traversal guard. is_relative_to (unlike a raw
                        # str.startswith prefix check) can't be fooled by a
                        # sibling directory sharing the extract_dir prefix.
                        target = (extract_dir / member).resolve()
                        if not target.is_relative_to(extract_dir):
                            continue
                        target.parent.mkdir(parents=True, exist_ok=True)
                        with zf.open(member) as source, open(target, "wb") as out:
                            shutil.copyfileobj(source, out)
                        resolved.append(target)
            except zipfile.BadZipFile:
                _logger.warning("space_zip_corrupt", extra={"path": str(upload)})
                continue
        elif ext in SUPPORTED_PARSE_EXTS:
            resolved.append(upload)
        else:
            _logger.info("space_upload_skipped", extra={"path": str(upload), "reason": "unsupported_extension"})
        if len(resolved) >= MAX_BATCH_DOCS:
            break
    return resolved[:MAX_BATCH_DOCS]
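
# Example: a zip holding report.pdf, notes/readme.md, and inner.zip resolves
# to [report.pdf, notes/readme.md]; inner.zip is skipped (one level only).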

def _parse_one_doc(
    source: Path,
    output_dir: Path,
    pipeline_mode: str,
) -> dict[str, Any]:
    """Parse a single doc and return a per-doc result block.

    Raises on parse failure so the batch driver can record the error and
    continue with remaining docs instead of aborting the whole request.
    """
    config_path = _config_path_for_mode(pipeline_mode)
    parsed = parse_document(source, output_dir, config_path=config_path)
    artifact_validation = validate_artifact_manifest(output_dir)
    individual_files = _collect_artifact_files(output_dir)
    return {
        "source_path": str(source),
        "doc_id": parsed.doc_id,
        "file_type": parsed.file_type,
        "elements": len(parsed.elements),
        "tables": len(parsed.tables),
        "figures": len(parsed.figures),
        "chunks": len(parsed.chunks),
        "quality_score": parsed.quality_report.score,
        "blocking": parsed.quality_report.has_blocking_failures,
        "artifact_manifest_valid": artifact_validation.get("valid"),
        "individual_artifact_count": len(individual_files),
        "_parsed": parsed,
        "_artifact_validation": artifact_validation,
        "_individual_files": individual_files,
        "_output_dir": str(output_dir),
    }

def parse_uploaded_document(file_obj: Any, pipeline_mode: str, progress: Any = None):
    """Parse one or more documents into Markdown, structured JSON, and chunks.

    Accepts either a single file or a list of files (Gradio's
    `file_count="multiple"` semantics). `.zip` uploads are extracted on the
    server side and each supported file inside is parsed; total docs are
    capped at MAX_BATCH_DOCS (default 20) to bound the worst-case work per
    request. For multi-doc inputs the Markdown tab shows the first document's
    output; the Summary tab includes a `batch` block listing every doc's
    headline metrics; the Artifacts zip contains every per-doc directory.

    Use when a user supplies one or many documents and wants either
    (a) the text reconstructed cleanly, (b) structured elements + tables
    + figures with bounding boxes, (c) chunks for downstream RAG, or
    (d) an audit trail showing which parsers ran and how the merger
    resolved conflicts.

    Args:
        file_obj: Uploaded file(s). Single `.pdf` / `.md` / `.txt` /
            `.html`, or a `.zip` of those, or a list of any of the above.
            Per-file caps of 50 MB and 200 pages apply (configurable via
            ZSGDP_MAX_UPLOAD_BYTES / ZSGDP_MAX_PAGE_COUNT).
        pipeline_mode: "Docling + PyMuPDF" / "Default lightweight" /
            "Live GPU repair". The third dispatches malformed-table,
            OCR-coverage, figure, and reading-order issues to the
            configured GPU backend (Qwen2.5-VL by default).
        progress: optional Gradio Progress object (auto-injected by the
            Gradio click handler; leave None for direct API calls).
    """
    if progress is None:
        # When called via /gradio_api/call, no progress is wired; use a no-op
        # so the function signature stays consistent.
        def progress(value, *, desc=""):  # type: ignore[no-redef]
            return None

    if file_obj is None:
        return _empty_outputs("Upload a document first.", None, rejected=False, runtime={})
    progress(0.0, desc="Validating uploads...")
    # Normalise to a list of Path. Gradio passes a single FileData when
    # file_count='single' and a list when 'multiple'.
    if isinstance(file_obj, list):
        upload_paths = [Path(item.name if hasattr(item, "name") else item) for item in file_obj if item is not None]
    elif hasattr(file_obj, "name"):
        upload_paths = [Path(file_obj.name)]
    else:
        upload_paths = [Path(str(file_obj))]
    if not upload_paths:
        return _empty_outputs("Upload a document first.", None, rejected=False, runtime={})

    work_dir = Path(tempfile.mkdtemp(prefix="zeroshotgpu_"))
    docs_to_parse = _extract_uploads_to_parse(upload_paths, work_dir)
    if not docs_to_parse:
        runtime = runtime_status_for_mode(pipeline_mode)
        return _empty_outputs(
            "No supported documents found in the upload (accepted: pdf/md/txt/html, optionally inside a zip).",
            upload_paths[0],
            rejected=True,
            runtime=runtime,
        )

    # Per-file abuse guard.
    for doc in docs_to_parse:
        try:
            _validate_upload(doc)
        except UploadRejected as exc:
            _logger.warning(
                "space_upload_rejected",
                extra={"source_path": str(doc), "reason": str(exc)},
            )
            runtime = runtime_status_for_mode(pipeline_mode)
            return _empty_outputs(str(exc), doc, rejected=True, runtime=runtime)
    progress(0.05, desc=f"Parsing {len(docs_to_parse)} document(s)...")
    output_root = work_dir / "parsed"
    output_root.mkdir(parents=True, exist_ok=True)
    per_doc_results: list[dict[str, Any]] = []
    used_names: set[str] = set()
    for index, doc in enumerate(docs_to_parse, start=1):
        # Stable per-doc subdir.
        stem = doc.stem or f"doc_{index}"
        candidate = stem
        suffix = 2
        while candidate in used_names:
            candidate = f"{stem}_{suffix}"
            suffix += 1
        used_names.add(candidate)
        doc_out = output_root / candidate
        progress(
            0.05 + 0.85 * (index - 1) / max(1, len(docs_to_parse)),
            desc=f"Parsing {index}/{len(docs_to_parse)}: {doc.name}",
        )
        try:
            result = _parse_one_doc(doc, doc_out, pipeline_mode)
            per_doc_results.append(result)
        except Exception as exc:  # pragma: no cover - surfaced in UI
            _logger.warning(
                "space_parse_failed",
                extra={"source_path": str(doc), "error": str(exc)},
            )
            per_doc_results.append(
                {
                    "source_path": str(doc),
                    "error": str(exc),
                    "doc_id": None,
                    "_parsed": None,
                }
            )

    progress(0.92, desc="Bundling artifacts...")
    # Pick the first successful parse as the primary doc shown in the UI.
    successful = [r for r in per_doc_results if r.get("_parsed") is not None]
    if not successful:
        runtime = runtime_status_for_mode(pipeline_mode)
        first_error = next((r.get("error") for r in per_doc_results if r.get("error")), "All parses failed.")
        return _empty_outputs(first_error, upload_paths[0], rejected=False, runtime=runtime)
    primary = successful[0]
    parsed = primary["_parsed"]
    artifact_validation = primary["_artifact_validation"]
    individual_files = primary["_individual_files"]

    # For a batch, the archive bundles the whole output_root; otherwise just
    # the single doc's dir. Always returns a single zip path.
    if len(per_doc_results) > 1:
        archive_path = shutil.make_archive(str(output_root), "zip", output_root)
    else:
        archive_path = shutil.make_archive(str(Path(primary["_output_dir"])), "zip", primary["_output_dir"])

    runtime = parsed.provenance.get("gpu_runtime", {})
    summary = {
        "doc_id": parsed.doc_id,
        "file_type": parsed.file_type,
        "elements": len(parsed.elements),
        "tables": len(parsed.tables),
        "figures": len(parsed.figures),
        "chunks": len(parsed.chunks),
        "quality_score": parsed.quality_report.score,
        "blocking": parsed.quality_report.has_blocking_failures,
        "deployment": parsed.provenance.get("config_deployment", {}),
        "runtime_device": runtime.get("device"),
        "running_on_huggingface_space": runtime.get("running_on_huggingface_space"),
        "artifact_manifest_valid": artifact_validation.get("valid"),
        "artifact_count": artifact_validation.get("artifact_count"),
        "artifact_checked_count": artifact_validation.get("checked_count"),
        "individual_artifact_count": len(individual_files),
    }
    if len(per_doc_results) > 1:
        successful_count = sum(1 for r in per_doc_results if r.get("_parsed") is not None)
        summary["batch"] = {
            "input_count": len(docs_to_parse),
            "successful_count": successful_count,
            "failed_count": len(per_doc_results) - successful_count,
            "documents": [
                {key: value for key, value in record.items() if not key.startswith("_")}
                for record in per_doc_results
            ],
            "aggregate": {
                "total_elements": sum(r.get("elements", 0) for r in per_doc_results if r.get("elements") is not None),
                "total_tables": sum(r.get("tables", 0) for r in per_doc_results if r.get("tables") is not None),
                "total_figures": sum(r.get("figures", 0) for r in per_doc_results if r.get("figures") is not None),
                "total_chunks": sum(r.get("chunks", 0) for r in per_doc_results if r.get("chunks") is not None),
                "mean_quality_score": (
                    sum(r.get("quality_score", 0.0) for r in per_doc_results if r.get("quality_score") is not None)
                    / max(1, successful_count)
                ),
            },
        }

    chunking_payload = {
        "plan": parsed.provenance.get("chunking", {}),
        "detail": _build_chunk_detail(parsed),
    }
    progress(1.0, desc="Done")
    return (
        parsed.to_markdown(),
        summary,
        parsed.quality_report.to_dict(),
        parsed.provenance.get("parser_metrics", {}),
        chunking_payload,
        runtime,
        parsed.provenance.get("gpu_tasks", []),
        parsed.provenance.get("gpu_task_report", {}),
        artifact_validation,
        archive_path,
        individual_files,
    )

def _config_path_for_mode(pipeline_mode: str) -> Path | None:
    env_config = os.environ.get("ZSGDP_CONFIG_PATH")
    if env_config:
        return Path(env_config)
    if pipeline_mode == "Live GPU repair" and LIVE_GPU_CONFIG.exists():
        return LIVE_GPU_CONFIG
    if pipeline_mode == "Docling + PyMuPDF" and DOCLING_CONFIG.exists():
        return DOCLING_CONFIG
    return None


def runtime_status_for_mode(pipeline_mode: str) -> dict:
    return collect_gpu_runtime_status(load_config(_config_path_for_mode(pipeline_mode))).to_dict()

def diagnose_runtime() -> dict:
    """Report env-var presence (not values) so we can confirm HF_TOKEN is loaded.

    Returns booleans for which token-related env vars are present, plus their
    lengths (to confirm a non-empty value), plus whether the spaces SDK can
    be imported. NEVER returns actual token values.
    """
    token_vars = ("HF_TOKEN", "HUGGING_FACE_HUB_TOKEN", "HUGGINGFACE_TOKEN", "HF_ACCESS_TOKEN")
    info: dict[str, Any] = {
        "space_id": os.environ.get("SPACE_ID"),
        "space_host": os.environ.get("SPACE_HOST"),
    }
    for var in token_vars:
        value = os.environ.get(var)
        info[f"{var}_set"] = bool(value)
        info[f"{var}_length"] = len(value) if value else 0
    # Try to import the spaces SDK and record whether it is available.
    try:
        import spaces  # type: ignore  # noqa: F401

        info["spaces_sdk_available"] = True
    except ImportError:
        info["spaces_sdk_available"] = False
    # Authenticate the token against HF Hub to see which user it resolves to
    # and whether Pro is recognized. This is the same auth check ZeroGPU does.
    token_value = next((os.environ.get(v) for v in token_vars if os.environ.get(v)), None)
    if token_value:
        import json as _json
        import urllib.request

        try:
            req = urllib.request.Request(
                "https://huggingface.co/api/whoami-v2",
                headers={"Authorization": f"Bearer {token_value}"},
            )
            with urllib.request.urlopen(req, timeout=15) as resp:
                whoami = _json.loads(resp.read().decode("utf-8"))
            # Cherry-pick non-sensitive fields.
            info["whoami_name"] = whoami.get("name")
            info["whoami_type"] = whoami.get("type")
            info["whoami_isPro"] = whoami.get("isPro")
            info["whoami_canPay"] = whoami.get("canPay")
            info["whoami_periodEnd"] = whoami.get("periodEnd")
            info["whoami_auth_type"] = (whoami.get("auth") or {}).get("type")
            info["whoami_auth_role"] = ((whoami.get("auth") or {}).get("accessToken") or {}).get("role")
        except Exception as exc:
            info["whoami_error"] = str(exc)
    return info

def run_smokes_in_space() -> dict:
    """Run scripts/run_space_smoke.py inside the Space and return the JSON report.

    Exposes the in-process smoke runner as a Gradio endpoint so it's callable
    from the UI tab AND from `/gradio_api/call/run_smokes_in_space` remotely.
    Same code path as the terminal `python -m scripts.run_space_smoke`, just
    triggered through Gradio instead of an SSH session.

    Returns the same dict shape as SmokeReport.to_dict(): per-smoke results
    with status / elapsed / detail / skip_reason / install_hint, plus an
    aggregate summary count block.
    """
    from scripts.run_space_smoke import run_smokes

    _logger.info("space_smokes_requested", extra={"trigger": "gradio_endpoint"})
    report = run_smokes()
    payload = report.to_dict()
    _logger.info(
        "space_smokes_complete",
        extra={
            "passed": payload["summary"]["passed"],
            "failed": payload["summary"]["failed"],
            "skipped": payload["summary"]["skipped"],
            "errored": payload["summary"]["errored"],
        },
    )
    return payload

def run_benchmark_on_upload(file_obj: Any) -> dict:
    """Run the parser benchmark against a user-supplied corpus.

    Accepts the same upload shapes as `parse_uploaded_document`: a single
    document, a list, or a `.zip` of documents. Per-file caps and the batch
    cap apply identically. Returns the benchmark headline metrics plus a
    `documents` list with per-doc records.

    For real §29 numbers against labelled datasets, use the
    `omnidocbench` or `doclaynet` loader from a Pro-tier Dev Mode
    terminal; those add layout F1 / table structure / formula CER, which
    require ground-truth annotations not available from a raw upload.
    """
    if file_obj is None:
        return {"error": "Upload at least one document to benchmark."}

    from zsgdp.benchmarks.parser_quality import run_parser_benchmark

    if isinstance(file_obj, list):
        upload_paths = [Path(item.name if hasattr(item, "name") else item) for item in file_obj if item is not None]
    elif hasattr(file_obj, "name"):
        upload_paths = [Path(file_obj.name)]
    else:
        upload_paths = [Path(str(file_obj))]
    if not upload_paths:
        return {"error": "Upload at least one document to benchmark."}

    work_dir = Path(tempfile.mkdtemp(prefix="zsgdp_bench_upload_"))
    docs = _extract_uploads_to_parse(upload_paths, work_dir)
    if not docs:
        return {
            "error": "No supported documents found in the upload (accepted: pdf/md/txt/html, optionally inside a zip).",
            "input_count": len(upload_paths),
        }
    # Per-file abuse guards.
    for doc in docs:
        try:
            _validate_upload(doc)
        except UploadRejected as exc:
            return {"error": str(exc), "rejected": True, "source_path": str(doc)}

    bench_input = work_dir / "input"
    bench_input.mkdir()
    for doc in docs:
        target = bench_input / doc.name
        # Avoid name collisions (different paths, same filename inside zips).
        suffix = 2
        while target.exists():
            target = bench_input / f"{doc.stem}_{suffix}{doc.suffix}"
            suffix += 1
        shutil.copy2(doc, target)

    out = work_dir / "out"
    _logger.info(
        "space_benchmark_upload_requested",
        extra={"input_count": len(upload_paths), "docs_found": len(docs)},
    )
    summary = run_parser_benchmark(bench_input, out, dataset_name="custom_folder")
    headline = {
        "dataset_name": summary.get("dataset_name"),
        "document_count": summary.get("document_count"),
        "mean_quality_score": summary.get("mean_quality_score"),
        "mean_retrieval_recall_at_1": summary.get("mean_retrieval_recall_at_1"),
        "mean_retrieval_recall_at_5": summary.get("mean_retrieval_recall_at_5"),
        "mean_retrieval_mrr": summary.get("mean_retrieval_mrr"),
        "mean_parser_disagreement_rate": summary.get("mean_parser_disagreement_rate"),
        "mean_repair_resolution_rate": summary.get("mean_repair_resolution_rate"),
        "mean_repair_regression_rate": summary.get("mean_repair_regression_rate"),
        "retrieval_evaluated_count": summary.get("retrieval_evaluated_count"),
        "documents": [
            {
                "doc_id": doc.get("doc_id"),
                "file_type": doc.get("file_type"),
                "quality_score": doc.get("quality_score"),
                "elements": doc.get("element_count"),
                "tables": doc.get("table_count"),
                "figures": doc.get("figure_count"),
                "chunks": doc.get("chunk_count"),
                "parser_disagreement_rate": doc.get("parser_disagreement_rate"),
                "repair_resolution_rate": doc.get("repair_resolution_rate"),
                "elapsed_seconds": doc.get("elapsed_seconds"),
            }
            for doc in summary.get("documents") or []
        ],
        "note": (
            "GT-comparison metrics (layout F1, table structure, formula CER) "
            "are unavailable for arbitrary uploads; they need labelled datasets "
            "(omnidocbench / doclaynet)."
        ),
    }
    _logger.info(
        "space_benchmark_upload_complete",
        extra={k: v for k, v in headline.items() if k != "documents" and not isinstance(v, list)},
    )
    return headline

def run_benchmark_in_space() -> dict:
    """Run a benchmark against tests/regression/fixtures and return the headline numbers.

    Triggered from the UI / API. The fixture corpus is committed to the repo,
    so the benchmark is reproducible without uploading any data. For real
    corpora, drop documents into a Space-side directory and modify the input
    path here, or run zsgdp benchmark from a Dev Mode terminal.

    Filters fixture input to `*.input.*` files (the seed documents) so the
    paired `*.expected.json` snapshot files don't get misparsed as docs.
    """
    from zsgdp.benchmarks.parser_quality import run_parser_benchmark

    fixtures = ROOT / "tests" / "regression" / "fixtures"
    _logger.info("space_benchmark_requested", extra={"input_dir": str(fixtures)})
    with tempfile.TemporaryDirectory(prefix="zsgdp_bench_") as tmp:
        # Copy only the actual document inputs (skip the .expected.json snapshots).
        bench_input = Path(tmp) / "input"
        bench_input.mkdir()
        for source in sorted(fixtures.glob("*.input.*")):
            shutil.copy2(source, bench_input / source.name)
        out = Path(tmp) / "out"
        summary = run_parser_benchmark(bench_input, out, dataset_name="custom_folder")
    headline = {
        "dataset_name": summary.get("dataset_name"),
        "document_count": summary.get("document_count"),
        "mean_quality_score": summary.get("mean_quality_score"),
        "mean_layout_f1": summary.get("mean_layout_f1"),
        "mean_table_structure_score": summary.get("mean_table_structure_score"),
        "mean_formula_cer": summary.get("mean_formula_cer"),
        "mean_retrieval_recall_at_1": summary.get("mean_retrieval_recall_at_1"),
        "mean_retrieval_recall_at_5": summary.get("mean_retrieval_recall_at_5"),
        "mean_retrieval_mrr": summary.get("mean_retrieval_mrr"),
        "mean_parser_disagreement_rate": summary.get("mean_parser_disagreement_rate"),
        "mean_repair_resolution_rate": summary.get("mean_repair_resolution_rate"),
        "mean_repair_regression_rate": summary.get("mean_repair_regression_rate"),
        "retrieval_evaluated_count": summary.get("retrieval_evaluated_count"),
        "layout_evaluated_count": summary.get("layout_evaluated_count"),
    }
    _logger.info("space_benchmark_complete", extra=headline)
    return headline
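
# The same entry point, called directly (e.g. from a Dev Mode terminal) --
# a sketch; `my_corpus/` and `bench_out/` are placeholder paths:
#
#   from pathlib import Path
#   from zsgdp.benchmarks.parser_quality import run_parser_benchmark
#   summary = run_parser_benchmark(Path("my_corpus"), Path("bench_out"),
#                                  dataset_name="custom_folder")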
| _HELP_MARKDOWN = f""" | |
| ## What this is | |
| **zeroshotGPU** is an agentic document-parsing control plane. It does not rely | |
| on a single extraction engine β it profiles each document, routes pages to the | |
| best parser expert (Docling, PyMuPDF, optionally Marker / MinerU / olmOCR / | |
| PaddleOCR / Unstructured), normalizes outputs into a canonical schema, verifies | |
| quality, repairs weak regions through a bounded verify/repair loop (with | |
| optional GPU escalation), and emits retrieval-ready chunks with provenance. | |
| ## How to use this Space | |
| **1. Pick a pipeline mode.** | |
| | Mode | What it does | | |
| |---|---| | |
| | `Docling + PyMuPDF` | Default. Runs both parsers so the parser-disagreement metric has a comparison surface. Good for general-purpose parsing. | | |
| | `Default lightweight` | Text + PyMuPDF only. Fastest. Use when you just need clean text extraction. | | |
| | `Live GPU repair` | Enables `repair.execute_gpu_escalations=true`. Verification failures (invalid tables, OCR coverage gaps, reading-order issues, missing figure captions) are dispatched to Qwen2.5-VL-3B on the GPU. Slower; requires the GPU path to actually be hit (deterministic repair handles markdown tables before this fires). | | |
| **2. Upload one or more documents.** Accepts `.pdf`, `.md`, `.txt`, `.html`, | |
| or a `.zip` of any of those. Multi-file selection works. Per-file cap: | |
| {MAX_UPLOAD_BYTES // (1024 * 1024)} MB / {MAX_PAGE_COUNT} pages. Batch cap: | |
| {MAX_BATCH_DOCS} docs per request. | |
| **3. Click Parse.** Watch the progress bar; first call may take longer if a | |
| model has to download. | |
| ## What each tab shows | |
| - **Markdown** β canonical reconstruction of the parsed document. For batch | |
| uploads, this shows the first document; the full set is in the artifacts zip. | |
| - **Run** β summary, quality report, parser metrics, and artifact manifest | |
| validation. For batch uploads, `Summary.batch` lists every document parsed | |
| in the request with its headline metrics + an aggregate block. | |
| - **Chunks** β per-strategy chunk breakdown: total / parent / child / table-linked | |
| / figure-linked / visual-context counts, plus per-strategy blocks with token | |
| count distribution (min/median/max) and 3 sample chunks per strategy with | |
| 240-char previews. | |
| - **Artifacts** β each top-level artifact (`parsed_document.json`, `chunks.jsonl`, | |
| `quality_report.json`, etc.) downloadable individually. Nested asset crops | |
| (page renders, table images) stay bundled in the zip above. | |
| - **Runtime** β detected GPU runtime, planned GPU tasks, preflight report. | |
| - **Smokes** β runs the project's smoke validation suite in-Space; reports | |
| per-smoke pass/fail/skip + detail. API: `/gradio_api/call/run_smokes_in_space`. | |
| - **Benchmark** β two modes: against committed regression fixtures, OR against | |
| an uploaded corpus you supply. Returns headline metrics (quality score, | |
| retrieval recall, repair resolution rate, etc.) plus a per-doc breakdown. | |
| API: `/gradio_api/call/run_benchmark_in_space` and `/gradio_api/call/run_benchmark_on_upload`. | |
| ## API surface | |
| Every button is also a Gradio API endpoint, so AI agents and downstream tooling | |
| can invoke them programmatically. Discovery: `agents.md` at the Space root | |
| returns the calling instructions; `/gradio_api/info` returns the full schema. | |
| ```bash | |
| # Parse a doc: | |
| curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/parse_uploaded_document \\ | |
| -H "Content-Type: application/json" \\ | |
| -d '{{"data": [{{file_data}}, "Default lightweight"]}}' | |
| # Run smokes: | |
| curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/run_smokes_in_space \\ | |
| -H "Content-Type: application/json" -d '{{"data": []}}' | |
| # Benchmark: | |
| curl -X POST https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/run_benchmark_in_space \\ | |
| -H "Content-Type: application/json" -d '{{"data": []}}' | |
| ``` | |
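
Or from Python via `gradio_client` (a minimal sketch; the positional argument
shapes are assumptions -- confirm against `/gradio_api/info` first):

```python
from gradio_client import Client, handle_file

client = Client("arjun10g/zeroshotGPU")
result = client.predict(
    [handle_file("paper.pdf")],   # list: the upload input is file_count="multiple"
    "Default lightweight",        # pipeline mode
    api_name="/parse_uploaded_document",
)
```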

## Configuration

Defaults work out of the box. To change behavior, set Space variables:

- `ZSGDP_CONFIG_PATH`: point at one of `configs/default.yaml`, `configs/docling.yaml`, `configs/live_gpu_repair.yaml`, or your own committed YAML.
- `ZSGDP_LOG_LEVEL`: `INFO` (default on Spaces), `DEBUG`, `WARNING`, etc.
- `ZSGDP_LOG_JSON`: `1` (default on Spaces) for one-line JSON log records.
- `ZSGDP_MAX_UPLOAD_BYTES` / `ZSGDP_MAX_PAGE_COUNT` / `ZSGDP_MAX_BATCH_DOCS`: abuse guards.
- `HF_TOKEN`: required for gated models (jina-embeddings-v3 may need it).

## Known limits

- **ZeroGPU duration cap.** Each `@spaces.GPU`-decorated call runs in a 60s
  GPU slot. First-call cold-start for big models (Qwen2.5-VL-3B is ~6 GB)
  exceeds this on a clean cache. Subsequent calls reuse the cached weights
  and fit comfortably.
- **Live GPU repair** only fires when the deterministic repair path can't
  resolve an issue. For markdown tables, the deterministic normalizer
  handles most malformations before GPU dispatch is needed.
- **GT-comparison metrics** (layout F1, table structure score, formula CER)
  require labelled datasets (`omnidocbench`, `doclaynet`). Uploaded
  custom corpora produce all the GT-free metrics but those three.

## Source

The full project source, including the multi-step spec, contributor docs,
and 250+ unit tests, is at
[huggingface.co/spaces/arjun10g/zeroshotGPU](https://huggingface.co/spaces/arjun10g/zeroshotGPU).
The `Files` tab on the Space page shows the live deploy.
"""

with gr.Blocks(title="zeroshotGPU") as demo:
    gr.Markdown(
        "# zeroshotGPU\n\n"
        "Self-hosted agentic document parser. Upload a single document, multiple "
        "documents, or a `.zip` of documents (PDF / Markdown / plaintext / HTML). "
        "Each parse emits canonical markdown, structured JSON, retrieval-ready "
        "chunks (multi-strategy), a quality report with GT-comparison metrics "
        "where applicable, and a SHA-256-checksummed artifact manifest. "
        f"Per-file caps: {MAX_UPLOAD_BYTES // (1024 * 1024)} MB / "
        f"{MAX_PAGE_COUNT} pages. Batch cap: {MAX_BATCH_DOCS} docs per request. "
        "**See the [Help] tab for full instructions.**\n\n"
        "[Source on Hugging Face](https://huggingface.co/spaces/arjun10g/zeroshotGPU)"
    )
    with gr.Row():
        upload = gr.File(
            label="Document(s): single file, multi-select, or .zip",
            file_types=[".pdf", ".md", ".txt", ".html", ".htm", ".zip"],
            file_count="multiple",
        )
        with gr.Column():
            pipeline = gr.Dropdown(
                choices=["Docling + PyMuPDF", "Default lightweight", "Live GPU repair"],
                value="Docling + PyMuPDF",
                label="Pipeline",
                info=(
                    "`Docling + PyMuPDF` runs both for the disagreement signal. "
                    "`Default lightweight` is text + PyMuPDF only. `Live GPU repair` "
                    "enables repair.execute_gpu_escalations=true and dispatches "
                    "malformed-table / OCR / figure / reading-order issues to Qwen2.5-VL."
                ),
            )
            parse_button = gr.Button("Parse", variant="primary")
            archive = gr.File(label="Artifacts (zip)")
    with gr.Tabs():
        with gr.Tab("Help"):
            gr.Markdown(_HELP_MARKDOWN)
        with gr.Tab("Markdown"):
            gr.Markdown(
                "_Canonical markdown reconstruction of the parsed document. "
                "For batch uploads, this shows the first document; the full "
                "set is in the artifacts zip._"
            )
            markdown = gr.Markdown(label="Canonical Markdown")
        with gr.Tab("Run"):
            gr.Markdown(
                "_Summary, quality report, parser metrics, and artifact "
                "validation. For batch uploads, `Summary.batch` lists every "
                "document parsed in the request._"
            )
            summary = gr.JSON(label="Summary")
            quality = gr.JSON(label="Quality Report")
            parser_metrics = gr.JSON(label="Parser Metrics")
            artifact_validation = gr.JSON(label="Artifact Manifest Validation")
        with gr.Tab("Chunks"):
            gr.Markdown(
                "_Per-strategy chunk breakdown: counts, token-count "
                "distribution (min / median / max), and three sample chunks "
                "with previews per strategy. The full chunks.jsonl is in the "
                "Artifacts tab and inside the zip._\n\n"
                "Strategies emitted by default: `fixed_token_baseline`, "
                "`recursive_structure`, `parent_child` (with linked parent / "
                "child IDs), `page_level`, plus `table` / `figure` chunks "
                "with provenance. `semantic`, `late`, `vision_guided`, and "
                "`agentic_proposition` are config-gated stubs that emit "
                "deterministic candidates marked for backend replacement."
            )
            chunking = gr.JSON(label="Chunking plan + per-strategy detail")
        with gr.Tab("Artifacts"):
            gr.Markdown(
                "Each top-level artifact is downloadable individually. "
                "Nested assets (page renders, table/figure crops) stay bundled "
                "in the zip above."
            )
            individual_artifacts = gr.Files(label="Individual artifacts")
        with gr.Tab("Runtime"):
            runtime = gr.JSON(label="GPU Runtime", value=runtime_status_for_mode("Docling + PyMuPDF"))
            gpu_tasks = gr.JSON(label="Planned GPU Tasks")
            gpu_task_report = gr.JSON(label="GPU Task Preflight")
        with gr.Tab("Smokes"):
            gr.Markdown(
                "Runs the same smokes as `python -m scripts.run_space_smoke`, "
                "in-process. Each call is also exposed via the Gradio API at "
                "`/gradio_api/call/run_smokes_in_space` for remote validation."
            )
            smoke_button = gr.Button("Run all smokes", variant="primary")
            smoke_output = gr.JSON(label="Smoke report")
        with gr.Tab("Benchmark"):
            gr.Markdown(
                "**Two benchmark modes:**\n"
                "- **Run on regression fixtures**: uses the committed seed "
                "documents (`tests/regression/fixtures/`); reproducible without "
                "any upload. API: `/gradio_api/call/run_benchmark_in_space`.\n"
                "- **Run on uploaded corpus**: accepts a `.zip` of documents "
                "(or a list of files). Returns headline metrics plus a per-doc "
                "breakdown. GT-comparison metrics (layout F1, table structure, "
                "formula CER) are NOT computed; those require labelled "
                "datasets (`omnidocbench` / `doclaynet`), which can be loaded "
                "via the CLI from a Pro-tier Dev Mode terminal. API: "
                "`/gradio_api/call/run_benchmark_on_upload`."
            )
            with gr.Row():
                benchmark_button = gr.Button("Run on regression fixtures", variant="primary")
                benchmark_upload_button = gr.Button("Run on uploaded corpus")
            benchmark_corpus = gr.File(
                label="Optional upload; used only when 'Run on uploaded corpus' is clicked",
                file_types=[".pdf", ".md", ".txt", ".html", ".htm", ".zip"],
                file_count="multiple",
            )
            benchmark_output = gr.JSON(label="Benchmark headline metrics")

    parse_button.click(
        parse_uploaded_document,
        inputs=[upload, pipeline],
        outputs=[
            markdown,
            summary,
            quality,
            parser_metrics,
            chunking,
            runtime,
            gpu_tasks,
            gpu_task_report,
            artifact_validation,
            archive,
            individual_artifacts,
        ],
    )
    smoke_button.click(run_smokes_in_space, inputs=[], outputs=smoke_output, api_name="run_smokes_in_space")
    benchmark_button.click(run_benchmark_in_space, inputs=[], outputs=benchmark_output, api_name="run_benchmark_in_space")
    benchmark_upload_button.click(
        run_benchmark_on_upload,
        inputs=[benchmark_corpus],
        outputs=benchmark_output,
        api_name="run_benchmark_on_upload",
    )

    # Hidden diagnostic endpoint: reachable via /gradio_api/call/diagnose_runtime
    # but no UI button. Reports env-var presence (not values) for debugging
    # secrets / token / spaces SDK plumbing on the Space.
    diag_dummy = gr.Button("diag", visible=False)
    diag_output = gr.JSON(visible=False)
    diag_dummy.click(diagnose_runtime, inputs=[], outputs=diag_output, api_name="diagnose_runtime")
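    # Remote invocation follows Gradio's two-step call protocol, like every
    # other endpoint here (a sketch; assumes `requests` is available on the
    # calling side):
    #
    #   import requests
    #   base = "https://arjun10g-zeroshotgpu.hf.space/gradio_api/call/diagnose_runtime"
    #   event_id = requests.post(base, json={"data": []}).json()["event_id"]
    #   print(requests.get(f"{base}/{event_id}").text)  # SSE stream with the result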

if __name__ == "__main__":
    demo.launch()