import asyncio
import os
import re
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional

import fitz
import numpy as np
import torch
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
from pdf2image import convert_from_path

from src.config.config import settings
from src.models.account_models import LineData, WordData
from src.utils import model_manager


class PDFProcessor:
    """Async PDF processor for handling both digital and scanned PDFs."""

    def __init__(self):
        # Models live in the centralized model manager, not on this instance.
        self._ensure_models_loaded()

    def _ensure_models_loaded(self):
        """Ensure OCR models are loaded via the model manager."""
        if not model_manager.models_loaded:
            print("🔄 Models not loaded, initializing model manager...")
            # Touching the property triggers the manager's lazy loading.
            _ = model_manager.doctr_model

    @property
    def doctr_model(self):
        """The loaded doctr OCR model, delegated to the model manager."""
        return model_manager.doctr_model

    @property
    def device(self):
        """The compute device in use, delegated to the model manager."""
        return model_manager.device

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # No resources are held by the processor itself; nothing to release.
        pass

    async def is_pdf_scanned(self, pdf_path: str) -> bool:
        """Return True if the PDF has no extractable text (i.e. is scanned).

        Args:
            pdf_path: Filesystem path to the PDF to inspect.
        """

        def _check_scanned() -> bool:
            # FIX: the original leaked the fitz document handle (no close,
            # and an early return inside the loop); try/finally guarantees
            # cleanup on every exit path.
            doc = fitz.open(pdf_path)
            try:
                for page in doc:
                    if page.get_text().strip():
                        return False
                return True
            finally:
                doc.close()

        # FIX: asyncio.get_event_loop() is deprecated inside coroutines
        # (Python 3.10+); get_running_loop() is the supported call here.
        return await asyncio.get_running_loop().run_in_executor(None, _check_scanned)

    async def save_uploaded_file(self, uploaded_file) -> str:
        """Persist an uploaded file-like object to the configured temp path.

        Args:
            uploaded_file: Any object with a ``read()`` method returning bytes.

        Returns:
            The path the file was written to (``settings.temp_file_name``).
        """

        def _save_file() -> str:
            with open(settings.temp_file_name, "wb") as f:
                f.write(uploaded_file.read())
            return settings.temp_file_name

        return await asyncio.get_running_loop().run_in_executor(None, _save_file)

    async def extract_text_from_digital_pdf(self, pdf_path: str) -> List[List[str]]:
        """Extract text lines from a digital (non-scanned) PDF using PyPDF2.

        Args:
            pdf_path: Filesystem path to the PDF.

        Returns:
            One list of cleaned text lines per page that yielded any text.
        """
        # Lazy import: PyPDF2 is only needed for the digital-PDF path.
        from PyPDF2 import PdfReader

        def _extract_text() -> List[List[str]]:
            reader = PdfReader(pdf_path)
            extracted_data = []
            for page in reader.pages:
                ptext = page.extract_text()
                if ptext:
                    data = []
                    for line in ptext.splitlines():
                        cleaned_line = self._split_on_repeated_pattern(line.strip())
                        if cleaned_line:
                            data.append(cleaned_line[0])
                    extracted_data.append(data)
            return extracted_data

        return await asyncio.get_running_loop().run_in_executor(None, _extract_text)

    def _split_on_repeated_pattern(self, line: str, min_space: int = 10) -> List[str]:
        """Collapse a line whose content is repeated across large space gaps.

        Lines extracted from multi-column PDFs often contain the same text
        repeated, separated by long runs of spaces. If the chunks separated
        by the dominant gap width fuzzy-match (> 0.8 similarity), return
        ``[first_chunk]``; otherwise return ``[original_line]`` unchanged.

        Args:
            line: The raw text line.
            min_space: Minimum run of spaces considered a column gap.

        Returns:
            A single-element list — either the deduplicated chunk or the
            original (stripped) line.
        """
        original_line = line.strip()

        # Find all runs of at least min_space consecutive spaces.
        space_spans = [
            (m.start(), len(m.group()))
            for m in re.finditer(r" {%d,}" % min_space, original_line)
        ]
        if not space_spans:
            return [original_line]

        # Count how often each gap width occurs.
        gap_counts: Dict[int, int] = {}
        for _, width in space_spans:
            gap_counts[width] = gap_counts.get(width, 0) + 1

        # Dominant gap = largest (width × frequency) product; ties resolve to
        # the first-seen width, matching the original stable sort.
        dominant_gap = max(gap_counts.items(), key=lambda kv: kv[0] * kv[1])[0]

        # FIX: the original built this pattern with an f-string combined with
        # %-formatting (rf" {{%d,}}" % gap), which only worked because the
        # f-string contained no real replacement fields. Use plain
        # %-formatting, consistent with the min_space pattern above.
        chunks = re.split(r" {%d,}" % dominant_gap, original_line)

        # The line is "repeated" when any later chunk fuzzy-matches the first.
        base = chunks[0].strip()
        repeated = any(
            chunk.strip()
            and SequenceMatcher(None, base, chunk.strip()).ratio() > 0.8
            for chunk in chunks[1:]
        )
        return [base] if repeated else [original_line]