Spaces:
Sleeping
Sleeping
| """Step A + B: extraction of raw transaction rows from a PDF or image. | |
| Two paths: | |
| * Digital PDF with a text layer -> pdfplumber tables (no model, fast, free) | |
| * Scanned PDF or image -> MiniCPM-V vision model, page-by-page | |
| The vision path is optional: if torch/transformers/spaces aren't installed | |
| (e.g. running locally on CPU), it degrades gracefully and reports why. | |
| """ | |
| import json | |
| import os | |
| import re | |
# Vision model is loaded lazily; these flags let the app report which path ran.
# MiniCPM-V-4.6 is the newest MiniCPM-V and is transformers-native (uses the
# standard AutoModelForImageTextToText API). The older 2_6 relied on a custom
# .chat() method that breaks on recent transformers.
# Primary checkpoint; can be overridden through the VISION_MODEL_ID env var.
VISION_MODEL_ID = os.environ.get("VISION_MODEL_ID", "openbmb/MiniCPM-V-4.6")
# Checkpoint that _load_vision() tries when the primary one fails to load.
VISION_FALLBACK_ID = "Qwen/Qwen2.5-VL-7B-Instruct"
# Instruction sent along with every page image; the model's reply is expected
# to be a bare JSON array and is parsed by _parse_model_json().
VISION_PROMPT = """You are reading one page of an Indian bank statement. Extract EVERY transaction row into JSON.
Output ONLY a JSON array, no markdown, no commentary. Schema per row:
{"date": "<as printed>", "narration": "<full narration text>", "ref_no": "<or null>",
"debit": <number or null>, "credit": <number or null>, "balance": <number or null>}
Rules:
- One object per transaction row. Skip headers, footers, page totals, opening balance lines.
- Amounts: numbers only, no commas, no currency symbols.
- If a narration wraps across lines, merge it into one string.
- If a cell is unreadable, use null. NEVER invent values."""
| # --------------------------------------------------------------------------- # | |
| # Helpers | |
| # --------------------------------------------------------------------------- # | |
def parse_amount(raw):
    """Convert a printed amount to a float, or None when there is no amount.

    Examples:
        '1,234.50'      -> 1234.5
        'Rs. 1,234.50'  -> 1234.5
        '12500.00 Cr.'  -> 12500.0
        '' / '-' / None -> None

    Thousands separators, currency symbols and alphabetic markers (currency
    codes, Cr/Dr suffixes) are ignored. An alphabetic marker is removed
    together with the full stop of its abbreviation, so 'Rs.500' is read as
    500.0 rather than 0.5.

    Args:
        raw: cell content; anything accepted by ``str()``.

    Returns:
        The amount as a float, or None if the cell is empty, is not a number,
        or is exactly zero (zero is treated as "no amount in this column").
    """
    if raw is None:
        return None
    s = str(raw).strip()
    if not s:
        return None
    s = s.replace(",", "")
    # Drop words such as 'Rs.', 'INR' or 'Cr.' *including* a trailing dot.
    # Otherwise that dot survives the filter below and either shifts the
    # decimal point ('Rs.500' -> '.500') or makes the value unparseable.
    s = re.sub(r"[A-Za-z]+\.?", "", s)
    # Keep only the characters a plain decimal number can contain.
    s = re.sub(r"[^\d.\-]", "", s)
    if s in ("", "-", ".", "-."):
        return None
    try:
        v = float(s)
    except ValueError:
        return None
    return v if v != 0 else None
