# Page residue from the hosting site (not part of the program):
# Um34ER's picture / Update app.py / 267da03 verified
'''Smart Parchi OCR v7 — Local Hybrid Architecture'''
from __future__ import annotations
import asyncio
import gc
import hashlib
import io
import logging
import os
import re
import threading
import time
import uuid
import warnings
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from PIL import Image
from rapidfuzz import fuzz, process as fuzz_process
"""Pakistani Grocery Lexicon & Urdu-to-English Transliteration Map.
This module provides:
1. URDU_TO_ENGLISH: Direct transliteration of common Urdu grocery terms.
2. GROCERY_LEXICON: Canonical grocery items for fuzzy auto-correction.
3. COMMON_MISSPELLINGS: Maps mangled OCR output to correct English names.
4. TRANSACTION_KEYWORDS: Urdu/English cues for udhaar/cash/return detection.
5. UNIT_MAP: Normalizes unit strings (kilogram -> kg, dz -> dozen, etc.).
"""
# ── Urdu → English Transliteration (common parchi items) ─────────────────────
# Maps an Urdu grocery term to its romanised name. Keys are real Urdu text
# (Arabic-script code points); the previous mis-encoded keys could never match
# OCR output. Insertion order is kept grouped by category; each key appears
# exactly once (the earlier second "Tel" entry was a duplicate key).
URDU_TO_ENGLISH: dict[str, str] = {
    # Staples
    "آٹا": "Atta",
    "چاول": "Chawal",
    "دال": "Daal",
    "چنے": "Chanay",
    "مسور": "Masoor",
    "ماش": "Maash",
    "بیسن": "Besan",
    "میدہ": "Maida",
    "سوجی": "Suji",
    # Sugar & Salt
    "چینی": "Cheeni",
    "گڑ": "Gur",
    "نمک": "Namak",
    "شکر": "Shakar",
    # Oils & Ghee
    "تیل": "Tel",
    "گھی": "Ghee",
    "بناسپتی": "Banaspati",
    "مکھن": "Makkhan",
    # Spices
    "مرچ": "Mirch",
    "ہلدی": "Haldi",
    "دھنیا": "Dhaniya",
    "زیرہ": "Zeera",
    "اجوائن": "Ajwain",
    "کالی مرچ": "Kali Mirch",
    "لال مرچ": "Lal Mirch",
    "گرم مصالحہ": "Garam Masala",
    "ادرک": "Adrak",
    "لہسن": "Lehsun",
    "پیاز": "Piyaz",
    "ٹماٹر": "Tamatar",
    "آلو": "Aloo",
    # Dairy
    "دودھ": "Doodh",
    "دہی": "Dahi",
    "پنیر": "Paneer",
    "کریم": "Cream",
    "لسی": "Lassi",
    # Beverages
    "چائے": "Chai",
    "پانی": "Paani",
    # Meat & Protein
    "گوشت": "Gosht",
    "مرغی": "Murghi",
    "مچھلی": "Machhli",
    "انڈے": "Anday",
    "قیمہ": "Qeema",
    # Bread & Bakery
    "روٹی": "Roti",
    "نان": "Naan",
    "ڈبل روٹی": "Double Roti",
    "بسکٹ": "Biscuit",
    "کیک": "Cake",
    # Fruits & Vegetables
    "سیب": "Seb",
    "کیلا": "Kela",
    "انگور": "Angoor",
    "آم": "Aam",
    "گاجر": "Gajar",
    "مٹر": "Matar",
    "بھنڈی": "Bhindi",
    "گوبھی": "Gobhi",
    "پالک": "Palak",
    "بینگن": "Baingan",
    # Miscellaneous
    "صابن": "Sabun",
    "سرکہ": "Sirka",
    "اچار": "Achaar",
    "چٹنی": "Chutney",
    "برگر": "Burger",
    "سموسہ": "Samosa",
    "پراٹھا": "Paratha",
    "بریانی": "Biryani",
    # Snacks & Packaged
    "چپس": "Chips",
    "نوڈلز": "Noodles",
    "جوس": "Juice",
    "کولڈ ڈرنک": "Cold Drink",
    "پیپسی": "Pepsi",
    "کوکا کولا": "Coca Cola",
}
# ── Canonical grocery item list (for fuzzy matching) ─────────────────────────
# Canonical item names offered as the choice list to the fuzzy matcher
# (see correct_item_name and _repair_got_ocr_fragments). The first rows are
# the romanised names produced by URDU_TO_ENGLISH, grouped by category; the
# last three rows are plain-English names.
GROCERY_LEXICON: list[str] = [
    # Staples, sugar & salt
    "Atta", "Chawal", "Daal", "Chanay", "Masoor", "Maash", "Besan",
    "Maida", "Suji", "Cheeni", "Gur", "Namak", "Shakar",
    # Oils & ghee
    "Tel", "Ghee", "Banaspati", "Makkhan",
    # Spices and basic vegetables
    "Mirch", "Haldi", "Dhaniya", "Zeera", "Ajwain", "Kali Mirch",
    "Lal Mirch", "Garam Masala", "Adrak", "Lehsun", "Piyaz", "Tamatar", "Aloo",
    # Dairy
    "Doodh", "Dahi", "Paneer", "Cream", "Lassi",
    # Beverages
    "Chai", "Paani",
    # Meat & protein
    "Gosht", "Murghi", "Machhli", "Anday", "Qeema",
    # Bread & bakery
    "Roti", "Naan", "Double Roti", "Biscuit", "Cake", "Bread",
    # Fruits & vegetables
    "Seb", "Kela", "Angoor", "Aam", "Gajar", "Matar",
    "Bhindi", "Gobhi", "Palak", "Baingan",
    # Miscellaneous and prepared food
    "Sabun", "Sirka", "Achaar", "Chutney",
    "Burger", "Samosa", "Paratha", "Biryani",
    # Snacks & packaged
    "Chips", "Noodles", "Juice", "Cold Drink", "Pepsi", "Coca Cola",
    # Plain-English names
    "Rice", "Sugar", "Salt", "Oil", "Butter", "Milk", "Eggs", "Chicken",
    "Mutton", "Fish", "Flour", "Potato", "Onion", "Tomato", "Ginger", "Garlic",
    "Water", "Tea", "Coffee", "Soap", "Detergent", "Shampoo",
]
# ── OCR Misspelling Auto-Correction ──────────────────────────────────────────
# Exact-match corrections applied before fuzzy matching (see
# correct_item_name). Keys are lower-case because the lookup lower-cases its
# input; values are the canonical display names. Several plain-English words
# are deliberately mapped to their romanised Urdu equivalents (e.g. "milk").
# Every key appears once: a repeated "dal" entry was removed.
COMMON_MISSPELLINGS: dict[str, str] = {
    "bubiger": "Burger", "buger": "Burger", "brger": "Burger",
    "ata": "Atta", "aata": "Atta", "tta": "Atta",
    "cheni": "Cheeni", "chini": "Cheeni", "cheeni": "Cheeni",
    "chaval": "Chawal", "chawl": "Chawal", "chwal": "Chawal",
    "dal": "Daal", "daal": "Daal",
    "gee": "Ghee", "ghi": "Ghee",
    "tel": "Tel", "oil": "Tel",
    "doodh": "Doodh", "dudh": "Doodh", "milk": "Doodh",
    "ande": "Anday", "andy": "Anday", "egg": "Anday", "eggs": "Anday",
    "murgi": "Murghi", "murgh": "Murghi", "chicken": "Murghi",
    "goosht": "Gosht", "gosth": "Gosht", "meat": "Gosht",
    "qeema": "Qeema", "keema": "Qeema", "kema": "Qeema",
    "namk": "Namak", "nmk": "Namak",
    "pyaz": "Piyaz", "piaz": "Piyaz", "onion": "Piyaz",
    "tmatar": "Tamatar", "tomato": "Tamatar",
    "alu": "Aloo", "aaloo": "Aloo", "potato": "Aloo",
    "hldi": "Haldi", "turmeric": "Haldi",
    "mirchi": "Mirch", "mrch": "Mirch",
    "chai": "Chai", "chay": "Chai", "tea": "Chai",
    "rotti": "Roti", "ruti": "Roti",
    "nan": "Naan",
    "chips": "Chips", "chps": "Chips",
    "smaosa": "Samosa", "smosa": "Samosa",
    "paratha": "Paratha", "pratha": "Paratha",
    "biryni": "Biryani", "bryani": "Biryani",
    "sabon": "Sabun", "soap": "Sabun",
    "pepsi": "Pepsi", "ppsi": "Pepsi",
    "cola": "Coca Cola", "coke": "Coca Cola",
    "juice": "Juice", "juce": "Juice",
    "noodls": "Noodles", "noodlez": "Noodles",
    "biscut": "Biscuit", "biskit": "Biscuit",
    "bred": "Bread", "braed": "Bread",
    "suger": "Sugar", "sugr": "Sugar",
    "flor": "Flour", "flwr": "Flour",
}
# ── Transaction Type Detection ────────────────────────────────────────────────
# Cue words per transaction type, matched as case-insensitive substrings by
# detect_transaction_type. Urdu cues are stored as real Urdu text; the
# previous mis-encoded strings could never occur in OCR output.
TRANSACTION_KEYWORDS: dict[str, list[str]] = {
    "udhaar": [
        "ادھار", "اُدھار", "udhaar", "udhar", "udhr", "credit",
        "قرض", "قرضہ", "بعد میں", "baad mein", "ابھی نہیں",
        "khata", "کھاتا", "کھاتے",
    ],
    # wasooli = collection/recovery → maps to 'debit' in scan.tsx (line 388)
    "wasooli": [
        "واصولی", "وصولی", "wasooli", "wasoli", "wasool",
        "recovery", "collection", "وصول",
    ],
    "cash": [
        "نقد", "نقدی", "cash", "paid", "pesa", "پیسے",
        "ادا", "رقم", "jama", "جمع",
    ],
    "return": [
        "واپسی", "واپس", "return", "refund", "wapsi", "wapis",
    ],
}
# ── Unit Normalization ────────────────────────────────────────────────────────
# Maps a lower-cased unit spelling to its canonical form (kg, g, liter,
# dozen, pc, pkt). Urdu spellings are stored as real Urdu text so that they
# can match OCR output.
UNIT_MAP: dict[str, str] = {
    "kilogram": "kg", "kilograms": "kg", "kilo": "kg", "kg": "kg",
    "gram": "g", "grams": "g", "gm": "g", "g": "g",
    "liter": "liter", "litre": "liter", "liters": "liter", "l": "liter",
    "dozen": "dozen", "dz": "dozen", "doz": "dozen", "درجن": "dozen",
    "piece": "pc", "pieces": "pc", "pc": "pc", "pcs": "pc",
    "عدد": "pc", "کلو": "kg", "گرام": "g", "لیٹر": "liter",
    "packet": "pkt", "pkt": "pkt", "pack": "pkt", "پیکٹ": "pkt",
}
"""Image Preprocessing Pipeline for handwritten parchi images.
Stages: CLAHE → Denoise → Sharpen → Adaptive threshold.
Also provides quality analysis and multi-variant generation.
"""
# NOTE: `logger` is assigned again further down this file ("parchi.brain",
# "parchi.ocr_engine"); functions look the name up at call time, so the last
# assignment is the one actually used.
logger = logging.getLogger("parchi.preprocess")
# ── Constants ─────────────────────────────────────────────────────────────────
CLAHE_CLIP = 2.5  # clipLimit passed to cv2.createCLAHE in enhance()
CLAHE_TILE = (8, 8)  # tileGridSize passed to cv2.createCLAHE in enhance()
DENOISE_H = 10  # h (filter strength) passed to cv2.fastNlMeansDenoising
SHARPEN_KERNEL = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])  # 3x3 sharpening kernel for cv2.filter2D
TARGET_LONG_EDGE = 1024  # 1024px max — enough detail for handwriting, fewer VLM tokens
def analyze_quality(image: np.ndarray) -> Dict[str, float]:
    """Return heuristic quality metrics for the input image, each clamped to 0-1.

    Args:
        image: 2-D grayscale array, or a 3-D array that is converted with
            ``cv2.COLOR_RGB2GRAY``.

    Returns:
        Dict with ``sharpness`` (Laplacian variance / 500), ``brightness``
        (mean / 255), ``contrast`` (std-dev / 100), ``noise`` (1.0 means the
        image differs little from its 5x5 Gaussian blur, 0.0 means a large
        residual) and ``overall`` (mean of the four), rounded to 3 decimals.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image
    sharpness = min(1.0, float(cv2.Laplacian(gray, cv2.CV_64F).var()) / 500)
    brightness = float(np.mean(gray)) / 255
    contrast = min(1.0, float(gray.std()) / 100)
    # Compute the residual in floating point: subtracting two uint8 arrays
    # wraps around (e.g. 3 - 5 == 254) and grossly inflates the noise estimate.
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    residual = gray.astype(np.float32) - blurred.astype(np.float32)
    noise_raw = float(np.std(residual))
    noise = max(0.0, 1.0 - noise_raw / 50)
    overall = (sharpness + brightness + contrast + noise) / 4
    return {
        "sharpness": round(sharpness, 3),
        "brightness": round(brightness, 3),
        "contrast": round(contrast, 3),
        "noise": round(noise, 3),
        "overall": round(overall, 3),
    }
def resize_for_vlm(image: np.ndarray, max_edge: int = TARGET_LONG_EDGE) -> np.ndarray:
    """Downscale so the longest edge is at most ``max_edge`` (VLM memory savings).

    Args:
        image: Array whose first two dimensions are (height, width).
        max_edge: Maximum allowed length of the longer side, in pixels.

    Returns:
        The input object itself when it already fits, otherwise a copy
        resized with ``cv2.INTER_AREA`` that keeps the aspect ratio.
    """
    h, w = image.shape[:2]
    longest = max(h, w)
    if longest <= max_edge:
        return image
    scale = max_edge / longest
    # Clamp to 1 px: for very elongated images the short side would otherwise
    # truncate to 0, which cv2.resize rejects.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
def auto_orient(image: np.ndarray) -> np.ndarray:
    """Deskew an RGB image using Hough lines.

    The image is grayscaled, resized, edge-detected and passed to
    ``cv2.HoughLines``. Angles of the first 20 lines that fall strictly
    inside (-45, 45) degrees are collected; when the magnitude of their
    median exceeds 3 degrees the full-size image is rotated about its centre
    by that median. In every other case, including any exception (which is
    logged as a warning), the input is returned unchanged.
    """
    try:
        height, width = image.shape[:2]
        # Too small to estimate a reliable angle.
        if min(height, width) < 100:
            return image

        mono = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        thumb = cv2.resize(mono, (max(400, width // 3), max(300, height // 3)))
        edge_map = cv2.Canny(thumb, 50, 150)
        # Vote threshold is 30% of the number of rows in the resized image.
        votes_needed = int(len(thumb) * 0.3)
        hough = cv2.HoughLines(edge_map, 1, np.pi / 180, threshold=votes_needed)
        if hough is None:
            return image

        degrees = (entry[0][1] * 180 / np.pi - 90 for entry in hough[:20])
        tilts = [deg for deg in degrees if -45 < deg < 45]
        if not tilts:
            return image

        skew = float(np.median(tilts))
        if not abs(skew) > 3:
            return image

        pivot = (width // 2, height // 2)
        rotation = cv2.getRotationMatrix2D(pivot, skew, 1.0)
        return cv2.warpAffine(
            image,
            rotation,
            (width, height),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE,
        )
    except Exception as e:
        logger.warning("auto_orient failed: %s", e)
        return image
def enhance(rgb: np.ndarray) -> np.ndarray:
    """Run the full preprocessing pipeline and return a binarized RGB image.

    Stages, in order: auto_orient → grayscale → CLAHE → non-local-means
    denoise → sharpen → morphological close → adaptive Gaussian threshold.
    The single-channel result is converted back to three channels.
    """
    upright = auto_orient(rgb)
    stage = cv2.cvtColor(upright, cv2.COLOR_RGB2GRAY)

    # Local contrast equalization.
    equalizer = cv2.createCLAHE(clipLimit=CLAHE_CLIP, tileGridSize=CLAHE_TILE)
    stage = equalizer.apply(stage)

    stage = cv2.fastNlMeansDenoising(stage, h=DENOISE_H)
    stage = cv2.filter2D(stage, -1, SHARPEN_KERNEL)

    # 2x2 closing to reconnect broken pen strokes.
    closing_element = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    stage = cv2.morphologyEx(stage, cv2.MORPH_CLOSE, closing_element)

    black_white = cv2.adaptiveThreshold(
        stage,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        15,
        5,
    )
    return cv2.cvtColor(black_white, cv2.COLOR_GRAY2RGB)
def preprocess_for_vlm(rgb: np.ndarray) -> np.ndarray:
    """Prepare a colour image for VLM input: resize, then equalize lightness.

    The image is resized with resize_for_vlm, converted to LAB, CLAHE
    (clipLimit 2.0, 8x8 tiles) is applied to the L channel only, and the
    result is converted back to RGB. Colour is kept and no binarization is
    done, unlike enhance().
    """
    shrunk = resize_for_vlm(rgb)
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(shrunk, cv2.COLOR_RGB2LAB))
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    lightness = equalizer.apply(lightness)
    recombined = cv2.merge([lightness, chan_a, chan_b])
    return cv2.cvtColor(recombined, cv2.COLOR_LAB2RGB)
"""Brain Layer — Regex + Lexicon post-processor.
Converts raw VLM text output into structured JSON:
{ customer_name, items: [{name, quantity, price}], total, total_amount, transaction_type, mismatch }
No LLM needed — deterministic, 0 RAM overhead, < 1ms latency.
"""
logger = logging.getLogger("parchi.brain")
# ── Pre-compiled regex patterns ───────────────────────────────────────────────
# Urdu alternatives below are real Urdu text and the multiplication sign is a
# real "×"; the earlier mis-encoded literals could never match OCR output.

# Matches lines like: "Atta 2kg 200", "چینی 1 150", "Daal x3 - 450"
RE_ITEM_LINE = re.compile(
    r"(?P<name>[A-Za-z\u0600-\u06FF\u0750-\u077F\s\-\.]+?)"  # item name (Latin or Urdu)
    r"\s*[xX×\-]?\s*"
    # (?!\d) stops the quantity from ending in the middle of a number;
    # without it "Atta 255" was parsed as qty=25, price=5.
    r"(?P<qty>\d+(?:\.\d+)?)(?!\d)\s*"  # quantity
    r"(?P<unit>kg|g|gram|liter|litre|pkt|pc|dozen|dz|کلو|گرام|لیٹر|عدد|درجن)?\s*"
    r"[\-–—:=]?\s*"
    r"(?:Rs\.?\s*|PKR\s*|₨\s*)?"  # optional currency
    r"(?P<price>\d+(?:\.\d+)?)",  # price
    re.IGNORECASE | re.UNICODE,
)
# Simpler fallback: just "name price" on a line
RE_SIMPLE_LINE = re.compile(
    r"^(?P<name>[A-Za-z\u0600-\u06FF\u0750-\u077F\s\-\.]{2,}?)\s+"
    r"(?:Rs\.?\s*|PKR\s*|₨\s*)?"
    r"(?P<price>\d+(?:\.\d{1,2})?)$",
    re.IGNORECASE | re.UNICODE | re.MULTILINE,
)
# Total keyword followed by an optional separator/currency and a number.
RE_TOTAL = re.compile(
    r"(?:total|ٹوٹل|کل|جمع|مجموعی|grand\s*total|net|amount)"
    r"\s*[:=\-–—]?\s*(?:Rs\.?\s*|PKR\s*|₨\s*)?"
    r"(\d+(?:\.\d+)?)",
    re.IGNORECASE | re.UNICODE,
)
# Explicit customer label; group 1 captures the rest of the line.
RE_NAME = re.compile(
    r"(?:name|نام|customer|گاہک|صارف)\s*[:=\-–—]?\s*(.+)",
    re.IGNORECASE | re.UNICODE,
)
def transliterate_urdu(text: str, mapping: Optional[Dict[str, str]] = None) -> str:
    """Replace Urdu words in ``text`` with their English transliterations.

    Keys are applied longest-first so that a multi-word phrase is replaced
    before any shorter key it contains (otherwise a key such as "Mirch" would
    rewrite part of "Kali Mirch" and the phrase would never match). Keys of
    equal length keep their original dictionary order.

    Args:
        text: Raw text, possibly mixing Urdu and Latin script.
        mapping: Optional Urdu → English table; defaults to URDU_TO_ENGLISH.

    Returns:
        ``text`` with every occurrence of every key replaced.
    """
    table = URDU_TO_ENGLISH if mapping is None else mapping
    for urdu in sorted(table, key=len, reverse=True):
        text = text.replace(urdu, table[urdu])
    return text
def _repair_got_ocr_fragments(raw: str) -> str:
    """Re-join fragmented GOT-OCR words and snap them to the grocery lexicon.

    GOT-OCR produces fragmented words (e.g. 'Che eni' for 'Cheeni'). For each
    non-empty line this stage:
    1. Drops single-character non-digit tokens, except 'v', 'l' and 'g'.
    2. Keeps a known unit token that directly follows a number as it is, so
       that the unit in "500 g" is not rewritten into an item name.
    3. For any other short (<= 3 chars) non-numeric token, concatenates the
       following run of short (<= 4 chars) non-numeric tokens and, if the
       result fuzzy-matches GROCERY_LEXICON (WRatio >= 55, case-insensitive),
       replaces the whole run with the lexicon entry.
    Tokens that do not match are kept unchanged. Empty lines are removed.

    Args:
        raw: Raw multi-line OCR text.

    Returns:
        The repaired text, lines joined with newlines.
    """
    lines = []
    for line in raw.split("\n"):
        tokens = line.split()
        if not tokens:
            continue
        repaired_tokens = []
        i = 0
        while i < len(tokens):
            tok = tokens[i]
            # Skip pure noise: a single non-digit character that is not one
            # of the exempted letters.
            if len(tok) == 1 and not tok.isdigit() and tok.lower() not in ("v", "l", "g"):
                i += 1
                continue
            # A unit right after a quantity is real data, not a word fragment.
            follows_number = i > 0 and re.match(r"^\d", tokens[i - 1]) is not None
            if follows_number and tok.lower() in UNIT_MAP:
                repaired_tokens.append(tok)
                i += 1
                continue
            # Try merging consecutive short non-numeric tokens into one word
            if len(tok) <= 3 and not re.match(r"^\d", tok):
                merged = tok
                j = i + 1
                while j < len(tokens) and len(tokens[j]) <= 4 and not re.match(r"^\d", tokens[j]):
                    merged += tokens[j]
                    j += 1
                # processor=str.lower makes the comparison case-insensitive on
                # rapidfuzz versions that no longer preprocess by default.
                candidate = fuzz_process.extractOne(
                    merged.lower(),
                    GROCERY_LEXICON,
                    scorer=fuzz.WRatio,
                    processor=str.lower,
                    score_cutoff=55,
                )
                if candidate:
                    repaired_tokens.append(candidate[0])
                    i = j
                    continue
            repaired_tokens.append(tok)
            i += 1
        if repaired_tokens:
            lines.append(" ".join(repaired_tokens))
    return "\n".join(lines)
def correct_item_name(raw: str, aggressive: bool = False) -> str:
    """Auto-correct an OCR item name via the misspelling map, then fuzzy match.

    Args:
        raw: Item name as read by OCR.
        aggressive: When True (used for GOT-OCR fallback) the fuzzy score
            cutoff is lowered from 70 to 55 to catch heavily fragmented
            output.

    Returns:
        The COMMON_MISSPELLINGS value for an exact (stripped, lower-cased)
        hit; otherwise the best GROCERY_LEXICON entry scoring at or above the
        cutoff; otherwise the stripped input in title case.
    """
    cleaned = raw.strip().lower()
    threshold = 55 if aggressive else 70
    # Step 1: Direct misspelling lookup
    if cleaned in COMMON_MISSPELLINGS:
        return COMMON_MISSPELLINGS[cleaned]
    # Step 2: Fuzzy match against canonical lexicon. The query is lower-case
    # while the lexicon is Title-case, so lower-case both sides explicitly;
    # recent rapidfuzz releases do not preprocess strings by default.
    match = fuzz_process.extractOne(
        cleaned,
        GROCERY_LEXICON,
        scorer=fuzz.WRatio,
        processor=str.lower,
        score_cutoff=threshold,
    )
    if match:
        return match[0]
    # Step 3: Return title-cased original
    return raw.strip().title()
def normalize_unit(raw: str) -> str:
    """Return the canonical unit for ``raw`` from UNIT_MAP, or "pc" if unknown."""
    lookup_key = raw.lower().strip()
    if lookup_key in UNIT_MAP:
        return UNIT_MAP[lookup_key]
    return "pc"
def detect_transaction_type(text: str) -> str:
    """Pick the transaction type whose keywords occur most often in ``text``.

    Each keyword in TRANSACTION_KEYWORDS counts once if it appears as a
    case-insensitive substring. The type with the highest count wins (the
    earliest type in the table on a tie); "unknown" is returned when no
    keyword matches.
    """
    haystack = text.lower()
    hits = {}
    for tx_type, keywords in TRANSACTION_KEYWORDS.items():
        found = len([kw for kw in keywords if kw.lower() in haystack])
        if found > 0:
            hits[tx_type] = found
    if not hits:
        return "unknown"
    return max(hits, key=hits.get)
# Lower-cased vocabulary that must not be mistaken for a customer name:
# every lexicon entry, every known misspelling, and a few receipt keywords.
_ITEM_WORDS_LOWER = (
    {entry.lower() for entry in GROCERY_LEXICON}
    .union(key.lower() for key in COMMON_MISSPELLINGS)
    .union(
        {
            "total", "udhaar", "wasooli", "cash", "rs", "pkr", "amount",
            "date", "bill", "invoice", "receipt", "parchi",
        }
    )
)
def extract_customer_name(text: str) -> Optional[str]:
    """Extract the customer name from raw receipt text.

    Strategies, in priority order:
    1. Explicit label matched by RE_NAME ('name:', 'customer:', ...); trailing
       digits and separators are stripped and the result must be 2-50 chars.
    2. Up to three capitalized words at the very start of the text. The name
       ends before the first word or phrase found in _ITEM_WORDS_LOWER, so
       "Umar Atta" yields "Umar" and "Cold Drink" yields nothing.
    3. The first digit-free line among the top lines (the first
       ``max(3, len(lines) // 5)`` lines) that contains no skip keyword as a
       whole word and is not itself an item word or a transaction keyword.

    Returns:
        The extracted name, or None when no strategy succeeds.
    """
    # Strategy 1: explicit label
    m = RE_NAME.search(text)
    if m:
        name = m.group(1).strip()
        name = re.sub(r"[\d\-\u2013\u2014:=]+$", "", name).strip()
        if 2 <= len(name) <= 50:
            return name

    # Strategy 2: capitalized proper name at the START of text
    # Matches e.g. "Umar", "Umar Khan", "Muhammad Ali" before the first digit/item
    start_match = re.match(
        r"^([A-Z][a-z]{1,20}(?:\s+[A-Z][a-z]{1,20}){0,2})",
        text.strip(),
    )
    if start_match:
        # split() also normalizes a newline matched by \s+ into a single space.
        words = start_match.group(1).split()
        cut = len(words)
        for idx in range(len(words)):
            # Test every phrase starting at idx so multi-word items such as
            # "Kali Mirch" are recognized as well as single words.
            phrases = (
                " ".join(words[idx:end]).lower()
                for end in range(idx + 1, len(words) + 1)
            )
            if any(phrase in _ITEM_WORDS_LOWER for phrase in phrases):
                cut = idx
                break
        if cut:
            return " ".join(words[:cut])

    # Strategy 3: first pure-text line among the top lines of multiline text
    lines = text.strip().split("\n")
    top_lines = lines[:max(3, len(lines) // 5)]
    skip = {"total", "\u0679\u0648\u0679\u0644", "\u06a9\u0644", "\u062c\u0645\u0639",
            "date", "\u062a\u0627\u0631\u06cc\u062e", "rs", "pkr"}
    tx_words = {kw.lower() for kws in TRANSACTION_KEYWORDS.values() for kw in kws}
    for line in top_lines:
        line = line.strip()
        if len(line) < 2:
            continue
        lowered = line.lower()
        # Compare whole words: a substring test rejected names that merely
        # contain a keyword (e.g. "rs" inside "Arshad").
        line_words = re.findall(r"[^\W\d_]+", lowered)
        if any(word in skip for word in line_words):
            continue
        # A line that is only an item or a transaction keyword is not a name.
        if lowered in _ITEM_WORDS_LOWER or lowered in tx_words:
            continue
        if re.match(r"^\d+[\.\-/]\d+", line):  # date-like
            continue
        if not re.search(r"\d", line):  # pure text line
            return line[:50]
    return None
def extract_total(text: str) -> Optional[float]:
    """Return the number following the first total keyword (RE_TOTAL), or None."""
    found = RE_TOTAL.search(text)
    if found is None:
        return None
    try:
        return float(found.group(1))
    except ValueError:
        return None
def parse_items(text: str) -> List[Dict[str, Any]]:
    """Extract line items from raw OCR text.

    Pass 1 uses RE_ITEM_LINE (name + quantity + price) and keeps matches with
    a price in [1, 50000] and a quantity in (0, 1000]. Pass 2 runs only when
    pass 1 found nothing and uses RE_SIMPLE_LINE (name + price), assuming
    quantity 1.0 and unit "pc". Entries are de-duplicated on
    (corrected name, price), so the same item at two different prices is
    kept twice.

    Returns:
        List of dicts with keys ``name``, ``quantity``, ``price``, ``unit``.
    """
    collected: List[Dict[str, Any]] = []
    seen: set = set()

    # Pass 1: full pattern.
    for hit in RE_ITEM_LINE.finditer(text):
        label = correct_item_name(hit.group("name"))
        amount = float(hit.group("qty"))
        cost = float(hit.group("price"))
        measure = normalize_unit(hit.group("unit") or "pc")
        plausible = 1 <= cost <= 50000 and 0 < amount <= 1000
        if not plausible:
            continue
        fingerprint = f"{label.lower()}:{cost}"
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        collected.append({"name": label, "quantity": amount, "price": cost, "unit": measure})

    if collected:
        return collected

    # Pass 2: name + price fallback.
    for hit in RE_SIMPLE_LINE.finditer(text):
        label = correct_item_name(hit.group("name"))
        cost = float(hit.group("price"))
        if not 1 <= cost <= 50000:
            continue
        fingerprint = f"{label.lower()}:{cost}"
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        collected.append({"name": label, "quantity": 1.0, "price": cost, "unit": "pc"})
    return collected
def validate_math(items: List[Dict[str, Any]], extracted_total: Optional[float]) -> bool:
    """Report whether the item prices disagree with the stated total.

    Each item's ``price`` is treated as the LINE TOTAL (parchi convention:
    '2.5kg Cheeni 200' means Rs 200 for the whole line), so the computed
    total is the plain sum of prices, not quantity * price.

    Returns:
        True when the sum differs from ``extracted_total`` by more than
        max(Rs 2, 5% of the total); False otherwise, and also when there are
        no items or the total is missing/zero.
    """
    if not items or not extracted_total:
        return False
    line_sum = sum(entry["price"] for entry in items)
    allowed_gap = max(2.0, extracted_total * 0.05)
    gap = abs(line_sum - extracted_total)
    return gap > allowed_gap
def process_raw_text(raw_text: str) -> Dict[str, Any]:
    """Turn raw VLM output into the structured parchi dict.

    Field names follow SmartParchiBackendItem in scan.tsx: each item carries
    ``name``, ``quantity`` (not ``qty``) and ``price``. The total is emitted
    twice, as ``total_amount`` (preferred by scan.tsx) and ``total``
    (fallback).
    """
    # Urdu words become their romanised equivalents before any parsing.
    english_text = transliterate_urdu(raw_text)

    customer = extract_customer_name(english_text)
    parsed = parse_items(english_text)
    grand_total = extract_total(english_text)
    tx_kind = detect_transaction_type(english_text)

    # No explicit total on the receipt: fall back to the sum of line totals.
    if grand_total is None and parsed:
        grand_total = sum(entry["price"] for entry in parsed)

    has_mismatch = validate_math(parsed, grand_total)
    reported_total = grand_total or 0.0

    # Only the fields scan.tsx reads are exposed; 'unit' is dropped here.
    public_items = []
    for entry in parsed:
        public_items.append(
            {"name": entry["name"], "quantity": entry["quantity"], "price": entry["price"]}
        )

    return {
        "customer_name": customer,
        "items": public_items,
        "total": reported_total,  # legacy fallback
        "total_amount": reported_total,  # preferred by scan.tsx
        "transaction_type": tx_kind,
        "mismatch": has_mismatch,
    }
def try_parse_json_response(text: str) -> Optional[Dict[str, Any]]:
    """Parse a JSON object embedded in a VLM reply into the standard schema.

    The span from the first '{' to the last '}' is decoded. The result is
    passed to _normalize_api_result only when it is a dict containing an
    ``items`` or ``total`` key.

    Returns:
        The normalized dict, or None when there is no JSON object, the JSON
        is invalid, the required keys are missing, or normalization fails on
        malformed field values.
    """
    import json
    json_match = re.search(r"\{[\s\S]*\}", text)
    if not json_match:
        return None
    try:
        data = json.loads(json_match.group())
        if not isinstance(data, dict):
            return None
        if not ("items" in data or "total" in data):
            return None
        return _normalize_api_result(data)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError; ValueError/TypeError also
        # cover bad field values (e.g. a non-numeric price) met while
        # normalizing, which must yield None rather than crash the request.
        return None
def _coerce_float(value: Any, default: float) -> float:
    """Best-effort conversion of a JSON field to a finite float, else ``default``."""
    import math
    try:
        number = float(value)
    except (TypeError, ValueError):
        # Models sometimes return "2kg", "Rs 200" or "1,200": take the first
        # number found after dropping thousands separators.
        found = re.search(r"\d+(?:\.\d+)?", str(value).replace(",", ""))
        number = float(found.group()) if found else default
    return number if math.isfinite(number) else default


def _normalize_api_result(data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize a decoded Gemini/OpenRouter/Qaari JSON dict into the standard schema.

    Item entries that are not dicts or have no name are dropped; names go
    through correct_item_name. ``quantity``/``qty`` defaults to 1.0 and
    ``price``/``total_price`` to 0.0, and numeric fields that are not clean
    numbers are coerced instead of raising. A missing or zero total is
    replaced by the sum of item prices. Transaction types outside the known
    set are re-detected by keyword.

    Returns:
        Dict with ``customer_name``, ``items``, ``total``, ``total_amount``,
        ``transaction_type`` and ``mismatch``.
    """
    # --- Normalize items ---
    raw_items = data.get("items", []) or []
    items: List[Dict[str, Any]] = []
    for it in raw_items:
        if not isinstance(it, dict):
            continue
        name = it.get("name") or it.get("item") or ""
        if not name:
            continue
        name = correct_item_name(str(name))
        qty = _coerce_float(it.get("quantity") or it.get("qty") or 1.0, 1.0)
        price = _coerce_float(it.get("price") or it.get("total_price") or 0.0, 0.0)
        items.append({"name": name, "quantity": qty, "price": price})
    # --- Normalize totals ---
    total_raw = data.get("total") or data.get("total_amount") or 0.0
    total_val = _coerce_float(total_raw, 0.0) if total_raw else 0.0
    if total_val == 0.0 and items:
        total_val = sum(it["price"] for it in items)
    # --- Normalize transaction type ---
    tx = str(data.get("transaction_type") or "unknown").lower()
    if tx not in ("udhaar", "wasooli", "cash", "return", "unknown"):
        tx = detect_transaction_type(tx)  # keyword match on the free-form value
    # --- Normalize customer name ---
    cname = data.get("customer_name") or None
    if isinstance(cname, str) and not cname.strip():
        cname = None
    return {
        "customer_name": cname,
        "items": items,
        "total": total_val,
        "total_amount": total_val,
        "transaction_type": tx,
        "mismatch": validate_math(items, total_val),
    }
"""SmartOCR Engine — Lazy-loading VLM manager.
Primary: Qaari-0.1-Urdu-OCR-VL-2B (Qwen2-VL fine-tuned for Urdu Nastaliq)
Fallback: GOT-OCR 2.0 (580MB layout specialist, loaded only on primary failure)
Memory strategy:
- Models loaded lazily on first request (not at startup).
- Only ONE model in RAM at a time.
- gc.collect() after every inference pass.
- Memory guard: abort if RSS > VLM_MEMORY_LIMIT_MB.
"""
logger = logging.getLogger("parchi.ocr_engine")
# ── Config from environment ───────────────────────────────────────────────────
# Qaari is a PEFT LoRA adapter; base model is required to load it
BASE_MODEL_ID = os.getenv("BASE_MODEL_ID", "Qwen/Qwen2-VL-2B-Instruct")
PRIMARY_MODEL_ID = os.getenv("PRIMARY_MODEL_ID", "oddadmix/Qaari-0.1-Urdu-OCR-VL-2B-Instruct")
FALLBACK_MODEL_ID = os.getenv("FALLBACK_MODEL_ID", "stepfun-ai/GOT-OCR-2.0-hf")
# Fallback is on unless the variable is exactly "0", "false" or "no".
ENABLE_FALLBACK = os.getenv("ENABLE_FALLBACK", "1").strip() not in ("0", "false", "no")
# Resident-set-size ceiling (MB) checked before local inference.
VLM_MEMORY_LIMIT_MB = float(os.getenv("VLM_MEMORY_LIMIT_MB", "12000"))
# Generation budget for the local models (default 60 new tokens).
VLM_MAX_TOKENS = int(os.getenv("VLM_MAX_NEW_TOKENS", "60"))
# Seconds to wait for local generate(). An env var set on the host (e.g. an
# HF Space) overrides the 300 s default, so check the Space settings.
VLM_TIMEOUT = float(os.getenv("VLM_TIMEOUT_SECONDS", "300"))
# ── Cloud API Keys (Engine 1 & 2 — fast path) ─────────────────────────────────
# SECURITY: keys are read from the environment only. Real credentials were
# previously hard-coded here as defaults; anything that was committed must be
# treated as leaked and rotated. With no key set the cloud engine is skipped.
# Engine 1: Gemini
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
# The method name is separated by a colon (:), not a slash (/).
GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent"
# Engine 2: OpenRouter free VLM cascade (tried in order; stop at first success)
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_MODELS = [
    "baidu/qianfan-ocr-fast:free",  # OCR-specialized fastest
    "google/gemma-4-26b-a4b-it:free",  # Gemma 4 26B A4B-IT
    "google/gemma-4-31b-it:free",  # Gemma 4 31B-IT
    "nvidia/nemotron-nano-12b-v2-vl:free",  # NVIDIA Nemotron 12B VL
    "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning:free",  # NVIDIA Nemotron Omni
]
def _rss_mb() -> float:
    """Return this process's resident set size in MB, or 0.0 if it cannot be read."""
    try:
        import psutil
        rss_bytes = psutil.Process().memory_info().rss
    except Exception:
        # psutil missing or the query failed: report 0.0 rather than raise.
        return 0.0
    return rss_bytes / 1024 / 1024
def _free_mem():
    """Run a garbage-collection pass, then ask torch to empty its CUDA cache.

    The torch step is best-effort: any failure to import or call it is
    ignored.
    """
    gc.collect()
    try:
        import torch
    except Exception:
        return
    try:
        torch.cuda.empty_cache()
    except Exception:
        return
class SmartOCR:
"""Manages VLM lifecycle: lazy load β†’ inference β†’ cleanup."""
def __init__(self):
self._primary_model = None
self._primary_processor = None
self._fallback_model = None
self._fallback_processor = None
self._lock = threading.Lock()
self._primary_loaded = False
self._fallback_loaded = False
self._primary_failed = False
# ── Lazy Loaders ──────────────────────────────────────────────────────────
def _load_primary(self):
    """Load the primary engine: the Qaari PEFT adapter on top of the base model.

    Does nothing when the primary model is already loaded. The load runs
    under ``self._lock`` with a second check inside, so concurrent callers
    load at most once. On success the model and processor are stored and
    ``_primary_loaded`` is set. On any failure the error is logged,
    ``_primary_failed`` is set, no partially loaded object is kept on the
    instance, and memory is released; nothing is raised.
    """
    if self._primary_loaded:
        return
    with self._lock:
        if self._primary_loaded:
            return
        logger.info("Loading base model: %s ...", BASE_MODEL_ID)
        t0 = time.time()
        base_model = model = processor = None
        failed = False
        try:
            import torch
            from peft import PeftModel
            from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
            # Step 1: load the base model in fp32 on CPU
            base_model = Qwen2VLForConditionalGeneration.from_pretrained(
                BASE_MODEL_ID,
                torch_dtype=torch.float32,
                device_map="cpu",
                low_cpu_mem_usage=True,
            )
            # Step 2: attach the LoRA adapter to the base model
            logger.info("Applying PEFT adapter: %s ...", PRIMARY_MODEL_ID)
            model = PeftModel.from_pretrained(base_model, PRIMARY_MODEL_ID)
            model.eval()
            # The processor is loaded from the base model id
            processor = AutoProcessor.from_pretrained(BASE_MODEL_ID)
        except Exception as e:
            logger.error("Primary model load FAILED: %s", e)
            failed = True
        if failed:
            # Drop every reference to half-loaded objects before collecting,
            # otherwise a failure after the weights loaded would keep them
            # resident. Done outside the except block so the traceback no
            # longer pins the loader frames.
            base_model = model = processor = None
            self._primary_model = None
            self._primary_processor = None
            self._primary_failed = True
            _free_mem()
            return
        # Publish only a complete (model, processor) pair.
        self._primary_model = model
        self._primary_processor = processor
        self._primary_loaded = True
        logger.info(
            "Primary model (base+adapter) loaded in %.1fs | RSS=%.0f MB",
            time.time() - t0, _rss_mb(),
        )
        # NOTE(review): a loaded fallback model is not unloaded here, so both
        # models can be resident at once — confirm against the intended
        # one-model-in-RAM strategy.
def _load_fallback(self):
    """Load the fallback OCR model, unloading the primary model first.

    Does nothing when the fallback is already loaded. The load runs under
    ``self._lock`` with a second check inside. On success the model and
    processor are stored and ``_fallback_loaded`` is set. On any failure the
    error is logged, no partially loaded object is kept on the instance, and
    memory is released; nothing is raised.
    """
    if self._fallback_loaded:
        return
    with self._lock:
        if self._fallback_loaded:
            return
        # Unload primary to free RAM
        self._unload_primary()
        logger.info("Loading fallback model: %s ...", FALLBACK_MODEL_ID)
        t0 = time.time()
        model = processor = None
        failed = False
        try:
            import torch
            from transformers import AutoModelForImageTextToText, AutoProcessor
            # SECURITY NOTE: trust_remote_code=True executes Python code
            # shipped in the model repository. FALLBACK_MODEL_ID comes from
            # the environment, so only point it at repositories you trust.
            model = AutoModelForImageTextToText.from_pretrained(
                FALLBACK_MODEL_ID,
                torch_dtype=torch.float32,
                device_map="cpu",
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            model.eval()
            processor = AutoProcessor.from_pretrained(
                FALLBACK_MODEL_ID, trust_remote_code=True
            )
        except Exception as e:
            logger.error("Fallback model load FAILED: %s", e)
            failed = True
        if failed:
            # Drop references to half-loaded objects before collecting so a
            # failure after the weights loaded does not keep them resident.
            model = processor = None
            self._fallback_model = None
            self._fallback_processor = None
            _free_mem()
            return
        # Publish only a complete (model, processor) pair.
        self._fallback_model = model
        self._fallback_processor = processor
        self._fallback_loaded = True
        logger.info(
            "Fallback model loaded in %.1fs | RSS=%.0f MB",
            time.time() - t0, _rss_mb(),
        )
def _unload_primary(self):
    """Drop the primary model and processor, then release memory and log RSS."""
    self._primary_loaded = False
    self._primary_model, self._primary_processor = None, None
    _free_mem()
    current_rss = _rss_mb()
    logger.info("Primary model unloaded | RSS=%.0f MB", current_rss)
def _unload_fallback(self):
    """Drop the fallback model and processor, then release memory and log RSS."""
    self._fallback_loaded = False
    self._fallback_model, self._fallback_processor = None, None
    _free_mem()
    current_rss = _rss_mb()
    logger.info("Fallback model unloaded | RSS=%.0f MB", current_rss)
# ── Memory Guard ──────────────────────────────────────────────────────────
def _check_memory(self) -> bool:
    """Return True when process RSS is within VLM_MEMORY_LIMIT_MB.

    When the limit is exceeded a warning is logged and False is returned.
    """
    current = _rss_mb()
    over_limit = current > VLM_MEMORY_LIMIT_MB
    if over_limit:
        logger.warning("RSS %.0f MB exceeds limit %.0f MB — aborting", current, VLM_MEMORY_LIMIT_MB)
    return not over_limit
# ── Inference: Primary (Qaari) ────────────────────────────────────────────
def _infer_qaari(self, pil_image: Image.Image) -> Optional[str]:
    """Run the primary (Qaari) model on a PIL image.

    Args:
        pil_image: The receipt image.

    Returns:
        The decoded plain-text output, or None when the model is not
        loaded, the memory guard trips, generation exceeds VLM_TIMEOUT
        seconds, or any exception occurs (in which case ``_primary_failed``
        is set). Memory is released on every exit path.
    """
    try:
        import concurrent.futures
        import torch
        from qwen_vl_utils import process_vision_info
        self._load_primary()
        if not self._primary_loaded or not self._check_memory():
            return None
        # Plain-text prompt — the model is asked for OCR text, not JSON;
        # structured parsing happens afterwards in the regex layer.
        prompt = (
            "You are a Pakistani grocery receipt (parchi) OCR reader. "
            "Read this handwritten receipt and output ALL text clearly:\n"
            "Line 1: customer name (if visible at top)\n"
            "Line 2: transaction type (udhaar / wasooli / cash)\n"
            "Lines 3+: each item as: name quantity unit price\n"
            "Last line: Total amount\n"
            "Output plain text only. Do not explain."
        )
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": pil_image},
                    {"type": "text", "text": prompt},
                ],
            }
        ]
        text_input = self._primary_processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = self._primary_processor(
            text=[text_input],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )

        # CPU -- no .to("cuda")
        def _generate():
            with torch.no_grad():
                return self._primary_model.generate(
                    **inputs,
                    max_new_tokens=VLM_MAX_TOKENS,
                    do_sample=False,
                    use_cache=True,
                    repetition_penalty=1.2,
                )

        # Enforce the timeout on generate(). The executor is deliberately NOT
        # used as a context manager: leaving a `with` block calls
        # shutdown(wait=True), which blocks until generate() finishes and
        # defeats the timeout. The worker thread cannot be killed; on timeout
        # it is left to finish in the background while the caller moves on.
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = pool.submit(_generate)
            try:
                output_ids = future.result(timeout=VLM_TIMEOUT)
            except concurrent.futures.TimeoutError:
                logger.warning(
                    "Qaari generate() timed out after %.0fs -- returning None",
                    VLM_TIMEOUT,
                )
                return None
        finally:
            pool.shutdown(wait=False)
        # Trim input tokens from output
        trimmed = [
            out[len(inp):] for inp, out in zip(inputs.input_ids, output_ids)
        ]
        result = self._primary_processor.batch_decode(
            trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]
        logger.info("Qaari output (%d chars): %.100s...", len(result), result)
        return result
    except Exception as e:
        logger.error("Qaari inference failed: %s", e)
        self._primary_failed = True
        return None
    finally:
        _free_mem()
# ── Inference: Fallback (GOT-OCR) ─────────────────────────────────────────
def _infer_got_ocr(self, pil_image: Image.Image) -> Optional[str]:
    """Run the fallback (GOT-OCR) model on a PIL image.

    Returns the decoded text, or None when the fallback is disabled, the
    model is not loaded, the memory guard trips, or inference raises.
    Memory is released after every attempt that gets past the
    ENABLE_FALLBACK check.
    """
    if not ENABLE_FALLBACK:
        return None
    try:
        import torch
        self._load_fallback()
        ready = self._fallback_loaded and self._check_memory()
        if not ready:
            return None
        model_inputs = self._fallback_processor(pil_image, return_tensors="pt")
        with torch.no_grad():
            generated = self._fallback_model.generate(
                **model_inputs,
                do_sample=False,
                tokenizer=self._fallback_processor.tokenizer,
                stop_strings="<|im_end|>",
                max_new_tokens=VLM_MAX_TOKENS,
            )
        # Decode only the tokens produced after the prompt.
        prompt_len = model_inputs["input_ids"].shape[1]
        decoded = self._fallback_processor.decode(
            generated[0, prompt_len:],
            skip_special_tokens=True,
        )
        logger.info("GOT-OCR output (%d chars): %.100s...", len(decoded), decoded)
        return decoded
    except Exception as e:
        logger.error("GOT-OCR inference failed: %s", e)
        return None
    finally:
        _free_mem()
# ── Public API ────────────────────────────────────────────────────────────
# ── Engine 1: Gemini 2.5 Flash API ────────────────────────────────────────
def _infer_gemini_api(self, image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Call the Gemini REST API (Engine 1) and return its normalized result.

    Args:
        image_bytes: Encoded image bytes; sent inline as base64.

    Returns:
        The value of ``_normalize_api_result`` for the parsed JSON object, or
        ``None`` when no API key is configured, the request fails or is
        rate-limited, or no JSON object can be recovered from the reply.
    """
    if not GEMINI_API_KEY:
        return None
    import base64
    import json as _json
    import re as _re

    import httpx

    CLOUD_PROMPT = (
        "Pakistani grocery receipt OCR. Rules:\n"
        "1. Name at top with no price beside it = customer_name\n"
        "2. udhaar/\u0627\u062f\u06be\u0627\u0631=credit, wasooli/\u0648\u0627\u0635\u0648\u0644\u06cc=payment, cash/\u0646\u0642\u062f=cash\n"
        "3. Each item line: [Name] [qty][unit] [LINE_TOTAL]. "
        "Last number = LINE TOTAL price (not unit price). "
        "cheeni-2.5 200 -> Cheeni qty=2.5 price=200\n"
        "4. Number after Total/\u06a9\u0644/\u062c\u0645\u0639 = grand total\n"
        "5. Fix OCR errors (g->9, I->1 if context requires)\n"
        "Return ONLY valid JSON (no markdown):\n"
        '{"customer_name":null,"transaction_type":"unknown",'
        '"items":[{"name":"Atta","quantity":2.0,"unit":"kg","price":200.0}],'
        '"total":200.0}'
    )

    def _loads_or_none(text: str) -> Any:
        """Parse ``text`` as JSON, returning None instead of raising."""
        try:
            return _json.loads(text)
        except _json.JSONDecodeError:
            return None

    try:
        # Label the payload with its real container format; anything not
        # recognised keeps the previous "image/jpeg" default.
        if image_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
            mime = "image/png"
        elif image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
            mime = "image/webp"
        else:
            mime = "image/jpeg"
        encoded = base64.b64encode(image_bytes).decode()
        payload = {
            "contents": [{
                "parts": [
                    {"text": CLOUD_PROMPT},
                    {"inline_data": {"mime_type": mime, "data": encoded}},
                ]
            }],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,  # 512 caused truncation on complex parchis
                # responseMimeType removed -- not supported on all model versions
            },
        }
        url = GEMINI_URL.format(GEMINI_MODEL)
        with httpx.Client(timeout=30.0) as client:
            # The key travels in a header rather than the query string so it
            # cannot end up in request-URL log lines or URL-bearing errors.
            resp = client.post(
                url,
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    "x-goog-api-key": GEMINI_API_KEY,
                },
            )
        if resp.status_code == 429:
            logger.warning("Gemini rate-limited (429) -- trying OpenRouter")
            return None
        if resp.status_code != 200:
            logger.warning("Gemini API error %d: %.200s", resp.status_code, resp.text)
            return None
        raw = resp.json()["candidates"][0]["content"]["parts"][0]["text"]

        # Robust JSON extraction, from strictest to most lenient:
        # plain JSON -> markdown fences stripped -> first {...} span.
        data = _loads_or_none(raw.strip())
        if not isinstance(data, dict):
            cleaned = _re.sub(r"```(?:json)?\s*", "", raw).strip().rstrip("`").strip()
            data = _loads_or_none(cleaned)
        if not isinstance(data, dict):
            m = _re.search(r"\{[\s\S]*\}", raw)
            data = _loads_or_none(m.group()) if m else None
        # Only a non-empty JSON object is usable downstream.
        if not isinstance(data, dict) or not data:
            logger.warning("Gemini non-JSON (truncated?): %.120s", raw)
            return None
        # "items" may be present but null; treat that as zero items.
        logger.info("Engine 1 (Gemini) success: %d items", len(data.get("items") or []))
        return _normalize_api_result(data)
    except Exception as e:
        logger.warning("Gemini inference failed: %s", e)
        return None
# ── Engine 2: OpenRouter Free VLM Cascade ─────────────────────────────────
def _infer_openrouter_api(self, image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Try each model in ``OPENROUTER_MODELS`` in order (Engine 2).

    Args:
        image_bytes: Encoded image bytes; sent as a base64 ``data:`` URL.

    Returns:
        The value of ``_normalize_api_result`` for the first model whose reply
        contains a JSON object, or ``None`` when no API key is configured or
        every model fails.
    """
    if not OPENROUTER_API_KEY:
        return None
    import base64
    import json as _json

    import httpx

    CLOUD_PROMPT = (
        "Pakistani grocery receipt. Extract: customer name (top, no price beside), "
        "transaction type (udhaar=credit/wasooli=payment/cash), items with qty+unit+price "
        "(last number on each line is LINE TOTAL, not unit price), and grand total. "
        "Return ONLY valid JSON: "
        '{"customer_name":null,"transaction_type":"unknown",'
        '"items":[{"name":"","quantity":1.0,"unit":"pc","price":0.0}],"total":0.0}'
    )
    # Label the data URL with the real container format; anything not
    # recognised keeps the previous "image/jpeg" default.
    if image_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
        mime = "image/png"
    elif image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        mime = "image/webp"
    else:
        mime = "image/jpeg"
    encoded = base64.b64encode(image_bytes).decode()
    img_url = f"data:{mime};base64,{encoded}"
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://bazaar-bridge.app",
        "X-Title": "Bazaar Bridge OCR",
    }
    # Identical for every model, so built once outside the loop.
    messages = [{
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": img_url}},
            {"type": "text", "text": CLOUD_PROMPT},
        ],
    }]
    try:
        # One client for the whole cascade so connections to the same host
        # can be reused instead of being re-established per model.
        with httpx.Client(timeout=25.0) as client:
            for model in OPENROUTER_MODELS:
                try:
                    payload = {
                        "model": model,
                        "messages": messages,
                        "max_tokens": 512,
                        "temperature": 0.1,
                    }
                    resp = client.post(OPENROUTER_URL, json=payload, headers=headers)
                    if resp.status_code == 429:
                        logger.warning("OpenRouter model %s rate-limited β€” trying next", model)
                        continue
                    if resp.status_code not in (200, 201):
                        logger.warning("OpenRouter %s returned %d β€” trying next", model, resp.status_code)
                        continue
                    content = resp.json()["choices"][0]["message"]["content"]
                    # Extract JSON from content (may have markdown code fences);
                    # a null / non-text content has nothing to search.
                    json_match = (
                        re.search(r"\{[\s\S]*\}", content)
                        if isinstance(content, str) else None
                    )
                    if not json_match:
                        logger.warning("OpenRouter %s returned no JSON β€” trying next", model)
                        continue
                    data = _json.loads(json_match.group())
                    if not isinstance(data, dict):
                        logger.warning("OpenRouter %s returned no JSON β€” trying next", model)
                        continue
                    # "items" may be present but null; treat that as zero items.
                    logger.info("Engine 2 (OpenRouter/%s) success: %d items",
                                model, len(data.get("items") or []))
                    return _normalize_api_result(data)
                except Exception as e:
                    logger.warning("OpenRouter model %s failed: %s β€” trying next", model, e)
                    continue
    except Exception as e:
        # Client construction/teardown problems must not escape: the caller
        # treats None as "fall through to the next engine".
        logger.warning("OpenRouter client error: %s", e)
        return None
    logger.warning("All OpenRouter models failed β€” falling back to local Qaari")
    return None
def extract_structured(self, image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Run the cloud engines in priority order and return the first hit.

    Gemini (Engine 1) is tried before the OpenRouter cascade (Engine 2). The
    winning result dict is tagged with an ``"_engine"`` key naming its source.
    Returns ``None`` when neither engine yields a truthy result, in which case
    the caller is expected to use the local VLM path.
    """
    cloud_engines = (
        ("gemini", self._infer_gemini_api),
        ("openrouter", self._infer_openrouter_api),
    )
    for engine_label, infer in cloud_engines:
        outcome = infer(image_bytes)
        if outcome:
            outcome["_engine"] = engine_label
            return outcome
    return None
def extract_text(self, pil_image: Image.Image) -> str:
    """Extract plain text with the local models (Engine 3).

    Qaari is tried first unless it has been marked as failed; GOT-OCR is the
    second attempt, and its output is passed through
    ``_repair_got_ocr_fragments``. A candidate is accepted only when it has
    more than five characters after stripping whitespace. Returns ``""``
    when neither model produces an acceptable candidate.
    """
    def _is_usable(candidate) -> bool:
        return bool(candidate) and len(candidate.strip()) > 5

    # Qaari primary
    if not self._primary_failed:
        primary_text = self._infer_qaari(pil_image)
        if _is_usable(primary_text):
            return primary_text

    # GOT-OCR secondary
    logger.info("Primary returned nothing useful -- trying GOT-OCR fallback")
    fallback_text = self._infer_got_ocr(pil_image)
    if not _is_usable(fallback_text):
        return ""
    repaired = _repair_got_ocr_fragments(fallback_text)
    logger.info("GOT-OCR repaired (%d->%d chars): %.100s...",
                len(fallback_text), len(repaired), repaired)
    return repaired
def health_check(self) -> dict:
    """Summarise engine configuration and load state for the /health endpoint."""
    def _availability(api_key) -> str:
        # A cloud engine counts as usable purely on whether its key is set.
        return "ready" if api_key else "disabled"

    report: dict = {}
    # Cloud engines
    report["engine1_gemini"] = _availability(GEMINI_API_KEY)
    report["engine2_openrouter"] = _availability(OPENROUTER_API_KEY)
    report["engine2_models"] = OPENROUTER_MODELS
    # Local primary model
    report["engine3_primary"] = PRIMARY_MODEL_ID
    report["primary_loaded"] = self._primary_loaded
    report["primary_failed"] = self._primary_failed
    # Local fallback model
    report["engine3_fallback"] = FALLBACK_MODEL_ID
    report["fallback_enabled"] = ENABLE_FALLBACK
    report["fallback_loaded"] = self._fallback_loaded
    # Process memory
    report["rss_mb"] = round(_rss_mb(), 1)
    report["memory_limit_mb"] = VLM_MEMORY_LIMIT_MB
    return report
"""Smart Parchi OCR v7 β€” FastAPI Orchestrator.
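Local Hybrid Architecture (cloud engines are tried first, local models are the fallback):
  Vision -> Gemini / OpenRouter APIs, then Qaari-0.1 (primary) / GOT-OCR 2.0 (fallback)
  Brain  -> Regex + Pakistani Lexicon (deterministic JSON formatting)
Endpoints:
  POST /process-parchi  -> queue a receipt image for structured JSON extraction (returns a job_id)
  GET  /result/{job_id} -> poll a job's status and, once done, its structured result
  GET  /health          -> engine status + memory usage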
"""
# ── Suppress noisy warnings ──────────────────────────────────────────────────
# Blanket filter: hides every Python warning process-wide, not only the ones
# raised by third-party libraries.
warnings.filterwarnings("ignore")
# setdefault() leaves any value already present in the environment untouched.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
# ── Logging ───────────────────────────────────────────────────────────────────
# Configures the root logger at INFO; each record is prefixed with timestamp,
# logger name and level.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("parchi.app")
# ── Constants ─────────────────────────────────────────────────────────────────
# Upload size ceiling, checked by load_image().
MAX_IMAGE_SIZE_MB = 10
CONCURRENCY_LIMIT = 1 # 1 worker only -- Qwen2-VL-2B fp32 uses ~9GB on CPU
CACHE_SIZE = 50 # max cached results; _cache_put() evicts the entry with the oldest timestamp
CACHE_TTL = 3600 # seconds a cached result stays valid (1 hour)
# ── Globals ───────────────────────────────────────────────────────────────────
ocr_engine = SmartOCR()
# NOTE(review): none of the endpoints or workers defined after this point
# acquire this semaphore, so CONCURRENCY_LIMIT does not appear to be enforced
# -- confirm whether that is intended.
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
result_cache: Dict[str, dict] = {} # image hash -> {"result": dict, "ts": time.time() value}
# ── Async Job Store (bypasses HF platform HTTP timeout) ──────────────────────────
# Jobs older than JOB_TTL seconds are pruned each time a new job is submitted
# via POST /process-parchi.
JOB_TTL = 3600 # 1 hour
job_store: Dict[str, dict] = {} # job_id -> {status, result, ts, error}
# ── FastAPI App ───────────────────────────────────────────────────────────────
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Pre-warm the primary OCR model at startup and release models at shutdown.

    Startup: ``ocr_engine._load_primary`` runs in the default executor so the
    event loop is not blocked; a failure is logged and the app still starts.
    Shutdown: both the primary and the fallback model are unloaded, even if
    the application body raised or the primary unload itself fails.
    """
    logger.info("=== Startup: pre-warming primary OCR model ===")
    # Inside a coroutine the running loop is the one to use;
    # get_running_loop() never creates a new loop as a side effect.
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, ocr_engine._load_primary)
        logger.info("=== Startup: model ready | RSS=%.0f MB ===", _rss_mb())
    except Exception as e:
        logger.error("=== Startup: model pre-warm FAILED: %s ===", e)
    try:
        yield  # App runs here
    finally:
        # try/finally guarantees cleanup when an exception is thrown into the
        # generator at the yield point.
        logger.info("=== Shutdown: releasing model ===")
        try:
            ocr_engine._unload_primary()
        finally:
            ocr_engine._unload_fallback()
# The description and version below are kept in line with what the service
# actually does (cloud engines first, local fallback) and with the version
# reported by GET /health.
app = FastAPI(
    title="Smart Parchi OCR v7",
    description=(
        "Hybrid OCR for Pakistani handwritten receipts. "
        "Cloud VLM APIs (Gemini, OpenRouter) are tried first, with local "
        "Qaari-0.1 (Urdu Nastaliq) + GOT-OCR 2.0 as fallback."
    ),
    version="7.1.0",
    lifespan=lifespan,
)
# Open CORS policy for a public, unauthenticated API. Credentials are disabled
# because credentialed CORS must not be combined with wildcard origins,
# methods or headers; list explicit origins if cookies/auth are ever needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── Cache Helpers ─────────────────────────────────────────────────────────────
def _image_hash(data: bytes) -> str:
    """Return the first 16 hex characters of the SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    full_hex = hasher.hexdigest()
    return full_hex[:16]
def _cache_get(h: str) -> Optional[dict]:
    """Look up a cached result by image hash.

    Args:
        h: Cache key (image hash).

    Returns:
        A shallow copy of the cached result when the entry is younger than
        ``CACHE_TTL`` seconds, otherwise ``None``. Expired entries are removed
        as a side effect.
    """
    entry = result_cache.get(h)
    if not entry:
        return None
    if (time.time() - entry["ts"]) < CACHE_TTL:
        # Return a copy: callers add per-request keys (job_id, cached, status)
        # and must not modify the stored entry in the process.
        return dict(entry["result"])
    # Expired; pop() tolerates the key having been removed in the meantime.
    result_cache.pop(h, None)
    return None
def _cache_put(h: str, result: dict):
    """Store ``result`` under image hash ``h``, stamped with the current time.

    When the cache already holds ``CACHE_SIZE`` entries and ``h`` is a new key,
    the entry with the oldest timestamp is evicted first. Overwriting an
    existing key never evicts another entry.

    Args:
        h: Cache key (image hash).
        result: Result dict; a shallow copy is stored so later changes to the
            caller's dict do not alter the cached value.
    """
    is_new_key = h not in result_cache
    # ``result_cache`` truthiness guards min() against an empty cache.
    if is_new_key and result_cache and len(result_cache) >= CACHE_SIZE:
        oldest_key = min(result_cache, key=lambda k: result_cache[k]["ts"])
        del result_cache[oldest_key]
    result_cache[h] = {"result": dict(result), "ts": time.time()}
# ── Image Loading ─────────────────────────────────────────────────────────────
def load_image(raw_bytes: bytes) -> np.ndarray:
    """Decode encoded image bytes into an RGB numpy array.

    Raises:
        ValueError: If the payload is larger than ``MAX_IMAGE_SIZE_MB``.
        Other exceptions raised by ``Image.open`` / ``convert`` propagate
        unchanged.
    """
    bytes_per_mb = 1024 * 1024
    size_mb = len(raw_bytes) / bytes_per_mb
    if size_mb > MAX_IMAGE_SIZE_MB:
        raise ValueError(f"Image too large: {size_mb:.1f} MB (max {MAX_IMAGE_SIZE_MB})")

    stream = io.BytesIO(raw_bytes)
    rgb_image = Image.open(stream).convert("RGB")
    return np.array(rgb_image)
# ── Core Processing ───────────────────────────────────────────────────────────
def process_image(rgb: np.ndarray, raw_bytes: bytes = None) -> Dict[str, Any]:
    """Run the full pipeline and return the enriched result dict.

    Order of work: image quality analysis, cloud engines (only when
    ``raw_bytes`` is given and at least one API key is set), then the local
    VLM with one retry on an enhanced image, then JSON/regex parsing.
    Whichever path produces the result, it is annotated with
    ``processing_time_ms``, ``raw_text``, ``image_quality`` and ``engine``.
    """
    started = time.time()

    def _elapsed_ms() -> int:
        return int((time.time() - started) * 1000)

    # Step 1: Image quality analysis
    quality = analyze_quality(rgb)
    logger.info("Image quality: %s", quality)

    # Step 2: Try fast cloud APIs (Engine 1: Gemini, Engine 2: OpenRouter)
    cloud_result = None
    if raw_bytes and (GEMINI_API_KEY or OPENROUTER_API_KEY):
        cloud_result = ocr_engine.extract_structured(raw_bytes)
    if cloud_result:
        engine_name = cloud_result.pop("_engine", "cloud")
        cloud_result.update({
            "processing_time_ms": _elapsed_ms(),
            "raw_text": f"[{engine_name.upper()} API]",
            "image_quality": quality,
            "engine": {**ocr_engine.health_check(), "active_engine": engine_name},
        })
        logger.info("Cloud engine (%s) returned result in %.1fs",
                    engine_name, time.time() - started)
        return cloud_result

    # Step 3: Preprocess for local VLM (Engine 3: Qaari + GOT-OCR)
    logger.info("Cloud engines unavailable β€” falling back to local Qaari (Engine 3)")
    vlm_input = Image.fromarray(preprocess_for_vlm(rgb))

    # Step 4: Local VLM inference, retried once on the enhanced image when the
    # first pass yields only whitespace.
    raw_text = ocr_engine.extract_text(vlm_input)
    logger.info("VLM raw output (%d chars)", len(raw_text))
    if not raw_text.strip():
        logger.info("Retrying with binarized image...")
        raw_text = ocr_engine.extract_text(Image.fromarray(enhance(rgb)))

    # Step 5: Brain -- JSON parse first, regex parsing when that is falsy
    result = try_parse_json_response(raw_text) or process_raw_text(raw_text)

    # Step 6: Enrich with metadata
    result.update({
        "processing_time_ms": _elapsed_ms(),
        "raw_text": raw_text[:500],
        "image_quality": quality,
        "engine": {**ocr_engine.health_check(), "active_engine": "qaari_local"},
    })
    return result
# ── Background OCR Worker (Async Job Queue) ───────────────────────────────────
def _run_ocr_job(job_id: str, raw_bytes: bytes, img_hash: str):
    """Blocking OCR worker, intended to run in a thread-pool executor.

    Decodes the image, runs ``process_image`` and records the outcome on the
    job's entry in ``job_store`` (status ``done`` with the result, or status
    ``error`` with the message). The result is also cached under ``img_hash``.

    Args:
        job_id: Key of the job record in ``job_store``.
        raw_bytes: Encoded image bytes as uploaded.
        img_hash: Cache key for the image.
    """
    # Hold a direct reference to the record: if the job is pruned from
    # job_store while this runs, the updates below still succeed instead of
    # raising KeyError from inside the except handler.
    job = job_store.get(job_id)
    if job is None:
        logger.warning("[%s] Job record missing -- skipping OCR", job_id)
        return
    try:
        job["status"] = "processing"
        rgb = load_image(raw_bytes)
        # Pass raw_bytes so process_image can try Gemini/OpenRouter first
        result = process_image(rgb, raw_bytes=raw_bytes)
        result["job_id"] = job_id
        result["success"] = bool(result.get("items"))
        result["cached"] = False
        # Cache a copy so the cached entry and this job's stored result are
        # separate dicts; a later cache hit must not alter this job's result.
        _cache_put(img_hash, dict(result))
        job.update({"status": "done", "result": result})
        elapsed = time.time() - job["ts"]
        logger.info("[%s] Job completed in %.1fs", job_id, elapsed)
    except Exception as e:
        logger.exception("[%s] Job failed", job_id)
        job.update({"status": "error", "error": str(e)})
    finally:
        gc.collect()
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.post("/process-parchi")
async def process_parchi(image: UploadFile = File(...)):
    """
    Submit a parchi image for OCR processing.

    Returns immediately with a job_id (typically <1s).
    Poll GET /result/{job_id} every 10s until status == 'done'.
    This async pattern is required because CPU inference takes 2-4 minutes,
    which exceeds the HF platform HTTP timeout (~60s).

    A cache hit skips the queue and returns the stored result directly with
    ``cached=True`` and ``status='done'``.

    Raises:
        HTTPException(400): the upload cannot be read, is too large, or is
            not a decodable image.
    """
    job_id = str(uuid.uuid4())[:12]
    logger.info("[%s] Received: %s (%s)", job_id, image.filename, image.content_type)
    try:
        raw_bytes = await image.read()
    except Exception as e:
        raise HTTPException(400, f"Failed to read file: {e}") from e

    # Cache hit -- return result immediately without spawning a job
    img_hash = _image_hash(raw_bytes)
    cached = _cache_get(img_hash)
    if cached:
        logger.info("[%s] Cache hit -- returning immediately", job_id)
        # Build a new dict instead of mutating the cached one: the cached
        # object may also be the stored result of the job that produced it.
        response = {**cached, "job_id": job_id, "cached": True, "status": "done"}
        return JSONResponse(response)

    # Validate image before queuing
    try:
        load_image(raw_bytes)
    except ValueError as e:
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        raise HTTPException(400, f"Invalid image: {e}") from e

    # Register job and prune stale ones
    now = time.time()
    job_store[job_id] = {"status": "queued", "ts": now, "result": None, "error": None}
    stale = [k for k, v in job_store.items() if now - v["ts"] > JOB_TTL]
    for k in stale:
        del job_store[k]

    # Submit to thread pool (non-blocking -- returns immediately).
    # NOTE(review): the default executor may run several jobs at once;
    # nothing here applies CONCURRENCY_LIMIT -- confirm that is intended.
    loop = asyncio.get_running_loop()
    loop.run_in_executor(None, _run_ocr_job, job_id, raw_bytes, img_hash)
    logger.info("[%s] Job queued -- returning job_id immediately", job_id)
    return JSONResponse({
        "job_id": job_id,
        "status": "queued",
        "poll_url": f"/result/{job_id}",
        # f-string so the message names the real job id, like poll_url does.
        "message": f"Image accepted. Poll /result/{job_id} every 10s until status=done.",
    })
@app.get("/result/{job_id}")
async def get_result(job_id: str):
    """
    Poll for OCR job result.

    Returns:
        status=queued|processing : not ready yet, poll again in 10s
        status=done : the structured parchi fields are merged into the response
        status=error : error field contains the failure message

    Raises:
        HTTPException(404): no job with this id is in the job store.
    """
    job = job_store.get(job_id)
    if not job:
        raise HTTPException(404, f"Job '{job_id}' not found. It may have expired (TTL=1h).")

    # Read the status once: a worker thread may flip it between two reads,
    # which could otherwise pair a stale status with a finished result.
    status = job["status"]
    response: Dict[str, Any] = {"job_id": job_id, "status": status}
    if status == "done":
        payload = job["result"] or {}
        # The job record is authoritative for job_id/status; never let keys
        # of the same name inside the result payload overwrite them.
        response.update({k: v for k, v in payload.items() if k not in response})
    elif status == "error":
        response["error"] = job["error"]
    else:
        response["elapsed_seconds"] = int(time.time() - job["ts"])
        response["message"] = "Job is processing. Poll again in 10 seconds."
    return JSONResponse(response)
@app.get("/health")
async def health():
    """Report service health: engine status, cache size and job-queue counts."""
    pending_states = ("queued", "processing")
    pending_jobs = [job for job in job_store.values() if job["status"] in pending_states]

    report = {"status": "healthy", "version": "7.1.0"}
    report["architecture"] = "Local Hybrid (Qaari + GOT-OCR) -- Async Job Queue"
    report["engine"] = ocr_engine.health_check()
    report["cache_entries"] = len(result_cache)
    report["active_jobs"] = len(pending_jobs)
    report["total_jobs"] = len(job_store)
    return report
@app.get("/")
async def root():
    """Describe the service and point at its main routes."""
    routes = dict(
        docs="/docs",
        health="/health",
        submit="POST /process-parchi -> {job_id, status: queued}",
        poll="GET /result/{job_id} -> {status, result (when done)}",
    )
    # "service" goes first so the key order of the response stays the same.
    return {"service": "Smart Parchi OCR v7.1", **routes}