'''Smart Parchi OCR v7 — Local Hybrid Architecture''' from __future__ import annotations import asyncio import gc import hashlib import io import logging import os import re import threading import time import uuid import warnings from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from PIL import Image from rapidfuzz import fuzz, process as fuzz_process """Pakistani Grocery Lexicon & Urdu-to-English Transliteration Map. This module provides: 1. URDU_TO_ENGLISH: Direct transliteration of common Urdu grocery terms. 2. GROCERY_LEXICON: Canonical grocery items for fuzzy auto-correction. 3. COMMON_MISSPELLINGS: Maps mangled OCR output to correct English names. 4. TRANSACTION_KEYWORDS: Urdu/English cues for udhaar/cash/return detection. 5. UNIT_MAP: Normalizes unit strings (kilogram -> kg, dozen -> dz, etc.). """ # ── Urdu → English Transliteration (common parchi items) ───────────────────── URDU_TO_ENGLISH: dict[str, str] = { # Staples "آٹا": "Atta", "چاول": "Chawal", "دال": "Daal", "چنے": "Chanay", "مسور": "Masoor", "ماش": "Maash", "بیسن": "Besan", "میدہ": "Maida", "سوجی": "Suji", # Sugar & Salt "چینی": "Cheeni", "گڑ": "Gur", "نمک": "Namak", "شکر": "Shakar", # Oils & Ghee "تیل": "Tel", "گھی": "Ghee", "بناسپتی": "Banaspati", "مکھن": "Makkhan", # Spices "مرچ": "Mirch", "ہلدی": "Haldi", "دھنیا": "Dhaniya", "زیرہ": "Zeera", "اجوائن": "Ajwain", "کالی مرچ": "Kali Mirch", "لال مرچ": "Lal Mirch", "گرم مصالحہ": "Garam Masala", "ادرک": "Adrak", "لہسن": "Lehsun", "پیاز": "Piyaz", "ٹماٹر": "Tamatar", "آلو": "Aloo", # Dairy "دودھ": "Doodh", "دہی": "Dahi", "پنیر": "Paneer", "کریم": "Cream", "لسی": "Lassi", # Beverages "چائے": "Chai", "پانی": "Paani", # Meat & Protein "گوشت": "Gosht", "مرغی": "Murghi", "مچھلی": "Machhli", "انڈے": "Anday", "قیمہ": "Qeema", # Bread & Bakery "روٹی": "Roti", "نان": 
"Naan", "ڈبل روٹی": "Double Roti", "بسکٹ": "Biscuit", "کیک": "Cake", # Fruits & Vegetables "سیب": "Seb", "کیلا": "Kela", "انگور": "Angoor", "آم": "Aam", "گاجر": "Gajar", "مٹر": "Matar", "بھنڈی": "Bhindi", "گوبھی": "Gobhi", "پالک": "Palak", "بینگن": "Baingan", # Miscellaneous "صابن": "Sabun", "تیل": "Tel", "سرکہ": "Sirka", "اچار": "Achaar", "چٹنی": "Chutney", "برگر": "Burger", "سموسہ": "Samosa", "پراٹھا": "Paratha", "بریانی": "Biryani", # Snacks & Packaged "چپس": "Chips", "نوڈلز": "Noodles", "جوس": "Juice", "کولڈ ڈرنک": "Cold Drink", "پیپسی": "Pepsi", "کوکا کولا": "Coca Cola", } # ── Canonical grocery item list (for fuzzy matching) ───────────────────────── GROCERY_LEXICON: list[str] = [ "Atta", "Chawal", "Daal", "Chanay", "Masoor", "Maash", "Besan", "Maida", "Suji", "Cheeni", "Gur", "Namak", "Shakar", "Tel", "Ghee", "Banaspati", "Makkhan", "Mirch", "Haldi", "Dhaniya", "Zeera", "Ajwain", "Kali Mirch", "Lal Mirch", "Garam Masala", "Adrak", "Lehsun", "Piyaz", "Tamatar", "Aloo", "Doodh", "Dahi", "Paneer", "Cream", "Lassi", "Chai", "Paani", "Gosht", "Murghi", "Machhli", "Anday", "Qeema", "Roti", "Naan", "Double Roti", "Biscuit", "Cake", "Bread", "Seb", "Kela", "Angoor", "Aam", "Gajar", "Matar", "Bhindi", "Gobhi", "Palak", "Baingan", "Sabun", "Sirka", "Achaar", "Chutney", "Burger", "Samosa", "Paratha", "Biryani", "Chips", "Noodles", "Juice", "Cold Drink", "Pepsi", "Coca Cola", "Rice", "Sugar", "Salt", "Oil", "Butter", "Milk", "Eggs", "Chicken", "Mutton", "Fish", "Flour", "Potato", "Onion", "Tomato", "Ginger", "Garlic", "Water", "Tea", "Coffee", "Soap", "Detergent", "Shampoo", ] # ── OCR Misspelling Auto-Correction ────────────────────────────────────────── COMMON_MISSPELLINGS: dict[str, str] = { "bubiger": "Burger", "buger": "Burger", "brger": "Burger", "ata": "Atta", "aata": "Atta", "tta": "Atta", "cheni": "Cheeni", "chini": "Cheeni", "cheeni": "Cheeni", "chaval": "Chawal", "chawl": "Chawal", "chwal": "Chawal", "dal": "Daal", "daal": "Daal", "dal": "Daal", "gee": 
"Ghee", "ghi": "Ghee", "tel": "Tel", "oil": "Tel", "doodh": "Doodh", "dudh": "Doodh", "milk": "Doodh", "ande": "Anday", "andy": "Anday", "egg": "Anday", "eggs": "Anday", "murgi": "Murghi", "murgh": "Murghi", "chicken": "Murghi", "goosht": "Gosht", "gosth": "Gosht", "meat": "Gosht", "qeema": "Qeema", "keema": "Qeema", "kema": "Qeema", "namk": "Namak", "nmk": "Namak", "pyaz": "Piyaz", "piaz": "Piyaz", "onion": "Piyaz", "tmatar": "Tamatar", "tomato": "Tamatar", "alu": "Aloo", "aaloo": "Aloo", "potato": "Aloo", "hldi": "Haldi", "turmeric": "Haldi", "mirchi": "Mirch", "mrch": "Mirch", "chai": "Chai", "chay": "Chai", "tea": "Chai", "rotti": "Roti", "ruti": "Roti", "nan": "Naan", "chips": "Chips", "chps": "Chips", "smaosa": "Samosa", "smosa": "Samosa", "paratha": "Paratha", "pratha": "Paratha", "biryni": "Biryani", "bryani": "Biryani", "sabon": "Sabun", "soap": "Sabun", "pepsi": "Pepsi", "ppsi": "Pepsi", "cola": "Coca Cola", "coke": "Coca Cola", "juice": "Juice", "juce": "Juice", "noodls": "Noodles", "noodlez": "Noodles", "biscut": "Biscuit", "biskit": "Biscuit", "bred": "Bread", "braed": "Bread", "suger": "Sugar", "sugr": "Sugar", "flor": "Flour", "flwr": "Flour", } # ── Transaction Type Detection ──────────────────────────────────────────────── TRANSACTION_KEYWORDS: dict[str, list[str]] = { "udhaar": [ "ادھار", "اُدھار", "udhaar", "udhar", "udhr", "credit", "قرض", "قرضہ", "بعد میں", "baad mein", "ابھی نہیں", "khata", "کھاتا", "کھاتے", ], # wasooli = collection/recovery → maps to 'debit' in scan.tsx (line 388) "wasooli": [ "واصولی", "وصولی", "wasooli", "wasoli", "wasool", "recovery", "collection", "وصول", ], "cash": [ "نقد", "نقدی", "cash", "paid", "pesa", "پیسے", "ادا", "رقم", "jama", "جمع", ], "return": [ "واپسی", "واپس", "return", "refund", "wapsi", "wapis", ], } # ── Unit Normalization ──────────────────────────────────────────────────────── UNIT_MAP: dict[str, str] = { "kilogram": "kg", "kilograms": "kg", "kilo": "kg", "kg": "kg", "gram": "g", "grams": "g", "gm": 
"g", "g": "g", "liter": "liter", "litre": "liter", "liters": "liter", "l": "liter", "dozen": "dozen", "dz": "dozen", "doz": "dozen", "درجن": "dozen", "piece": "pc", "pieces": "pc", "pc": "pc", "pcs": "pc", "عدد": "pc", "کلو": "kg", "گرام": "g", "لیٹر": "liter", "packet": "pkt", "pkt": "pkt", "pack": "pkt", "پیکٹ": "pkt", } """Image Preprocessing Pipeline for handwritten parchi images. Stages: CLAHE → Denoise → Sharpen → Adaptive threshold. Also provides quality analysis and multi-variant generation. """ logger = logging.getLogger("parchi.preprocess") # ── Constants ───────────────────────────────────────────────────────────────── CLAHE_CLIP = 2.5 CLAHE_TILE = (8, 8) DENOISE_H = 10 SHARPEN_KERNEL = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]]) TARGET_LONG_EDGE = 1024 # 1024px max — enough detail for handwriting, fewer VLM tokens def analyze_quality(image: np.ndarray) -> Dict[str, float]: """Return normalized quality metrics (0-1) for the input image.""" gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image sharpness = min(1.0, cv2.Laplacian(gray, cv2.CV_64F).var() / 500) brightness = float(np.mean(gray)) / 255 contrast = min(1.0, float(gray.std()) / 100) noise_raw = float(np.std(gray - cv2.GaussianBlur(gray, (5, 5), 0))) noise = max(0.0, 1.0 - noise_raw / 50) overall = (sharpness + brightness + contrast + noise) / 4 return { "sharpness": round(sharpness, 3), "brightness": round(brightness, 3), "contrast": round(contrast, 3), "noise": round(noise, 3), "overall": round(overall, 3), } def resize_for_vlm(image: np.ndarray, max_edge: int = TARGET_LONG_EDGE) -> np.ndarray: """Resize so longest edge ≤ max_edge (VLM memory savings).""" h, w = image.shape[:2] if max(h, w) <= max_edge: return image scale = max_edge / max(h, w) new_w, new_h = int(w * scale), int(h * scale) return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA) def auto_orient(image: np.ndarray) -> np.ndarray: """Deskew using Hough lines (correct rotation up to 
±45°).""" try: h, w = image.shape[:2] if min(h, w) < 100: return image gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) small = cv2.resize(gray, (max(400, w // 3), max(300, h // 3))) edges = cv2.Canny(small, 50, 150) lines = cv2.HoughLines(edges, 1, np.pi / 180, threshold=int(len(small) * 0.3)) if lines is not None: angles = [] for line in lines[:20]: theta = line[0][1] angle = theta * 180 / np.pi - 90 if -45 < angle < 45: angles.append(angle) if angles: median_angle = float(np.median(angles)) if abs(median_angle) > 3: center = (w // 2, h // 2) M = cv2.getRotationMatrix2D(center, median_angle, 1.0) return cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE) return image except Exception as e: logger.warning("auto_orient failed: %s", e) return image def enhance(rgb: np.ndarray) -> np.ndarray: """Full preprocessing pipeline: orient → CLAHE → denoise → sharpen → binarize.""" oriented = auto_orient(rgb) gray = cv2.cvtColor(oriented, cv2.COLOR_RGB2GRAY) # CLAHE for shadow/contrast normalization clahe = cv2.createCLAHE(clipLimit=CLAHE_CLIP, tileGridSize=CLAHE_TILE) enhanced = clahe.apply(gray) # Non-local means denoising denoised = cv2.fastNlMeansDenoising(enhanced, h=DENOISE_H) # Sharpen sharpened = cv2.filter2D(denoised, -1, SHARPEN_KERNEL) # Morphological closing (connect broken strokes) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2)) morphed = cv2.morphologyEx(sharpened, cv2.MORPH_CLOSE, kernel) # Adaptive threshold binarization binary = cv2.adaptiveThreshold( morphed, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 15, 5 ) return cv2.cvtColor(binary, cv2.COLOR_GRAY2RGB) def preprocess_for_vlm(rgb: np.ndarray) -> np.ndarray: """Lightweight preprocessing for VLM input (keep color, just resize + denoise).""" resized = resize_for_vlm(rgb) # Light denoise only — VLMs work better with natural images than binarized lab = cv2.cvtColor(resized, cv2.COLOR_RGB2LAB) l, a, b = cv2.split(lab) clahe = 
cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) l = clahe.apply(l) merged = cv2.merge([l, a, b]) return cv2.cvtColor(merged, cv2.COLOR_LAB2RGB) """Brain Layer — Regex + Lexicon post-processor. Converts raw VLM text output into structured JSON: { customer_name, items: [{name, qty, price}], total, transaction_type, mismatch } No LLM needed — deterministic, 0 RAM overhead, < 1ms latency. """ logger = logging.getLogger("parchi.brain") # ── Pre-compiled regex patterns ─────────────────────────────────────────────── # Matches lines like: "Atta 2kg 200", "چینی 1 150", "Daal x3 - 450" RE_ITEM_LINE = re.compile( r"(?P[A-Za-z\u0600-\u06FF\u0750-\u077F\s\-\.]+?)" # item name (Latin or Urdu) r"\s*[xX×\-]?\s*" r"(?P\d+(?:\.\d+)?)\s*" # quantity r"(?Pkg|g|gram|liter|litre|pkt|pc|dozen|dz|کلو|گرام|لیٹر|عدد|درجن)?\s*" r"[\-–—:=]?\s*" r"(?:Rs\.?\s*|PKR\s*|₨\s*)?" # optional currency r"(?P\d+(?:\.\d+)?)", # price re.IGNORECASE | re.UNICODE, ) # Simpler fallback: just "name price" on a line RE_SIMPLE_LINE = re.compile( r"^(?P[A-Za-z\u0600-\u06FF\u0750-\u077F\s\-\.]{2,}?)\s+" r"(?:Rs\.?\s*|PKR\s*|₨\s*)?" r"(?P\d+(?:\.\d{1,2})?)$", re.IGNORECASE | re.UNICODE | re.MULTILINE, ) RE_TOTAL = re.compile( r"(?:total|ٹوٹل|کل|جمع|مجموعی|grand\s*total|net|amount)" r"\s*[:=\-–—]?\s*(?:Rs\.?\s*|PKR\s*|₨\s*)?" r"(\d+(?:\.\d+)?)", re.IGNORECASE | re.UNICODE, ) RE_NAME = re.compile( r"(?:name|نام|customer|گاہک|صارف)\s*[:=\-–—]?\s*(.+)", re.IGNORECASE | re.UNICODE, ) def transliterate_urdu(text: str) -> str: """Replace Urdu words with their English transliterations.""" for urdu, eng in URDU_TO_ENGLISH.items(): text = text.replace(urdu, eng) return text def _repair_got_ocr_fragments(raw: str) -> str: """ GOT-OCR produces fragmented words (e.g. 'Che eni' for 'Cheeni', 'b v 0 ger' for 'Burger'). This repair stage: 1. Strips noise tokens (single chars, '0' mixed with text) 2. Collapses consecutive short tokens into a single word and fuzzy-matches them 3. 
Returns a cleaner string suitable for the brain parser """ lines = [] for line in raw.split("\n"): tokens = line.split() if not tokens: continue repaired_tokens = [] i = 0 while i < len(tokens): tok = tokens[i] # Skip pure noise: single letter that is not a unit, or '0' between words if len(tok) == 1 and not tok.isdigit() and tok.lower() not in ("v", "l", "g"): i += 1 continue # Try merging consecutive short non-numeric tokens into one word if len(tok) <= 3 and not re.match(r"^\d", tok): merged = tok j = i + 1 while j < len(tokens) and len(tokens[j]) <= 4 and not re.match(r"^\d", tokens[j]): merged += tokens[j] j += 1 # Check if merged form fuzzy-matches the lexicon candidate = fuzz_process.extractOne( merged.lower(), GROCERY_LEXICON, scorer=fuzz.WRatio, score_cutoff=55 ) if candidate: repaired_tokens.append(candidate[0]) i = j continue repaired_tokens.append(tok) i += 1 if repaired_tokens: lines.append(" ".join(repaired_tokens)) return "\n".join(lines) def correct_item_name(raw: str, aggressive: bool = False) -> str: """Auto-correct OCR garbage using misspelling map + fuzzy lexicon match. When aggressive=True (used for GOT-OCR fallback), the fuzzy threshold is lowered to 55 to catch highly fragmented token output. 
""" cleaned = raw.strip().lower() threshold = 55 if aggressive else 70 # Step 1: Direct misspelling lookup if cleaned in COMMON_MISSPELLINGS: return COMMON_MISSPELLINGS[cleaned] # Step 2: Fuzzy match against canonical lexicon match = fuzz_process.extractOne( cleaned, GROCERY_LEXICON, scorer=fuzz.WRatio, score_cutoff=threshold ) if match: return match[0] # Step 3: Return title-cased original return raw.strip().title() def normalize_unit(raw: str) -> str: """Normalize unit string using the UNIT_MAP.""" return UNIT_MAP.get(raw.lower().strip(), "pc") def detect_transaction_type(text: str) -> str: """Detect transaction type from raw text using keyword matching.""" text_lower = text.lower() scores = {} for tx_type, keywords in TRANSACTION_KEYWORDS.items(): score = sum(1 for kw in keywords if kw.lower() in text_lower) if score > 0: scores[tx_type] = score if scores: return max(scores, key=scores.get) return "unknown" # Known grocery/item words that should NOT be treated as customer names _ITEM_WORDS_LOWER = {w.lower() for w in GROCERY_LEXICON} | { w.lower() for w in COMMON_MISSPELLINGS } | { "total", "udhaar", "wasooli", "cash", "rs", "pkr", "amount", "date", "bill", "invoice", "receipt", "parchi", } def extract_customer_name(text: str) -> Optional[str]: """ Extract customer name from raw text. Strategy (in priority order): 1. Explicit 'name:' / 'customer:' label 2. Capitalized proper name at the very START of text (before any item/digit) 3. First pure-text line in the top 20% of the receipt """ # Strategy 1: explicit label m = RE_NAME.search(text) if m: name = m.group(1).strip() name = re.sub(r"[\d\-\u2013\u2014:=]+$", "", name).strip() if 2 <= len(name) <= 50: return name # Strategy 2: capitalized proper name at the START of text # Matches e.g. 
"Umar", "Umar Khan", "Muhammad Ali" before the first digit/item start_match = re.match( r"^([A-Z][a-z]{1,20}(?:\s+[A-Z][a-z]{1,20}){0,2})", text.strip(), ) if start_match: candidate = start_match.group(1).strip() # Reject if it's a known grocery item or keyword if candidate.lower() not in _ITEM_WORDS_LOWER: return candidate # Strategy 3: first pure-text line in top 20% of multiline text lines = text.strip().split("\n") top_lines = lines[:max(3, len(lines) // 5)] skip = {"total", "\u0679\u0648\u0679\u0644", "\u06a9\u0644", "\u062c\u0645\u0639", "date", "\u062a\u0627\u0631\u06cc\u062e", "rs", "pkr"} for line in top_lines: line = line.strip() if not line or len(line) < 2: continue if any(kw in line.lower() for kw in skip): continue if re.match(r"^\d+[\.\-/]\d+", line): # date-like continue if not re.search(r"\d", line): # pure text line return line[:50] return None def extract_total(text: str) -> Optional[float]: """Extract total amount from text.""" m = RE_TOTAL.search(text) if m: try: return float(m.group(1)) except ValueError: pass return None def parse_items(text: str) -> List[Dict[str, Any]]: """ Extract line items from raw OCR text. Deduplication key is (name, price) so that the same item with different prices (e.g. 'milk-3 1200' and 'milk-2 500') is preserved as two entries. 
""" items: List[Dict[str, Any]] = [] seen_keys: set = set() # Pass 1: Full pattern (name + qty + price) for m in RE_ITEM_LINE.finditer(text): name = correct_item_name(m.group("name")) qty = float(m.group("qty")) price = float(m.group("price")) unit = normalize_unit(m.group("unit") or "pc") if price < 1 or price > 50000 or qty <= 0 or qty > 1000: continue # Dedup by (name, price) — allows same item at different prices key = f"{name.lower()}:{price}" if key in seen_keys: continue seen_keys.add(key) items.append({"name": name, "quantity": qty, "price": price, "unit": unit}) # Pass 2: Simple fallback (name + price only) if not items: for m in RE_SIMPLE_LINE.finditer(text): name = correct_item_name(m.group("name")) price = float(m.group("price")) if price < 1 or price > 50000: continue key = f"{name.lower()}:{price}" if key in seen_keys: continue seen_keys.add(key) items.append({"name": name, "quantity": 1.0, "price": price, "unit": "pc"}) return items def validate_math(items: List[Dict[str, Any]], extracted_total: Optional[float]) -> bool: """ Return True if mismatch detected. Pakistani parchi convention: the price on each item line is the LINE TOTAL (e.g. '2.5kg Cheeni 200' means Rs 200 for 2.5kg total, NOT Rs 200/kg). So computed total = sum(item prices), NOT sum(qty * price). """ if not extracted_total or not items: return False computed = sum(item["price"] for item in items) # line totals tolerance = max(2.0, extracted_total * 0.05) # 5% or Rs 2 return abs(computed - extracted_total) > tolerance def process_raw_text(raw_text: str) -> Dict[str, Any]: """ Master brain function: raw VLM output -> structured parchi JSON. Field names match SmartParchiBackendItem in scan.tsx: items[].name, items[].quantity (NOT qty), items[].price Also sends total_amount (preferred by scan.tsx) alongside total (fallback). 
""" # Step 1: Transliterate Urdu -> English text = transliterate_urdu(raw_text) # Step 2: Extract structured fields customer_name = extract_customer_name(text) items = parse_items(text) total = extract_total(text) transaction_type = detect_transaction_type(text) # Step 3: If no explicit total, compute from item LINE TOTALS (Pakistani convention) if total is None and items: total = sum(item["price"] for item in items) # Step 4: Math validation mismatch = validate_math(items, total) total_val = total or 0.0 return { "customer_name": customer_name, # 'quantity' matches SmartParchiBackendItem.quantity in scan.tsx rowsFromBackendItems() "items": [ {"name": it["name"], "quantity": it["quantity"], "price": it["price"]} for it in items ], "total": total_val, # legacy fallback "total_amount": total_val, # preferred by scan.tsx line 370 "transaction_type": transaction_type, "mismatch": mismatch, } def try_parse_json_response(text: str) -> Optional[Dict[str, Any]]: """ If VLM returned JSON directly (Gemini / OpenRouter / Qaari with JSON prompt), parse and normalize it into our standard output schema. Returns None if text is not valid JSON or lacks required fields. """ import json json_match = re.search(r"\{[\s\S]*\}", text) if not json_match: return None try: data = json.loads(json_match.group()) if not ("items" in data or "total" in data): return None return _normalize_api_result(data) except json.JSONDecodeError: return None def _normalize_api_result(data: Dict[str, Any]) -> Dict[str, Any]: """ Normalize a raw dict from Gemini/OpenRouter/Qaari JSON into our standard schema matching SmartParchiBackendItem in scan.tsx. 
""" import json # --- Normalize items --- raw_items = data.get("items", []) or [] items: List[Dict[str, Any]] = [] for it in raw_items: if not isinstance(it, dict): continue name = it.get("name") or it.get("item") or "" if not name: continue name = correct_item_name(str(name)) qty = float(it.get("quantity") or it.get("qty") or 1.0) price = float(it.get("price") or it.get("total_price") or 0.0) items.append({"name": name, "quantity": qty, "price": price}) # --- Normalize totals --- total_raw = data.get("total") or data.get("total_amount") or 0.0 total_val = float(total_raw) if total_raw else 0.0 if total_val == 0.0 and items: total_val = sum(it["price"] for it in items) # --- Normalize transaction type --- tx = str(data.get("transaction_type") or "unknown").lower() if tx not in ("udhaar", "wasooli", "cash", "return", "unknown"): tx = detect_transaction_type(tx) # fuzzy-match via brain # --- Normalize customer name --- cname = data.get("customer_name") or None if isinstance(cname, str) and not cname.strip(): cname = None return { "customer_name": cname, "items": items, "total": total_val, "total_amount": total_val, "transaction_type": tx, "mismatch": validate_math(items, total_val), } """SmartOCR Engine — Lazy-loading VLM manager. Primary: Qaari-0.1-Urdu-OCR-VL-2B (Qwen2-VL fine-tuned for Urdu Nastaliq) Fallback: GOT-OCR 2.0 (580MB layout specialist, loaded only on primary failure) Memory strategy: - Models loaded lazily on first request (not at startup). - Only ONE model in RAM at a time. - gc.collect() after every inference pass. - Memory guard: abort if RSS > VLM_MEMORY_LIMIT_MB. 
""" logger = logging.getLogger("parchi.ocr_engine") # ── Config from environment ─────────────────────────────────────────────────── # Qaari is a PEFT LoRA adapter; base model is required to load it BASE_MODEL_ID = os.getenv("BASE_MODEL_ID", "Qwen/Qwen2-VL-2B-Instruct") PRIMARY_MODEL_ID = os.getenv("PRIMARY_MODEL_ID", "oddadmix/Qaari-0.1-Urdu-OCR-VL-2B-Instruct") FALLBACK_MODEL_ID = os.getenv("FALLBACK_MODEL_ID", "stepfun-ai/GOT-OCR-2.0-hf") ENABLE_FALLBACK = os.getenv("ENABLE_FALLBACK", "1").strip() not in ("0", "false", "no") VLM_MEMORY_LIMIT_MB = float(os.getenv("VLM_MEMORY_LIMIT_MB", "12000")) # CRITICAL: HF Space may have VLM_TIMEOUT_SECONDS=75 as env var — set it to 300 in Space settings. # 60 BPE tokens ≈ 240 chars — enough for any grocery receipt; keeps CPU inference under 2 min. VLM_MAX_TOKENS = int(os.getenv("VLM_MAX_NEW_TOKENS", "60")) VLM_TIMEOUT = float(os.getenv("VLM_TIMEOUT_SECONDS", "300")) # override in HF Space env to 300 # ── Cloud API Keys (Engine 1 & 2 — fast path) ───────────────────────────────── # Engine 1: Gemini 2.5 Flash — 2-3s, free 250-1000 req/day GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "AIzaSyAb25SsZIRcDIEbFc1P5s--LIqcHWdnH64") # gen-lang-client-0429107468 GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash") # confirmed 200 OK from Oracle server # CRITICAL: Google API uses colon (:) not slash (/) before the method name GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent" # Engine 2: OpenRouter free VLM cascade (tried in order; stop at first success) # Exact slugs verified via GET /api/v1/models on 2026-05-09 OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "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") OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" OPENROUTER_MODELS = [ "baidu/qianfan-ocr-fast:free", # OCR-specialized fastest "google/gemma-4-26b-a4b-it:free", # Gemma 4 26B A4B-IT "google/gemma-4-31b-it:free", # Gemma 4 31B-IT 
"nvidia/nemotron-nano-12b-v2-vl:free", # NVIDIA Nemotron 12B VL "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning:free", # NVIDIA Nemotron Omni ] def _rss_mb() -> float: """Current process RSS in MB.""" try: import psutil return psutil.Process().memory_info().rss / 1024 / 1024 except Exception: return 0.0 def _free_mem(): """Aggressively release memory.""" gc.collect() try: import torch torch.cuda.empty_cache() except Exception: pass class SmartOCR: """Manages VLM lifecycle: lazy load → inference → cleanup.""" def __init__(self): self._primary_model = None self._primary_processor = None self._fallback_model = None self._fallback_processor = None self._lock = threading.Lock() self._primary_loaded = False self._fallback_loaded = False self._primary_failed = False # ── Lazy Loaders ────────────────────────────────────────────────────────── def _load_primary(self): """Load Qaari-0.1 as a PEFT LoRA adapter on top of Qwen2-VL-2B-Instruct.""" if self._primary_loaded: return with self._lock: if self._primary_loaded: return logger.info("Loading base model: %s ...", BASE_MODEL_ID) logger.info("Applying PEFT adapter: %s ...", PRIMARY_MODEL_ID) t0 = time.time() try: import torch from peft import PeftModel from transformers import AutoProcessor, Qwen2VLForConditionalGeneration # Step 1: Load Qwen2-VL-2B-Instruct base in fp32 for CPU base_model = Qwen2VLForConditionalGeneration.from_pretrained( BASE_MODEL_ID, torch_dtype=torch.float32, device_map="cpu", low_cpu_mem_usage=True, ) # Step 2: Merge the Qaari LoRA adapter onto the base self._primary_model = PeftModel.from_pretrained(base_model, PRIMARY_MODEL_ID) self._primary_model.eval() # Processor comes from the base model (Qaari has no separate processor) self._primary_processor = AutoProcessor.from_pretrained(BASE_MODEL_ID) self._primary_loaded = True logger.info( "Primary model (base+adapter) loaded in %.1fs | RSS=%.0f MB", time.time() - t0, _rss_mb(), ) except Exception as e: logger.error("Primary model load FAILED: %s", e) 
self._primary_failed = True _free_mem() def _load_fallback(self): """Load GOT-OCR 2.0 — only called if primary fails.""" if self._fallback_loaded: return with self._lock: if self._fallback_loaded: return # Unload primary to free RAM self._unload_primary() logger.info("Loading fallback model: %s ...", FALLBACK_MODEL_ID) t0 = time.time() try: import torch from transformers import AutoModelForImageTextToText, AutoProcessor self._fallback_model = AutoModelForImageTextToText.from_pretrained( FALLBACK_MODEL_ID, torch_dtype=torch.float32, device_map="cpu", low_cpu_mem_usage=True, trust_remote_code=True, ) self._fallback_model.eval() self._fallback_processor = AutoProcessor.from_pretrained( FALLBACK_MODEL_ID, trust_remote_code=True ) self._fallback_loaded = True logger.info( "Fallback model loaded in %.1fs | RSS=%.0f MB", time.time() - t0, _rss_mb(), ) except Exception as e: logger.error("Fallback model load FAILED: %s", e) _free_mem() def _unload_primary(self): """Free primary model from RAM.""" self._primary_model = None self._primary_processor = None self._primary_loaded = False _free_mem() logger.info("Primary model unloaded | RSS=%.0f MB", _rss_mb()) def _unload_fallback(self): """Free fallback model from RAM.""" self._fallback_model = None self._fallback_processor = None self._fallback_loaded = False _free_mem() logger.info("Fallback model unloaded | RSS=%.0f MB", _rss_mb()) # ── Memory Guard ────────────────────────────────────────────────────────── def _check_memory(self) -> bool: """Return True if safe to proceed.""" rss = _rss_mb() if rss > VLM_MEMORY_LIMIT_MB: logger.warning("RSS %.0f MB exceeds limit %.0f MB — aborting", rss, VLM_MEMORY_LIMIT_MB) return False return True # ── Inference: Primary (Qaari) ──────────────────────────────────────────── def _infer_qaari(self, pil_image: Image.Image) -> Optional[str]: """Run Qaari-0.1 inference on a PIL image. 
Returns raw text or None.""" try: import concurrent.futures import torch from qwen_vl_utils import process_vision_info self._load_primary() if not self._primary_loaded or not self._check_memory(): return None # Plain-text prompt — Qaari (2B) is an OCR model, NOT a JSON generator. # Asking for JSON produces malformed/truncated output. # Gemini/OpenRouter handle JSON; Qaari outputs clean plain text. prompt = ( "You are a Pakistani grocery receipt (parchi) OCR reader. " "Read this handwritten receipt and output ALL text clearly:\n" "Line 1: customer name (if visible at top)\n" "Line 2: transaction type (udhaar / wasooli / cash)\n" "Lines 3+: each item as: name quantity unit price\n" "Last line: Total amount\n" "Output plain text only. Do not explain." ) messages = [ { "role": "user", "content": [ {"type": "image", "image": pil_image}, {"type": "text", "text": prompt}, ], } ] text_input = self._primary_processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) image_inputs, video_inputs = process_vision_info(messages) inputs = self._primary_processor( text=[text_input], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt", ) # CPU -- no .to("cuda") def _generate(): with torch.no_grad(): return self._primary_model.generate( **inputs, max_new_tokens=VLM_MAX_TOKENS, do_sample=False, use_cache=True, # KV-cache: mandatory for fast CPU decoding repetition_penalty=1.2, # Prevents looping; triggers early EOS ) # Enforce hard timeout on generate() so long images don't block forever with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: future = pool.submit(_generate) try: output_ids = future.result(timeout=VLM_TIMEOUT) except concurrent.futures.TimeoutError: logger.warning( "Qaari generate() timed out after %.0fs -- returning partial", VLM_TIMEOUT, ) # Cancel is best-effort on CPU; return None to trigger fallback return None # Trim input tokens from output trimmed = [ out[len(inp):] for inp, out in 
def _infer_gemini_api(self, image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Send the receipt image to the Gemini REST API (Engine 1).

    Args:
        image_bytes: Encoded image bytes as uploaded by the client.

    Returns:
        The value of ``_normalize_api_result`` for the JSON object found in
        the model reply, or ``None`` when ``GEMINI_API_KEY`` is unset, the
        HTTP status is not 200, or no non-empty JSON object can be
        recovered from the reply. Exceptions are logged at WARNING and
        turned into ``None`` so the caller can move on to the next engine.
    """
    if not GEMINI_API_KEY:
        return None

    # Local imports: only needed once this engine is actually enabled.
    import base64
    import json as _json

    import httpx

    CLOUD_PROMPT = (
        "Pakistani grocery receipt OCR. Rules:\n"
        "1. Name at top with no price beside it = customer_name\n"
        "2. udhaar/\u0627\u062f\u06be\u0627\u0631=credit, wasooli/\u0648\u0627\u0635\u0648\u0644\u06cc=payment, cash/\u0646\u0642\u062f=cash\n"
        "3. Each item line: [Name] [qty][unit] [LINE_TOTAL]. "
        "Last number = LINE TOTAL price (not unit price). "
        "cheeni-2.5 200 -> Cheeni qty=2.5 price=200\n"
        "4. Number after Total/\u06a9\u0644/\u062c\u0645\u0639 = grand total\n"
        "5. Fix OCR errors (g->9, I->1 if context requires)\n"
        "Return ONLY valid JSON (no markdown):\n"
        '{"customer_name":null,"transaction_type":"unknown",'
        '"items":[{"name":"Atta","quantity":2.0,"unit":"kg","price":200.0}],'
        '"total":200.0}'
    )

    try:
        # Label the payload with its real container format instead of always
        # claiming JPEG; anything unrecognised keeps the previous default.
        if image_bytes[:8] == b"\x89PNG\r\n\x1a\n":
            mime = "image/png"
        elif image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
            mime = "image/webp"
        else:
            mime = "image/jpeg"

        encoded = base64.b64encode(image_bytes).decode()
        payload = {
            "contents": [{
                "parts": [
                    {"text": CLOUD_PROMPT},
                    {"inline_data": {"mime_type": mime, "data": encoded}},
                ]
            }],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,  # 512 caused truncation on complex parchis
                # responseMimeType removed -- not supported on all model versions
            },
        }

        url = GEMINI_URL.format(GEMINI_MODEL)
        with httpx.Client(timeout=30.0) as client:
            resp = client.post(
                url,
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    # The key travels in a header rather than as ``?key=``:
                    # request URLs can be written to logs, headers are not.
                    "x-goog-api-key": GEMINI_API_KEY,
                },
            )

        if resp.status_code == 429:
            logger.warning("Gemini rate-limited (429) -- trying OpenRouter")
            return None
        if resp.status_code != 200:
            logger.warning("Gemini API error %d: %.200s", resp.status_code, resp.text)
            return None

        raw = resp.json()["candidates"][0]["content"]["parts"][0]["text"]

        # Candidate texts, strictest first: the reply as-is, the reply with
        # markdown fences stripped, then the outermost {...} span.
        attempts = [
            raw.strip(),
            re.sub(r"```(?:json)?\s*", "", raw).strip().rstrip("`").strip(),
        ]
        brace_span = re.search(r"\{[\s\S]*\}", raw)
        if brace_span:
            attempts.append(brace_span.group())

        data = None
        for text in attempts:
            try:
                parsed = _json.loads(text)
            except _json.JSONDecodeError:
                continue
            # Only a non-empty JSON object is usable downstream; a bare
            # list/string/number falls through to the next candidate.
            if isinstance(parsed, dict) and parsed:
                data = parsed
                break

        if data is None:
            logger.warning("Gemini non-JSON (truncated?): %.120s", raw)
            return None

        # "items" may be missing or null; counting it must not discard an
        # otherwise valid reply.
        items = data.get("items")
        item_count = len(items) if isinstance(items, list) else 0
        logger.info("Engine 1 (Gemini) success: %d items", item_count)
        return _normalize_api_result(data)
    except Exception as e:
        # Best-effort engine: any failure means "try the next engine".
        logger.warning("Gemini inference failed: %s", e)
        return None