def parse_date(raw):
    """Parse a common Indian date format to ISO 'YYYY-MM-DD'.

    Accepted inputs:
        * ISO already:      '2026-04-01'
        * textual month:    '1 Apr 2026', '01-Apr-26', '1 April 2026'
        * numeric, day-first (Indian convention): '01/04/2026', '1-4-26',
          '1.4.26'

    Two-digit years are taken to be 20YY. Years must have two or four digits.
    Every candidate is checked against the calendar, so impossible dates such
    as '31/02/2026' or '2026-13-45' give None instead of a bogus ISO string.

    Args:
        raw: value to parse; anything accepted by ``str()``.

    Returns:
        The ISO date string, or None if ``raw`` is empty or not a valid date.
    """
    # Local import keeps this block self-contained; datetime is stdlib.
    from datetime import date

    if not raw:
        return None
    s = str(raw).strip()

    def _iso(year, month, day):
        # date() rejects out-of-range months/days (incl. 30 Feb, 31 Apr ...).
        try:
            return date(year, month, day).isoformat()
        except ValueError:
            return None

    def _year(text):
        # Only a genuinely two-digit year is expanded to the 2000s.
        value = int(text)
        return value + 2000 if len(text) == 2 else value

    # already ISO?
    m = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", s)
    if m:
        return _iso(int(m.group(1)), int(m.group(2)), int(m.group(3)))

    months = {
        "jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6,
        "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12,
    }
    # 1 Apr 2026 / 01-Apr-26 / 1 April 2026
    m = re.match(r"^(\d{1,2})[\s\-/]+([A-Za-z]{3,})[\s\-/]+(\d{2}|\d{4})$", s)
    if m:
        # Only the first three letters identify the month, so both the
        # abbreviated and the full month name are accepted.
        mon = months.get(m.group(2)[:3].lower())
        if mon is None:
            return None
        return _iso(_year(m.group(3)), mon, int(m.group(1)))

    # DD/MM/YYYY or DD-MM-YY (assume day-first, Indian convention)
    m = re.match(r"^(\d{1,2})[/\-.](\d{1,2})[/\-.](\d{2}|\d{4})$", s)
    if m:
        return _iso(_year(m.group(3)), int(m.group(2)), int(m.group(1)))
    return None
def _is_header_or_total(cells):
    """Tell whether a table row is a header, a total line or blank.

    Args:
        cells: the row's cell strings; empty/None cells are ignored.

    Returns:
        True if the row should be skipped because it
        * has no text at all,
        * is the column-header row (starts with 'date' and names a
          'narration' or 'particulars' column), or
        * is an opening/closing balance, carried-forward or total line.
        False for everything else, i.e. candidate transaction rows.
    """
    joined = " ".join(c for c in cells if c).lower()
    if not joined.strip():
        return True
    # 'date' alone is the header row; only skip if it looks like a header line.
    # The parentheses are essential: `a and b or c` binds as `(a and b) or c`,
    # which used to discard every transaction whose narration merely
    # contained the word 'particulars'.
    if joined.startswith("date") and ("narration" in joined or "particulars" in joined):
        return True
    return any(k in joined for k in ("opening balance", "closing balance",
                                     "carried forward", "page total", "grand total"))
| # --------------------------------------------------------------------------- # | |
| # Digital PDF path (pdfplumber) | |
| # --------------------------------------------------------------------------- # | |
def extract_from_pdf(path, max_pages=5):
    """Read transaction rows out of a digital (text-layer) PDF.

    Every table pdfplumber finds on the first ``max_pages`` pages is handed to
    _rows_from_table(); the amount of extracted text is tallied alongside so a
    scan can be told apart from a PDF that simply has no transaction table.

    Returns:
        (transactions, meta) where meta['path'] == 'text-layer'.

    Raises:
        ValueError: fewer than 20 characters of text were found (looks like a
            scan), or text exists but no transaction row was extracted.
    """
    import pdfplumber

    rows = []
    char_total = 0
    page_count = 0
    with pdfplumber.open(path) as doc:
        for pg in doc.pages[:max_pages]:
            page_count += 1
            page_text = pg.extract_text() or ""
            char_total += len(page_text)
            page_tables = pg.extract_tables() or []
            for tbl in page_tables:
                _rows_from_table(tbl, rows)

    if char_total < 20:
        raise ValueError("No text layer detected -- this looks like a scan.")
    if not rows:
        raise ValueError("Text layer present but no transaction table found.")

    info = {
        "path": "text-layer",
        "pages": page_count,
        "engine": "pdfplumber",
        "gpu_used": False,
    }
    return rows, info
