Spaces:
Sleeping
Sleeping
| """Document service — async upload with background processing.""" | |
| from app.ports.storage import StoragePort | |
| from app.ports.document_processor import DocumentProcessorPort | |
| from app.ports.embedder import EmbedderPort | |
| from app.ports.vector_db import VectorDBPort, VectorChunk | |
| from app.services.chunking_service import ChunkingService | |
| from app.models import Document, User | |
| from app.database import SessionLocal | |
| from sqlalchemy.orm import Session | |
| from typing import BinaryIO, Optional, List | |
| import logging | |
| import uuid | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DocumentService:
    """Orchestrate the document workflow: upload, background indexing, lookup, deletion.

    The service is wired with ports/adapters for blob storage, text
    extraction, embedding and the vector database, plus a chunking service
    and the request-scoped SQLAlchemy session.
    """

    # File-extension -> media type. Matching is done on the lower-cased name.
    _CONTENT_TYPES = {
        ".pdf": "application/pdf",
        ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        # Legacy binary Word format has its own registered media type.
        ".doc": "application/msword",
    }
    _DEFAULT_CONTENT_TYPE = "application/octet-stream"

    def __init__(
        self,
        storage: StoragePort,
        processor: DocumentProcessorPort,
        embedder: EmbedderPort,
        vector_db: VectorDBPort,
        chunking_service: ChunkingService,
        db: Session,
    ):
        """Store the collaborators; no I/O is performed here."""
        self.storage = storage
        self.processor = processor
        self.embedder = embedder
        self.vector_db = vector_db
        self.chunking_service = chunking_service
        self.db = db

    # ── Public: called by the API endpoint ────────────────────────────────────

    async def accept_upload(
        self,
        file_data: bytes,
        filename: str,
        file_size: int,
        user: User,
        folder_id: Optional[str] = None,
    ) -> Document:
        """Persist an uploaded file and return its Document with status='processing'.

        This method does NOT run or schedule the heavy work
        (extract → chunk → embed → store); the caller is expected to schedule
        ``process_document_background`` with the returned document's id.

        Args:
            file_data: Raw bytes of the uploaded file.
            filename: Client-supplied file name (used for type detection and display).
            file_size: Size recorded on the Document row, as reported by the caller.
            user: Uploading user; supplies ``id`` and ``org_id``.
            folder_id: Optional folder to file the document under.

        Returns:
            The committed and refreshed Document record.

        Raises:
            ValueError: If the processor does not support this file type.
        """
        if not self.processor.supports_file(filename):
            raise ValueError(f"Unsupported file type: {filename}")

        storage_key = self._build_storage_key(user.org_id, filename)
        content_type = self._get_content_type(filename)

        # Store raw bytes first. (Original author's note: the DB-backed storage
        # adapter just caches in memory, so this is expected to be cheap —
        # not verifiable from this file.)
        await self.storage.upload(storage_key, file_data, content_type)

        # Persist the document record immediately — status = "processing".
        document = Document(
            name=filename,
            size=file_size,
            storage_path=storage_key,
            file_content=file_data,
            chunks=0,
            status="processing",
            user_id=user.id,
            org_id=user.org_id,
            folder_id=folder_id,
        )
        try:
            self.db.add(document)
            self.db.commit()
        except Exception:
            # Leave the session usable and do not leak the blob we just stored.
            self.db.rollback()
            try:
                await self.storage.delete(storage_key)
            except Exception:
                logger.warning(
                    "Could not remove orphaned upload %s", storage_key, exc_info=True
                )
            raise
        self.db.refresh(document)

        logger.info(
            "Accepted upload: %s → doc %s (processing in background)",
            filename,
            document.id,
        )
        return document

    async def process_document_background(self, document_id: str) -> None:
        """Run the heavy pipeline: extract text → chunk → embed → store vectors.

        Intended to run as a background task after the HTTP response has been
        sent, which is why it opens its own ``SessionLocal`` session instead of
        using ``self.db``. Never raises: on failure the document is marked
        ``status="error"`` with ``error_message`` set, and the failure is logged.

        Args:
            document_id: Primary key of the Document to process.
        """
        db = SessionLocal()
        try:
            # Local imports kept as in the original code.
            from io import BytesIO

            from app.config import get_settings

            document = db.query(Document).filter(Document.id == document_id).first()
            if not document:
                logger.error("Background task: document %s not found", document_id)
                return

            logger.info("Background processing: %s (%s)", document.name, document_id)

            # 1. Extract text
            text = await self.processor.extract_text(
                BytesIO(document.file_content), document.name
            )

            # 2. Chunk
            chunks = self.chunking_service.chunk_text(text)
            logger.info("Split into %s chunks", len(chunks))

            # 3. Embed
            embeddings = await self.embedder.embed_batch(chunks)

            # 4. Store vectors
            settings = get_settings()
            vector_chunks = [
                VectorChunk(
                    id=f"{document_id}_{index}",
                    document_id=document_id,
                    chunk_index=index,
                    text=chunk,
                    embedding=embedding,
                    metadata={
                        "org_id": document.org_id,
                        "document_name": document.name,
                    },
                )
                for index, (chunk, embedding) in enumerate(zip(chunks, embeddings))
            ]
            # zip() stops at the shorter input; refuse to report success when
            # some chunks would silently be left without an embedding.
            if len(vector_chunks) != len(chunks):
                raise ValueError(
                    f"Embedder returned {len(vector_chunks)} embeddings "
                    f"for {len(chunks)} chunks"
                )
            await self.vector_db.store_chunks(vector_chunks, settings.QDRANT_COLLECTION)

            # 5. Mark done
            document.chunks = len(chunks)
            document.status = "done"
            db.commit()
            logger.info(
                "Done processing %s: %s chunks stored", document.name, len(chunks)
            )
        except Exception as e:
            logger.error(
                "Background processing failed for %s: %s", document_id, e, exc_info=True
            )
            try:
                # A failed flush/commit leaves the session unusable until it is
                # rolled back; this also discards any half-applied changes.
                db.rollback()
                document = db.query(Document).filter(Document.id == document_id).first()
                if document:
                    document.status = "error"
                    document.error_message = str(e)
                    db.commit()
            except Exception:
                # Best effort only, but never fail silently.
                logger.exception(
                    "Could not record error status for document %s", document_id
                )
        finally:
            db.close()

    # ── Other service methods ─────────────────────────────────────────────────

    async def list_documents(
        self,
        user: User,
        folder_id: Optional[str] = None,
        root_only: bool = False
    ) -> List[Document]:
        """Return the organisation's documents, newest first.

        Args:
            user: Caller; results are limited to ``user.org_id``.
            folder_id: If truthy, only documents in that folder.
            root_only: If set (and no ``folder_id``), only documents without a folder.
                With neither filter, all of the organisation's documents are returned.
        """
        query = self.db.query(Document).filter(Document.org_id == user.org_id)
        if folder_id:
            query = query.filter(Document.folder_id == folder_id)
        elif root_only:
            # Root level = no folder assigned (renders as IS NULL).
            query = query.filter(Document.folder_id.is_(None))
        return query.order_by(Document.created_at.desc()).all()

    async def get_document_status(self, document_id: str, user: User) -> Document:
        """Return the document if it belongs to the user's organisation.

        Raises:
            ValueError: If no such document exists in the user's organisation.
        """
        return self._get_owned_document(document_id, user)

    async def delete_document(self, document_id: str, user: User) -> None:
        """Delete the stored file, its vectors, and the database record.

        Raises:
            ValueError: If no such document exists in the user's organisation.
        """
        document = self._get_owned_document(document_id, user)

        await self.storage.delete(document.storage_path)

        from app.config import get_settings

        settings = get_settings()
        await self.vector_db.delete_document(document_id, settings.QDRANT_COLLECTION)

        self.db.delete(document)
        self.db.commit()
        logger.info("Deleted document %s", document_id)

    async def get_download_url(self, document_id: str, user: User) -> str:
        """Return a presigned URL from the storage port for the document's file.

        Raises:
            ValueError: If no such document exists in the user's organisation.
        """
        document = self._get_owned_document(document_id, user)
        return await self.storage.get_presigned_url(document.storage_path)

    # ── Private helpers ───────────────────────────────────────────────────────

    def _get_owned_document(self, document_id: str, user: User) -> Document:
        """Fetch a document by id, scoped to the user's organisation, or raise ValueError."""
        document = (
            self.db.query(Document)
            .filter(Document.id == document_id, Document.org_id == user.org_id)
            .first()
        )
        if not document:
            raise ValueError("Document not found")
        return document

    @staticmethod
    def _build_storage_key(org_id, filename: str) -> str:
        """Build ``<org_id>/<uuid4>_<basename>`` for a client-supplied file name.

        Directory components (POSIX or Windows separators) are dropped so the
        untrusted name cannot inject extra path segments such as ``../``.
        """
        base_name = filename.replace("\\", "/").rsplit("/", 1)[-1]
        return f"{org_id}/{uuid.uuid4()}_{base_name}"

    def _get_content_type(self, filename: str) -> str:
        """Map a file name to a media type by its extension (case-insensitive).

        Unknown extensions fall back to ``application/octet-stream``.
        """
        lowered = filename.lower()
        for suffix, content_type in self._CONTENT_TYPES.items():
            if lowered.endswith(suffix):
                return content_type
        return self._DEFAULT_CONTENT_TYPE