""" Stage 2: Parse raw OCR text blocks into structured receipt data. Uses spatial layout (bounding box coordinates) to reconstruct receipt rows, rather than relying on single-line regex matching. Receipts have a consistent column layout: - Far left: quantity (single digit) - Middle: item description - Far right: price The parser groups blocks into rows by y-coordinate proximity, classifies columns by x-position, then extracts structured fields. """ import re from typing import Any # Known UK retailer names (uppercase) for exact matching against header text _KNOWN_RETAILERS = { "TESCO", "ASDA", "ALDI", "SAINSBURY'S", "SAINSBURYS", "MORRISONS", "WAITROSE", "COSTCO", "ICELAND", "SPAR", "NISA", "BOOTHS", "LIDL", "CO-OP", "M&S", "BUDGENS", "LONDIS", "ONE STOP", } # Map of common OCR misreads for stylized logos → canonical retailer name _OCR_VARIANTS = { "LODZ": "LIDL", "LIOL": "LIDL", "LDL": "LIDL", "IIDL": "LIDL", "COOP": "CO-OP", "CO OP": "CO-OP", "OWNED BY YOU": "CO-OP", "OWNED BY YOU.": "CO-OP", "RIGHT BY YOU": "CO-OP", "RIGHT BY YOU.": "CO-OP", "M & S": "M&S", "MARKS & SPENCER": "M&S", } # --------------------------------------------------------------------------- # Compiled patterns # --------------------------------------------------------------------------- _DATE_PATTERNS = [ re.compile(r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b"), re.compile(r"\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b"), re.compile( r"\b(\d{1,2}\s+" r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*" r"\s+\d{2,4})\b", re.IGNORECASE, ), ] _PRICE_EXTRACT_RE = re.compile(r"(-?)[£$€¥]?\s*(\d{1,6}[.,\s]\d{2})") _TOTALS_KEYWORDS = re.compile( r"\b(total|sub[\s-]?total|subtotal|savings|promotions|tax|gst|hst|balance)\b", re.IGNORECASE, ) _HEADER_SKIP = re.compile( r"(www\.|\.com|\.co\.uk|vat\s*(?:no\.?|number)|questions?\s*please|please\s*visit|store.locator)", re.IGNORECASE, ) _DISCOUNT_PREFIX_RE = re.compile(r"^Cc", re.IGNORECASE) # "1.90 each" or "3.00each" — informational price-per-unit lines, not items _EACH_RE = re.compile(r"\d\s*each\b", re.IGNORECASE) # Inline quantity: "2x0.13", "3 x £0.22" — Lidl-style qty on the description line _INLINE_QTY_RE = re.compile(r"^(\d+)\s*x\s*[£$€¥]?(\d+[.,]\d{2})$", re.IGNORECASE) # Payment / non-item rows in the totals section _PAYMENT_SKIP = re.compile(r"\b(cash|change|card|visa|mastercard|amex|contactless|clubcard|credit|debit)\b", re.IGNORECASE) # Row merging tolerance in pixels _ROW_Y_TOLERANCE = 30 # Minimum ratio of "readable" characters (letters, digits, spaces, common punct) # to filter out ghost text from receipt backs _MIN_READABLE_RATIO = 0.6 # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def parse_blocks(blocks: list[dict[str, Any]]) -> dict[str, Any]: """ Accept OCR blocks (sorted top-to-bottom) and return structured receipt data. Each block: { "text": str, "confidence": float, "bbox": [...] } """ if not blocks: return _empty_result() clean_blocks = [b for b in blocks if _is_readable(b["text"])] if not clean_blocks: return _empty_result() rows = _build_rows(clean_blocks) receipt_width = _estimate_receipt_width(clean_blocks) date = _extract_date(clean_blocks) header_end, totals_start = _find_sections(rows, receipt_width) merchant_name, store_location = _extract_header(rows[:header_end]) item_rows = rows[header_end:totals_start] # Tesco receipts can split the first item's qty+description and price # into two rows when ghost text above the item pulls the row anchor up. # Detect an orphaned qty row immediately before the first-price row and # glue its blocks into the first item row. if header_end > 0 and item_rows: prev_row = rows[header_end - 1] if any(_is_qty_block(b, receipt_width) for b in prev_row): item_rows = [ sorted(prev_row + item_rows[0], key=lambda b: _left_x(b["bbox"])) ] + item_rows[1:] line_items = _extract_line_items(item_rows, receipt_width) totals = _extract_totals(rows[totals_start:]) return { "merchant_name": merchant_name, "store_location": store_location, "date": date, "line_items": line_items, "subtotal": totals.get("subtotal"), "savings": totals.get("savings"), "total": totals.get("total"), } def _empty_result() -> dict[str, Any]: return { "merchant_name": None, "store_location": None, "date": None, "line_items": [], "subtotal": None, "savings": None, "total": None, } # --------------------------------------------------------------------------- # Text quality filter # --------------------------------------------------------------------------- def _is_readable(text: str) -> bool: """ Filter out garbage text from receipt backs / noise. Ghost text tends to have: random case mixing, no recognizable words, high consonant density. Real receipt text has: prices, normal English words, or standard labels. """ if not text or not text.strip(): return False stripped = text.strip() # Single digits pass (could be quantity column) if len(stripped) == 1: return stripped.isdigit() if len(stripped) < 2: return False # Prices always pass if _PRICE_EXTRACT_RE.search(stripped): return True # Short tokens (2-3 chars) — allow if they look like real text if len(stripped) <= 3: # Allow "Cc", digits, common abbreviations if re.fullmatch(r"[A-Za-z]{2,3}|[0-9]+", stripped): return True return False # For longer text: check if it has a reasonable ratio of lowercase letters # and spaces (real English text). Ghost text is often CamelCase garbage # with no spaces and random character distribution. alpha = sum(1 for c in stripped if c.isalpha()) if alpha == 0: # Pure numbers/symbols — keep if it has recognizable structure return bool(re.search(r"\d", stripped)) # Check for word-like patterns (sequences of letters separated by spaces/punct) words = re.findall(r"[A-Za-z]+", stripped) if not words: return False # Ghost text signature: many words with unusual capitalization mixing # Real text: "Tesco British Whole Milk", "VAT Number", "Subtotal:" # Ghost text: "VIAJeY", "biqemoa ot vlggsanoiibno", "68T3-uoniqAoncguaGAon" # Heuristic: if average word length > 6 and mostly lowercase jumbles, it's noise avg_word_len = sum(len(w) for w in words) / len(words) has_spaces = " " in stripped # Long unbroken text with digits mixed into letters is ghost text # (e.g., "68T3-uoniqAoncguaGAon"). Pure-letter words like "Cornflakes" # or "BakedBeansTomSauce" are valid receipt items. if avg_word_len > 8 and not has_spaces and re.search(r"\d", stripped) and alpha > 5: return False # Check consonant density — ghost text has unusual consonant clusters consonants = sum(1 for c in stripped.lower() if c in "bcdfghjklmnpqrstvwxyz") vowels = sum(1 for c in stripped.lower() if c in "aeiou") if vowels > 0 and consonants / (vowels + consonants) > 0.80: return False if vowels == 0 and alpha > 3: return False return True # --------------------------------------------------------------------------- # Row building — group blocks by y-coordinate proximity # --------------------------------------------------------------------------- def _top_y(bbox: list) -> float: try: return min(pt[1] for pt in bbox) except (TypeError, IndexError): return 0.0 def _left_x(bbox: list) -> float: try: return min(pt[0] for pt in bbox) except (TypeError, IndexError): return 0.0 def _build_rows(blocks: list[dict]) -> list[list[dict]]: """Group blocks into rows by y-coordinate proximity, sorted left-to-right.""" if not blocks: return [] sorted_blocks = sorted(blocks, key=lambda b: _top_y(b["bbox"])) rows: list[list[dict]] = [] current_row: list[dict] = [sorted_blocks[0]] current_y = _top_y(sorted_blocks[0]["bbox"]) for block in sorted_blocks[1:]: y = _top_y(block["bbox"]) if abs(y - current_y) <= _ROW_Y_TOLERANCE: current_row.append(block) else: rows.append(sorted(current_row, key=lambda b: _left_x(b["bbox"]))) current_row = [block] current_y = y if current_row: rows.append(sorted(current_row, key=lambda b: _left_x(b["bbox"]))) return rows # --------------------------------------------------------------------------- # Receipt width estimation and column classification # --------------------------------------------------------------------------- def _estimate_receipt_width(blocks: list[dict]) -> float: """Estimate the receipt width from the rightmost x-coordinate.""" max_x = 0.0 for block in blocks: for pt in block["bbox"]: try: max_x = max(max_x, float(pt[0])) except (TypeError, IndexError): pass return max_x if max_x > 0 else 1000.0 def _is_price_block(block: dict, receipt_width: float) -> bool: """A price block sits in the right column and contains a price pattern.""" x = _left_x(block["bbox"]) return x > receipt_width * 0.70 and bool(_PRICE_EXTRACT_RE.search(block["text"])) def _is_qty_block(block: dict, receipt_width: float) -> bool: """A quantity block sits in the left column and is a single digit.""" x = _left_x(block["bbox"]) return x < receipt_width * 0.12 and re.fullmatch(r"\d", block["text"].strip()) is not None def _row_has_right_price(row: list[dict], receipt_width: float) -> bool: """Check if any block in the row is a price in the right column.""" return any(_is_price_block(b, receipt_width) for b in row) # --------------------------------------------------------------------------- # Section detection # --------------------------------------------------------------------------- def _find_sections(rows: list[list[dict]], receipt_width: float) -> tuple[int, int]: """ Find where the header ends and the totals section begins. Returns (header_end_index, totals_start_index). """ header_end = 0 totals_start = len(rows) # Header ends at the first row with a price in the right column for i, row in enumerate(rows): if _row_has_right_price(row, receipt_width): header_end = i break # Primary: keyword detection ("Subtotal:", "TOTAL:", etc.) for i in range(header_end, len(rows)): row_text = " ".join(b["text"] for b in rows[i]) if _TOTALS_KEYWORDS.search(row_text): totals_start = i break # Fallback: if no keyword found, look for a vertical gap significantly # larger than normal item spacing. This handles receipts where OCR # misses the "TOTAL" text (e.g., Lidl's dashed separator). if totals_start == len(rows) and len(rows) > header_end + 2: spacings = [] for i in range(header_end + 1, len(rows)): prev_y = max(_top_y(b["bbox"]) for b in rows[i - 1]) curr_y = min(_top_y(b["bbox"]) for b in rows[i]) spacings.append(curr_y - prev_y) if spacings: avg_spacing = sum(spacings) / len(spacings) gap_threshold = max(avg_spacing * 1.8, 60) for i, spacing in enumerate(spacings): if spacing > gap_threshold: totals_start = header_end + 1 + i break return header_end, totals_start # --------------------------------------------------------------------------- # Header extraction # --------------------------------------------------------------------------- def _extract_header(header_rows: list[list[dict]]) -> tuple[str | None, str | None]: """ Extract merchant name and store location from header rows. Checks header blocks against a known retailer list (exact match) and an OCR variants map (e.g., "LODZ" → "LIDL") for stylized logos. If no known retailer is found, returns (None, None) — the app layer can prompt the user. """ retailer, retailer_y = _match_known_retailer(header_rows) if not retailer: return None, None # Only consider rows below the retailer logo. Thermal-printed receipts # can curl and reveal the back's ghost text above the logo; that text # often passes the readability filter and must be excluded here. store_location = None for row in header_rows: row_y = min(_top_y(b["bbox"]) for b in row) if row_y <= retailer_y: continue meaningful = [b for b in row if len(b["text"].strip()) >= 3 and not _HEADER_SKIP.search(b["text"]) and not any(p.search(b["text"]) for p in _DATE_PATTERNS) and b["text"].strip().upper() != retailer and b["text"].strip().upper() not in _OCR_VARIANTS and b["confidence"] >= 0.85] if not meaningful: continue best = max(meaningful, key=lambda b: b["confidence"]) store_location = best["text"].strip() break return retailer, store_location def _match_known_retailer(header_rows: list[list[dict]]) -> tuple[str | None, float]: """ Check if any header block matches a known retailer or OCR variant. Returns (retailer_name, y_coordinate_of_match) or (None, 0.0). """ for row in header_rows: for block in row: text = block["text"].strip().upper() if text in _KNOWN_RETAILERS: return text, _top_y(block["bbox"]) if text in _OCR_VARIANTS: return _OCR_VARIANTS[text], _top_y(block["bbox"]) row_text = " ".join(b["text"].strip() for b in row).strip().upper() if row_text in _KNOWN_RETAILERS: return row_text, min(_top_y(b["bbox"]) for b in row) if row_text in _OCR_VARIANTS: return _OCR_VARIANTS[row_text], min(_top_y(b["bbox"]) for b in row) return None, 0.0 # --------------------------------------------------------------------------- # Date extraction # --------------------------------------------------------------------------- def _extract_date(blocks: list[dict]) -> str | None: """Scan all blocks for the first date match.""" for block in blocks: for pattern in _DATE_PATTERNS: m = pattern.search(block["text"]) if m: return m.group(1) return None # --------------------------------------------------------------------------- # Line item extraction (price-anchored) # --------------------------------------------------------------------------- def _extract_line_items( item_rows: list[list[dict]], receipt_width: float ) -> list[dict[str, Any]]: """ Row-based line-item extraction. Walk the rows produced by _build_rows top-to-bottom: - A row containing a positive right-column price starts a new item using the non-price blocks in that row as its initial description. - A row without a price is a continuation — append its text to the current item. - A row containing a negative price attaches it as a discount to the current item (first discount wins). """ items: list[dict[str, Any]] = [] current: dict[str, Any] | None = None for row in item_rows: price_blk = next( (b for b in reversed(row) if _is_price_block(b, receipt_width)), None, ) desc_blocks = [b for b in row if b is not price_blk] if price_blk is None: if current is not None: _append_desc(current, desc_blocks, receipt_width) continue price_str = _normalise_price(price_blk["text"]) if ( not price_str.startswith("-") and current is not None and current["discount"] is None and any(_is_cc_discount_indicator(b["text"]) for b in desc_blocks) ): price_str = f"-{price_str}" if price_str.startswith("-"): if current is not None: _append_desc(current, desc_blocks, receipt_width) if current["discount"] is None: current["discount"] = price_str continue # No preceding item — standalone negative price (refund/return) current = { "description": None, "quantity": 1, "unit_price": price_str, "total_price": price_str, "discount": None, } _append_desc(current, desc_blocks, receipt_width) items.append(current) return items def _is_desc_block(block: dict, receipt_width: float) -> bool: """ Description blocks start before the price column (70% mark). Exclude blocks in the 65-70% zone that are short fragments — these are typically standalone ghost text from the receipt back. """ x = _left_x(block["bbox"]) return x < receipt_width * 0.65 def _append_desc(item: dict, desc_blocks: list[dict], receipt_width: float) -> None: """Merge extra desc blocks into an item, updating qty and unit_price.""" parts: list[str] = [item["description"]] if item["description"] else [] for b in desc_blocks: text = b["text"].strip() if _is_qty_block(b, receipt_width): item["quantity"] = int(text) continue # Co-op style: qty embedded in description ("1 BATCH S/NOODLE B") if not item["description"] and not parts: m_qty = re.match(r"^(\d{1,2})\s+([A-Za-z].+)$", text) if m_qty: item["quantity"] = int(m_qty.group(1)) parts.append(m_qty.group(2).strip()) continue if _DISCOUNT_PREFIX_RE.match(text): continue if _EACH_RE.search(text): continue iq = _INLINE_QTY_RE.match(text) if iq: item["quantity"] = int(iq.group(1)) continue if not _is_desc_block(b, receipt_width): continue parts.append(text) item["description"] = " ".join(parts).strip() or None item["unit_price"] = _calc_unit_price(item["total_price"], item["quantity"]) # --------------------------------------------------------------------------- # Totals extraction # --------------------------------------------------------------------------- def _extract_totals(totals_rows: list[list[dict]]) -> dict[str, str | None]: """Extract subtotal, savings, and total from the totals section.""" result: dict[str, str | None] = {"subtotal": None, "savings": None, "total": None} for ri, row in enumerate(totals_rows): row_text = " ".join(b["text"] for b in row).strip().lower() # Find the price — prefer rightmost block price = None for block in sorted(row, key=lambda b: _left_x(b["bbox"]), reverse=True): m = _PRICE_EXTRACT_RE.search(block["text"]) if m: price = _normalise_price(block["text"]) break if price is None: continue # Skip payment rows (CASH, CHANGE, CARD, etc.) if _PAYMENT_SKIP.search(row_text): continue # For savings/promotions, prefer the negative price if available. # Sometimes OCR splits "Savings: -£6.70" into two rows. if "saving" in row_text or "promotion" in row_text: if not price.startswith("-"): # Check the next row for a standalone negative price if ri + 1 < len(totals_rows): next_row = totals_rows[ri + 1] for nb in sorted(next_row, key=lambda b: _left_x(b["bbox"]), reverse=True): nm = _PRICE_EXTRACT_RE.search(nb["text"]) if nm: np_ = _normalise_price(nb["text"]) if np_.startswith("-"): price = np_ break result["savings"] = result["savings"] or price elif "subtotal" in row_text or "sub total" in row_text: result["subtotal"] = result["subtotal"] or price elif "total" in row_text and "sub" not in row_text and "card" not in row_text: result["total"] = result["total"] or price elif "balance" in row_text: result["total"] = result["total"] or price elif result["total"] is None and not price.startswith("-"): # Standalone positive price with no keyword — treat as total if # not yet set (handles receipts where OCR misses the "TOTAL" text). # Negative standalone prices are savings/discounts, not totals. result["total"] = price return result # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _normalise_price(text: str) -> str: """Extract and standardise a price string.""" m = _PRICE_EXTRACT_RE.search(text) if not m: return text.strip() sign = m.group(1) digits = m.group(2).replace(",", ".").replace(" ", ".") cleaned = re.sub(r"[£$€¥\s]", "", digits) return f"{sign}{cleaned}" def _is_cc_discount_indicator(text: str) -> bool: """True when text is a Clubcard discount marker: 'Cc' followed by a price and no other meaningful words. Distinguishes 'Cc £2.25' (discount) from 'Cc Any 3 For 2' (promotion label).""" text = text.strip() if not _DISCOUNT_PREFIX_RE.match(text): return False after_cc = text[2:].strip() if not re.search(r"\d", after_cc): return False cleaned = _PRICE_EXTRACT_RE.sub("", after_cc) cleaned = re.sub(r"[£$€¥\d.,\s]", "", cleaned) return len(cleaned) <= 2 def _calc_unit_price(total_price: str, quantity: int) -> str: """Calculate unit price from total and quantity.""" if quantity <= 1: return total_price try: return str(round(float(total_price) / quantity, 2)) except (ValueError, ZeroDivisionError): return total_price