# zeroshotGPU / zsgdp /cli.py
# Arjunvir Singh
# Initial commit: zeroshotGPU MVP with full eval surface
# db06ffa
"""Command-line interface."""
from __future__ import annotations
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import json
from pathlib import Path
import shutil
from typing import Sequence
from zsgdp.artifacts import validate_artifact_manifest
from zsgdp.benchmarks.ablation_runner import run_parser_ablations
from zsgdp.benchmarks.cross_dataset import combine_benchmark_summaries, write_cross_dataset_outputs
from zsgdp.benchmarks.parser_quality import run_parser_benchmark
from zsgdp.config import load_env_file
from zsgdp.logging_config import configure_logging
from zsgdp.preflight import format_failures, format_summary, run_preflight
from zsgdp.config import load_config
from zsgdp.deployment import check_huggingface_space
from zsgdp.gpu import collect_gpu_runtime_status, run_gpu_task_manifest
from zsgdp.parsers.registry import get_parser, parser_names
from zsgdp.pipeline import parse_document
from zsgdp.profiling import profile_document
from zsgdp.utils import dumps_json, write_json
def _epilog(text: str) -> str:
    """Render an ``Examples:`` section suitable for an argparse epilog.

    The call sites pass triple-quoted strings that are indented to match the
    surrounding code. The common indentation is removed first so the help
    output does not depend on how deeply the call site is nested; every
    non-empty line is then prefixed with a single space under the heading.

    Args:
        text: Multi-line examples block, possibly source-indented.

    Returns:
        The heading ``Examples:`` followed by the re-indented lines.
    """
    import textwrap

    body = textwrap.dedent(text).strip("\n")
    rendered: list[str] = []
    for raw_line in body.splitlines():
        # Blank lines stay empty instead of carrying a lone indent character.
        rendered.append(f" {raw_line}" if raw_line else "")
    return "Examples:\n" + "\n".join(rendered)
def _add_parser_selection_args(command_parser: argparse.ArgumentParser) -> None:
    """Register the shared ``--parser`` / ``--parsers`` override flags.

    The two spellings write to different destinations (``parsers`` and
    ``parser_list``); ``_selected_parsers`` merges them after parsing.
    """
    command_parser.add_argument(
        "--parser", action="append", dest="parsers", help="Force a parser. Can be repeated."
    )
    command_parser.add_argument(
        "--parsers", nargs="+", dest="parser_list", help="Force one or more parsers."
    )


