File size: 4,060 Bytes
2ba0613
 
 
 
 
 
 
52999bc
2ba0613
 
 
 
 
b1f1ccd
2ba0613
b1f1ccd
2ba0613
 
 
 
 
 
 
 
c93ec90
 
2ba0613
 
b1f1ccd
 
 
 
 
 
2ba0613
 
 
52999bc
2ba0613
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52999bc
 
 
2ba0613
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Document upload and processing pipeline."""

from fastapi import HTTPException, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession

from src.document.document_service import document_service
from src.knowledge.processing_service import knowledge_processor
from src.knowledge.parquet_service import delete_document_parquets
from src.middlewares.logging import get_logger
from src.storage.az_blob.az_blob import blob_storage

# Module-level logger shared by the DocumentPipeline methods in this module.
logger = get_logger("document_pipeline")

# Upload extensions accepted by DocumentPipeline.upload: lower-case, no dot.
# NOTE: Keep in sync with _DOC_TYPES in src/api/v1/document.py
SUPPORTED_FILE_TYPES = [
    "pdf",
    "docx",
    "txt",
    "csv",
    "xlsx",
]
# Largest accepted upload: 10 MiB (reported to users as "10 MB").
MAX_FILE_SIZE_BYTES = 10 * 1024**2


class DocumentPipeline:
    """Orchestrates the full document upload, process, and delete flows.

    Every method raises ``HTTPException`` for client-visible failures
    (validation, missing document, foreign ownership, processing errors).
    """

    async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict:
        """Validate -> upload to blob -> save to DB.

        Args:
            file: The uploaded file. The text after its last "." (lower-cased)
                becomes ``file_type``; a filename without "." is treated as "txt".
            user_id: Owner recorded on the new document.
            db: Session passed to ``document_service.create_document``.

        Returns:
            ``{"id", "filename", "status"}`` of the created document.

        Raises:
            HTTPException: 400 if the filename is missing, the extension is not
                in ``SUPPORTED_FILE_TYPES``, or the content is larger than
                ``MAX_FILE_SIZE_BYTES``.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename is required.")
        filename = file.filename
        file_type = filename.rsplit(".", 1)[-1].lower() if "." in filename else "txt"

        # Cheap checks first: an unsupported type is rejected without reading content.
        if file_type not in SUPPORTED_FILE_TYPES:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type. Supported: {', '.join(SUPPORTED_FILE_TYPES)}",
            )

        # Read at most one byte past the limit: enough to detect an oversized
        # upload without loading an arbitrarily large body into one bytes object.
        content = await file.read(MAX_FILE_SIZE_BYTES + 1)
        if len(content) > MAX_FILE_SIZE_BYTES:
            # Derived from the constant so the message cannot drift from the limit.
            max_mb = MAX_FILE_SIZE_BYTES / (1024 * 1024)
            raise HTTPException(
                status_code=400,
                detail=f"File size exceeds maximum allowed size of {max_mb:g} MB.",
            )

        blob_name = await blob_storage.upload_file(content, filename, user_id)
        # NOTE(review): if create_document raises, the blob uploaded above is not
        # removed here; consider a compensating delete via blob_storage.
        document = await document_service.create_document(
            db=db,
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=len(content),
            file_type=file_type,
        )

        logger.info(f"Uploaded document {document.id} for user {user_id}")
        return {"id": document.id, "filename": document.filename, "status": document.status}

    async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership -> run the knowledge processor -> record status.

        The document status is set to "processing" before
        ``knowledge_processor.process_document`` runs, then to "completed" on
        success or "failed" (with the error text) on any exception.

        Returns:
            ``{"document_id", "chunks_processed"}`` where ``chunks_processed`` is
            the value returned by ``knowledge_processor.process_document``.

        Raises:
            HTTPException: 404 if the document does not exist, 403 if it
                belongs to another user, 500 if processing fails.
        """
        document = await document_service.get_document(db, document_id)

        if not document:
            raise HTTPException(status_code=404, detail="Document not found")
        if document.user_id != user_id:
            raise HTTPException(status_code=403, detail="Access denied")

        try:
            await document_service.update_document_status(db, document_id, "processing")
            chunks_count = await knowledge_processor.process_document(document, db)
            await document_service.update_document_status(db, document_id, "completed")
        except Exception as e:
            logger.error(f"Processing failed for document {document_id}", error=str(e))
            # Recording the failure can itself fail (e.g. if the session is no
            # longer usable after the original error); that must not mask the
            # original error reported to the client.
            try:
                await document_service.update_document_status(db, document_id, "failed", str(e))
            except Exception as status_err:
                logger.error(
                    f"Could not mark document {document_id} as failed",
                    error=str(status_err),
                )
            raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") from e

        logger.info(f"Processed document {document_id}: {chunks_count} chunks")
        return {"document_id": document_id, "chunks_processed": chunks_count}

    async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
        """Validate ownership -> delete the DB record -> clean up parquet files.

        Parquet cleanup runs only for "csv"/"xlsx" documents and is best-effort:
        the DB record is already gone at that point, so a cleanup failure is
        logged instead of reported as a failed deletion (a retry would 404).

        NOTE(review): this method never calls ``blob_storage`` itself; confirm
        that ``document_service.delete_document`` removes the stored blob.

        Returns:
            ``{"document_id": document_id}``.

        Raises:
            HTTPException: 404 if the document does not exist, 403 if it
                belongs to another user.
        """
        document = await document_service.get_document(db, document_id)

        if not document:
            raise HTTPException(status_code=404, detail="Document not found")
        if document.user_id != user_id:
            raise HTTPException(status_code=403, detail="Access denied")

        # Read what we still need before deleting: depending on session settings
        # the ORM instance may be expired or detached once its row is gone.
        file_type = document.file_type

        await document_service.delete_document(db, document_id)

        if file_type in ("csv", "xlsx"):
            try:
                await delete_document_parquets(user_id, document_id)
            except Exception as e:
                logger.error(
                    f"Parquet cleanup failed for deleted document {document_id}",
                    error=str(e),
                )

        logger.info(f"Deleted document {document_id} for user {user_id}")
        return {"document_id": document_id}


document_pipeline = DocumentPipeline()