# Scraped page header: ghmk's picture - "Initial deployment of Character Forge" (commit 5b6e956)
"""
Image Library Manager
=====================
Central manager for the image library system. Handles registration,
retrieval, search, and management of all generated images.
"""
import json
import shutil
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from PIL import Image
import hashlib
from utils.logging_utils import get_logger
from config.settings import Settings
logger = get_logger(__name__)
class LibraryManager:
    """
    Central manager for the image library.

    Manages a JSON-based registry (``index.json``) of all generated images
    with metadata, thumbnails, search, and filtering capabilities.

    The index is a dict with the keys ``version``, ``last_updated`` and
    ``entries`` (a list of entry dicts). It is cached in memory after the
    first load and after every successful save.
    """

    # Path separators and characters that are invalid in file names on
    # common platforms; replaced before a user-facing name is used on disk.
    _UNSAFE_FILENAME_CHARS = '<>:"/\\|?*'

    def __init__(self, library_path: Optional[Path] = None):
        """
        Initialize library manager.

        Args:
            library_path: Path to library directory (default: outputs/.library)
        """
        self.library_path = library_path or (Settings.OUTPUT_DIR / ".library")
        self.index_file = self.library_path / "index.json"
        # Thumbnails always live under the output directory, independent of
        # a custom library_path.
        self.thumbnails_dir = Settings.OUTPUT_DIR / ".thumbnails"
        self._index_cache: Optional[Dict[str, Any]] = None
        # NOTE: nothing in this class sets the flag to True; it only gates
        # the cache check in _load_index.
        self._cache_dirty = False
        self.ensure_directories()

    def ensure_directories(self):
        """Ensure library and thumbnail directories exist."""
        self.library_path.mkdir(parents=True, exist_ok=True)
        self.thumbnails_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Library directories ensured: {self.library_path}, {self.thumbnails_dir}")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_index() -> Dict[str, Any]:
        """Return a fresh, empty index structure."""
        return {
            "version": "1.0",
            "last_updated": datetime.now().isoformat(),
            "entries": []
        }

    @staticmethod
    def _type_directories() -> Dict[str, Path]:
        """Map each generation type to the directory its images are stored in."""
        return {
            "character_sheet": Settings.CHARACTER_SHEETS_DIR,
            "wardrobe": Settings.WARDROBE_CHANGES_DIR,
            "composition": Settings.COMPOSITIONS_DIR,
            "standard": Settings.STANDARD_DIR
        }

    @classmethod
    def _safe_filename(cls, name: str) -> str:
        """Replace path separators and other unsafe characters in *name* with ``_``."""
        return "".join(
            "_" if (ch in cls._UNSAFE_FILENAME_CHARS or ord(ch) < 32) else ch
            for ch in name
        )

    @staticmethod
    def _matches_search(entry: Dict[str, Any], needle: str) -> bool:
        """Return True if the lower-cased *needle* occurs in the entry's name, description, prompt or tags."""
        text_fields = (entry.get("name"), entry.get("description"), entry.get("prompt"))
        # `or ""` tolerates fields that are present but stored as null.
        if any(needle in (text or "").lower() for text in text_fields):
            return True
        return any(needle in tag.lower() for tag in (entry.get("tags") or []))

    @staticmethod
    def _build_entry(
        *,
        entry_id: str,
        name: str,
        entry_type: str,
        metadata: Dict[str, Any],
        created_at: str,
        image_path: Path,
        thumbnail_path: Path,
        metadata_path: Path,
        width: int,
        height: int,
        description: str = "",
        tags: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Assemble the index entry dict for one image (paths are stored relative to the project root)."""
        root = Settings.PROJECT_ROOT
        return {
            "id": entry_id,
            "name": name,
            "description": description,
            "tags": list(tags) if tags else [],
            "type": entry_type,
            "backend": metadata.get("backend", "Unknown"),
            "created_at": created_at,
            "image_path": str(image_path.relative_to(root)),
            "thumbnail_path": str(thumbnail_path.relative_to(root)),
            "metadata_path": str(metadata_path.relative_to(root)),
            "width": width,
            "height": height,
            "aspect_ratio": f"{width}:{height}",
            "file_size_bytes": image_path.stat().st_size,
            "prompt": metadata.get("prompt", ""),
            "temperature": metadata.get("temperature", 0.4),
            "input_images_count": metadata.get("input_images_count", 0),
            "times_used": 0,
            "last_used": None,
            "favorite": False
        }

    def _backup_unreadable_index(self) -> None:
        """Best-effort copy of an unreadable index file so a later save cannot destroy it."""
        backup_file = self.index_file.with_name(self.index_file.name + ".corrupt")
        try:
            shutil.copy2(self.index_file, backup_file)
            logger.warning(f"Preserved unreadable library index as: {backup_file}")
        except OSError as copy_error:
            logger.warning(f"Could not preserve unreadable library index: {copy_error}")

    # ------------------------------------------------------------------
    # Index persistence
    # ------------------------------------------------------------------

    def _load_index(self) -> Dict[str, Any]:
        """
        Load library index from disk.

        Returns the cached index when one is available. If the index file
        does not exist, a new empty index is created and saved. If the file
        cannot be read or parsed, the error is logged, a backup copy of the
        file is attempted, and an empty (uncached) index is returned.

        Returns:
            Library index dictionary (always contains an "entries" list)
        """
        if self._index_cache is not None and not self._cache_dirty:
            return self._index_cache

        if not self.index_file.exists():
            # Create new index
            index = self._empty_index()
            self._save_index(index)
            return index

        try:
            with open(self.index_file, 'r', encoding='utf-8') as f:
                index = json.load(f)
            if not isinstance(index, dict):
                raise ValueError("index root is not a JSON object")
            # Callers index ["entries"] directly, so guarantee the key.
            if not isinstance(index.setdefault("entries", []), list):
                raise ValueError('index "entries" is not a list')
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error(f"Failed to load library index: {e}")
            # The empty fallback may be written back by the next save, so
            # keep a copy of whatever is on disk for manual recovery.
            self._backup_unreadable_index()
            return self._empty_index()

        self._index_cache = index
        self._cache_dirty = False
        logger.debug(f"Loaded library index with {len(index['entries'])} entries")
        return index

    def _save_index(self, index: Dict[str, Any]):
        """
        Save library index to disk (atomic write).

        Updates ``index["last_updated"]`` in place and, on success, makes
        *index* the cached index.

        Args:
            index: Library index dictionary

        Raises:
            Exception: Any error from writing or renaming is logged and re-raised.
        """
        try:
            # Update timestamp
            index["last_updated"] = datetime.now().isoformat()
            # Atomic write: write to temp file, then rename
            temp_file = self.index_file.with_suffix('.tmp')
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(index, f, indent=2, ensure_ascii=False)
            # Rename to actual file (atomic on most systems)
            temp_file.replace(self.index_file)
            self._index_cache = index
            self._cache_dirty = False
        except Exception as e:
            logger.error(f"Failed to save library index: {e}")
            raise
        # Logged outside the try so a malformed index cannot make a
        # successful save look like a failure.
        logger.debug(f"Saved library index with {len(index.get('entries', []))} entries")

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def create_thumbnail(
        self,
        image: Image.Image,
        size: int = 256,
        quality: int = 85
    ) -> Image.Image:
        """
        Generate thumbnail for library display.

        The source image is not modified. The aspect ratio is preserved and
        images already smaller than *size* are not enlarged.

        Args:
            image: Source PIL Image
            size: Maximum dimension in pixels (default: 256)
            quality: Accepted for backward compatibility; currently unused
                (thumbnails are saved as PNG by the callers in this class)

        Returns:
            Thumbnail PIL Image
        """
        thumbnail = image.copy()
        # Image.thumbnail() preserves the aspect ratio itself. Passing the
        # bounding box directly avoids a second rounding step, which could
        # shrink the result below `size` or produce a 0-pixel dimension for
        # very elongated images.
        thumbnail.thumbnail((size, size), Image.Resampling.LANCZOS)
        logger.debug(f"Created thumbnail: {image.size} -> {thumbnail.size}")
        return thumbnail

    def _generate_entry_id(self, name: str) -> str:
        """
        Generate unique entry ID.

        Args:
            name: Entry name

        Returns:
            ID string of the form ``YYYYMMDD_HHMMSS_<6 hex chars>``
        """
        # Take the clock once so timestamp and microsecond belong together.
        now = datetime.now()
        timestamp = now.strftime("%Y%m%d_%H%M%S")
        # Hash of name + timestamp + microsecond for uniqueness (not security).
        hash_input = f"{name}_{timestamp}_{now.microsecond}"
        hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:6]
        return f"{timestamp}_{hash_suffix}"

    def register_image(
        self,
        image: Image.Image,
        name: str,
        type: str,
        metadata: Dict[str, Any],
        description: str = "",
        tags: List[str] = None
    ) -> str:
        """
        Register generated image in library.

        Saves the image, a thumbnail and the metadata JSON, then appends an
        entry to the index.

        Args:
            image: PIL Image to register
            name: User-facing name (unsafe file-name characters are replaced
                with "_" in the file names only; the entry keeps the name as given)
            type: Generation type ("character_sheet", "wardrobe", "composition",
                "standard"); unknown types are stored in the standard directory
            metadata: Generation metadata dict (includes prompt, backend, etc.)
            description: Optional user description
            tags: Optional list of tags

        Returns:
            Entry ID of registered image

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            metadata = metadata or {}
            entry_id = self._generate_entry_id(name)

            # Shared stem for image, thumbnail and metadata files
            file_stem = f"{self._safe_filename(name)}_{entry_id}"

            # Determine output directory based on type
            output_dir = self._type_directories().get(type, Settings.STANDARD_DIR)
            output_dir.mkdir(parents=True, exist_ok=True)

            image_path = output_dir / f"{file_stem}.png"
            thumbnail_path = self.thumbnails_dir / f"{file_stem}_thumb.png"
            metadata_path = output_dir / f"{file_stem}.json"

            # Save full image (uncompressed PNG for maximum quality)
            image.save(image_path, format='PNG', compress_level=0)
            logger.info(f"Saved library image: {image_path}")

            # Generate and save thumbnail
            thumbnail = self.create_thumbnail(image)
            thumbnail.save(thumbnail_path, format='PNG')
            logger.info(f"Saved thumbnail: {thumbnail_path}")

            # Save metadata
            with open(metadata_path, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            # Create library entry
            entry = self._build_entry(
                entry_id=entry_id,
                name=name,
                entry_type=type,
                metadata=metadata,
                created_at=datetime.now().isoformat(),
                image_path=image_path,
                thumbnail_path=thumbnail_path,
                metadata_path=metadata_path,
                width=image.width,
                height=image.height,
                description=description,
                tags=tags
            )

            # Add to index
            index = self._load_index()
            index["entries"].append(entry)
            self._save_index(index)

            logger.info(f"✅ Registered image in library: {name} (ID: {entry_id})")
            return entry_id
        except Exception as e:
            logger.error(f"Failed to register image in library: {e}")
            raise

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------

    def get_entries(
        self,
        filter_type: str = None,
        search: str = None,
        tags: List[str] = None,
        favorites_only: bool = False,
        sort_by: str = "newest",
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Get library entries with optional filtering.

        The returned list is a new list; the entry dicts inside it are the
        ones held by the cached index.

        Args:
            filter_type: Filter by type ("character_sheet", "wardrobe", etc.)
            search: Search query (case-insensitive; matches name, description,
                prompt and tags)
            tags: Filter by tags (must have all tags)
            favorites_only: Only return favorites
            sort_by: Sort method ("newest", "oldest", "most_used", "name");
                any other value leaves the index order unchanged
            limit: Maximum number of entries to return (None for no limit)
            offset: Offset for pagination

        Returns:
            List of entry dictionaries
        """
        index = self._load_index()
        # Copy first: sorting the cached list in place would silently
        # reorder the index that is later written back to disk.
        entries = list(index.get("entries", []))

        # Filter by type
        if filter_type:
            entries = [e for e in entries if e.get("type") == filter_type]

        # Filter by favorites
        if favorites_only:
            entries = [e for e in entries if e.get("favorite", False)]

        # Filter by tags
        if tags:
            entries = [
                e for e in entries
                if all(tag in (e.get("tags") or []) for tag in tags)
            ]

        # Search
        if search:
            needle = search.lower()
            entries = [e for e in entries if self._matches_search(e, needle)]

        # Sort: sort_by -> (key function, reverse)
        sort_options = {
            "newest": (lambda e: e.get("created_at", ""), True),
            "oldest": (lambda e: e.get("created_at", ""), False),
            "most_used": (lambda e: e.get("times_used", 0), True),
            "name": (lambda e: e.get("name", "").lower(), False),
        }
        if sort_by in sort_options:
            sort_key, descending = sort_options[sort_by]
            entries.sort(key=sort_key, reverse=descending)

        # Pagination
        total = len(entries)
        end = None if limit is None else offset + limit
        page = entries[offset:end]

        logger.debug(f"Retrieved {len(page)} entries (total: {total}, filters: type={filter_type}, search={search})")
        return page

    def get_entry(self, entry_id: str) -> Optional[Dict[str, Any]]:
        """
        Get single entry by ID.

        Args:
            entry_id: Entry ID

        Returns:
            Entry dictionary or None if not found
        """
        entries = self._load_index().get("entries", [])
        entry = next((e for e in entries if e.get("id") == entry_id), None)
        if entry is None:
            logger.warning(f"Entry not found: {entry_id}")
            return None
        logger.debug(f"Retrieved entry: {entry_id}")
        return entry

    def load_image(self, entry_id: str) -> Optional[Image.Image]:
        """
        Load image from library entry and record the usage.

        Args:
            entry_id: Entry ID

        Returns:
            PIL Image (fully loaded, file handle released) or None if the
            entry or file is missing or the file cannot be read
        """
        entry = self.get_entry(entry_id)
        if not entry:
            return None

        try:
            image_path = Settings.PROJECT_ROOT / entry["image_path"]
            if not image_path.exists():
                logger.error(f"Image file not found: {image_path}")
                return None
            # Image.open() is lazy; load inside the context manager so the
            # pixel data is in memory and the file handle is released.
            with Image.open(image_path) as image:
                image.load()
        except Exception as e:
            logger.error(f"Failed to load image: {e}")
            return None

        # Usage stats are best-effort: a failed index write must not turn a
        # successful load into a failure.
        try:
            self.update_entry(entry_id, {
                "times_used": entry.get("times_used", 0) + 1,
                "last_used": datetime.now().isoformat()
            })
        except Exception as e:
            logger.warning(f"Could not update usage stats for {entry_id}: {e}")

        logger.info(f"Loaded image from library: {entry.get('name')} ({image.size})")
        return image

    def load_thumbnail(self, entry_id: str) -> Optional[Image.Image]:
        """
        Load thumbnail from library entry.

        Args:
            entry_id: Entry ID

        Returns:
            PIL Image (thumbnail, fully loaded) or None if not found
        """
        entry = self.get_entry(entry_id)
        if not entry:
            return None

        try:
            thumbnail_path = Settings.PROJECT_ROOT / entry["thumbnail_path"]
            if not thumbnail_path.exists():
                logger.warning(f"Thumbnail not found: {thumbnail_path}")
                return None
            # Load eagerly so the file handle is released on return.
            with Image.open(thumbnail_path) as thumbnail:
                thumbnail.load()
            return thumbnail
        except Exception as e:
            logger.error(f"Failed to load thumbnail: {e}")
            return None

    # ------------------------------------------------------------------
    # Mutation
    # ------------------------------------------------------------------

    def update_entry(self, entry_id: str, updates: Dict[str, Any]):
        """
        Update entry metadata.

        Merges *updates* into the matching entry and saves the index. Logs a
        warning and does nothing if the entry does not exist.

        Args:
            entry_id: Entry ID
            updates: Dictionary of fields to update

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            index = self._load_index()
            for entry in index.get("entries", []):
                if entry.get("id") == entry_id:
                    # The entry dict lives inside the index, so updating it
                    # in place is enough.
                    entry.update(updates)
                    self._save_index(index)
                    logger.info(f"Updated library entry: {entry_id}")
                    return
            logger.warning(f"Entry not found for update: {entry_id}")
        except Exception as e:
            logger.error(f"Failed to update entry: {e}")
            raise

    def delete_entry(self, entry_id: str, delete_files: bool = False):
        """
        Remove entry from library.

        File deletion is best-effort: a file that cannot be deleted is logged
        and the entry is still removed from the index.

        Args:
            entry_id: Entry ID to delete
            delete_files: If True, also delete image, thumbnail, and metadata files

        Raises:
            Exception: Failures loading or saving the index are logged and re-raised.
        """
        try:
            index = self._load_index()
            entries = index.get("entries", [])

            # Find entry
            entry_to_delete = next(
                (e for e in entries if e.get("id") == entry_id), None
            )
            if entry_to_delete is None:
                logger.warning(f"Entry not found for deletion: {entry_id}")
                return

            # Delete files if requested; each file is handled separately so
            # one failure does not skip the remaining files.
            if delete_files:
                for label, path_key in (
                    ("image", "image_path"),
                    ("thumbnail", "thumbnail_path"),
                    ("metadata", "metadata_path"),
                ):
                    try:
                        file_path = Settings.PROJECT_ROOT / entry_to_delete[path_key]
                        if file_path.exists():
                            file_path.unlink()
                            logger.info(f"Deleted {label} file: {file_path}")
                    except Exception as e:
                        logger.error(f"Error deleting files: {e}")

            # Update index
            index["entries"] = [e for e in entries if e.get("id") != entry_id]
            self._save_index(index)
            logger.info(f"✅ Deleted library entry: {entry_id} (files deleted: {delete_files})")
        except Exception as e:
            logger.error(f"Failed to delete entry: {e}")
            raise

    # ------------------------------------------------------------------
    # Statistics and recovery
    # ------------------------------------------------------------------

    def get_stats(self) -> Dict[str, Any]:
        """
        Get library statistics.

        Returns:
            Dictionary with the keys total_entries, by_type, total_size_bytes,
            total_size_mb, favorites_count and last_updated
        """
        index = self._load_index()
        entries = index.get("entries", [])

        # Count by type
        type_counts: Dict[str, int] = {}
        for entry in entries:
            type_name = entry.get("type", "unknown")
            type_counts[type_name] = type_counts.get(type_name, 0) + 1

        # Total file size (tolerate entries with a null size)
        total_size = sum(entry.get("file_size_bytes") or 0 for entry in entries)

        # Favorites
        favorites_count = sum(1 for entry in entries if entry.get("favorite", False))

        return {
            "total_entries": len(entries),
            "by_type": type_counts,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "favorites_count": favorites_count,
            "last_updated": index.get("last_updated")
        }

    def rebuild_index(self) -> int:
        """
        Rebuild library index from file system.

        Useful for recovery if index becomes corrupted. Scans every type
        directory for ``*.png`` files that have a sibling ``.json`` metadata
        file, regenerates missing thumbnails, and replaces the index.
        Per-entry user data (description, tags, favorite flag, usage
        counters) is reset to defaults.

        Returns:
            Number of entries in the rebuilt index
        """
        logger.info("Rebuilding library index from file system...")
        new_entries = []

        # Scan all output directories
        for type_name, directory in self._type_directories().items():
            if not directory.exists():
                continue

            # Find all PNG files (sorted for a deterministic index order)
            for image_path in sorted(directory.glob("*.png")):
                metadata_path = image_path.with_suffix('.json')

                # Skip if no metadata
                if not metadata_path.exists():
                    logger.warning(f"No metadata for image: {image_path}")
                    continue

                try:
                    # Load metadata
                    with open(metadata_path, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)

                    thumbnail_path = self.thumbnails_dir / f"{image_path.stem}_thumb.png"

                    # Open the image only as long as needed so file handles
                    # are not leaked while scanning large directories.
                    with Image.open(image_path) as image:
                        width, height = image.width, image.height
                        # Check for existing thumbnail or create new
                        if not thumbnail_path.exists():
                            thumbnail = self.create_thumbnail(image)
                            thumbnail.save(thumbnail_path, format='PNG')

                    # Extract or generate entry ID: registered files are named
                    # "<name>_<YYYYMMDD>_<HHMMSS>_<hash>", so the last three
                    # underscore-separated parts are taken as the ID.
                    parts = image_path.stem.split('_')
                    if len(parts) >= 3:
                        entry_id = f"{parts[-3]}_{parts[-2]}_{parts[-1]}"
                    else:
                        entry_id = self._generate_entry_id(image_path.stem)

                    # Create entry; fall back to the file's mtime when the
                    # metadata carries no timestamp.
                    file_mtime = datetime.fromtimestamp(image_path.stat().st_mtime)
                    entry = self._build_entry(
                        entry_id=entry_id,
                        name=metadata.get("name", image_path.stem),
                        entry_type=type_name,
                        metadata=metadata,
                        created_at=metadata.get("timestamp", file_mtime.isoformat()),
                        image_path=image_path,
                        thumbnail_path=thumbnail_path,
                        metadata_path=metadata_path,
                        width=width,
                        height=height
                    )
                    new_entries.append(entry)
                    logger.debug(f"Rebuilt entry: {entry['name']}")
                except Exception as e:
                    logger.error(f"Failed to rebuild entry for {image_path}: {e}")
                    continue

        # Create new index
        index = self._empty_index()
        index["entries"] = new_entries
        self._save_index(index)

        logger.info(f"✅ Rebuilt library index with {len(new_entries)} entries")
        return len(new_entries)