receipt-ocr / ocr /reader.py
sinful1's picture
fix(logging): use print(flush=True) for TIMING lines so they appear in HF logs
3a00f27
import logging
from typing import Any
import cv2
import numpy as np
from paddleocr import PaddleOCR
from utils.image_prep import boost_contrast
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Tolerance for merging overlapping blocks from dual-pass OCR (pixels)
# _MERGE_Y_TOL: largest difference between two blocks' top-edge y values for
# _blocks_overlap to still treat them as lying on the same line.
_MERGE_Y_TOL = 15
# _MERGE_X_TOL: NOTE(review): not referenced anywhere in the code visible
# here; _blocks_overlap tests strict horizontal overlap without a tolerance.
# Confirm whether it is used elsewhere before removing or wiring it in.
_MERGE_X_TOL = 40
class PaddleOCRReader:
    """
    Stage 1: Wraps PaddleOCR to extract raw text blocks from a receipt image.

    Runs a dual-pass strategy: OCR on the contrast-boosted image AND on a
    CLAHE-enhanced version of that image, then merges the results. Different
    preprocessing helps different regions of the receipt (e.g., CLAHE can
    wash out faded areas but helps with low-contrast text elsewhere).

    Initialised once at app startup (model loading is expensive).
    """

    def __init__(self):
        """Load the PaddleOCR model and keep it on the instance for reuse."""
        logger.info("Loading PaddleOCR model...")
        self._ocr = PaddleOCR(
            lang="en",
            ocr_version="PP-OCRv5",
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
        )
        logger.info("PaddleOCR model loaded.")

    def extract(self, image: np.ndarray, min_confidence: float = 0.80) -> list[dict[str, Any]]:
        """
        Run OCR on a preprocessed BGR numpy array using the dual-pass strategy.

        Args:
            image: Image array to read; must not be None or empty.
            min_confidence: Minimum recognition score for a block to be kept.
                Blocks whose top edge lies in the top 10% of the image (the
                logo zone) are held to ``min(min_confidence, 0.50)`` instead.

        Returns:
            A list of blocks sorted top-to-bottom:
            [{ "text": str, "confidence": float, "bbox": [[x,y], ...] }, ...]
            Blocks below the applicable confidence threshold are dropped.

        Raises:
            ValueError: If ``image`` is None or contains no elements.
        """
        if image is None or image.size == 0:
            raise ValueError("Empty image array provided")

        import time  # only needed for the TIMING diagnostics below

        # Use a lower confidence for the logo zone (top 10% of image)
        logo_confidence = min(min_confidence, 0.50)
        logo_cutoff_y = image.shape[0] * 0.10

        # TIMING lines are flushed prints rather than logger calls so they
        # are emitted immediately.
        t0 = time.perf_counter()
        clean = boost_contrast(image)
        t1 = time.perf_counter()
        print(f"TIMING boost_contrast: {t1-t0:.2f}s", flush=True)

        clahe = _enhance_clahe(clean)
        t2 = time.perf_counter()
        print(f"TIMING clahe: {t2-t1:.2f}s", flush=True)

        # Both passes use the permissive logo threshold; the stricter
        # threshold is applied after merging, outside the logo zone only.
        blocks_contrast = self._run_ocr(clean, logo_confidence)
        t3 = time.perf_counter()
        print(f"TIMING model predict pass1: {t3-t2:.2f}s", flush=True)

        blocks_clahe = self._run_ocr(clahe, logo_confidence)
        t4 = time.perf_counter()
        print(f"TIMING model predict pass2: {t4-t3:.2f}s", flush=True)

        merged = _merge_blocks(blocks_contrast, blocks_clahe)

        # Apply normal confidence threshold to everything outside the logo zone
        kept = [
            b
            for b in merged
            if _top_y(b["bbox"]) <= logo_cutoff_y or b["confidence"] >= min_confidence
        ]
        kept.sort(key=lambda b: _top_y(b["bbox"]))
        return kept

    @staticmethod
    def _field(res: Any, name: str) -> Any:
        """Read ``name`` from a mapping-like or attribute-style result; None if absent."""
        if hasattr(res, "get"):
            return res.get(name)
        return getattr(res, name, None)

    def _run_ocr(self, image: np.ndarray, min_confidence: float) -> list[dict[str, Any]]:
        """
        Run a single OCR pass and return filtered blocks.

        Args:
            image: Image handed to the model's ``predict`` call unchanged.
            min_confidence: Blocks scoring below this value are dropped.

        Returns:
            Blocks of the form {"text", "confidence", "bbox"} in the order the
            model produced them. Empty or whitespace-only texts are skipped.
        """
        results = self._ocr.predict(image)
        if not results:
            return []

        blocks: list[dict[str, Any]] = []
        for result in results:
            # Some result objects nest their payload under a "res" key.
            res = result.get("res", result) if hasattr(result, "get") else result
            rec_texts = self._field(res, "rec_texts")
            rec_scores = self._field(res, "rec_scores")
            rec_polys = self._field(res, "rec_polys")

            if rec_texts is None:
                # No rec_texts field: fall back to the legacy list-of-tuples shape.
                try:
                    for item in result:
                        block = _parse_legacy_item(item)
                        if block and block["confidence"] >= min_confidence:
                            blocks.append(block)
                except (TypeError, ValueError):
                    logger.warning("Could not parse OCR result item: %s", result)
                continue

            if rec_scores is None or rec_polys is None:
                # Texts without scores or polygons cannot be filtered or
                # positioned; skip this result instead of letting zip() raise
                # TypeError and abort the whole pass.
                logger.warning(
                    "OCR result has rec_texts but is missing rec_scores/rec_polys; skipping: %s",
                    result,
                )
                continue

            for text, score, poly in zip(rec_texts, rec_scores, rec_polys):
                stripped = text.strip() if text else ""
                if not stripped:
                    continue
                score_f = float(score)
                if score_f < min_confidence:
                    continue
                blocks.append({
                    "text": stripped,
                    "confidence": score_f,
                    # numpy polygons become plain nested lists
                    "bbox": poly.tolist() if hasattr(poly, "tolist") else list(poly),
                })
        return blocks
