Spaces:
Running
Running
| """ | |
| Utility functions for OCR processing with Mistral AI. | |
| Contains helper functions for working with OCR responses and image handling. | |
| """ | |
| import json | |
| import base64 | |
| import io | |
| import zipfile | |
| import logging | |
| import numpy as np | |
| import time | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Union, Any, Tuple | |
| from functools import lru_cache | |
| # Configure logging | |
| logger = logging.getLogger("ocr_utils") | |
# Optional imaging dependencies. The flags must be tracked INDEPENDENTLY:
# the original single try-block left one flag undefined when only the second
# import failed (e.g. PIL imported fine but cv2 was missing, so
# PILLOW_AVAILABLE was never assigned and later reads raised NameError).
PILLOW_AVAILABLE = False
CV2_AVAILABLE = False
try:
    from PIL import Image, ImageEnhance, ImageFilter, ImageOps
    PILLOW_AVAILABLE = True
except ImportError:
    # Pillow missing: preprocessing falls back to raw base64 encoding
    pass
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    # OpenCV missing: PIL-only processing paths are used instead
    pass
# Mistral AI chunk types used to build OCR requests.
from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk
# Import preprocessing configuration from the project-level config module.
try:
    from config import IMAGE_PREPROCESSING
except ImportError:
    # Fallback defaults if config is not available
    IMAGE_PREPROCESSING = {
        "enhance_contrast": 1.5,   # contrast multiplier for document images
        "sharpen": True,           # apply a sharpening filter to documents
        "denoise": True,           # apply denoising (cv2 path or PIL fallback)
        "max_size_mb": 8.0,        # resize images whose file size exceeds this
        "target_dpi": 300,         # nominal target DPI — not referenced in this section; TODO confirm use elsewhere
        "compression_quality": 92  # JPEG quality used when re-encoding
    }
