# Web-page header captured together with this file (not part of the module):
#   ishaq101's picture
#   [KM-438][KM-439] Improve Retrieval and Querying feature (#15)
#   c93ec90
"""Document upload and processing pipeline."""
from fastapi import HTTPException, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession
from src.document.document_service import document_service
from src.knowledge.processing_service import knowledge_processor
from src.knowledge.parquet_service import delete_document_parquets
from src.middlewares.logging import get_logger
from src.storage.az_blob.az_blob import blob_storage
# Logger for this module, obtained from the project's logging helper under
# the name "document_pipeline".
logger = get_logger("document_pipeline")
# Lower-case file extensions accepted by the upload flow.
# NOTE: Keep in sync with _DOC_TYPES in src/api/v1/document.py
SUPPORTED_FILE_TYPES = [
    "pdf",
    "docx",
    "txt",
    "csv",
    "xlsx",
]

# Largest accepted upload, in bytes: 10 MB counted as 10 * 1024 * 1024.
MAX_FILE_SIZE_BYTES = 10 * 1024**2
class DocumentPipeline:
    """Orchestrates the full document upload, process, and delete flows.

    Every method reports validation, ownership, and processing problems by
    raising ``HTTPException``; persistence and storage work is delegated to
    ``document_service``, ``blob_storage``, ``knowledge_processor`` and
    ``delete_document_parquets``.
    """

    async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict:
        """Validate → upload to blob → save to DB.

        Args:
            file: Incoming upload; its filename determines the file type.
            user_id: Owner passed to blob storage and stored on the document.
            db: Session handed through to ``document_service``.

        Returns:
            A dict with the new document's ``id``, ``filename`` and ``status``.

        Raises:
            HTTPException: 400 when the filename is missing, the extension is
                not in ``SUPPORTED_FILE_TYPES``, or the payload is larger
                than ``MAX_FILE_SIZE_BYTES``.
        """
        filename = file.filename
        if not filename:
            raise HTTPException(status_code=400, detail="Filename is required.")

        # A name without any dot is treated as plain text.
        if "." in filename:
            file_type = filename.rsplit(".", 1)[-1].lower()
        else:
            file_type = "txt"

        # Check the extension before reading, so an upload that will be
        # rejected anyway is never pulled into memory.
        if file_type not in SUPPORTED_FILE_TYPES:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type. Supported: {', '.join(SUPPORTED_FILE_TYPES)}",
            )

        content = await file.read()
        if len(content) > MAX_FILE_SIZE_BYTES:
            # Derive the figure from the constant so the message cannot drift
            # from the enforced limit.
            limit_mb = MAX_FILE_SIZE_BYTES // (1024 * 1024)
            raise HTTPException(
                status_code=400,
                detail=f"File size exceeds maximum allowed size of {limit_mb} MB.",
            )

        blob_name = await blob_storage.upload_file(content, filename, user_id)
        # NOTE(review): if create_document fails, the blob uploaded above is
        # left behind; no blob-deletion API is visible from this module.
        document = await document_service.create_document(
            db=db,
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=len(content),
            file_type=file_type,
        )
        logger.info(f"Uploaded document {document.id} for user {user_id}")
        return {"id": document.id, "filename": document.filename, "status": document.status}

    async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership → run knowledge processing → record the status.

        The document status is set to "processing" before
        ``knowledge_processor.process_document`` runs, to "completed" when it
        returns, and to "failed" (with the error text) when anything raises.

        Args:
            document_id: Identifier of the document to process.
            user_id: Caller's id; must equal the document's ``user_id``.
            db: Session handed through to the services.

        Returns:
            A dict with ``document_id`` and ``chunks_processed``.

        Raises:
            HTTPException: 404 if the document does not exist, 403 if it
                belongs to another user, 500 if processing fails.
        """
        document = await self._get_owned_document(db, document_id, user_id)

        try:
            await document_service.update_document_status(db, document_id, "processing")
            chunks_count = await knowledge_processor.process_document(document, db)
            await document_service.update_document_status(db, document_id, "completed")
        except Exception as e:
            logger.error(f"Processing failed for document {document_id}", error=str(e))
            try:
                await document_service.update_document_status(
                    db, document_id, "failed", str(e)
                )
            except Exception as status_error:
                # Recording the failure is best-effort: a second error here
                # must not replace the original one reported to the caller.
                logger.error(
                    f"Could not mark document {document_id} as failed",
                    error=str(status_error),
                )
            # NOTE(review): the detail exposes the internal error text to the
            # client; kept because existing clients may rely on it.
            raise HTTPException(
                status_code=500, detail=f"Processing failed: {str(e)}"
            ) from e

        logger.info(f"Processed document {document_id}: {chunks_count} chunks")
        return {"document_id": document_id, "chunks_processed": chunks_count}

    async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership → delete the document → remove derived parquets.

        Blob removal is presumably handled inside
        ``document_service.delete_document`` — TODO confirm; this method does
        not call blob storage itself.

        Args:
            document_id: Identifier of the document to delete.
            user_id: Caller's id; must equal the document's ``user_id``.
            db: Session handed through to ``document_service``.

        Returns:
            A dict with the deleted ``document_id``.

        Raises:
            HTTPException: 404 if the document does not exist, 403 if it
                belongs to another user.
        """
        document = await self._get_owned_document(db, document_id, user_id)

        await document_service.delete_document(db, document_id)
        # Parquet files are only cleaned up for the tabular file types.
        if document.file_type in ("csv", "xlsx"):
            await delete_document_parquets(user_id, document_id)

        logger.info(f"Deleted document {document_id} for user {user_id}")
        return {"document_id": document_id}

    async def _get_owned_document(self, db: AsyncSession, document_id: str, user_id: str):
        """Fetch a document, raising 404 if missing and 403 if not owned by ``user_id``."""
        document = await document_service.get_document(db, document_id)
        if not document:
            raise HTTPException(status_code=404, detail="Document not found")
        if document.user_id != user_id:
            raise HTTPException(status_code=403, detail="Access denied")
        return document
# Module-level DocumentPipeline instance created at import time; presumably
# the object the API layer imports — confirm against callers.
document_pipeline = DocumentPipeline()