# narayananv10
# HF Space deploy snapshot
# 5e4028d
"""Batch runner: process a folder of images, write catalogue.csv.
One row per document. Sequential processing — the per-doc Claude vision
call dominates wall time and multiprocessing risks PyTorch / HuggingFace
fork hazards for marginal gain at 30-50 docs.
Idempotent: if --output already exists, files already represented in it
are skipped. The CSV is written incrementally (header on first row, append
after each doc) so an interrupted run leaves a usable partial catalogue.
Usage:
python -m src.batch data/raw/loc/ # uses ./catalogue.csv
python -m src.batch data/raw/loc/ -o /tmp/catalogue.csv
python -m src.batch data/raw/loc/abraham-lincoln-papers --no-api
"""
from __future__ import annotations
import argparse
import csv
import sys
import time
from pathlib import Path
from src.ner import Entity
from src.pipeline import DocumentResult, process
# File suffixes (compared lower-cased) that mark a file as an image to process.
IMAGE_EXTS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".tif",
    ".tiff",
    ".webp",
}
# Column order of the catalogue CSV; rows are written through these field names.
CSV_COLUMNS = [
    # document identity and classification
    "filename", "doc_type", "doc_type_confidence",
    # line counts and review statistics
    "n_lines", "n_review_lines", "mean_prob_wrong",
    # single-valued extracted fields
    "sender", "recipient", "signed_date", "amount",
    # multi-valued entity lists
    "persons", "dates", "places",
]
def _walk_images(folder: Path) -> list[Path]:
    """Collect every image file beneath `folder`, recursively, in sorted order.

    A path qualifies when it is a file and its suffix, lower-cased, is one
    of IMAGE_EXTS.
    """
    return sorted(
        candidate
        for candidate in folder.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() in IMAGE_EXTS
    )
def _existing_filenames(csv_path: Path) -> set[str]:
"""Read filename column from existing CSV (for resume)."""
if not csv_path.exists():
return set()
with csv_path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
return {row["filename"] for row in reader if row.get("filename")}
def _entities_by_label(entities: list[Entity], labels: set[str]) -> list[str]:
"""Unique entity texts where label matches one of `labels`, in first-seen order."""
seen: set[str] = set()
out: list[str] = []
for e in entities:
if e.label in labels and e.text not in seen:
seen.add(e.text)
out.append(e.text)
return out
def _first_entity(entities: list[Entity], labels: set[str]) -> str:
"""Text of the first entity matching one of `labels`, or empty string."""
for e in entities:
if e.label in labels:
return e.text
return ""
def _result_to_row(result: DocumentResult) -> dict[str, str | int | float]:
    """Flatten one DocumentResult into a dict keyed by catalogue column name.

    The classification confidence and mean_prob_wrong are rounded to three
    decimals. Single-valued columns take the first entity carrying the label
    (empty string if none); multi-valued columns join the distinct entity
    texts with "; ".
    """
    entities = result.entities

    def first(label: str) -> str:
        # Text of the first entity with exactly this label, or "".
        return _first_entity(entities, {label})

    def joined(*labels: str) -> str:
        # Distinct texts for any of these labels, "; "-separated.
        return "; ".join(_entities_by_label(entities, set(labels)))

    row: dict[str, str | int | float] = {}
    row["filename"] = result.image_path.name
    row["doc_type"] = result.classification.doc_type
    row["doc_type_confidence"] = round(result.classification.confidence, 3)
    row["n_lines"] = len(result.lines)
    row["n_review_lines"] = result.n_review_lines
    row["mean_prob_wrong"] = round(result.mean_prob_wrong, 3)
    row["sender"] = first("SENDER")
    row["recipient"] = first("RECIPIENT")
    row["signed_date"] = first("SIGNED_DATE")
    row["amount"] = first("AMOUNT")
    row["persons"] = joined("PERSON", "REFERENCED_PERSON")
    row["dates"] = joined("DATE", "SIGNED_DATE")
    row["places"] = joined("GPE", "LOC", "REFERENCED_PLACE")
    return row
