File size: 6,782 Bytes
619b919
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3ab1b0
 
619b919
d3ab1b0
3a00f27
d3ab1b0
 
3a00f27
b010bd2
d3ab1b0
3a00f27
b010bd2
 
3a00f27
b010bd2
619b919
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import logging
from typing import Any

import cv2
import numpy as np
from paddleocr import PaddleOCR

from utils.image_prep import boost_contrast

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Tolerances (in pixels) for deciding whether blocks produced by the two OCR
# passes refer to the same piece of text.
# _MERGE_Y_TOL: largest allowed difference between two blocks' top-y
# coordinates for them to be compared horizontally in _blocks_overlap.
_MERGE_Y_TOL = 15
# NOTE(review): _MERGE_X_TOL is not referenced by any code visible in this
# file; confirm whether it is still needed or should feed _blocks_overlap.
_MERGE_X_TOL = 40


class PaddleOCRReader:
    """
    Stage 1: Wraps PaddleOCR to extract raw text blocks from a receipt image.

    Runs a dual-pass strategy: OCR on the contrast-boosted image AND on a
    CLAHE-enhanced version of that image, then merges results. Different
    preprocessing helps different regions of the receipt (e.g., CLAHE can
    wash out faded areas but helps with low-contrast text elsewhere).

    Initialised once at app startup (model loading is expensive).
    """

    # Fraction of the image height, measured from the top, treated as the
    # logo zone.
    _LOGO_ZONE_FRACTION = 0.10
    # Upper bound on the confidence threshold applied inside the logo zone.
    _LOGO_MAX_THRESHOLD = 0.50

    def __init__(self):
        logger.info("Loading PaddleOCR model...")
        self._ocr = PaddleOCR(
            lang="en",
            ocr_version="PP-OCRv5",
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
        )
        logger.info("PaddleOCR model loaded.")

    def extract(self, image: np.ndarray, min_confidence: float = 0.80) -> list[dict[str, Any]]:
        """
        Run OCR on a preprocessed BGR numpy array using dual-pass strategy.

        Args:
            image: Image array; its first dimension is taken as the height.
            min_confidence: Minimum score for blocks outside the logo zone.
                Blocks whose top edge lies in the top 10% of the image only
                need to reach min(min_confidence, 0.50).

        Returns:
            A list of blocks sorted top-to-bottom:
              [{ "text": str, "confidence": float, "bbox": [[x,y], ...] }, ...]

        Raises:
            ValueError: If image is None or has no elements.
        """
        if image is None or image.size == 0:
            raise ValueError("Empty image array provided")

        # Only needed for the debug timing below.
        import time

        # Both passes run with the relaxed logo threshold; the stricter
        # threshold is applied afterwards to blocks outside the logo zone.
        logo_confidence = min(min_confidence, self._LOGO_MAX_THRESHOLD)
        logo_cutoff_y = image.shape[0] * self._LOGO_ZONE_FRACTION

        t0 = time.perf_counter()
        clean = boost_contrast(image)
        t1 = time.perf_counter()
        clahe = _enhance_clahe(clean)
        t2 = time.perf_counter()
        blocks_contrast = self._run_ocr(clean, logo_confidence)
        t3 = time.perf_counter()
        blocks_clahe = self._run_ocr(clahe, logo_confidence)
        t4 = time.perf_counter()
        # Timing goes through the logger so it can be switched off by level
        # and never pollutes stdout.
        logger.debug(
            "OCR timing: boost_contrast=%.2fs clahe=%.2fs pass1=%.2fs pass2=%.2fs",
            t1 - t0,
            t2 - t1,
            t3 - t2,
            t4 - t3,
        )

        merged = _merge_blocks(blocks_contrast, blocks_clahe)

        # Apply normal confidence threshold to everything outside the logo zone
        filtered = [
            b
            for b in merged
            if _top_y(b["bbox"]) <= logo_cutoff_y or b["confidence"] >= min_confidence
        ]
        filtered.sort(key=lambda b: _top_y(b["bbox"]))
        return filtered

    @staticmethod
    def _field(res: Any, name: str) -> Any:
        """Read `name` from a mapping-like or attribute-style result; None if absent."""
        if hasattr(res, "get"):
            return res.get(name)
        return getattr(res, name, None)

    def _run_ocr(self, image: np.ndarray, min_confidence: float) -> list[dict[str, Any]]:
        """
        Run a single OCR pass and return filtered blocks.

        Blank texts and blocks scoring below min_confidence are dropped.
        Results that carry texts but no scores or polygons are skipped with
        a warning instead of aborting the whole pass.
        """
        results = self._ocr.predict(image)

        if not results:
            return []

        blocks: list[dict[str, Any]] = []
        for result in results:
            # Some results nest their payload under "res"; otherwise the
            # result object itself carries the fields.
            res = result.get("res", result) if hasattr(result, "get") else result
            rec_texts = self._field(res, "rec_texts")

            if rec_texts is None:
                # No structured fields: try the list-of-tuples fallback.
                try:
                    for item in result:
                        block = _parse_legacy_item(item)
                        if block and block["confidence"] >= min_confidence:
                            blocks.append(block)
                except (TypeError, ValueError):
                    logger.warning("Could not parse OCR result item: %s", result)
                continue

            rec_scores = self._field(res, "rec_scores")
            rec_polys = self._field(res, "rec_polys")
            if rec_scores is None or rec_polys is None:
                # zip() below would raise TypeError on a None field.
                logger.warning(
                    "OCR result has rec_texts but is missing rec_scores/rec_polys; skipping it"
                )
                continue

            for text, score, poly in zip(rec_texts, rec_scores, rec_polys):
                if not text or not text.strip():
                    continue
                score_f = float(score)
                if score_f < min_confidence:
                    continue
                blocks.append({
                    "text": text.strip(),
                    "confidence": score_f,
                    "bbox": poly.tolist() if hasattr(poly, "tolist") else list(poly),
                })

        return blocks


