# ishaq101's picture
# [NOTICKET] Demo agentic agent
# bef5e76
"""Service for managing documents."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from src.db.postgres.models import Document
from src.storage.az_blob.az_blob import blob_storage
from src.middlewares.logging import get_logger
from typing import List, Optional
from datetime import datetime
# Module-level logger for this service, obtained from the project's logging helper.
logger = get_logger("document_service")
class DocumentService:
    """Service for managing documents.

    Every method receives the ``AsyncSession`` it should operate on, so the
    service object itself keeps no state between calls.
    """

    async def _commit_or_rollback(self, db: AsyncSession) -> None:
        """Commit ``db``; on failure roll it back and re-raise the error."""
        try:
            await db.commit()
        except Exception:
            # Roll back so the session is not left in a failed transaction;
            # the original exception still propagates to the caller.
            await db.rollback()
            raise

    async def create_document(
        self,
        db: AsyncSession,
        user_id: str,
        filename: str,
        blob_name: str,
        file_size: int,
        file_type: str
    ) -> Document:
        """Create a new document record.

        The record gets a freshly generated UUID4 string as its id and the
        initial status ``"uploaded"``.

        Args:
            db: Session used to persist the record.
            user_id: Owner of the document.
            filename: Value stored in the record's ``filename`` field.
            blob_name: Value stored in the record's ``blob_name`` field.
            file_size: Value stored in the record's ``file_size`` field.
            file_type: Value stored in the record's ``file_type`` field.

        Returns:
            The persisted ``Document``, refreshed from the database.

        Raises:
            Exception: Whatever the commit or refresh raises; a failed commit
                is rolled back before the error is re-raised.
        """
        import uuid

        document = Document(
            id=str(uuid.uuid4()),
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=file_size,
            file_type=file_type,
            status="uploaded"
        )
        db.add(document)
        await self._commit_or_rollback(db)
        # Reload the row so attributes populated by the database are available.
        await db.refresh(document)
        logger.info(f"Created document {document.id} for user {user_id}")
        return document

    async def get_user_documents(
        self,
        db: AsyncSession,
        user_id: str
    ) -> List[Document]:
        """Get all documents for a user, newest ``created_at`` first.

        Args:
            db: Session used to run the query.
            user_id: Owner whose documents are selected.

        Returns:
            A list of matching ``Document`` rows (empty when there are none).
        """
        result = await db.execute(
            select(Document)
            .where(Document.user_id == user_id)
            .order_by(Document.created_at.desc())
        )
        # ``.all()`` yields a Sequence; materialize it to honour the List annotation.
        return list(result.scalars().all())

    async def get_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> Optional[Document]:
        """Get a specific document.

        Args:
            db: Session used to run the query.
            document_id: Id of the document to look up.

        Returns:
            The matching ``Document``, or ``None`` when no row has that id.
        """
        result = await db.execute(
            select(Document).where(Document.id == document_id)
        )
        return result.scalars().first()

    async def delete_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> bool:
        """Delete a document (from DB and Blob storage).

        The database DELETE is issued first but left uncommitted, then the
        blob is removed, then the transaction is committed. A DELETE that the
        database rejects therefore surfaces before the blob is destroyed.

        Args:
            db: Session used for the lookup and the delete.
            document_id: Id of the document to remove.

        Returns:
            ``False`` when no document has that id, ``True`` once the row and
            the blob have been deleted.

        Raises:
            Exception: Whatever the database or ``blob_storage.delete_file``
                raises; the session is rolled back before re-raising.
        """
        document = await self.get_document(db, document_id)
        if document is None:
            return False

        # Capture the blob name up front: after the DELETE below the loaded
        # instance should no longer be relied on for attribute access.
        blob_name = document.blob_name

        try:
            # Stage the row removal inside the open transaction.
            await db.execute(
                delete(Document).where(Document.id == document_id)
            )
            # Remove the blob only once the database accepted the DELETE.
            await blob_storage.delete_file(blob_name)
            # NOTE: if this commit fails the blob is already gone; the window
            # is limited to the commit itself.
            await db.commit()
        except Exception:
            await db.rollback()
            raise

        logger.info(f"Deleted document {document_id}")
        return True

    async def update_document_status(
        self,
        db: AsyncSession,
        document_id: str,
        status: str,
        error_message: Optional[str] = None
    ) -> Optional[Document]:
        """Update document processing status.

        Sets ``status``, stamps ``processed_at`` with the current UTC time
        (as a naive datetime) and overwrites ``error_message`` -- passing
        ``None`` clears any previous message.

        Args:
            db: Session used for the lookup and the update.
            document_id: Id of the document to update.
            status: New status value.
            error_message: Optional error text to store alongside the status.

        Returns:
            The refreshed ``Document``, or ``None`` when no document has that
            id (nothing is written or logged in that case).

        Raises:
            Exception: Whatever the commit or refresh raises; a failed commit
                is rolled back before the error is re-raised.
        """
        from datetime import timezone

        document = await self.get_document(db, document_id)
        if document is None:
            return None

        document.status = status
        # Same naive-UTC value ``datetime.utcnow()`` produced, without relying
        # on that deprecated call.
        document.processed_at = datetime.now(timezone.utc).replace(tzinfo=None)
        document.error_message = error_message
        await self._commit_or_rollback(db)
        await db.refresh(document)
        logger.info(f"Updated document {document_id} status to {status}")
        return document
# Module-level DocumentService instance, created once when this module is imported.
document_service = DocumentService()