File size: 7,750 Bytes
5e4028d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""Image preprocessing: deskew, binarize, line segmentation.

Default line detector is doctr (robust on cursive); horizontal projection
profile is a fallback for clean printed pages. Returns line image crops with
bounding boxes for downstream OCR.
"""

from __future__ import annotations

import sys
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path

import cv2
import numpy as np
from deskew import determine_skew
from PIL import Image, ImageOps

# Register HEIF/HEIC support with PIL so iPhone photos load through the PIL
# fallback path. Many iPhone-exported files have a .jpg/.jpeg extension but
# HEIC contents — without this, PIL.Image.open() fails with UnidentifiedImageError.
try:
    from pillow_heif import register_heif_opener

    register_heif_opener()
except ImportError:
    pass  # pillow-heif is optional; absence just means HEIC files won't load


@dataclass
class LineCrop:
    """One segmented text line: its pixel crop plus its location on the page.

    The bounding box is expressed in the coordinate frame of the deskewed
    page (the image after skew correction), not the raw input image.
    """

    # Pixel data of the cropped line region.
    image: np.ndarray
    # Box as (x, y, w, h): top-left corner followed by width and height.
    bbox: tuple[int, int, int, int]
    # Skew angle estimated for the page this line came from; 0.0 by default.
    deskew_angle: float = 0.0


@lru_cache(maxsize=1)
def _load_doctr_detector():
    """Build doctr's text-detection predictor once and reuse it afterwards.

    The doctr import is deferred to the first call so that importing this
    module stays cheap; the first call is the expensive one because the
    pretrained model has to be loaded. ``lru_cache`` keeps the single
    predictor instance for every later call.
    """
    from doctr.models import detection_predictor

    predictor = detection_predictor(
        "db_resnet50",
        pretrained=True,
        assume_straight_pages=True,
    )
    return predictor


def _read_image(image_path: Path) -> np.ndarray:
    """Read an image from disk as a BGR numpy array.

    cv2.imread is fast but silently returns None on JPEG variants it doesn't
    handle (HEIC-derived files exported by iPhone Photos, unusual ICC profiles,
    progressive JPEGs with non-standard markers). We fall back to PIL, which
    handles those, and also apply EXIF orientation since phone cameras store
    rotation in metadata rather than rotating pixels.

    Args:
        image_path: Location of the image file.

    Returns:
        The decoded image with channels in BGR order.

    Raises:
        FileNotFoundError: If neither cv2 nor the PIL fallback could load the
            file. The underlying PIL error is chained as ``__cause__``.
    """
    image = cv2.imread(str(image_path))
    if image is not None:
        return image
    try:
        # The context manager releases the file handle deterministically,
        # including when decoding fails part-way through.
        with Image.open(image_path) as pil:
            rgb = np.array(ImageOps.exif_transpose(pil).convert("RGB"))
    except Exception as exc:
        raise FileNotFoundError(
            f"Could not read image: {image_path} "
            f"(cv2.imread returned None; PIL fallback failed: {exc!r})"
        ) from exc
    # cv2 expects BGR ordering downstream; copy so the result is contiguous
    # rather than a negative-stride view.
    return rgb[:, :, ::-1].copy()


def _to_grayscale(image: np.ndarray) -> np.ndarray:
    if image.ndim == 3:
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return image


def _deskew(image: np.ndarray) -> tuple[np.ndarray, float]:
    """Estimate the page skew and rotate the image to cancel it.

    Returns a ``(image, angle)`` pair. When no angle could be determined, or
    its magnitude is below 0.1, the input is returned untouched together with
    ``0.0``. Otherwise the image is rotated about its center, keeping the
    original canvas size and filling exposed borders with white.
    """
    skew = determine_skew(_to_grayscale(image))
    if skew is None or abs(skew) < 0.1:
        return image, 0.0

    correction = float(skew)
    height, width = image.shape[:2]
    center = (width / 2, height / 2)
    rotation = cv2.getRotationMatrix2D(center, correction, 1.0)
    straightened = cv2.warpAffine(
        image,
        rotation,
        (width, height),
        flags=cv2.INTER_CUBIC,
        borderValue=(255, 255, 255),
    )
    return straightened, correction


def _segment_lines_doctr(image: np.ndarray) -> list[tuple[int, int, int, int]]:
    """Detect word boxes with doctr and merge them into per-line boxes.

    The image is converted to RGB before being passed to the detector. The
    first four values of every detected box are read as relative
    ``(x0, y0, x1, y1)`` coordinates, scaled to pixels, and handed to
    ``_cluster_words_to_lines``. Returns an empty list when nothing is
    detected.
    """
    detector = _load_doctr_detector()
    to_rgb = cv2.COLOR_GRAY2RGB if image.ndim == 2 else cv2.COLOR_BGR2RGB
    rgb = cv2.cvtColor(image, to_rgb)
    height, width = rgb.shape[:2]

    pages = detector([rgb])
    if not pages:
        return []

    # The detector output for a page is either a dict holding a "words" entry
    # or the box array itself.
    first_page = pages[0]
    words = first_page.get("words", []) if isinstance(first_page, dict) else first_page
    if len(words) == 0:
        return []

    # Scale relative coordinates up to absolute pixel positions.
    pixel_boxes: list[tuple[int, int, int, int]] = [
        (int(x0 * width), int(y0 * height), int(x1 * width), int(y1 * height))
        for x0, y0, x1, y1 in (entry[:4] for entry in words)
    ]
    return _cluster_words_to_lines(pixel_boxes)


def _cluster_words_to_lines(
    word_boxes: list[tuple[int, int, int, int]],
) -> list[tuple[int, int, int, int]]:
    """Group word boxes by vertical center; return one bbox per line."""
    if not word_boxes:
        return []
    word_boxes = sorted(word_boxes, key=lambda b: (b[1] + b[3]) / 2)
    clusters: list[list[tuple[int, int, int, int]]] = [[word_boxes[0]]]
    for box in word_boxes[1:]:
        last_cluster = clusters[-1]
        ref_y = sum((b[1] + b[3]) / 2 for b in last_cluster) / len(last_cluster)
        ref_height = max((b[3] - b[1]) for b in last_cluster)
        yc = (box[1] + box[3]) / 2
        if abs(yc - ref_y) < 0.5 * ref_height:
            last_cluster.append(box)
        else:
            clusters.append([box])

    line_bboxes: list[tuple[int, int, int, int]] = []
    for cluster in clusters:
        x0 = min(b[0] for b in cluster)
        y0 = min(b[1] for b in cluster)
        x1 = max(b[2] for b in cluster)
        y1 = max(b[3] for b in cluster)
        line_bboxes.append((x0, y0, x1 - x0, y1 - y0))
    # Sort top-to-bottom for reading order
    line_bboxes.sort(key=lambda b: b[1])
    return line_bboxes


def _segment_lines_projection(gray: np.ndarray) -> list[tuple[int, int, int, int]]:
    """Segment text lines from a horizontal projection profile (fallback path).

    The page is binarized with an inverted adaptive threshold and the ink
    pixels of every row are counted. Consecutive rows whose count exceeds 1%
    of the page width form a band; bands taller than 5 rows are returned as
    full-width ``(x, y, w, h)`` boxes, top to bottom.

    Works well on clean printed text; less reliable on cursive.
    """
    binary = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 31, 10
    )
    height, width = binary.shape
    min_height = 5

    ink_per_row = np.count_nonzero(binary, axis=1)
    is_text_row = ink_per_row > width * 0.01

    # Pad with False on both ends so every band has both an opening and a
    # closing edge, including one that touches the top or bottom of the page.
    padded = np.concatenate(([False], is_text_row, [False]))
    edges = np.flatnonzero(padded[1:] != padded[:-1])
    band_starts, band_ends = edges[0::2], edges[1::2]

    bands: list[tuple[int, int, int, int]] = []
    for top, bottom in zip(band_starts, band_ends):
        extent = int(bottom) - int(top)
        if extent > min_height:
            bands.append((0, int(top), width, extent))
    return bands


def preprocess(
    image_path: str | Path,
    *,
    use_doctr: bool = True,
    min_line_width: int = 20,
    min_line_height: int = 8,
) -> list[LineCrop]:
    """Load an image, deskew it, and segment it into line crops in reading order.

    Args:
        image_path: Path of the page image to process.
        use_doctr: Try the doctr detector first. When it raises or finds
            nothing, a note is printed to stderr and the projection-profile
            segmenter is used instead. When False, only the projection
            profile is used.
        min_line_width: Boxes narrower than this many pixels are discarded.
        min_line_height: Boxes shorter than this many pixels are discarded.

    Returns:
        LineCrop objects whose ``bbox`` is in the deskewed-page coordinate
        frame and whose ``deskew_angle`` is the skew estimated for the page.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    image_path = Path(image_path)
    image = _read_image(image_path)

    deskewed, angle = _deskew(image)
    gray = _to_grayscale(deskewed)

    line_bboxes: list[tuple[int, int, int, int]] = []
    if use_doctr:
        # Only the doctr call sits inside the try: a failure of the projection
        # fallback must not be reported as a doctr failure (nor retried).
        try:
            line_bboxes = _segment_lines_doctr(deskewed)
        except Exception as e:  # best-effort: any doctr problem triggers the fallback
            print(
                f"[preprocess] doctr failed ({e!r}); "
                "falling back to projection profile. "
                "Run `python scripts/setup_models.py` if the doctr model file is missing.",
                file=sys.stderr,
            )
        else:
            if not line_bboxes:
                print(
                    f"[preprocess] doctr returned no boxes for {image_path}; "
                    "falling back to projection profile",
                    file=sys.stderr,
                )

    # Single fallback site: doctr disabled, doctr failed, or doctr found nothing.
    if not line_bboxes:
        line_bboxes = _segment_lines_projection(gray)

    crops: list[LineCrop] = []
    for x, y, w, h in line_bboxes:
        # Drop tiny artifacts below the size thresholds.
        if w < min_line_width or h < min_line_height:
            continue
        # Copy so each crop owns its pixels instead of viewing the full page.
        crop = deskewed[y : y + h, x : x + w].copy()
        crops.append(LineCrop(image=crop, bbox=(x, y, w, h), deskew_angle=angle))
    return crops