File size: 4,060 Bytes
2ba0613 52999bc 2ba0613 b1f1ccd 2ba0613 b1f1ccd 2ba0613 c93ec90 2ba0613 b1f1ccd 2ba0613 52999bc 2ba0613 52999bc 2ba0613 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | """Document upload and processing pipeline."""
from fastapi import HTTPException, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession
from src.document.document_service import document_service
from src.knowledge.processing_service import knowledge_processor
from src.knowledge.parquet_service import delete_document_parquets
from src.middlewares.logging import get_logger
from src.storage.az_blob.az_blob import blob_storage
logger = get_logger("document_pipeline")
# NOTE: Keep in sync with _DOC_TYPES in src/api/v1/document.py
SUPPORTED_FILE_TYPES = ["pdf", "docx", "txt", "csv", "xlsx"]
MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024 # 10 MB
class DocumentPipeline:
"""Orchestrates the full document upload, process, and delete flows."""
async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict:
"""Validate β upload to blob β save to DB."""
content = await file.read()
if not file.filename:
raise HTTPException(status_code=400, detail="Filename is required.")
file_type = file.filename.split(".")[-1].lower() if "." in file.filename else "txt"
if len(content) > MAX_FILE_SIZE_BYTES:
raise HTTPException(
status_code=400,
detail="File size exceeds maximum allowed size of 10 MB.",
)
if file_type not in SUPPORTED_FILE_TYPES:
raise HTTPException(
status_code=400,
detail=f"Unsupported file type. Supported: {', '.join(SUPPORTED_FILE_TYPES)}",
)
blob_name = await blob_storage.upload_file(content, file.filename, user_id)
document = await document_service.create_document(
db=db,
user_id=user_id,
filename=file.filename,
blob_name=blob_name,
file_size=len(content),
file_type=file_type,
)
logger.info(f"Uploaded document {document.id} for user {user_id}")
return {"id": document.id, "filename": document.filename, "status": document.status}
async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
"""Validate ownership β extract text β chunk β ingest to vector store."""
document = await document_service.get_document(db, document_id)
if not document:
raise HTTPException(status_code=404, detail="Document not found")
if document.user_id != user_id:
raise HTTPException(status_code=403, detail="Access denied")
try:
await document_service.update_document_status(db, document_id, "processing")
chunks_count = await knowledge_processor.process_document(document, db)
await document_service.update_document_status(db, document_id, "completed")
logger.info(f"Processed document {document_id}: {chunks_count} chunks")
return {"document_id": document_id, "chunks_processed": chunks_count}
except Exception as e:
logger.error(f"Processing failed for document {document_id}", error=str(e))
await document_service.update_document_status(db, document_id, "failed", str(e))
raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
"""Validate ownership β delete from blob and DB."""
document = await document_service.get_document(db, document_id)
if not document:
raise HTTPException(status_code=404, detail="Document not found")
if document.user_id != user_id:
raise HTTPException(status_code=403, detail="Access denied")
await document_service.delete_document(db, document_id)
if document.file_type in ("csv", "xlsx"):
await delete_document_parquets(user_id, document_id)
logger.info(f"Deleted document {document_id} for user {user_id}")
return {"document_id": document_id}
document_pipeline = DocumentPipeline()
|