"""Command-line interface.""" from __future__ import annotations import argparse from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass import json from pathlib import Path import shutil from typing import Sequence from zsgdp.artifacts import validate_artifact_manifest from zsgdp.benchmarks.ablation_runner import run_parser_ablations from zsgdp.benchmarks.cross_dataset import combine_benchmark_summaries, write_cross_dataset_outputs from zsgdp.benchmarks.parser_quality import run_parser_benchmark from zsgdp.config import load_env_file from zsgdp.logging_config import configure_logging from zsgdp.preflight import format_failures, format_summary, run_preflight from zsgdp.config import load_config from zsgdp.deployment import check_huggingface_space from zsgdp.gpu import collect_gpu_runtime_status, run_gpu_task_manifest from zsgdp.parsers.registry import get_parser, parser_names from zsgdp.pipeline import parse_document from zsgdp.profiling import profile_document from zsgdp.utils import dumps_json, write_json def _epilog(text: str) -> str: """Format a multi-line examples block for argparse epilog. Dedents the source-indented triple-quoted string so the rendered help output isn't pushed to the right by however far the call site happens to be nested. """ import textwrap dedented = textwrap.dedent(text).strip("\n") return "Examples:\n" + "\n".join(f" {line}" if line else "" for line in dedented.splitlines()) def main(argv: Sequence[str] | None = None) -> int: load_env_file() configure_logging() parser = argparse.ArgumentParser( prog="zsgdp", description="Zero-shot GPU document parser control plane.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ zsgdp parse --input ./docs/sample.md --output ./out/sample zsgdp benchmark --input ./docs --output ./bench zsgdp preflight --root . See README.md and docs/space_smoke.md for end-to-end workflows. """ ), ) subparsers = parser.add_subparsers(dest="command", required=True) parse_parser = subparsers.add_parser( "parse", help="Parse one document.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ zsgdp parse --input ./docs/report.pdf --output ./out/report zsgdp parse --input ./docs/report.pdf --output ./out/report --config configs/docling.yaml zsgdp parse --input ./docs/report.pdf --output ./out/report --parser docling --parser pymupdf """ ), ) parse_parser.add_argument("--input", required=True, help="Input document path.") parse_parser.add_argument("--output", required=True, help="Output directory.") parse_parser.add_argument("--config", help="Optional YAML config path.") parse_parser.add_argument("--parser", action="append", dest="parsers", help="Force a parser. Can be repeated.") parse_parser.add_argument("--parsers", nargs="+", dest="parser_list", help="Force one or more parsers.") folder_parser = subparsers.add_parser( "parse-folder", help="Parse every file in a folder.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ zsgdp parse-folder --input ./docs --output ./parsed --workers 4 zsgdp parse-folder --input ./docs --output ./parsed --workers 8 --gpu-workers 2 --config configs/docling.yaml """ ), ) folder_parser.add_argument("--input", required=True, help="Input folder.") folder_parser.add_argument("--output", required=True, help="Output folder.") folder_parser.add_argument("--config", help="Optional YAML config path.") folder_parser.add_argument("--workers", type=int, default=1, help="Number of documents to parse concurrently.") folder_parser.add_argument( "--gpu-workers", type=int, default=0, help="Record reserved GPU worker slots for downstream task execution; document parsing uses --workers.", ) folder_parser.add_argument("--parser", action="append", dest="parsers", help="Force a parser. Can be repeated.") folder_parser.add_argument("--parsers", nargs="+", dest="parser_list", help="Force one or more parsers.") profile_parser = subparsers.add_parser("profile", help="Profile a document without parsing.") profile_parser.add_argument("--input", required=True, help="Input document path.") gpu_parser = subparsers.add_parser("gpu-status", help="Print GPU/model runtime status.") gpu_parser.add_argument("--config", help="Optional YAML config path.") space_parser = subparsers.add_parser( "space-check", help="Check Hugging Face Space deployment readiness.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ zsgdp space-check --root . zsgdp space-check --root . --output ./space_report.json """ ), ) space_parser.add_argument("--root", default=".", help="Repository root to check.") space_parser.add_argument("--config", help="Optional YAML config path.") space_parser.add_argument("--output", help="Optional JSON readiness report path.") task_parser = subparsers.add_parser( "run-gpu-tasks", help="Validate and optionally execute a gpu_tasks.jsonl manifest.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ # Dry-run preflight (default — no model invoked): zsgdp run-gpu-tasks --input ./out/report --output ./out/report/gpu_task_report.json # Live execution against the configured backend: zsgdp run-gpu-tasks --input ./out/report --output ./out/report/gpu_task_report.json --execute """ ), ) task_parser.add_argument("--input", required=True, help="Parsed output directory or gpu_tasks.jsonl path.") task_parser.add_argument("--output", required=True, help="Execution report JSON path.") task_parser.add_argument("--config", help="Optional YAML config path.") task_parser.add_argument("--execute", action="store_true", help="Execute ready tasks with the configured GPU backend.") subparsers.add_parser("parsers", help="List parser adapters and availability.") bench_parser = subparsers.add_parser( "benchmark", help="Run a parser/chunking benchmark over a folder.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ # Custom corpus, no GT (still emits all GT-free metrics): zsgdp benchmark --input ./docs --output ./bench # OmniDocBench checkout (also runs layout F1 / table structure / formula CER): zsgdp benchmark --input ./omnidocbench --dataset omnidocbench --output ./bench/omni # DocLayNet checkout (layout F1 only — DocLayNet has no table/formula GT): zsgdp benchmark --input ./doclaynet --dataset doclaynet --output ./bench/doclay # Force a specific parser combo: zsgdp benchmark --input ./docs --output ./bench --parser docling --parser pymupdf """ ), ) bench_parser.add_argument("--input", required=False, help="Input folder of documents.") bench_parser.add_argument( "--dataset", required=False, default="custom_folder", help="Dataset loader name (custom_folder, omnidocbench, doclaynet). 'custom' is accepted as an alias.", ) bench_parser.add_argument("--output", required=False, default="./benchmarks/results") bench_parser.add_argument("--config", help="Optional YAML config path.") bench_parser.add_argument("--parser", action="append", dest="parsers", help="Force a parser. Can be repeated.") bench_parser.add_argument("--parsers", nargs="+", dest="parser_list", help="Force one or more parsers.") ablate_parser = subparsers.add_parser( "benchmark-ablate", help="Run the benchmark once per parser in isolation plus a merged arm, and emit a comparison.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ # Two-parser ablation with the merged arm: zsgdp benchmark-ablate --input ./docs --output ./bench/ablation \\ --parser docling --parser pymupdf # Three parsers, no merged arm: zsgdp benchmark-ablate --input ./docs --output ./bench/ablation \\ --parser docling --parser pymupdf --parser text --no-merged """ ), ) ablate_parser.add_argument("--input", required=True, help="Input folder of documents.") ablate_parser.add_argument( "--dataset", required=False, default="custom_folder", help="Dataset loader name. 'custom' aliases to custom_folder.", ) ablate_parser.add_argument("--output", required=False, default="./benchmarks/ablations") ablate_parser.add_argument("--config", help="Optional YAML config path.") ablate_parser.add_argument( "--parser", action="append", dest="ablate_parsers", required=True, help="Parser to include as an ablation arm. Repeat to add more.", ) ablate_parser.add_argument( "--no-merged", dest="include_merged", action="store_false", default=True, help="Skip the all-parsers-together merged arm.", ) preflight_parser = subparsers.add_parser( "preflight", help="Run all local guards (unit tests, regression fixtures, space-check, parser registry) before pushing to a Space.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ # Standard preflight (~10s): zsgdp preflight --root . # Add an end-to-end benchmark smoke (adds ~1-3s): zsgdp preflight --root . --benchmark # Skip slow steps when iterating locally: zsgdp preflight --root . --skip-unit """ ), ) preflight_parser.add_argument("--root", default=".", help="Repository root to check.") preflight_parser.add_argument("--skip-unit", action="store_true", help="Skip the unittest discovery step.") preflight_parser.add_argument("--skip-regression", action="store_true", help="Skip the regression fixture step.") preflight_parser.add_argument("--skip-space-check", action="store_true", help="Skip the Space readiness check.") preflight_parser.add_argument("--skip-parsers", action="store_true", help="Skip the parser registry sanity step.") preflight_parser.add_argument( "--benchmark", action="store_true", help="Also run an end-to-end benchmark against tests/regression/fixtures (off by default).", ) combine_parser = subparsers.add_parser( "combine-benchmarks", help="Combine multiple benchmark summaries into a cross-dataset comparison.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ # Compare OmniDocBench vs DocLayNet runs: zsgdp combine-benchmarks \\ --input ./bench/omni --label omnidocbench \\ --input ./bench/doclay --label doclaynet \\ --output ./bench/cross # Without explicit labels (uses dataset_name from each summary): zsgdp combine-benchmarks \\ --input ./bench/omni \\ --input ./bench/doclay \\ --output ./bench/cross """ ), ) combine_parser.add_argument( "--input", action="append", dest="combine_inputs", required=True, help="Benchmark output directory or results.json path. Repeat once per dataset.", ) combine_parser.add_argument( "--label", action="append", dest="combine_labels", help="Optional label per --input (defaults to dataset_name from each summary).", ) combine_parser.add_argument("--output", required=True, help="Output directory for the comparison artifacts.") export_parser = subparsers.add_parser( "export-chunks", help="Export chunks from a parsed document directory.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ zsgdp export-chunks --parsed ./out/sample --format jsonl --output ./chunks.jsonl zsgdp export-chunks --parsed ./out/sample --format json --output ./chunks.json """ ), ) export_parser.add_argument("--parsed", required=True, help="Parsed document output directory.") export_parser.add_argument("--format", choices=["jsonl", "json"], default="jsonl", help="Output format.") export_parser.add_argument("--output", required=True, help="Output file path.") validate_parser = subparsers.add_parser( "validate-artifacts", help="Validate artifact_manifest.json checksums.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=_epilog( """ zsgdp validate-artifacts --parsed ./out/sample zsgdp validate-artifacts --parsed ./out/sample --output ./validation.json """ ), ) validate_parser.add_argument("--parsed", required=True, help="Parsed document output directory.") validate_parser.add_argument("--output", help="Optional JSON validation report path.") args = parser.parse_args(argv) if args.command == "parse": parsed = parse_document(args.input, args.output, config_path=args.config, selected_parsers=_selected_parsers(args)) _print_parse_summary(parsed, Path(args.output)) return 0 if args.command == "parse-folder": if args.workers < 1: parser.error("parse-folder --workers must be >= 1") if args.gpu_workers < 0: parser.error("parse-folder --gpu-workers must be >= 0") input_dir = Path(args.input) if not input_dir.is_dir(): parser.error(f"parse-folder input must be a folder: {input_dir}") summary = _parse_folder( input_dir, Path(args.output), config_path=args.config, selected_parsers=_selected_parsers(args), workers=args.workers, gpu_workers=args.gpu_workers, ) for result in summary["results"]: if result["status"] == "parsed": print( f"parsed {result['file']} -> {result['output']} " f"score={result['quality_score']:.2f} chunks={result['chunks']}" ) else: print(f"failed {result['file']} -> {result['output']} error={result['error']}") print( f"parsed {summary['success_count']} file(s), " f"failed {summary['failure_count']} file(s), " f"workers={summary['workers']} gpu_workers={summary['gpu_workers']}" ) return 0 if summary["failure_count"] == 0 else 1 if args.command == "profile": print(dumps_json(profile_document(args.input))) return 0 if args.command == "gpu-status": print(dumps_json(collect_gpu_runtime_status(load_config(args.config)).to_dict())) return 0 if args.command == "space-check": report = check_huggingface_space(args.root, config_path=args.config) if args.output: write_json(args.output, report) print( f"valid={report['valid']} target={report['target']} space={report['space_name']} " f"failures={report['failure_count']} warnings={report['warning_count']}" ) return 0 if report["valid"] else 1 if args.command == "run-gpu-tasks": report = run_gpu_task_manifest( args.input, config=load_config(args.config), output_path=args.output, dry_run=not args.execute, ) print( f"gpu_tasks={report['task_count']} batches={report['batch_count']} " f"ready={report['ready_count']} blocked={report['blocked_count']} " f"executed={report.get('executed_count', 0)} failed={report.get('failed_count', 0)} " f"report={args.output}" ) return 0 if args.command == "parsers": config = load_config() for name in parser_names(): adapter = get_parser(name) enabled = config.get("parsers", {}).get(name, {}).get("enabled", False) print(f"{name}\tenabled={enabled}\tavailable={adapter.available()}") return 0 if args.command == "benchmark": if not args.input: parser.error("benchmark requires --input") summary = run_parser_benchmark( args.input, args.output, config_path=args.config, selected_parsers=_selected_parsers(args), dataset_name=args.dataset, ) print(f"dataset={summary.get('dataset_name', args.dataset)}") print(f"documents={summary['document_count']} mean_quality_score={summary['mean_quality_score']:.2f}") print(f"leaderboard={Path(args.output) / 'leaderboard.csv'}") return 0 if args.command == "benchmark-ablate": comparison = run_parser_ablations( args.input, args.output, parsers=args.ablate_parsers, config_path=args.config, dataset_name=args.dataset, include_merged=args.include_merged, ) print(f"arms={comparison['arm_count']} comparison={Path(args.output) / 'ablation_comparison.csv'}") for row in comparison["rows"]: quality = row.get("mean_quality_score", 0.0) layout = row.get("mean_layout_f1", 0.0) recall = row.get("mean_retrieval_recall_at_1", 0.0) print(f" arm={row['arm']:<14} quality={quality:.2f} layout_f1={layout:.2f} recall@1={recall:.2f}") return 0 if args.command == "preflight": import sys as _sys result = run_preflight( root=args.root, skip_unit=args.skip_unit, skip_regression=args.skip_regression, skip_space_check=args.skip_space_check, skip_parsers=args.skip_parsers, run_benchmark=args.benchmark, ) print(format_summary(result)) if not result.passed: failures = format_failures(result) if failures: print("\n" + failures, file=_sys.stderr) return 1 return 0 if args.command == "combine-benchmarks": labels = list(args.combine_labels or []) if labels and len(labels) != len(args.combine_inputs): parser.error("combine-benchmarks: --label must be passed once per --input or omitted entirely.") pairs = [] for index, source in enumerate(args.combine_inputs): label = labels[index] if labels else None if label is None: from zsgdp.benchmarks.cross_dataset import _load_summary summary_for_default = _load_summary(source) label = str(summary_for_default.get("dataset_name") or f"run_{index + 1}") pairs.append((label, source)) comparison = combine_benchmark_summaries(pairs) write_cross_dataset_outputs(comparison, args.output) print(f"combined {comparison['run_count']} run(s) -> {args.output}") for row in comparison["dataset_summary"]: print( f" {row['label']:<14} docs={row.get('document_count') or 0} " f"layout_f1={row.get('mean_layout_f1') or 0:.2f} " f"recall@5={row.get('mean_retrieval_recall_at_5') or 0:.2f}" ) return 0 if args.command == "export-chunks": exported = _export_chunks(Path(args.parsed), Path(args.output), args.format) print(f"exported {exported} chunk(s) -> {args.output}") return 0 if args.command == "validate-artifacts": report = validate_artifact_manifest(args.parsed) if args.output: write_json(args.output, report) print(f"valid={report['valid']} checked={report['checked_count']} errors={len(report['errors'])}") return 0 if report["valid"] else 1 parser.error(f"Unhandled command: {args.command}") return 2 def _print_parse_summary(parsed, output_dir: Path) -> None: print(f"doc_id={parsed.doc_id}") print(f"file_type={parsed.file_type}") print(f"elements={len(parsed.elements)} tables={len(parsed.tables)} figures={len(parsed.figures)} chunks={len(parsed.chunks)}") print(f"quality_score={parsed.quality_report.score:.2f} blocking={parsed.quality_report.has_blocking_failures}") print(f"output={output_dir}") def _selected_parsers(args) -> list[str] | None: selected = list(getattr(args, "parsers", None) or []) selected.extend(getattr(args, "parser_list", None) or []) return selected or None @dataclass(slots=True) class _FolderParseJob: index: int path: Path output_dir: Path def _parse_folder( input_dir: Path, output_dir: Path, *, config_path: str | Path | None, selected_parsers: Sequence[str] | None, workers: int, gpu_workers: int = 0, ) -> dict: if not input_dir.is_dir(): raise NotADirectoryError(f"Input folder does not exist: {input_dir}") output_dir.mkdir(parents=True, exist_ok=True) jobs = _build_folder_jobs(input_dir, output_dir) if not jobs: return { "workers": workers, "gpu_workers": gpu_workers, "success_count": 0, "failure_count": 0, "results": [], } if workers == 1: results = [ _parse_folder_job(job, config_path=config_path, selected_parsers=selected_parsers) for job in jobs ] else: results_by_index: list[dict | None] = [None] * len(jobs) max_workers = min(workers, len(jobs)) with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_job = { executor.submit( _parse_folder_job, job, config_path=config_path, selected_parsers=selected_parsers, ): job for job in jobs } for future in as_completed(future_to_job): job = future_to_job[future] results_by_index[job.index] = future.result() results = [result for result in results_by_index if result is not None] failures = [result for result in results if result["status"] != "parsed"] return { "workers": min(workers, len(jobs)), "gpu_workers": max(gpu_workers, 0), "success_count": len(results) - len(failures), "failure_count": len(failures), "results": results, } def _build_folder_jobs(input_dir: Path, output_dir: Path) -> list[_FolderParseJob]: used_names: set[str] = set() jobs: list[_FolderParseJob] = [] for index, path in enumerate(sorted(item for item in input_dir.iterdir() if item.is_file())): jobs.append( _FolderParseJob( index=index, path=path, output_dir=output_dir / _unique_output_name(path, used_names), ) ) return jobs def _unique_output_name(path: Path, used_names: set[str]) -> str: base_name = path.stem or path.name candidates = [base_name] if path.suffix: candidates.append(f"{base_name}-{path.suffix.lstrip('.')}") suffix = 2 while True: for candidate in candidates: key = candidate.casefold() if key not in used_names: used_names.add(key) return candidate candidates = [f"{base_name}-{suffix}"] suffix += 1 def _parse_folder_job( job: _FolderParseJob, *, config_path: str | Path | None, selected_parsers: Sequence[str] | None, ) -> dict: try: parsed = parse_document( job.path, job.output_dir, config_path=config_path, selected_parsers=selected_parsers, ) except Exception as exc: return { "status": "failed", "file": job.path.name, "output": str(job.output_dir), "error": str(exc), } return { "status": "parsed", "file": job.path.name, "output": str(job.output_dir), "doc_id": parsed.doc_id, "file_type": parsed.file_type, "quality_score": parsed.quality_report.score, "blocking": parsed.quality_report.has_blocking_failures, "elements": len(parsed.elements), "tables": len(parsed.tables), "figures": len(parsed.figures), "chunks": len(parsed.chunks), } def _export_chunks(parsed_dir: Path, output_path: Path, fmt: str) -> int: chunks_path = parsed_dir / "chunks.jsonl" if not chunks_path.exists(): raise FileNotFoundError(f"Missing chunks artifact: {chunks_path}") output_path.parent.mkdir(parents=True, exist_ok=True) if fmt == "jsonl": shutil.copyfile(chunks_path, output_path) return _count_jsonl(chunks_path) records = [ json.loads(line) for line in chunks_path.read_text(encoding="utf-8").splitlines() if line.strip() ] output_path.write_text(json.dumps(records, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") return len(records) def _count_jsonl(path: Path) -> int: return sum(1 for line in path.read_text(encoding="utf-8").splitlines() if line.strip()) if __name__ == "__main__": raise SystemExit(main())