Spaces:
Running
Running
| import logging | |
| from typing import Any | |
| import cv2 | |
| import numpy as np | |
| from paddleocr import PaddleOCR | |
| from utils.image_prep import boost_contrast | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Tolerances for merging overlapping blocks from dual-pass OCR (pixels).
# _MERGE_Y_TOL is the largest difference between two blocks' top edges for
# them to still be considered candidates for the same piece of text.
_MERGE_Y_TOL = 15
# NOTE(review): _MERGE_X_TOL is not referenced anywhere in the visible code —
# confirm whether it is still needed or was meant to widen the x-overlap test.
_MERGE_X_TOL = 40
class PaddleOCRReader:
    """
    Stage 1: Wraps PaddleOCR to extract raw text blocks from a receipt image.

    Runs a dual-pass strategy: OCR on the contrast-boosted image AND on a
    CLAHE-enhanced version of it, then merges results. Different
    preprocessing helps different regions of the receipt (e.g., CLAHE can
    wash out faded areas but helps with low-contrast text elsewhere).

    Initialised once at app startup (model loading is expensive).
    """

    def __init__(self):
        """Load the PaddleOCR model (English, PP-OCRv5, no orientation/unwarp stages)."""
        logger.info("Loading PaddleOCR model...")
        self._ocr = PaddleOCR(
            lang="en",
            ocr_version="PP-OCRv5",
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
        )
        logger.info("PaddleOCR model loaded.")

    def extract(self, image: np.ndarray, min_confidence: float = 0.80) -> list[dict[str, Any]]:
        """
        Run OCR on a preprocessed BGR numpy array using dual-pass strategy.

        Returns a list of blocks sorted top-to-bottom:
        [{ "text": str, "confidence": float, "bbox": [[x,y], ...] }, ...]

        Blocks below min_confidence are dropped, except in the logo zone
        (top 10% of the image height) where the threshold is relaxed to
        min(min_confidence, 0.50).

        Raises:
            ValueError: if image is None or has zero elements.
        """
        if image is None or image.size == 0:
            raise ValueError("Empty image array provided")

        # Function-scope import: the module does not import time at file level.
        import time

        # Use a lower confidence for the logo zone (top 10% of image)
        logo_confidence = min(min_confidence, 0.50)
        logo_cutoff_y = image.shape[0] * 0.10

        t0 = time.perf_counter()
        clean = boost_contrast(image)
        t1 = time.perf_counter()
        clahe = _enhance_clahe(clean)
        t2 = time.perf_counter()
        # Both passes use the relaxed threshold so logo-zone candidates
        # survive until the position-aware filter below.
        blocks_contrast = self._run_ocr(clean, logo_confidence)
        t3 = time.perf_counter()
        blocks_clahe = self._run_ocr(clahe, logo_confidence)
        t4 = time.perf_counter()

        # Per-stage timings go through the module logger rather than stdout.
        logger.debug(
            "TIMING boost_contrast: %.2fs, clahe: %.2fs, "
            "model predict pass1: %.2fs, model predict pass2: %.2fs",
            t1 - t0, t2 - t1, t3 - t2, t4 - t3,
        )

        merged = _merge_blocks(blocks_contrast, blocks_clahe)

        # Apply normal confidence threshold to everything outside the logo zone
        filtered = [
            b for b in merged
            if _top_y(b["bbox"]) <= logo_cutoff_y or b["confidence"] >= min_confidence
        ]
        filtered.sort(key=lambda b: _top_y(b["bbox"]))
        return filtered

    def _run_ocr(self, image: np.ndarray, min_confidence: float) -> list[dict[str, Any]]:
        """
        Run a single OCR pass and return filtered blocks.

        Accepts results that expose rec_texts / rec_scores / rec_polys either
        as mapping keys (optionally nested under "res") or as attributes, and
        falls back to the older list-of-tuples format when rec_texts is
        absent. Blocks with blank text or confidence below min_confidence
        are skipped.
        """
        results = self._ocr.predict(image)
        if not results:
            return []

        blocks: list[dict] = []
        for result in results:
            res = result.get("res", result) if hasattr(result, "get") else result
            if hasattr(res, "get"):
                rec_texts = res.get("rec_texts")
                rec_scores = res.get("rec_scores")
                rec_polys = res.get("rec_polys")
            else:
                rec_texts = getattr(res, "rec_texts", None)
                rec_scores = getattr(res, "rec_scores", None)
                rec_polys = getattr(res, "rec_polys", None)

            if rec_texts is None:
                # Legacy format: each result is an iterable of (bbox, (text, conf)).
                try:
                    for item in result:
                        block = _parse_legacy_item(item)
                        if block and block["confidence"] >= min_confidence:
                            blocks.append(block)
                except (TypeError, ValueError):
                    logger.warning("Could not parse OCR result item: %s", result)
                continue

            if rec_scores is None or rec_polys is None:
                # zip() over None would raise TypeError and abort the whole pass.
                logger.warning(
                    "OCR result has rec_texts but no rec_scores/rec_polys; skipping it"
                )
                continue

            for text, score, poly in zip(rec_texts, rec_scores, rec_polys):
                if not text or not text.strip():
                    continue
                try:
                    score_f = float(score)
                except (TypeError, ValueError):
                    # One unparsable score should not discard the rest of the pass.
                    continue
                if score_f < min_confidence:
                    continue
                blocks.append({
                    "text": text.strip(),
                    "confidence": score_f,
                    "bbox": poly.tolist() if hasattr(poly, "tolist") else list(poly),
                })
        return blocks
