Spaces:
Sleeping
Sleeping
| """ | |
| Image Processing Module | |
| Handles image validation, preprocessing, and optimization for caption generation. | |
| Ensures images meet model requirements while maintaining quality. | |
| """ | |
| import io | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Tuple, Union | |
| from PIL import Image, ImageOps | |
| from config import image_config | |
| class ImageProcessingError(Exception): | |
| """Custom exception for image processing errors""" | |
| pass | |
| class ImageProcessor: | |
| """ | |
| Enterprise-grade image processor for caption generation pipeline | |
| Responsibilities: | |
| - Validate image format and size | |
| - Resize and optimize images | |
| - Generate cache keys | |
| - Handle edge cases and errors gracefully | |
| """ | |
| def __init__(self): | |
| """Initialize image processor with configuration""" | |
| self.max_size = image_config.MAX_FILE_SIZE_BYTES | |
| self.max_dimension = image_config.MAX_DIMENSION | |
| self.min_dimension = image_config.MIN_DIMENSION | |
| self.allowed_formats = image_config.ALLOWED_FORMATS | |
| self.quality = image_config.RESIZE_QUALITY | |
| def validate_image(self, image: Union[str, Path, Image.Image, bytes]) -> Tuple[bool, str]: | |
| """ | |
| Validate image meets all requirements | |
| Args: | |
| image: Image path, PIL Image, or bytes | |
| Returns: | |
| Tuple[bool, str]: (is_valid, error_message) | |
| """ | |
| try: | |
| # Load image if path or bytes provided | |
| if isinstance(image, (str, Path)): | |
| img = Image.open(image) | |
| elif isinstance(image, bytes): | |
| img = Image.open(io.BytesIO(image)) | |
| elif isinstance(image, Image.Image): | |
| img = image | |
| else: | |
| return False, f"Unsupported image type: {type(image)}" | |
| # Check format (handle None format from Gradio) | |
| # When Gradio passes PIL images with type="pil", format can be None | |
| if hasattr(img, 'format') and img.format is not None: | |
| if img.format.upper() not in [fmt.upper() for fmt in self.allowed_formats]: | |
| return False, f"Unsupported format: {img.format}. Allowed: {', '.join(self.allowed_formats)}" | |
| else: | |
| # Format is None - likely from Gradio's PIL conversion | |
| # We'll validate by checking if it's a valid PIL image | |
| print(f"DEBUG: Image format is None (from Gradio), skipping format check") | |
| # Check dimensions | |
| width, height = img.size | |
| if width < self.min_dimension or height < self.min_dimension: | |
| return False, f"Image too small. Minimum: {self.min_dimension}x{self.min_dimension}px" | |
| if width > 10000 or height > 10000: | |
| return False, "Image dimensions too large (max: 10000x10000px)" | |
| # Check file size (if path provided) | |
| if isinstance(image, (str, Path)): | |
| file_size = Path(image).stat().st_size | |
| if file_size > self.max_size: | |
| max_mb = self.max_size / (1024 * 1024) | |
| actual_mb = file_size / (1024 * 1024) | |
| return False, f"File too large: {actual_mb:.1f}MB (max: {max_mb}MB)" | |
| # Try to verify image integrity (skip if format is None) | |
| if hasattr(img, 'format') and img.format is not None: | |
| # Create a copy before verify (verify closes the file) | |
| img_copy = img.copy() | |
| img_copy.verify() | |
| return True, "" | |
| except Exception as e: | |
| return False, f"Image validation failed: {str(e)}" | |
| def preprocess_image( | |
| self, | |
| image: Union[str, Path, Image.Image, bytes] | |
| ) -> Tuple[Image.Image, dict]: | |
| """ | |
| Preprocess image for model input | |
| Args: | |
| image: Image path, PIL Image, or bytes | |
| Returns: | |
| Tuple[Image.Image, dict]: (processed_image, metadata) | |
| Raises: | |
| ImageProcessingError: If preprocessing fails | |
| """ | |
| try: | |
| print(f"DEBUG: Preprocessing image of type: {type(image)}") | |
| # Validate first | |
| is_valid, error_msg = self.validate_image(image) | |
| if not is_valid: | |
| print(f"DEBUG: Validation failed: {error_msg}") | |
| raise ImageProcessingError(error_msg) | |
| # Load image | |
| if isinstance(image, (str, Path)): | |
| img = Image.open(image) | |
| elif isinstance(image, bytes): | |
| img = Image.open(io.BytesIO(image)) | |
| elif isinstance(image, Image.Image): | |
| img = image.copy() # Don't modify original | |
| else: | |
| raise ImageProcessingError(f"Unsupported image type: {type(image)}") | |
| # Store original metadata | |
| original_size = img.size | |
| original_format = img.format if hasattr(img, 'format') else 'Unknown' | |
| original_mode = img.mode | |
| print(f"DEBUG: Original format: {original_format}, mode: {original_mode}, size: {original_size}") | |
| # Convert to RGB if needed (handles RGBA, grayscale, etc.) | |
| if img.mode != "RGB": | |
| if img.mode == "RGBA": | |
| # Create white background for transparent images | |
| background = Image.new("RGB", img.size, (255, 255, 255)) | |
| background.paste(img, mask=img.split()[-1]) # Use alpha channel as mask | |
| img = background | |
| else: | |
| img = img.convert("RGB") | |
| # Auto-orient based on EXIF data | |
| img = ImageOps.exif_transpose(img) | |
| # Resize if needed | |
| if max(img.size) > self.max_dimension: | |
| img = self._resize_image(img) | |
| # Generate metadata | |
| metadata = { | |
| "original_size": original_size, | |
| "original_format": original_format, | |
| "original_mode": original_mode, | |
| "processed_size": img.size, | |
| "processed_mode": img.mode, | |
| "was_resized": original_size != img.size, | |
| "was_converted": original_mode != img.mode | |
| } | |
| print(f"DEBUG: Preprocessing complete. Final size: {img.size}, mode: {img.mode}") | |
| return img, metadata | |
| except ImageProcessingError: | |
| raise | |
| except Exception as e: | |
| print(f"DEBUG: Exception during preprocessing: {str(e)}") | |
| raise ImageProcessingError(f"Preprocessing failed: {str(e)}") | |
| def _resize_image(self, img: Image.Image) -> Image.Image: | |
| """ | |
| Resize image maintaining aspect ratio | |
| Args: | |
| img: PIL Image | |
| Returns: | |
| Image.Image: Resized image | |
| """ | |
| width, height = img.size | |
| if image_config.MAINTAIN_ASPECT_RATIO: | |
| # Calculate new dimensions maintaining aspect ratio | |
| if width > height: | |
| new_width = self.max_dimension | |
| new_height = int((height / width) * self.max_dimension) | |
| else: | |
| new_height = self.max_dimension | |
| new_width = int((width / height) * self.max_dimension) | |
| else: | |
| new_width = self.max_dimension | |
| new_height = self.max_dimension | |
| # Use high-quality resampling | |
| img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| return img | |
| def generate_image_hash( | |
| self, | |
| image: Union[str, Path, Image.Image, bytes], | |
| algorithm: str = "md5" | |
| ) -> str: | |
| """ | |
| Generate unique hash for image (for caching) | |
| Args: | |
| image: Image path, PIL Image, or bytes | |
| algorithm: Hash algorithm (md5, sha256) | |
| Returns: | |
| str: Hexadecimal hash string | |
| """ | |
| try: | |
| # Convert to bytes | |
| if isinstance(image, (str, Path)): | |
| with open(image, "rb") as f: | |
| image_bytes = f.read() | |
| elif isinstance(image, bytes): | |
| image_bytes = image | |
| elif isinstance(image, Image.Image): | |
| buffer = io.BytesIO() | |
| image.save(buffer, format="PNG") | |
| image_bytes = buffer.getvalue() | |
| else: | |
| raise ValueError(f"Unsupported type for hashing: {type(image)}") | |
| # Generate hash | |
| if algorithm == "md5": | |
| return hashlib.md5(image_bytes).hexdigest() | |
| elif algorithm == "sha256": | |
| return hashlib.sha256(image_bytes).hexdigest() | |
| else: | |
| raise ValueError(f"Unsupported hash algorithm: {algorithm}") | |
| except Exception as e: | |
| raise ImageProcessingError(f"Hash generation failed: {str(e)}") | |
| def image_to_bytes(self, img: Image.Image, format: str = "PNG") -> bytes: | |
| """ | |
| Convert PIL Image to bytes | |
| Args: | |
| img: PIL Image | |
| format: Output format (PNG, JPEG) | |
| Returns: | |
| bytes: Image bytes | |
| """ | |
| buffer = io.BytesIO() | |
| img.save(buffer, format=format, quality=self.quality) | |
| return buffer.getvalue() | |
| def get_image_info(self, image: Union[str, Path, Image.Image]) -> dict: | |
| """ | |
| Get detailed image information | |
| Args: | |
| image: Image path or PIL Image | |
| Returns: | |
| dict: Image information | |
| """ | |
| try: | |
| if isinstance(image, (str, Path)): | |
| img = Image.open(image) | |
| file_size = Path(image).stat().st_size | |
| elif isinstance(image, Image.Image): | |
| img = image | |
| file_size = len(self.image_to_bytes(img)) | |
| else: | |
| raise ValueError(f"Unsupported type: {type(image)}") | |
| return { | |
| "format": img.format, | |
| "mode": img.mode, | |
| "size": img.size, | |
| "width": img.size[0], | |
| "height": img.size[1], | |
| "file_size": file_size, | |
| "file_size_mb": file_size / (1024 * 1024), | |
| "aspect_ratio": img.size[0] / img.size[1], | |
| "megapixels": (img.size[0] * img.size[1]) / 1_000_000 | |
| } | |
| except Exception as e: | |
| raise ImageProcessingError(f"Failed to get image info: {str(e)}") | |
| # ============================================================================ | |
| # SINGLETON INSTANCE AND CONVENIENCE FUNCTIONS | |
| # ============================================================================ | |
| _image_processor = None | |
| def get_image_processor() -> ImageProcessor: | |
| """Get singleton ImageProcessor instance""" | |
| global _image_processor | |
| if _image_processor is None: | |
| _image_processor = ImageProcessor() | |
| return _image_processor | |
| # Convenience wrapper functions for backward compatibility | |
| def validate_image(image: Union[str, Path, Image.Image, bytes]) -> Tuple[bool, str]: | |
| """ | |
| Convenience function: Validate image using singleton processor | |
| Args: | |
| image: Image path, PIL Image, or bytes | |
| Returns: | |
| Tuple[bool, str]: (is_valid, error_message) | |
| """ | |
| return get_image_processor().validate_image(image) | |
| def preprocess_image( | |
| image: Union[str, Path, Image.Image, bytes] | |
| ) -> Tuple[Image.Image, dict]: | |
| """ | |
| Convenience function: Preprocess image using singleton processor | |
| Args: | |
| image: Image path, PIL Image, or bytes | |
| Returns: | |
| Tuple[Image.Image, dict]: (processed_image, metadata) | |
| """ | |
| return get_image_processor().preprocess_image(image) | |
| def generate_image_hash( | |
| image: Union[str, Path, Image.Image, bytes], | |
| algorithm: str = "md5" | |
| ) -> str: | |
| """ | |
| Convenience function: Generate image hash using singleton processor | |
| Args: | |
| image: Image path, PIL Image, or bytes | |
| algorithm: Hash algorithm (md5, sha256) | |
| Returns: | |
| str: Hexadecimal hash string | |
| """ | |
| return get_image_processor().generate_image_hash(image, algorithm) | |
| if __name__ == "__main__": | |
| # Test the image processor | |
| print("=" * 60) | |
| print("IMAGE PROCESSOR - TEST MODE") | |
| print("=" * 60) | |
| processor = get_image_processor() | |
| print(f"✓ ImageProcessor initialized") | |
| print(f" - Max file size: {processor.max_size / (1024*1024):.1f}MB") | |
| print(f" - Max dimension: {processor.max_dimension}px") | |
| print(f" - Allowed formats: {', '.join(processor.allowed_formats)}") | |
| print(f" - Quality: {processor.quality}") | |
| print("=" * 60) | |
| print("Ready for testing with actual images") | |
| print("=" * 60) |