"""Batch runner: process a folder of images, write catalogue.csv.
One row per document. Sequential processing — the per-doc Claude vision
call dominates wall time and multiprocessing risks PyTorch / HuggingFace
fork hazards for marginal gain at 30-50 docs.
Idempotent: if --output already exists, files already represented in it
are skipped. The CSV is written incrementally (header on first row, append
after each doc) so an interrupted run leaves a usable partial catalogue.
Usage:
python -m src.batch data/raw/loc/ # uses ./catalogue.csv
python -m src.batch data/raw/loc/ -o /tmp/catalogue.csv
python -m src.batch data/raw/loc/abraham-lincoln-papers --no-api
"""
from __future__ import annotations
import argparse
import csv
import sys
import time
from pathlib import Path
from src.ner import Entity
from src.pipeline import DocumentResult, process
# Lower-case file suffixes (leading dot included) that count as images;
# _walk_images compares each path's lower-cased suffix against this set.
IMAGE_EXTS = {
    ".jpg", ".jpeg",
    ".png",
    ".tif", ".tiff",
    ".webp",
}

# Column order of the catalogue CSV; passed to csv.DictWriter as fieldnames.
CSV_COLUMNS = [
    # Identity and classification.
    "filename", "doc_type", "doc_type_confidence",
    # Per-document line statistics.
    "n_lines", "n_review_lines", "mean_prob_wrong",
    # Single-valued fields: text of the first entity with the matching label.
    "sender", "recipient", "signed_date", "amount",
    # Multi-valued fields: unique entity texts joined with "; ".
    "persons", "dates", "places",
]
def _walk_images(folder: Path) -> list[Path]:
    """Collect every image file beneath `folder`, recursively, in sorted order.

    A path qualifies when it is a file and its suffix, lower-cased, is a
    member of IMAGE_EXTS.
    """
    return sorted(
        candidate
        for candidate in folder.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() in IMAGE_EXTS
    )
def _existing_filenames(csv_path: Path) -> set[str]:
"""Read filename column from existing CSV (for resume)."""
if not csv_path.exists():
return set()
with csv_path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
return {row["filename"] for row in reader if row.get("filename")}
def _entities_by_label(entities: list[Entity], labels: set[str]) -> list[str]:
"""Unique entity texts where label matches one of `labels`, in first-seen order."""
seen: set[str] = set()
out: list[str] = []
for e in entities:
if e.label in labels and e.text not in seen:
seen.add(e.text)
out.append(e.text)
return out
def _first_entity(entities: list[Entity], labels: set[str]) -> str:
"""Text of the first entity matching one of `labels`, or empty string."""
for e in entities:
if e.label in labels:
return e.text
return ""
def _result_to_row(result: DocumentResult) -> dict[str, str | int | float]:
    """Flatten one DocumentResult into a catalogue row keyed by column name.

    Confidence and mean_prob_wrong are rounded to three decimals. The
    single-valued fields take the first entity with the given label; the
    multi-valued fields join the distinct matching texts with "; ".
    """
    found = result.entities

    def first(label: str) -> str:
        # Text of the first entity carrying exactly this label, else "".
        return _first_entity(found, {label})

    def joined(*labels: str) -> str:
        # Distinct texts across the given labels, in first-seen order.
        return "; ".join(_entities_by_label(found, set(labels)))

    return {
        "filename": result.image_path.name,
        "doc_type": result.classification.doc_type,
        "doc_type_confidence": round(result.classification.confidence, 3),
        "n_lines": len(result.lines),
        "n_review_lines": result.n_review_lines,
        "mean_prob_wrong": round(result.mean_prob_wrong, 3),
        "sender": first("SENDER"),
        "recipient": first("RECIPIENT"),
        "signed_date": first("SIGNED_DATE"),
        "amount": first("AMOUNT"),
        "persons": joined("PERSON", "REFERENCED_PERSON"),
        "dates": joined("DATE", "SIGNED_DATE"),
        "places": joined("GPE", "LOC", "REFERENCED_PLACE"),
    }
