"""Document upload and processing pipeline.""" from fastapi import HTTPException, UploadFile from sqlalchemy.ext.asyncio import AsyncSession from src.document.document_service import document_service from src.knowledge.processing_service import knowledge_processor from src.knowledge.parquet_service import delete_document_parquets from src.middlewares.logging import get_logger from src.storage.az_blob.az_blob import blob_storage logger = get_logger("document_pipeline") # NOTE: Keep in sync with _DOC_TYPES in src/api/v1/document.py SUPPORTED_FILE_TYPES = ["pdf", "docx", "txt", "csv", "xlsx"] MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024 # 10 MB class DocumentPipeline: """Orchestrates the full document upload, process, and delete flows.""" async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict: """Validate → upload to blob → save to DB.""" content = await file.read() if not file.filename: raise HTTPException(status_code=400, detail="Filename is required.") file_type = file.filename.split(".")[-1].lower() if "." in file.filename else "txt" if len(content) > MAX_FILE_SIZE_BYTES: raise HTTPException( status_code=400, detail="File size exceeds maximum allowed size of 10 MB.", ) if file_type not in SUPPORTED_FILE_TYPES: raise HTTPException( status_code=400, detail=f"Unsupported file type. Supported: {', '.join(SUPPORTED_FILE_TYPES)}", ) blob_name = await blob_storage.upload_file(content, file.filename, user_id) document = await document_service.create_document( db=db, user_id=user_id, filename=file.filename, blob_name=blob_name, file_size=len(content), file_type=file_type, ) logger.info(f"Uploaded document {document.id} for user {user_id}") return {"id": document.id, "filename": document.filename, "status": document.status} async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict: """Validate ownership → extract text → chunk → ingest to vector store.""" document = await document_service.get_document(db, document_id) if not document: raise HTTPException(status_code=404, detail="Document not found") if document.user_id != user_id: raise HTTPException(status_code=403, detail="Access denied") try: await document_service.update_document_status(db, document_id, "processing") chunks_count = await knowledge_processor.process_document(document, db) await document_service.update_document_status(db, document_id, "completed") logger.info(f"Processed document {document_id}: {chunks_count} chunks") return {"document_id": document_id, "chunks_processed": chunks_count} except Exception as e: logger.error(f"Processing failed for document {document_id}", error=str(e)) await document_service.update_document_status(db, document_id, "failed", str(e)) raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict: """Validate ownership → delete from blob and DB.""" document = await document_service.get_document(db, document_id) if not document: raise HTTPException(status_code=404, detail="Document not found") if document.user_id != user_id: raise HTTPException(status_code=403, detail="Access denied") await document_service.delete_document(db, document_id) if document.file_type in ("csv", "xlsx"): await delete_document_parquets(user_id, document_id) logger.info(f"Deleted document {document_id} for user {user_id}") return {"document_id": document_id} document_pipeline = DocumentPipeline()