Spaces:
Sleeping
Sleeping
| """ | |
| Image Preprocessor — Adaptive image analysis and preprocessing for OCR. | |
| Analyzes image properties (resolution, contrast, noise, skew) and applies | |
| optimal preprocessing pipeline. Part of the agentic system's perception layer. | |
| """ | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| from dataclasses import dataclass, field | |
| from typing import Optional, Tuple | |
| import logging | |
| logger = logging.getLogger(__name__) | |
@dataclass
class ImageProperties:
    """Measured characteristics of an image, used to plan OCR preprocessing.

    Every field has a default, so ``ImageProperties()`` yields a blank record
    whose attributes can be filled in one by one. Being a dataclass, it also
    accepts keyword arguments and provides ``__repr__`` and ``__eq__``.
    """

    width: int = 0  # image width
    height: int = 0  # image height
    resolution_dpi: int = 72  # 72 is the default when no better value is known
    contrast_score: float = 0.0  # 0-1 scale
    brightness_score: float = 0.0  # 0-1 scale
    noise_level: float = 0.0  # 0-1 scale (higher = noisier)
    skew_angle: float = 0.0  # degrees
    is_grayscale: bool = False
    is_binary: bool = False
    sharpness_score: float = 0.0  # 0-1 scale
    quality_rating: str = "unknown"  # "excellent", "good", "fair", "poor"
@dataclass
class PreprocessingConfig:
    """Switches and parameters for the individual preprocessing steps.

    The defaults describe a baseline pipeline (grayscale, CLAHE, denoise,
    Otsu binarization). Being a dataclass, the config accepts keyword
    overrides and provides ``__repr__`` and ``__eq__``.
    """

    apply_grayscale: bool = True
    apply_clahe: bool = True
    clahe_clip_limit: float = 2.0
    clahe_grid_size: Tuple[int, int] = (8, 8)
    apply_denoise: bool = True
    denoise_strength: int = 10
    apply_binarize: bool = True
    binarize_method: str = "otsu"  # "otsu", "adaptive", "none"
    apply_deskew: bool = False
    apply_sharpen: bool = False
    apply_resize: bool = False
    target_dpi: int = 300
def analyze_image(image_path: str) -> ImageProperties:
    """
    Analyze an image and extract its properties for the agent to make
    preprocessing decisions.

    Args:
        image_path: Path to the input image.

    Returns:
        ImageProperties with analysis results.

    Raises:
        FileNotFoundError: If OpenCV cannot read the image at ``image_path``.
    """
    props = ImageProperties()
    img = cv2.imread(image_path)
    if img is None:
        raise FileNotFoundError(f"Cannot open image: {image_path}")

    props.height, props.width = img.shape[:2]

    # Derive a single-channel view for all measurements below.
    if img.ndim == 2:
        gray = img
        props.is_grayscale = True
    elif img.shape[2] == 1:
        gray = img[:, :, 0]
        props.is_grayscale = True
    else:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # cv2.imread's default flag yields a 3-channel BGR array even for
        # grayscale files, so a shape check alone would never report
        # grayscale; identical channels are the reliable signal.
        blue, green, red = img[:, :, 0], img[:, :, 1], img[:, :, 2]
        props.is_grayscale = bool(
            np.array_equal(blue, green) and np.array_equal(green, red)
        )

    gray_f = gray.astype(np.float64)

    # Contrast score (standard deviation of pixel values, normalized)
    props.contrast_score = float(min(gray_f.std() / 80.0, 1.0))

    # Brightness score (mean pixel value, normalized)
    props.brightness_score = float(gray_f.mean() / 255.0)

    # Sharpness score (variance of the Laplacian, normalized)
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    props.sharpness_score = float(min(laplacian_var / 500.0, 1.0))

    # High Laplacian can also mean noise; estimate noise as the mean absolute
    # difference between the image and a Gaussian-blurred copy of it.
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    noise_estimate = np.mean(np.abs(gray_f - blur.astype(np.float64)))
    props.noise_level = float(min(noise_estimate / 30.0, 1.0))

    # Treat the image as already binary when it has very few distinct levels
    props.is_binary = len(np.unique(gray)) <= 10

    # Skew detection via Hough Line Transform
    props.skew_angle = _detect_skew(gray)

    # DPI from image metadata; best effort, falling back to 72.
    fallback_dpi = 72
    props.resolution_dpi = fallback_dpi
    try:
        # Context manager so the file handle is released promptly.
        with Image.open(image_path) as pil_img:
            dpi_info = pil_img.info.get("dpi")
        if dpi_info is not None:
            horizontal = dpi_info[0] if isinstance(dpi_info, (tuple, list)) else dpi_info
            # Round rather than truncate: metadata may hold fractional values
            # (e.g. 299.9994), which int() would push below the 300 DPI tier.
            dpi = int(round(float(horizontal)))
            if dpi > 0:
                props.resolution_dpi = dpi
    except Exception as exc:  # metadata is optional; never fail analysis on it
        logger.debug("Could not read DPI metadata from %s: %s", image_path, exc)

    # Overall quality rating
    props.quality_rating = _rate_quality(props)

    logger.info(
        "Image analysis: %sx%s, contrast=%.2f, noise=%.2f, quality=%s",
        props.width,
        props.height,
        props.contrast_score,
        props.noise_level,
        props.quality_rating,
    )
    return props
