"""Batch runner: process a folder of images, write catalogue.csv.

One row per document. Sequential processing — the per-doc Claude vision call
dominates wall time and multiprocessing risks PyTorch / HuggingFace fork
hazards for marginal gain at 30-50 docs.

Idempotent: if --output already exists, files already represented in it are
skipped. Matching is by bare filename, so images sharing a name in different
sub-folders cannot be told apart (a warning is printed when that happens).
The CSV is written incrementally (header on first row, append after each doc)
so an interrupted run leaves a usable partial catalogue.

Usage:
    python -m src.batch data/raw/loc/                 # uses ./catalogue.csv
    python -m src.batch data/raw/loc/ -o /tmp/catalogue.csv
    python -m src.batch data/raw/loc/abraham-lincoln-papers --no-api
"""

from __future__ import annotations

import argparse
import csv
import sys
import time
from collections import Counter
from pathlib import Path

from src.ner import Entity
from src.pipeline import DocumentResult, process

# File suffixes (compared lower-cased) that _walk_images treats as images.
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".webp"}

# Output column order; _result_to_row produces exactly these keys.
CSV_COLUMNS = [
    "filename",
    "doc_type",
    "doc_type_confidence",
    "n_lines",
    "n_review_lines",
    "mean_prob_wrong",
    "sender",
    "recipient",
    "signed_date",
    "amount",
    "persons",
    "dates",
    "places",
]


def _walk_images(folder: Path) -> list[Path]:
    """Return every image file under ``folder`` (recursive), sorted by path."""
    return sorted(
        p for p in folder.rglob("*") if p.is_file() and p.suffix.lower() in IMAGE_EXTS
    )


def _duplicate_names(paths: list[Path]) -> list[str]:
    """Return the basenames that occur more than once in ``paths``, sorted."""
    counts = Counter(p.name for p in paths)
    return sorted(name for name, n in counts.items() if n > 1)


def _existing_filenames(csv_path: Path) -> set[str]:
    """Read the ``filename`` column of an existing catalogue (for resume).

    Returns an empty set when the file does not exist, is empty, or has no
    ``filename`` column.
    """
    if not csv_path.exists():
        return set()
    with csv_path.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        return {row["filename"] for row in reader if row.get("filename")}


def _needs_header(csv_path: Path) -> bool:
    """Return True when ``csv_path`` is missing or zero-length.

    An existing but empty file must still receive a header row. Otherwise
    data rows would be appended with no header, and the next resume would
    read the first data row back as the header.
    """
    return not csv_path.exists() or csv_path.stat().st_size == 0


def _entities_by_label(entities: list[Entity], labels: set[str]) -> list[str]:
    """Unique entity texts where label matches one of `labels`, in first-seen order."""
    seen: set[str] = set()
    out: list[str] = []
    for e in entities:
        if e.label in labels and e.text not in seen:
            seen.add(e.text)
            out.append(e.text)
    return out


def _first_entity(entities: list[Entity], labels: set[str]) -> str:
    """Text of the first entity matching one of `labels`, or empty string."""
    for e in entities:
        if e.label in labels:
            return e.text
    return ""


def _result_to_row(result: DocumentResult) -> dict[str, str | int | float]:
    """Flatten one pipeline result into a CSV row keyed by CSV_COLUMNS.

    Single-valued fields take the first matching entity. List-valued fields
    ("persons", "dates", "places") join unique texts with "; ".
    """
    e = result.entities
    return {
        # NOTE(review): resume compares this against the walked path's
        # basename; assumes process() keeps image_path's name unchanged.
        "filename": result.image_path.name,
        "doc_type": result.classification.doc_type,
        "doc_type_confidence": round(result.classification.confidence, 3),
        "n_lines": len(result.lines),
        "n_review_lines": result.n_review_lines,
        "mean_prob_wrong": round(result.mean_prob_wrong, 3),
        "sender": _first_entity(e, {"SENDER"}),
        "recipient": _first_entity(e, {"RECIPIENT"}),
        "signed_date": _first_entity(e, {"SIGNED_DATE"}),
        "amount": _first_entity(e, {"AMOUNT"}),
        "persons": "; ".join(_entities_by_label(e, {"PERSON", "REFERENCED_PERSON"})),
        "dates": "; ".join(_entities_by_label(e, {"DATE", "SIGNED_DATE"})),
        "places": "; ".join(_entities_by_label(e, {"GPE", "LOC", "REFERENCED_PLACE"})),
    }