def _add_command(subparsers, name: str, help_text: str, examples: str | None = None) -> argparse.ArgumentParser:
    """Create one sub-command parser, optionally with a raw-formatted examples epilog."""
    if examples is None:
        return subparsers.add_parser(name, help=help_text)
    return subparsers.add_parser(
        name,
        help=help_text,
        # Raw formatting keeps the line breaks of the examples block intact.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=_epilog(examples),
    )


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the ``zsgdp`` argument parser with every sub-command registered."""
    parser = argparse.ArgumentParser(
        prog="zsgdp",
        description="Zero-shot GPU document parser control plane.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=_epilog(
            """
            zsgdp parse --input ./docs/sample.md --output ./out/sample
            zsgdp benchmark --input ./docs --output ./bench
            zsgdp preflight --root .
            See README.md and docs/space_smoke.md for end-to-end workflows.
            """
        ),
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # --- parse -----------------------------------------------------------
    parse_parser = _add_command(
        subparsers,
        "parse",
        "Parse one document.",
        """
        zsgdp parse --input ./docs/report.pdf --output ./out/report
        zsgdp parse --input ./docs/report.pdf --output ./out/report --config configs/docling.yaml
        zsgdp parse --input ./docs/report.pdf --output ./out/report --parser docling --parser pymupdf
        """,
    )
    parse_parser.add_argument("--input", required=True, help="Input document path.")
    parse_parser.add_argument("--output", required=True, help="Output directory.")
    parse_parser.add_argument("--config", help="Optional YAML config path.")
    _add_parser_selection_args(parse_parser)

    # --- parse-folder ----------------------------------------------------
    folder_parser = _add_command(
        subparsers,
        "parse-folder",
        "Parse every file in a folder.",
        """
        zsgdp parse-folder --input ./docs --output ./parsed --workers 4
        zsgdp parse-folder --input ./docs --output ./parsed --workers 8 --gpu-workers 2 --config configs/docling.yaml
        """,
    )
    folder_parser.add_argument("--input", required=True, help="Input folder.")
    folder_parser.add_argument("--output", required=True, help="Output folder.")
    folder_parser.add_argument("--config", help="Optional YAML config path.")
    folder_parser.add_argument("--workers", type=int, default=1, help="Number of documents to parse concurrently.")
    folder_parser.add_argument(
        "--gpu-workers",
        type=int,
        default=0,
        help="Record reserved GPU worker slots for downstream task execution; document parsing uses --workers.",
    )
    _add_parser_selection_args(folder_parser)

    # --- profile / gpu-status -------------------------------------------
    profile_parser = _add_command(subparsers, "profile", "Profile a document without parsing.")
    profile_parser.add_argument("--input", required=True, help="Input document path.")

    gpu_parser = _add_command(subparsers, "gpu-status", "Print GPU/model runtime status.")
    gpu_parser.add_argument("--config", help="Optional YAML config path.")

    # --- space-check -----------------------------------------------------
    space_parser = _add_command(
        subparsers,
        "space-check",
        "Check Hugging Face Space deployment readiness.",
        """
        zsgdp space-check --root .
        zsgdp space-check --root . --output ./space_report.json
        """,
    )
    space_parser.add_argument("--root", default=".", help="Repository root to check.")
    space_parser.add_argument("--config", help="Optional YAML config path.")
    space_parser.add_argument("--output", help="Optional JSON readiness report path.")

    # --- run-gpu-tasks ---------------------------------------------------
    task_parser = _add_command(
        subparsers,
        "run-gpu-tasks",
        "Validate and optionally execute a gpu_tasks.jsonl manifest.",
        """
        # Dry-run preflight (default — no model invoked):
        zsgdp run-gpu-tasks --input ./out/report --output ./out/report/gpu_task_report.json
        # Live execution against the configured backend:
        zsgdp run-gpu-tasks --input ./out/report --output ./out/report/gpu_task_report.json --execute
        """,
    )
    task_parser.add_argument("--input", required=True, help="Parsed output directory or gpu_tasks.jsonl path.")
    task_parser.add_argument("--output", required=True, help="Execution report JSON path.")
    task_parser.add_argument("--config", help="Optional YAML config path.")
    task_parser.add_argument(
        "--execute", action="store_true", help="Execute ready tasks with the configured GPU backend."
    )

    # --- parsers ---------------------------------------------------------
    _add_command(subparsers, "parsers", "List parser adapters and availability.")

    # --- benchmark -------------------------------------------------------
    bench_parser = _add_command(
        subparsers,
        "benchmark",
        "Run a parser/chunking benchmark over a folder.",
        """
        # Custom corpus, no GT (still emits all GT-free metrics):
        zsgdp benchmark --input ./docs --output ./bench
        # OmniDocBench checkout (also runs layout F1 / table structure / formula CER):
        zsgdp benchmark --input ./omnidocbench --dataset omnidocbench --output ./bench/omni
        # DocLayNet checkout (layout F1 only — DocLayNet has no table/formula GT):
        zsgdp benchmark --input ./doclaynet --dataset doclaynet --output ./bench/doclay
        # Force a specific parser combo:
        zsgdp benchmark --input ./docs --output ./bench --parser docling --parser pymupdf
        """,
    )
    bench_parser.add_argument("--input", required=False, help="Input folder of documents.")
    bench_parser.add_argument(
        "--dataset",
        required=False,
        default="custom_folder",
        help="Dataset loader name (custom_folder, omnidocbench, doclaynet). 'custom' is accepted as an alias.",
    )
    bench_parser.add_argument("--output", required=False, default="./benchmarks/results")
    bench_parser.add_argument("--config", help="Optional YAML config path.")
    _add_parser_selection_args(bench_parser)

    # --- benchmark-ablate ------------------------------------------------
    ablate_parser = _add_command(
        subparsers,
        "benchmark-ablate",
        "Run the benchmark once per parser in isolation plus a merged arm, and emit a comparison.",
        """
        # Two-parser ablation with the merged arm:
        zsgdp benchmark-ablate --input ./docs --output ./bench/ablation \\
        --parser docling --parser pymupdf
        # Three parsers, no merged arm:
        zsgdp benchmark-ablate --input ./docs --output ./bench/ablation \\
        --parser docling --parser pymupdf --parser text --no-merged
        """,
    )
    ablate_parser.add_argument("--input", required=True, help="Input folder of documents.")
    ablate_parser.add_argument(
        "--dataset",
        required=False,
        default="custom_folder",
        help="Dataset loader name. 'custom' aliases to custom_folder.",
    )
    ablate_parser.add_argument("--output", required=False, default="./benchmarks/ablations")
    ablate_parser.add_argument("--config", help="Optional YAML config path.")
    # Unlike the other commands, --parser here selects ablation arms and is mandatory.
    ablate_parser.add_argument(
        "--parser",
        action="append",
        dest="ablate_parsers",
        required=True,
        help="Parser to include as an ablation arm. Repeat to add more.",
    )
    ablate_parser.add_argument(
        "--no-merged",
        dest="include_merged",
        action="store_false",
        default=True,
        help="Skip the all-parsers-together merged arm.",
    )

    # --- preflight -------------------------------------------------------
    preflight_parser = _add_command(
        subparsers,
        "preflight",
        "Run all local guards (unit tests, regression fixtures, space-check, parser registry) before pushing to a Space.",
        """
        # Standard preflight (~10s):
        zsgdp preflight --root .
        # Add an end-to-end benchmark smoke (adds ~1-3s):
        zsgdp preflight --root . --benchmark
        # Skip slow steps when iterating locally:
        zsgdp preflight --root . --skip-unit
        """,
    )
    preflight_parser.add_argument("--root", default=".", help="Repository root to check.")
    preflight_parser.add_argument("--skip-unit", action="store_true", help="Skip the unittest discovery step.")
    preflight_parser.add_argument("--skip-regression", action="store_true", help="Skip the regression fixture step.")
    preflight_parser.add_argument("--skip-space-check", action="store_true", help="Skip the Space readiness check.")
    preflight_parser.add_argument("--skip-parsers", action="store_true", help="Skip the parser registry sanity step.")
    preflight_parser.add_argument(
        "--benchmark",
        action="store_true",
        help="Also run an end-to-end benchmark against tests/regression/fixtures (off by default).",
    )

    # --- combine-benchmarks ---------------------------------------------
    combine_parser = _add_command(
        subparsers,
        "combine-benchmarks",
        "Combine multiple benchmark summaries into a cross-dataset comparison.",
        """
        # Compare OmniDocBench vs DocLayNet runs:
        zsgdp combine-benchmarks \\
        --input ./bench/omni --label omnidocbench \\
        --input ./bench/doclay --label doclaynet \\
        --output ./bench/cross
        # Without explicit labels (uses dataset_name from each summary):
        zsgdp combine-benchmarks \\
        --input ./bench/omni \\
        --input ./bench/doclay \\
        --output ./bench/cross
        """,
    )
    combine_parser.add_argument(
        "--input",
        action="append",
        dest="combine_inputs",
        required=True,
        help="Benchmark output directory or results.json path. Repeat once per dataset.",
    )
    combine_parser.add_argument(
        "--label",
        action="append",
        dest="combine_labels",
        help="Optional label per --input (defaults to dataset_name from each summary).",
    )
    combine_parser.add_argument("--output", required=True, help="Output directory for the comparison artifacts.")

    # --- export-chunks ---------------------------------------------------
    export_parser = _add_command(
        subparsers,
        "export-chunks",
        "Export chunks from a parsed document directory.",
        """
        zsgdp export-chunks --parsed ./out/sample --format jsonl --output ./chunks.jsonl
        zsgdp export-chunks --parsed ./out/sample --format json --output ./chunks.json
        """,
    )
    export_parser.add_argument("--parsed", required=True, help="Parsed document output directory.")
    export_parser.add_argument("--format", choices=["jsonl", "json"], default="jsonl", help="Output format.")
    export_parser.add_argument("--output", required=True, help="Output file path.")

    # --- validate-artifacts ---------------------------------------------
    validate_parser = _add_command(
        subparsers,
        "validate-artifacts",
        "Validate artifact_manifest.json checksums.",
        """
        zsgdp validate-artifacts --parsed ./out/sample
        zsgdp validate-artifacts --parsed ./out/sample --output ./validation.json
        """,
    )
    validate_parser.add_argument("--parsed", required=True, help="Parsed document output directory.")
    validate_parser.add_argument("--output", help="Optional JSON validation report path.")

    return parser


def main(argv: Sequence[str] | None = None) -> int:
    """Run the ``zsgdp`` command-line interface.

    Loads the env file, configures logging, parses *argv* and dispatches to
    the selected sub-command, printing a short human-readable summary.

    Args:
        argv: Argument list without the program name. ``None`` lets argparse
            fall back to ``sys.argv[1:]``.

    Returns:
        ``0`` on success. ``1`` when ``parse-folder`` had failed files, when
        ``space-check`` or ``validate-artifacts`` report an invalid result, or
        when ``preflight`` did not pass.

    Raises:
        SystemExit: Raised by argparse for ``--help`` and for usage errors
            reported through ``parser.error``.
    """
    load_env_file()
    configure_logging()
    parser = _build_arg_parser()
    args = parser.parse_args(argv)
    command = args.command

    if command == "parse":
        parsed = parse_document(
            args.input,
            args.output,
            config_path=args.config,
            selected_parsers=_selected_parsers(args),
        )
        _print_parse_summary(parsed, Path(args.output))
        return 0

    if command == "parse-folder":
        if args.workers < 1:
            parser.error("parse-folder --workers must be >= 1")
        if args.gpu_workers < 0:
            parser.error("parse-folder --gpu-workers must be >= 0")
        input_dir = Path(args.input)
        if not input_dir.is_dir():
            parser.error(f"parse-folder input must be a folder: {input_dir}")
        summary = _parse_folder(
            input_dir,
            Path(args.output),
            config_path=args.config,
            selected_parsers=_selected_parsers(args),
            workers=args.workers,
            gpu_workers=args.gpu_workers,
        )
        for result in summary["results"]:
            if result["status"] == "parsed":
                print(
                    f"parsed {result['file']} -> {result['output']} "
                    f"score={result['quality_score']:.2f} chunks={result['chunks']}"
                )
            else:
                print(f"failed {result['file']} -> {result['output']} error={result['error']}")
        print(
            f"parsed {summary['success_count']} file(s), "
            f"failed {summary['failure_count']} file(s), "
            f"workers={summary['workers']} gpu_workers={summary['gpu_workers']}"
        )
        return 0 if summary["failure_count"] == 0 else 1

    if command == "profile":
        print(dumps_json(profile_document(args.input)))
        return 0

    if command == "gpu-status":
        print(dumps_json(collect_gpu_runtime_status(load_config(args.config)).to_dict()))
        return 0

    if command == "space-check":
        report = check_huggingface_space(args.root, config_path=args.config)
        if args.output:
            write_json(args.output, report)
        print(
            f"valid={report['valid']} target={report['target']} space={report['space_name']} "
            f"failures={report['failure_count']} warnings={report['warning_count']}"
        )
        return 0 if report["valid"] else 1

    if command == "run-gpu-tasks":
        report = run_gpu_task_manifest(
            args.input,
            config=load_config(args.config),
            output_path=args.output,
            dry_run=not args.execute,
        )
        print(
            f"gpu_tasks={report['task_count']} batches={report['batch_count']} "
            f"ready={report['ready_count']} blocked={report['blocked_count']} "
            f"executed={report.get('executed_count', 0)} failed={report.get('failed_count', 0)} "
            f"report={args.output}"
        )
        # NOTE(review): this exits 0 even when failed_count > 0, unlike the other
        # commands that map failures to exit status 1 — confirm this is intended.
        return 0

    if command == "parsers":
        config = load_config()
        for name in parser_names():
            adapter = get_parser(name)
            enabled = config.get("parsers", {}).get(name, {}).get("enabled", False)
            print(f"{name}\tenabled={enabled}\tavailable={adapter.available()}")
        return 0

    if command == "benchmark":
        # --input is declared optional on the sub-parser, so enforce it here.
        if not args.input:
            parser.error("benchmark requires --input")
        summary = run_parser_benchmark(
            args.input,
            args.output,
            config_path=args.config,
            selected_parsers=_selected_parsers(args),
            dataset_name=args.dataset,
        )
        print(f"dataset={summary.get('dataset_name', args.dataset)}")
        print(f"documents={summary['document_count']} mean_quality_score={summary['mean_quality_score']:.2f}")
        print(f"leaderboard={Path(args.output) / 'leaderboard.csv'}")
        return 0

    if command == "benchmark-ablate":
        comparison = run_parser_ablations(
            args.input,
            args.output,
            parsers=args.ablate_parsers,
            config_path=args.config,
            dataset_name=args.dataset,
            include_merged=args.include_merged,
        )
        print(f"arms={comparison['arm_count']} comparison={Path(args.output) / 'ablation_comparison.csv'}")
        for row in comparison["rows"]:
            # ``or 0.0`` also covers metrics that are present but None, which
            # would otherwise make the ``:.2f`` format raise TypeError. This
            # mirrors the handling in the combine-benchmarks report below.
            quality = row.get("mean_quality_score") or 0.0
            layout = row.get("mean_layout_f1") or 0.0
            recall = row.get("mean_retrieval_recall_at_1") or 0.0
            print(f" arm={row['arm']:<14} quality={quality:.2f} layout_f1={layout:.2f} recall@1={recall:.2f}")
        return 0

    if command == "preflight":
        import sys

        result = run_preflight(
            root=args.root,
            skip_unit=args.skip_unit,
            skip_regression=args.skip_regression,
            skip_space_check=args.skip_space_check,
            skip_parsers=args.skip_parsers,
            run_benchmark=args.benchmark,
        )
        print(format_summary(result))
        if not result.passed:
            failures = format_failures(result)
            if failures:
                # Failure details go to stderr; the summary above stays on stdout.
                print("\n" + failures, file=sys.stderr)
            return 1
        return 0

    if command == "combine-benchmarks":
        labels = list(args.combine_labels or [])
        if labels and len(labels) != len(args.combine_inputs):
            parser.error("combine-benchmarks: --label must be passed once per --input or omitted entirely.")
        if labels:
            pairs = list(zip(labels, args.combine_inputs))
        else:
            # Imported lazily and only once: the helper is private to the
            # cross_dataset module and is needed only for default labels.
            from zsgdp.benchmarks.cross_dataset import _load_summary

            pairs = [
                (str(_load_summary(source).get("dataset_name") or f"run_{index + 1}"), source)
                for index, source in enumerate(args.combine_inputs)
            ]
        comparison = combine_benchmark_summaries(pairs)
        write_cross_dataset_outputs(comparison, args.output)
        print(f"combined {comparison['run_count']} run(s) -> {args.output}")
        for row in comparison["dataset_summary"]:
            print(
                f" {row['label']:<14} docs={row.get('document_count') or 0} "
                f"layout_f1={row.get('mean_layout_f1') or 0:.2f} "
                f"recall@5={row.get('mean_retrieval_recall_at_5') or 0:.2f}"
            )
        return 0

    if command == "export-chunks":
        exported = _export_chunks(Path(args.parsed), Path(args.output), args.format)
        print(f"exported {exported} chunk(s) -> {args.output}")
        return 0

    if command == "validate-artifacts":
        report = validate_artifact_manifest(args.parsed)
        if args.output:
            write_json(args.output, report)
        print(f"valid={report['valid']} checked={report['checked_count']} errors={len(report['errors'])}")
        return 0 if report["valid"] else 1

    # parser.error() exits with status 2; the return only documents that status.
    parser.error(f"Unhandled command: {command}")
    return 2
def _print_parse_summary(parsed, output_dir: Path) -> None:
    """Print the five-line summary shown after a single-document parse.

    Args:
        parsed: Parse result exposing ``doc_id``, ``file_type``, the sized
            collections ``elements``/``tables``/``figures``/``chunks`` and a
            ``quality_report`` with ``score`` and ``has_blocking_failures``.
        output_dir: Directory the artifacts were written to (echoed only).
    """
    print(f"doc_id={parsed.doc_id}")
    print(f"file_type={parsed.file_type}")

    # Counts are reported in a fixed order on a single line.
    sized = (
        ("elements", parsed.elements),
        ("tables", parsed.tables),
        ("figures", parsed.figures),
        ("chunks", parsed.chunks),
    )
    print(" ".join(f"{label}={len(items)}" for label, items in sized))

    report = parsed.quality_report
    print(f"quality_score={report.score:.2f} blocking={report.has_blocking_failures}")
    print(f"output={output_dir}")
def _selected_parsers(args) -> list[str] | None:
selected = list(getattr(args, "parsers", None) or [])
selected.extend(getattr(args, "parser_list", None) or [])
return selected or None
@dataclass(slots=True)
class _FolderParseJob:
    """One input file scheduled by ``parse-folder`` and where its output goes."""

    # Position of the file in the sorted folder listing.
    index: int
    # Source file handed to ``parse_document``.
    path: Path
    # Per-file output directory handed to ``parse_document``.
    output_dir: Path
def _parse_folder(
input_dir: Path,
output_dir: Path,
*,
config_path: str | Path | None,
selected_parsers: Sequence[str] | None,
workers: int,
gpu_workers: int = 0,
) -> dict:
if not input_dir.is_dir():
raise NotADirectoryError(f"Input folder does not exist: {input_dir}")
output_dir.mkdir(parents=True, exist_ok=True)
jobs = _build_folder_jobs(input_dir, output_dir)
if not jobs:
return {
"workers": workers,
"gpu_workers": gpu_workers,
"success_count": 0,
"failure_count": 0,
"results": [],
}
if workers == 1:
results = [
_parse_folder_job(job, config_path=config_path, selected_parsers=selected_parsers)
for job in jobs
]
else:
results_by_index: list[dict | None] = [None] * len(jobs)
max_workers = min(workers, len(jobs))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_job = {
executor.submit(
_parse_folder_job,
job,
config_path=config_path,
selected_parsers=selected_parsers,
): job
for job in jobs
}
for future in as_completed(future_to_job):
job = future_to_job[future]
results_by_index[job.index] = future.result()
results = [result for result in results_by_index if result is not None]
failures = [result for result in results if result["status"] != "parsed"]
return {
"workers": min(workers, len(jobs)),
"gpu_workers": max(gpu_workers, 0),
"success_count": len(results) - len(failures),
"failure_count": len(failures),
"results": results,
}
def _build_folder_jobs(input_dir: Path, output_dir: Path) -> list[_FolderParseJob]:
    """Create one parse job per regular file directly inside *input_dir*.

    Files are taken in sorted path order; sub-directories are ignored. Each
    job gets an output directory under *output_dir* whose name is made
    unique by ``_unique_output_name``.
    """
    files = [entry for entry in input_dir.iterdir() if entry.is_file()]
    files.sort()

    # Shared across all files so later names can avoid earlier ones.
    taken: set[str] = set()
    return [
        _FolderParseJob(
            index=position,
            path=file_path,
            output_dir=output_dir / _unique_output_name(file_path, taken),
        )
        for position, file_path in enumerate(files)
    ]
def _unique_output_name(path: Path, used_names: set[str]) -> str:
    """Pick an output directory name for *path* that is not yet in *used_names*.

    Candidates are tried in this order: the file stem, then
    ``<stem>-<extension>`` when the file has a suffix, then ``<stem>-2``,
    ``<stem>-3`` and so on. Names are compared case-insensitively (via
    ``casefold``), and the chosen name's casefolded form is added to
    *used_names* as a side effect.

    Args:
        path: Input file the name is derived from.
        used_names: Casefolded names already handed out; mutated in place.

    Returns:
        The chosen name in its original casing.
    """
    stem = path.stem or path.name

    def claim(name: str) -> bool:
        """Reserve *name* if it is free; report whether it was reserved."""
        folded = name.casefold()
        if folded in used_names:
            return False
        used_names.add(folded)
        return True

    if claim(stem):
        return stem

    if path.suffix:
        with_extension = f"{stem}-{path.suffix.lstrip('.')}"
        if claim(with_extension):
            return with_extension

    counter = 2
    while not claim(f"{stem}-{counter}"):
        counter += 1
    return f"{stem}-{counter}"
def _parse_folder_job(
    job: _FolderParseJob,
    *,
    config_path: str | Path | None,
    selected_parsers: Sequence[str] | None,
) -> dict:
    """Parse one folder job and describe the outcome as a plain dict.

    Any ``Exception`` raised while parsing is caught on purpose so that one
    bad file does not abort the rest of the folder; it is reported through
    the ``error`` field instead.

    Returns:
        Always ``status``, ``file`` and ``output``. On success ``status`` is
        ``"parsed"`` and the dict also carries ``doc_id``, ``file_type``,
        ``quality_score``, ``blocking`` and the element/table/figure/chunk
        counts. On failure ``status`` is ``"failed"`` and ``error`` holds
        ``str(exc)``.
    """
    # Fields shared by both outcomes, in the order they appear after "status".
    identity = {"file": job.path.name, "output": str(job.output_dir)}

    try:
        parsed = parse_document(
            job.path,
            job.output_dir,
            config_path=config_path,
            selected_parsers=selected_parsers,
        )
    except Exception as exc:
        return {"status": "failed", **identity, "error": str(exc)}

    quality = parsed.quality_report
    return {
        "status": "parsed",
        **identity,
        "doc_id": parsed.doc_id,
        "file_type": parsed.file_type,
        "quality_score": quality.score,
        "blocking": quality.has_blocking_failures,
        "elements": len(parsed.elements),
        "tables": len(parsed.tables),
        "figures": len(parsed.figures),
        "chunks": len(parsed.chunks),
    }
def _export_chunks(parsed_dir: Path, output_path: Path, fmt: str) -> int:
    """Export ``chunks.jsonl`` from a parsed directory as JSONL or a JSON array.

    Args:
        parsed_dir: Parsed document directory containing ``chunks.jsonl``.
        output_path: Destination file; missing parent directories are created.
        fmt: ``"jsonl"`` copies the artifact unchanged; any other value writes
            the records as one indented JSON array.

    Returns:
        Number of exported chunk records.

    Raises:
        FileNotFoundError: If ``chunks.jsonl`` is missing from *parsed_dir*.
    """
    chunks_path = parsed_dir / "chunks.jsonl"
    if not chunks_path.exists():
        raise FileNotFoundError(f"Missing chunks artifact: {chunks_path}")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if fmt == "jsonl":
        shutil.copyfile(chunks_path, output_path)
        return _count_jsonl(chunks_path)

    # Iterate the file handle rather than str.splitlines(): splitlines also
    # breaks on U+2028/U+2029/U+0085 and similar, which can legally appear
    # unescaped inside JSON strings and would cut a record in half.
    with chunks_path.open(encoding="utf-8") as handle:
        records = [json.loads(line) for line in handle if line.strip()]
    output_path.write_text(json.dumps(records, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    return len(records)
def _count_jsonl(path: Path) -> int:
    """Return the number of non-blank lines (records) in a JSONL file.

    The file is streamed line by line. Only real newlines separate records;
    ``str.splitlines()`` is avoided because it also splits on characters such
    as U+2028/U+2029 that may occur unescaped inside a JSON string, which
    would over-count records.
    """
    with path.open(encoding="utf-8") as handle:
        return sum(1 for line in handle if line.strip())
# Script entry point: main()'s integer return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())