def _rows_from_table(table, out):
    """Append one transaction dict to `out` for each dated row of `table`.

    Tables with fewer than two rows are ignored. A row is kept only when it
    is not a header/total line and its first cell parses as a date. Columns
    are read positionally as: date, narration, ref, debit, credit, balance;
    columns missing from a short row yield "" (narration) or None.
    """
    if not table or len(table) < 2:
        return

    def _amount(cells, idx):
        # Only parse a column that actually exists in this row.
        return parse_amount(cells[idx]) if len(cells) > idx else None

    for raw_row in table:
        # Normalise each cell: None -> "", embedded newlines -> spaces.
        cells = [(cell or "").replace("\n", " ").strip() for cell in raw_row]
        if _is_header_or_total(cells):
            continue

        iso_date = parse_date(cells[0]) if cells else None
        if not iso_date:
            # not a transaction row
            continue

        width = len(cells)
        record = {
            "date": iso_date,
            "narration": cells[1] if width > 1 else "",
            "ref_no": cells[2] if width > 2 and cells[2] else None,
            "debit": _amount(cells, 3),
            "credit": _amount(cells, 4),
            "balance": _amount(cells, 5),
        }
        out.append(record)
| # --------------------------------------------------------------------------- # | |
| # Vision path (MiniCPM-V) -- optional, GPU-backed | |
| # --------------------------------------------------------------------------- # | |
# Module-level cache for the lazily loaded vision stack. _load_vision() fills
# in the model, its processor and the id of the checkpoint that was loaded;
# "model" staying None means nothing has been loaded yet.
_VISION = {"model": None, "processor": None, "id": None}
def vision_available():
    """Return True only if torch is actually usable for inference.

    Being importable is not enough: transformers may import while torch is
    present-but-disabled (e.g. wrong version), so besides asking transformers
    whether it accepts the installed torch, a tiny tensor operation is run on
    the CPU to prove the runtime works.

    Returns:
        bool: True when torch and transformers import, transformers reports
        torch as available, and the smoke-test tensor op gives the expected
        result; False on any failure (no exception escapes).
    """
    try:
        import torch
        from transformers.utils import is_torch_available

        if not is_torch_available():
            return False
        # Smoke test: a broken or mismatched torch build typically fails here
        # rather than at import time.
        return bool((torch.zeros(1) + 1).item() == 1.0)
    except Exception:
        # Deliberately broad: any import or runtime problem means "no vision".
        return False
def _load_vision():
    """Load the vision model and processor into the `_VISION` cache (once).

    Tries VISION_MODEL_ID first. If that fails for any reason, the failure is
    logged and VISION_FALLBACK_ID is loaded instead; an error from the
    fallback propagates to the caller. Does nothing when a model is already
    cached.

    Raises:
        ImportError: transformers (or the classes used here) is unavailable.
        Exception: whatever the fallback load raised, if both loads fail.
    """
    if _VISION["model"] is not None:
        return

    import logging

    from transformers import AutoModelForImageTextToText, AutoProcessor

    log = logging.getLogger(__name__)

    def _load(model_id):
        # NOTE(review): trust_remote_code=True lets the checkpoint repository
        # run its own Python code -- only point the ids at trusted repos.
        processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
        model = AutoModelForImageTextToText.from_pretrained(
            model_id, trust_remote_code=True, dtype="auto", device_map="auto")
        return model, processor

    model_id = VISION_MODEL_ID
    try:
        model, processor = _load(model_id)
    except Exception as exc:
        # Best-effort fallback, but say why: previously the reason the
        # primary checkpoint was skipped was lost without a trace.
        log.warning("Vision model %s failed to load (%s: %s); falling back to %s",
                    model_id, type(exc).__name__, exc, VISION_FALLBACK_ID)
        model_id = VISION_FALLBACK_ID
        model, processor = _load(model_id)
    _VISION.update(model=model, processor=processor, id=model_id)