def replace_images_in_markdown(markdown_str: str, images_dict: dict) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.

    Mistral OCR emits placeholders of the form ``![img-id](img-id)``; this
    rewrites each link target to carry the inline base64 payload so the
    markdown renders standalone.

    Args:
        markdown_str: Markdown text containing image placeholders
        images_dict: Dictionary mapping image IDs to base64 strings

    Returns:
        Markdown text with images replaced by base64 data
    """
    # Bug fix: the replacement arguments were garbled (empty f-strings),
    # making this function a no-op. Restored the standard placeholder rewrite.
    for img_name, base64_str in images_dict.items():
        markdown_str = markdown_str.replace(
            f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
        )
    return markdown_str
def get_combined_markdown(ocr_response) -> str:
    """
    Combine OCR text and images into a single markdown document.

    Args:
        ocr_response: OCR response object from Mistral AI

    Returns:
        Combined markdown string with embedded images
    """
    rendered_pages = []
    for page in ocr_response.pages:
        # Build an id -> base64 mapping for every image on this page
        page_images = {}
        for image in getattr(page, "images", []):
            if hasattr(image, "id") and hasattr(image, "image_base64"):
                page_images[image.id] = image.image_base64
        # Substitute the placeholders with inline base64 data
        raw_markdown = getattr(page, "markdown", "")
        rendered_pages.append(replace_images_in_markdown(raw_markdown, page_images))
    # Pages are separated by a blank line
    return "\n\n".join(rendered_pages)
def encode_image_for_api(image_path: Union[str, Path]) -> str:
    """
    Encode an image as base64 data URL for API submission.

    Args:
        image_path: Path to the image file

    Returns:
        Base64 data URL for the image

    Raises:
        FileNotFoundError: If the image file does not exist.
    """
    # Path() accepts both str and Path inputs, so normalize unconditionally
    image_file = Path(image_path)
    if not image_file.is_file():
        raise FileNotFoundError(f"Image file not found: {image_file}")
    # Read the raw bytes and wrap them in a data URL
    payload = base64.b64encode(image_file.read_bytes()).decode()
    return f"data:image/jpeg;base64,{payload}"
def process_image_with_ocr(client, image_path: Union[str, Path], model: str = "mistral-ocr-latest"):
    """
    Process an image with OCR and return the response.

    Args:
        client: Mistral AI client
        image_path: Path to the image file
        model: OCR model to use

    Returns:
        OCR response object
    """
    # Submit the image to the OCR endpoint as an inline base64 data URL
    data_url = encode_image_for_api(image_path)
    return client.ocr.process(
        document=ImageURLChunk(image_url=data_url),
        model=model,
    )
def ocr_response_to_json(ocr_response, indent: int = 4) -> str:
    """
    Convert OCR response to a formatted JSON string.

    Missing attributes on the response, its pages, or their images are
    serialized as empty strings / empty lists.

    Args:
        ocr_response: OCR response object
        indent: Indentation level for JSON formatting

    Returns:
        Formatted JSON string
    """
    payload = {
        "text": getattr(ocr_response, "text", ""),
        "pages": [],
    }
    for page in getattr(ocr_response, "pages", []):
        # One entry per page: plain text, markdown, and embedded images
        payload["pages"].append({
            "text": getattr(page, "text", ""),
            "markdown": getattr(page, "markdown", ""),
            "images": [
                {
                    "id": getattr(image, "id", ""),
                    "base64": getattr(image, "image_base64", ""),
                }
                for image in getattr(page, "images", [])
            ],
        })
    return json.dumps(payload, indent=indent)
def _add_result_entries(zipf, result, json_name, default_stem, text_name,
                        viz_name, image_prefix, max_images=None):
    """
    Write one OCR result's artifacts into an open ZipFile.

    Helper for create_results_zip_in_memory; writes, in order: the JSON dump,
    the HTML-with-images rendering, the raw OCR text, the HTML visualization,
    and the decoded page images.

    Args:
        zipf: Open zipfile.ZipFile in write mode
        result: Single OCR result dictionary
        json_name: Archive name for the JSON dump
        default_stem: Fallback document name when 'file_name' is missing
        text_name: Archive name for the raw OCR text
        viz_name: Archive name for the HTML visualization
        image_prefix: Archive path prefix for extracted images
        max_images: Optional per-page cap on extracted images (memory guard)
    """
    # Full result as JSON
    zipf.writestr(json_name, json.dumps(result, indent=2))
    # HTML rendering with inline images.
    # NOTE(review): create_html_with_images is not defined in this section —
    # presumably provided elsewhere in the module; confirm.
    html_content = create_html_with_images(result)
    # Bug fix: the computed document stem was previously ignored and the
    # literal "(unknown)_with_images.html" was written instead.
    stem = result.get('file_name', default_stem).split('.')[0]
    zipf.writestr(f"{stem}_with_images.html", html_content)
    # Raw OCR text, if available
    if "ocr_contents" in result and "raw_text" in result["ocr_contents"]:
        zipf.writestr(text_name, result["ocr_contents"]["raw_text"])
    # Pre-rendered HTML visualization, if available
    if "html_visualization" in result:
        zipf.writestr(viz_name, result["html_visualization"])
    # Decode and store embedded page images
    if "pages_data" in result:
        for page_idx, page in enumerate(result["pages_data"]):
            images = page.get("images", [])
            if max_images is not None:
                images = images[:max_images]
            for img_idx, img in enumerate(images):
                img_base64 = img.get("image_base64", "")
                if not img_base64:
                    continue
                # Strip data URL prefix if present
                if img_base64.startswith("data:image"):
                    img_base64 = img_base64.split(",", 1)[1]
                try:
                    img_data = base64.b64decode(img_base64)
                except Exception:
                    # Skip images with malformed base64 payloads
                    continue
                zipf.writestr(f"{image_prefix}page_{page_idx+1}_img_{img_idx+1}.jpg", img_data)


def create_results_zip_in_memory(results):
    """
    Create a zip file containing OCR results in memory.

    Each result contributes its JSON dump, HTML renderings, raw OCR text and
    embedded images; failures on individual results are skipped best-effort.

    Args:
        results: Dictionary (single result) or list of OCR result dicts

    Returns:
        Binary zip file data (bytes)
    """
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
        if isinstance(results, list):
            # Batch mode: one numbered set of entries per result
            for i, result in enumerate(results):
                try:
                    # Limit to 3 images per page to conserve memory in batch mode
                    _add_result_entries(
                        zipf, result,
                        json_name=f"results_{i+1}.json",
                        default_stem=f'document_{i+1}',
                        text_name=f"ocr_text_{i+1}.txt",
                        viz_name=f"visualization_{i+1}.html",
                        image_prefix=f"images/result_{i+1}_",
                        max_images=3,
                    )
                except Exception:
                    # Best-effort: skip any result that fails and continue
                    continue
        else:
            # Single-result mode: unnumbered entry names, no image cap
            try:
                _add_result_entries(
                    zipf, results,
                    json_name="results.json",
                    default_stem='document',
                    text_name="ocr_text.txt",
                    viz_name="visualization.html",
                    image_prefix="images/",
                )
            except Exception:
                # Best-effort: return whatever entries were written so far
                pass
    # Rewind and hand back the raw bytes
    zip_buffer.seek(0)
    return zip_buffer.getvalue()
def create_results_zip(results, output_dir=None, zip_name=None):
    """
    Create a zip file containing OCR results on disk.

    Args:
        results: Dictionary or list of OCR results
        output_dir: Optional output directory (defaults to ./output)
        zip_name: Optional zip file name (derived from the results if omitted)

    Returns:
        Path to the created zip file
    """
    # Resolve and create the output directory. Bug fix: parents=True so a
    # nested output_dir does not make mkdir raise FileNotFoundError; the two
    # duplicated mkdir branches are also merged.
    output_dir = Path(output_dir) if output_dir is not None else Path.cwd() / "output"
    output_dir.mkdir(parents=True, exist_ok=True)

    if zip_name is None:
        if isinstance(results, list):
            # Batch results: timestamped generic archive name
            timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            zip_name = f"ocr-results_{timestamp}.zip"
        else:
            # Single result: name after the source file, using its processing
            # timestamp when available (sanitized for filesystem safety)
            if "processed_at" in results:
                timestamp = results.get("processed_at", "").replace(":", "-").replace(".", "-")
            else:
                timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            file_name = results.get("file_name", "ocr-results")
            zip_name = f"{file_name}_{timestamp}.zip"

    zip_path = output_dir / zip_name
    try:
        # Build the archive in memory, then persist it in a single write
        zip_path.write_bytes(create_results_zip_in_memory(results))
    except Exception:
        # Fallback: write a minimal placeholder archive so callers still get
        # a valid zip path. NOTE(review): the error is swallowed silently, as
        # in the original — consider logging it upstream.
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.writestr("info.txt", "Could not create complete archive")
    return zip_path
# Advanced image preprocessing functions
def preprocess_image_for_ocr(image_path: Union[str, Path]) -> Tuple[Image.Image, str]:
    """
    Preprocess an image for optimal OCR performance with enhanced speed and memory optimization.

    Results are memoized on the function attribute ``_cache``, keyed by file
    name, size and mtime, so repeated calls for an unchanged file are cheap.
    Small files (<100KB) take a minimal re-encode path; larger files may be
    resized and routed through document- or photo-specific processing.

    Args:
        image_path: Path to the image file

    Returns:
        Tuple of (processed PIL Image or None, base64 data URL string).
        The Image is None when preprocessing is skipped (no PIL) or fails.
    """
    # Fast path: skip all processing if PIL is not available
    if not PILLOW_AVAILABLE:
        logger.info("PIL not available, skipping image preprocessing")
        return None, encode_image_for_api(image_path)
    # Convert to Path object if string
    image_file = Path(image_path) if isinstance(image_path, str) else image_path
    try:
        # Consolidated stat call for file metadata to reduce I/O
        file_stat = image_file.stat()
        file_size = file_stat.st_size
        file_size_mb = file_size / (1024 * 1024)
        mod_time = file_stat.st_mtime
        # Cache key based on essential file properties (name + size + mtime)
        cache_key = f"{image_file.name}_{file_size}_{mod_time}"
        # Fast path: return cached result if available
        if hasattr(preprocess_image_for_ocr, "_cache") and cache_key in preprocess_image_for_ocr._cache:
            logger.debug(f"Using cached preprocessing result for {image_file.name}")
            return preprocess_image_for_ocr._cache[cache_key]
        # Optimization: small images (< 100KB) likely don't need preprocessing
        if file_size < 100000:  # 100KB
            logger.info(f"Image {image_file.name} is small ({file_size/1024:.1f}KB), using minimal processing")
            with Image.open(image_file) as img:
                # Normalize mode only
                if img.mode not in ('RGB', 'L'):
                    img = img.convert('RGB')
                # Re-encode with light optimization
                buffer = io.BytesIO()
                img.save(buffer, format="JPEG", quality=95, optimize=True)
                buffer.seek(0)
                # Base64-encode the JPEG bytes as a data URL
                encoded_image = base64.b64encode(buffer.getvalue()).decode()
                base64_data_url = f"data:image/jpeg;base64,{encoded_image}"
                # NOTE(review): `img` is returned after the `with` block closes
                # the underlying file; callers that only use the base64 string
                # are unaffected — confirm before reusing the Image object.
                result = (img, base64_data_url)
                if not hasattr(preprocess_image_for_ocr, "_cache"):
                    preprocess_image_for_ocr._cache = {}
                # Evict the 5 oldest entries once the cache exceeds 20 items,
                # to avoid clearing on every insert during batch processing
                if len(preprocess_image_for_ocr._cache) > 20:
                    for _ in range(5):
                        if preprocess_image_for_ocr._cache:
                            preprocess_image_for_ocr._cache.pop(next(iter(preprocess_image_for_ocr._cache)))
                preprocess_image_for_ocr._cache[cache_key] = result
                return result
    except Exception as e:
        # If stat or cache handling fails, log and continue with processing
        logger.debug(f"Cache handling failed for {image_path}: {str(e)}")
        # Ensure we have a valid file_size_mb for later resize decisions
        try:
            file_size_mb = image_file.stat().st_size / (1024 * 1024)
        except:
            file_size_mb = 0  # Default if we can't determine size
    try:
        # Process start time for performance logging
        start_time = time.time()
        # Open and process the image with minimal memory footprint
        with Image.open(image_file) as img:
            # Normalize image mode
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')
            # Quick check of image properties to choose a processing path
            width, height = img.size
            image_area = width * height
            # Detect document type only for medium-to-large images to save time
            is_document = False
            if image_area > 500000:  # Approx 700x700 or larger
                # Hand the image to the detector via its function attribute
                # (this module's convention instead of passing arguments)
                _detect_document_type_impl._current_img = img
                is_document = _detect_document_type_impl(None)
                logger.debug(f"Document type detection for {image_file.name}: {'document' if is_document else 'photo'}")
            # Resize large images for API efficiency
            if file_size_mb > IMAGE_PREPROCESSING["max_size_mb"] or max(width, height) > 3000:
                # Calculate target dimensions directly instead of using a heavier resize helper
                target_width, target_height = width, height
                max_dimension = max(width, height)
                # Sliding scale: larger images get more aggressive reduction
                if max_dimension > 5000:
                    scale_factor = 0.25  # Aggressive reduction for very large images
                elif max_dimension > 3000:
                    scale_factor = 0.4  # Significant reduction for large images
                else:
                    scale_factor = 0.6  # Moderate reduction for medium images
                # Calculate new dimensions
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)
                # Pick resampling filter by area: speed vs quality trade-off
                if image_area > 3000000:  # Very large, use faster but lower quality
                    processed_img = img.resize((new_width, new_height), Image.BILINEAR)
                else:  # Medium size, use better quality
                    processed_img = img.resize((new_width, new_height), Image.LANCZOS)
                logger.debug(f"Resized image from {width}x{height} to {new_width}x{new_height}")
            else:
                # Skip resizing for smaller images
                processed_img = img
            # Apply appropriate processing based on document type and size
            if is_document:
                # Process as document with optimized path based on size
                if image_area > 1000000:  # Full processing for larger documents
                    preprocess_document_image._current_img = processed_img
                    processed = _preprocess_document_image_impl()
                else:  # Lightweight processing for smaller documents
                    # Just enhance contrast for small documents to save time
                    enhancer = ImageEnhance.Contrast(processed_img)
                    processed = enhancer.enhance(1.3)
            else:
                # Process as photo with optimized path based on size
                if image_area > 1000000:  # Full processing for larger photos
                    preprocess_general_image._current_img = processed_img
                    processed = _preprocess_general_image_impl()
                else:  # Skip processing for smaller photos
                    processed = processed_img
            # Encode the processed image to JPEG in memory
            buffer = io.BytesIO()
            # Adjust quality based on file size to optimize API payload
            if file_size_mb > 5:
                quality = 85  # Lower quality for large files
            else:
                quality = IMAGE_PREPROCESSING["compression_quality"]
            # Save with optimized parameters
            processed.save(buffer, format="JPEG", quality=quality, optimize=True)
            buffer.seek(0)
            # Build the base64 data URL
            encoded_image = base64.b64encode(buffer.getvalue()).decode()
            base64_data_url = f"data:image/jpeg;base64,{encoded_image}"
            # Update the function-attribute cache
            result = (processed, base64_data_url)
            if not hasattr(preprocess_image_for_ocr, "_cache"):
                preprocess_image_for_ocr._cache = {}
            # LRU-like cache management: evict several entries at once
            if len(preprocess_image_for_ocr._cache) > 20:
                try:
                    # Remove several entries to avoid frequent cache clearing
                    for _ in range(5):
                        if preprocess_image_for_ocr._cache:
                            preprocess_image_for_ocr._cache.pop(next(iter(preprocess_image_for_ocr._cache)))
                except:
                    # If removal fails, just continue
                    pass
            # Add to cache.
            # NOTE(review): cache_key is undefined here if the stat call above
            # raised before assigning it; the except below absorbs the
            # resulting NameError, so the result is simply not cached.
            try:
                preprocess_image_for_ocr._cache[cache_key] = result
            except Exception:
                # If caching fails, just proceed
                pass
            # Log performance metrics
            processing_time = time.time() - start_time
            logger.debug(f"Image preprocessing completed in {processing_time:.3f}s for {image_file.name}")
            # Return both processed image and base64 string
            return result
    except Exception as e:
        # If preprocessing fails, log error and use original image
        logger.warning(f"Image preprocessing failed: {str(e)}. Using original image.")
        return None, encode_image_for_api(image_path)
# Caching removed here previously to avoid unhashable-type errors; the image
# is handed to the implementation via a function attribute instead.
def detect_document_type(img: Image.Image) -> bool:
    """
    Detect if an image is likely a document (text-heavy) vs. a photo.

    Args:
        img: PIL Image object

    Returns:
        True if likely a document, False otherwise
    """
    # Bug fix: the wrapper previously never stored `img`, so the
    # implementation read a stale `_current_img` (or none at all and
    # returned False). Store the image before delegating.
    _detect_document_type_impl._current_img = img
    return _detect_document_type_impl(None)
def _detect_document_type_impl(img_hash=None) -> bool:
    """
    Optimized implementation of document type detection for faster processing.

    Reads the image from the function attribute ``_current_img`` (set by the
    caller). The img_hash parameter is unused but kept for backward
    compatibility. Heuristics are tuned to also catch handwritten documents.

    Returns:
        True if the image looks like a text document, False otherwise.
    """
    # Fast path: get the image from function-attribute storage
    if not hasattr(_detect_document_type_impl, "_current_img"):
        return False  # Fail safe in case image is not set
    img = _detect_document_type_impl._current_img
    # Skip processing for tiny images - just classify as non-documents
    width, height = img.size
    if width * height < 100000:  # Approx 300x300 or smaller
        return False
    # Convert to grayscale for analysis (using faster conversion)
    gray_img = img.convert('L')
    # PIL-only path for systems without OpenCV
    if not CV2_AVAILABLE:
        # Downscale image so edge detection runs on a bounded sample
        sample_size = min(width, height, 1000)
        scale_factor = sample_size / max(width, height)
        if scale_factor < 0.9:  # Only resize if significant reduction
            sample_img = gray_img.resize(
                (int(width * scale_factor), int(height * scale_factor)),
                Image.NEAREST  # Fastest resampling method
            )
        else:
            sample_img = gray_img
        # Fast edge detection on sample
        edges = sample_img.filter(ImageFilter.FIND_EDGES)
        # Count edge pixels above a threshold
        edge_data = edges.getdata()
        edge_threshold = 40  # Lowered threshold to better detect handwritten texts
        edge_count = sum(1 for p in edge_data if p > edge_threshold)
        total_pixels = len(edge_data)
        edge_ratio = edge_count / total_pixels
        # Bright-area ratio approximates text/background contrast
        bright_count = sum(1 for p in gray_img.getdata() if p > 200)
        bright_ratio = bright_count / (width * height)
        # Documents typically have more edges (text boundaries) and bright
        # areas (paper background); thresholds lowered for handwriting
        return edge_ratio > 0.035 or bright_ratio > 0.4
    # OpenCV path - optimized for speed and enhanced for handwritten documents
    img_np = np.array(gray_img)
    # 1. Fast check: high pixel-value variance suggests text on background
    std_dev = np.std(img_np)
    if std_dev > 45:  # Lowered threshold to better detect handwritten documents
        return True
    # 2. Edge check on a downsampled copy for large images
    if max(img_np.shape) > 1000:
        scale = 1000 / max(img_np.shape)
        small_img = cv2.resize(img_np, None, fx=scale, fy=scale, interpolation=cv2.INTER_NEAREST)
    else:
        small_img = img_np
    # Lowered Canny thresholds to better detect fainter handwritten text
    edges = cv2.Canny(small_img, 30, 130, L2gradient=False)
    edge_ratio = np.count_nonzero(edges) / edges.size
    # 3. Coarse histogram via dark/light masks instead of a full histogram;
    # thresholds adjusted for handwriting and aged paper
    dark_mask = img_np < 60  # Increased threshold to capture lighter handwritten text
    light_mask = img_np > 180  # Lowered threshold to account for aged paper
    dark_ratio = np.count_nonzero(dark_mask) / img_np.size
    light_ratio = np.count_nonzero(light_mask) / img_np.size
    # Look for line-like structures typical of text lines in handwriting
    # (CV2_AVAILABLE is always True on this path; check kept from original)
    if CV2_AVAILABLE and edge_ratio > 0.02:  # Lower threshold to capture handwritten documents
        lines = cv2.HoughLinesP(edges, 1, np.pi/180,
                                threshold=50,  # Lower threshold for detection
                                minLineLength=30,  # Shorter lines for handwriting
                                maxLineGap=20)  # Larger gap for discontinuous handwriting
        # Enough line segments -> likely a document with text
        if lines is not None and len(lines) > 10:
            return True
    # Final decision: documents have both dark (text) and light (background)
    # regions, and/or well-defined edges; thresholds lowered for handwriting
    return (dark_ratio > 0.03 and light_ratio > 0.25) or edge_ratio > 0.03
# Caching removed here previously to avoid unhashable-type errors; the image
# travels to the implementation through a function attribute instead.
def preprocess_document_image(img: Image.Image) -> Image.Image:
    """
    Preprocess a document image for optimal OCR.

    The image is handed to the implementation via the ``_current_img``
    function attribute (this module's convention for avoiding caching on
    unhashable arguments).

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    setattr(preprocess_document_image, "_current_img", img)
    return _preprocess_document_image_impl()
