# Bank-Scrubber/src/ocr/pdf_processor.py
# Aryan Jain
# bank scrubber streamlit application
# 4e71548
import asyncio
import fitz
import os
from typing import List, Dict, Any, Optional
import numpy as np
from pdf2image import convert_from_path
from doctr.models import ocr_predictor
from doctr.io import DocumentFile
import torch
from src.config.config import settings
from src.models.account_models import LineData, WordData
from src.utils import model_manager
class PDFProcessor:
    """Async PDF processor for handling both digital and scanned PDFs.

    The OCR model and device are read from the shared ``model_manager``
    instead of being stored on each instance. Blocking file and PDF work
    is handed to the event loop's default executor, and instances can be
    used with ``async with``.
    """
def __init__(self) -> None:
    """Create the processor and make sure the shared models are available."""
    # Use the centralized model manager
    self._ensure_models_loaded()
def _ensure_models_loaded(self):
    """Ensure models are loaded via the model manager.

    When ``model_manager.models_loaded`` is falsy, a status message is
    printed and ``model_manager.doctr_model`` is read once. Nothing is
    stored on the instance and nothing is returned.
    """
    if not model_manager.models_loaded:
        print("🔄 Models not loaded, initializing model manager...")
        # Reading the attribute is expected to make the manager load its
        # models (that logic lives in model_manager, not here); the value
        # itself is not needed.
        _ = model_manager.doctr_model
@property
def doctr_model(self):
    """Return the ``doctr_model`` object exposed by the model manager."""
    shared_model = model_manager.doctr_model
    return shared_model
@property
def device(self):
    """Return the ``device`` value exposed by the model manager."""
    active_device = model_manager.device
    return active_device
async def __aenter__(self) -> "PDFProcessor":
    """Enter the ``async with`` block; the processor itself is the target."""
    return self
async def __aexit__(self, exc_type, exc_value, traceback):
pass
async def is_pdf_scanned(self, pdf_path: str) -> bool:
    """Check if PDF is scanned (no extractable text).

    Args:
        pdf_path: Path of the PDF file to inspect.

    Returns:
        ``False`` as soon as any page yields non-whitespace text,
        ``True`` when no page does (including a document with no pages).
    """

    def _check_scanned() -> bool:
        # The context manager closes the document on every exit path,
        # including the early exit on the first page that has text.
        with fitz.open(pdf_path) as doc:
            # any() stops at the first page with text, so later pages
            # are not extracted needlessly.
            return not any(page.get_text().strip() for page in doc)

    # Opening and scanning the PDF blocks, so run it in the default executor.
    return await asyncio.get_event_loop().run_in_executor(None, _check_scanned)
async def save_uploaded_file(self, uploaded_file) -> str:
    """Write the uploaded file's bytes to ``settings.temp_file_name``.

    Args:
        uploaded_file: Object whose ``read()`` returns the bytes to store.

    Returns:
        The path the data was written to (``settings.temp_file_name``).
        The file is opened in ``"wb"`` mode, so existing content at that
        path is replaced.
    """

    def _write_to_disk():
        with open(settings.temp_file_name, "wb") as out_file:
            out_file.write(uploaded_file.read())
        return settings.temp_file_name

    # Disk I/O blocks, so it runs in the loop's default executor.
    event_loop = asyncio.get_event_loop()
    return await event_loop.run_in_executor(None, _write_to_disk)
async def extract_text_from_digital_pdf(self, pdf_path: str) -> List[List[str]]:
    """Read the text layer of a digital PDF with PyPDF2, page by page.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        One list of lines per page that produced text. Each line is
        stripped and passed through ``_split_on_repeated_pattern``, and
        the first element of that result is kept. Pages whose extracted
        text is empty contribute no entry.
    """
    from PyPDF2 import PdfReader

    def _read_pages():
        pages_out = []
        for pdf_page in PdfReader(pdf_path).pages:
            page_text = pdf_page.extract_text()
            if not page_text:
                # Nothing extractable on this page; skip it entirely.
                continue
            page_lines = []
            for raw_line in page_text.splitlines():
                parts = self._split_on_repeated_pattern(raw_line.strip())
                if parts:
                    page_lines.append(parts[0])
            pages_out.append(page_lines)
        return pages_out

    # PDF parsing blocks, so it runs in the loop's default executor.
    event_loop = asyncio.get_event_loop()
    return await event_loop.run_in_executor(None, _read_pages)
def _split_on_repeated_pattern(self, line: str, min_space: int = 10) -> List[str]:
"""Split line on repeated pattern."""
import re
from difflib import SequenceMatcher
original_line = line.strip()
# Find all spans of spaces >= min_space
space_spans = [
(m.start(), len(m.group()))
for m in re.finditer(r" {%d,}" % min_space, original_line)
]
if not space_spans:
return [original_line]
# Count how often each gap size occurs
gaps = [span[1] for span in space_spans]
gap_counts = {}
for g in gaps:
gap_counts[g] = gap_counts.get(g, 0) + 1
# Sort gaps by size ร— count (more dominant gaps first)
sorted_gaps = sorted(gap_counts.items(), key=lambda x: x[1] * x[0], reverse=True)
# No significant gaps, return original
if not sorted_gaps:
return [original_line]
dominant_gap = sorted_gaps[0][0]
# Use the dominant large gap to split
chunks = re.split(rf" {{%d,}}" % dominant_gap, original_line)
# Check if it's actually repeated using fuzzy match
base = chunks[0].strip()
repeated = False
for chunk in chunks[1:]:
chunk = chunk.strip()
if chunk and SequenceMatcher(None, base, chunk).ratio() > 0.8:
repeated = True
break
return [base] if repeated else [original_line]