"""Image catalog service — stores and queries image metadata.""" from __future__ import annotations import hashlib import json import logging import uuid from datetime import datetime from pathlib import Path from typing import Any from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from content_engine.config import settings from content_engine.models.database import Image, catalog_session_factory logger = logging.getLogger(__name__) class CatalogService: """Manages the image catalog: inserting, querying, and organizing generated images.""" async def insert_image( self, *, file_path: str, image_bytes: bytes | None = None, character_id: str | None = None, template_id: str | None = None, content_rating: str = "sfw", batch_id: str | None = None, positive_prompt: str | None = None, negative_prompt: str | None = None, checkpoint: str | None = None, loras: list[dict[str, Any]] | None = None, seed: int | None = None, steps: int | None = None, cfg: float | None = None, sampler: str | None = None, scheduler: str | None = None, width: int | None = None, height: int | None = None, generation_backend: str | None = None, comfyui_prompt_id: str | None = None, generation_time_seconds: float | None = None, variables: dict[str, str] | None = None, ) -> str: """Insert a new image record into the catalog. Returns the image ID.""" image_id = str(uuid.uuid4()) variables = variables or {} # Compute file hash if bytes provided file_hash = None file_size = None if image_bytes: file_hash = hashlib.sha256(image_bytes).hexdigest() file_size = len(image_bytes) record = Image( id=image_id, batch_id=batch_id, character_id=character_id, template_id=template_id, content_rating=content_rating, positive_prompt=positive_prompt, negative_prompt=negative_prompt, checkpoint=checkpoint, loras_json=json.dumps(loras) if loras else None, seed=seed, steps=steps, cfg=cfg, sampler=sampler, scheduler=scheduler, width=width, height=height, pose=variables.get("pose"), outfit=variables.get("outfit"), emotion=variables.get("emotion"), camera_angle=variables.get("camera_angle"), lighting=variables.get("lighting"), scene=variables.get("scene"), file_path=file_path, file_hash=file_hash, file_size=file_size, generation_backend=generation_backend, comfyui_prompt_id=comfyui_prompt_id, generation_time_seconds=generation_time_seconds, ) async with catalog_session_factory() as session: session.add(record) await session.commit() logger.info("Cataloged image %s at %s", image_id, file_path) return image_id async def get_image(self, image_id: str) -> Image | None: """Get a single image by ID.""" async with catalog_session_factory() as session: return await session.get(Image, image_id) async def search( self, *, character_id: str | None = None, content_rating: str | None = None, template_id: str | None = None, is_approved: bool | None = None, is_published: bool | None = None, pose: str | None = None, outfit: str | None = None, emotion: str | None = None, limit: int = 50, offset: int = 0, ) -> list[Image]: """Search images with filters.""" stmt = select(Image) if character_id is not None: stmt = stmt.where(Image.character_id == character_id) if content_rating is not None: stmt = stmt.where(Image.content_rating == content_rating) if template_id is not None: stmt = stmt.where(Image.template_id == template_id) if is_approved is not None: stmt = stmt.where(Image.is_approved == is_approved) if is_published is not None: stmt = stmt.where(Image.is_published == is_published) if pose is not None: stmt = stmt.where(Image.pose == pose) if outfit is not None: stmt = stmt.where(Image.outfit == outfit) if emotion is not None: stmt = stmt.where(Image.emotion == emotion) stmt = stmt.order_by(Image.created_at.desc()).limit(limit).offset(offset) async with catalog_session_factory() as session: result = await session.execute(stmt) return list(result.scalars().all()) async def approve_image(self, image_id: str) -> bool: """Mark an image as approved for publishing.""" async with catalog_session_factory() as session: image = await session.get(Image, image_id) if not image: return False image.is_approved = True await session.commit() return True async def delete_image(self, image_id: str) -> bool: """Delete an image record and its file from disk.""" async with catalog_session_factory() as session: image = await session.get(Image, image_id) if not image: return False file_path = Path(image.file_path) if file_path.exists(): file_path.unlink() await session.delete(image) await session.commit() return True async def get_approved_unpublished( self, character_id: str, content_rating: str | None = None, limit: int = 100 ) -> list[Image]: """Get approved but unpublished images for a character.""" stmt = ( select(Image) .where(Image.character_id == character_id) .where(Image.is_approved == True) # noqa: E712 .where(Image.is_published == False) # noqa: E712 ) if content_rating: stmt = stmt.where(Image.content_rating == content_rating) stmt = stmt.order_by(Image.created_at.asc()).limit(limit) async with catalog_session_factory() as session: result = await session.execute(stmt) return list(result.scalars().all()) async def get_total_count(self) -> int: """Get total number of images in catalog.""" async with catalog_session_factory() as session: result = await session.execute(select(func.count(Image.id))) return result.scalar() or 0 def resolve_output_path( self, character_id: str, content_rating: str, filename: str, subfolder: str = "raw", ) -> Path: """Resolve the output file path for a generated image. Structure: output/{character_id}/{rating}/{subfolder}/{year-month}/{filename} """ now = datetime.now() date_folder = now.strftime("%Y-%m") path = ( settings.paths.output_dir / character_id / content_rating / subfolder / date_folder ) path.mkdir(parents=True, exist_ok=True) return path / filename