receipt-ocr / ocr /parser.py
sinful1's picture
fix(parser): detect Co-op via tagline, BALANCE DUE as total, strip inline qty, filter CREDIT/DEBIT
bb788ea
"""
Stage 2: Parse raw OCR text blocks into structured receipt data.
Uses spatial layout (bounding box coordinates) to reconstruct receipt rows,
rather than relying on single-line regex matching. Receipts have a consistent
column layout:
- Far left: quantity (single digit)
- Middle: item description
- Far right: price
The parser groups blocks into rows by y-coordinate proximity, classifies
columns by x-position, then extracts structured fields.
"""
import re
from typing import Any
# Known UK retailer names (uppercase) for exact matching against header text.
# _match_known_retailer strips and upper-cases each header block (and each
# whole header row) before testing membership, so every entry here must be
# uppercase to be reachable.
_KNOWN_RETAILERS = {
    "TESCO", "ASDA", "ALDI", "SAINSBURY'S", "SAINSBURYS", "MORRISONS",
    "WAITROSE", "COSTCO", "ICELAND", "SPAR", "NISA", "BOOTHS",
    "LIDL", "CO-OP", "M&S", "BUDGENS", "LONDIS", "ONE STOP",
}
# Map of common OCR misreads for stylized logos → canonical retailer name.
# Keys are compared against the same stripped, upper-cased header text as
# _KNOWN_RETAILERS. The "... BY YOU" entries are taglines rather than logo
# misreads: a header block reading exactly that text is reported as CO-OP.
_OCR_VARIANTS = {
    "LODZ": "LIDL",
    "LIOL": "LIDL",
    "LDL": "LIDL",
    "IIDL": "LIDL",
    "COOP": "CO-OP",
    "CO OP": "CO-OP",
    "OWNED BY YOU": "CO-OP",
    "OWNED BY YOU.": "CO-OP",
    "RIGHT BY YOU": "CO-OP",
    "RIGHT BY YOU.": "CO-OP",
    "M & S": "M&S",
    "MARKS & SPENCER": "M&S",
}
# ---------------------------------------------------------------------------
# Compiled patterns
# ---------------------------------------------------------------------------
# Date formats tried in order by _extract_date; group 1 is the matched date
# text, returned verbatim (no normalisation is applied).
_DATE_PATTERNS = [
    # Year first: 2024-03-15, 2024/3/5
    re.compile(r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b"),
    # Numeric with 2- or 4-digit year last: 15/03/24, 15-03-2024
    re.compile(r"\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b"),
    # Day, month name (abbreviated or full), year: "15 Mar 2024"
    re.compile(
        r"\b(\d{1,2}\s+"
        r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*"
        r"\s+\d{2,4})\b",
        re.IGNORECASE,
    ),
]
# Price anywhere in a string. Group 1 is an optional leading minus sign,
# group 2 is 1-6 digits, one separator ('.', ',' or whitespace — OCR often
# misreads the decimal point) and exactly two digits.
# NOTE(review): the pattern is unanchored, so a thousands-separated amount
# such as "1,234.56" matches as "1,23" — confirm such totals never occur.
_PRICE_EXTRACT_RE = re.compile(r"(-?)[£$€¥]?\s*(\d{1,6}[.,\s]\d{2})")
# Words that mark the start of the totals section (see _find_sections).
_TOTALS_KEYWORDS = re.compile(
    r"\b(total|sub[\s-]?total|subtotal|savings|promotions|tax|gst|hst|balance)\b",
    re.IGNORECASE,
)
# Header text that is never a store location: URLs, VAT numbers and
# customer-service boilerplate.
_HEADER_SKIP = re.compile(
    r"(www\.|\.com|\.co\.uk|vat\s*(?:no\.?|number)|questions?\s*please|please\s*visit|store.locator)",
    re.IGNORECASE,
)
# Text starting with "Cc" (any case) — the Clubcard marker handled by
# _is_cc_discount_indicator and dropped from descriptions by _append_desc.
_DISCOUNT_PREFIX_RE = re.compile(r"^Cc", re.IGNORECASE)
# "1.90 each" or "3.00each" — informational price-per-unit lines, not items
_EACH_RE = re.compile(r"\d\s*each\b", re.IGNORECASE)
# Inline quantity: "2x0.13", "3 x £0.22" — Lidl-style qty on the description line
# The whole text must match; group 1 is the quantity, group 2 the unit price.
_INLINE_QTY_RE = re.compile(r"^(\d+)\s*x\s*[£$€¥]?(\d+[.,]\d{2})$", re.IGNORECASE)
# Payment / non-item rows in the totals section
_PAYMENT_SKIP = re.compile(r"\b(cash|change|card|visa|mastercard|amex|contactless|clubcard|credit|debit)\b", re.IGNORECASE)
# Row merging tolerance in pixels: a block joins the current row when its
# top edge is within this distance of the row's first (anchor) block.
_ROW_Y_TOLERANCE = 30
# Minimum ratio of "readable" characters (letters, digits, spaces, common punct)
# to filter out ghost text from receipt backs
# NOTE(review): not referenced anywhere in the visible code — _is_readable
# uses its own inline thresholds. Confirm whether this is still needed.
_MIN_READABLE_RATIO = 0.6
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def parse_blocks(blocks: list[dict[str, Any]]) -> dict[str, Any]:
    """
    Turn OCR blocks (sorted top-to-bottom) into structured receipt data.

    Each block: { "text": str, "confidence": float, "bbox": [...] }

    Returns a dict with the keys merchant_name, store_location, date,
    line_items, subtotal, savings and total. When there are no blocks, or
    none survive the readability filter, every field is None and line_items
    is an empty list.
    """
    readable = [blk for blk in blocks if _is_readable(blk["text"])] if blocks else []
    if not readable:
        return _empty_result()

    rows = _build_rows(readable)
    receipt_width = _estimate_receipt_width(readable)
    date = _extract_date(readable)

    header_end, totals_start = _find_sections(rows, receipt_width)
    merchant_name, store_location = _extract_header(rows[:header_end])
    body_rows = rows[header_end:totals_start]

    # Tesco receipts can split the first item's qty+description and price
    # into two rows when ghost text above the item pulls the row anchor up.
    # If the last header row carries a quantity block, fold that row into
    # the first item row and restore left-to-right order.
    if body_rows and header_end > 0:
        orphan_row = rows[header_end - 1]
        if any(_is_qty_block(blk, receipt_width) for blk in orphan_row):
            merged_first = sorted(
                orphan_row + body_rows[0],
                key=lambda blk: _left_x(blk["bbox"]),
            )
            body_rows = [merged_first] + body_rows[1:]

    line_items = _extract_line_items(body_rows, receipt_width)
    totals = _extract_totals(rows[totals_start:])

    summary: dict[str, Any] = {
        "merchant_name": merchant_name,
        "store_location": store_location,
        "date": date,
        "line_items": line_items,
    }
    for field in ("subtotal", "savings", "total"):
        summary[field] = totals.get(field)
    return summary
def _empty_result() -> dict[str, Any]:
return {
"merchant_name": None,
"store_location": None,
"date": None,
"line_items": [],
"subtotal": None,
"savings": None,
"total": None,
}
# ---------------------------------------------------------------------------
# Text quality filter
# ---------------------------------------------------------------------------
def _is_readable(text: str) -> bool:
    """
    Decide whether an OCR text fragment looks like genuine receipt text.

    Used to drop "ghost" text (print showing through from the receipt back)
    and other noise before rows are built. Checks, in order:

    - empty / whitespace-only text is rejected;
    - a single character is kept only if it is a digit (quantity column);
    - anything containing a price pattern is kept;
    - 2-3 character tokens are kept when they are all letters, all digits,
      or a letter-ampersand-letter abbreviation such as "M&S";
    - longer text without letters is kept only if it contains a digit;
    - longer text with letters is rejected when it is a long unbroken
      letter/digit jumble, when more than 80% of its letters are
      consonants, or when it has more than three letters and no vowels.
    """
    if not text or not text.strip():
        return False
    stripped = text.strip()

    # Single digits pass (could be quantity column); any other lone
    # character is noise.
    if len(stripped) == 1:
        return stripped.isdigit()

    # Prices always pass
    if _PRICE_EXTRACT_RE.search(stripped):
        return True

    # Short tokens (2-3 chars) — allow "Cc", digits, common abbreviations.
    # "M&S" is whitelisted explicitly: it is a known retailer name and would
    # otherwise be discarded here before the header matcher ever sees it.
    if len(stripped) <= 3:
        return (
            re.fullmatch(r"[A-Za-z]{2,3}|[0-9]+|[A-Za-z]&[A-Za-z]", stripped)
            is not None
        )

    alpha = sum(1 for c in stripped if c.isalpha())
    if alpha == 0:
        # Pure numbers/symbols — keep if it has at least one digit
        return bool(re.search(r"\d", stripped))

    # Word-like runs of ASCII letters separated by spaces/punctuation
    words = re.findall(r"[A-Za-z]+", stripped)
    if not words:
        return False

    # Ghost text signature: long unbroken text with digits mixed into
    # letters (e.g., "68T3-uoniqAoncguaGAon"). Pure-letter words like
    # "Cornflakes" or "BakedBeansTomSauce" are valid receipt items and are
    # not caught because they contain no digit.
    avg_word_len = sum(len(w) for w in words) / len(words)
    has_spaces = " " in stripped
    if avg_word_len > 8 and not has_spaces and re.search(r"\d", stripped) and alpha > 5:
        return False

    # Consonant density — ghost text has unusual consonant clusters
    lowered = stripped.lower()
    consonants = sum(1 for c in lowered if c in "bcdfghjklmnpqrstvwxyz")
    vowels = sum(1 for c in lowered if c in "aeiou")
    if vowels > 0 and consonants / (vowels + consonants) > 0.80:
        return False
    if vowels == 0 and alpha > 3:
        return False
    return True
# ---------------------------------------------------------------------------
# Row building — group blocks by y-coordinate proximity
# ---------------------------------------------------------------------------
def _top_y(bbox: list) -> float:
try:
return min(pt[1] for pt in bbox)
except (TypeError, IndexError):
return 0.0
def _left_x(bbox: list) -> float:
try:
return min(pt[0] for pt in bbox)
except (TypeError, IndexError):
return 0.0
def _build_rows(blocks: list[dict]) -> list[list[dict]]:
    """Cluster blocks into visual rows and order each row left-to-right.

    Blocks are visited by ascending top edge. The first block of a row is
    its anchor; a block joins the current row while its top edge lies within
    ``_ROW_Y_TOLERANCE`` of that anchor, otherwise it opens a new row and
    becomes the new anchor.
    """
    if not blocks:
        return []

    grouped: list[list[dict]] = []
    anchor_y = None
    for blk in sorted(blocks, key=lambda item: _top_y(item["bbox"])):
        blk_y = _top_y(blk["bbox"])
        if anchor_y is not None and abs(blk_y - anchor_y) <= _ROW_Y_TOLERANCE:
            grouped[-1].append(blk)
        else:
            grouped.append([blk])
            anchor_y = blk_y

    return [
        sorted(members, key=lambda item: _left_x(item["bbox"]))
        for members in grouped
    ]
# ---------------------------------------------------------------------------
# Receipt width estimation and column classification
# ---------------------------------------------------------------------------
def _estimate_receipt_width(blocks: list[dict]) -> float:
"""Estimate the receipt width from the rightmost x-coordinate."""
max_x = 0.0
for block in blocks:
for pt in block["bbox"]:
try:
max_x = max(max_x, float(pt[0]))
except (TypeError, IndexError):
pass
return max_x if max_x > 0 else 1000.0
def _is_price_block(block: dict, receipt_width: float) -> bool:
    """Return True when the block starts right of 70% of the receipt width
    and its text contains a price pattern."""
    starts_in_price_column = _left_x(block["bbox"]) > receipt_width * 0.70
    if not starts_in_price_column:
        return False
    return _PRICE_EXTRACT_RE.search(block["text"]) is not None
def _is_qty_block(block: dict, receipt_width: float) -> bool:
    """Return True when the block starts left of 12% of the receipt width
    and its stripped text is exactly one digit."""
    starts_in_qty_column = _left_x(block["bbox"]) < receipt_width * 0.12
    if not starts_in_qty_column:
        return False
    return re.fullmatch(r"\d", block["text"].strip()) is not None
def _row_has_right_price(row: list[dict], receipt_width: float) -> bool:
    """Return True if at least one block in the row is a right-column price."""
    for blk in row:
        if _is_price_block(blk, receipt_width):
            return True
    return False
# ---------------------------------------------------------------------------
# Section detection
# ---------------------------------------------------------------------------
def _find_sections(rows: list[list[dict]], receipt_width: float) -> tuple[int, int]:
    """
    Locate the item section within the rows.

    Returns (header_end_index, totals_start_index): items are
    rows[header_end:totals_start].

    - The header ends at the first row with a right-column price (index 0
      if no row has one).
    - The totals start at the first row from there on whose text contains a
      totals keyword.
    - If no keyword is found, the totals start after the first vertical gap
      that is clearly larger than the typical row spacing. Without such a
      gap, totals_start stays at len(rows).
    """
    row_count = len(rows)

    header_end = next(
        (
            idx
            for idx, row in enumerate(rows)
            if _row_has_right_price(row, receipt_width)
        ),
        0,
    )

    # Primary: keyword detection ("Subtotal:", "TOTAL:", etc.)
    totals_start = next(
        (
            idx
            for idx in range(header_end, row_count)
            if _TOTALS_KEYWORDS.search(" ".join(blk["text"] for blk in rows[idx]))
        ),
        row_count,
    )

    # Fallback: handles receipts where OCR misses the "TOTAL" text
    # (e.g., Lidl's dashed separator).
    if totals_start == row_count and row_count > header_end + 2:
        # gaps[k] is the distance from the lowest top edge in row
        # header_end + k to the highest top edge in row header_end + k + 1.
        gaps = [
            min(_top_y(blk["bbox"]) for blk in rows[idx])
            - max(_top_y(blk["bbox"]) for blk in rows[idx - 1])
            for idx in range(header_end + 1, row_count)
        ]
        if gaps:
            mean_gap = sum(gaps) / len(gaps)
            threshold = max(mean_gap * 1.8, 60)
            for offset, gap in enumerate(gaps):
                if gap > threshold:
                    totals_start = header_end + 1 + offset
                    break

    return header_end, totals_start
# ---------------------------------------------------------------------------
# Header extraction
# ---------------------------------------------------------------------------
def _extract_header(header_rows: list[list[dict]]) -> tuple[str | None, str | None]:
    """
    Extract merchant name and store location from header rows.

    The merchant comes from _match_known_retailer. If none is recognised the
    result is (None, None) — the app layer can prompt the user.

    The store location is the highest-confidence acceptable block in the
    first row strictly below the retailer match that has any. A block is
    acceptable when its text has at least three characters, is not
    URL/VAT/boilerplate text, contains no date, is not the retailer name or
    an OCR variant of one, and has confidence >= 0.85.
    """
    retailer, retailer_y = _match_known_retailer(header_rows)
    if not retailer:
        return None, None

    def _is_location_candidate(blk: dict) -> bool:
        raw = blk["text"]
        label = raw.strip()
        if len(label) < 3:
            return False
        if _HEADER_SKIP.search(raw):
            return False
        if any(pattern.search(raw) for pattern in _DATE_PATTERNS):
            return False
        upper_label = label.upper()
        if upper_label == retailer or upper_label in _OCR_VARIANTS:
            return False
        return blk["confidence"] >= 0.85

    for row in header_rows:
        # Only consider rows below the retailer logo. Thermal-printed
        # receipts can curl and reveal the back's ghost text above the
        # logo; that text often passes the readability filter.
        if min(_top_y(blk["bbox"]) for blk in row) <= retailer_y:
            continue
        candidates = [blk for blk in row if _is_location_candidate(blk)]
        if candidates:
            winner = max(candidates, key=lambda blk: blk["confidence"])
            return retailer, winner["text"].strip()

    return retailer, None
def _match_known_retailer(header_rows: list[list[dict]]) -> tuple[str | None, float]:
    """
    Find the first header text that names a known retailer.

    Within each row, every block is tested on its own first, then the row's
    block texts joined with single spaces. Text is stripped and upper-cased
    before being looked up in _KNOWN_RETAILERS and then _OCR_VARIANTS.

    Returns (retailer_name, y_coordinate_of_match) or (None, 0.0). For a
    whole-row match the y-coordinate is the smallest top edge in the row.
    """

    def _canonical(label: str) -> str | None:
        if label in _KNOWN_RETAILERS:
            return label
        return _OCR_VARIANTS.get(label)

    for row in header_rows:
        for blk in row:
            name = _canonical(blk["text"].strip().upper())
            if name is not None:
                return name, _top_y(blk["bbox"])

        joined = " ".join(blk["text"].strip() for blk in row).strip().upper()
        name = _canonical(joined)
        if name is not None:
            return name, min(_top_y(blk["bbox"]) for blk in row)

    return None, 0.0
# ---------------------------------------------------------------------------
# Date extraction
# ---------------------------------------------------------------------------
def _extract_date(blocks: list[dict]) -> str | None:
    """Return the first date found, as written on the receipt.

    Blocks are scanned in the given order; within a block the patterns in
    _DATE_PATTERNS are tried in order. Returns None if nothing matches.
    """
    for blk in blocks:
        text = blk["text"]
        hit = next(
            (m for m in (pattern.search(text) for pattern in _DATE_PATTERNS) if m),
            None,
        )
        if hit is not None:
            return hit.group(1)
    return None
# ---------------------------------------------------------------------------
# Line item extraction (price-anchored)
# ---------------------------------------------------------------------------
def _extract_line_items(
    item_rows: list[list[dict]], receipt_width: float
) -> list[dict[str, Any]]:
    """
    Build line items by walking the item rows top-to-bottom.

    For each row, the rightmost right-column price block (if any) is the
    row's price and all other blocks are description material:

    - No price: continuation row — its blocks are merged into the current
      item (ignored if no item has started yet).
    - Negative price, or a positive price on a row carrying a Clubcard
      "Cc <price>" marker while the current item has no discount yet: the
      row is merged into the current item and the (negated) price becomes
      its discount. Only the first discount is kept.
    - Otherwise: a new item starts with this price. This includes a negative
      price with no preceding item (refund/return).
    """
    items: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None

    for row in item_rows:
        price_blk = None
        for blk in reversed(row):
            if _is_price_block(blk, receipt_width):
                price_blk = blk
                break
        desc_blocks = [blk for blk in row if blk is not price_blk]

        if price_blk is None:
            if current is not None:
                _append_desc(current, desc_blocks, receipt_width)
            continue

        price_str = _normalise_price(price_blk["text"])
        is_negative = price_str.startswith("-")

        # A "Cc £x.xx" marker turns a positive price into a discount on the
        # current item, as long as that item has no discount yet.
        if (
            not is_negative
            and current is not None
            and current["discount"] is None
            and any(_is_cc_discount_indicator(blk["text"]) for blk in desc_blocks)
        ):
            price_str = f"-{price_str}"
            is_negative = True

        if is_negative and current is not None:
            _append_desc(current, desc_blocks, receipt_width)
            if current["discount"] is None:
                current["discount"] = price_str
            continue

        # Regular item, or a standalone negative price with no preceding
        # item (refund/return).
        current = {
            "description": None,
            "quantity": 1,
            "unit_price": price_str,
            "total_price": price_str,
            "discount": None,
        }
        _append_desc(current, desc_blocks, receipt_width)
        items.append(current)

    return items
def _is_desc_block(block: dict, receipt_width: float) -> bool:
    """
    Return True when the block starts left of 65% of the receipt width.

    The price column starts at the 70% mark; the cut-off here is set 5%
    earlier so that anything starting in the 65-70% zone — typically
    standalone ghost text from the receipt back — is not treated as
    description text.
    """
    description_limit = receipt_width * 0.65
    return _left_x(block["bbox"]) < description_limit
def _append_desc(item: dict, desc_blocks: list[dict], receipt_width: float) -> None:
    """Merge extra desc blocks into an item, updating qty and unit_price.

    ``item`` is modified in place. Each block is handled by the first rule
    that applies:

    1. Left-column single digit: becomes the quantity.
    2. Inline quantity ("2x0.13", "3 x £0.22"): becomes the quantity.
    3. Leading quantity while the description is still empty
       ("1 BATCH S/NOODLE B"): sets quantity and starts the description.
    4. Text starting with "Cc", or a "... each" unit-price line: dropped.
    5. Text starting at or beyond 65% of the receipt width: dropped.
    6. Anything else is appended to the description.

    Afterwards ``description`` is the space-joined text (None if empty) and
    ``unit_price`` is recomputed from ``total_price`` and ``quantity``.
    """
    parts: list[str] = [item["description"]] if item["description"] else []
    for b in desc_blocks:
        text = b["text"].strip()
        if _is_qty_block(b, receipt_width):
            item["quantity"] = int(text)
            continue
        # Inline quantity must be tested before the Co-op rule below:
        # "3 x £0.22" also looks like "<qty> <description>" and would
        # otherwise produce the description "x £0.22".
        iq = _INLINE_QTY_RE.match(text)
        if iq:
            item["quantity"] = int(iq.group(1))
            continue
        # Co-op style: qty embedded in description ("1 BATCH S/NOODLE B").
        # Only applies while no description text has been collected; parts
        # is non-empty whenever the item already had a description.
        if not parts:
            m_qty = re.match(r"^(\d{1,2})\s+([A-Za-z].+)$", text)
            if m_qty:
                item["quantity"] = int(m_qty.group(1))
                parts.append(m_qty.group(2).strip())
                continue
        if _DISCOUNT_PREFIX_RE.match(text):
            continue
        if _EACH_RE.search(text):
            continue
        if not _is_desc_block(b, receipt_width):
            continue
        parts.append(text)
    item["description"] = " ".join(parts).strip() or None
    item["unit_price"] = _calc_unit_price(item["total_price"], item["quantity"])
# ---------------------------------------------------------------------------
# Totals extraction
# ---------------------------------------------------------------------------
def _extract_totals(totals_rows: list[list[dict]]) -> dict[str, str | None]:
    """Extract subtotal, savings, and total from the totals section.

    Every row is classified from its lower-cased, space-joined text and
    priced from its rightmost block that contains a price. For each field
    the first value found wins. Rules, in order:

    - Payment rows (CASH, CHANGE, CARD, ...) are ignored.
    - "saving"/"promotion" rows set ``savings``. If the row has no price, or
      only a positive one, a negative price on the following row is used
      instead (OCR sometimes puts the label and amount on separate rows).
    - "subtotal", "sub total" and "sub-total" rows set ``subtotal``.
    - Other "total" rows, and "balance" rows, set ``total``.
    - tax/gst/hst rows are recognised but not reported.
    - A remaining positive price with no keyword becomes ``total`` if no
      total has been found yet.

    Returns a dict with the keys "subtotal", "savings" and "total"; values
    are normalised price strings or None.
    """
    result: dict[str, str | None] = {"subtotal": None, "savings": None, "total": None}

    def _prices_right_to_left(row: list[dict]):
        """Yield the normalised price of each priced block, rightmost first."""
        for block in sorted(row, key=lambda b: _left_x(b["bbox"]), reverse=True):
            if _PRICE_EXTRACT_RE.search(block["text"]):
                yield _normalise_price(block["text"])

    for ri, row in enumerate(totals_rows):
        row_text = " ".join(b["text"] for b in row).strip().lower()

        # Skip payment rows (CASH, CHANGE, CARD, etc.)
        if _PAYMENT_SKIP.search(row_text):
            continue

        # Find the price — prefer rightmost block
        price = next(_prices_right_to_left(row), None)

        # For savings/promotions, prefer the negative price if available.
        # Sometimes OCR splits "Savings: -£6.70" into two rows, leaving the
        # label row without a price, so the look-ahead must also run when
        # no price was found on this row.
        is_savings = "saving" in row_text or "promotion" in row_text
        if (
            is_savings
            and (price is None or not price.startswith("-"))
            and ri + 1 < len(totals_rows)
        ):
            negative = next(
                (
                    p
                    for p in _prices_right_to_left(totals_rows[ri + 1])
                    if p.startswith("-")
                ),
                None,
            )
            if negative is not None:
                price = negative

        if price is None:
            continue

        if is_savings:
            result["savings"] = result["savings"] or price
        elif re.search(r"sub[\s-]?total", row_text):
            # Accept the same spellings as _TOTALS_KEYWORDS, including
            # "sub-total", so it cannot reach the keyword-less fallback.
            result["subtotal"] = result["subtotal"] or price
        elif "total" in row_text and "sub" not in row_text and "card" not in row_text:
            result["total"] = result["total"] or price
        elif "balance" in row_text:
            result["total"] = result["total"] or price
        elif re.search(r"\b(?:tax|gst|hst)\b", row_text):
            # A labelled tax amount is not a keyword-less price; keep it
            # away from the fallback below.
            continue
        elif result["total"] is None and not price.startswith("-"):
            # Standalone positive price with no keyword — treat as total if
            # not yet set (handles receipts where OCR misses the "TOTAL" text).
            # Negative standalone prices are savings/discounts, not totals.
            result["total"] = price
    return result
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _normalise_price(text: str) -> str:
    """Extract and standardise a price string.

    Returns the first price found in ``text`` as ``[-]digits.dd``: an
    optional leading minus, no currency symbol, and a "." as the decimal
    separator. If ``text`` contains no price, the stripped text is returned
    unchanged.
    """
    m = _PRICE_EXTRACT_RE.search(text)
    if not m:
        return text.strip()
    sign = m.group(1)
    # Group 2 is digits, one separator, two digits. The separator may be
    # ",", "." or ANY whitespace character (tab, non-breaking space, ...),
    # so map every variant to "." in one step. Deleting unconverted
    # whitespace instead would turn "1<TAB>50" into "150".
    amount = re.sub(r"[,\s]", ".", m.group(2))
    return f"{sign}{amount}"
def _is_cc_discount_indicator(text: str) -> bool:
    """Return True for a Clubcard discount marker such as 'Cc £2.25'.

    The stripped text must start with "Cc" (any case), the remainder must
    contain a digit, and after removing prices, currency symbols, digits,
    punctuation and whitespace at most two characters may be left. This
    separates 'Cc £2.25' (discount) from 'Cc Any 3 For 2' (promotion label).
    """
    label = text.strip()
    if _DISCOUNT_PREFIX_RE.match(label) is None:
        return False

    remainder = label[2:].strip()
    if re.search(r"\d", remainder) is None:
        return False

    without_prices = _PRICE_EXTRACT_RE.sub("", remainder)
    leftover = re.sub(r"[£$€¥\d.,\s]", "", without_prices)
    return len(leftover) <= 2
def _calc_unit_price(total_price: str, quantity: int) -> str:
"""Calculate unit price from total and quantity."""
if quantity <= 1:
return total_price
try:
return str(round(float(total_price) / quantity, 2))
except (ValueError, ZeroDivisionError):
return total_price