def _preprocess_document_image_impl() -> Image.Image:
    """
    Optimized implementation of document preprocessing with adaptive
    processing based on image size. Enhanced for handwritten documents.

    Reads the image from ``preprocess_document_image._current_img``.

    Returns:
        Processed grayscale (or binarized) PIL Image.

    Raises:
        ValueError: If no image was stored on the wrapper before calling.
    """
    # Fast path: get image from function-attribute storage
    if not hasattr(preprocess_document_image, "_current_img"):
        raise ValueError("No image set for document preprocessing")
    img = preprocess_document_image._current_img
    # Analyze image size to determine processing strategy
    width, height = img.size
    img_size = width * height
    # Heuristic check for handwritten content - drives gentler processing
    is_handwritten = False
    try:
        # Handwritten documents often have more varied strokes and less
        # stark contrast than printed text
        if CV2_AVAILABLE:
            # Convert to grayscale and inspect edge density
            gray_np = np.array(img.convert('L'))
            edges = cv2.Canny(gray_np, 30, 100)
            if np.count_nonzero(edges) / edges.size > 0.02:  # Low edge threshold for handwriting
                # Additional check with gradient magnitudes
                sobelx = cv2.Sobel(gray_np, cv2.CV_64F, 1, 0, ksize=3)
                sobely = cv2.Sobel(gray_np, cv2.CV_64F, 0, 1, ksize=3)
                magnitude = np.sqrt(sobelx**2 + sobely**2)
                # Handwriting typically has more variation in gradient magnitudes
                if np.std(magnitude) > 20:
                    is_handwritten = True
    except:
        # If detection fails, assume it's not handwritten
        pass
    # Ultra-fast path for tiny images - grayscale + contrast only
    if img_size < 300000:  # ~500x600 or smaller
        gray = img.convert('L')
        # Gentler contrast enhancement for handwritten documents
        contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
        enhancer = ImageEnhance.Contrast(gray)
        return enhancer.enhance(contrast_level)
    # Fast path for small images - minimal processing
    if img_size < 1000000:  # ~1000x1000 or smaller
        gray = img.convert('L')
        # Gentler contrast enhancement for handwritten documents
        contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
        enhancer = ImageEnhance.Contrast(gray)
        enhanced = enhancer.enhance(contrast_level)
        # Light sharpening only if enabled; milder for handwriting to
        # preserve stroke detail
        if IMAGE_PREPROCESSING["sharpen"]:
            if is_handwritten:
                # EDGE_ENHANCE is gentler than SHARPEN for handwriting
                enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
            else:
                enhanced = enhanced.filter(ImageFilter.SHARPEN)
        return enhanced
    # Standard path for medium/large images
    # Convert to grayscale (faster processing)
    gray = img.convert('L')
    # Adaptive contrast enhancement based on document type
    contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
    enhancer = ImageEnhance.Contrast(gray)
    enhanced = enhancer.enhance(contrast_level)
    # Apply light sharpening for text clarity - adapted to document type
    if IMAGE_PREPROCESSING["sharpen"]:
        if is_handwritten:
            # EDGE_ENHANCE is gentler than SHARPEN for handwriting
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
        else:
            enhanced = enhanced.filter(ImageFilter.SHARPEN)
    # Advanced denoising with OpenCV if available
    if CV2_AVAILABLE and IMAGE_PREPROCESSING["denoise"]:
        try:
            # Convert to numpy array for OpenCV processing
            img_np = np.array(enhanced)
            if is_handwritten:
                # Handwritten documents: bilateral filtering preserves edges
                # (and therefore stroke detail) while smoothing noise
                if img_size > 3000000:  # Large images - downsample first
                    scale_factor = 0.5
                    small_img = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor,
                                           interpolation=cv2.INTER_AREA)
                    # Apply bilateral filter which preserves edges while smoothing
                    filtered = cv2.bilateralFilter(small_img, 9, 75, 75)
                    # Resize back to the original dimensions
                    filtered = cv2.resize(filtered, (width, height), interpolation=cv2.INTER_LINEAR)
                else:
                    # Use bilateral filter directly for smaller images
                    filtered = cv2.bilateralFilter(img_np, 7, 50, 50)
                # Convert back to PIL Image
                enhanced = Image.fromarray(filtered)
                # Avoid binary thresholding for handwriting - it can destroy
                # subtle strokes
                return enhanced
            else:
                # Standard document processing - optimized for printed text
                # Denoising parameters scale with image size
                if img_size > 4000000:  # Very large images
                    # More aggressive downsampling for very large images
                    scale_factor = 0.5
                    downsample = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor,
                                            interpolation=cv2.INTER_AREA)
                    # Lighter denoising for the downsampled image
                    h_value = 7  # Strength parameter
                    template_window = 5
                    search_window = 13
                    # Apply denoising on smaller image
                    denoised_np = cv2.fastNlMeansDenoising(downsample, None, h_value, template_window, search_window)
                    # Resize back to original size
                    denoised_np = cv2.resize(denoised_np, (width, height), interpolation=cv2.INTER_LINEAR)
                else:
                    # Direct denoising for medium-large images
                    h_value = 8  # Balanced for speed and quality
                    template_window = 5
                    search_window = 15
                    # Apply denoising
                    denoised_np = cv2.fastNlMeansDenoising(img_np, None, h_value, template_window, search_window)
                # Convert back to PIL Image
                enhanced = Image.fromarray(denoised_np)
                # Try adaptive thresholding, keeping it only if it preserves
                # the background (i.e. doesn't destroy text information)
                if img_size < 8000000:  # Skip for extremely large images to save processing time
                    binary = cv2.adaptiveThreshold(denoised_np, 255,
                                                   cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                                   cv2.THRESH_BINARY, 11, 2)
                    # Compare white (background) pixel counts before/after
                    white_pixels_binary = np.count_nonzero(binary > 200)
                    white_pixels_orig = np.count_nonzero(denoised_np > 200)
                    # Accept binarization if it keeps >=80% of white pixels
                    if white_pixels_binary > white_pixels_orig * 0.8:
                        # Binarization looks good, use it
                        return Image.fromarray(binary)
                return enhanced
        except Exception as e:
            # If OpenCV processing fails, continue with PIL-enhanced image
            pass
    elif IMAGE_PREPROCESSING["denoise"]:
        # Fallback PIL denoising for systems without OpenCV
        if is_handwritten:
            # Smaller median filter for handwriting to preserve details
            enhanced = enhanced.filter(ImageFilter.MedianFilter(1))
        else:
            # Standard filtering for printed documents
            enhanced = enhanced.filter(ImageFilter.MedianFilter(3))
    # Return enhanced grayscale image
    return enhanced
