MukeshKapoor25's picture
feat(catalogues,documents): implement permanent image URLs in catalogue collection
bfdddfa
"""
Repository layer for stored_objects persistence.
"""
from datetime import datetime
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.documents.models import StoredObject
class DocumentRepository:
    """Persistence layer for the stored_objects table.

    Every query is scoped by ``tenant_id``. Write methods commit the session
    they were given before returning. Lookup failures are raised directly as
    FastAPI ``HTTPException`` errors so route handlers can let them propagate.
    """

    def __init__(self, session: AsyncSession) -> None:
        # Async SQLAlchemy session used for every query and commit below.
        self.session = session

    async def create_upload_placeholder(
        self,
        *,
        tenant_id: str,
        domain: str,
        entity_id: str,
        category: str,
        bucket: str,
        object_key: str,
        file_name: str,
        mime_type: str,
        file_size: int,
        visibility: str,
        created_by: str,
    ) -> StoredObject:
        """Insert and commit the initial stored_objects row for an upload.

        The checksum is left unset here; ``finalize_upload`` records it once the
        uploaded bytes have been verified. Any status/default columns are
        presumably filled by model defaults -- confirm against ``StoredObject``.

        Returns:
            The persisted record, refreshed from the database after commit.
        """
        record = StoredObject(
            tenant_id=tenant_id,
            domain=domain,
            entity_id=entity_id,
            category=category,
            bucket_name=bucket,
            object_key=object_key,
            file_name=file_name,
            mime_type=mime_type,
            file_size=file_size,
            visibility=visibility,
            created_by=created_by,
        )
        self.session.add(record)
        await self.session.commit()
        # Reload so database-populated attributes are available to the caller.
        await self.session.refresh(record)
        return record

    async def finalize_upload(self, object_id: UUID, tenant_id: str, checksum: str) -> StoredObject:
        """Store the verified SHA-256 checksum on an active upload and return it.

        Raises:
            HTTPException: 404 if no active (non-deleted) object with this id
                exists for the tenant.
        """
        stmt = (
            update(StoredObject)
            .where(
                StoredObject.id == object_id,
                StoredObject.tenant_id == tenant_id,
                # Don't mutate soft-deleted rows; get_active below would 404 anyway.
                StoredObject.deleted_at.is_(None),
            )
            .values(checksum_sha256=checksum)
            .execution_options(synchronize_session="fetch")
        )
        await self.session.execute(stmt)
        await self.session.commit()
        return await self.get_active(object_id, tenant_id)

    async def get_upload(self, object_id: UUID, tenant_id: str) -> StoredObject:
        """Fetch an upload record, distinguishing missing from soft-deleted.

        Unlike ``get_active``, this looks the row up regardless of ``deleted_at``
        so a deleted object can be reported as 410 rather than 404.

        Raises:
            HTTPException: 404 if no row exists for this id and tenant;
                410 if the row exists but has been soft-deleted.
        """
        stmt = select(StoredObject).where(
            StoredObject.id == object_id,
            StoredObject.tenant_id == tenant_id,
        )
        res = await self.session.execute(stmt)
        record = res.scalar_one_or_none()
        if record is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
        if record.deleted_at:
            raise HTTPException(status_code=status.HTTP_410_GONE, detail="Object deleted")
        return record

    async def get_active(self, object_id: UUID, tenant_id: str) -> StoredObject:
        """Fetch a non-deleted object by id within the tenant.

        Raises:
            HTTPException: 404 if no active object matches.
        """
        stmt = select(StoredObject).where(
            StoredObject.id == object_id,
            StoredObject.tenant_id == tenant_id,
            StoredObject.deleted_at.is_(None),
        )
        res = await self.session.execute(stmt)
        record = res.scalar_one_or_none()
        if record is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found")
        return record

    async def resolve_object(
        self,
        *,
        tenant_id: str,
        object_id: Optional[UUID],
        domain: Optional[str],
        entity_id: Optional[str],
        category: Optional[str],
        file_name: Optional[str],
    ) -> StoredObject:
        """Resolve an active object by id, or by (domain, entity, category, file name).

        When ``object_id`` is given it takes precedence and the composite
        fields are ignored. Otherwise all four composite fields must be
        non-empty.

        Raises:
            HTTPException: 400 if neither an id nor the full composite key is
                supplied; 404 if no active object matches.
        """
        if object_id is not None:
            return await self.get_active(object_id, tenant_id)
        # Empty strings count as missing, same as None.
        if not all([domain, entity_id, category, file_name]):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incomplete lookup criteria")
        stmt = select(StoredObject).where(
            StoredObject.tenant_id == tenant_id,
            StoredObject.domain == domain,
            StoredObject.entity_id == entity_id,
            StoredObject.category == category,
            StoredObject.file_name == file_name,
            StoredObject.deleted_at.is_(None),
        )
        res = await self.session.execute(stmt)
        # NOTE(review): scalar_one_or_none raises MultipleResultsFound if the same
        # file name was uploaded twice under one entity/category -- confirm a
        # uniqueness constraint exists, or pick a deterministic winner here.
        record = res.scalar_one_or_none()
        if record is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found")
        return record

    async def find_by_checksum(self, tenant_id: str, checksum: str) -> Optional[StoredObject]:
        """Return an active object with this checksum for deduplication, or None.

        Several objects can share a checksum (``finalize_upload`` does not
        reject duplicates), so the earliest-created match is returned rather
        than failing on multiple rows.
        """
        stmt = (
            select(StoredObject)
            .where(
                StoredObject.tenant_id == tenant_id,
                StoredObject.checksum_sha256 == checksum,
                StoredObject.deleted_at.is_(None),
            )
            .order_by(StoredObject.created_at)
            .limit(1)
        )
        res = await self.session.execute(stmt)
        return res.scalars().first()

    async def soft_delete(self, object_id: UUID, tenant_id: str) -> None:
        """Soft-delete an active object by stamping ``deleted_at``.

        Raises:
            HTTPException: 404 if no active object matched (already deleted or
                never existed).
        """
        stmt = (
            update(StoredObject)
            .where(
                StoredObject.id == object_id,
                StoredObject.tenant_id == tenant_id,
                StoredObject.deleted_at.is_(None),
            )
            # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated
            # since Python 3.12. Switching to datetime.now(timezone.utc) requires
            # confirming deleted_at is a timezone-aware column.
            .values(deleted_at=datetime.utcnow())
            .execution_options(synchronize_session="fetch")
        )
        result = await self.session.execute(stmt)
        await self.session.commit()
        if result.rowcount == 0:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found")

    async def find_product_images(
        self,
        tenant_id: str,
        domain: str,
        entity_id: str
    ) -> list[StoredObject]:
        """List the active image objects (MIME type ``image/*``) of an entity.

        Results are ordered by ``created_at``, oldest first.
        """
        stmt = select(StoredObject).where(
            StoredObject.tenant_id == tenant_id,
            StoredObject.domain == domain,
            StoredObject.entity_id == entity_id,
            StoredObject.deleted_at.is_(None),
            StoredObject.mime_type.like("image/%")
        ).order_by(StoredObject.created_at)
        result = await self.session.execute(stmt)
        # ScalarResult.all() is typed as a Sequence; materialize to match the annotation.
        return list(result.scalars().all())