def _write_row(
    csv_path: Path, row: dict[str, str | int | float], *, write_header: bool
) -> None:
    """Write one row to ``csv_path``.

    When ``write_header`` is true the file is truncated and the header is
    written first; otherwise the row is appended. Keys not in CSV_COLUMNS
    are ignored.
    """
    mode = "w" if write_header else "a"
    with csv_path.open(mode, encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS, extrasaction="ignore")
        if write_header:
            writer.writeheader()
        writer.writerow(row)


def main() -> int:
    """CLI entry point.

    Returns 0 when every pending document succeeded and 1 if any failed.
    Exits early via sys.exit for a missing folder or no images. Exits via
    parser.error (status 2) for an invalid --limit.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the docstring's line breaks so the Usage examples stay readable.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("folder", type=Path, help="Folder of images to process")
    parser.add_argument(
        "-o",
        "--output",
        type=Path,
        default=Path("catalogue.csv"),
        help="Output CSV path (default: ./catalogue.csv)",
    )
    parser.add_argument(
        "--no-api",
        action="store_true",
        help="Skip Claude calls (post-correction, classify, extract); spaCy still runs",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process at most this many files",
    )
    args = parser.parse_args()

    # A negative limit would otherwise slice from the end and silently drop
    # the last files instead of capping the count.
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be >= 0")

    if not args.folder.is_dir():
        sys.exit(f"folder not found: {args.folder}")

    paths = _walk_images(args.folder)
    if not paths:
        sys.exit(f"no image files in {args.folder}")
    print(f"[batch] found {len(paths)} images under {args.folder}", file=sys.stderr)

    # The walk is recursive, but resume and the "filename" column use the
    # basename only, so same-named files in different sub-folders collide.
    dupes = _duplicate_names(paths)
    if dupes:
        print(
            f"[batch] warning: {len(dupes)} filename(s) occur in more than one "
            f"sub-folder (e.g. {dupes[0]}); catalogue rows and resume are keyed "
            "by filename only",
            file=sys.stderr,
        )

    already = _existing_filenames(args.output)
    if already:
        print(
            f"[batch] resuming — {len(already)} files already in {args.output}",
            file=sys.stderr,
        )

    pending = [p for p in paths if p.name not in already]
    if args.limit is not None:
        pending = pending[: args.limit]
    if not pending:
        print("[batch] nothing to do", file=sys.stderr)
        return 0

    write_header = _needs_header(args.output)
    args.output.parent.mkdir(parents=True, exist_ok=True)

    n_done = 0
    n_failed = 0
    t_start = time.monotonic()
    for i, path in enumerate(pending, start=1):
        t0 = time.monotonic()
        # Per-document failure boundary: a bad image or malformed result is
        # counted and skipped. CSV write errors are deliberately left outside
        # so they stop the run instead of burning API calls on every
        # remaining document.
        try:
            result = process(path, no_api=args.no_api)
            row = _result_to_row(result)
        except Exception as exc:
            n_failed += 1
            print(f"[{i}/{len(pending)}] {path.name} FAILED: {exc!r}", file=sys.stderr)
            continue

        _write_row(args.output, row, write_header=write_header)
        write_header = False
        n_done += 1

        elapsed = time.monotonic() - t0
        print(
            f"[{i}/{len(pending)}] {path.name} -> "
            f"{row['doc_type']} ({row['doc_type_confidence']:.2f}), "
            f"{row['n_review_lines']}/{row['n_lines']} flagged, "
            f"{elapsed:.1f}s",
            file=sys.stderr,
        )

    total = time.monotonic() - t_start
    print(
        f"\n[done] wrote {args.output} "
        f"({n_done} processed, {n_failed} failed, {total:.0f}s total)"
    )
    return 0 if n_failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())