# Caching removed here previously to avoid unhashable-type errors; the image
# travels to the implementation through a function attribute instead.
def preprocess_general_image(img: Image.Image) -> Image.Image:
    """
    Preprocess a general image for OCR.

    The image is handed to the implementation via the ``_current_img``
    function attribute (this module's convention for avoiding caching on
    unhashable arguments).

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    setattr(preprocess_general_image, "_current_img", img)
    return _preprocess_general_image_impl()
def _preprocess_general_image_impl() -> Image.Image:
    """
    Size-tiered preprocessing pipeline for general (non-document) images.

    Reads the input image from the ``preprocess_general_image._current_img``
    function attribute (set by the public wrapper) and applies increasingly
    heavy enhancement as the pixel count grows:

    - tiny (< ~0.3 MP): RGB-mode conversion only
    - small (< ~0.6 MP): light contrast boost (1.15)
    - medium (< ~1 MP): moderate contrast boost (1.2)
    - large: contrast + slight saturation + sharpening; CLAHE on the
      luminance channel via OpenCV for the very largest images

    Returns:
        Processed PIL Image (RGB)

    Raises:
        ValueError: if no image was stored by the wrapper first
    """
    # Fail loudly if called without going through the public wrapper
    if not hasattr(preprocess_general_image, "_current_img"):
        raise ValueError("No image set for general preprocessing")
    img = preprocess_general_image._current_img

    width, height = img.size
    img_size = width * height

    # Ultra-fast path: skip processing completely for tiny images
    if img_size < 300000:  # under ~0.3 megapixel
        if img.mode != 'RGB':
            return img.convert('RGB')
        return img

    # Fast path: minimal processing for smaller images
    if img_size < 600000:  # ~800x750 or smaller
        if img.mode != 'RGB':
            img = img.convert('RGB')
        # Very light contrast enhancement only
        enhancer = ImageEnhance.Contrast(img)
        return enhancer.enhance(1.15)

    # Standard path: moderate enhancement for medium images
    if img.mode != 'RGB':
        img = img.convert('RGB')
    enhancer = ImageEnhance.Contrast(img)
    enhanced = enhancer.enhance(1.2)  # less aggressive than document enhancement

    # Skip additional processing for medium-sized images
    if img_size < 1000000:  # under ~1 megapixel
        return enhanced

    # Enhanced path: additional processing for larger images
    try:
        # 1. Slight saturation boost for better feature extraction
        saturation = ImageEnhance.Color(enhanced)
        enhanced = saturation.enhance(1.1)

        # 2. Adaptive sharpening based on image size: EDGE_ENHANCE is a
        #    subtler filter than SHARPEN, better suited to very large images
        if img_size > 2500000:  # ~1600x1600 or larger
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
        else:
            enhanced = enhanced.filter(ImageFilter.SHARPEN)

        # 3. CLAHE detail enhancement with OpenCV for the largest images
        if CV2_AVAILABLE and img_size > 3000000:
            img_np = np.array(enhanced)
            try:
                # LAB space lets us enhance luminance without shifting color
                lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
                l, a, b = cv2.split(lab)
                clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
                l = clahe.apply(l)
                lab = cv2.merge((l, a, b))
                enhanced_np = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
                enhanced = Image.fromarray(enhanced_np)
            except Exception:
                # Was a bare `except:` — narrowed so SystemExit and
                # KeyboardInterrupt are not swallowed; a CLAHE failure
                # simply falls back to the PIL-enhanced image
                pass
    except Exception:
        # If any enhancement fails, fall back to basic contrast enhancement
        if img.mode != 'RGB':
            img = img.convert('RGB')
        enhancer = ImageEnhance.Contrast(img)
        enhanced = enhancer.enhance(1.2)
    return enhanced
| # Removed caching decorator to fix unhashable type error | |
def resize_image(img: Image.Image, target_dpi: int = 300) -> Image.Image:
    """
    Resize an image to an optimal size for OCR while preserving quality.

    Args:
        img: PIL Image object to resize
        target_dpi: Target DPI (dots per inch) used to derive the size cap

    Returns:
        Resized PIL Image (or the original image when already small enough)
    """
    # Hand the image to the implementation through a function attribute
    # (workaround after caching removal; the impl reads it back).
    setattr(resize_image, "_current_img", img)
    return resize_image_impl(target_dpi)
def resize_image_impl(target_dpi: int = 300) -> Image.Image:
    """
    Implementation of the resize operation.

    Reads the image from the ``resize_image._current_img`` function
    attribute, caps its dimensions at a standard 8.5x11-inch page rendered
    at ``target_dpi``, and downscales proportionally when needed.

    Args:
        target_dpi: Target DPI (dots per inch)

    Returns:
        Resized PIL Image, or the original image when no resize is needed

    Raises:
        ValueError: if the wrapper did not store an image first
    """
    if not hasattr(resize_image, "_current_img"):
        raise ValueError("No image set for resizing")
    source = resize_image._current_img

    # Dimension cap: US-Letter page (8.5x11 in) at the requested DPI
    limit_w = int(8.5 * target_dpi)
    limit_h = int(11 * target_dpi)

    src_w, src_h = source.size
    # Quick early return when the image already fits within the cap
    if src_w <= limit_w and src_h <= limit_h:
        return source

    # Uniform scale factor so both dimensions fit inside the cap;
    # BICUBIC gives a good balance of speed and quality
    scale = min(limit_w / src_w, limit_h / src_h)
    return source.resize((int(src_w * scale), int(src_h * scale)), Image.BICUBIC)
def calculate_image_entropy(img: Image.Image) -> float:
    """
    Calculate the Shannon entropy (information content) of an image.

    The image is converted to 8-bit grayscale and the entropy is computed
    from the normalized intensity histogram:
    ``H = -sum(p * log2(p))`` over the non-empty histogram bins.

    Args:
        img: PIL Image object

    Returns:
        Entropy in bits (0.0 for a zero-pixel or perfectly uniform image)
    """
    # Work on the luminance channel only
    if img.mode != 'L':
        img = img.convert('L')
    histogram = np.asarray(img.histogram(), dtype=np.float64)
    total_pixels = img.width * img.height
    # Guard against degenerate zero-pixel images (avoids division by zero)
    if total_pixels == 0:
        return 0.0
    # Vectorized entropy over occupied bins (replaces a Python-level loop
    # over all 256 histogram entries)
    probabilities = histogram[histogram > 0] / total_pixels
    return float(-np.sum(probabilities * np.log2(probabilities)))
def create_html_with_images(result):
    """
    Create a standalone HTML document with embedded images from OCR results.

    Renders a metadata header, then per-page text blocks and base64-embedded
    images from ``result['pages_data']``, followed by any structured content
    found in ``result['ocr_contents']``.

    Args:
        result: OCR result dictionary; recognized keys are 'file_name',
            'timestamp', 'languages', 'topics', 'pages_data' (list of dicts
            with 'markdown' and 'images') and 'ocr_contents'.

    Returns:
        Complete HTML document as a string.
    """
    # Create HTML document structure (head + embedded stylesheet)
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>OCR Document with Images</title>
        <style>
            body {
                font-family: Georgia, serif;
                line-height: 1.7;
                margin: 0 auto;
                max-width: 800px;
                padding: 20px;
            }
            img {
                max-width: 90%;
                max-height: 500px;
                object-fit: contain;
                margin: 20px auto;
                display: block;
                border: 1px solid #ddd;
                border-radius: 4px;
            }
            .image-container {
                margin: 20px 0;
                text-align: center;
            }
            .page-break {
                border-top: 1px solid #ddd;
                margin: 40px 0;
                padding-top: 40px;
            }
            h3 {
                color: #333;
                border-bottom: 1px solid #eee;
                padding-bottom: 10px;
            }
            p {
                margin: 12px 0;
            }
            .page-text-content {
                margin-bottom: 20px;
            }
            .text-block {
                background-color: #f9f9f9;
                padding: 15px;
                border-radius: 4px;
                border-left: 3px solid #546e7a;
                margin-bottom: 15px;
                color: #333;
            }
            .text-block p {
                margin: 8px 0;
                color: #333;
            }
            .metadata {
                background-color: #f5f5f5;
                padding: 10px 15px;
                border-radius: 4px;
                margin-bottom: 20px;
                font-size: 14px;
            }
            .metadata p {
                margin: 5px 0;
            }
        </style>
    </head>
    <body>
    """
    # Add document metadata
    html_content += f"""
    <div class="metadata">
        <h2>{result.get('file_name', 'Document')}</h2>
        <p><strong>Processed at:</strong> {result.get('timestamp', '')}</p>
        <p><strong>Languages:</strong> {', '.join(result.get('languages', ['Unknown']))}</p>
        <p><strong>Topics:</strong> {', '.join(result.get('topics', ['Unknown']))}</p>
    </div>
    """
    # Check if we have pages_data
    if 'pages_data' in result and result['pages_data']:
        pages_data = result['pages_data']
        # Process each page
        for i, page in enumerate(pages_data):
            page_markdown = page.get('markdown', '')
            images = page.get('images', [])
            # Add page header if multi-page
            if len(pages_data) > 1:
                html_content += f"<h3>Page {i+1}</h3>"
            # Create image dictionary (id -> base64 data URL)
            image_dict = {}
            for img in images:
                if 'id' in img and 'image_base64' in img:
                    image_dict[img['id']] = img['image_base64']
            # Process the markdown content
            if page_markdown:
                # Split lines into markdown image references and plain text.
                # FIX: this classification line was truncated/corrupted in the
                # source; reconstructed from the image-parsing code below,
                # which expects lines of the form ![alt](image-id).
                text_content = []
                image_lines = []
                for line in page_markdown.split('\n'):
                    if '![' in line and '](' in line:
                        image_lines.append(line)
                    elif line.strip():
                        text_content.append(line)
                # Add text content
                if text_content:
                    html_content += '<div class="text-block">'
                    for line in text_content:
                        html_content += f"<p>{line}</p>"
                    html_content += '</div>'
                # Add images
                for line in image_lines:
                    # Extract image ID and alt text using simple parsing
                    try:
                        alt_start = line.find('![') + 2
                        alt_end = line.find(']', alt_start)
                        alt_text = line[alt_start:alt_end]
                        img_start = line.find('(', alt_end) + 1
                        img_end = line.find(')', img_start)
                        img_id = line[img_start:img_end]
                        if img_id in image_dict:
                            html_content += '<div class="image-container">'
                            html_content += f'<img src="{image_dict[img_id]}" alt="{alt_text}">'
                            html_content += '</div>'
                    except Exception:
                        # Was a bare `except:`; if parsing fails, skip this image
                        continue
            # Add page separator if not the last page
            if i < len(pages_data) - 1:
                html_content += '<div class="page-break"></div>'
    # Add structured content if available
    if 'ocr_contents' in result and isinstance(result['ocr_contents'], dict):
        html_content += '<h3>Structured Content</h3>'
        for section, content in result['ocr_contents'].items():
            if content and section not in ['error', 'raw_text', 'partial_text']:
                html_content += f'<h4>{section.replace("_", " ").title()}</h4>'
                if isinstance(content, str):
                    html_content += f'<p>{content}</p>'
                elif isinstance(content, list):
                    html_content += '<ul>'
                    for item in content:
                        html_content += f'<li>{str(item)}</li>'
                    html_content += '</ul>'
                elif isinstance(content, dict):
                    html_content += '<dl>'
                    for k, v in content.items():
                        html_content += f'<dt>{k}</dt><dd>{v}</dd>'
                    html_content += '</dl>'
    # Close HTML document
    html_content += """
    </body>
    </html>
    """
    return html_content
