Spaces:
Sleeping
Sleeping
| """Document processor adapter — text extraction runs in thread pool.""" | |
| from app.ports.document_processor import DocumentProcessorPort | |
| from typing import BinaryIO | |
| from pathlib import Path | |
| import asyncio | |
| import logging | |
| logger = logging.getLogger(__name__) | |
# Thread pool shared by all extraction calls; created on first use.
_executor = None


def _get_executor():
    """Return the module-wide thread pool, creating it on the first call.

    Subsequent calls return the same executor instance.
    """
    global _executor
    if _executor is not None:
        return _executor

    from concurrent.futures import ThreadPoolExecutor

    _executor = ThreadPoolExecutor(
        thread_name_prefix="doc_processor",
        max_workers=4,
    )
    return _executor
def _extract_pdf_sync(data: bytes) -> str:
    """Extract the text of every page of an in-memory PDF.

    Synchronous PDF extraction — meant to run in the thread pool rather than
    on the event loop.

    Args:
        data: Raw bytes of the PDF file.

    Returns:
        The text of all pages, each page followed by a newline, with leading
        and trailing whitespace stripped from the combined result.

    Raises:
        ValueError: If the combined page text is empty or whitespace-only.
    """
    # Function-scope imports: the PDF library is loaded only when a PDF is
    # actually processed.
    from io import BytesIO

    from PyPDF2 import PdfReader

    reader = PdfReader(BytesIO(data))
    # Join once instead of growing a string with += per page. The `or ""`
    # keeps a page whose extract_text() result is falsy from breaking the
    # concatenation; such a page still contributes its newline.
    text = "".join((page.extract_text() or "") + "\n" for page in reader.pages)
    stripped = text.strip()
    if not stripped:
        raise ValueError("No text could be extracted from PDF")
    # The logged length is that of the combined text before stripping.
    logger.info(f"Extracted {len(text)} characters from PDF")
    return stripped
def _extract_docx_sync(data: bytes) -> str:
    """Blocking DOCX text extraction, meant to be called from a worker thread.

    Joins the text of the document's paragraphs with newlines and returns the
    result with surrounding whitespace removed.

    Raises:
        ValueError: If the joined paragraph text is empty or whitespace-only.
    """
    from io import BytesIO

    from docx import Document

    document = Document(BytesIO(data))
    paragraph_texts = [paragraph.text for paragraph in document.paragraphs]
    text = "\n".join(paragraph_texts)
    cleaned = text.strip()
    if cleaned:
        # The logged length is that of the joined text before stripping.
        logger.info(f"Extracted {len(text)} characters from DOCX")
        return cleaned
    raise ValueError("No text could be extracted from DOCX")
class DocumentProcessorAdapter(DocumentProcessorPort):
    """Document text extraction that offloads parsing to a thread pool."""

    # Suffixes handled by each extractor, kept in one place so that
    # extract_text() and supports_file() cannot drift apart.
    _PDF_SUFFIXES = (".pdf",)
    # NOTE(review): ".doc" is routed to the DOCX extractor, exactly as before.
    # python-docx documents support for .docx only — confirm that legacy .doc
    # uploads really parse, or drop the suffix.
    _DOCX_SUFFIXES = (".docx", ".doc")

    async def extract_text(self, file: BinaryIO, filename: str) -> str:
        """Extract the text of an uploaded document.

        Args:
            file: Binary file object positioned where reading should start.
            filename: Name of the upload; only its suffix is used, compared
                case-insensitively, to pick the extractor.

        Returns:
            The extracted text as returned by the matching extractor.

        Raises:
            ValueError: If the suffix is not supported. Errors raised by the
                extractor itself propagate unchanged.
        """
        ext = Path(filename).suffix.lower()
        # Reject unsupported types before reading, so a file that cannot be
        # processed is never loaded into memory.
        if ext in self._PDF_SUFFIXES:
            extractor = _extract_pdf_sync
        elif ext in self._DOCX_SUFFIXES:
            extractor = _extract_docx_sync
        else:
            raise ValueError(f"Unsupported file type: {ext}")

        # NOTE: this read is a synchronous call made on the event loop; only
        # the parsing below is handed to the thread pool.
        data = file.read()  # read bytes once
        # Inside a coroutine a loop is always running; get_running_loop() is
        # the recommended accessor there.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(_get_executor(), extractor, data)

    def supports_file(self, filename: str) -> bool:
        """Return True if the filename's suffix is one extract_text() accepts."""
        supported = self._PDF_SUFFIXES + self._DOCX_SUFFIXES
        return Path(filename).suffix.lower() in supported