Spaces:
Sleeping
Sleeping
| """ | |
RF-DETR Optimized Preprocessing

This module provides preprocessing specifically optimized for the RF-DETR model.
Unlike generic preprocessing, this version preserves the pixel value distributions
expected by RF-DETR's ImageNet normalization (mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]).

Key Principles:
1. Denoise to remove compression artifacts WITHOUT changing distributions
2. Color harmonization for cross-device consistency
3. PRESERVE global mean/std values for ImageNet normalization compatibility
4. Gentle adjustments only (no aggressive CLAHE or histogram equalization)

Differences from generic preprocessing:
- Generic: Aggressive normalization, CLAHE, brightness adjustment
- RF-DETR optimized: Gentle denoising, color balance, distribution-preserving
| """ | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| from typing import Union, Tuple, Optional | |
| from pathlib import Path | |
class RFDETRPreprocessor:
    """
    Preprocessing optimized specifically for the RF-DETR model.

    Focuses on:
    - Denoising compression artifacts
    - Cross-device color consistency
    - Preserving pixel value distributions for ImageNet normalization

    The pipeline is: load -> (optional) bilateral denoise -> (optional)
    gray-world color balance -> (optional) restoration of the input's
    mean/std statistics. All stages work on H x W x 3 RGB arrays.
    """

    # ImageNet normalization values used by RF-DETR
    IMAGENET_MEAN = [0.485, 0.456, 0.406]  # Expected by RF-DETR
    IMAGENET_STD = [0.229, 0.224, 0.225]  # Expected by RF-DETR

    # Fraction of the full gray-world correction that is applied
    # (0 = leave colors untouched, 1 = force equal channel means).
    COLOR_BALANCE_ALPHA = 0.5

    def __init__(
        self,
        denoise: bool = True,
        color_balance: bool = True,
        preserve_distribution: bool = True,
        denoise_strength: int = 5,  # Gentle by default
    ):
        """
        Initialize the RF-DETR optimized preprocessor.

        Args:
            denoise: Remove JPEG/PNG compression artifacts
            color_balance: Balance colors for cross-device consistency
            preserve_distribution: Preserve mean/std for ImageNet norm
            denoise_strength: Denoising strength (1-10, lower=gentler);
                only read when ``denoise`` is True
        """
        self.denoise = denoise
        self.color_balance = color_balance
        self.preserve_distribution = preserve_distribution
        self.denoise_strength = denoise_strength

    def preprocess(self, image: "Union[str, Path, np.ndarray, Image.Image]") -> np.ndarray:
        """
        Apply RF-DETR optimized preprocessing.

        Args:
            image: Input image (path, PIL image, or numpy array)

        Returns:
            Preprocessed uint8-range numpy array in RGB format

        Raises:
            ValueError: If the input type or array shape is not supported
        """
        img_array = self._load_image(image)

        if self.preserve_distribution:
            # Color balancing multiplies each channel by its own gain.
            # Re-imposing the *per-channel* mean/std afterwards would undo
            # those gains exactly, so when balancing is enabled only the
            # global (all-channel) statistics are preserved. Without
            # balancing, the stricter per-channel statistics are kept.
            stat_axes = None if self.color_balance else (0, 1)
            original_mean = np.mean(img_array, axis=stat_axes)
            original_std = np.std(img_array, axis=stat_axes)

        # 1. Gentle denoising
        if self.denoise:
            img_array = self._gentle_denoise(img_array)

        # 2. Color balance for cross-device consistency
        if self.color_balance:
            img_array = self._balance_colors(img_array)

        # 3. Restore the statistics captured before processing
        if self.preserve_distribution:
            img_array = self._restore_distribution(
                img_array,
                original_mean,
                original_std,
            )

        return img_array

    def _load_image(self, image: "Union[str, Path, np.ndarray, Image.Image]") -> np.ndarray:
        """
        Load an image from a path, PIL image, or numpy array as an RGB array.

        Numpy inputs are returned as a new array (the caller's data is never
        modified): 2-D arrays are expanded from gray to RGB, 4-channel
        arrays are converted from RGBA to RGB, 3-channel arrays are copied.

        Raises:
            ValueError: For unsupported input types or array shapes
        """
        if isinstance(image, np.ndarray):
            if image.ndim == 2:
                return cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
            if image.ndim == 3 and image.shape[2] == 4:
                return cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
            if image.ndim == 3 and image.shape[2] == 3:
                return image.copy()
            # Previously such arrays fell through and returned None (or
            # raised IndexError for 1-D input); fail with a clear message.
            raise ValueError(f"Unsupported image array shape: {image.shape}")

        if isinstance(image, (str, Path)):
            # The context manager releases the file handle deterministically.
            with Image.open(image) as pil_image:
                return np.array(pil_image.convert('RGB'))

        if isinstance(image, Image.Image):
            return np.array(image.convert('RGB'))

        raise ValueError(f"Unsupported image type: {type(image)}")

    def _gentle_denoise(self, img: np.ndarray) -> np.ndarray:
        """
        Denoise an RGB image with an OpenCV bilateral filter.

        The filter diameter is ``denoise_strength`` and both sigmas are
        ``denoise_strength * 10``. The image is converted RGB -> BGR before
        filtering and back to RGB afterwards.
        """
        # Convert RGB to BGR for OpenCV
        img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

        # Bilateral filter: smooths noise while preserving edges
        denoised = cv2.bilateralFilter(
            img_bgr,
            d=self.denoise_strength,  # Diameter
            sigmaColor=self.denoise_strength * 10,
            sigmaSpace=self.denoise_strength * 10,
        )

        # Convert back to RGB
        return cv2.cvtColor(denoised, cv2.COLOR_BGR2RGB)

    def _balance_colors(self, img: np.ndarray) -> np.ndarray:
        """
        Balance colors for cross-device consistency.

        Uses the gray-world assumption (the average color should be gray):
        each channel is scaled part of the way towards the mean of the
        channel means. Only ``COLOR_BALANCE_ALPHA`` of the full correction
        is applied so the original look is largely kept. Channels whose
        mean is zero are left unscaled.

        Returns:
            Balanced image, rounded and clipped to [0, 255] as uint8
        """
        alpha = self.COLOR_BALANCE_ALPHA

        channel_means = np.mean(img, axis=(0, 1))
        gray_avg = channel_means.mean()

        # Gain of 1 (no change) for all-zero channels avoids dividing by 0.
        gains = np.ones_like(channel_means, dtype=np.float64)
        nonzero = channel_means > 0
        gains[nonzero] = 1 - alpha + alpha * gray_avg / channel_means[nonzero]

        img_balanced = img.astype(np.float32) * gains.astype(np.float32)

        # Round to nearest before the cast: a bare astype truncates and
        # would bias every pixel (and thus the mean) downwards.
        return np.clip(np.rint(img_balanced), 0, 255).astype(np.uint8)

    def _restore_distribution(
        self,
        img: np.ndarray,
        target_mean: np.ndarray,
        target_std: np.ndarray,
    ) -> np.ndarray:
        """
        Rescale ``img`` so its mean/std match the given targets.

        Args:
            img: H x W x C image
            target_mean: Per-channel means (length C), or a single scalar
                to match the global mean over all channels
            target_std: Per-channel standard deviations, or a single scalar
                for the global standard deviation

        Returns:
            Rescaled image, rounded and clipped to [0, 255] as uint8.
            Channels (or, in scalar mode, the whole image) whose current
            std is below 1e-6 are left unchanged.
        """
        img_float = img.astype(np.float32)
        target_mean = np.asarray(target_mean, dtype=np.float32)
        target_std = np.asarray(target_std, dtype=np.float32)

        # Scalar targets -> statistics over every pixel and channel;
        # array targets -> statistics per channel.
        stat_axes = None if target_mean.ndim == 0 else (0, 1)
        current_mean = np.mean(img_float, axis=stat_axes)
        current_std = np.std(img_float, axis=stat_axes)

        # Flat data cannot be standardized; substitute 1 to keep the
        # division safe and discard that result below.
        has_spread = current_std > 1e-6
        safe_std = np.where(has_spread, current_std, 1.0)

        # Standardize to zero mean / unit std, then apply the target stats.
        restored = (img_float - current_mean) / safe_std * target_std + target_mean
        img_float = np.where(has_spread, restored, img_float)

        # Round to nearest so the restored mean is not biased by truncation.
        return np.clip(np.rint(img_float), 0, 255).astype(np.uint8)
