"""Document processor adapter — text extraction runs in thread pool."""
from app.ports.document_processor import DocumentProcessorPort
from typing import BinaryIO
from pathlib import Path
import asyncio
import logging
logger = logging.getLogger(__name__)
# Thread pool shared by all extraction calls; built lazily on first request
_executor = None


def _get_executor():
    """Return the module-wide thread pool, creating it on the first call.

    The pool is stored in the module-level ``_executor`` so that every
    subsequent call hands back the same instance.
    """
    global _executor
    if _executor is not None:
        return _executor

    # Imported here so the dependency is only loaded once a pool is needed
    from concurrent.futures import ThreadPoolExecutor

    _executor = ThreadPoolExecutor(
        max_workers=4,
        thread_name_prefix="doc_processor",
    )
    return _executor
def _extract_pdf_sync(data: bytes) -> str:
    """Extract the text of a PDF held in memory (blocking; meant for a worker thread).

    Args:
        data: Raw bytes of the PDF document.

    Returns:
        The text of all pages, each page followed by a newline, with leading
        and trailing whitespace stripped from the combined result.

    Raises:
        ValueError: If the combined page text is empty or whitespace only.
    """
    from io import BytesIO

    from PyPDF2 import PdfReader

    pdf = PdfReader(BytesIO(data))

    # One newline-terminated chunk per page; a falsy extraction result
    # (such as None) contributes just the newline.
    page_chunks = [(page.extract_text() or "") + "\n" for page in pdf.pages]
    text = "".join(page_chunks)

    stripped = text.strip()
    if not stripped:
        raise ValueError("No text could be extracted from PDF")

    # The logged length is that of the unstripped text
    logger.info(f"Extracted {len(text)} characters from PDF")
    return stripped
def _extract_docx_sync(data: bytes) -> str:
    """Extract the text of a DOCX held in memory (blocking; meant for a worker thread).

    Args:
        data: Raw bytes of the document.

    Returns:
        The text of the document's paragraphs joined with newlines, with
        leading and trailing whitespace stripped from the combined result.

    Raises:
        ValueError: If the combined paragraph text is empty or whitespace only.
    """
    from io import BytesIO

    from docx import Document

    document = Document(BytesIO(data))
    paragraph_texts = [paragraph.text for paragraph in document.paragraphs]
    text = "\n".join(paragraph_texts)

    stripped = text.strip()
    if not stripped:
        raise ValueError("No text could be extracted from DOCX")

    # The logged length is that of the unstripped text
    logger.info(f"Extracted {len(text)} characters from DOCX")
    return stripped
class DocumentProcessorAdapter(DocumentProcessorPort):
    """Extract text from documents, running the parsing step on a thread pool.

    The file extension (compared case-insensitively) selects the parser:
    ``.pdf`` is handled by ``_extract_pdf_sync``; ``.docx`` and ``.doc`` are
    handled by ``_extract_docx_sync``.
    """

    # Single definition of the accepted suffixes, shared by extract_text()
    # and supports_file() so the two cannot drift apart.
    _PDF_EXTENSIONS = (".pdf",)
    # NOTE(review): ".doc" is routed to the same extractor as ".docx";
    # confirm the docx library can actually open legacy .doc files.
    _WORD_EXTENSIONS = (".docx", ".doc")

    async def extract_text(self, file: BinaryIO, filename: str) -> str:
        """Extract the text of ``file``, parsing it off the event loop.

        Args:
            file: Binary file object; its remaining content is read in full.
            filename: Name whose suffix decides which extractor is used.

        Returns:
            The text returned by the selected extractor.

        Raises:
            ValueError: If the suffix of ``filename`` is not supported. The
                check happens before ``file`` is read, so an unsupported
                file is rejected without consuming its content.
            Exception: Anything raised by the selected extractor propagates
                to the caller through the awaited executor future.
        """
        ext = Path(filename).suffix.lower()

        # Resolve the extractor first so unsupported types fail fast,
        # before the whole file is pulled into memory.
        if ext in self._PDF_EXTENSIONS:
            extractor = _extract_pdf_sync
        elif ext in self._WORD_EXTENSIONS:
            extractor = _extract_docx_sync
        else:
            raise ValueError(f"Unsupported file type: {ext}")

        # NOTE: this read is synchronous and runs on the calling thread;
        # only the parsing below is handed to the thread pool.
        data = file.read()  # read bytes once

        # get_running_loop() is the documented choice inside a coroutine;
        # it always returns the loop that is executing this call.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(_get_executor(), extractor, data)

    def supports_file(self, filename: str) -> bool:
        """Return True if the suffix of ``filename`` has a matching extractor."""
        supported = self._PDF_EXTENSIONS + self._WORD_EXTENSIONS
        return Path(filename).suffix.lower() in supported
|