File size: 3,931 Bytes
027123c
 
 
767625e
027123c
767625e
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
767625e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""Service for managing documents."""

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, text
from src.db.postgres.models import Document
from src.db.postgres.connection import _pgvector_engine
from src.storage.az_blob.az_blob import blob_storage
from src.middlewares.logging import get_logger
from typing import List, Optional
from datetime import datetime

# Module-level logger for this service, obtained from the project's get_logger helper.
logger = get_logger("document_service")


class DocumentService:
    """Service for managing document records, their blobs, and their embeddings.

    Every method takes the caller's ``AsyncSession``; methods that modify
    document rows commit on that session themselves.
    """

    async def create_document(
        self,
        db: AsyncSession,
        user_id: str,
        filename: str,
        blob_name: str,
        file_size: int,
        file_type: str
    ) -> Document:
        """Create and persist a new document record with status ``"uploaded"``.

        Args:
            db: Async session used to insert and commit the record.
            user_id: Owner of the document.
            filename: Original file name.
            blob_name: Name of the blob in storage holding the file content.
            file_size: File size as supplied by the caller.
            file_type: File type as supplied by the caller.

        Returns:
            The committed ``Document``, refreshed from the database.
        """
        import uuid

        document = Document(
            id=str(uuid.uuid4()),
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=file_size,
            file_type=file_type,
            status="uploaded"
        )
        db.add(document)
        await db.commit()
        # Reload attributes from the DB after commit (commit may expire them).
        await db.refresh(document)
        logger.info(f"Created document {document.id} for user {user_id}")
        return document

    async def get_user_documents(
        self,
        db: AsyncSession,
        user_id: str
    ) -> List[Document]:
        """Return all documents owned by ``user_id``, newest first.

        Args:
            db: Async session used for the query.
            user_id: Owner whose documents are listed.

        Returns:
            A list of ``Document`` rows ordered by ``created_at`` descending
            (empty if the user has none).
        """
        result = await db.execute(
            select(Document)
            .where(Document.user_id == user_id)
            .order_by(Document.created_at.desc())
        )
        # scalars().all() returns a Sequence; materialize a real list so the
        # value matches the declared List[Document] return type.
        return list(result.scalars().all())

    async def get_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> Optional[Document]:
        """Return the document with ``document_id``, or ``None`` if it does not exist.

        Args:
            db: Async session used for the query.
            document_id: Primary key of the document.
        """
        result = await db.execute(
            select(Document).where(Document.id == document_id)
        )
        return result.scalars().first()

    async def delete_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> bool:
        """Delete a document's blob, its vector embeddings, and its database row.

        Args:
            db: Async session used to look up and delete the document row.
            document_id: Primary key of the document to delete.

        Returns:
            ``False`` if no such document exists, otherwise ``True``.

        NOTE(review): the three deletions are separate operations (blob store,
        a transaction on the pgvector engine, then ``db``), so a failure part
        way through leaves earlier steps applied. Exceptions from any step
        propagate to the caller.
        """
        document = await self.get_document(db, document_id)
        if not document:
            return False

        # Delete from blob storage
        await blob_storage.delete_file(document.blob_name)

        # Delete vector embeddings from pgvector. The filter is scoped to the
        # owning user, source_type 'document', and the 'document_embeddings'
        # collection so it cannot remove other users' or other sources' rows.
        async with _pgvector_engine.begin() as conn:
            await conn.execute(
                text("""
                    DELETE FROM langchain_pg_embedding
                    WHERE cmetadata->>'user_id' = :user_id
                      AND cmetadata->>'source_type' = 'document'
                      AND cmetadata->'data'->>'document_id' = :doc_id
                      AND collection_id = (
                        SELECT uuid FROM langchain_pg_collection WHERE name = 'document_embeddings'
                      )
                """),
                {"user_id": document.user_id, "doc_id": document_id},
            )

        # Delete from database
        await db.execute(
            delete(Document).where(Document.id == document_id)
        )
        await db.commit()

        logger.info(f"Deleted document {document_id}")
        return True

    async def update_document_status(
        self,
        db: AsyncSession,
        document_id: str,
        status: str,
        error_message: Optional[str] = None
    ) -> Optional[Document]:
        """Set a document's processing status and stamp ``processed_at``.

        ``processed_at`` is set to the current UTC time on every call,
        regardless of which status is being written.

        Args:
            db: Async session used to load, update, and commit the document.
            document_id: Primary key of the document to update.
            status: New status value.
            error_message: Error text to store; ``None`` clears any previous value.

        Returns:
            The updated ``Document``, or ``None`` if no such document exists.
        """
        from datetime import timezone

        document = await self.get_document(db, document_id)
        if document is None:
            logger.warning(f"Cannot update status of missing document {document_id}")
            return None

        document.status = status
        # Naive UTC timestamp: the same value datetime.utcnow() produced, but
        # without the API deprecated in Python 3.12. tzinfo is stripped so the
        # value stays compatible with a timezone-naive column.
        document.processed_at = datetime.now(timezone.utc).replace(tzinfo=None)
        document.error_message = error_message
        await db.commit()
        await db.refresh(document)
        logger.info(f"Updated document {document_id} status to {status}")
        return document


# Shared module-level instance. DocumentService defines no __init__ and keeps
# no per-instance state, so one instance can be imported and reused everywhere.
document_service = DocumentService()