def _pdf_to_images(path, max_pages=5):
    """Rasterise the first `max_pages` pages of a PDF (used for scanned PDFs).

    Each page is rendered by pdfplumber at resolution=150 and the rendered
    page's `.original` image is returned, in page order.
    """
    import pdfplumber

    with pdfplumber.open(path) as doc:
        return [
            pg.to_image(resolution=150).original
            for pg in doc.pages[:max_pages]
        ]
def extract_from_images(images):
    """Extract transactions from page images with the vision model.

    All pages go through a single _run_vision() call, so the GPU is acquired
    once and the per-page loop happens inside it.

    Returns:
        (transactions, meta) as produced by _run_vision().

    Raises:
        RuntimeError: torch/transformers are not usable on this machine.
    """
    if vision_available():
        return _run_vision(images)
    raise RuntimeError(
        "Vision path needs torch + transformers. On this machine the "
        "digital-PDF (pdfplumber) path is available; vision runs on the "
        "ZeroGPU Space.")
# Decorate with @spaces.GPU only when the `spaces` lib is present (HF Space).
# Without the decorator a ZeroGPU Space never attaches a GPU to the call, so
# the two branches below must differ in exactly that respect.
try:
    import spaces

    # first call also loads the ~8B model, so request a longer GPU slot than
    # the default. NOTE(review): duration=120 is an assumption -- tune it to
    # the observed load + inference time.
    @spaces.GPU(duration=120)
    def _run_vision(images):
        """Run vision inference inside a ZeroGPU allocation."""
        return _run_vision_impl(images)
except Exception:
    # `spaces` missing (or unusable) -> plain call on whatever device exists.
    def _run_vision(images):
        """Run vision inference directly (no ZeroGPU wrapper available)."""
        return _run_vision_impl(images)
