Spaces:
Running
Running
import asyncio
import re
import tempfile
from collections import Counter
from difflib import SequenceMatcher
from pathlib import Path
from typing import List

import aiofiles
import fitz
from fastapi import UploadFile
from loguru import logger

from src.utils import TextExtractor, model_manager
class PDFProcessorService:
    """Async PDF processor for handling both digital and scanned PDFs.

    Models are obtained from the shared ``model_manager``; the service itself
    holds no state, so the async context-manager hooks are no-ops.
    """

    def __init__(self):
        # Use the centralized model manager
        self._ensure_models_loaded()

    def _ensure_models_loaded(self) -> None:
        """Ensure models are loaded via the model manager."""
        if not model_manager.models_loaded:
            logger.info("🔄 Models not loaded, initializing model manager...")
            # Accessing the attribute is what triggers loading in the manager.
            _ = model_manager.doctr_model

    @property
    def doctr_model(self):
        """Get the loaded doctr model from model manager.

        Exposed as a property because ``process_pdf`` reads it as an
        attribute (``self.doctr_model``); as a plain method that access would
        hand a bound method, not the model, to ``TextExtractor``.
        """
        return model_manager.doctr_model

    @property
    def device(self):
        """Get the device being used from model manager."""
        return model_manager.device

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Nothing to release; exceptions propagate (implicit ``None`` return).
        pass

    async def is_pdf_scanned(self, pdf_path: str) -> bool:
        """Check if PDF is scanned (no extractable text).

        Returns ``True`` only when every page yields whitespace-only text.
        The document is read in the default executor so the event loop is
        not blocked.
        """

        def _has_no_text() -> bool:
            # Context manager closes the document even on early exit.
            with fitz.open(pdf_path) as doc:
                return not any(page.get_text().strip() for page in doc)

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _has_no_text)

    async def save_uploaded_file(self, uploaded_file: UploadFile) -> str:
        """Write the upload to a new temporary file and return its path.

        The file is created with ``delete=False``; the caller owns the
        returned path and is responsible for removing it.
        """
        # ``filename`` may be missing on an upload; fall back to no suffix
        # instead of letting ``Path(None)`` raise TypeError.
        suffix = Path(uploaded_file.filename or "").suffix
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            temp_path = tmp.name
        async with aiofiles.open(temp_path, "wb") as f:
            await f.write(await uploaded_file.read())
        return temp_path

    async def extract_text_from_digital_pdf(self, pdf_path: str) -> List[List[str]]:
        """Extract text from a digital PDF using PyMuPDF (``fitz``).

        Returns one list of cleaned, non-empty lines per page that has any
        text; pages with no text are skipped, so list positions do not
        necessarily equal page numbers.
        """

        def _extract() -> List[List[str]]:
            # Must be a plain function: the executor only *calls* its target,
            # so an ``async def`` here would yield an un-awaited coroutine.
            pages: List[List[str]] = []
            with fitz.open(pdf_path) as doc:
                for page in doc:
                    page_text = page.get_text()
                    if not page_text:
                        continue
                    lines = []
                    for raw_line in page_text.splitlines():
                        cleaned = self._collapse_repeated_line(raw_line.strip())[0]
                        if cleaned:  # drop blank lines
                            lines.append(cleaned)
                    pages.append(lines)
            return pages

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _extract)

    @staticmethod
    def _collapse_repeated_line(line: str, min_space: int = 10) -> List[str]:
        """Synchronous core of ``_split_on_repeated_pattern``.

        Finds runs of at least ``min_space`` spaces, picks the run length with
        the largest ``length * occurrences`` score, and splits the line on
        runs of at least that length. If any later chunk is more than 80 %
        similar to the first chunk, the line is treated as the same text
        repeated and only the first chunk is returned; otherwise the stripped
        line is returned unchanged. The result is always a one-element list.
        """
        original_line = line.strip()

        # rf-string: ``{{`` / ``}}`` are literal braces -> e.g. r" {10,}".
        gap_lengths = [
            len(match.group())
            for match in re.finditer(rf" {{{min_space},}}", original_line)
        ]
        if not gap_lengths:
            return [original_line]

        # Dominant gap = highest size x count; ties keep first-seen order.
        gap_counts = Counter(gap_lengths)
        dominant_gap = max(gap_counts, key=lambda size: size * gap_counts[size])

        chunks = re.split(rf" {{{dominant_gap},}}", original_line)
        base = chunks[0].strip()
        similarity_threshold = 0.8
        repeated = any(
            chunk.strip()
            and SequenceMatcher(None, base, chunk.strip()).ratio()
            > similarity_threshold
            for chunk in chunks[1:]
        )
        return [base] if repeated else [original_line]

    async def _split_on_repeated_pattern(
        self, line: str, min_space: int = 10
    ) -> List[str]:
        """Split line on repeated pattern.

        Async wrapper kept for existing callers; the work is done by
        ``_collapse_repeated_line``.
        """
        return self._collapse_repeated_line(line, min_space)

    async def process_pdf(self, file):
        """Save the upload, extract its lines, and return the extraction.

        ``file`` is passed to ``save_uploaded_file``. Scanned PDFs go through
        ``extract_lines_with_bbox_from_scanned_pdf``; other PDFs use
        ``extract_lines_with_bbox`` and are re-extracted with the scanned-PDF
        path when ``is_text_noisy`` flags the combined text. The temporary
        copy is removed before returning.
        """
        pdf_path = await self.save_uploaded_file(file)
        try:
            is_scanned = await self.is_pdf_scanned(pdf_path)
            text_extractor = TextExtractor(self.doctr_model)

            if is_scanned:
                logger.info(f"{pdf_path} is likely a scanned PDF.")
                return await text_extractor.extract_lines_with_bbox_from_scanned_pdf(
                    pdf_path
                )

            logger.info(f"{pdf_path} is not a scanned PDF. Extracting text...")
            extracted_text_list = await text_extractor.extract_lines_with_bbox(
                pdf_path
            )
            # Each entry is indexed with "line" below; any other keys are
            # defined by TextExtractor and not visible here.
            pdf_text = "".join(
                " " + line["line"]
                for block in extracted_text_list
                for line in block
            )
            if text_extractor.is_text_noisy(pdf_text):
                logger.info("Text is noisy. Extracting text again...")
                extracted_text_list = (
                    await text_extractor.extract_lines_with_bbox_from_scanned_pdf(
                        pdf_path
                    )
                )
            return extracted_text_list
        finally:
            # The temp file was created with delete=False and its path is not
            # returned to anyone, so this is the only place it can be removed.
            Path(pdf_path).unlink(missing_ok=True)

    async def extract_entity(self, text: str) -> str:
        """Return the most relevant entity name found in ``text``.

        Punctuation is replaced by spaces before the text is passed to
        ``model_manager.spacy_model``. The first entity labelled ``"ORG"``
        wins, then the first entity of any label; if none are found, the
        punctuation-stripped text is returned.
        """
        text = re.sub(r"[^\w\s]", " ", text)
        doc = model_manager.spacy_model(text)
        # Keyed by entity text: a repeated entity keeps its last label.
        entities = {ent.text: ent.label_ for ent in doc.ents}
        for name, label in entities.items():
            if label == "ORG":
                return name
        return next(iter(entities), text)