def _write_row(csv_path: Path, row: dict, *, write_header: bool) -> None:
    """Write one row to `csv_path`, opening and closing the file each call.

    With `write_header` set, the file is opened in "w" mode (truncating any
    existing content) and the header line is emitted before the row;
    otherwise the row is appended. Keys of `row` that are not in
    CSV_COLUMNS are ignored.
    """
    if write_header:
        open_mode = "w"
    else:
        open_mode = "a"

    with csv_path.open(open_mode, encoding="utf-8", newline="") as handle:
        out = csv.DictWriter(handle, fieldnames=CSV_COLUMNS, extrasaction="ignore")
        if write_header:
            out.writeheader()
        out.writerow(row)
def main() -> int:
    """Process every pending image under a folder and append rows to the CSV.

    Parses the command line, lists the images under the folder, skips those
    whose file name is already present in the output CSV, then processes the
    rest one at a time, writing each row as soon as its document is done.
    A document whose process() call raises is reported on stderr and skipped.

    Returns:
        0 when every pending document was processed, 1 when at least one
        process() call raised.

    Exits through sys.exit() with a message when the folder does not exist
    or holds no image files, and through parser.error() when --limit is
    negative.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("folder", type=Path, help="Folder of images to process")
    parser.add_argument(
        "-o", "--output", type=Path, default=Path("catalogue.csv"),
        help="Output CSV path (default: ./catalogue.csv)",
    )
    parser.add_argument(
        "--no-api", action="store_true",
        help="Skip Claude calls (post-correction, classify, extract); spaCy still runs",
    )
    parser.add_argument(
        "--limit", type=int, default=None, help="Process at most this many files",
    )
    args = parser.parse_args()

    # A negative limit would otherwise slice as pending[:-n], i.e. "all but
    # the last n", which contradicts "at most this many files".
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be non-negative")

    if not args.folder.is_dir():
        sys.exit(f"folder not found: {args.folder}")
    paths = _walk_images(args.folder)
    if not paths:
        sys.exit(f"no image files in {args.folder}")
    print(f"[batch] found {len(paths)} images under {args.folder}", file=sys.stderr)

    # NOTE(review): resume matching uses the bare file name, so same-named
    # images in different subfolders cannot be told apart once one is recorded.
    already = _existing_filenames(args.output)
    if already:
        print(f"[batch] resuming — {len(already)} files already in {args.output}",
              file=sys.stderr)
    pending = [p for p in paths if p.name not in already]
    if args.limit is not None:
        pending = pending[: args.limit]
    if not pending:
        print("[batch] nothing to do", file=sys.stderr)
        return 0

    # The header is needed for a missing file and also for an existing but
    # zero-byte one; appending to the latter would leave the CSV headerless
    # and defeat resume detection on the next run.
    write_header = not args.output.exists() or args.output.stat().st_size == 0
    args.output.parent.mkdir(parents=True, exist_ok=True)

    n_done = 0
    n_failed = 0
    n_pending = len(pending)
    t_start = time.monotonic()
    for i, path in enumerate(pending, start=1):
        t0 = time.monotonic()
        try:
            result = process(path, no_api=args.no_api)
        except Exception as exc:
            # One bad document must not abort the batch: report and move on.
            n_failed += 1
            print(f"[{i}/{n_pending}] {path.name} FAILED: {exc!r}",
                  file=sys.stderr)
            continue
        row = _result_to_row(result)
        _write_row(args.output, row, write_header=write_header)
        write_header = False  # only the first written row carries the header
        n_done += 1
        elapsed = time.monotonic() - t0
        print(
            f"[{i}/{n_pending}] {path.name} -> "
            f"{row['doc_type']} ({row['doc_type_confidence']:.2f}), "
            f"{row['n_review_lines']}/{row['n_lines']} flagged, "
            f"{elapsed:.1f}s",
            file=sys.stderr,
        )

    total = time.monotonic() - t_start
    print(
        f"\n[done] wrote {args.output} "
        f"({n_done} processed, {n_failed} failed, {total:.0f}s total)"
    )
    return 0 if n_failed == 0 else 1
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|