""" Image Library Manager ===================== Central manager for the image library system. Handles registration, retrieval, search, and management of all generated images. """ import json import shutil from pathlib import Path from datetime import datetime from typing import Optional, List, Dict, Any, Tuple from PIL import Image import hashlib from utils.logging_utils import get_logger from config.settings import Settings logger = get_logger(__name__) class LibraryManager: """ Central manager for image library. Manages a JSON-based registry of all generated images with metadata, thumbnails, search, and filtering capabilities. """ def __init__(self, library_path: Path = None): """ Initialize library manager. Args: library_path: Path to library directory (default: outputs/.library) """ self.library_path = library_path or (Settings.OUTPUT_DIR / ".library") self.index_file = self.library_path / "index.json" self.thumbnails_dir = Settings.OUTPUT_DIR / ".thumbnails" self.ensure_directories() self._index_cache = None self._cache_dirty = False def ensure_directories(self): """Ensure library and thumbnail directories exist.""" self.library_path.mkdir(parents=True, exist_ok=True) self.thumbnails_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"Library directories ensured: {self.library_path}, {self.thumbnails_dir}") def _load_index(self) -> Dict[str, Any]: """ Load library index from disk. Returns: Library index dictionary """ if self._index_cache is not None and not self._cache_dirty: return self._index_cache if not self.index_file.exists(): # Create new index index = { "version": "1.0", "last_updated": datetime.now().isoformat(), "entries": [] } self._save_index(index) return index try: with open(self.index_file, 'r', encoding='utf-8') as f: index = json.load(f) self._index_cache = index self._cache_dirty = False logger.debug(f"Loaded library index with {len(index.get('entries', []))} entries") return index except Exception as e: logger.error(f"Failed to load library index: {e}") # Return empty index on error return { "version": "1.0", "last_updated": datetime.now().isoformat(), "entries": [] } def _save_index(self, index: Dict[str, Any]): """ Save library index to disk (atomic write). Args: index: Library index dictionary """ try: # Update timestamp index["last_updated"] = datetime.now().isoformat() # Atomic write: write to temp file, then rename temp_file = self.index_file.with_suffix('.tmp') with open(temp_file, 'w', encoding='utf-8') as f: json.dump(index, f, indent=2, ensure_ascii=False) # Rename to actual file (atomic on most systems) temp_file.replace(self.index_file) self._index_cache = index self._cache_dirty = False logger.debug(f"Saved library index with {len(index['entries'])} entries") except Exception as e: logger.error(f"Failed to save library index: {e}") raise def create_thumbnail( self, image: Image.Image, size: int = 256, quality: int = 85 ) -> Image.Image: """ Generate thumbnail for library display. Args: image: Source PIL Image size: Maximum dimension in pixels (default: 256) quality: JPEG quality (default: 85) Returns: Thumbnail PIL Image """ # Calculate thumbnail size maintaining aspect ratio img_width, img_height = image.size ratio = min(size / img_width, size / img_height) new_width = int(img_width * ratio) new_height = int(img_height * ratio) # Create thumbnail thumbnail = image.copy() thumbnail.thumbnail((new_width, new_height), Image.Resampling.LANCZOS) logger.debug(f"Created thumbnail: {image.size} -> {thumbnail.size}") return thumbnail def _generate_entry_id(self, name: str) -> str: """ Generate unique entry ID. Args: name: Entry name Returns: Unique ID string (timestamp + hash) """ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Add hash of name + timestamp for uniqueness hash_input = f"{name}_{timestamp}_{datetime.now().microsecond}" hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:6] return f"{timestamp}_{hash_suffix}" def register_image( self, image: Image.Image, name: str, type: str, metadata: Dict[str, Any], description: str = "", tags: List[str] = None ) -> str: """ Register generated image in library. Args: image: PIL Image to register name: User-facing name type: Generation type ("character_sheet", "wardrobe", "composition", "standard") metadata: Generation metadata dict (includes prompt, backend, etc.) description: Optional user description tags: Optional list of tags Returns: Entry ID of registered image """ try: # Generate entry ID entry_id = self._generate_entry_id(name) # Determine paths image_filename = f"{name}_{entry_id}.png" thumbnail_filename = f"{name}_{entry_id}_thumb.png" metadata_filename = f"{name}_{entry_id}.json" # Determine output directory based on type type_dir_map = { "character_sheet": Settings.CHARACTER_SHEETS_DIR, "wardrobe": Settings.WARDROBE_CHANGES_DIR, "composition": Settings.COMPOSITIONS_DIR, "standard": Settings.STANDARD_DIR } output_dir = type_dir_map.get(type, Settings.STANDARD_DIR) image_path = output_dir / image_filename thumbnail_path = self.thumbnails_dir / thumbnail_filename metadata_path = output_dir / metadata_filename # Save full image (uncompressed PNG for maximum quality) image.save(image_path, format='PNG', compress_level=0) logger.info(f"Saved library image: {image_path}") # Generate and save thumbnail thumbnail = self.create_thumbnail(image) thumbnail.save(thumbnail_path, format='PNG') logger.info(f"Saved thumbnail: {thumbnail_path}") # Save metadata with open(metadata_path, 'w', encoding='utf-8') as f: json.dump(metadata, f, indent=2, ensure_ascii=False) # Create library entry entry = { "id": entry_id, "name": name, "description": description, "tags": tags or [], "type": type, "backend": metadata.get("backend", "Unknown"), "created_at": datetime.now().isoformat(), "image_path": str(image_path.relative_to(Settings.PROJECT_ROOT)), "thumbnail_path": str(thumbnail_path.relative_to(Settings.PROJECT_ROOT)), "metadata_path": str(metadata_path.relative_to(Settings.PROJECT_ROOT)), "width": image.width, "height": image.height, "aspect_ratio": f"{image.width}:{image.height}", "file_size_bytes": image_path.stat().st_size, "prompt": metadata.get("prompt", ""), "temperature": metadata.get("temperature", 0.4), "input_images_count": metadata.get("input_images_count", 0), "times_used": 0, "last_used": None, "favorite": False } # Add to index index = self._load_index() index["entries"].append(entry) self._save_index(index) logger.info(f"✅ Registered image in library: {name} (ID: {entry_id})") return entry_id except Exception as e: logger.error(f"Failed to register image in library: {e}") raise def get_entries( self, filter_type: str = None, search: str = None, tags: List[str] = None, favorites_only: bool = False, sort_by: str = "newest", limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """ Get library entries with optional filtering. Args: filter_type: Filter by type ("character_sheet", "wardrobe", etc.) search: Search query (matches name, description, prompt) tags: Filter by tags (must have all tags) favorites_only: Only return favorites sort_by: Sort method ("newest", "oldest", "most_used", "name") limit: Maximum number of entries to return offset: Offset for pagination Returns: List of entry dictionaries """ index = self._load_index() entries = index.get("entries", []) # Filter by type if filter_type: entries = [e for e in entries if e.get("type") == filter_type] # Filter by favorites if favorites_only: entries = [e for e in entries if e.get("favorite", False)] # Filter by tags if tags: entries = [ e for e in entries if all(tag in e.get("tags", []) for tag in tags) ] # Search if search: search_lower = search.lower() entries = [ e for e in entries if search_lower in e.get("name", "").lower() or search_lower in e.get("description", "").lower() or search_lower in e.get("prompt", "").lower() or any(search_lower in tag.lower() for tag in e.get("tags", [])) ] # Sort if sort_by == "newest": entries.sort(key=lambda e: e.get("created_at", ""), reverse=True) elif sort_by == "oldest": entries.sort(key=lambda e: e.get("created_at", "")) elif sort_by == "most_used": entries.sort(key=lambda e: e.get("times_used", 0), reverse=True) elif sort_by == "name": entries.sort(key=lambda e: e.get("name", "").lower()) # Pagination total = len(entries) entries = entries[offset:offset + limit] logger.debug(f"Retrieved {len(entries)} entries (total: {total}, filters: type={filter_type}, search={search})") return entries def get_entry(self, entry_id: str) -> Optional[Dict[str, Any]]: """ Get single entry by ID. Args: entry_id: Entry ID Returns: Entry dictionary or None if not found """ index = self._load_index() entries = index.get("entries", []) for entry in entries: if entry.get("id") == entry_id: logger.debug(f"Retrieved entry: {entry_id}") return entry logger.warning(f"Entry not found: {entry_id}") return None def load_image(self, entry_id: str) -> Optional[Image.Image]: """ Load image from library entry. Args: entry_id: Entry ID Returns: PIL Image or None if not found """ entry = self.get_entry(entry_id) if not entry: return None try: image_path = Settings.PROJECT_ROOT / entry["image_path"] if not image_path.exists(): logger.error(f"Image file not found: {image_path}") return None image = Image.open(image_path) # Update usage stats self.update_entry(entry_id, { "times_used": entry.get("times_used", 0) + 1, "last_used": datetime.now().isoformat() }) logger.info(f"Loaded image from library: {entry['name']} ({image.size})") return image except Exception as e: logger.error(f"Failed to load image: {e}") return None def load_thumbnail(self, entry_id: str) -> Optional[Image.Image]: """ Load thumbnail from library entry. Args: entry_id: Entry ID Returns: PIL Image (thumbnail) or None if not found """ entry = self.get_entry(entry_id) if not entry: return None try: thumbnail_path = Settings.PROJECT_ROOT / entry["thumbnail_path"] if not thumbnail_path.exists(): logger.warning(f"Thumbnail not found: {thumbnail_path}") return None thumbnail = Image.open(thumbnail_path) return thumbnail except Exception as e: logger.error(f"Failed to load thumbnail: {e}") return None def update_entry(self, entry_id: str, updates: Dict[str, Any]): """ Update entry metadata. Args: entry_id: Entry ID updates: Dictionary of fields to update """ try: index = self._load_index() entries = index.get("entries", []) for i, entry in enumerate(entries): if entry.get("id") == entry_id: # Update fields entry.update(updates) entries[i] = entry # Save self._save_index(index) logger.info(f"Updated library entry: {entry_id}") return logger.warning(f"Entry not found for update: {entry_id}") except Exception as e: logger.error(f"Failed to update entry: {e}") raise def delete_entry(self, entry_id: str, delete_files: bool = False): """ Remove entry from library. Args: entry_id: Entry ID to delete delete_files: If True, also delete image, thumbnail, and metadata files """ try: index = self._load_index() entries = index.get("entries", []) # Find and remove entry entry_to_delete = None new_entries = [] for entry in entries: if entry.get("id") == entry_id: entry_to_delete = entry else: new_entries.append(entry) if entry_to_delete is None: logger.warning(f"Entry not found for deletion: {entry_id}") return # Delete files if requested if delete_files and entry_to_delete: try: # Delete image image_path = Settings.PROJECT_ROOT / entry_to_delete["image_path"] if image_path.exists(): image_path.unlink() logger.info(f"Deleted image file: {image_path}") # Delete thumbnail thumbnail_path = Settings.PROJECT_ROOT / entry_to_delete["thumbnail_path"] if thumbnail_path.exists(): thumbnail_path.unlink() logger.info(f"Deleted thumbnail file: {thumbnail_path}") # Delete metadata metadata_path = Settings.PROJECT_ROOT / entry_to_delete["metadata_path"] if metadata_path.exists(): metadata_path.unlink() logger.info(f"Deleted metadata file: {metadata_path}") except Exception as e: logger.error(f"Error deleting files: {e}") # Update index index["entries"] = new_entries self._save_index(index) logger.info(f"✅ Deleted library entry: {entry_id} (files deleted: {delete_files})") except Exception as e: logger.error(f"Failed to delete entry: {e}") raise def get_stats(self) -> Dict[str, Any]: """ Get library statistics. Returns: Dictionary with stats (total entries, by type, total size, etc.) """ index = self._load_index() entries = index.get("entries", []) # Count by type type_counts = {} for entry in entries: type_name = entry.get("type", "unknown") type_counts[type_name] = type_counts.get(type_name, 0) + 1 # Total file size total_size = sum(entry.get("file_size_bytes", 0) for entry in entries) # Favorites favorites_count = sum(1 for entry in entries if entry.get("favorite", False)) return { "total_entries": len(entries), "by_type": type_counts, "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2), "favorites_count": favorites_count, "last_updated": index.get("last_updated") } def rebuild_index(self): """ Rebuild library index from file system. Useful for recovery if index becomes corrupted. """ logger.info("Rebuilding library index from file system...") new_entries = [] # Scan all output directories for type_name, directory in [ ("character_sheet", Settings.CHARACTER_SHEETS_DIR), ("wardrobe", Settings.WARDROBE_CHANGES_DIR), ("composition", Settings.COMPOSITIONS_DIR), ("standard", Settings.STANDARD_DIR) ]: if not directory.exists(): continue # Find all PNG files for image_path in directory.glob("*.png"): metadata_path = image_path.with_suffix('.json') # Skip if no metadata if not metadata_path.exists(): logger.warning(f"No metadata for image: {image_path}") continue try: # Load metadata with open(metadata_path, 'r', encoding='utf-8') as f: metadata = json.load(f) # Load image for size image = Image.open(image_path) # Check for existing thumbnail or create new thumbnail_pattern = f"{image_path.stem}_thumb.png" thumbnail_path = self.thumbnails_dir / thumbnail_pattern if not thumbnail_path.exists(): thumbnail = self.create_thumbnail(image) thumbnail.save(thumbnail_path, format='PNG') # Extract or generate entry ID # Try to get from filename or generate new parts = image_path.stem.split('_') if len(parts) >= 3: entry_id = f"{parts[-3]}_{parts[-2]}_{parts[-1]}" else: entry_id = self._generate_entry_id(image_path.stem) # Create entry entry = { "id": entry_id, "name": metadata.get("name", image_path.stem), "description": "", "tags": [], "type": type_name, "backend": metadata.get("backend", "Unknown"), "created_at": metadata.get("timestamp", datetime.fromtimestamp(image_path.stat().st_mtime).isoformat()), "image_path": str(image_path.relative_to(Settings.PROJECT_ROOT)), "thumbnail_path": str(thumbnail_path.relative_to(Settings.PROJECT_ROOT)), "metadata_path": str(metadata_path.relative_to(Settings.PROJECT_ROOT)), "width": image.width, "height": image.height, "aspect_ratio": f"{image.width}:{image.height}", "file_size_bytes": image_path.stat().st_size, "prompt": metadata.get("prompt", ""), "temperature": metadata.get("temperature", 0.4), "input_images_count": metadata.get("input_images_count", 0), "times_used": 0, "last_used": None, "favorite": False } new_entries.append(entry) logger.debug(f"Rebuilt entry: {entry['name']}") except Exception as e: logger.error(f"Failed to rebuild entry for {image_path}: {e}") continue # Create new index index = { "version": "1.0", "last_updated": datetime.now().isoformat(), "entries": new_entries } self._save_index(index) logger.info(f"✅ Rebuilt library index with {len(new_entries)} entries") return len(new_entries)