def generate_document_thumbnail(image_path: Union[str, Path], max_size: int = 300) -> Optional[str]:
    """
    Generate a base64-encoded JPEG thumbnail for document preview.

    Args:
        image_path: Path to the image file
        max_size: Maximum dimension (width or height) of the thumbnail

    Returns:
        Data URL string ("data:image/jpeg;base64,..."), or None when Pillow
        is unavailable or thumbnail generation fails for any reason.
    """
    if not PILLOW_AVAILABLE:
        return None
    try:
        # Open the image
        with Image.open(image_path) as img:
            # FIX: the JPEG encoder cannot handle alpha/palette modes
            # (RGBA, LA, P); previously such files failed the save and the
            # function silently returned None. Normalize to RGB first.
            if img.mode != 'RGB':
                img = img.convert('RGB')
            # Scale the longest edge down to max_size, preserving aspect ratio
            width, height = img.size
            if width > height:
                new_width = max_size
                new_height = int(height * (max_size / width))
            else:
                new_height = max_size
                new_width = int(width * (max_size / height))
            thumbnail = img.resize((new_width, new_height), Image.LANCZOS)
            # Encode the thumbnail as an inline JPEG data URL
            buffer = io.BytesIO()
            thumbnail.save(buffer, format="JPEG", quality=85)
            buffer.seek(0)
            encoded = base64.b64encode(buffer.getvalue()).decode()
            return f"data:image/jpeg;base64,{encoded}"
    except Exception:
        # Thumbnails are best-effort; any failure degrades to no preview
        return None
