"""Image preprocessing: deskew, binarize, line segmentation.
Default line detector is doctr (robust on cursive); horizontal projection
profile is a fallback for clean printed pages. Returns line image crops with
bounding boxes for downstream OCR.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
import cv2
import numpy as np
from deskew import determine_skew
from PIL import Image, ImageOps
# Register HEIF/HEIC support with PIL so iPhone photos load through the PIL
# fallback path. Many iPhone-exported files have a .jpg/.jpeg extension but
# HEIC contents — without this, PIL.Image.open() fails with UnidentifiedImageError.
try:
from pillow_heif import register_heif_opener
register_heif_opener()
except ImportError:
pass # pillow-heif is optional; absence just means HEIC files won't load
@dataclass()
class LineCrop:
    """A single text-line crop together with its location on the page.

    The bounding box is expressed in the coordinate frame of the deskewed
    page, not the raw input image.
    """

    # Pixel data of the cropped line.
    image: np.ndarray
    # Position and size on the deskewed page, laid out as (x, y, w, h).
    bbox: tuple[int, int, int, int]
    # Skew angle associated with the source page; defaults to 0.0.
    deskew_angle: float = 0.0
@lru_cache(maxsize=1)
def _load_doctr_detector():
    """Build doctr's text-detection predictor, caching the single instance.

    doctr is imported inside the function so the heavy import (and the
    ~500 MB model load) is only paid the first time detection is requested;
    later calls return the cached predictor.
    """
    from doctr.models import detection_predictor

    predictor = detection_predictor(
        "db_resnet50",
        pretrained=True,
        assume_straight_pages=True,
    )
    return predictor
def _read_image(image_path: Path) -> np.ndarray:
    """Read an image as a BGR numpy array.

    cv2.imread is fast but silently returns None on JPEG variants it doesn't
    handle (HEIC-derived files exported by iPhone Photos, unusual ICC profiles,
    progressive JPEGs with non-standard markers). We fall back to PIL, which
    handles those, and also apply EXIF orientation since phone cameras store
    rotation in metadata rather than rotating pixels.

    Args:
        image_path: Location of the image file on disk.

    Returns:
        The decoded image in BGR channel order.

    Raises:
        FileNotFoundError: If neither cv2 nor the PIL fallback can decode the
            file. This is raised even when the file exists but is unreadable;
            the underlying PIL error is attached as ``__cause__``.
    """
    image = cv2.imread(str(image_path))
    if image is not None:
        return image

    try:
        # Image.open is lazy and keeps the file handle open; the context
        # manager guarantees it is released once the pixels are copied out.
        with Image.open(image_path) as pil:
            rgb = np.array(ImageOps.exif_transpose(pil).convert("RGB"))
            # cv2 expects BGR ordering downstream
            return rgb[:, :, ::-1].copy()
    except Exception as exc:
        # Chain the original error so the real decode failure stays visible
        # in the traceback instead of only in the message text.
        raise FileNotFoundError(
            f"Could not read image: {image_path} "
            f"(cv2.imread returned None; PIL fallback failed: {exc!r})"
        ) from exc
def _to_grayscale(image: np.ndarray) -> np.ndarray:
if image.ndim == 3:
return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
return image
def _deskew(image: np.ndarray) -> tuple[np.ndarray, float]:
    """Straighten a page by rotating it by its estimated skew angle.

    Returns the (possibly rotated) image and the angle that was applied.
    When no angle can be estimated, or its magnitude is below 0.1, the input
    is returned as-is with an angle of 0.0.
    """
    skew = determine_skew(_to_grayscale(image))
    if skew is None or abs(skew) < 0.1:
        return image, 0.0

    applied_angle = float(skew)
    height, width = image.shape[:2]
    center = (width / 2, height / 2)
    rotation = cv2.getRotationMatrix2D(center, applied_angle, 1.0)
    # Output keeps the input size; areas exposed by the rotation are filled
    # with white.
    straightened = cv2.warpAffine(
        image,
        rotation,
        (width, height),
        flags=cv2.INTER_CUBIC,
        borderValue=(255, 255, 255),
    )
    return straightened, applied_angle
def _segment_lines_doctr(image: np.ndarray) -> list[tuple[int, int, int, int]]:
    """Detect word boxes with doctr and merge them into per-line boxes.

    The image is converted to RGB for the detector, the detector's
    normalized word coordinates are scaled to pixels, and the resulting
    boxes are grouped into lines by `_cluster_words_to_lines`. Returns an
    empty list when nothing is detected.
    """
    predictor = _load_doctr_detector()

    conversion = cv2.COLOR_GRAY2RGB if image.ndim == 2 else cv2.COLOR_BGR2RGB
    rgb = cv2.cvtColor(image, conversion)
    page_h, page_w = rgb.shape[:2]

    pages = predictor([rgb])
    if not pages:
        return []

    # doctr returns either a dict with "words" or a numpy array of (N, 5)
    first_page = pages[0]
    detections = (
        first_page.get("words", []) if isinstance(first_page, dict) else first_page
    )
    # Explicit length check: `detections` may be an array, whose truth value
    # is ambiguous.
    if len(detections) == 0:
        return []

    # Convert normalized [0,1] coords to absolute pixel coords
    pixel_boxes: list[tuple[int, int, int, int]] = [
        (
            int(left * page_w),
            int(top * page_h),
            int(right * page_w),
            int(bottom * page_h),
        )
        for left, top, right, bottom in (det[:4] for det in detections)
    ]
    return _cluster_words_to_lines(pixel_boxes)
def _cluster_words_to_lines(
word_boxes: list[tuple[int, int, int, int]],
) -> list[tuple[int, int, int, int]]:
"""Group word boxes by vertical center; return one bbox per line."""
if not word_boxes:
return []
word_boxes = sorted(word_boxes, key=lambda b: (b[1] + b[3]) / 2)
clusters: list[list[tuple[int, int, int, int]]] = [[word_boxes[0]]]
for box in word_boxes[1:]:
last_cluster = clusters[-1]
ref_y = sum((b[1] + b[3]) / 2 for b in last_cluster) / len(last_cluster)
ref_height = max((b[3] - b[1]) for b in last_cluster)
yc = (box[1] + box[3]) / 2
if abs(yc - ref_y) < 0.5 * ref_height:
last_cluster.append(box)
else:
clusters.append([box])
line_bboxes: list[tuple[int, int, int, int]] = []
for cluster in clusters:
x0 = min(b[0] for b in cluster)
y0 = min(b[1] for b in cluster)
x1 = max(b[2] for b in cluster)
y1 = max(b[3] for b in cluster)
line_bboxes.append((x0, y0, x1 - x0, y1 - y0))
# Sort top-to-bottom for reading order
line_bboxes.sort(key=lambda b: b[1])
return line_bboxes
def _segment_lines_projection(gray: np.ndarray) -> list[tuple[int, int, int, int]]:
    """Segment lines from the horizontal ink profile of a grayscale page.

    The page is binarized with an inverted adaptive threshold, rows whose ink
    pixel count exceeds 1% of the page width are marked as text, and each
    vertical run of text rows taller than 5 pixels becomes a full-width
    (x, y, w, h) box. Works well on clean printed text; less reliable on
    cursive.
    """
    binary = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 31, 10
    )
    _, page_w = binary.shape
    min_height = 5

    has_ink = (binary > 0).sum(axis=1) > page_w * 0.01

    # Pad with a blank row on each side so runs touching the top or bottom of
    # the page still produce a rising and a falling edge.
    padded = np.concatenate(([0], has_ink.astype(np.int8), [0]))
    edges = np.diff(padded)
    run_starts = np.flatnonzero(edges == 1)  # first text row of each run
    run_ends = np.flatnonzero(edges == -1)  # first blank row after each run

    return [
        (0, int(top), page_w, int(bottom - top))
        for top, bottom in zip(run_starts, run_ends)
        if bottom - top > min_height
    ]
def preprocess(
    image_path: str | Path,
    *,
    use_doctr: bool = True,
    min_line_width: int = 20,
    min_line_height: int = 8,
) -> list[LineCrop]:
    """Load an image, deskew it, segment into line crops in reading order.

    Args:
        image_path: Path to the page image.
        use_doctr: Try the doctr detector first. When it raises or finds
            nothing, a notice is written to stderr and the projection-profile
            segmenter is used instead. When False, only the projection
            profile is used.
        min_line_width: Boxes narrower than this are discarded.
        min_line_height: Boxes shorter than this are discarded.

    Returns:
        A list of LineCrop objects whose `bbox` is in the deskewed-page
        coordinate frame. Tiny artifacts below the size thresholds are
        discarded.

    Raises:
        FileNotFoundError: Propagated from `_read_image` when the image
            cannot be decoded.
    """
    image_path = Path(image_path)
    image = _read_image(image_path)
    deskewed, angle = _deskew(image)

    line_bboxes: list[tuple[int, int, int, int]] = []
    if use_doctr:
        # Only the doctr call is guarded: an error raised by the projection
        # fallback must not be reported as a doctr failure (nor trigger a
        # second projection run).
        try:
            line_bboxes = _segment_lines_doctr(deskewed)
        except Exception as e:
            print(
                f"[preprocess] doctr failed ({e!r}); "
                "falling back to projection profile. "
                "Run `python scripts/setup_models.py` if the doctr model file is missing.",
                file=sys.stderr,
            )
        else:
            if not line_bboxes:
                print(
                    f"[preprocess] doctr returned no boxes for {image_path}; "
                    "falling back to projection profile",
                    file=sys.stderr,
                )

    if not line_bboxes:
        # Reached when doctr is disabled, failed, or found nothing.
        line_bboxes = _segment_lines_projection(_to_grayscale(deskewed))

    return [
        LineCrop(
            image=deskewed[y : y + h, x : x + w].copy(),
            bbox=(x, y, w, h),
            deskew_angle=angle,
        )
        for x, y, w, h in line_bboxes
        if w >= min_line_width and h >= min_line_height
    ]
|