def _detect_skew(gray: np.ndarray) -> float:
    """Detect text skew angle using the probabilistic Hough line transform.

    Args:
        gray: Single-channel image to inspect.

    Returns:
        Median angle, in degrees, of the detected segments that lie within
        15 degrees of horizontal. Returns 0.0 when no such segment is found
        or when detection fails (failures are logged, not raised).
    """
    try:
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)
        lines = cv2.HoughLinesP(
            edges, 1, np.pi / 180, 100,
            minLineLength=gray.shape[1] // 4,
            maxLineGap=10,
        )
        if lines is None or len(lines) == 0:
            return 0.0

        # One row per segment: x1, y1, x2, y2.
        segments = np.asarray(lines, dtype=np.float64).reshape(-1, 4)
        dx = segments[:, 2] - segments[:, 0]
        dy = segments[:, 3] - segments[:, 1]
        angles = np.degrees(np.arctan2(dy, dx))

        # A segment has no inherent direction: the same line reported with its
        # endpoints swapped differs by 180 degrees. Fold into (-90, 90] so a
        # reversed near-horizontal segment is not rejected by the filter below.
        angles = np.where(angles > 90.0, angles - 180.0, angles)
        angles = np.where(angles <= -90.0, angles + 180.0, angles)

        # Only consider near-horizontal lines
        near_horizontal = angles[np.abs(angles) < 15]
        if near_horizontal.size:
            return float(np.median(near_horizontal))
    except Exception as e:  # best effort: skew is optional information
        logger.warning(f"Skew detection failed: {e}")
    return 0.0
def _rate_quality(props: ImageProperties) -> str:
    """Rate overall image quality for OCR.

    Resolution, contrast, noise and sharpness each contribute 1 to 3 points.
    The total is mapped to "excellent" (>= 10), "good" (>= 7), "fair" (>= 5)
    or "poor" (anything lower).
    """

    def tier(top: bool, middle: bool) -> int:
        """Turn a pair of threshold checks into 3, 2 or 1 points."""
        if top:
            return 3
        return 2 if middle else 1

    dpi = props.resolution_dpi
    contrast = props.contrast_score
    noise = props.noise_level
    sharpness = props.sharpness_score

    total = (
        tier(dpi >= 300, dpi >= 150)
        + tier(contrast > 0.6, contrast > 0.3)
        + tier(noise < 0.2, noise < 0.5)  # lower noise earns more points
        + tier(sharpness > 0.4, sharpness > 0.15)
    )

    for floor, label in ((10, "excellent"), (7, "good"), (5, "fair")):
        if total >= floor:
            return label
    return "poor"