# Ready-made RF-DETR preprocessors, keyed by preset name.
# Each row is (name, denoise, color_balance, denoise_strength); every preset
# keeps preserve_distribution enabled. "color_only" disables denoising, so
# its strength of 0 is never read by the denoiser.
RFDETR_PRESETS = {
    preset_name: RFDETRPreprocessor(
        denoise=use_denoise,
        color_balance=use_balance,
        preserve_distribution=True,
        denoise_strength=strength,
    )
    for preset_name, use_denoise, use_balance, strength in (
        ("gentle", True, False, 3),              # Very gentle
        ("standard", True, True, 5),             # Moderate
        ("aggressive_denoise", True, True, 8),   # Strong denoising
        ("color_only", False, True, 0),
    )
}
def preprocess_for_rfdetr(
    image: Union[str, Path, np.ndarray, Image.Image],
    preset: str = "standard"
) -> np.ndarray:
    """
    Run one of the named RF-DETR preprocessing presets on an image.

    Args:
        image: Input image
        preset: Name of a preset in RFDETR_PRESETS
            ('gentle', 'standard', 'aggressive_denoise', 'color_only')

    Returns:
        Preprocessed numpy array in RGB format, ready for RF-DETR

    Raises:
        ValueError: If ``preset`` is not a known preset name

    Example:
        >>> img = preprocess_for_rfdetr("samsung.png", preset="standard")
        >>> results = rfdetr_model.predict(img, threshold=0.35)
    """
    # Single lookup; preset values are always preprocessor instances, so a
    # None result can only mean the name is unknown.
    chosen = RFDETR_PRESETS.get(preset)
    if chosen is None:
        raise ValueError(
            f"Unknown preset: {preset}. Available: {list(RFDETR_PRESETS.keys())}"
        )
    return chosen.preprocess(image)