def _enhance_clahe(img: np.ndarray) -> np.ndarray:
    """
    Build the CLAHE-enhanced image used for the second OCR pass.

    The BGR input is reduced to a single grey channel, equalised with
    CLAHE (clip limit 2.0, 8x8 tile grid), and expanded back to three
    channels via GRAY2BGR.
    """
    equaliser = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    luminance = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return cv2.cvtColor(equaliser.apply(luminance), cv2.COLOR_GRAY2BGR)
| def _merge_blocks(primary: list[dict], secondary: list[dict]) -> list[dict]: | |
| """ | |
| Merge blocks from two OCR passes using Non-Maximum Suppression. | |
| Combines all blocks, sorts by confidence, and greedily selects | |
| non-overlapping blocks. This handles cases where the two passes | |
| split text differently (e.g., one pass detects "Tesco Spanish Chorizo" | |
| as one block while another splits it into "Tesco" + "Spanish Chorizo"). | |
| """ | |
| all_blocks = primary + secondary | |
| # Sort by confidence descending — prefer highest confidence first | |
| all_blocks.sort(key=lambda b: -b["confidence"]) | |
| selected: list[dict] = [] | |
| for block in all_blocks: | |
| if any(_blocks_overlap(block, sel) for sel in selected): | |
| continue | |
| selected.append(block) | |
| return selected | |
def _blocks_overlap(a: dict, b: dict) -> bool:
    """
    Decide whether two blocks cover the same text region.

    Two blocks overlap when their top edges are within _MERGE_Y_TOL pixels
    of each other AND their horizontal extents strictly intersect.
    """
    vertical_gap = abs(_top_y(a["bbox"]) - _top_y(b["bbox"]))
    if vertical_gap > _MERGE_Y_TOL:
        return False

    # Horizontal extent of each bounding box.
    a_xs = [pt[0] for pt in a["bbox"]]
    a_left, a_right = min(a_xs), max(a_xs)
    b_xs = [pt[0] for pt in b["bbox"]]
    b_left, b_right = min(b_xs), max(b_xs)

    return a_left < b_right and b_left < a_right
| def _top_y(bbox: list) -> float: | |
| try: | |
| return min(pt[1] for pt in bbox) | |
| except (TypeError, IndexError): | |
| return 0.0 | |
| def _left_x(bbox: list) -> float: | |
| try: | |
| return min(pt[0] for pt in bbox) | |
| except (TypeError, IndexError): | |
| return 0.0 | |
| def _parse_legacy_item(item) -> dict | None: | |
| """Handle the older PaddleOCR list-of-tuples format as a fallback.""" | |
| try: | |
| bbox, (text, confidence) = item | |
| if not text or not text.strip(): | |
| return None | |
| return { | |
| "text": text.strip(), | |
| "confidence": float(confidence), | |
| "bbox": bbox, | |
| } | |
| except (TypeError, ValueError): | |
| return None | |