def try_local_ocr_fallback(image_path: Union[str, Path], base64_data_url: str = None) -> str:
    """
    Attempt local OCR with pytesseract when the Mistral API fails.

    Args:
        image_path: Path to the image file on disk
        base64_data_url: Optional base64 data URL used instead of the file
            when provided

    Returns:
        Extracted OCR text when successful, otherwise None
    """
    logger.info("Attempting local OCR fallback using pytesseract...")
    try:
        import pytesseract
        from PIL import Image

        # Prefer the in-memory base64 payload when one was supplied
        if base64_data_url and base64_data_url.startswith('data:image'):
            payload = base64.b64decode(base64_data_url.split(',', 1)[1])
            source = Image.open(io.BytesIO(payload))
        else:
            source = Image.open(Path(image_path) if isinstance(image_path, str) else image_path)

        # Normalize the color mode, then work in grayscale with a strong
        # contrast boost — both tend to improve tesseract recognition
        if source.mode != 'RGB':
            source = source.convert('RGB')
        prepared = ImageEnhance.Contrast(source.convert('L')).enhance(2.0)

        # Run OCR
        extracted = pytesseract.image_to_string(prepared, lang='eng')

        # Treat very short output as a failed recognition
        if extracted and len(extracted.strip()) > 50:
            logger.info(f"Local OCR successful: extracted {len(extracted)} characters")
            return extracted
        logger.warning("Local OCR produced minimal or no text")
        return None
    except ImportError:
        logger.warning("Pytesseract not installed - local OCR not available")
        return None
    except Exception as e:
        logger.error(f"Local OCR fallback failed: {str(e)}")
        return None