def _enhance_clahe(img: np.ndarray) -> np.ndarray:
    """Return a CLAHE-equalised version of ``img`` for the second OCR pass.

    The image is converted from BGR to greyscale, equalised with CLAHE
    (clip limit 2.0, 8x8 tile grid) and converted back to BGR.
    """
    equaliser = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    single_channel = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return cv2.cvtColor(equaliser.apply(single_channel), cv2.COLOR_GRAY2BGR)
def _merge_blocks(primary: list[dict], secondary: list[dict]) -> list[dict]:
"""
Merge blocks from two OCR passes using Non-Maximum Suppression.
Combines all blocks, sorts by confidence, and greedily selects
non-overlapping blocks. This handles cases where the two passes
split text differently (e.g., one pass detects "Tesco Spanish Chorizo"
as one block while another splits it into "Tesco" + "Spanish Chorizo").
"""
all_blocks = primary + secondary
# Sort by confidence descending — prefer highest confidence first
all_blocks.sort(key=lambda b: -b["confidence"])
selected: list[dict] = []
for block in all_blocks:
if any(_blocks_overlap(block, sel) for sel in selected):
continue
selected.append(block)
return selected
def _blocks_overlap(a: dict, b: dict) -> bool:
    """Return True when two blocks sit on the same line and overlap horizontally.

    "Same line" means their top-edge y values differ by at most
    ``_MERGE_Y_TOL``; horizontal overlap is a strict intersection of the
    x-extents of the two bounding boxes.
    """
    vertical_gap = abs(_top_y(a["bbox"]) - _top_y(b["bbox"]))
    if vertical_gap > _MERGE_Y_TOL:
        return False

    # Horizontal extents of each bounding box
    a_xs = [pt[0] for pt in a["bbox"]]
    a_left, a_right = min(a_xs), max(a_xs)
    b_xs = [pt[0] for pt in b["bbox"]]
    b_left, b_right = min(b_xs), max(b_xs)
    return a_left < b_right and a_right > b_left
def _top_y(bbox: list) -> float:
try:
return min(pt[1] for pt in bbox)
except (TypeError, IndexError):
return 0.0
def _left_x(bbox: list) -> float:
try:
return min(pt[0] for pt in bbox)
except (TypeError, IndexError):
return 0.0
def _parse_legacy_item(item) -> dict | None:
"""Handle the older PaddleOCR list-of-tuples format as a fallback."""
try:
bbox, (text, confidence) = item
if not text or not text.strip():
return None
return {
"text": text.strip(),
"confidence": float(confidence),
"bbox": bbox,
}
except (TypeError, ValueError):
return None