| """Service for managing documents.""" |
|
|
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy import select, delete |
| from src.db.postgres.models import Document |
| from src.storage.az_blob.az_blob import blob_storage |
| from src.middlewares.logging import get_logger |
| from typing import List, Optional |
| from datetime import datetime |
|
|
# Module-wide logger for this service, obtained from the project's logging
# middleware under the name "document_service".
logger = get_logger("document_service")
|
|
|
|
class DocumentService:
    """Service for managing document records and their stored blobs.

    The service keeps no state of its own: every method receives the
    ``AsyncSession`` it should work on. Methods that write commit the
    session themselves; if the write fails, the session is rolled back
    and the original exception is re-raised.
    """

    @staticmethod
    async def _commit_or_rollback(db: AsyncSession) -> None:
        """Commit ``db``; on failure roll the session back and re-raise."""
        try:
            await db.commit()
        except Exception:
            # After a failed flush/commit the session must be rolled back
            # before it can be used again; do that here so the caller's
            # session is still usable when the exception reaches it.
            await db.rollback()
            raise

    async def create_document(
        self,
        db: AsyncSession,
        user_id: str,
        filename: str,
        blob_name: str,
        file_size: int,
        file_type: str
    ) -> Document:
        """Create and persist a new document record.

        The record gets a freshly generated UUID4 string as its id and the
        initial status ``"uploaded"``.

        Args:
            db: Session used to insert and commit the record.
            user_id: Identifier of the user the document belongs to.
            filename: File name stored on the record.
            blob_name: Name of the blob holding the file content; it is the
                value later passed to blob storage when the document is
                deleted.
            file_size: Size stored on the record (presumably bytes --
                confirm against the caller).
            file_type: File type stored on the record.

        Returns:
            The committed ``Document``, refreshed from the database.

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back before the exception propagates.
        """
        import uuid

        document = Document(
            id=str(uuid.uuid4()),
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=file_size,
            file_type=file_type,
            status="uploaded",
        )
        db.add(document)
        await self._commit_or_rollback(db)
        # Reload so database-populated columns are present on the instance.
        await db.refresh(document)
        logger.info(f"Created document {document.id} for user {user_id}")
        return document

    async def get_user_documents(
        self,
        db: AsyncSession,
        user_id: str
    ) -> List[Document]:
        """Return all documents of a user, newest first.

        Args:
            db: Session used to run the query.
            user_id: Identifier of the user whose documents are listed.

        Returns:
            A list of ``Document`` rows ordered by ``created_at``
            descending; empty when the user has no documents.
        """
        result = await db.execute(
            select(Document)
            .where(Document.user_id == user_id)
            .order_by(Document.created_at.desc())
        )
        # Materialise as a real list so the value matches the annotation.
        return list(result.scalars().all())

    async def get_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> Optional[Document]:
        """Return the document with the given id, or ``None`` if absent.

        Args:
            db: Session used to run the query.
            document_id: Id of the document to look up.

        Returns:
            The first matching ``Document``, or ``None`` when no row has
            that id.
        """
        result = await db.execute(
            select(Document).where(Document.id == document_id)
        )
        return result.scalars().first()

    async def delete_document(
        self,
        db: AsyncSession,
        document_id: str
    ) -> bool:
        """Delete a document from blob storage and from the database.

        Args:
            db: Session used to look up and delete the record.
            document_id: Id of the document to delete.

        Returns:
            ``False`` when no document with that id exists, ``True`` once
            both the blob and the database row have been deleted.

        Raises:
            Exception: Whatever blob storage or the database raises. When
                the database step fails the session is rolled back first.
        """
        document = await self.get_document(db, document_id)
        if not document:
            return False

        # The blob is removed before the row. If this call raises, the
        # database has not been touched yet.
        await blob_storage.delete_file(document.blob_name)

        try:
            await db.execute(
                delete(Document).where(Document.id == document_id)
            )
            await db.commit()
        except Exception:
            # NOTE(review): at this point the blob is already gone while the
            # row survives; the rollback only restores the session, it cannot
            # bring the blob back.
            await db.rollback()
            raise

        logger.info(f"Deleted document {document_id}")
        return True

    async def update_document_status(
        self,
        db: AsyncSession,
        document_id: str,
        status: str,
        error_message: Optional[str] = None
    ) -> Optional[Document]:
        """Update a document's processing status.

        Sets ``status`` and ``error_message`` and stamps ``processed_at``
        with the current time on every call, whatever the new status is.

        Args:
            db: Session used to look up and update the record.
            document_id: Id of the document to update.
            status: New status value stored on the record.
            error_message: Message stored on the record; the default
                ``None`` overwrites any previously stored message.

        Returns:
            The updated ``Document`` refreshed from the database, or
            ``None`` when no document with that id exists.

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back before the exception propagates.
        """
        document = await self.get_document(db, document_id)
        if not document:
            # Nothing to update; callers must handle the missing case.
            return None

        document.status = status
        # Naive UTC timestamp. NOTE(review): datetime.utcnow() is deprecated
        # since Python 3.12; whether an aware datetime can be used instead
        # depends on the column type -- confirm before changing.
        document.processed_at = datetime.utcnow()
        document.error_message = error_message
        await self._commit_or_rollback(db)
        await db.refresh(document)
        logger.info(f"Updated document {document_id} status to {status}")
        return document
|
|
|
|
# Shared module-level DocumentService instance, created when this module is
# imported.
document_service = DocumentService()
|
|