Spaces:
Runtime error
Runtime error
| """ | |
| Repository layer for stored_objects persistence. | |
| """ | |
| from datetime import datetime | |
| from typing import Optional | |
| from uuid import UUID | |
| from fastapi import HTTPException, status | |
| from sqlalchemy import select, update | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from app.documents.models import StoredObject | |
| class DocumentRepository: | |
| """Persistence layer for stored_objects.""" | |
| def __init__(self, session: AsyncSession): | |
| self.session = session | |
| async def create_upload_placeholder( | |
| self, | |
| *, | |
| tenant_id: str, | |
| domain: str, | |
| entity_id: str, | |
| category: str, | |
| bucket: str, | |
| object_key: str, | |
| file_name: str, | |
| mime_type: str, | |
| file_size: int, | |
| visibility: str, | |
| created_by: str, | |
| ) -> StoredObject: | |
| """Create initial upload record with pending status.""" | |
| record = StoredObject( | |
| tenant_id=tenant_id, | |
| domain=domain, | |
| entity_id=entity_id, | |
| category=category, | |
| bucket_name=bucket, | |
| object_key=object_key, | |
| file_name=file_name, | |
| mime_type=mime_type, | |
| file_size=file_size, | |
| visibility=visibility, | |
| created_by=created_by, | |
| ) | |
| self.session.add(record) | |
| await self.session.commit() | |
| await self.session.refresh(record) | |
| return record | |
| async def finalize_upload(self, object_id: UUID, tenant_id: str, checksum: str) -> StoredObject: | |
| """Update record with checksum after verification.""" | |
| stmt = ( | |
| update(StoredObject) | |
| .where(StoredObject.id == object_id, StoredObject.tenant_id == tenant_id) | |
| .values(checksum_sha256=checksum) | |
| .execution_options(synchronize_session="fetch") | |
| ) | |
| await self.session.execute(stmt) | |
| await self.session.commit() | |
| return await self.get_active(object_id, tenant_id) | |
| async def get_upload(self, object_id: UUID, tenant_id: str) -> StoredObject: | |
| """Get upload record (including soft-deleted if not removed).""" | |
| stmt = select(StoredObject).where( | |
| StoredObject.id == object_id, | |
| StoredObject.tenant_id == tenant_id, | |
| ) | |
| res = await self.session.execute(stmt) | |
| record = res.scalar_one_or_none() | |
| if not record: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found") | |
| if record.deleted_at: | |
| raise HTTPException(status_code=status.HTTP_410_GONE, detail="Object deleted") | |
| return record | |
| async def get_active(self, object_id: UUID, tenant_id: str) -> StoredObject: | |
| """Get active (non-deleted) object by ID and tenant.""" | |
| stmt = select(StoredObject).where( | |
| StoredObject.id == object_id, | |
| StoredObject.tenant_id == tenant_id, | |
| StoredObject.deleted_at.is_(None), | |
| ) | |
| res = await self.session.execute(stmt) | |
| record = res.scalar_one_or_none() | |
| if not record: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found") | |
| return record | |
| async def resolve_object( | |
| self, | |
| *, | |
| tenant_id: str, | |
| object_id: Optional[UUID], | |
| domain: Optional[str], | |
| entity_id: Optional[str], | |
| category: Optional[str], | |
| file_name: Optional[str], | |
| ) -> StoredObject: | |
| """Resolve object by ID or composite key (domain/entity/category/filename).""" | |
| if object_id: | |
| return await self.get_active(object_id, tenant_id) | |
| if not all([domain, entity_id, category, file_name]): | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incomplete lookup criteria") | |
| stmt = select(StoredObject).where( | |
| StoredObject.tenant_id == tenant_id, | |
| StoredObject.domain == domain, | |
| StoredObject.entity_id == entity_id, | |
| StoredObject.category == category, | |
| StoredObject.file_name == file_name, | |
| StoredObject.deleted_at.is_(None), | |
| ) | |
| res = await self.session.execute(stmt) | |
| record = res.scalar_one_or_none() | |
| if not record: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found") | |
| return record | |
| async def find_by_checksum(self, tenant_id: str, checksum: str) -> Optional[StoredObject]: | |
| """Find object by checksum for deduplication.""" | |
| stmt = select(StoredObject).where( | |
| StoredObject.tenant_id == tenant_id, | |
| StoredObject.checksum_sha256 == checksum, | |
| StoredObject.deleted_at.is_(None), | |
| ) | |
| res = await self.session.execute(stmt) | |
| return res.scalar_one_or_none() | |
| async def soft_delete(self, object_id: UUID, tenant_id: str) -> None: | |
| """Soft-delete object by marking deleted_at timestamp.""" | |
| stmt = ( | |
| update(StoredObject) | |
| .where( | |
| StoredObject.id == object_id, | |
| StoredObject.tenant_id == tenant_id, | |
| StoredObject.deleted_at.is_(None), | |
| ) | |
| .values(deleted_at=datetime.utcnow()) | |
| .execution_options(synchronize_session="fetch") | |
| ) | |
| result = await self.session.execute(stmt) | |
| await self.session.commit() | |
| if result.rowcount == 0: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found") | |
| async def find_product_images( | |
| self, | |
| tenant_id: str, | |
| domain: str, | |
| entity_id: str | |
| ) -> list[StoredObject]: | |
| """Find all images for a product/entity.""" | |
| stmt = select(StoredObject).where( | |
| StoredObject.tenant_id == tenant_id, | |
| StoredObject.domain == domain, | |
| StoredObject.entity_id == entity_id, | |
| StoredObject.deleted_at.is_(None), | |
| StoredObject.mime_type.like("image/%") | |
| ).order_by(StoredObject.created_at) | |
| result = await self.session.execute(stmt) | |
| return result.scalars().all() |