def determine_preprocessing(props: ImageProperties) -> PreprocessingConfig:
    """
    Agent decision function: determine optimal preprocessing based on
    image properties. This is the intelligence layer for preprocessing.

    Starts from the default PreprocessingConfig and adjusts it rule by rule:
    contrast drives CLAHE, noise drives denoising, skew/sharpness/resolution
    enable their respective steps, and extreme brightness selects adaptive
    binarization.

    Args:
        props: Image properties from analysis.

    Returns:
        PreprocessingConfig with recommended preprocessing steps.
    """
    config = PreprocessingConfig()

    if props.is_binary:
        # Already binary: skip CLAHE and binarization. The contrast rules are
        # deliberately not evaluated here, otherwise a sparse binary page
        # (low contrast score) would switch CLAHE back on.
        config.apply_binarize = False
        config.apply_clahe = False
        logger.info("Image already binary — skipping CLAHE and binarization")
    elif props.contrast_score < 0.3:
        # Low contrast -> stronger CLAHE
        config.apply_clahe = True
        config.clahe_clip_limit = 3.0
        logger.info("Low contrast detected — increasing CLAHE clip limit")
    elif props.contrast_score > 0.7:
        config.apply_clahe = False
        logger.info("High contrast — CLAHE not needed")

    # High noise -> stronger denoising; very low noise -> skip it
    if props.noise_level > 0.5:
        config.apply_denoise = True
        config.denoise_strength = 15
        logger.info("High noise — increasing denoise strength")
    elif props.noise_level < 0.15:
        config.apply_denoise = False
        logger.info("Low noise — denoising not needed")

    # Skewed -> deskew
    if abs(props.skew_angle) > 0.5:
        config.apply_deskew = True
        logger.info("Skew detected (%.1f°) — enabling deskew", props.skew_angle)

    # Low sharpness -> sharpen
    if props.sharpness_score < 0.15:
        config.apply_sharpen = True
        logger.info("Low sharpness — enabling sharpening")

    # Low resolution -> upscale (only for images that are also small in pixels)
    if props.resolution_dpi < 150 and max(props.width, props.height) < 1500:
        config.apply_resize = True
        logger.info("Low resolution — enabling upscaling")

    # Very dark or very bright images -> adaptive binarization
    if props.brightness_score < 0.3 or props.brightness_score > 0.7:
        config.binarize_method = "adaptive"
        logger.info("Uneven brightness — using adaptive binarization")

    return config
