guide / src /document_processor /processor.py
Saravanakumar R
intial traces bug fixes commit
b016462
Raw
History Blame Contribute Delete
8.23 kB
"""
DocumentProcessor — orchestrates the full document evidence pipeline.
Pipeline per uploaded file:
1. ocr.py → raw_text (pdfplumber / pytesseract + preprocessing)
2. EvidenceNER → model-predicted entity spans from the OCR text
3. DocumentViT → image-based entity spans (ViT score × regex); image files only, skipped for PDFs
4. Merge → deduplicate spans (keeping the highest-confidence copy) and order them by position
Input: file_path (str) — local path to an uploaded PDF or image file.
Output: {"raw_text": str, "entities": list[Entity]}
Used by the CMA tool process_document.
"""
from __future__ import annotations
import logging
import re
from pathlib import Path
from PIL import Image
# "Rs." glued directly to the amount, e.g. "Rs.1000".
_RS_DOT_NO_SPACE_RE = re.compile(r'\bRs\.(\d)')
# "Rs" separated from the amount by whitespace only (no dot), e.g. "Rs 1000".
_RS_NO_DOT_RE = re.compile(r'\bRs\s+(\d)')
# Currency prefix ("Rs." plus optional whitespace, or the rupee sign U+20B9)
# followed by an amount.  The amount must start AND end with a digit so that a
# bare or trailing comma -- ordinary punctuation -- is never captured.  The
# rupee sign is written as an escape so the pattern survives re-encoding.
_CURRENCY_AMOUNT_RE = re.compile(r'(Rs\.\s*|\u20b9)(\d[\d,]*\d|\d)')


def _indian_grouping(digits: str) -> str:
    """Group a separator-free digit string as last 3 digits, then pairs."""
    head, tail = digits[:-3], digits[-3:]
    groups = [tail]
    while len(head) > 2:
        groups.append(head[-2:])
        head = head[:-2]
    if head:
        groups.append(head)
    return ','.join(reversed(groups))


def _normalise_currency(text: str) -> str:
    """
    Normalise Indian currency strings that OCR commonly garbles, so NER
    sees formats present in its training data.

    Transforms applied (in order):
        Rs.1000   → Rs. 1000    (missing space after dot)
        Rs 1000   → Rs. 1000    (missing dot)
        ₹1000     → ₹1,000      (missing thousands comma)
        Rs. 1000  → Rs. 1,000   (missing thousands comma, space present)
    Amounts of one lakh or more get Indian grouping:
        100000    → 1,00,000    when preceded by Rs./₹

    Existing commas inside an amount are discarded and re-inserted, so the
    function is idempotent.  Punctuation around the amount (for example the
    comma in "Rs. 1000, paid") is left untouched, and a currency prefix that
    is not followed by a digit is ignored rather than raising.

    Args:
        text: OCR output; may be empty.

    Returns:
        The text with currency amounts rewritten as described above.
    """
    text = _RS_DOT_NO_SPACE_RE.sub(r'Rs. \1', text)
    text = _RS_NO_DOT_RE.sub(r'Rs. \1', text)

    def _add_indian_commas(m: re.Match) -> str:
        prefix = m.group(1)  # "Rs. " (with whatever whitespace was present) or "₹"
        # The pattern guarantees at least one digit, so int() cannot see "".
        # Going through int() also drops leading zeros, as before.
        value = int(m.group(2).replace(',', ''))
        if value < 1000:
            return f"{prefix}{value}"
        return f"{prefix}{_indian_grouping(str(value))}"

    return _CURRENCY_AMOUNT_RE.sub(_add_indian_commas, text)
from src.document_processor.ocr import (
SUPPORTED_IMAGE_EXTS,
extract_text,
)
from src.document_processor.vit_model import DocumentViT
from src.ner.model import Entity
from src.ner.predict import extract_entities
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Entity merging
# ---------------------------------------------------------------------------
def _merge_entities(
    *entity_lists: list[Entity],
) -> list[Entity]:
    """
    Combine any number of Entity lists into one de-duplicated list.

    Two entities count as the same when their label matches and their text
    matches after stripping surrounding whitespace and lower-casing.  Among
    duplicates the strictly highest confidence wins; on a tie the entity
    seen first is kept.  The result is ordered by start offset, with higher
    confidence first when two entities share a start offset.
    """
    winners: dict[tuple[str, str], Entity] = {}
    for candidate in (item for group in entity_lists for item in group):
        dedup_key = (candidate.text.strip().lower(), candidate.label)
        incumbent = winners.get(dedup_key)
        # Keep the incumbent unless the newcomer is strictly more confident.
        if incumbent is not None and not (candidate.confidence > incumbent.confidence):
            continue
        winners[dedup_key] = candidate

    # Document order first; negated confidence breaks ties in favour of the
    # more confident entity.
    ordered = list(winners.values())
    ordered.sort(key=lambda ent: (ent.start, -ent.confidence))
    return ordered
