"""Document upload, processing, and deletion pipeline."""
|
|
| from fastapi import HTTPException, UploadFile |
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| from src.document.document_service import document_service |
| from src.knowledge.processing_service import knowledge_processor |
| from src.knowledge.parquet_service import delete_document_parquets |
| from src.middlewares.logging import get_logger |
| from src.storage.az_blob.az_blob import blob_storage |
|
|
# Module-level logger; every record from this file is tagged "document_pipeline".
logger = get_logger("document_pipeline")
|
|
| |
# Lower-case file extensions accepted by the upload flow. The order is kept
# because the list is joined verbatim into the "unsupported type" error text.
SUPPORTED_FILE_TYPES = [
    "pdf",
    "docx",
    "txt",
    "csv",
    "xlsx",
]

# Upload size ceiling in bytes (10 MiB).
MAX_FILE_SIZE_BYTES = 10 * 1024**2
|
|
|
|
class DocumentPipeline:
    """Orchestrate the document upload, processing, and deletion flows.

    The class keeps no instance state; each method delegates to the
    module-level services (``document_service``, ``blob_storage``,
    ``knowledge_processor``) and reports failures as ``HTTPException``.
    """

    async def _get_owned_document(self, document_id: str, user_id: str, db: AsyncSession):
        """Load a document and verify that it belongs to ``user_id``.

        Args:
            document_id: Identifier passed to ``document_service.get_document``.
            user_id: Caller's id, compared against ``document.user_id``.
            db: Session passed through to ``document_service``.

        Returns:
            The document object returned by ``document_service.get_document``.

        Raises:
            HTTPException: 404 when the lookup returns nothing, 403 when the
                document's ``user_id`` differs from ``user_id``.
        """
        document = await document_service.get_document(db, document_id)
        if not document:
            raise HTTPException(status_code=404, detail="Document not found")
        if document.user_id != user_id:
            raise HTTPException(status_code=403, detail="Access denied")
        return document

    async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict:
        """Validate → upload to blob → save to DB.

        Args:
            file: Incoming upload; it must carry a filename.
            user_id: Owner passed to the blob upload and the document record.
            db: Session passed through to ``document_service``.

        Returns:
            Dict with the new document's ``id``, ``filename`` and ``status``.

        Raises:
            HTTPException: 400 when the filename is missing, the payload is
                larger than ``MAX_FILE_SIZE_BYTES``, or the extension is not
                in ``SUPPORTED_FILE_TYPES``.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename is required.")

        # Read at most one byte past the limit: enough to detect an oversized
        # upload without pulling an arbitrarily large body into memory.
        content = await file.read(MAX_FILE_SIZE_BYTES + 1)
        if len(content) > MAX_FILE_SIZE_BYTES:
            raise HTTPException(
                status_code=400,
                detail=(
                    "File size exceeds maximum allowed size of "
                    f"{MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB."
                ),
            )

        # Text after the last dot, lower-cased; names without any dot are
        # treated as "txt".
        _, dot, extension = file.filename.rpartition(".")
        file_type = extension.lower() if dot else "txt"
        if file_type not in SUPPORTED_FILE_TYPES:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type. Supported: {', '.join(SUPPORTED_FILE_TYPES)}",
            )

        blob_name = await blob_storage.upload_file(content, file.filename, user_id)
        # NOTE(review): if create_document fails, the blob uploaded above is
        # left behind; no cleanup API is visible from this module.
        document = await document_service.create_document(
            db=db,
            user_id=user_id,
            filename=file.filename,
            blob_name=blob_name,
            file_size=len(content),
            file_type=file_type,
        )

        logger.info(f"Uploaded document {document.id} for user {user_id}")
        return {"id": document.id, "filename": document.filename, "status": document.status}

    async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership → extract text → chunk → ingest to vector store.

        The extraction, chunking and ingestion steps are performed by
        ``knowledge_processor.process_document``; this method only checks
        ownership and tracks the status ("processing", then "completed" or
        "failed").

        Args:
            document_id: Document to process.
            user_id: Caller's id; must match the document's owner.
            db: Session passed through to the services.

        Returns:
            Dict with ``document_id`` and ``chunks_processed``.

        Raises:
            HTTPException: 404/403 from the ownership check, or 500 wrapping
                any exception raised while processing.
        """
        document = await self._get_owned_document(document_id, user_id, db)

        try:
            await document_service.update_document_status(db, document_id, "processing")
            chunks_count = await knowledge_processor.process_document(document, db)
            await document_service.update_document_status(db, document_id, "completed")
        except Exception as e:
            logger.error(f"Processing failed for document {document_id}", error=str(e))
            try:
                await document_service.update_document_status(db, document_id, "failed", str(e))
            except Exception as status_error:
                # A failure while recording the "failed" status must not hide
                # the original processing error from the caller.
                logger.error(
                    f"Could not mark document {document_id} as failed",
                    error=str(status_error),
                )
            raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") from e
        else:
            # Kept outside the try body so a logging problem cannot flip an
            # already-completed document to "failed".
            logger.info(f"Processed document {document_id}: {chunks_count} chunks")
            return {"document_id": document_id, "chunks_processed": chunks_count}

    async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership → delete the document and any derived parquet data.

        NOTE(review): the previous summary said "delete from blob and DB", but
        no blob call is made in this method; presumably
        ``document_service.delete_document`` removes the blob - confirm.

        Args:
            document_id: Document to delete.
            user_id: Caller's id; must match the document's owner.
            db: Session passed through to ``document_service``.

        Returns:
            Dict with the deleted ``document_id``.

        Raises:
            HTTPException: 404 when the document does not exist, 403 when it
                belongs to another user.
        """
        document = await self._get_owned_document(document_id, user_id, db)

        # Capture before deleting: the ORM instance may be expired or detached
        # once its row is gone, which would make a later attribute read fail.
        file_type = document.file_type

        await document_service.delete_document(db, document_id)

        if file_type in ("csv", "xlsx"):
            await delete_document_parquets(user_id, document_id)

        logger.info(f"Deleted document {document_id} for user {user_id}")
        return {"document_id": document_id}
|
|
|
|
# Module-level pipeline instance created at import time
# (presumably the object other modules import - confirm against callers).
document_pipeline = DocumentPipeline()
|
|