Spaces:
Running
Running
| """ | |
| Stage 2: Parse raw OCR text blocks into structured receipt data. | |
| Uses spatial layout (bounding box coordinates) to reconstruct receipt rows, | |
| rather than relying on single-line regex matching. Receipts have a consistent | |
| column layout: | |
| - Far left: quantity (single digit) | |
| - Middle: item description | |
| - Far right: price | |
| The parser groups blocks into rows by y-coordinate proximity, classifies | |
| columns by x-position, then extracts structured fields. | |
| """ | |
| import re | |
| from typing import Any | |
| # Known UK retailer names (uppercase) for exact matching against header text | |
| _KNOWN_RETAILERS = { | |
| "TESCO", "ASDA", "ALDI", "SAINSBURY'S", "SAINSBURYS", "MORRISONS", | |
| "WAITROSE", "COSTCO", "ICELAND", "SPAR", "NISA", "BOOTHS", | |
| "LIDL", "CO-OP", "M&S", "BUDGENS", "LONDIS", "ONE STOP", | |
| } | |
| # Map of common OCR misreads for stylized logos → canonical retailer name | |
| _OCR_VARIANTS = { | |
| "LODZ": "LIDL", | |
| "LIOL": "LIDL", | |
| "LDL": "LIDL", | |
| "IIDL": "LIDL", | |
| "COOP": "CO-OP", | |
| "CO OP": "CO-OP", | |
| "OWNED BY YOU": "CO-OP", | |
| "OWNED BY YOU.": "CO-OP", | |
| "RIGHT BY YOU": "CO-OP", | |
| "RIGHT BY YOU.": "CO-OP", | |
| "M & S": "M&S", | |
| "MARKS & SPENCER": "M&S", | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Compiled patterns | |
| # --------------------------------------------------------------------------- | |
| _DATE_PATTERNS = [ | |
| re.compile(r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b"), | |
| re.compile(r"\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b"), | |
| re.compile( | |
| r"\b(\d{1,2}\s+" | |
| r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*" | |
| r"\s+\d{2,4})\b", | |
| re.IGNORECASE, | |
| ), | |
| ] | |
| _PRICE_EXTRACT_RE = re.compile(r"(-?)[£$€¥]?\s*(\d{1,6}[.,\s]\d{2})") | |
| _TOTALS_KEYWORDS = re.compile( | |
| r"\b(total|sub[\s-]?total|subtotal|savings|promotions|tax|gst|hst|balance)\b", | |
| re.IGNORECASE, | |
| ) | |
| _HEADER_SKIP = re.compile( | |
| r"(www\.|\.com|\.co\.uk|vat\s*(?:no\.?|number)|questions?\s*please|please\s*visit|store.locator)", | |
| re.IGNORECASE, | |
| ) | |
| _DISCOUNT_PREFIX_RE = re.compile(r"^Cc", re.IGNORECASE) | |
| # "1.90 each" or "3.00each" — informational price-per-unit lines, not items | |
| _EACH_RE = re.compile(r"\d\s*each\b", re.IGNORECASE) | |
| # Inline quantity: "2x0.13", "3 x £0.22" — Lidl-style qty on the description line | |
| _INLINE_QTY_RE = re.compile(r"^(\d+)\s*x\s*[£$€¥]?(\d+[.,]\d{2})$", re.IGNORECASE) | |
| # Payment / non-item rows in the totals section | |
| _PAYMENT_SKIP = re.compile(r"\b(cash|change|card|visa|mastercard|amex|contactless|clubcard|credit|debit)\b", re.IGNORECASE) | |
| # Row merging tolerance in pixels | |
| _ROW_Y_TOLERANCE = 30 | |
| # Minimum ratio of "readable" characters (letters, digits, spaces, common punct) | |
| # to filter out ghost text from receipt backs | |
| _MIN_READABLE_RATIO = 0.6 | |
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
def parse_blocks(blocks: list[dict[str, Any]]) -> dict[str, Any]:
    """
    Turn OCR blocks into a structured receipt dictionary.

    Each block is expected to look like
    ``{"text": str, "confidence": float, "bbox": [...]}``.

    Unreadable blocks are dropped first. The remaining blocks are grouped
    into rows, the rows are split into header / items / totals sections, and
    each section is parsed by its own helper.

    Returns a dict with the keys ``merchant_name``, ``store_location``,
    ``date``, ``line_items``, ``subtotal``, ``savings`` and ``total``. When
    there is no usable input, the result of ``_empty_result()`` is returned.
    """
    readable = [blk for blk in (blocks or []) if _is_readable(blk["text"])]
    if not readable:
        return _empty_result()

    rows = _build_rows(readable)
    receipt_width = _estimate_receipt_width(readable)
    date = _extract_date(readable)

    header_end, totals_start = _find_sections(rows, receipt_width)
    merchant_name, store_location = _extract_header(rows[:header_end])
    item_rows = rows[header_end:totals_start]

    # Tesco receipts can split the first item's qty+description and its price
    # into two rows when ghost text above the item pulls the row anchor up.
    # If the last header row carries a quantity block, treat it as that
    # orphaned half and fold its blocks into the first item row.
    if header_end > 0 and item_rows:
        orphan_row = rows[header_end - 1]
        if any(_is_qty_block(blk, receipt_width) for blk in orphan_row):
            merged_first = sorted(
                orphan_row + item_rows[0],
                key=lambda blk: _left_x(blk["bbox"]),
            )
            item_rows = [merged_first, *item_rows[1:]]

    line_items = _extract_line_items(item_rows, receipt_width)
    totals = _extract_totals(rows[totals_start:])

    return {
        "merchant_name": merchant_name,
        "store_location": store_location,
        "date": date,
        "line_items": line_items,
        "subtotal": totals.get("subtotal"),
        "savings": totals.get("savings"),
        "total": totals.get("total"),
    }
| def _empty_result() -> dict[str, Any]: | |
| return { | |
| "merchant_name": None, | |
| "store_location": None, | |
| "date": None, | |
| "line_items": [], | |
| "subtotal": None, | |
| "savings": None, | |
| "total": None, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Text quality filter | |
| # --------------------------------------------------------------------------- | |
def _is_readable(text: str) -> bool:
    """
    Decide whether an OCR string looks like genuine receipt text.

    The checks run in this order:
      * empty / whitespace-only text is rejected;
      * a single character is kept only if it is a digit (possible quantity);
      * anything containing a price is kept;
      * 2-3 character tokens are kept only if purely letters or purely digits;
      * text without letters is kept if it contains a digit;
      * long unbroken letter/digit jumbles are rejected as ghost text;
      * text with an extreme consonant share, or with no vowels at all and
        more than three letters, is rejected.
    """
    candidate = text.strip() if text else ""
    if not candidate:
        return False

    size = len(candidate)
    if size == 1:
        # A lone character is only useful as a quantity digit.
        return candidate.isdigit()

    if _PRICE_EXTRACT_RE.search(candidate):
        return True

    if size <= 3:
        # e.g. "Cc", short abbreviations, small numbers.
        return re.fullmatch(r"[A-Za-z]{2,3}|[0-9]+", candidate) is not None

    letter_count = sum(ch.isalpha() for ch in candidate)
    contains_digit = re.search(r"\d", candidate) is not None
    if letter_count == 0:
        # Numbers and symbols only: keep when at least one digit is present.
        return contains_digit

    word_list = re.findall(r"[A-Za-z]+", candidate)
    if not word_list:
        return False

    # Ghost text such as "68T3-uoniqAoncguaGAon": long letter runs, digits
    # mixed in and no spaces. Pure-letter words like "Cornflakes" or
    # "BakedBeansTomSauce" contain no digit and are not caught here.
    mean_word_len = sum(map(len, word_list)) / len(word_list)
    if (
        mean_word_len > 8
        and " " not in candidate
        and contains_digit
        and letter_count > 5
    ):
        return False

    lowered = candidate.lower()
    vowel_count = sum(ch in "aeiou" for ch in lowered)
    consonant_count = sum(ch in "bcdfghjklmnpqrstvwxyz" for ch in lowered)
    if vowel_count == 0:
        # Vowel-free text is tolerated only when it has at most 3 letters.
        return letter_count <= 3
    return consonant_count / (vowel_count + consonant_count) <= 0.80
| # --------------------------------------------------------------------------- | |
| # Row building — group blocks by y-coordinate proximity | |
| # --------------------------------------------------------------------------- | |
| def _top_y(bbox: list) -> float: | |
| try: | |
| return min(pt[1] for pt in bbox) | |
| except (TypeError, IndexError): | |
| return 0.0 | |
| def _left_x(bbox: list) -> float: | |
| try: | |
| return min(pt[0] for pt in bbox) | |
| except (TypeError, IndexError): | |
| return 0.0 | |
def _build_rows(blocks: list[dict]) -> list[list[dict]]:
    """Cluster blocks into rows by top-y proximity.

    Blocks are visited top-to-bottom. The first block of a row fixes that
    row's anchor y; later blocks join the row while their top y stays within
    ``_ROW_Y_TOLERANCE`` of the anchor. Each returned row is ordered
    left-to-right.
    """
    if not blocks:
        return []

    def left_edge(blk: dict) -> float:
        return _left_x(blk["bbox"])

    ordered = sorted(blocks, key=lambda blk: _top_y(blk["bbox"]))

    grouped: list[list[dict]] = []
    anchor_y = None
    for blk in ordered:
        top = _top_y(blk["bbox"])
        if anchor_y is not None and abs(top - anchor_y) <= _ROW_Y_TOLERANCE:
            grouped[-1].append(blk)
        else:
            # Too far below the anchor: open a new row anchored here.
            grouped.append([blk])
            anchor_y = top

    return [sorted(members, key=left_edge) for members in grouped]
| # --------------------------------------------------------------------------- | |
| # Receipt width estimation and column classification | |
| # --------------------------------------------------------------------------- | |
| def _estimate_receipt_width(blocks: list[dict]) -> float: | |
| """Estimate the receipt width from the rightmost x-coordinate.""" | |
| max_x = 0.0 | |
| for block in blocks: | |
| for pt in block["bbox"]: | |
| try: | |
| max_x = max(max_x, float(pt[0])) | |
| except (TypeError, IndexError): | |
| pass | |
| return max_x if max_x > 0 else 1000.0 | |
def _is_price_block(block: dict, receipt_width: float) -> bool:
    """Return True when the block starts in the right-hand price column
    (left edge beyond 70% of the receipt width) and its text contains a price.
    """
    in_price_column = _left_x(block["bbox"]) > receipt_width * 0.70
    if not in_price_column:
        return False
    return _PRICE_EXTRACT_RE.search(block["text"]) is not None
def _is_qty_block(block: dict, receipt_width: float) -> bool:
    """Return True when the block starts in the left margin (left edge within
    the first 12% of the receipt width) and its text is exactly one digit.
    """
    in_left_margin = _left_x(block["bbox"]) < receipt_width * 0.12
    if not in_left_margin:
        return False
    return re.fullmatch(r"\d", block["text"].strip()) is not None
def _row_has_right_price(row: list[dict], receipt_width: float) -> bool:
    """Return True if at least one block of the row is a right-column price."""
    for blk in row:
        if _is_price_block(blk, receipt_width):
            return True
    return False
| # --------------------------------------------------------------------------- | |
| # Section detection | |
| # --------------------------------------------------------------------------- | |
def _find_sections(rows: list[list[dict]], receipt_width: float) -> tuple[int, int]:
    """
    Locate the boundaries between header, items and totals.

    Returns ``(header_end, totals_start)`` as row indices:
      * ``header_end`` is the first row with a right-column price, or 0 when
        no row has one;
      * ``totals_start`` is the first row at or after ``header_end`` whose
        text contains a totals keyword. If no keyword is found, it is the
        first row preceded by an unusually large vertical gap, or
        ``len(rows)`` when there is no such gap.
    """
    row_count = len(rows)

    header_end = next(
        (
            idx
            for idx, row in enumerate(rows)
            if _row_has_right_price(row, receipt_width)
        ),
        0,
    )

    # Primary signal: a keyword such as "Subtotal:" or "TOTAL:".
    totals_start = next(
        (
            idx
            for idx in range(header_end, row_count)
            if _TOTALS_KEYWORDS.search(" ".join(b["text"] for b in rows[idx]))
        ),
        row_count,
    )
    if totals_start != row_count or row_count <= header_end + 2:
        return header_end, totals_start

    # Fallback when OCR missed the keyword (e.g. Lidl's dashed separator):
    # look for a vertical gap clearly larger than the usual row spacing.
    # Entry k describes the gap just above row ``header_end + 1 + k``.
    gaps = [
        min(_top_y(b["bbox"]) for b in rows[idx])
        - max(_top_y(b["bbox"]) for b in rows[idx - 1])
        for idx in range(header_end + 1, row_count)
    ]
    if gaps:
        threshold = max(sum(gaps) / len(gaps) * 1.8, 60)
        for row_index, gap in enumerate(gaps, start=header_end + 1):
            if gap > threshold:
                return header_end, row_index

    return header_end, totals_start
| # --------------------------------------------------------------------------- | |
| # Header extraction | |
| # --------------------------------------------------------------------------- | |
def _extract_header(header_rows: list[list[dict]]) -> tuple[str | None, str | None]:
    """
    Derive ``(merchant_name, store_location)`` from the header rows.

    The merchant is resolved through ``_match_known_retailer``; if that finds
    nothing, ``(None, None)`` is returned so the app layer can ask the user.

    The store location is taken from the first row strictly below the
    retailer match that has a qualifying block: at least 3 characters, not
    URL/VAT/help text, not a date, not the retailer name or an OCR variant,
    and confidence of at least 0.85. The highest-confidence block of that
    row wins.
    """
    retailer, retailer_y = _match_known_retailer(header_rows)
    if not retailer:
        return None, None

    def qualifies(blk: dict) -> bool:
        raw = blk["text"]
        label = raw.strip()
        if len(label) < 3:
            return False
        if _HEADER_SKIP.search(raw):
            return False
        if any(rx.search(raw) for rx in _DATE_PATTERNS):
            return False
        upper = label.upper()
        if upper == retailer or upper in _OCR_VARIANTS:
            return False
        return blk["confidence"] >= 0.85

    for row in header_rows:
        # Only rows below the logo count: a curled thermal receipt can show
        # ghost text from its back above the logo, and that text often
        # survives the readability filter.
        if min(_top_y(b["bbox"]) for b in row) <= retailer_y:
            continue
        candidates = [blk for blk in row if qualifies(blk)]
        if candidates:
            winner = max(candidates, key=lambda blk: blk["confidence"])
            return retailer, winner["text"].strip()

    return retailer, None
def _match_known_retailer(header_rows: list[list[dict]]) -> tuple[str | None, float]:
    """
    Find the first header text that names a known retailer.

    Within each row the individual blocks are tested first, then the whole
    row joined with spaces. Text is stripped and upper-cased before lookup in
    ``_KNOWN_RETAILERS`` and then ``_OCR_VARIANTS``.

    Returns ``(retailer_name, y)``, where ``y`` is the top y of the matching
    block (or the smallest top y in the row for a whole-row match), or
    ``(None, 0.0)`` when nothing matches.
    """

    def canonical(label: str) -> str | None:
        if label in _KNOWN_RETAILERS:
            return label
        return _OCR_VARIANTS.get(label)

    for row in header_rows:
        for blk in row:
            name = canonical(blk["text"].strip().upper())
            if name is not None:
                return name, _top_y(blk["bbox"])

        joined = " ".join(b["text"].strip() for b in row).strip().upper()
        name = canonical(joined)
        if name is not None:
            return name, min(_top_y(b["bbox"]) for b in row)

    return None, 0.0
| # --------------------------------------------------------------------------- | |
| # Date extraction | |
| # --------------------------------------------------------------------------- | |
def _extract_date(blocks: list[dict]) -> str | None:
    """Return the first date found, or None.

    Blocks are scanned in the given order; for each block the patterns in
    ``_DATE_PATTERNS`` are tried in list order. The date is returned exactly
    as it appears in the text (capture group 1).
    """
    hits = (
        found.group(1)
        for blk in blocks
        for rx in _DATE_PATTERNS
        if (found := rx.search(blk["text"]))
    )
    return next(hits, None)
| # --------------------------------------------------------------------------- | |
| # Line item extraction (price-anchored) | |
| # --------------------------------------------------------------------------- | |
def _extract_line_items(
    item_rows: list[list[dict]], receipt_width: float
) -> list[dict[str, Any]]:
    """
    Build line items from the item rows, anchored on right-column prices.

    Rows are walked top-to-bottom:
      * a row without a right-column price is a continuation; its blocks are
        merged into the current item (or ignored if there is none yet);
      * a row whose price is negative, or which carries a "Cc <price>"
        discount marker while the current item has no discount yet, is a
        discount row for the current item. Its blocks are merged into that
        item and the first discount seen is kept;
      * any other priced row starts a new item. This includes a negative
        price that has no preceding item (a standalone refund/return).

    Each item has the keys ``description``, ``quantity``, ``unit_price``,
    ``total_price`` and ``discount``.
    """
    collected: list[dict[str, Any]] = []
    active: dict[str, Any] | None = None

    for row in item_rows:
        # Use the right-most block that qualifies as a price.
        price_block = None
        for candidate in reversed(row):
            if _is_price_block(candidate, receipt_width):
                price_block = candidate
                break
        others = [blk for blk in row if blk is not price_block]

        if price_block is None:
            if active is not None:
                _append_desc(active, others, receipt_width)
            continue

        amount = _normalise_price(price_block["text"])
        is_negative = amount.startswith("-")

        # A positive price next to a "Cc <price>" marker is a Clubcard
        # discount on the current item, so it is treated as negative.
        if (
            not is_negative
            and active is not None
            and active["discount"] is None
            and any(_is_cc_discount_indicator(blk["text"]) for blk in others)
        ):
            amount = f"-{amount}"
            is_negative = True

        if is_negative and active is not None:
            _append_desc(active, others, receipt_width)
            if active["discount"] is None:
                active["discount"] = amount
            continue

        active = {
            "description": None,
            "quantity": 1,
            "unit_price": amount,
            "total_price": amount,
            "discount": None,
        }
        _append_desc(active, others, receipt_width)
        collected.append(active)

    return collected
def _is_desc_block(block: dict, receipt_width: float) -> bool:
    """
    Return True when the block's left edge lies within the first 65% of the
    receipt width.

    The price column starts at the 70% mark, so the 65% cutoff leaves a
    margin: blocks starting at 65% or beyond are not treated as description
    text. Block length is not inspected.
    """
    cutoff = receipt_width * 0.65
    return _left_x(block["bbox"]) < cutoff
def _append_desc(item: dict, desc_blocks: list[dict], receipt_width: float) -> None:
    """
    Fold extra blocks into ``item`` in place.

    Per block, in order:
      * a left-margin single digit sets the quantity;
      * while no description text has been collected, "<n> <words>" sets the
        quantity to n and starts the description with the words (Co-op
        style, e.g. "1 BATCH S/NOODLE B");
      * "Cc..." discount markers and "... each" unit-price notes are dropped;
      * an inline "2x0.13" form sets the quantity;
      * anything else is appended to the description if it starts left of
        the description cutoff.

    Afterwards ``description`` (None when empty) and ``unit_price`` are
    recomputed.
    """
    fragments: list[str] = [item["description"]] if item["description"] else []

    for blk in desc_blocks:
        content = blk["text"].strip()

        if _is_qty_block(blk, receipt_width):
            item["quantity"] = int(content)
            continue

        # ``fragments`` is empty only when the item has no description and
        # nothing has been collected yet in this call.
        if not fragments:
            leading = re.match(r"^(\d{1,2})\s+([A-Za-z].+)$", content)
            if leading:
                count, remainder = leading.groups()
                item["quantity"] = int(count)
                fragments.append(remainder.strip())
                continue

        if _DISCOUNT_PREFIX_RE.match(content) or _EACH_RE.search(content):
            continue

        inline = _INLINE_QTY_RE.match(content)
        if inline:
            item["quantity"] = int(inline.group(1))
        elif _is_desc_block(blk, receipt_width):
            fragments.append(content)

    item["description"] = " ".join(fragments).strip() or None
    item["unit_price"] = _calc_unit_price(item["total_price"], item["quantity"])
| # --------------------------------------------------------------------------- | |
| # Totals extraction | |
| # --------------------------------------------------------------------------- | |
def _extract_totals(totals_rows: list[list[dict]]) -> dict[str, str | None]:
    """
    Extract subtotal, savings and total from the totals section.

    For each row, the price is taken from the right-most block that contains
    one. Rows without a price and payment rows (CASH, CHANGE, CARD, ...) are
    skipped. The remaining rows are classified by their lower-cased text:

      * "saving" / "promotion"  -> ``savings`` (a negative price from the
        next row is preferred when this row's price is not negative);
      * "subtotal", "sub total" or "sub-total" -> ``subtotal``;
      * "total" (without "sub" or "card") or "balance" -> ``total``;
      * no keyword, positive price, and no total yet -> ``total``.

    The first value found for each field is kept.

    Returns a dict with the keys ``subtotal``, ``savings`` and ``total``;
    fields that were not found are None.
    """
    found: dict[str, str | None] = {"subtotal": None, "savings": None, "total": None}

    def right_to_left(row: list[dict]) -> list[dict]:
        return sorted(row, key=lambda blk: _left_x(blk["bbox"]), reverse=True)

    for idx, row in enumerate(totals_rows):
        label = " ".join(blk["text"] for blk in row).strip().lower()

        amount = next(
            (
                _normalise_price(blk["text"])
                for blk in right_to_left(row)
                if _PRICE_EXTRACT_RE.search(blk["text"])
            ),
            None,
        )
        if amount is None:
            continue
        if _PAYMENT_SKIP.search(label):
            continue

        if "saving" in label or "promotion" in label:
            # OCR sometimes splits "Savings: -£6.70" across two rows, so a
            # non-negative price here may be superseded by a negative price
            # on the following row.
            # NOTE(review): the source's indentation was lost, so the exact
            # scope of the loop exit is an assumption: the scan stops at the
            # first negative price found, right-to-left. Confirm upstream.
            if not amount.startswith("-") and idx + 1 < len(totals_rows):
                for nxt in right_to_left(totals_rows[idx + 1]):
                    if not _PRICE_EXTRACT_RE.search(nxt["text"]):
                        continue
                    candidate = _normalise_price(nxt["text"])
                    if candidate.startswith("-"):
                        amount = candidate
                        break
            found["savings"] = found["savings"] or amount
        elif re.search(r"sub[\s-]?total", label):
            # Uses the same "sub[\s-]?total" spelling set as
            # _TOTALS_KEYWORDS. Without the hyphenated form, a "Sub-Total"
            # row reached the keyword-less fallback below and was recorded
            # as the total.
            found["subtotal"] = found["subtotal"] or amount
        elif (
            "total" in label and "sub" not in label and "card" not in label
        ) or "balance" in label:
            found["total"] = found["total"] or amount
        elif found["total"] is None and not amount.startswith("-"):
            # Keyword-less positive price: assume OCR missed the "TOTAL"
            # label. Negative standalone prices are savings, not totals.
            found["total"] = amount

    return found
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def _normalise_price(text: str) -> str:
    """Extract a price from ``text`` and return it as ``[-]digits.dd``.

    The first match of ``_PRICE_EXTRACT_RE`` is used. Its separator (".",
    "," or one whitespace character) is normalised to ".", and the optional
    leading minus sign is kept. The currency symbol is not part of the
    captured amount and is therefore dropped.

    If ``text`` contains no price, the stripped text is returned unchanged.
    """
    found = _PRICE_EXTRACT_RE.search(text)
    if found is None:
        return text.strip()
    sign, amount = found.groups()
    # The pattern allows any whitespace as separator, so map all of it (not
    # just a plain space) to "."; stripping a tab or non-breaking space
    # instead would turn "12<tab>50" into "1250".
    normalised = re.sub(r"[,\s]", ".", amount)
    return f"{sign}{normalised}"
def _is_cc_discount_indicator(text: str) -> bool:
    """Return True for a Clubcard discount marker such as 'Cc £2.25'.

    The text must start with "Cc" (any case) and the remainder must contain a
    digit. After removing prices, currency symbols, digits, punctuation and
    whitespace from the remainder, at most two characters may be left. That
    separates 'Cc £2.25' (discount) from 'Cc Any 3 For 2' (promotion label).
    """
    candidate = text.strip()
    if _DISCOUNT_PREFIX_RE.match(candidate) is None:
        return False

    remainder = candidate[2:].strip()
    if re.search(r"\d", remainder) is None:
        return False

    without_prices = _PRICE_EXTRACT_RE.sub("", remainder)
    leftover = re.sub(r"[£$€¥\d.,\s]", "", without_prices)
    return len(leftover) <= 2
| def _calc_unit_price(total_price: str, quantity: int) -> str: | |
| """Calculate unit price from total and quantity.""" | |
| if quantity <= 1: | |
| return total_price | |
| try: | |
| return str(round(float(total_price) / quantity, 2)) | |
| except (ValueError, ZeroDivisionError): | |
| return total_price | |