import logging
import time
from typing import Any

import cv2
import numpy as np
from paddleocr import PaddleOCR

from utils.image_prep import boost_contrast

logger = logging.getLogger(__name__)

# Tolerance for merging overlapping blocks from dual-pass OCR (pixels)
_MERGE_Y_TOL = 15
_MERGE_X_TOL = 40

# Blocks whose top edge lies within this fraction of the image height are
# treated as the "logo zone" and accepted at a lower confidence.
_LOGO_ZONE_FRACTION = 0.10
_LOGO_MAX_CONFIDENCE = 0.50


class PaddleOCRReader:
    """
    Stage 1: Wraps PaddleOCR to extract raw text blocks from a receipt image.

    Runs a dual-pass strategy: OCR on the contrast-boosted image (output of
    ``boost_contrast``) AND on a CLAHE-enhanced version of it, then merges
    the results. Different preprocessing helps different regions of the
    receipt (e.g., CLAHE can wash out faded areas but helps with
    low-contrast text elsewhere).

    Intended to be initialised once at app startup (model loading is
    expensive).
    """

    def __init__(self):
        """Load the PaddleOCR model (English, PP-OCRv5, no orientation/unwarp)."""
        logger.info("Loading PaddleOCR model...")
        self._ocr = PaddleOCR(
            lang="en",
            ocr_version="PP-OCRv5",
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
        )
        logger.info("PaddleOCR model loaded.")

    def extract(self, image: np.ndarray, min_confidence: float = 0.80) -> list[dict[str, Any]]:
        """
        Run OCR on a preprocessed BGR numpy array using the dual-pass strategy.

        Args:
            image: Image array; its first axis is used as the height.
            min_confidence: Minimum recognition score for a block to be kept
                outside the logo zone.

        Returns:
            A list of blocks sorted top-to-bottom:
            [{ "text": str, "confidence": float, "bbox": [[x, y], ...] }, ...]

            Blocks whose top edge lies in the top 10% of the image (the logo
            zone) are kept when their score is at least
            ``min(min_confidence, 0.50)``; every other block must reach
            ``min_confidence``.

        Raises:
            ValueError: If ``image`` is ``None`` or empty.
        """
        if image is None or image.size == 0:
            raise ValueError("Empty image array provided")

        # Use a lower confidence for the logo zone (top 10% of image)
        logo_confidence = min(min_confidence, _LOGO_MAX_CONFIDENCE)
        logo_cutoff_y = image.shape[0] * _LOGO_ZONE_FRACTION

        t0 = time.perf_counter()
        clean = boost_contrast(image)
        t1 = time.perf_counter()
        logger.debug("TIMING boost_contrast: %.2fs", t1 - t0)

        clahe = _enhance_clahe(clean)
        t2 = time.perf_counter()
        logger.debug("TIMING clahe: %.2fs", t2 - t1)

        # Both passes use the permissive logo threshold; the stricter
        # threshold is applied after merging, once the zone of each block
        # is known.
        blocks_contrast = self._run_ocr(clean, logo_confidence)
        t3 = time.perf_counter()
        logger.debug("TIMING model predict pass1: %.2fs", t3 - t2)

        blocks_clahe = self._run_ocr(clahe, logo_confidence)
        t4 = time.perf_counter()
        logger.debug("TIMING model predict pass2: %.2fs", t4 - t3)

        merged = _merge_blocks(blocks_contrast, blocks_clahe)

        # Apply normal confidence threshold to everything outside the logo zone
        filtered = [
            b
            for b in merged
            if _top_y(b["bbox"]) <= logo_cutoff_y or b["confidence"] >= min_confidence
        ]
        filtered.sort(key=lambda b: _top_y(b["bbox"]))
        return filtered

    def _run_ocr(self, image: np.ndarray, min_confidence: float) -> list[dict[str, Any]]:
        """
        Run a single OCR pass and return blocks scoring at least min_confidence.

        Handles results that expose ``rec_texts`` / ``rec_scores`` /
        ``rec_polys`` (as mapping keys or attributes, optionally nested under
        a ``"res"`` key) and falls back to the older list-of-tuples format
        when ``rec_texts`` is absent.
        """
        results = self._ocr.predict(image)
        if not results:
            return []

        blocks: list[dict] = []
        for result in results:
            res = result.get("res", result) if hasattr(result, "get") else result
            rec_texts = _result_field(res, "rec_texts")
            rec_scores = _result_field(res, "rec_scores")
            rec_polys = _result_field(res, "rec_polys")

            if rec_texts is None:
                # Legacy format: each item is (bbox, (text, confidence)).
                try:
                    for item in result:
                        block = _parse_legacy_item(item)
                        if block and block["confidence"] >= min_confidence:
                            blocks.append(block)
                except (TypeError, ValueError):
                    logger.warning("Could not parse OCR result item: %s", result)
                continue

            if rec_scores is None or rec_polys is None:
                # zip() over None would raise; a result with texts but no
                # scores/polygons cannot be turned into blocks.
                logger.warning("OCR result is missing rec_scores or rec_polys; skipping")
                continue

            for text, score, poly in zip(rec_texts, rec_scores, rec_polys):
                if not text or not text.strip():
                    continue
                score_f = float(score)
                if score_f < min_confidence:
                    continue
                blocks.append({
                    "text": text.strip(),
                    "confidence": score_f,
                    "bbox": poly.tolist() if hasattr(poly, "tolist") else list(poly),
                })

        return blocks