def _write_row(csv_path: Path, row: dict, *, write_header: bool) -> None:
    """Write a single catalogue row to `csv_path`.

    With `write_header` set, the file is opened for writing (truncating any
    existing content) and the header line precedes the row; otherwise the
    row is appended. Keys of `row` that are not in CSV_COLUMNS are ignored.
    """
    if write_header:
        open_mode = "w"
    else:
        open_mode = "a"
    with csv_path.open(open_mode, encoding="utf-8", newline="") as handle:
        out = csv.DictWriter(handle, fieldnames=CSV_COLUMNS, extrasaction="ignore")
        if write_header:
            out.writeheader()
        out.writerow(row)
def main() -> int:
    """Run the batch CLI: process pending images and append rows to the CSV.

    Parses the command line, collects image files under the given folder,
    skips files whose name already appears in the output CSV, and writes one
    row per successfully processed document. Progress and per-file failures
    are reported on stderr; the final summary line goes to stdout.

    Returns:
        0 when every pending file was processed (or nothing was pending),
        1 when at least one file raised during processing.

    Raises:
        SystemExit: if the folder does not exist or contains no image files.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("folder", type=Path, help="Folder of images to process")
    parser.add_argument(
        "-o", "--output", type=Path, default=Path("catalogue.csv"),
        help="Output CSV path (default: ./catalogue.csv)",
    )
    parser.add_argument(
        "--no-api", action="store_true",
        help="Skip Claude calls (post-correction, classify, extract); spaCy still runs",
    )
    parser.add_argument(
        "--limit", type=int, default=None, help="Process at most this many files",
    )
    args = parser.parse_args()

    if not args.folder.is_dir():
        sys.exit(f"folder not found: {args.folder}")
    paths = _walk_images(args.folder)
    if not paths:
        sys.exit(f"no image files in {args.folder}")
    print(f"[batch] found {len(paths)} images under {args.folder}", file=sys.stderr)

    # Resume support: anything whose file name is already in the CSV is skipped.
    already = _existing_filenames(args.output)
    if already:
        print(f"[batch] resuming — {len(already)} files already in {args.output}",
              file=sys.stderr)
    pending = [p for p in paths if p.name not in already]
    if args.limit is not None:
        pending = pending[: args.limit]
    if not pending:
        print("[batch] nothing to do", file=sys.stderr)
        return 0

    # An existing but empty file (pre-created, or left by a run interrupted
    # before its first write) has no header row yet. Appending to it would
    # produce a header-less CSV that a later resume run could not read, so it
    # is treated the same as a missing file.
    write_header = not args.output.exists() or args.output.stat().st_size == 0
    args.output.parent.mkdir(parents=True, exist_ok=True)

    n_done = 0
    n_failed = 0
    t_start = time.monotonic()
    for i, path in enumerate(pending, start=1):
        t0 = time.monotonic()
        try:
            result = process(path, no_api=args.no_api)
        except Exception as exc:
            # Best effort: one bad document must not abort the whole batch.
            n_failed += 1
            print(f"[{i}/{len(pending)}] {path.name} FAILED: {exc!r}",
                  file=sys.stderr)
            continue
        row = _result_to_row(result)
        # The row is flushed to disk immediately so an interrupted run still
        # leaves a usable partial catalogue.
        _write_row(args.output, row, write_header=write_header)
        write_header = False
        n_done += 1
        elapsed = time.monotonic() - t0
        print(
            f"[{i}/{len(pending)}] {path.name} -> "
            f"{row['doc_type']} ({row['doc_type_confidence']:.2f}), "
            f"{row['n_review_lines']}/{row['n_lines']} flagged, "
            f"{elapsed:.1f}s",
            file=sys.stderr,
        )

    total = time.monotonic() - t_start
    print(
        f"\n[done] wrote {args.output} "
        f"({n_done} processed, {n_failed} failed, {total:.0f}s total)"
    )
    return 0 if n_failed == 0 else 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())