# ---------------------------------------------------------------------------
# DocumentProcessor
# ---------------------------------------------------------------------------
class DocumentProcessor:
    """
    Runs OCR → EvidenceNER → DocumentViT and returns unified evidence.

    The DocumentViT component is built once in __init__ so the heavy model
    load (ViT ~330 MB) happens a single time and is reused across process()
    calls.  Use the module-level get_processor() accessor for a cached
    singleton.
    """

    def __init__(self, vit_model_name: str = "google/vit-base-patch16-224") -> None:
        """
        Build the DocumentViT component.

        Args:
            vit_model_name: model identifier forwarded to DocumentViT.
        """
        # OCR and EvidenceNER are reached through module-level functions
        # (extract_text / extract_entities), so nothing is set up for them
        # here.  Per the original author, extract_entities() initialises its
        # model lazily via a cached singleton in ner/predict.py.
        # DocumentViT is loaded eagerly so the first process() call is not slow.
        self._vit = DocumentViT(model_name=vit_model_name)

    def process(self, file_path: str) -> dict:
        """
        Process *file_path* and return {"raw_text": str, "entities": list[Entity]}.

        Accepted extensions (case-insensitive) are ".pdf" plus everything in
        SUPPORTED_IMAGE_EXTS.  NER and ViT failures are logged and skipped so
        that one failing component does not discard the other's results.

        Args:
            file_path: local path to the uploaded PDF or image.

        Returns:
            A dict with the OCR text under "raw_text" and the merged,
            de-duplicated entities under "entities".

        Raises:
            ValueError: if the file extension is not supported.
        """
        path = Path(file_path)
        ext = path.suffix.lower()
        if ext not in SUPPORTED_IMAGE_EXTS | {".pdf"}:
            raise ValueError(
                f"Unsupported file extension {ext!r}. "
                f"Supported: .pdf, {', '.join(sorted(SUPPORTED_IMAGE_EXTS))}"
            )

        # ------------------------------------------------------------------
        # Step 1: OCR → raw text
        # ------------------------------------------------------------------
        logger.info("DocumentProcessor: extracting text from %s", path.name)
        raw_text = extract_text(file_path)
        logger.debug("Extracted %d characters of text.", len(raw_text))

        # Normalise currency OCR artefacts before NER so the model sees
        # formats it was trained on (e.g. "Rs. 1,000" not "Rs.1000").
        # NOTE(review): NER runs on the normalised text while the un-normalised
        # raw_text is returned, so NER character offsets presumably refer to
        # ner_text rather than raw_text — confirm whether callers rely on them.
        ner_text = _normalise_currency(raw_text)

        # ------------------------------------------------------------------
        # Step 2: EvidenceNER → model-based entity spans
        # ------------------------------------------------------------------
        ner_entities: list[Entity] = []
        if ner_text:
            try:
                ner_entities = extract_entities(ner_text)
                logger.debug("EvidenceNER: %d entities.", len(ner_entities))
            except Exception:
                # Deliberate best-effort: keep going with whatever ViT finds.
                logger.warning("EvidenceNER failed — skipping.", exc_info=True)

        # ------------------------------------------------------------------
        # Step 3: DocumentViT → image-based entity spans
        # ------------------------------------------------------------------
        vit_entities: list[Entity] = []
        if ext in SUPPORTED_IMAGE_EXTS:
            try:
                # Context manager closes the underlying file handle even when
                # extraction raises; a bare Image.open() would leak it.
                with Image.open(file_path) as img:
                    vit_entities = self._vit.extract(img, ocr_text=raw_text)
                logger.debug("DocumentViT: %d entities.", len(vit_entities))
            except Exception:
                # Deliberate best-effort: NER results are still returned.
                logger.warning("DocumentViT failed — skipping.", exc_info=True)
        elif ext == ".pdf":
            # For PDFs there is no single source image; ViT is skipped.
            # Per-page images could be supported in future by extracting them
            # from pdfplumber and running DocumentViT on each.
            logger.debug("DocumentViT: skipped for PDF (no single source image).")

        # ------------------------------------------------------------------
        # Step 4: Merge
        # ------------------------------------------------------------------
        merged = _merge_entities(ner_entities, vit_entities)
        logger.info(
            "DocumentProcessor: %d merged entities "
            "(NER=%d, ViT=%d) from %s.",
            len(merged), len(ner_entities), len(vit_entities), path.name,
        )
        return {"raw_text": raw_text, "entities": merged}
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
# Lazily created shared instance; stays None until get_processor() is first called.
_processor: "DocumentProcessor | None" = None


def get_processor(**kwargs) -> DocumentProcessor:
    """
    Fetch the shared DocumentProcessor, creating it on first use.

    Any keyword arguments are passed to DocumentProcessor.__init__ only when
    the instance is created; once it exists they are ignored and the existing
    instance is returned unchanged.
    """
    global _processor
    if _processor is not None:
        return _processor
    _processor = DocumentProcessor(**kwargs)
    return _processor