def extract_structured(self, image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Run the cloud engines in priority order and return the first result.

    Gemini (Engine 1) is tried first, then the OpenRouter cascade
    (Engine 2). The winning result dict is tagged with an ``"_engine"``
    key naming the engine that produced it.

    Returns:
        The tagged result dict, or ``None`` when neither engine yields a
        truthy result (the caller is expected to fall back to the local
        VLM path).
    """
    cloud_engines = (
        ("gemini", self._infer_gemini_api),
        ("openrouter", self._infer_openrouter_api),
    )
    for label, infer in cloud_engines:
        parsed = infer(image_bytes)
        if not parsed:
            continue
        parsed["_engine"] = label
        return parsed
    return None
# ── Suppress noisy warnings ──────────────────────────────────────────────────
# Silences every Python warning process-wide, not only library noise.
warnings.filterwarnings("ignore")
# setdefault: values already present in the environment are left untouched.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("parchi.app")

# ── Constants ─────────────────────────────────────────────────────────────────
MAX_IMAGE_SIZE_MB = 10  # upper bound enforced by load_image()
CONCURRENCY_LIMIT = 1  # 1 worker only — Qwen2-VL-2B fp32 uses ~9GB on CPU
CACHE_SIZE = 50  # max cached results; _cache_put() evicts the entry with the oldest "ts"
CACHE_TTL = 3600  # seconds a cached result stays valid (1 hour)

# ── Globals ───────────────────────────────────────────────────────────────────
ocr_engine = SmartOCR()
# NOTE(review): nothing in the visible code acquires this semaphore; jobs are
# handed straight to the default executor, so CONCURRENCY_LIMIT may not
# actually be enforced — confirm.
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
result_cache: Dict[str, dict] = {}  # image hash → {"result": dict, "ts": insert time}

# ── Async Job Store (bypasses HF platform HTTP timeout) ──────────────────────────
# Jobs older than JOB_TTL seconds are pruned when a new job is registered
# in process_parchi().
JOB_TTL = 3600  # 1 hour
job_store: Dict[str, dict] = {}  # job_id → {status, result, ts, error}
def _cache_put(h: str, result: dict) -> None:
    """Store *result* in ``result_cache`` under image hash *h*.

    When the cache already holds ``CACHE_SIZE`` entries and *h* is a new
    key, the entry with the oldest insertion timestamp is evicted first.
    Overwriting an existing key never evicts, because it does not grow the
    cache.

    Args:
        h: Image hash used as the cache key.
        result: Result dict to store; it is stored by reference.
    """
    is_new_key = h not in result_cache
    # The non-empty check keeps min() from raising if CACHE_SIZE is ever <= 0.
    if is_new_key and result_cache and len(result_cache) >= CACHE_SIZE:
        oldest_key = min(result_cache, key=lambda k: result_cache[k]["ts"])
        del result_cache[oldest_key]
    result_cache[h] = {"result": result, "ts": time.time()}
def _run_ocr_job(job_id: str, raw_bytes: bytes, img_hash: str) -> None:
    """Run the blocking OCR pipeline for one queued job (thread-pool worker).

    Updates the job's record in ``job_store`` to ``processing``, then to
    ``done`` (with the result) or ``error`` (with the exception text), and
    stores the result in the result cache under *img_hash*.

    Args:
        job_id: Key of the job record created by the submitting endpoint.
        raw_bytes: Encoded image bytes to process.
        img_hash: Cache key for this image.
    """
    # Keep a reference to the record itself so status updates still work if
    # the key is pruned from job_store while this job is running.
    job = job_store.get(job_id)
    if job is None:
        logger.warning("[%s] Job record missing -- skipping OCR", job_id)
        return

    try:
        job["status"] = "processing"
        rgb = load_image(raw_bytes)
        # Pass raw_bytes so process_image can try Gemini/OpenRouter first
        result = process_image(rgb, raw_bytes=raw_bytes)
        result["job_id"] = job_id
        result["success"] = bool(result.get("items"))
        result["cached"] = False
        # Cache a copy: the cache-hit path rewrites job_id/cached/status on
        # the cached dict, which must not leak into this job's stored result.
        _cache_put(img_hash, dict(result))
        job.update({"status": "done", "result": result})
        elapsed = time.time() - job["ts"]
        logger.info("[%s] Job completed in %.1fs", job_id, elapsed)
    except Exception as e:
        logger.exception("[%s] Job failed", job_id)
        job.update({"status": "error", "error": str(e)})
    finally:
        gc.collect()
@app.get("/result/{job_id}")
async def get_result(job_id: str):
    """
    Poll for OCR job result.

    Returns:
      status=queued|processing : not ready yet, poll again in 10s
      status=done              : the structured parchi fields are merged into the response
      status=error             : error field contains the failure message

    Raises:
      HTTPException(404) when no job with this id is in the job store.
    """
    job = job_store.get(job_id)
    if job is None:
        raise HTTPException(404, f"Job '{job_id}' not found. It may have expired (TTL=1h).")

    # Read the status once: a worker thread may flip it while this handler
    # runs, and the branch taken must match the status that is reported.
    status = job["status"]

    response: Dict[str, Any] = {}
    if status == "done":
        response.update(job["result"] or {})
    elif status == "error":
        response["error"] = job["error"]
    else:
        response["elapsed_seconds"] = int(time.time() - job["ts"])
        response["message"] = "Job is processing. Poll again in 10 seconds."

    # Envelope fields are written last so keys inside the stored result can
    # never override the id that was asked for or the status just read.
    response["job_id"] = job_id
    response["status"] = status
    return JSONResponse(response)