Spaces:
Runtime error
Runtime error
| """ | |
| Image Processor Module | |
| Handles all image processing operations including loading, validation, | |
| resizing, normalization, and format conversion. | |
| """ | |
| import hashlib | |
| import magic | |
| from pathlib import Path | |
| from typing import Tuple, Optional, Union | |
| import numpy as np | |
| from PIL import Image, ImageOps | |
| from loguru import logger | |
| from core.config import config | |
| from core.exceptions import ( | |
| ImageProcessingError, | |
| InvalidFileError, | |
| FileSizeError, | |
| UnsupportedFormatError, | |
| ) | |
| class ImageProcessor: | |
| """ | |
| Process images for analysis. | |
| Handles validation, resizing, normalization, and format conversion | |
| for images before they are passed to AI models. | |
| """ | |
| def __init__(self): | |
| """Initialize ImageProcessor.""" | |
| self.max_size = config.MAX_IMAGE_SIZE | |
| self.max_dimension = config.IMAGE_MAX_DIMENSION | |
| self.allowed_formats = config.ALLOWED_IMAGE_FORMATS | |
| logger.info("ImageProcessor initialized") | |
| def load_image(self, image_path: Union[str, Path]) -> Image.Image: | |
| """ | |
| Load image from file path. | |
| Args: | |
| image_path: Path to image file | |
| Returns: | |
| PIL Image object | |
| Raises: | |
| InvalidFileError: If image cannot be loaded | |
| """ | |
| try: | |
| image_path = Path(image_path) | |
| if not image_path.exists(): | |
| raise InvalidFileError( | |
| f"Image file not found: {image_path}", | |
| {"path": str(image_path)} | |
| ) | |
| # Validate file | |
| self.validate_image(image_path) | |
| # Load image | |
| image = Image.open(image_path) | |
| # Convert to RGB if necessary | |
| if image.mode != "RGB": | |
| image = image.convert("RGB") | |
| logger.info(f"Loaded image: {image_path.name} ({image.size})") | |
| return image | |
| except Exception as e: | |
| logger.error(f"Failed to load image: {e}") | |
| raise InvalidFileError( | |
| f"Cannot load image: {str(e)}", | |
| {"path": str(image_path), "error": str(e)} | |
| ) | |
| def validate_image(self, image_path: Path) -> bool: | |
| """ | |
| Validate image file. | |
| Args: | |
| image_path: Path to image file | |
| Returns: | |
| True if valid | |
| Raises: | |
| FileSizeError: If file too large | |
| UnsupportedFormatError: If format not supported | |
| InvalidFileError: If file is corrupted | |
| """ | |
| # Check file size | |
| file_size = image_path.stat().st_size | |
| if file_size > self.max_size: | |
| raise FileSizeError( | |
| f"Image too large: {file_size / 1024 / 1024:.1f}MB", | |
| {"max_size": self.max_size, "actual_size": file_size} | |
| ) | |
| # Check file extension | |
| ext = image_path.suffix.lower() | |
| if ext not in self.allowed_formats: | |
| raise UnsupportedFormatError( | |
| f"Unsupported image format: {ext}", | |
| {"allowed": self.allowed_formats, "received": ext} | |
| ) | |
| # Check MIME type using magic bytes | |
| try: | |
| mime = magic.from_file(str(image_path), mime=True) | |
| if not mime.startswith("image/"): | |
| raise InvalidFileError( | |
| f"File is not a valid image: {mime}", | |
| {"mime_type": mime} | |
| ) | |
| except Exception as e: | |
| logger.warning(f"Could not verify MIME type: {e}") | |
| return True | |
| def resize_image( | |
| self, | |
| image: Image.Image, | |
| max_size: Optional[Tuple[int, int]] = None, | |
| maintain_aspect_ratio: bool = True | |
| ) -> Image.Image: | |
| """ | |
| Resize image to specified dimensions. | |
| Args: | |
| image: PIL Image object | |
| max_size: Maximum (width, height) tuple | |
| maintain_aspect_ratio: Whether to maintain aspect ratio | |
| Returns: | |
| Resized PIL Image | |
| """ | |
| if max_size is None: | |
| max_size = config.DEFAULT_IMAGE_SIZE | |
| original_size = image.size | |
| if maintain_aspect_ratio: | |
| # Calculate new size maintaining aspect ratio | |
| image.thumbnail(max_size, Image.Resampling.LANCZOS) | |
| else: | |
| # Resize to exact dimensions | |
| image = image.resize(max_size, Image.Resampling.LANCZOS) | |
| logger.debug(f"Resized image: {original_size} -> {image.size}") | |
| return image | |
| def normalize_image(self, image: Image.Image) -> np.ndarray: | |
| """ | |
| Normalize image to numpy array with values [0, 1]. | |
| Args: | |
| image: PIL Image object | |
| Returns: | |
| Normalized numpy array (H, W, C) | |
| """ | |
| # Convert to numpy array | |
| img_array = np.array(image, dtype=np.float32) | |
| # Normalize to [0, 1] | |
| img_array = img_array / 255.0 | |
| logger.debug(f"Normalized image to shape: {img_array.shape}") | |
| return img_array | |
| def apply_exif_orientation(self, image: Image.Image) -> Image.Image: | |
| """ | |
| Apply EXIF orientation to image. | |
| Args: | |
| image: PIL Image object | |
| Returns: | |
| Oriented PIL Image | |
| """ | |
| try: | |
| image = ImageOps.exif_transpose(image) | |
| logger.debug("Applied EXIF orientation") | |
| except Exception as e: | |
| logger.warning(f"Could not apply EXIF orientation: {e}") | |
| return image | |
| def get_image_hash(self, image_path: Path) -> str: | |
| """ | |
| Generate SHA256 hash of image file. | |
| Args: | |
| image_path: Path to image file | |
| Returns: | |
| Hex string of hash | |
| """ | |
| sha256_hash = hashlib.sha256() | |
| with open(image_path, "rb") as f: | |
| # Read in chunks to handle large files | |
| for chunk in iter(lambda: f.read(8192), b""): | |
| sha256_hash.update(chunk) | |
| return sha256_hash.hexdigest() | |
| def process( | |
| self, | |
| image_path: Union[str, Path], | |
| resize: bool = True, | |
| normalize: bool = False, | |
| apply_orientation: bool = True | |
| ) -> Union[Image.Image, np.ndarray]: | |
| """ | |
| Complete image processing pipeline. | |
| Args: | |
| image_path: Path to image file | |
| resize: Whether to resize image | |
| normalize: Whether to normalize to numpy array | |
| apply_orientation: Whether to apply EXIF orientation | |
| Returns: | |
| Processed image (PIL Image or numpy array) | |
| """ | |
| try: | |
| # Load image | |
| image = self.load_image(image_path) | |
| # Apply EXIF orientation | |
| if apply_orientation: | |
| image = self.apply_exif_orientation(image) | |
| # Resize if needed | |
| if resize: | |
| image = self.resize_image(image) | |
| # Normalize if needed | |
| if normalize: | |
| return self.normalize_image(image) | |
| return image | |
| except Exception as e: | |
| logger.error(f"Image processing failed: {e}") | |
| raise ImageProcessingError( | |
| f"Failed to process image: {str(e)}", | |
| {"path": str(image_path), "error": str(e)} | |
| ) | |
| def get_image_info(self, image_path: Union[str, Path]) -> dict: | |
| """ | |
| Get information about an image. | |
| Args: | |
| image_path: Path to image file | |
| Returns: | |
| Dictionary with image information | |
| """ | |
| image_path = Path(image_path) | |
| image = self.load_image(image_path) | |
| return { | |
| "filename": image_path.name, | |
| "format": image.format, | |
| "mode": image.mode, | |
| "size": image.size, | |
| "width": image.size[0], | |
| "height": image.size[1], | |
| "file_size": image_path.stat().st_size, | |
| "hash": self.get_image_hash(image_path), | |
| } | |