Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Utility Functions | |
| ================= | |
| Helper functions for image processing and file operations. | |
| """ | |
| import re | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Union | |
| from datetime import datetime | |
| from PIL import Image | |
| logger = logging.getLogger(__name__) | |
| def ensure_pil_image( | |
| obj: Union[Image.Image, str, Path, None], | |
| context: str = "" | |
| ) -> Image.Image: | |
| """ | |
| Ensure object is a PIL Image. | |
| Args: | |
| obj: Image, path, or None | |
| context: Context for error messages | |
| Returns: | |
| PIL Image | |
| Raises: | |
| ValueError: If object cannot be converted to Image | |
| """ | |
| if obj is None: | |
| raise ValueError(f"[{context}] Image is None") | |
| if isinstance(obj, Image.Image): | |
| return obj | |
| if isinstance(obj, (str, Path)): | |
| try: | |
| return Image.open(obj) | |
| except Exception as e: | |
| raise ValueError(f"[{context}] Failed to load image from path: {e}") | |
| raise ValueError(f"[{context}] Unsupported image type: {type(obj)}") | |
| def sanitize_filename(name: str) -> str: | |
| """ | |
| Sanitize string for use as filename. | |
| Args: | |
| name: Original name | |
| Returns: | |
| Safe filename string | |
| """ | |
| # Replace problematic characters | |
| safe_name = re.sub(r'[<>:"/\\|?*]', '_', name) | |
| # Remove leading/trailing spaces and dots | |
| safe_name = safe_name.strip('. ') | |
| # Limit length | |
| if len(safe_name) > 100: | |
| safe_name = safe_name[:100] | |
| return safe_name or "unnamed" | |
| def save_image( | |
| image: Image.Image, | |
| directory: Path, | |
| base_name: str, | |
| format: str = "PNG" | |
| ) -> Path: | |
| """ | |
| Save image to directory. | |
| Args: | |
| image: PIL Image to save | |
| directory: Output directory | |
| base_name: Base filename (without extension) | |
| format: Image format | |
| Returns: | |
| Path to saved file | |
| """ | |
| directory = Path(directory) | |
| directory.mkdir(parents=True, exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| safe_name = sanitize_filename(base_name) | |
| ext = format.lower() | |
| filename = f"{safe_name}_{timestamp}.{ext}" | |
| filepath = directory / filename | |
| image.save(filepath, format=format) | |
| logger.info(f"Saved: {filepath}") | |
| return filepath | |
| def resize_for_display( | |
| image: Image.Image, | |
| max_size: int = 1024 | |
| ) -> Image.Image: | |
| """ | |
| Resize image for display while maintaining aspect ratio. | |
| Args: | |
| image: PIL Image | |
| max_size: Maximum dimension | |
| Returns: | |
| Resized image | |
| """ | |
| width, height = image.size | |
| if width <= max_size and height <= max_size: | |
| return image | |
| if width > height: | |
| new_width = max_size | |
| new_height = int(height * max_size / width) | |
| else: | |
| new_height = max_size | |
| new_width = int(width * max_size / height) | |
| return image.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| def get_image_info(image: Image.Image) -> str: | |
| """Get human-readable image info string.""" | |
| return f"{image.size[0]}x{image.size[1]} {image.mode}" | |
| def preprocess_input_image( | |
| image: Image.Image, | |
| max_size: int = 1024, | |
| target_size: tuple = None, | |
| ensure_rgb: bool = True | |
| ) -> Image.Image: | |
| """ | |
| Preprocess input image for model consumption. | |
| Handles various formats (JFIF, TIFF, WebP, etc.) by converting to RGB PNG-compatible format. | |
| Args: | |
| image: PIL Image to preprocess | |
| max_size: Maximum dimension (used if target_size not specified) | |
| target_size: Specific (width, height) to resize to | |
| ensure_rgb: Convert to RGB mode | |
| Returns: | |
| Preprocessed PIL Image in RGB format | |
| """ | |
| # Ensure we have a copy to avoid modifying original | |
| img = image.copy() | |
| # Force re-encode as PNG-compatible by saving to memory and reloading | |
| # This handles weird formats like JFIF, TIFF, etc. | |
| import io | |
| buf = io.BytesIO() | |
| # Convert to RGB first if needed | |
| if img.mode not in ('RGB', 'RGBA'): | |
| img = img.convert('RGB') | |
| # Save as PNG to buffer and reload - this normalizes the format | |
| img.save(buf, format='PNG') | |
| buf.seek(0) | |
| img = Image.open(buf) | |
| img.load() # Force load into memory | |
| # Convert to RGB if needed (handle RGBA) | |
| if ensure_rgb and img.mode != 'RGB': | |
| if img.mode == 'RGBA': | |
| # Handle transparency by compositing on white background | |
| background = Image.new('RGB', img.size, (255, 255, 255)) | |
| background.paste(img, mask=img.split()[3]) | |
| img = background | |
| else: | |
| img = img.convert('RGB') | |
| # Resize to target size or max_size | |
| if target_size: | |
| img = img.resize(target_size, Image.Resampling.LANCZOS) | |
| else: | |
| width, height = img.size | |
| if width > max_size or height > max_size: | |
| if width > height: | |
| new_width = max_size | |
| new_height = int(height * max_size / width) | |
| else: | |
| new_height = max_size | |
| new_width = int(width * max_size / height) | |
| img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| return img | |
| def preprocess_images_for_backend( | |
| images: list, | |
| backend_type: str, | |
| aspect_ratio: str = "1:1" | |
| ) -> list: | |
| """ | |
| Preprocess a list of images for a specific backend. | |
| Args: | |
| images: List of PIL Images | |
| backend_type: Backend type string (e.g., 'flux_klein', 'qwen_comfyui') | |
| aspect_ratio: Target aspect ratio | |
| Returns: | |
| List of preprocessed PIL Images | |
| """ | |
| if not images: | |
| return images | |
| # Backend-specific settings | |
| # FLUX models work best with smaller input images (512-768px) | |
| backend_configs = { | |
| 'flux_klein': {'max_size': 768}, # 4B - faster with smaller inputs | |
| 'flux_klein_9b_fp8': {'max_size': 768}, # 9B - same, quality comes from model not input size | |
| 'qwen_image_edit': {'max_size': 1024}, | |
| 'qwen_comfyui': {'max_size': 1024}, | |
| 'zimage_turbo': {'max_size': 768}, | |
| 'zimage_base': {'max_size': 768}, | |
| 'longcat_edit': {'max_size': 768}, | |
| 'gemini_flash': {'max_size': 1024}, # Gemini handles larger but 1024 is fine | |
| 'gemini_pro': {'max_size': 1024}, | |
| } | |
| config = backend_configs.get(backend_type, {'max_size': 1024}) | |
| max_size = config['max_size'] | |
| processed = [] | |
| for img in images: | |
| if img is not None: | |
| processed.append(preprocess_input_image(img, max_size=max_size)) | |
| else: | |
| processed.append(None) | |
| return processed | |