def _run_vision_impl(images):
    """Actual inference: load model once (device_map=auto), loop over pages.

    Uses the standard transformers multimodal API (apply_chat_template +
    generate), which works for MiniCPM-V-4.6 and the Qwen2.5-VL fallback alike.

    Args:
        images: list of page images; each one is sent to the model together
            with VISION_PROMPT as a single user message.

    Returns:
        (transactions, meta): the concatenated rows that _parse_model_json()
        extracted from every page's reply, and a dict with the keys "path",
        "pages", "engine" and "gpu_used".
    """
    import torch
    # No-op after the first call: the model is cached in _VISION.
    _load_vision()
    model, processor = _VISION["model"], _VISION["processor"]
    # The MiniCPM-only options below are keyed off the loaded checkpoint's id.
    is_minicpm = "minicpm" in (_VISION["id"] or "").lower()
    txns = []
    for img in images:
        # One single-turn conversation per page: the image plus the prompt.
        messages = [{"role": "user", "content": [
            {"type": "image", "image": img},
            {"type": "text", "text": VISION_PROMPT},
        ]}]
        tmpl_kwargs = dict(tokenize=True, add_generation_prompt=True,
                           return_dict=True, return_tensors="pt")
        # do_sample=False: sampling is switched off for generation.
        gen_kwargs = dict(max_new_tokens=2048, do_sample=False)
        if is_minicpm:
            # MiniCPM-V-4.6-specific knobs (ignored by other models).
            tmpl_kwargs.update(downsample_mode="16x", max_slice_nums=36)
            gen_kwargs.update(downsample_mode="16x")
        try:
            inputs = processor.apply_chat_template(messages, **tmpl_kwargs).to(model.device)
        except TypeError:
            # model doesn't accept the MiniCPM knobs -> retry plain
            for k in ("downsample_mode", "max_slice_nums"):
                tmpl_kwargs.pop(k, None)
                gen_kwargs.pop(k, None)
            inputs = processor.apply_chat_template(messages, **tmpl_kwargs).to(model.device)
        # Inference only, so gradient tracking is disabled.
        with torch.no_grad():
            out = model.generate(**inputs, **gen_kwargs)
        # Drop the first len(input_ids) tokens of each output sequence --
        # presumably generate() returns prompt + continuation; TODO confirm.
        trimmed = [o[len(i):] for i, o in zip(inputs["input_ids"], out)]
        text = processor.batch_decode(
            trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
        txns.extend(_parse_model_json(text))
    # "gpu_used" reports whether CUDA is available to torch, not where the
    # model was actually placed.
    meta = {"path": "vision", "pages": len(images),
            "engine": _VISION["id"], "gpu_used": bool(torch.cuda.is_available())}
    return txns, meta
def _first_json_array(text):
    """Return the JSON array embedded in `text`, or [] if none can be parsed."""
    m = re.search(r"\[.*\]", text, re.DOTALL)
    if not m:
        return []
    # Fast path: everything from the first '[' to the last ']' is the array.
    try:
        return json.loads(m.group(0))
    except (ValueError, RecursionError):
        pass
    # The greedy span was not valid JSON, typically because the model wrapped
    # the array in commentary that itself contains brackets. Try to decode an
    # array starting at each '[' and keep the first one that holds row objects.
    decoder = json.JSONDecoder()
    start = text.find("[")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except (ValueError, RecursionError):
            candidate = None
        if isinstance(candidate, list) and any(isinstance(r, dict) for r in candidate):
            return candidate
        start = text.find("[", start + 1)
    return []


def _parse_model_json(text):
    """Pull a JSON array out of a model response and normalize the rows.

    Args:
        text: raw decoded model output; may be empty or None.

    Returns:
        A list of transaction dicts with the keys date, narration, ref_no,
        debit, credit and balance. Dates go through parse_date() and amounts
        through parse_amount(), so unparseable values become None. Array
        elements that are not JSON objects are skipped. [] is returned when
        no JSON array can be recovered from the text.
    """
    if not text:
        return []
    out = []
    for r in _first_json_array(text):
        if not isinstance(r, dict):
            continue
        out.append({
            "date": parse_date(r.get("date")),
            "narration": str(r.get("narration") or "").strip(),
            "ref_no": (str(r["ref_no"]).strip() if r.get("ref_no") else None),
            "debit": parse_amount(r.get("debit")),
            "credit": parse_amount(r.get("credit")),
            "balance": parse_amount(r.get("balance")),
        })
    return out
| # --------------------------------------------------------------------------- # | |
| # Top-level dispatcher | |
| # --------------------------------------------------------------------------- # | |
def extract(path, max_pages=5):
    """Auto-detect the right path for `path` (PDF or image) and extract.

    * ``.pdf``: the text-layer route (extract_from_pdf) is tried first; when
      it raises ValueError (no text layer, or no transaction table found) the
      pages are rendered to images and sent through the vision model.
    * ``.png/.jpg/.jpeg/.webp/.bmp/.tif/.tiff``: the image is loaded, converted
      to RGB and sent through the vision model.

    Args:
        path: file to read; the type is decided from the extension
            (case-insensitive).
        max_pages: maximum number of PDF pages to process.

    Returns:
        (transactions, meta).

    Raises:
        ValueError: the file extension is not supported.
        RuntimeError: the vision route is needed but unavailable
            (raised by extract_from_images).
    """
    ext = os.path.splitext(path)[1].lower()
    if ext == ".pdf":
        try:
            return extract_from_pdf(path, max_pages=max_pages)
        except ValueError:
            # scanned PDF: render pages and run vision
            images = _pdf_to_images(path, max_pages=max_pages)
            return extract_from_images(images)
    if ext in (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff"):
        from PIL import Image

        # Image.open() is lazy and keeps the file handle open; convert()
        # returns an independent, fully loaded copy, so the source can be
        # closed as soon as the conversion is done.
        with Image.open(path) as src:
            img = src.convert("RGB")
        return extract_from_images([img])
    raise ValueError(f"Unsupported file type: {ext}")