def _result_field(res: Any, name: str) -> Any:
    """Read ``name`` from a mapping-like or attribute-style OCR result (None if absent)."""
    if hasattr(res, "get"):
        return res.get(name)
    return getattr(res, name, None)


def _enhance_clahe(img: np.ndarray) -> np.ndarray:
    """
    Apply CLAHE contrast enhancement for the second OCR pass.

    Accepts a 3-channel BGR image or an already single-channel (2-D) image
    and returns a 3-channel BGR image.
    """
    # A 2-D array is already greyscale; cvtColor(BGR2GRAY) would reject it.
    grey = img if img.ndim == 2 else cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced = clahe.apply(grey)
    return cv2.cvtColor(enhanced, cv2.COLOR_GRAY2BGR)


def _merge_blocks(primary: list[dict], secondary: list[dict]) -> list[dict]:
    """
    Merge blocks from two OCR passes using Non-Maximum Suppression.

    Combines all blocks, sorts by confidence, and greedily selects
    non-overlapping blocks. This handles cases where the two passes
    split text differently (e.g., one pass detects "Tesco Spanish Chorizo"
    as one block while another splits it into "Tesco" + "Spanish Chorizo").

    The input lists are not modified. On equal confidence, blocks from
    ``primary`` are considered before those from ``secondary``.
    """
    # Sort by confidence descending — prefer highest confidence first.
    # sorted() is stable (also with reverse=True), so ties keep input order.
    candidates = sorted(primary + secondary, key=lambda b: b["confidence"], reverse=True)

    selected: list[dict] = []
    for block in candidates:
        if any(_blocks_overlap(block, sel) for sel in selected):
            continue
        selected.append(block)
    return selected


def _blocks_overlap(a: dict, b: dict) -> bool:
    """
    Check if two blocks overlap: y-proximity AND horizontal bbox overlap.

    Two blocks overlap when their top edges are within ``_MERGE_Y_TOL``
    pixels of each other and their x-extents strictly intersect. Blocks
    whose bbox cannot be read are treated as non-overlapping.
    """
    if abs(_top_y(a["bbox"]) - _top_y(b["bbox"])) > _MERGE_Y_TOL:
        return False

    # Check horizontal overlap of bounding boxes
    try:
        a_xs = [pt[0] for pt in a["bbox"]]
        b_xs = [pt[0] for pt in b["bbox"]]
        return min(a_xs) < max(b_xs) and max(a_xs) > min(b_xs)
    except (TypeError, IndexError, ValueError):
        # Malformed or empty bbox: overlap cannot be determined, keep both.
        return False


def _top_y(bbox: list) -> float:
    """Return the smallest y among the bbox points, or 0.0 if it is malformed or empty."""
    try:
        return min(pt[1] for pt in bbox)
    except (TypeError, IndexError, ValueError):
        return 0.0


def _left_x(bbox: list) -> float:
    """Return the smallest x among the bbox points, or 0.0 if it is malformed or empty."""
    try:
        return min(pt[0] for pt in bbox)
    except (TypeError, IndexError, ValueError):
        return 0.0


def _parse_legacy_item(item) -> dict | None:
    """
    Handle the older PaddleOCR list-of-tuples format as a fallback.

    Expects ``item`` to unpack as ``(bbox, (text, confidence))``. Returns a
    block dict, or ``None`` when the text is blank or the item does not
    have that shape.
    """
    try:
        bbox, (text, confidence) = item
        if not text or not text.strip():
            return None
        return {
            "text": text.strip(),
            "confidence": float(confidence),
            "bbox": bbox,
        }
    except (TypeError, ValueError, AttributeError):
        return None