Spaces:
Sleeping
Sleeping
| """ | |
| File Utilities | |
| ============== | |
| File I/O operations for Nano Banana Streamlit. | |
| Handles image saving/loading, metadata management, and filename generation. | |
| """ | |
| import json | |
| import hashlib | |
| import re | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, Tuple, Union | |
| from PIL import Image | |
| from config.settings import Settings | |
| from utils.logging_utils import get_logger | |
| logger = get_logger(__name__) | |
| # ============================================================================= | |
| # FILENAME UTILITIES | |
| # ============================================================================= | |
| def sanitize_filename(name: str) -> str: | |
| """ | |
| Sanitize a string to be safe for use as a filename. | |
| Removes or replaces unsafe characters. | |
| Args: | |
| name: Raw filename string | |
| Returns: | |
| Sanitized filename safe for all operating systems | |
| """ | |
| # Remove/replace unsafe characters | |
| safe = re.sub(r'[<>:"/\\|?*]', '_', name) | |
| # Remove leading/trailing spaces and dots | |
| safe = safe.strip('. ') | |
| # Limit length (leave room for timestamp and extension) | |
| max_len = 100 | |
| if len(safe) > max_len: | |
| safe = safe[:max_len] | |
| # If empty after sanitization, use default | |
| if not safe: | |
| safe = "generated" | |
| return safe | |
| def generate_timestamp_filename( | |
| base_name: str, | |
| extension: str = "png" | |
| ) -> str: | |
| """ | |
| Generate a filename with timestamp. | |
| Format: {base_name}_{YYYYMMDD_HHMMSS}.{extension} | |
| Args: | |
| base_name: Base name for file (will be sanitized) | |
| extension: File extension (default: "png") | |
| Returns: | |
| Filename string with timestamp | |
| """ | |
| safe_name = sanitize_filename(base_name) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| return f"{safe_name}_{timestamp}.{extension}" | |
| def get_unique_filename(directory: Path, base_name: str, extension: str = "png") -> Path: | |
| """ | |
| Generate a unique filename in a directory. | |
| If file exists, appends a number: _1, _2, etc. | |
| Args: | |
| directory: Directory where file will be saved | |
| base_name: Base name for file | |
| extension: File extension | |
| Returns: | |
| Path object with unique filename | |
| """ | |
| safe_name = sanitize_filename(base_name) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # Try without counter first | |
| filename = f"{safe_name}_{timestamp}.{extension}" | |
| path = directory / filename | |
| if not path.exists(): | |
| return path | |
| # Add counter if file exists | |
| counter = 1 | |
| while True: | |
| filename = f"{safe_name}_{timestamp}_{counter}.{extension}" | |
| path = directory / filename | |
| if not path.exists(): | |
| return path | |
| counter += 1 | |
| # ============================================================================= | |
| # IMAGE SAVE/LOAD | |
| # ============================================================================= | |
| def ensure_pil_image(obj: Union[Image.Image, str, Path], context: str = "") -> Image.Image: | |
| """ | |
| Ensure the provided object is a PIL Image. | |
| Accepts a PIL Image directly, or a string/Path pointing to an image file. | |
| Args: | |
| obj: PIL Image, file path string, or Path | |
| context: Optional context string for clearer error messages | |
| Returns: | |
| PIL Image object | |
| Raises: | |
| TypeError: If the object cannot be converted to an Image | |
| FileNotFoundError: If a provided path does not exist | |
| IOError: If the path cannot be opened as an image | |
| """ | |
| if isinstance(obj, Image.Image): | |
| return obj | |
| # Handle path-like inputs | |
| if isinstance(obj, (str, Path)): | |
| p = Path(obj) | |
| if not p.exists(): | |
| raise FileNotFoundError(f"Image path not found: {p} {('['+context+']') if context else ''}") | |
| try: | |
| image = Image.open(p) | |
| image.load() # Validate/load into memory | |
| return image | |
| except Exception as e: | |
| raise IOError(f"Cannot open image at {p}: {e} {('['+context+']') if context else ''}") | |
| raise TypeError( | |
| f"Expected PIL Image or path-like, got {type(obj).__name__} {('['+context+']') if context else ''}" | |
| ) | |
| def save_image( | |
| image: Image.Image, | |
| directory: Path, | |
| base_name: str, | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> Tuple[Path, Optional[Path]]: | |
| """ | |
| Save an image and optionally its metadata. | |
| Args: | |
| image: PIL Image to save | |
| directory: Directory to save in | |
| base_name: Base name for files | |
| metadata: Optional metadata dictionary to save as JSON | |
| Returns: | |
| Tuple of (image_path, metadata_path) | |
| metadata_path is None if metadata not provided | |
| """ | |
| # Ensure directory exists | |
| directory.mkdir(parents=True, exist_ok=True) | |
| # Generate unique filename | |
| image_path = get_unique_filename(directory, base_name, "png") | |
| # Save image (uncompressed PNG for maximum quality) | |
| try: | |
| # Normalize/validate input to avoid 'str' object errors | |
| image = ensure_pil_image(image, context="save_image") | |
| image.save(image_path, format="PNG", compress_level=0) | |
| logger.info(f"Saved image: {image_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to save image (type={type(image).__name__}): {e}") | |
| raise | |
| # Save metadata if provided | |
| metadata_path = None | |
| if metadata is not None: | |
| metadata_path = image_path.with_suffix(".json") | |
| try: | |
| save_metadata(metadata_path, metadata) | |
| logger.info(f"Saved metadata: {metadata_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to save metadata: {e}") | |
| # Don't raise - image is saved, metadata is optional | |
| return image_path, metadata_path | |
| def load_image(file_path: Path) -> Image.Image: | |
| """ | |
| Load an image from disk. | |
| Args: | |
| file_path: Path to image file | |
| Returns: | |
| PIL Image object | |
| Raises: | |
| FileNotFoundError: If file doesn't exist | |
| IOError: If file can't be read as image | |
| """ | |
| if not file_path.exists(): | |
| raise FileNotFoundError(f"Image not found: {file_path}") | |
| try: | |
| image = Image.open(file_path) | |
| logger.debug(f"Loaded image: {file_path}") | |
| return image | |
| except Exception as e: | |
| logger.error(f"Failed to load image {file_path}: {e}") | |
| raise IOError(f"Cannot read image: {e}") | |
| # ============================================================================= | |
| # METADATA MANAGEMENT | |
| # ============================================================================= | |
| def save_metadata(file_path: Path, metadata: Dict[str, Any]): | |
| """ | |
| Save metadata dictionary as JSON. | |
| Args: | |
| file_path: Path for JSON file | |
| metadata: Dictionary to save | |
| Raises: | |
| IOError: If write fails | |
| """ | |
| try: | |
| with open(file_path, 'w', encoding='utf-8') as f: | |
| json.dump(metadata, f, indent=2, ensure_ascii=False) | |
| except Exception as e: | |
| logger.error(f"Failed to save metadata to {file_path}: {e}") | |
| raise IOError(f"Cannot write metadata: {e}") | |
| def load_metadata(file_path: Path) -> Dict[str, Any]: | |
| """ | |
| Load metadata from JSON file. | |
| Args: | |
| file_path: Path to JSON file | |
| Returns: | |
| Metadata dictionary | |
| Raises: | |
| FileNotFoundError: If file doesn't exist | |
| json.JSONDecodeError: If file is not valid JSON | |
| """ | |
| if not file_path.exists(): | |
| raise FileNotFoundError(f"Metadata file not found: {file_path}") | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| metadata = json.load(f) | |
| logger.debug(f"Loaded metadata: {file_path}") | |
| return metadata | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Invalid JSON in {file_path}: {e}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Failed to load metadata from {file_path}: {e}") | |
| raise IOError(f"Cannot read metadata: {e}") | |
| def create_generation_metadata( | |
| prompt: str, | |
| backend: str, | |
| aspect_ratio: str, | |
| temperature: float, | |
| input_images: Optional[list] = None, | |
| generation_time: Optional[float] = None, | |
| **kwargs | |
| ) -> Dict[str, Any]: | |
| """ | |
| Create a standard metadata dictionary for a generation. | |
| Args: | |
| prompt: Generation prompt | |
| backend: Backend used | |
| aspect_ratio: Aspect ratio used | |
| temperature: Temperature used | |
| input_images: Optional list of input image paths | |
| generation_time: Optional time taken (seconds) | |
| **kwargs: Additional custom fields | |
| Returns: | |
| Metadata dictionary | |
| """ | |
| metadata = { | |
| "timestamp": datetime.now().isoformat(), | |
| "prompt": prompt, | |
| "backend": backend, | |
| "aspect_ratio": aspect_ratio, | |
| "temperature": temperature, | |
| "version": "2.0.0-streamlit" | |
| } | |
| if input_images: | |
| metadata["input_images"] = input_images | |
| if generation_time is not None: | |
| metadata["generation_time_seconds"] = round(generation_time, 2) | |
| # Add any custom fields | |
| metadata.update(kwargs) | |
| return metadata | |
| # ============================================================================= | |
| # IMAGE HASHING (for metadata) | |
| # ============================================================================= | |
| def compute_image_hash(image: Image.Image) -> str: | |
| """ | |
| Compute SHA-256 hash of image data. | |
| Useful for detecting if input images have changed. | |
| Args: | |
| image: PIL Image | |
| Returns: | |
| Hex string of SHA-256 hash | |
| """ | |
| # Convert to bytes | |
| img_bytes = image.tobytes() | |
| # Compute hash | |
| hash_obj = hashlib.sha256(img_bytes) | |
| return hash_obj.hexdigest() | |
| # ============================================================================= | |
| # DIRECTORY UTILITIES | |
| # ============================================================================= | |
| def ensure_directory_exists(directory: Path): | |
| """ | |
| Ensure a single directory exists. | |
| Creates the directory (and any parent directories) if it doesn't exist. | |
| Args: | |
| directory: Path to directory to ensure exists | |
| """ | |
| directory.mkdir(parents=True, exist_ok=True) | |
| logger.debug(f"Ensured directory exists: {directory}") | |
| def ensure_output_directories(): | |
| """ | |
| Ensure all output directories exist. | |
| Creates directories defined in Settings if they don't exist. | |
| """ | |
| directories = [ | |
| Settings.OUTPUT_DIR, | |
| Settings.CHARACTER_SHEETS_DIR, | |
| Settings.WARDROBE_CHANGES_DIR, | |
| Settings.COMPOSITIONS_DIR, | |
| Settings.STANDARD_DIR | |
| ] | |
| for directory in directories: | |
| directory.mkdir(parents=True, exist_ok=True) | |
| logger.debug(f"Ensured directory exists: {directory}") | |
| def get_output_directory_for_type(generation_type: str) -> Path: | |
| """ | |
| Get the appropriate output directory for a generation type. | |
| Args: | |
| generation_type: Type of generation | |
| ("character_sheet", "wardrobe", "composition", "standard") | |
| Returns: | |
| Path to output directory | |
| Raises: | |
| ValueError: If generation_type is unknown | |
| """ | |
| mapping = { | |
| "character_sheet": Settings.CHARACTER_SHEETS_DIR, | |
| "wardrobe": Settings.WARDROBE_CHANGES_DIR, | |
| "composition": Settings.COMPOSITIONS_DIR, | |
| "standard": Settings.STANDARD_DIR | |
| } | |
| if generation_type not in mapping: | |
| raise ValueError(f"Unknown generation type: {generation_type}") | |
| return mapping[generation_type] | |
| def list_recent_generations( | |
| generation_type: str, | |
| count: int = 10 | |
| ) -> list: | |
| """ | |
| List recent generation files in a directory. | |
| Args: | |
| generation_type: Type of generation | |
| count: Number of recent files to return | |
| Returns: | |
| List of (image_path, metadata_path) tuples, newest first | |
| """ | |
| directory = get_output_directory_for_type(generation_type) | |
| # Get all PNG files | |
| png_files = sorted( | |
| directory.glob("*.png"), | |
| key=lambda p: p.stat().st_mtime, | |
| reverse=True | |
| ) | |
| # Limit to count | |
| png_files = png_files[:count] | |
| # Pair with metadata files | |
| results = [] | |
| for png_path in png_files: | |
| json_path = png_path.with_suffix(".json") | |
| results.append((png_path, json_path if json_path.exists() else None)) | |
| return results | |