[NO TICKET][document]: delete vector embeddings from table langchain_pg_embedding when a user deletes a document from the knowledge base
ac3d8c1 | """Service for managing documents.""" | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, delete, text | |
| from src.db.postgres.models import Document | |
| from src.storage.az_blob.az_blob import blob_storage | |
| from src.middlewares.logging import get_logger | |
| from typing import List, Optional | |
| from datetime import datetime | |
# Module-level logger for this service, obtained from the project's logging middleware.
logger = get_logger("document_service")
class DocumentService:
    """Service for managing documents.

    Document metadata lives in the ``Document`` table, file contents live in
    blob storage (``blob_storage``, referenced by ``Document.blob_name``), and
    vector embeddings live in the ``langchain_pg_embedding`` table.
    """

    async def create_document(
        self,
        db: AsyncSession,
        user_id: str,
        filename: str,
        blob_name: str,
        file_size: int,
        file_type: str
    ) -> Document:
        """Create and persist a new document record with status ``"uploaded"``.

        Args:
            db: Database session; the new record is committed immediately.
            user_id: ID of the user who owns the document.
            filename: Original file name.
            blob_name: Name of the blob that holds the file contents.
            file_size: File size as supplied by the caller.
            file_type: File type as supplied by the caller.

        Returns:
            The committed and refreshed ``Document``.
        """
        import uuid

        document = Document(
            id=str(uuid.uuid4()),
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=file_size,
            file_type=file_type,
            status="uploaded"
        )
        db.add(document)
        await db.commit()
        # Reload attributes (including any database-generated defaults) so
        # they can be read after the commit without implicit I/O.
        await db.refresh(document)
        logger.info(f"Created document {document.id} for user {user_id}")
        return document

    async def get_user_documents(
        self,
        db: AsyncSession,
        user_id: str
    ) -> List[Document]:
        """Return all documents owned by ``user_id``, newest first.

        Args:
            db: Database session.
            user_id: ID of the owning user.

        Returns:
            The user's documents ordered by ``created_at`` descending; empty
            if the user has none.
        """
        result = await db.execute(
            select(Document)
            .where(Document.user_id == user_id)
            .order_by(Document.created_at.desc())
        )
        return result.scalars().all()

    async def get_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> Optional[Document]:
        """Return the document with ``document_id``, or ``None`` if absent.

        NOTE(review): no ownership filter is applied here; callers are
        presumably responsible for checking ``user_id`` — confirm.
        """
        result = await db.execute(
            select(Document).where(Document.id == document_id)
        )
        return result.scalars().first()

    async def delete_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> bool:
        """Delete a document: its vector embeddings, its DB row, and its blob.

        The embedding rows and the document row are deleted and committed in
        one transaction *before* the blob is touched. A database failure
        therefore never leaves a record pointing at a file that no longer
        exists.

        Args:
            db: Database session.
            document_id: ID of the document to delete.

        Returns:
            ``True`` if the document existed and was deleted, ``False`` if no
            document with that ID exists.

        Raises:
            Exception: Any database error is re-raised after the session has
                been rolled back; the blob is left untouched in that case.
                Blob-storage errors are logged rather than raised, because the
                document is already gone from the database at that point.
        """
        document = await self.get_document(db, document_id)
        if not document:
            return False

        # Capture before committing. With SQLAlchemy's default
        # expire_on_commit, the commit expires the instance, and reloading an
        # attribute of the now-deleted row would fail (and would require
        # implicit I/O under asyncio).
        blob_name = document.blob_name

        try:
            # Embeddings are matched via their JSON metadata; this presumes
            # ingestion stored the ID at cmetadata -> 'data' -> 'document_id'.
            embedding_result = await db.execute(
                text(
                    "DELETE FROM langchain_pg_embedding "
                    "WHERE cmetadata->'data'->>'document_id' = :doc_id"
                ),
                {"doc_id": document_id}
            )
            await db.execute(
                delete(Document).where(Document.id == document_id)
            )
            await db.commit()
        except Exception:
            # Leave the session usable for the caller instead of stuck in an
            # aborted transaction.
            await db.rollback()
            logger.error(f"Failed to delete document {document_id} from database")
            raise

        try:
            await blob_storage.delete_file(blob_name)
        except Exception:
            # The database record is already deleted; an orphaned blob is a
            # storage leak to clean up, not a failure to report to the caller.
            logger.exception(
                f"Deleted document {document_id} but failed to delete blob {blob_name}"
            )

        logger.info(
            f"Deleted document {document_id} "
            f"({embedding_result.rowcount} embeddings removed)"
        )
        return True

    async def update_document_status(
        self,
        db: AsyncSession,
        document_id: str,
        status: str,
        error_message: Optional[str] = None
    ) -> Optional[Document]:
        """Set a document's status, processing timestamp, and error message.

        Args:
            db: Database session; changes are committed immediately.
            document_id: ID of the document to update.
            status: New status value.
            error_message: Error text to store. It is always written, so
                passing ``None`` clears any previous error.

        Returns:
            The refreshed ``Document``, or ``None`` if no document with that
            ID exists (nothing is written in that case).
        """
        document = await self.get_document(db, document_id)
        if document is None:
            logger.warning(f"Cannot update status of missing document {document_id}")
            return None

        document.status = status
        # NOTE(review): naive UTC timestamp. datetime.utcnow() is deprecated
        # since Python 3.12; switch to datetime.now(timezone.utc) only if the
        # processed_at column is timezone-aware — confirm against the model.
        document.processed_at = datetime.utcnow()
        document.error_message = error_message
        await db.commit()
        await db.refresh(document)
        logger.info(f"Updated document {document_id} status to {status}")
        return document
# Shared module-level instance. DocumentService defines no __init__ or
# instance attributes, so a single instance can be reused by all importers.
document_service = DocumentService()