import cv2 import numpy as np from PIL import Image, ImageEnhance from typing import List, Optional, Tuple import logging import asyncio from pathlib import Path from pdf2image import convert_from_path from typing import Dict logger = logging.getLogger(__name__) class ImageProcessor: """Image processing utilities for certificate analysis""" def __init__(self): self.supported_formats = ['.pdf', '.jpg', '.jpeg', '.png', '.tiff', '.bmp'] self.model_version = "2024.1.0-image-processor" logger.info(f"ImageProcessor v{self.model_version} initialized") async def process_document(self, document_path: str) -> List[np.ndarray]: """Process document and extract images from all pages""" try: images = [] # Check file type file_path = Path(document_path) suffix = file_path.suffix.lower() if suffix == '.pdf': images = await self._extract_from_pdf(document_path) elif suffix in ['.jpg', '.jpeg', '.png', '.tiff', '.bmp']: images = await self._extract_from_image(document_path) else: raise ValueError(f"Unsupported file format: {suffix}") if not images: raise Exception("No images extracted from document") # Process each image processed_images = [] for img in images: processed = await self._process_image(img) if processed is not None: processed_images.append(processed) logger.info(f"Extracted {len(processed_images)} processed images") return processed_images except Exception as e: logger.error(f"Document processing failed: {e}") raise async def _extract_from_pdf(self, pdf_path: str) -> List[np.ndarray]: """Extract images from PDF""" try: images = convert_from_path(pdf_path, dpi=200) image_arrays = [] for img in images: # Convert PIL to numpy img_array = np.array(img) # Convert RGB to BGR for OpenCV if needed if len(img_array.shape) == 3 and img_array.shape[2] == 3: img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR) image_arrays.append(img_array) return image_arrays except ImportError: logger.error("pdf2image not installed. Install with: pip install pdf2image") raise except Exception as e: logger.error(f"PDF extraction failed: {e}") raise async def _extract_from_image(self, image_path: str) -> List[np.ndarray]: """Load image from file""" try: img = cv2.imread(image_path) if img is None: raise Exception(f"Failed to load image: {image_path}") return [img] except Exception as e: logger.error(f"Image loading failed: {e}") raise async def _process_image(self, image: np.ndarray) -> Optional[np.ndarray]: """Process single image for analysis""" try: # Convert to RGB if needed if len(image.shape) == 3 and image.shape[2] == 4: image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR) elif len(image.shape) == 3 and image.shape[2] == 3: pass # Already BGR elif len(image.shape) == 2: image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) else: raise ValueError(f"Unsupported image shape: {image.shape}") # Resize if too large h, w = image.shape[:2] max_dimension = 2000 if max(h, w) > max_dimension: scale = max_dimension / max(h, w) new_h, new_w = int(h * scale), int(w * scale) image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA) # Enhance image for better OCR enhanced = await self._enhance_image(image) return enhanced except Exception as e: logger.error(f"Image processing failed: {e}") return None async def _enhance_image(self, image: np.ndarray) -> np.ndarray: """Enhance image for OCR""" try: # Convert to PIL for enhancement pil_img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) # Enhance contrast enhancer = ImageEnhance.Contrast(pil_img) pil_img = enhancer.enhance(1.2) # Enhance sharpness enhancer = ImageEnhance.Sharpness(pil_img) pil_img = enhancer.enhance(1.1) # Convert back to numpy enhanced = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR) # Additional OpenCV enhancements # Convert to LAB color space lab = cv2.cvtColor(enhanced, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) # Apply CLAHE to L-channel clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) cl = clahe.apply(l) # Merge channels limg = cv2.merge([cl, a, b]) enhanced = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR) # Denoise - REMOVED for performance (too slow on CPU) # enhanced = cv2.fastNlMeansDenoisingColored(enhanced, None, 10, 10, 7, 21) return enhanced except Exception as e: logger.debug(f"Image enhancement failed, using original: {e}") return image def deskew_image(self, image: np.ndarray) -> np.ndarray: """Deskew image if tilted""" try: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) gray = cv2.bitwise_not(gray) # Threshold the image thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)[1] # Find coordinates of non-zero pixels coords = np.column_stack(np.where(thresh > 0)) # Get angle of minimum area rectangle angle = cv2.minAreaRect(coords)[-1] # Adjust angle if angle < -45: angle = 90 + angle else: angle = -angle # Rotate image if angle is significant if abs(angle) > 0.5: (h, w) = image.shape[:2] center = (w // 2, h // 2) M = cv2.getRotationMatrix2D(center, angle, 1.0) rotated = cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE) return rotated return image except Exception as e: logger.debug(f"Deskew failed: {e}") return image def detect_and_crop_edges(self, image: np.ndarray) -> np.ndarray: """Detect and crop edges to remove borders""" try: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Apply edge detection edges = cv2.Canny(gray, 50, 150) # Find contours contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: # Find largest contour (assumed to be document) largest_contour = max(contours, key=cv2.contourArea) x, y, w, h = cv2.boundingRect(largest_contour) # Add padding padding = 20 x = max(0, x - padding) y = max(0, y - padding) w = min(image.shape[1] - x, w + 2 * padding) h = min(image.shape[0] - y, h + 2 * padding) # Crop image cropped = image[y:y+h, x:x+w] # Only return if crop is significantly smaller than original if w < image.shape[1] * 0.9 or h < image.shape[0] * 0.9: return cropped return image except Exception as e: logger.debug(f"Edge cropping failed: {e}") return image def calculate_image_quality(self, image: np.ndarray) -> Dict[str, float]: """Calculate image quality metrics""" try: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Calculate blurriness (Laplacian variance) blur_score = cv2.Laplacian(gray, cv2.CV_64F).var() # Calculate contrast contrast_score = np.std(gray) # Calculate brightness brightness_score = np.mean(gray) # Calculate entropy hist = cv2.calcHist([gray], [0], None, [256], [0, 256]) hist = hist.ravel() / hist.sum() entropy_score = -np.sum(hist * np.log2(hist + 1e-10)) # Calculate noise denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21) noise_score = np.mean(np.abs(gray.astype(float) - denoised.astype(float))) return { 'blur_score': float(blur_score), 'contrast_score': float(contrast_score), 'brightness_score': float(brightness_score), 'entropy_score': float(entropy_score), 'noise_score': float(noise_score), 'overall_quality': float(min( (blur_score / 100) * 0.3 + (contrast_score / 50) * 0.3 + (1 - brightness_score / 255) * 0.2 + (entropy_score / 8) * 0.2, 1.0 )) } except Exception as e: logger.debug(f"Quality calculation failed: {e}") return { 'blur_score': 0.0, 'contrast_score': 0.0, 'brightness_score': 0.0, 'entropy_score': 0.0, 'noise_score': 0.0, 'overall_quality': 0.0 }