def compare_distributions(original: np.ndarray, preprocessed: np.ndarray) -> dict:
    """
    Summarize how preprocessing changed an image's pixel statistics.

    Mean and standard deviation are taken over the first two axes of each
    array, and the report lists them alongside their differences and the
    ImageNet reference values.

    Args:
        original: Original image
        preprocessed: Preprocessed image

    Returns:
        Dict with the keys "original", "preprocessed", "difference" and
        "imagenet_expected"; all statistics are plain Python lists/floats
    """
    spatial_axes = (0, 1)

    mean_before = np.mean(original, axis=spatial_axes)
    std_before = np.std(original, axis=spatial_axes)
    mean_after = np.mean(preprocessed, axis=spatial_axes)
    std_after = np.std(preprocessed, axis=spatial_axes)

    mean_shift = mean_after - mean_before
    std_shift = std_after - std_before
    # The small epsilon keeps the percentage finite for all-black channels.
    mean_shift_pct = mean_shift / (mean_before + 1e-6) * 100

    report = {}
    report["original"] = {
        "mean": mean_before.tolist(),
        "std": std_before.tolist(),
        "mean_normalized": (mean_before / 255.0).tolist(),  # ImageNet scale
    }
    report["preprocessed"] = {
        "mean": mean_after.tolist(),
        "std": std_after.tolist(),
        "mean_normalized": (mean_after / 255.0).tolist(),
    }
    report["difference"] = {
        "mean_delta": mean_shift.tolist(),
        "std_delta": std_shift.tolist(),
        "mean_delta_pct": mean_shift_pct.tolist(),
    }
    report["imagenet_expected"] = {
        "mean": [0.485, 0.456, 0.406],
        "std": [0.229, 0.224, 0.225],
    }
    return report