ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
"""Document upload and processing pipeline."""
from io import BytesIO
import pandas as pd
from fastapi import HTTPException, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession
from src.document.document_service import document_service
from src.knowledge.processing_service import knowledge_processor
from src.storage.parquet import delete_document_parquets, upload_parquet
from src.middlewares.logging import get_logger
from src.storage.az_blob.az_blob import blob_storage
from src.retrieval.router import retrieval_router
# Logger shared by every function in this module.
logger = get_logger("document_pipeline")

# Lower-cased file extensions that DocumentPipeline.upload accepts.
# NOTE: Keep in sync with _DOC_TYPES in src/api/v1/document.py
SUPPORTED_FILE_TYPES = [
    "pdf",
    "docx",
    "txt",
    "csv",
    "xlsx",
]

# Largest upload accepted, in bytes (10 MB).
MAX_FILE_SIZE_BYTES = 10 * 1024**2
class DocumentPipeline:
    """Orchestrates the full document upload, process, and delete flows."""

    async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict:
        """Validate → upload to blob → save to DB.

        Args:
            file: The incoming upload. Its extension (text after the last
                ``.``, lower-cased) determines the file type; a filename with
                no ``.`` is treated as ``"txt"``.
            user_id: Owner of the new document.
            db: Session passed through to ``document_service``.

        Returns:
            A dict with the new document's ``id``, ``filename`` and ``status``.

        Raises:
            HTTPException: 400 when the filename is missing, the content is
                larger than ``MAX_FILE_SIZE_BYTES``, or the file type is not
                in ``SUPPORTED_FILE_TYPES``.
        """
        # Reject a nameless upload before buffering its body into memory.
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename is required.")

        content = await file.read()
        file_type = file.filename.split(".")[-1].lower() if "." in file.filename else "txt"

        if len(content) > MAX_FILE_SIZE_BYTES:
            # Derive the displayed limit from the constant so the message
            # cannot drift from the enforced value.
            max_mb = MAX_FILE_SIZE_BYTES // (1024 * 1024)
            raise HTTPException(
                status_code=400,
                detail=f"File size exceeds maximum allowed size of {max_mb} MB.",
            )

        if file_type not in SUPPORTED_FILE_TYPES:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type. Supported: {', '.join(SUPPORTED_FILE_TYPES)}",
            )

        blob_name = await blob_storage.upload_file(content, file.filename, user_id)
        document = await document_service.create_document(
            db=db,
            user_id=user_id,
            filename=file.filename,
            blob_name=blob_name,
            file_size=len(content),
            file_type=file_type,
        )

        logger.info(f"Uploaded document {document.id} for user {user_id}")
        return {"id": document.id, "filename": document.filename, "status": document.status}

    async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership → extract text → chunk → ingest to vector store.

        ``csv`` and ``xlsx`` documents are converted to Parquet via
        ``_upload_parquet`` and report zero chunks; every other type goes
        through ``knowledge_processor.process_document``.

        Args:
            document_id: Identifier of the document to process.
            user_id: Caller's id; must match the document's ``user_id``.
            db: Session passed through to the services.

        Returns:
            A dict with ``document_id``, ``chunks_processed`` and ``file_type``.

        Raises:
            HTTPException: 404 if the document does not exist, 403 if it
                belongs to another user, 500 if any processing step fails
                (the document is then marked ``"failed"``).
        """
        document = await self._get_owned_document(db, document_id, user_id)

        try:
            await document_service.update_document_status(db, document_id, "processing")

            if document.file_type in ("csv", "xlsx"):
                # Tabular files are stored as Parquet only; nothing is chunked.
                await _upload_parquet(document)
                chunks_count = 0
            else:
                chunks_count = await knowledge_processor.process_document(document, db)

            await document_service.update_document_status(db, document_id, "completed")
            await self._invalidate_cache(user_id)

            logger.info(f"Processed document {document_id}: {chunks_count} chunks")
            return {"document_id": document_id, "chunks_processed": chunks_count, "file_type": document.file_type}
        except Exception as e:
            logger.error(f"Processing failed for document {document_id}", error=str(e))
            try:
                await document_service.update_document_status(db, document_id, "failed", str(e))
            except Exception as status_error:
                # Do not let a failed status write hide the real processing
                # error from the caller; record it and carry on.
                logger.error(
                    f"Could not mark document {document_id} as failed",
                    error=str(status_error),
                )
            # Chain the cause so the original traceback is preserved.
            raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") from e

    async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership → delete from blob and DB.

        The document is removed through ``document_service.delete_document``;
        for ``csv``/``xlsx`` documents the derived Parquet files are removed
        afterwards. NOTE(review): no direct blob deletion happens here —
        presumably ``delete_document`` takes care of it; confirm.

        Args:
            document_id: Identifier of the document to delete.
            user_id: Caller's id; must match the document's ``user_id``.
            db: Session passed through to ``document_service``.

        Returns:
            A dict containing the deleted ``document_id``.

        Raises:
            HTTPException: 404 if the document does not exist, 403 if it
                belongs to another user.
        """
        document = await self._get_owned_document(db, document_id, user_id)

        await document_service.delete_document(db, document_id)
        if document.file_type in ("csv", "xlsx"):
            await delete_document_parquets(user_id, document_id)

        await self._invalidate_cache(user_id)

        logger.info(f"Deleted document {document_id} for user {user_id}")
        return {"document_id": document_id}

    async def _get_owned_document(self, db: AsyncSession, document_id: str, user_id: str):
        """Fetch a document and raise 404/403 unless it exists and belongs to ``user_id``."""
        document = await document_service.get_document(db, document_id)
        if not document:
            raise HTTPException(status_code=404, detail="Document not found")
        if document.user_id != user_id:
            raise HTTPException(status_code=403, detail="Access denied")
        return document

    async def _invalidate_cache(self, user_id: str) -> None:
        """Invalidate the user's retrieval cache, logging (not raising) on failure."""
        try:
            await retrieval_router.invalidate_cache(user_id)
        except Exception as e:
            # Best-effort: a failed invalidation must not fail the request.
            logger.warning("Failed to invalidate retrieval cache", user_id=user_id, error=str(e))
async def _upload_parquet(document) -> None:
    """Download original blob and upload Parquet(s) without vector embedding.

    A ``csv`` document yields a single Parquet upload. Any other type is
    read as an Excel workbook and yields one upload per sheet, with the
    sheet name passed as the extra argument to ``upload_parquet``.

    Args:
        document: Document record; ``blob_name``, ``file_type``, ``user_id``
            and ``id`` are read from it.
    """
    # Local import keeps this block self-contained.
    import asyncio

    content = await blob_storage.download_file(document.blob_name)
    loop = asyncio.get_running_loop()

    # pandas parsing is synchronous; run it in the default executor so a
    # large file does not stall the event loop for other requests.
    if document.file_type == "csv":
        df = await loop.run_in_executor(None, pd.read_csv, BytesIO(content))
        await upload_parquet(df, document.user_id, document.id)
        return

    # xlsx: sheet_name=None loads every sheet as {sheet_name: DataFrame}.
    sheets = await loop.run_in_executor(
        None, lambda: pd.read_excel(BytesIO(content), sheet_name=None)
    )
    for sheet_name, df in sheets.items():
        await upload_parquet(df, document.user_id, document.id, sheet_name)
# Module-level DocumentPipeline instance created at import time;
# presumably imported by the API layer — confirm against callers.
document_pipeline = DocumentPipeline()