File size: 6,206 Bytes
5e4028d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""Batch runner: process a folder of images, write catalogue.csv.

One row per document. Sequential processing — the per-doc Claude vision
call dominates wall time and multiprocessing risks PyTorch / HuggingFace
fork hazards for marginal gain at 30-50 docs.

Idempotent: if --output already exists, files already represented in it
are skipped. The CSV is written incrementally (header on first row, append
after each doc) so an interrupted run leaves a usable partial catalogue.

Usage:
    python -m src.batch data/raw/loc/                          # uses ./catalogue.csv
    python -m src.batch data/raw/loc/ -o /tmp/catalogue.csv
    python -m src.batch data/raw/loc/abraham-lincoln-papers --no-api
"""

from __future__ import annotations

import argparse
import csv
import sys
import time
from pathlib import Path

from src.ner import Entity
from src.pipeline import DocumentResult, process

# File suffixes treated as images. _walk_images lower-cases each path's suffix
# before the membership test, so entries here must be lower-case and include
# the leading dot.
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".webp"}

# Column order of the catalogue CSV. _write_row passes this list as the
# DictWriter fieldnames, and _result_to_row produces a dict with these keys.
CSV_COLUMNS = [
    "filename",
    "doc_type",
    "doc_type_confidence",
    "n_lines",
    "n_review_lines",
    "mean_prob_wrong",
    "sender",
    "recipient",
    "signed_date",
    "amount",
    # The last three hold "; "-joined lists of unique entity texts.
    "persons",
    "dates",
    "places",
]


def _walk_images(folder: Path) -> list[Path]:
    """Collect every image file beneath `folder`, at any depth, in sorted order.

    A path qualifies when it is a file and its suffix, lower-cased, is a
    member of IMAGE_EXTS.
    """
    return sorted(
        candidate
        for candidate in folder.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() in IMAGE_EXTS
    )


def _existing_filenames(csv_path: Path) -> set[str]:
    """Collect the filenames already recorded in a catalogue CSV (for resume).

    Returns an empty set when `csv_path` does not exist; otherwise the set of
    non-empty values found under the CSV's "filename" header.
    """
    recorded: set[str] = set()
    if csv_path.exists():
        with csv_path.open("r", encoding="utf-8", newline="") as handle:
            for record in csv.DictReader(handle):
                # .get() tolerates a CSV without a "filename" column and
                # short rows, both of which yield None here.
                name = record.get("filename")
                if name:
                    recorded.add(name)
    return recorded


def _entities_by_label(entities: list[Entity], labels: set[str]) -> list[str]:
    """Texts of the entities whose label is in `labels`, without duplicates.

    The result keeps the order in which each distinct text first appears.
    """
    # dict keys preserve insertion order and ignore repeated insertions,
    # which gives ordered de-duplication in one pass.
    matching_texts = (ent.text for ent in entities if ent.label in labels)
    return list(dict.fromkeys(matching_texts))


def _first_entity(entities: list[Entity], labels: set[str]) -> str:
    """Return the text of the earliest entity labelled with one of `labels`.

    Falls back to the empty string when no entity matches.
    """
    return next((ent.text for ent in entities if ent.label in labels), "")


def _result_to_row(result: DocumentResult) -> dict[str, str | int | float]:
    """Flatten one DocumentResult into a dict keyed by the catalogue columns.

    Single-valued columns (sender, recipient, signed_date, amount) take the
    text of the first entity with the matching label, or "" if there is none.
    Multi-valued columns (persons, dates, places) hold the unique matching
    entity texts joined with "; ". Confidence and mean_prob_wrong are rounded
    to three decimals.
    """
    found = result.entities

    def single(label: str) -> str:
        # First entity carrying exactly this label, or "".
        return _first_entity(found, {label})

    def joined(*labels: str) -> str:
        # Unique texts across all the given labels, "; "-separated.
        return "; ".join(_entities_by_label(found, set(labels)))

    return {
        "filename": result.image_path.name,
        "doc_type": result.classification.doc_type,
        "doc_type_confidence": round(result.classification.confidence, 3),
        "n_lines": len(result.lines),
        "n_review_lines": result.n_review_lines,
        "mean_prob_wrong": round(result.mean_prob_wrong, 3),
        "sender": single("SENDER"),
        "recipient": single("RECIPIENT"),
        "signed_date": single("SIGNED_DATE"),
        "amount": single("AMOUNT"),
        "persons": joined("PERSON", "REFERENCED_PERSON"),
        "dates": joined("DATE", "SIGNED_DATE"),
        "places": joined("GPE", "LOC", "REFERENCED_PLACE"),
    }


def _write_row(csv_path: Path, row: dict, *, write_header: bool) -> None:
    """Write a single catalogue row to `csv_path`.

    With `write_header` true the file is opened in "w" mode (truncating any
    existing content) and the header line is written before the row;
    otherwise the row is appended. Keys of `row` that are not in CSV_COLUMNS
    are ignored.
    """
    if write_header:
        open_mode = "w"
    else:
        open_mode = "a"

    with csv_path.open(open_mode, encoding="utf-8", newline="") as sink:
        out = csv.DictWriter(sink, fieldnames=CSV_COLUMNS, extrasaction="ignore")
        if write_header:
            out.writeheader()
        out.writerow(row)


def main(argv: list[str] | None = None) -> int:
    """Process every image under a folder and append one CSV row per document.

    Files whose name already appears in the output CSV are skipped, so an
    interrupted run can be resumed. Rows are written one at a time, right
    after each document is processed. A document whose processing raises is
    reported on stderr and skipped; the run continues with the next file.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read them from ``sys.argv[1:]``.

    Returns:
        0 if every pending document was processed (or there was nothing to
        do), 1 if at least one document failed.

    Raises:
        SystemExit: if the arguments are invalid, the folder does not exist,
            or it contains no image files.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("folder", type=Path, help="Folder of images to process")
    parser.add_argument(
        "-o", "--output", type=Path, default=Path("catalogue.csv"),
        help="Output CSV path (default: ./catalogue.csv)",
    )
    parser.add_argument(
        "--no-api", action="store_true",
        help="Skip Claude calls (post-correction, classify, extract); spaCy still runs",
    )
    parser.add_argument(
        "--limit", type=int, default=None, help="Process at most this many files",
    )
    args = parser.parse_args(argv)

    if args.limit is not None and args.limit < 0:
        # A negative slice bound would silently drop files from the END of
        # the pending list instead of capping its length.
        parser.error("--limit must be >= 0")

    if not args.folder.is_dir():
        sys.exit(f"folder not found: {args.folder}")

    paths = _walk_images(args.folder)
    if not paths:
        sys.exit(f"no image files in {args.folder}")
    print(f"[batch] found {len(paths)} images under {args.folder}", file=sys.stderr)

    already = _existing_filenames(args.output)
    if already:
        print(f"[batch] resuming — {len(already)} files already in {args.output}",
              file=sys.stderr)
    # Resume is keyed on the bare file name, matching the "filename" column.
    pending = [p for p in paths if p.name not in already]
    if args.limit is not None:
        pending = pending[: args.limit]
    if not pending:
        print("[batch] nothing to do", file=sys.stderr)
        return 0

    # A zero-byte output file (pre-created, or left by a run interrupted
    # before anything was flushed) has no header yet. Appending to it would
    # make the first data row look like the header on the next resume.
    write_header = not args.output.exists() or args.output.stat().st_size == 0
    args.output.parent.mkdir(parents=True, exist_ok=True)

    n_done = 0
    n_failed = 0
    t_start = time.monotonic()
    for i, path in enumerate(pending, start=1):
        t0 = time.monotonic()
        try:
            result = process(path, no_api=args.no_api)
        except Exception as exc:
            # Deliberate best-effort: one bad document must not abort the batch.
            n_failed += 1
            print(f"[{i}/{len(pending)}] {path.name}  FAILED: {exc!r}",
                  file=sys.stderr)
            continue

        row = _result_to_row(result)
        _write_row(args.output, row, write_header=write_header)
        # Only the first successfully written row carries the header.
        write_header = False
        n_done += 1
        elapsed = time.monotonic() - t0
        print(
            f"[{i}/{len(pending)}] {path.name}  -> "
            f"{row['doc_type']} ({row['doc_type_confidence']:.2f}), "
            f"{row['n_review_lines']}/{row['n_lines']} flagged, "
            f"{elapsed:.1f}s",
            file=sys.stderr,
        )

    total = time.monotonic() - t_start
    # The summary goes to stdout; progress lines above go to stderr.
    print(
        f"\n[done] wrote {args.output}  "
        f"({n_done} processed, {n_failed} failed, {total:.0f}s total)"
    )
    return 0 if n_failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())