def _enhance_clahe(img: np.ndarray) -> np.ndarray:
    """
    Produce the CLAHE-enhanced image used for the second OCR pass.

    The BGR input is reduced to greyscale, equalised with CLAHE
    (clipLimit=2.0, 8x8 tile grid) and converted back to BGR.
    """
    equaliser = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    single_channel = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return cv2.cvtColor(equaliser.apply(single_channel), cv2.COLOR_GRAY2BGR)


def _merge_blocks(primary: list[dict], secondary: list[dict]) -> list[dict]:
    """
    Combine the blocks of two OCR passes, keeping one block per location.

    All blocks are pooled and visited from highest to lowest confidence; a
    block is kept only if it does not overlap any block already kept
    (greedy, NMS-style selection). This copes with the two passes splitting
    text differently (e.g., one pass detects "Tesco Spanish Chorizo" as one
    block while the other yields "Tesco" + "Spanish Chorizo").

    Neither input list is modified. The result is ordered by descending
    confidence.
    """
    # sorted() is stable, so on equal confidence the primary pass wins.
    ranked = sorted(primary + secondary, key=lambda blk: -blk["confidence"])

    kept: list[dict] = []
    for candidate in ranked:
        clashes = any(_blocks_overlap(candidate, winner) for winner in kept)
        if not clashes:
            kept.append(candidate)

    return kept


def _blocks_overlap(a: dict, b: dict) -> bool:
    """
    Report whether two OCR blocks cover the same spot.

    Two blocks overlap when their top edges lie within _MERGE_Y_TOL of each
    other AND their horizontal extents intersect; extents that merely touch
    do not count as intersecting.
    """
    vertical_gap = abs(_top_y(a["bbox"]) - _top_y(b["bbox"]))
    if vertical_gap > _MERGE_Y_TOL:
        return False

    # Horizontal extent of each bounding box.
    a_xs = [pt[0] for pt in a["bbox"]]
    a_left, a_right = min(a_xs), max(a_xs)
    b_xs = [pt[0] for pt in b["bbox"]]
    b_left, b_right = min(b_xs), max(b_xs)

    return a_left < b_right and b_left < a_right


def _top_y(bbox: list) -> float:
    """
    Return the smallest y coordinate among the points of bbox.

    Falls back to 0.0 when bbox is empty, is not iterable, or contains
    points without a y component, so callers can sort and filter blocks
    without guarding against malformed boxes.
    """
    try:
        # default= covers the empty box, which would otherwise raise ValueError.
        return min((pt[1] for pt in bbox), default=0.0)
    except (TypeError, IndexError):
        return 0.0


def _left_x(bbox: list) -> float:
    """
    Return the smallest x coordinate among the points of bbox.

    Falls back to 0.0 when bbox is empty, is not iterable, or contains
    points that cannot be indexed.
    """
    try:
        # default= covers the empty box, which would otherwise raise ValueError.
        return min((pt[0] for pt in bbox), default=0.0)
    except (TypeError, IndexError):
        return 0.0


def _parse_legacy_item(item) -> dict | None:
    """Handle the older PaddleOCR list-of-tuples format as a fallback."""
    try:
        bbox, (text, confidence) = item
        if not text or not text.strip():
            return None
        return {
            "text": text.strip(),
            "confidence": float(confidence),
            "bbox": bbox,
        }
    except (TypeError, ValueError):
        return None