"""Document service — async upload with background processing.""" from app.ports.storage import StoragePort from app.ports.document_processor import DocumentProcessorPort from app.ports.embedder import EmbedderPort from app.ports.vector_db import VectorDBPort, VectorChunk from app.services.chunking_service import ChunkingService from app.models import Document, User from app.database import SessionLocal from sqlalchemy.orm import Session from typing import BinaryIO, Optional, List import logging import uuid logger = logging.getLogger(__name__) class DocumentService: """Orchestrates document processing workflow.""" def __init__( self, storage: StoragePort, processor: DocumentProcessorPort, embedder: EmbedderPort, vector_db: VectorDBPort, chunking_service: ChunkingService, db: Session, ): self.storage = storage self.processor = processor self.embedder = embedder self.vector_db = vector_db self.chunking_service = chunking_service self.db = db # ── Public: called by the API endpoint ──────────────────────────────────── async def accept_upload( self, file_data: bytes, filename: str, file_size: int, user: User, folder_id: Optional[str] = None, ) -> Document: """ Persist the file immediately and return a Document with status='processing'. The heavy work (extract → embed → store) is scheduled as a background task. """ if not self.processor.supports_file(filename): raise ValueError(f"Unsupported file type: {filename}") storage_key = f"{user.org_id}/{uuid.uuid4()}_{filename}" content_type = self._get_content_type(filename) # Store raw bytes (non-blocking — our DB adapter just caches in memory) await self.storage.upload(storage_key, file_data, content_type) # Persist document record immediately — status = "processing" document = Document( name=filename, size=file_size, storage_path=storage_key, file_content=file_data, chunks=0, status="processing", user_id=user.id, org_id=user.org_id, folder_id=folder_id, ) self.db.add(document) self.db.commit() self.db.refresh(document) logger.info(f"Accepted upload: {filename} → doc {document.id} (processing in background)") return document async def process_document_background(self, document_id: str) -> None: """ Heavy processing: extract text → chunk → embed → store vectors. Runs as a FastAPI BackgroundTask so it never blocks the HTTP response. Uses its own DB session (the request session is already closed). """ db = SessionLocal() try: document = db.query(Document).filter(Document.id == document_id).first() if not document: logger.error(f"Background task: document {document_id} not found") return logger.info(f"Background processing: {document.name} ({document_id})") # 1. Extract text (runs in thread pool — non-blocking) from io import BytesIO text = await self.processor.extract_text(BytesIO(document.file_content), document.name) # 2. Chunk chunks = self.chunking_service.chunk_text(text) logger.info(f"Split into {len(chunks)} chunks") # 3. Embed (runs in thread pool — non-blocking) embeddings = await self.embedder.embed_batch(chunks) # 4. Store vectors from app.config import get_settings settings = get_settings() vector_chunks = [ VectorChunk( id=f"{document_id}_{i}", document_id=document_id, chunk_index=i, text=chunk, embedding=embedding, metadata={ "org_id": document.org_id, "document_name": document.name, }, ) for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)) ] await self.vector_db.store_chunks(vector_chunks, settings.QDRANT_COLLECTION) # 5. Mark done document.chunks = len(chunks) document.status = "done" db.commit() logger.info(f"Done processing {document.name}: {len(chunks)} chunks stored") except Exception as e: logger.error(f"Background processing failed for {document_id}: {e}", exc_info=True) try: document = db.query(Document).filter(Document.id == document_id).first() if document: document.status = "error" document.error_message = str(e) db.commit() except Exception: pass finally: db.close() # ── Other service methods ───────────────────────────────────────────────── async def list_documents( self, user: User, folder_id: Optional[str] = None, root_only: bool = False ) -> List[Document]: query = self.db.query(Document).filter(Document.org_id == user.org_id) if folder_id: # Specific folder query = query.filter(Document.folder_id == folder_id) elif root_only: # Only files with no folder (root level) query = query.filter(Document.folder_id == None) # else: no filter = all documents return query.order_by(Document.created_at.desc()).all() async def get_document_status(self, document_id: str, user: User) -> Document: doc = self.db.query(Document).filter( Document.id == document_id, Document.org_id == user.org_id, ).first() if not doc: raise ValueError("Document not found") return doc async def delete_document(self, document_id: str, user: User) -> None: document = self.db.query(Document).filter( Document.id == document_id, Document.org_id == user.org_id, ).first() if not document: raise ValueError("Document not found") await self.storage.delete(document.storage_path) from app.config import get_settings settings = get_settings() await self.vector_db.delete_document(document_id, settings.QDRANT_COLLECTION) self.db.delete(document) self.db.commit() logger.info(f"Deleted document {document_id}") async def get_download_url(self, document_id: str, user: User) -> str: document = self.db.query(Document).filter( Document.id == document_id, Document.org_id == user.org_id, ).first() if not document: raise ValueError("Document not found") return await self.storage.get_presigned_url(document.storage_path) def _get_content_type(self, filename: str) -> str: if filename.endswith(".pdf"): return "application/pdf" elif filename.endswith((".docx", ".doc")): return "application/vnd.openxmlformats-officedocument.wordprocessingml.document" return "application/octet-stream"