""" DocumentProcessor — orchestrates the full document evidence pipeline. Pipeline per uploaded file: 1. ocr.py → raw_text (pdfplumber / pytesseract + preprocessing) 2. EvidenceNER → model-predicted entity spans from the OCR text 3. DocumentViT → image-based entity spans (ViT score × regex) 4. Merge → deduplicate and rank all spans by confidence Input: file_path (str) — local path to an uploaded PDF or image file. Output: {"raw_text": str, "entities": list[Entity]} Used by the CMA tool process_document. """ from __future__ import annotations import logging import re from pathlib import Path from PIL import Image def _normalise_currency(text: str) -> str: """ Normalise Indian currency strings that OCR commonly garbles, so NER sees formats present in its training data. Transforms applied (in order): Rs.1000 → Rs. 1000 (missing space after dot) Rs 1000 → Rs. 1000 (missing dot) ₹1000 → ₹1,000 (missing thousands comma) Rs. 1000 → Rs. 1,000 (missing thousands comma, space present) Numbers with lakhs/crores are formatted with Indian grouping: 100000 → 1,00,000 when preceded by Rs/₹ """ # Rs.NNNN → Rs. NNNN text = re.sub(r'\bRs\.(\d)', r'Rs. \1', text) # Rs NNNN (no dot) → Rs. NNNN text = re.sub(r'\bRs\s+(\d)', r'Rs. \1', text) def _add_indian_commas(m: re.Match) -> str: prefix = m.group(1) # "Rs. " or "₹" digits = m.group(2).replace(',', '') # strip existing commas n = int(digits) if n < 1000: return f"{prefix}{n}" # Indian grouping: last 3 digits, then groups of 2 s = str(n) last3 = s[-3:] rest = s[:-3] parts = [] while len(rest) > 2: parts.append(rest[-2:]) rest = rest[:-2] if rest: parts.append(rest) grouped = ','.join(reversed(parts)) + ',' + last3 return f"{prefix}{grouped}" # Apply Indian comma grouping to Rs./₹ followed by digits (with or without commas) text = re.sub(r'(Rs\.\s*|₹)([\d,]+)', _add_indian_commas, text) return text from src.document_processor.ocr import ( SUPPORTED_IMAGE_EXTS, extract_text, ) from src.document_processor.vit_model import DocumentViT from src.ner.model import Entity from src.ner.predict import extract_entities logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Entity merging # --------------------------------------------------------------------------- def _merge_entities( *entity_lists: list[Entity], ) -> list[Entity]: """ Merge multiple Entity lists, deduplicating by (text, label). When two sources find the same entity (identical text + label), the one with the higher confidence is kept. The merged list is sorted by character position (start index) so the caller can display entities in document order. """ best: dict[tuple[str, str], Entity] = {} for entities in entity_lists: for entity in entities: key = (entity.text.strip().lower(), entity.label) existing = best.get(key) if existing is None or entity.confidence > existing.confidence: best[key] = entity # Sort by position in document; use confidence as tie-breaker return sorted(best.values(), key=lambda e: (e.start, -e.confidence)) # --------------------------------------------------------------------------- # DocumentProcessor # --------------------------------------------------------------------------- class DocumentProcessor: """ Runs OCR → EvidenceNER → DocumentViT and returns unified evidence. Component initialisation is deferred to __init__ so the heavy model loads (ViT ~330 MB, NER checkpoint) happen once and are reused across calls. Use the module-level get_processor() accessor for a cached singleton. """ def __init__(self, vit_model_name: str = "google/vit-base-patch16-224") -> None: """Initialise OCR (stateless), EvidenceNER singleton, and DocumentViT.""" # EvidenceNER is accessed through the cached singleton in ner/predict.py # (no explicit init needed here; extract_entities() initialises lazily). # DocumentViT — load eagerly so the first process() call is not slow self._vit = DocumentViT(model_name=vit_model_name) def process(self, file_path: str) -> dict: """ Process *file_path* and return {"raw_text": str, "entities": list[Entity]}. Supported formats: .pdf, .png, .jpg, .jpeg, .webp Raises ValueError for unsupported extensions. """ path = Path(file_path) ext = path.suffix.lower() if ext not in SUPPORTED_IMAGE_EXTS | {".pdf"}: raise ValueError( f"Unsupported file extension {ext!r}. " f"Supported: .pdf, {', '.join(sorted(SUPPORTED_IMAGE_EXTS))}" ) # ------------------------------------------------------------------ # Step 1: OCR → raw text # ------------------------------------------------------------------ logger.info("DocumentProcessor: extracting text from %s", path.name) raw_text = extract_text(file_path) logger.debug("Extracted %d characters of text.", len(raw_text)) # Normalise currency OCR artefacts before NER so the model sees # formats it was trained on (e.g. "Rs. 1,000" not "Rs.1000"). ner_text = _normalise_currency(raw_text) # ------------------------------------------------------------------ # Step 2: EvidenceNER → model-based entity spans # ------------------------------------------------------------------ ner_entities: list[Entity] = [] if ner_text: try: ner_entities = extract_entities(ner_text) logger.debug("EvidenceNER: %d entities.", len(ner_entities)) except Exception: logger.warning("EvidenceNER failed — skipping.", exc_info=True) # ------------------------------------------------------------------ # Step 3: DocumentViT → image-based entity spans # ------------------------------------------------------------------ vit_entities: list[Entity] = [] if ext in SUPPORTED_IMAGE_EXTS: try: img = Image.open(file_path) vit_entities = self._vit.extract(img, ocr_text=raw_text) logger.debug("DocumentViT: %d entities.", len(vit_entities)) except Exception: logger.warning("DocumentViT failed — skipping.", exc_info=True) elif ext == ".pdf": # For PDFs there is no single source image; ViT is skipped. # Per-page images could be supported in future by extracting them # from pdfplumber and running DocumentViT on each. logger.debug("DocumentViT: skipped for PDF (no single source image).") # ------------------------------------------------------------------ # Step 4: Merge # ------------------------------------------------------------------ merged = _merge_entities(ner_entities, vit_entities) logger.info( "DocumentProcessor: %d merged entities " "(NER=%d, ViT=%d) from %s.", len(merged), len(ner_entities), len(vit_entities), path.name, ) return {"raw_text": raw_text, "entities": merged} # --------------------------------------------------------------------------- # Module-level singleton # --------------------------------------------------------------------------- _processor: "DocumentProcessor | None" = None def get_processor(**kwargs) -> DocumentProcessor: """ Return the module-level DocumentProcessor singleton. Keyword arguments are forwarded to DocumentProcessor.__init__ on the first call and ignored on subsequent calls. """ global _processor if _processor is None: _processor = DocumentProcessor(**kwargs) return _processor