# Source: Agentic-Service-Data-Eyond / src/document/document_service.py
# Commit 767625e by Rifqi Hafizuddin: "[KM-436-437] edit knowledge handler pipeline"
# File size: 3.93 kB
"""Service for managing documents."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, text
from src.db.postgres.models import Document
from src.db.postgres.connection import _pgvector_engine
from src.storage.az_blob.az_blob import blob_storage
from src.middlewares.logging import get_logger
from typing import List, Optional
from datetime import datetime
# Module-level logger obtained from the project's logging middleware.
logger = get_logger("document_service")
class DocumentService:
    """Service for managing documents.

    Groups the create/read/update/delete operations on ``Document`` rows.
    Deleting a document also removes its blob and its pgvector embeddings.
    """

    async def create_document(
        self,
        db: AsyncSession,
        user_id: str,
        filename: str,
        blob_name: str,
        file_size: int,
        file_type: str,
    ) -> Document:
        """Create a new document record.

        Args:
            db: Session used to insert and commit the row.
            user_id: Owner of the document.
            filename: Name stored on the record.
            blob_name: Name of the blob stored on the record.
            file_size: Size value stored on the record.
            file_type: Type value stored on the record.

        Returns:
            The committed and refreshed ``Document`` with a freshly
            generated UUID4 id and status ``"uploaded"``.
        """
        import uuid

        document = Document(
            id=str(uuid.uuid4()),
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=file_size,
            file_type=file_type,
            status="uploaded",
        )
        db.add(document)
        await db.commit()
        # Reload the row so attributes populated during the insert are present.
        await db.refresh(document)
        logger.info(f"Created document {document.id} for user {user_id}")
        return document

    async def get_user_documents(
        self,
        db: AsyncSession,
        user_id: str,
    ) -> List[Document]:
        """Get all documents for a user, newest ``created_at`` first.

        Args:
            db: Session used to run the query.
            user_id: Owner whose documents are selected.

        Returns:
            A list of matching documents; empty when there are none.
        """
        result = await db.execute(
            select(Document)
            .where(Document.user_id == user_id)
            .order_by(Document.created_at.desc())
        )
        # Materialise as a real list so the value matches the annotation.
        return list(result.scalars().all())

    async def get_document(
        self,
        db: AsyncSession,
        document_id: str,
    ) -> Optional[Document]:
        """Get a specific document.

        Args:
            db: Session used to run the query.
            document_id: Id of the document to look up.

        Returns:
            The first matching document, or ``None`` when nothing matches.
        """
        result = await db.execute(
            select(Document).where(Document.id == document_id)
        )
        return result.scalars().first()

    async def delete_document(
        self,
        db: AsyncSession,
        document_id: str,
    ) -> bool:
        """Delete a document (from DB and Blob storage).

        The blob is deleted first, then the embeddings, then the database
        row. Any exception raised by one of those steps propagates to the
        caller.

        Args:
            db: Session used to look up and delete the row.
            document_id: Id of the document to delete.

        Returns:
            ``False`` when no document has that id, ``True`` once all three
            deletions have run.
        """
        document = await self.get_document(db, document_id)
        if not document:
            return False

        # NOTE(review): the three deletions are not atomic. If a later step
        # raises, the blob is already gone while the row still exists --
        # confirm this ordering is intended.
        # Delete from blob storage
        await blob_storage.delete_file(document.blob_name)

        # Delete vector embeddings from pgvector (scoped to user + collection
        # to avoid cross-user over-delete)
        async with _pgvector_engine.begin() as conn:
            await conn.execute(
                text("""
                    DELETE FROM langchain_pg_embedding
                    WHERE cmetadata->>'user_id' = :user_id
                    AND cmetadata->>'source_type' = 'document'
                    AND cmetadata->'data'->>'document_id' = :doc_id
                    AND collection_id = (
                        SELECT uuid FROM langchain_pg_collection WHERE name = 'document_embeddings'
                    )
                """),
                {"user_id": document.user_id, "doc_id": document_id},
            )

        # Delete from database
        await db.execute(
            delete(Document).where(Document.id == document_id)
        )
        await db.commit()
        logger.info(f"Deleted document {document_id}")
        return True

    async def update_document_status(
        self,
        db: AsyncSession,
        document_id: str,
        status: str,
        error_message: Optional[str] = None,
    ) -> Optional[Document]:
        """Update document processing status.

        Sets ``status``, stamps ``processed_at`` with the current naive UTC
        time and overwrites ``error_message`` (so the default ``None``
        clears any earlier message).

        Args:
            db: Session used to look up and commit the change.
            document_id: Id of the document to update.
            status: New status value.
            error_message: Message stored alongside the status.

        Returns:
            The refreshed document, or ``None`` when no document has that
            id (in which case nothing is written).
        """
        document = await self.get_document(db, document_id)
        if document is None:
            # Do not claim success in the log when nothing was updated.
            logger.warning(
                f"Cannot update status of document {document_id}: not found"
            )
            return None

        document.status = status
        # NOTE(review): utcnow() yields a naive datetime and is deprecated
        # since Python 3.12; confirm the column type before switching to a
        # timezone-aware value.
        document.processed_at = datetime.utcnow()
        document.error_message = error_message
        await db.commit()
        await db.refresh(document)
        logger.info(f"Updated document {document_id} status to {status}")
        return document
# Module-level DocumentService instance, created when this module is imported.
document_service = DocumentService()