def preprocess_image(image_path: str, config: Optional[PreprocessingConfig] = None) -> np.ndarray:
    """
    Run the OCR preprocessing pipeline on an image file.

    The steps run in a fixed order (grayscale, resize, deskew, CLAHE,
    denoise, sharpen, binarize); each optional step is gated by its flag
    on ``config``.

    Args:
        image_path: Path to the input image.
        config: Preprocessing configuration. If None, it is derived by
            analyzing the image first.

    Returns:
        Preprocessed image as numpy array.

    Raises:
        FileNotFoundError: If OpenCV cannot read the image at ``image_path``.
    """
    source = cv2.imread(image_path)
    if source is None:
        raise FileNotFoundError(f"Cannot open image: {image_path}")

    if config is None:
        config = determine_preprocessing(analyze_image(image_path))

    # Step 1: reduce to a single channel
    if config.apply_grayscale and source.ndim == 3:
        work = cv2.cvtColor(source, cv2.COLOR_BGR2GRAY)
    elif source.ndim == 2:
        work = source
    elif source.shape[2] == 1:
        work = source[:, :, 0]
    else:
        work = cv2.cvtColor(source, cv2.COLOR_BGR2GRAY)

    # Step 2: upscale, assuming a 72 DPI source and never more than 3x
    if config.apply_resize:
        factor = min(config.target_dpi / 72.0, 3.0)
        rows, cols = work.shape[:2]
        new_w = int(cols * factor)
        new_h = int(rows * factor)
        work = cv2.resize(work, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
        logger.info(f"Resized to {new_w}x{new_h}")

    # Step 3: straighten skewed text
    if config.apply_deskew:
        work = _deskew(work)

    # Step 4: local contrast enhancement (CLAHE)
    if config.apply_clahe:
        equalizer = cv2.createCLAHE(
            clipLimit=config.clahe_clip_limit,
            tileGridSize=config.clahe_grid_size,
        )
        work = equalizer.apply(work)

    # Step 5: non-local-means denoising
    if config.apply_denoise:
        work = cv2.fastNlMeansDenoising(work, h=config.denoise_strength)

    # Step 6: sharpen with a 3x3 kernel (centre 9, neighbours -1)
    if config.apply_sharpen:
        sharpen_kernel = np.full((3, 3), -1)
        sharpen_kernel[1, 1] = 9
        work = cv2.filter2D(work, -1, sharpen_kernel)

    # Step 7: binarize; any method other than "otsu"/"adaptive" is a no-op
    if config.apply_binarize:
        method = config.binarize_method
        if method == "otsu":
            work = cv2.threshold(
                work, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
            )[1]
        elif method == "adaptive":
            work = cv2.adaptiveThreshold(
                work, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY, 11, 2,
            )

    return work
def _deskew(image: np.ndarray) -> np.ndarray:
    """Correct text skew in an image.

    The text pixels are isolated with an Otsu threshold, a minimum-area
    rectangle is fitted around them, and the image is rotated about its
    centre so that the rectangle's near-horizontal edge becomes level.

    Args:
        image: Single-channel 8-bit image (required by the Otsu threshold).

    Returns:
        The rotated image, or ``image`` unchanged when fewer than 100 text
        pixels are found, the measured skew is under 0.5 degrees, or any
        step fails (failures are logged, not raised).
    """
    try:
        # Separate ink from paper. Using `image > 0` on a grayscale page would
        # select almost every pixel and measure the image frame, not the text.
        _, mask = cv2.threshold(
            image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        # Text is expected to be the minority class; flip the mask if the
        # threshold polarity selected the background instead.
        if cv2.countNonZero(mask) > mask.size // 2:
            mask = cv2.bitwise_not(mask)

        points = cv2.findNonZero(mask)  # (x, y) coordinates of text pixels
        if points is None or len(points) < 100:
            return image

        # Measure the skew from the rectangle's corner points instead of its
        # reported angle, whose range differs between OpenCV versions.
        corners = cv2.boxPoints(cv2.minAreaRect(points))
        angle = 0.0
        smallest = float("inf")
        for start, end in ((corners[0], corners[1]), (corners[1], corners[2])):
            dx = float(end[0] - start[0])
            dy = float(end[1] - start[1])
            if dx == 0.0 and dy == 0.0:
                continue  # degenerate edge carries no direction
            edge_angle = float(np.degrees(np.arctan2(dy, dx)))
            # Edges are undirected: fold into (-90, 90].
            if edge_angle > 90.0:
                edge_angle -= 180.0
            elif edge_angle <= -90.0:
                edge_angle += 180.0
            # The two edges are perpendicular; keep the near-horizontal one.
            if abs(edge_angle) < smallest:
                smallest = abs(edge_angle)
                angle = edge_angle

        if abs(angle) < 0.5:
            return image

        h, w = image.shape[:2]
        center = (w // 2, h // 2)
        # A positive edge angle means the text slopes downward to the right
        # (y grows downward); getRotationMatrix2D treats positive angles as
        # counter-clockwise, which levels it.
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(
            image, M, (w, h),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE,
        )
        logger.info("Deskewed by %.2f°", angle)
        return rotated
    except Exception as e:  # best effort: fall back to the input image
        logger.warning(f"Deskew failed: {e}")
        return image