Spaces:
Running
Running
| import os | |
| import sys | |
| import time | |
| import random | |
| from enum import Enum | |
| from pathlib import Path | |
| import json | |
| import base64 | |
| import pycountry | |
| import logging | |
| from functools import lru_cache | |
| from typing import Optional, Dict, Any, List, Union, Tuple | |
| from pydantic import BaseModel | |
| from mistralai import Mistral | |
| from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk | |
| from mistralai.models import OCRImageObject | |
# Configure root logging at import time: INFO level, timestamped records
# tagged with logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Import utilities for OCR processing; if the ocr_utils module is not
# available, define local fallbacks with the same names and signatures.
try:
    from ocr_utils import replace_images_in_markdown, get_combined_markdown
except ImportError:
    def replace_images_in_markdown(markdown_str, images_dict):
        """Inline image data into markdown image references.

        Every ``![name](name)`` reference whose ``name`` is a key of
        *images_dict* is rewritten to ``![name](<value>)``, where ``<value>``
        is the corresponding entry (e.g. a base64 data URI).

        Args:
            markdown_str: Markdown text containing image references.
            images_dict: Mapping of image id -> replacement target.

        Returns:
            The markdown with matching image references rewritten.
        """
        for img_name, base64_str in images_dict.items():
            # The previous fallback replaced "" with "", which is a no-op;
            # match the actual markdown image syntax instead.
            markdown_str = markdown_str.replace(
                f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
            )
        return markdown_str

    def get_combined_markdown(ocr_response):
        """Join every page's markdown, with that page's images inlined.

        Args:
            ocr_response: Object with a ``pages`` sequence; each page exposes
                ``markdown`` and ``images`` (items with ``id`` and
                ``image_base64``).

        Returns:
            The per-page markdown joined with blank lines.
        """
        markdowns = []
        for page in ocr_response.pages:
            image_data = {img.id: img.image_base64 for img in page.images}
            markdowns.append(replace_images_in_markdown(page.markdown, image_data))
        return "\n\n".join(markdowns)
| # Import config directly (now local to historical-ocr) | |
| from config import MISTRAL_API_KEY, OCR_MODEL, TEXT_MODEL, VISION_MODEL, TEST_MODE | |
# Helper function to make OCR objects JSON serializable.
def serialize_ocr_response(obj):
    """Convert OCR response objects into JSON-serializable Python data.

    Conversion rules:
      * ``None``, ``str``, ``int``, ``float`` and ``bool`` are returned as-is.
      * Lists and dicts are converted element-wise.
      * Objects exposing ``__dict__`` become a dict of their public
        (non-underscore) attributes, each converted recursively.
        ``OCRImageObject`` attribute values are reduced to just their ``id``
        and ``image_base64`` fields.
      * Anything else is returned unchanged.

    Args:
        obj: The value to convert.

    Returns:
        Plain dicts/lists/primitives wherever the input could be converted.
    """
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, list):
        return [serialize_ocr_response(item) for item in obj]
    if isinstance(obj, dict):
        return {k: serialize_ocr_response(v) for k, v in obj.items()}
    if hasattr(obj, '__dict__'):
        result = {}
        for key, value in obj.__dict__.items():
            if key.startswith('_'):
                continue  # Skip private attributes
            if isinstance(value, OCRImageObject):
                # Images only need their id and base64 payload for rendering.
                result[key] = {
                    'id': getattr(value, 'id', None),
                    'image_base64': getattr(value, 'image_base64', None),
                }
            else:
                # Recurse for every other value. Previously dict-valued
                # attributes were copied unconverted, leaving nested objects
                # in them non-serializable.
                result[key] = serialize_ocr_response(value)
        return result
    return obj
# Source data for the Language enum below. Note: nothing here is cached; the
# mapping is rebuilt from pycountry on every call.
def get_language_dict():
    """Map the two-letter code of each pycountry language to its name.

    Languages without an ``alpha_2`` attribute are skipped.

    Returns:
        dict of ``alpha_2`` code -> language name.
    """
    mapping = {}
    for language in pycountry.languages:
        if hasattr(language, 'alpha_2'):
            mapping[language.alpha_2] = language.name
    return mapping
class LanguageMeta(type(Enum)):
    """Enum metaclass that injects one member per language from get_language_dict().

    Each member's key is the language name upper-cased with spaces replaced by
    underscores; its value is the original language name.
    """

    def __new__(metacls, cls, bases, classdict):
        for language_name in get_language_dict().values():
            member_key = "_".join(language_name.upper().split(" "))
            classdict[member_key] = language_name
        return super().__new__(metacls, cls, bases, classdict)
class Language(Enum, metaclass=LanguageMeta):
    """Language names accepted in structured OCR output.

    The class body declares no members; LanguageMeta generates them from
    pycountry when the class is created.
    """
class StructuredOCRModel(BaseModel):
    """Pydantic schema describing a structured OCR result."""

    # Name of the processed file.
    file_name: str
    # Topic labels for the document.
    topics: list[str]
    # Languages present in the document, restricted to Language members.
    languages: list[Language]
    # Extracted content; the model does not constrain its keys.
    ocr_contents: dict
| class StructuredOCR: | |
def __init__(self, api_key=None):
    """Initialize the OCR processor and its Mistral client.

    Args:
        api_key: Optional Mistral API key. Falls back to ``MISTRAL_API_KEY``
            from config. In test mode, a placeholder key is used when none
            is passed.

    Raises:
        ValueError: If no non-blank key is available outside test mode, or if
            constructing the client fails with an unauthorized/401 error.
    """
    self.test_mode = TEST_MODE

    if self.test_mode and not api_key:
        candidate = "placeholder_key"
    else:
        candidate = api_key or MISTRAL_API_KEY

    # Normalize BEFORE validating: previously a whitespace-only key passed the
    # emptiness check and was handed to the client as an empty string.
    self.api_key = (candidate or "").strip()

    if not self.api_key and not self.test_mode:
        raise ValueError("No Mistral API key provided. Please set the MISTRAL_API_KEY environment variable or enable TEST_MODE.")

    # Key validation against the API is intentionally skipped to avoid an
    # extra network call at construction time.
    try:
        self.client = Mistral(api_key=self.api_key)
    except Exception as e:
        error_msg = str(e).lower()
        if "unauthorized" in error_msg or "401" in error_msg:
            raise ValueError(f"API key authentication failed. Please check your Mistral API key: {str(e)}") from e
        raise
def process_file(self, file_path, file_type=None, use_vision=True, max_pages=None, file_size_mb=None, custom_pages=None, custom_prompt=None):
    """Process a file and return structured OCR results.

    Args:
        file_path: Path (or str) of the file to process.
        file_type: 'pdf' or 'image'; inferred from the suffix when None.
        use_vision: Whether to use the vision model for improved analysis.
        max_pages: Optional limit on the number of PDF pages to process.
        file_size_mb: Optional file size in MB; read from disk when None and
            the file exists. Used for the API size check and page limiting.
        custom_pages: Optional list of specific page numbers to process.
        custom_prompt: Optional instructions for unusual formatting or
            specific extraction needs.

    Returns:
        Result dict. Files over 50 MB produce an error result rather than an
        exception. ``processing_time`` and a default ``confidence_score``
        (0.85) are added to successful results.
    """
    file_path = Path(file_path)

    if file_type is None:
        file_type = "pdf" if file_path.suffix.lower() == ".pdf" else "image"

    if file_size_mb is None and file_path.exists():
        file_size_mb = file_path.stat().st_size / (1024 * 1024)  # bytes -> MB

    # Mistral API limit is 50 MB: fail fast with an error result.
    if file_size_mb and file_size_mb > 50:
        logging.warning(f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB")
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "confidence_score": 0.0,
            "error": f"File size {file_size_mb:.2f} MB exceeds API limit of 50 MB",
            "ocr_contents": {
                "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                "partial_text": "Document could not be processed due to size limitations."
            }
        }

    # Automatic page limit for medium PDFs when the caller set none. Files
    # above 50 MB were rejected above, so the former >50/>100 MB tiers were
    # unreachable and are gone.
    if (file_type == "pdf" and file_size_mb and file_size_mb > 20
            and max_pages is None and custom_pages is None):
        max_pages = 10

    start_time = time.time()
    if file_type == "pdf":
        result = self._process_pdf(file_path, use_vision, max_pages, custom_pages, custom_prompt)
    else:
        result = self._process_image(file_path, use_vision, custom_prompt)

    # Normalize to a plain dict BEFORE adding metadata; the item assignments
    # below would fail on a non-dict result.
    if not isinstance(result, dict):
        result = serialize_ocr_response(result)

    result['processing_time'] = time.time() - start_time
    if 'confidence_score' not in result:
        result['confidence_score'] = 0.85  # Default confidence

    # Verify the result is JSON serializable; convert it fully if not.
    try:
        json.dumps(result)
    except TypeError as e:
        logger = logging.getLogger("serializer")
        logger.warning(f"JSON serialization error in result: {str(e)}. Applying full serialization.")
        result = serialize_ocr_response(result)

    return result
| def _process_pdf(self, file_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None): | |
| """ | |
| Process a PDF file with OCR - optimized version with smart page handling and memory management | |
| Args: | |
| file_path: Path to the PDF file | |
| use_vision: Whether to use vision model for enhanced analysis | |
| max_pages: Optional limit on the number of pages to process | |
| custom_pages: Optional list of specific page numbers to process | |
| custom_prompt: Optional custom prompt for specialized extraction | |
| """ | |
| logger = logging.getLogger("pdf_processor") | |
| logger.info(f"Processing PDF: {file_path}") | |
| # Track processing time | |
| start_time = time.time() | |
| # Fast path: Return placeholder if in test mode | |
| if self.test_mode: | |
| logger.info("Test mode active, returning placeholder response") | |
| # Enhanced test mode placeholder that's more realistic | |
| return { | |
| "file_name": file_path.name, | |
| "topics": ["Historical Document", "Literature", "American History"], | |
| "languages": ["English"], | |
| "ocr_contents": { | |
| "title": "Harper's New Monthly Magazine", | |
| "publication_date": "1855", | |
| "publisher": "Harper & Brothers, New York", | |
| "raw_text": "This is a test mode placeholder for Harper's New Monthly Magazine from 1855. The actual document contains articles on literature, politics, science, and culture from mid-19th century America.", | |
| "content": "The magazine includes various literary pieces, poetry, political commentary, and illustrations typical of 19th century periodicals. Known for publishing works by prominent American authors including Herman Melville and Charles Dickens.", | |
| "key_figures": ["Herman Melville", "Charles Dickens", "Henry Wadsworth Longfellow"], | |
| "noted_articles": ["Continued serialization of popular novels", "Commentary on contemporary political events", "Scientific discoveries and technological advancements"] | |
| }, | |
| "pdf_processing_method": "enhanced_test_mode", | |
| "total_pages": 12, | |
| "processed_pages": 3, | |
| "processing_time": 0.5, | |
| "confidence_score": 0.9 | |
| } | |
| try: | |
| # PDF processing strategy decision based on file size | |
| file_size_mb = file_path.stat().st_size / (1024 * 1024) | |
| logger.info(f"PDF size: {file_size_mb:.2f} MB") | |
| # Always use pdf2image for better control and consistency across all PDF files | |
| use_pdf2image = True | |
| # First try local PDF processing for better performance and control | |
| if use_pdf2image: | |
| try: | |
| import tempfile | |
| from pdf2image import convert_from_path | |
| logger.info("Processing PDF using pdf2image for better multi-page handling") | |
| # Convert PDF to images with optimized parameters | |
| conversion_start = time.time() | |
| # Use consistent DPI for all files to ensure reliable results | |
| dpi = 200 # Higher quality DPI for all files to ensure better text recognition | |
| # Only convert first page initially to check document type | |
| pdf_first_page = convert_from_path(file_path, dpi=dpi, first_page=1, last_page=1) | |
| logger.info(f"First page converted in {time.time() - conversion_start:.2f}s") | |
| # Quick check if PDF has readable content | |
| if not pdf_first_page: | |
| logger.warning("PDF conversion produced no images, falling back to API") | |
| raise Exception("PDF conversion failed to produce images") | |
| # Determine total pages in the document | |
| # First, try simple estimate from first page conversion | |
| total_pages = 1 | |
| # Try pdf2image info extraction | |
| try: | |
| # Try with pdf2image page counting - use simpler parameters | |
| logger.info("Determining PDF page count...") | |
| count_start = time.time() | |
| # Use a lightweight approach with multi-threading for faster processing | |
| pdf_info = convert_from_path( | |
| file_path, | |
| dpi=72, # Low DPI just for info | |
| first_page=1, | |
| last_page=1, | |
| size=(100, 100), # Tiny image to save memory | |
| fmt="jpeg", | |
| thread_count=4, # Increased thread count for faster processing | |
| output_file=None | |
| ) | |
| # Extract page count | |
| if hasattr(pdf_info, 'n_pages'): | |
| total_pages = pdf_info.n_pages | |
| elif isinstance(pdf_info, dict) and "Pages" in pdf_info: | |
| total_pages = int(pdf_info.get("Pages", "1")) | |
| elif len(pdf_first_page) > 0: | |
| # Just estimate based on first page - at least we have one | |
| total_pages = 1 | |
| logger.info(f"Page count determined in {time.time() - count_start:.2f}s") | |
| except Exception as count_error: | |
| logger.warning(f"Error determining page count: {str(count_error)}. Using default of 1") | |
| total_pages = 1 | |
| logger.info(f"PDF has {total_pages} total pages") | |
| # Determine which pages to process | |
| pages_to_process = [] | |
| # Handle custom page selection if provided | |
| if custom_pages and any(0 < p <= total_pages for p in custom_pages): | |
| # Filter valid page numbers | |
| pages_to_process = [p for p in custom_pages if 0 < p <= total_pages] | |
| logger.info(f"Processing {len(pages_to_process)} custom-selected pages: {pages_to_process}") | |
| # Otherwise use max_pages limit if provided | |
| elif max_pages and max_pages < total_pages: | |
| pages_to_process = list(range(1, max_pages + 1)) | |
| logger.info(f"Processing first {max_pages} pages of {total_pages} total") | |
| # Or process all pages if reasonable count | |
| elif total_pages <= 10: | |
| pages_to_process = list(range(1, total_pages + 1)) | |
| logger.info(f"Processing all {total_pages} pages") | |
| # For large documents without limits, process subset of pages | |
| else: | |
| # Smart sampling: first page, last page, and some pages in between | |
| pages_to_process = [1] # Always include first page | |
| if total_pages > 1: | |
| if total_pages <= 5: | |
| # For few pages, process all | |
| pages_to_process = list(range(1, total_pages + 1)) | |
| else: | |
| # For many pages, sample intelligently | |
| # Add pages from the middle of the document | |
| middle = total_pages // 2 | |
| # Add last page if more than 3 pages | |
| if total_pages > 3: | |
| pages_to_process.append(total_pages) | |
| # Add up to 3 pages from middle if document is large | |
| if total_pages > 5: | |
| pages_to_process.append(middle) | |
| if total_pages > 10: | |
| pages_to_process.append(middle // 2) | |
| pages_to_process.append(middle + (middle // 2)) | |
| # Sort pages for sequential processing | |
| pages_to_process = sorted(list(set(pages_to_process))) | |
| logger.info(f"Processing {len(pages_to_process)} sampled pages out of {total_pages} total: {pages_to_process}") | |
| # Convert only the selected pages to minimize memory usage | |
| selected_images = [] | |
| combined_text = [] | |
| # Process pages in larger batches for better efficiency | |
| batch_size = 5 # Process 5 pages at a time for better throughput | |
| for i in range(0, len(pages_to_process), batch_size): | |
| batch_pages = pages_to_process[i:i+batch_size] | |
| logger.info(f"Converting batch of pages {batch_pages}") | |
| # Convert batch of pages with multi-threading for better performance | |
| batch_start = time.time() | |
| batch_images = convert_from_path( | |
| file_path, | |
| dpi=dpi, | |
| first_page=min(batch_pages), | |
| last_page=max(batch_pages), | |
| thread_count=4, # Use multi-threading for faster PDF processing | |
| fmt="jpeg" # Use JPEG format for better compatibility | |
| ) | |
| logger.info(f"Batch conversion completed in {time.time() - batch_start:.2f}s") | |
| # Map converted images to requested page numbers | |
| for idx, page_num in enumerate(range(min(batch_pages), max(batch_pages) + 1)): | |
| if page_num in pages_to_process and idx < len(batch_images): | |
| if page_num == pages_to_process[0]: # First page to process | |
| selected_images.append(batch_images[idx]) | |
| # Process each page individually | |
| with tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False) as tmp: | |
| batch_images[idx].save(tmp.name, format='JPEG') | |
| # Simple OCR to extract text | |
| try: | |
| page_result = self._process_image(Path(tmp.name), False, None) | |
| if 'ocr_contents' in page_result and 'raw_text' in page_result['ocr_contents']: | |
| # Add page text to combined text | |
| page_text = page_result['ocr_contents']['raw_text'] | |
| combined_text.append(f"--- PAGE {page_num} ---\n{page_text}") | |
| except Exception as page_e: | |
| logger.warning(f"Error processing page {page_num}: {str(page_e)}") | |
| # Clean up temp file | |
| import os | |
| os.unlink(tmp.name) | |
| # If we have processed pages | |
| if selected_images and combined_text: | |
| # Save first image to temp file for vision model | |
| with tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False) as tmp: | |
| selected_images[0].save(tmp.name, format='JPEG', quality=95) | |
| first_image_path = tmp.name | |
| # Combine all extracted text | |
| all_text = "\n\n".join(combined_text) | |
| # For custom prompts, use specialized processing | |
| if custom_prompt: | |
| try: | |
| # Process image with vision model | |
| result = self._process_image(Path(first_image_path), use_vision, None) | |
| # Enhance with text analysis using combined text from all pages | |
| enhanced_result = self._extract_structured_data_text_only(all_text, file_path.name, custom_prompt) | |
| # Merge results, keeping images from original result | |
| for key, value in enhanced_result.items(): | |
| if key not in ('raw_response_data', 'pages_data', 'has_images'): | |
| result[key] = value | |
| # Update raw text with full document text | |
| if 'ocr_contents' in result: | |
| result['ocr_contents']['raw_text'] = all_text | |
| except Exception as e: | |
| logger.warning(f"Custom prompt processing failed: {str(e)}. Using standard processing.") | |
| # Fall back to standard processing | |
| result = self._process_image(Path(first_image_path), use_vision, None) | |
| if 'ocr_contents' in result: | |
| result['ocr_contents']['raw_text'] = all_text | |
| else: | |
| # Standard processing with combined text | |
| result = self._process_image(Path(first_image_path), use_vision, None) | |
| if 'ocr_contents' in result: | |
| result['ocr_contents']['raw_text'] = all_text | |
| # Add PDF metadata | |
| result['file_name'] = file_path.name | |
| result['pdf_processing_method'] = 'pdf2image_optimized' | |
| result['total_pages'] = total_pages | |
| result['processed_pages'] = len(pages_to_process) | |
| result['pages_processed'] = pages_to_process | |
| # Add processing info | |
| result['processing_info'] = { | |
| 'method': 'local_pdf_processing', | |
| 'dpi': dpi, | |
| 'pages_sampled': pages_to_process, | |
| 'processing_time': time.time() - start_time | |
| } | |
| # Clean up | |
| os.unlink(first_image_path) | |
| return result | |
| else: | |
| logger.warning("No pages successfully processed with pdf2image, falling back to API") | |
| raise Exception("Failed to process PDF pages locally") | |
| except Exception as pdf2image_error: | |
| logger.warning(f"Local PDF processing failed, falling back to API: {str(pdf2image_error)}") | |
| # Fall back to API processing | |
| # API-based PDF processing | |
| logger.info("Processing PDF via Mistral API") | |
| # Optimize file upload for faster processing | |
| logger.info("Uploading PDF file to Mistral API") | |
| upload_start = time.time() | |
| # Set appropriate timeout based on file size | |
| upload_timeout = max(60, min(300, int(file_size_mb * 5))) # 60s to 300s based on size | |
| try: | |
| # Upload the file (Mistral client doesn't support timeout parameter for upload) | |
| uploaded_file = self.client.files.upload( | |
| file={ | |
| "file_name": file_path.stem, | |
| "content": file_path.read_bytes(), | |
| }, | |
| purpose="ocr" | |
| ) | |
| logger.info(f"PDF uploaded in {time.time() - upload_start:.2f}s") | |
| # Get a signed URL for the uploaded file | |
| signed_url = self.client.files.get_signed_url(file_id=uploaded_file.id, expiry=1) | |
| # Process the PDF with OCR - use adaptive timeout based on file size | |
| logger.info(f"Processing PDF with OCR using {OCR_MODEL}") | |
| # Adaptive retry strategy based on file size | |
| max_retries = 3 if file_size_mb < 20 else 2 # Fewer retries for large files | |
| base_retry_delay = 1 if file_size_mb < 10 else 2 # Longer delays for large files | |
| # Adaptive timeout based on file size | |
| ocr_timeout_ms = min(180000, max(60000, int(file_size_mb * 3000))) # 60s to 180s | |
| # Try processing with retries | |
| for retry in range(max_retries): | |
| try: | |
| ocr_start = time.time() | |
| pdf_response = self.client.ocr.process( | |
| document=DocumentURLChunk(document_url=signed_url.url), | |
| model=OCR_MODEL, | |
| include_image_base64=True, | |
| timeout_ms=ocr_timeout_ms | |
| ) | |
| logger.info(f"PDF OCR processing completed in {time.time() - ocr_start:.2f}s") | |
| break # Success, exit retry loop | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}") | |
| # Handle errors with optimized retry logic | |
| error_lower = error_msg.lower() | |
| # Authentication errors - no point in retrying | |
| if any(term in error_lower for term in ["unauthorized", "401", "403", "authentication"]): | |
| logger.error("API authentication failed. Check your API key.") | |
| raise ValueError(f"Authentication failed. Please verify your Mistral API key: {error_msg}") | |
| # Connection or server errors - worth retrying | |
| elif any(term in error_lower for term in ["connection", "timeout", "520", "server error", "502", "503", "504"]): | |
| if retry < max_retries - 1: | |
| # Exponential backoff with jitter for better retry behavior | |
| wait_time = base_retry_delay * (2 ** retry) * (0.8 + 0.4 * random.random()) | |
| logger.info(f"Connection issue detected. Waiting {wait_time:.1f}s before retry...") | |
| time.sleep(wait_time) | |
| else: | |
| # Last retry failed | |
| logger.error("Maximum retries reached, API connection error persists.") | |
| raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") | |
| # Rate limit errors - much longer wait | |
| elif any(term in error_lower for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]): | |
| # Check specifically for token exhaustion vs temporary rate limit | |
| if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower: | |
| logger.error("API quota or credit limit reached. No retry will help.") | |
| raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") | |
| elif retry < max_retries - 1: | |
| wait_time = base_retry_delay * (2 ** retry) * 6.0 # Significantly longer wait for rate limits | |
| logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...") | |
| time.sleep(wait_time) | |
| else: | |
| logger.error("Maximum retries reached, rate limit error persists.") | |
| raise ValueError(f"API rate limit exceeded. Please try again later: {error_msg}") | |
| # Misc errors - typically no retry will help | |
| else: | |
| if retry < max_retries - 1 and any(term in error_lower for term in ["transient", "temporary"]): | |
| # Only retry for errors explicitly marked as transient | |
| wait_time = base_retry_delay * (2 ** retry) | |
| logger.info(f"Transient error detected. Waiting {wait_time:.1f}s before retry...") | |
| time.sleep(wait_time) | |
| else: | |
| logger.error(f"Unrecoverable API error: {error_msg}") | |
| raise | |
| # Calculate the number of pages to process | |
| pages_to_process = pdf_response.pages | |
| total_pages = len(pdf_response.pages) | |
| limited_pages = False | |
| logger.info(f"API returned {total_pages} total PDF pages") | |
| # Smart page selection logic for better performance | |
| if custom_pages: | |
| # Convert to 0-based indexing and filter valid page numbers | |
| valid_indices = [i-1 for i in custom_pages if 0 < i <= total_pages] | |
| if valid_indices: | |
| pages_to_process = [pdf_response.pages[i] for i in valid_indices] | |
| limited_pages = True | |
| logger.info(f"Processing {len(valid_indices)} custom-selected pages") | |
| # Max pages limit with smart sampling | |
| elif max_pages and total_pages > max_pages: | |
| if max_pages == 1: | |
| # Just first page | |
| pages_to_process = pages_to_process[:1] | |
| elif max_pages < 5 and total_pages > 10: | |
| # For small max_pages on large docs, include first, last, and middle | |
| indices = [0] # First page | |
| if max_pages > 1: | |
| indices.append(total_pages - 1) # Last page | |
| if max_pages > 2: | |
| indices.append(total_pages // 2) # Middle page | |
| # Add more pages up to max_pages if needed | |
| if max_pages > 3: | |
| remaining = max_pages - len(indices) | |
| step = total_pages // (remaining + 1) | |
| for i in range(1, remaining + 1): | |
| idx = i * step | |
| if idx not in indices and 0 <= idx < total_pages: | |
| indices.append(idx) | |
| indices.sort() | |
| pages_to_process = [pdf_response.pages[i] for i in indices] | |
| else: | |
| # Default: first max_pages | |
| pages_to_process = pages_to_process[:max_pages] | |
| limited_pages = True | |
| logger.info(f"Processing {len(pages_to_process)} pages out of {total_pages} total") | |
| # Calculate confidence score if available | |
| try: | |
| confidence_values = [page.confidence for page in pages_to_process if hasattr(page, 'confidence')] | |
| confidence_score = sum(confidence_values) / len(confidence_values) if confidence_values else 0.89 | |
| except Exception: | |
| confidence_score = 0.89 # Improved default | |
| # Merge page content intelligently - include page numbers for better context | |
| all_markdown = [] | |
| for idx, page in enumerate(pages_to_process): | |
| # Try to determine actual page number | |
| if custom_pages and len(custom_pages) == len(pages_to_process): | |
| page_num = custom_pages[idx] | |
| else: | |
| # Estimate page number - may not be accurate with sampling | |
| page_num = idx + 1 | |
| page_markdown = page.markdown if hasattr(page, 'markdown') else "" | |
| # Add page header if content exists | |
| if page_markdown.strip(): | |
| all_markdown.append(f"--- PAGE {page_num} ---\n{page_markdown}") | |
| # Join all pages with separation | |
| combined_markdown = "\n\n".join(all_markdown) | |
| # Extract structured data with the appropriate model | |
| if use_vision: | |
| # Try to get a good image for vision model | |
| vision_image = None | |
| # Try first page with images | |
| for page in pages_to_process: | |
| if hasattr(page, 'images') and page.images: | |
| vision_image = page.images[0].image_base64 | |
| break | |
| if vision_image: | |
| # Use vision model with enhanced prompt | |
| logger.info(f"Using vision model: {VISION_MODEL}") | |
| result = self._extract_structured_data_with_vision( | |
| vision_image, combined_markdown, file_path.name, custom_prompt | |
| ) | |
| else: | |
| # Fall back to text-only if no images available | |
| logger.info(f"No images in PDF, falling back to text model: {TEXT_MODEL}") | |
| result = self._extract_structured_data_text_only( | |
| combined_markdown, file_path.name, custom_prompt | |
| ) | |
| else: | |
| # Use text-only model as requested | |
| logger.info(f"Using text-only model as specified: {TEXT_MODEL}") | |
| result = self._extract_structured_data_text_only( | |
| combined_markdown, file_path.name, custom_prompt | |
| ) | |
| # Add metadata about pages | |
| if limited_pages: | |
| result['limited_pages'] = { | |
| 'processed': len(pages_to_process), | |
| 'total': total_pages | |
| } | |
| # Set confidence score from OCR | |
| result['confidence_score'] = confidence_score | |
| # Add processing method info | |
| result['pdf_processing_method'] = 'api' | |
| result['total_pages'] = total_pages | |
| result['processed_pages'] = len(pages_to_process) | |
| # Store serialized OCR response for rendering | |
| serialized_response = serialize_ocr_response(pdf_response) | |
| result['raw_response_data'] = serialized_response | |
| # Check if there are images to include | |
| has_images = hasattr(pdf_response, 'pages') and any( | |
| hasattr(page, 'images') and page.images for page in pdf_response.pages | |
| ) | |
| result['has_images'] = has_images | |
| # Include image data for rendering if available | |
| if has_images: | |
| # Prepare pages data with image references | |
| result['pages_data'] = [] | |
| # Get serialized pages - handle different formats | |
| serialized_pages = None | |
| try: | |
| if hasattr(serialized_response, 'pages'): | |
| serialized_pages = serialized_response.pages | |
| elif isinstance(serialized_response, dict) and 'pages' in serialized_response: | |
| serialized_pages = serialized_response.get('pages', []) | |
| else: | |
| # No pages found in response | |
| logger.warning("No pages found in OCR response") | |
| serialized_pages = [] | |
| except Exception as pages_err: | |
| logger.warning(f"Error extracting pages from OCR response: {str(pages_err)}") | |
| serialized_pages = [] | |
| # Process each page to extract images | |
| for page_idx, page in enumerate(serialized_pages): | |
| try: | |
| # Skip processing pages not in our selection | |
| if limited_pages and page_idx >= len(pages_to_process): | |
| continue | |
| # Extract page data with careful error handling | |
| markdown = "" | |
| images = [] | |
| # Handle different page formats safely | |
| if isinstance(page, dict): | |
| markdown = page.get('markdown', '') | |
| images = page.get('images', []) | |
| else: | |
| # Try attribute access | |
| if hasattr(page, 'markdown'): | |
| markdown = page.markdown | |
| if hasattr(page, 'images'): | |
| images = page.images | |
| # Create page data record | |
| page_data = { | |
| 'page_number': page_idx + 1, | |
| 'markdown': markdown, | |
| 'images': [] | |
| } | |
| # Process images with careful error handling | |
| for img_idx, img in enumerate(images): | |
| try: | |
| # Extract image ID and base64 data | |
| img_id = None | |
| img_base64 = None | |
| if isinstance(img, dict): | |
| img_id = img.get('id') | |
| img_base64 = img.get('image_base64') | |
| else: | |
| # Try attribute access | |
| if hasattr(img, 'id'): | |
| img_id = img.id | |
| if hasattr(img, 'image_base64'): | |
| img_base64 = img.image_base64 | |
| # Only add if we have valid image data | |
| if img_base64 and isinstance(img_base64, str): | |
| # Ensure ID exists | |
| safe_id = img_id if img_id else f"img_{page_idx}_{img_idx}" | |
| page_data['images'].append({ | |
| 'id': safe_id, | |
| 'image_base64': img_base64 | |
| }) | |
| except Exception as img_err: | |
| logger.warning(f"Error processing image {img_idx} on page {page_idx+1}: {str(img_err)}") | |
| continue # Skip this image | |
| # Add page data if it has content | |
| if page_data['markdown'] or page_data['images']: | |
| result['pages_data'].append(page_data) | |
| except Exception as page_err: | |
| logger.warning(f"Error processing page {page_idx+1}: {str(page_err)}") | |
| continue # Skip this page | |
| # Record final processing time | |
| total_time = time.time() - start_time | |
| result['processing_time'] = total_time | |
| logger.info(f"PDF API processing completed in {total_time:.2f}s") | |
| return result | |
| except Exception as api_e: | |
| logger.error(f"Error in API-based PDF processing: {str(api_e)}") | |
| # Re-raise to be caught by outer exception handler | |
| raise | |
| except Exception as e: | |
| # Log the error and return a helpful error result | |
| logger.error(f"Error processing PDF: {str(e)}") | |
| # Return basic result on error | |
| return { | |
| "file_name": file_path.name, | |
| "topics": ["Document"], | |
| "languages": ["English"], | |
| "confidence_score": 0.0, | |
| "error": str(e), | |
| "ocr_contents": { | |
| "error": f"Failed to process PDF: {str(e)}", | |
| "partial_text": "Document could not be fully processed." | |
| }, | |
| "processing_time": time.time() - start_time | |
| } | |
def _process_image(self, file_path, use_vision=True, custom_prompt=None):
    """
    Process an image file with OCR and structured data extraction.

    Pipeline:
      1. Build a base64 data URL for the image, preferring
         ``ocr_utils.preprocess_image_for_ocr``. If that helper is missing,
         images over 8 MB are downscaled with PIL and smaller ones are sent
         unchanged.
      2. Call ``self.client.ocr.process`` with retries and exponential backoff
         for connection and rate-limit errors. Persistent rate limiting tries
         ``ocr_utils.try_local_ocr_fallback`` before giving up.
      3. Extract structured data from the first page's markdown with the
         vision or text-only model. Then attach the serialized OCR response
         and per-page image data for rendering.

    Args:
        file_path: ``pathlib.Path`` of the image (``.name``, ``.stat()`` and
            ``.read_bytes()`` are used).
        use_vision: Use the vision model when True, the text-only model otherwise.
        custom_prompt: Optional extra instructions for the extraction model.

    Returns:
        A result dict. Failures are not raised; they are reported in a basic
        result dict carrying an ``"error"`` key.
    """
    logger = logging.getLogger("image_processor")
    logger.info(f"Processing image: {file_path}")

    # Test mode: return a placeholder without touching the API
    if self.test_mode:
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "ocr_contents": {
                "title": "Document",
                "content": "Please set up API key to process documents."
            },
            "processing_time": 0.5,
            "confidence_score": 0.0
        }

    def _encode_original_image():
        # Send the untouched file bytes as a data URL.
        # NOTE(review): always labelled image/jpeg, even for PNG/TIFF input —
        # confirm the OCR API tolerates this.
        encoded_image = base64.b64encode(file_path.read_bytes()).decode()
        return f"data:image/jpeg;base64,{encoded_image}"

    def _resize_with_pil():
        # Downscale to <= 2000 px on the longest side, boost contrast and
        # re-encode as JPEG to shrink the API payload.
        from PIL import Image, ImageEnhance
        import io

        with Image.open(file_path) as img:
            # Convert to RGB so JPEG encoding does not fail on other modes
            if img.mode != 'RGB':
                img = img.convert('RGB')
            width, height = img.size
            max_dimension = max(width, height)
            target_dimension = 2000
            if max_dimension > target_dimension:
                scale_factor = target_dimension / max_dimension
                img = img.resize(
                    (int(width * scale_factor), int(height * scale_factor)),
                    Image.LANCZOS,
                )
            # Enhance contrast for better text recognition
            img = ImageEnhance.Contrast(img).enhance(1.3)
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=92, optimize=True)
        payload = buffer.getvalue()
        logger.info(f"Resized image to {len(payload) / (1024 * 1024):.2f} MB")
        return f"data:image/jpeg;base64,{base64.b64encode(payload).decode()}"

    def _field(obj, name, default=None):
        # Read `name` from either a dict or an attribute-style object
        if isinstance(obj, dict):
            return obj.get(name, default)
        return getattr(obj, name, default)

    try:
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        logger.info(f"Original image size: {file_size_mb:.2f} MB")

        try:
            from ocr_utils import preprocess_image_for_ocr, IMAGE_PREPROCESSING
            logger.info("Applying advanced image preprocessing for OCR")
            max_size_mb = IMAGE_PREPROCESSING.get("max_size_mb", 8.0)
            if file_size_mb > max_size_mb:
                logger.info(f"Image is large ({file_size_mb:.2f} MB), optimizing for API submission")
            # Preprocess image with document-type detection and appropriate enhancements
            _, base64_data_url = preprocess_image_for_ocr(file_path)
            logger.info("Image preprocessing completed successfully")
        except (ImportError, AttributeError) as e:
            # Advanced helpers unavailable: basic processing
            logger.warning(f"Advanced preprocessing not available: {str(e)}. Using basic image processing.")
            if file_size_mb > 8:
                logger.info("Image is large, resizing before API submission")
                try:
                    base64_data_url = _resize_with_pil()
                except ImportError:
                    logger.warning("PIL not available for resizing. Using original image.")
                    base64_data_url = _encode_original_image()
                except Exception as resize_err:
                    logger.warning(f"Image resize failed: {str(resize_err)}. Using original image.")
                    base64_data_url = _encode_original_image()
            else:
                # Smaller images are sent as-is
                base64_data_url = _encode_original_image()
        except Exception as e:
            # Any other preprocessing failure: fall back to the original image
            logger.warning(f"Image preprocessing failed: {str(e)}. Using original image.")
            base64_data_url = _encode_original_image()

        logger.info(f"Processing image with OCR using {OCR_MODEL}")

        # Retry with exponential backoff; rate-limit waits are 5x longer.
        # Every iteration either breaks (success), sleeps, returns or raises,
        # so image_response is always bound after the loop.
        max_retries = 4
        retry_delay = 2
        for retry in range(max_retries):
            try:
                image_response = self.client.ocr.process(
                    document=ImageURLChunk(image_url=base64_data_url),
                    model=OCR_MODEL,
                    include_image_base64=True,
                    timeout_ms=90000  # 90 second timeout
                )
                break  # Success, exit retry loop
            except Exception as e:
                error_msg = str(e)
                logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}")
                error_lower = error_msg.lower()

                # Authentication errors - no point in retrying
                if "unauthorized" in error_lower or "401" in error_lower:
                    logger.error("API authentication failed. Check your API key.")
                    raise ValueError(f"Authentication failed with API key. Please verify your Mistral API key is correct and active: {error_msg}") from e

                # Connection errors - worth retrying
                elif "connection" in error_lower or "timeout" in error_lower or "520" in error_msg or "server error" in error_lower:
                    if retry < max_retries - 1:
                        wait_time = retry_delay * (2 ** retry)
                        logger.info(f"Connection issue detected. Waiting {wait_time}s before retry...")
                        time.sleep(wait_time)
                    else:
                        logger.error("Maximum retries reached, API connection error persists.")
                        raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") from e

                # Rate limit errors
                elif "rate limit" in error_lower or "429" in error_lower or "requests rate limit exceeded" in error_lower:
                    # Exhausted quota/credit will not recover by waiting
                    if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower:
                        logger.error("API quota or credit limit reached. No retry will help.")
                        raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") from e
                    elif retry < max_retries - 1:
                        wait_time = retry_delay * (2 ** retry) * 5  # 5x longer wait for rate limits
                        logger.info(f"Rate limit exceeded. Waiting {wait_time}s before retry...")
                        time.sleep(wait_time)
                    else:
                        # Last retry failed, try local OCR as fallback
                        logger.error("Maximum retries reached, rate limit error persists.")
                        try:
                            from ocr_utils import try_local_ocr_fallback
                            ocr_text = try_local_ocr_fallback(file_path, base64_data_url)
                            if ocr_text:
                                logger.info("Successfully used local OCR fallback")
                                return {
                                    "file_name": file_path.name,
                                    "topics": ["Document"],
                                    "languages": ["English"],
                                    "ocr_contents": {
                                        "title": "Document (Local OCR)",
                                        "content": "This document was processed with local OCR due to API rate limiting.",
                                        "raw_text": ocr_text
                                    },
                                    "processing_method": "local_fallback",
                                    "processing_note": "Used local OCR due to API rate limit"
                                }
                        except Exception as local_err:
                            logger.warning(f"Local OCR fallback failed: {str(local_err)}")
                        # Both the API and local OCR failed
                        raise ValueError(f"Mistral API rate limit exceeded. Please try again later: {error_msg}") from e

                # Other errors - no retry
                else:
                    logger.error(f"Unrecoverable API error: {error_msg}")
                    raise

        # OCR markdown of the first page drives structured extraction
        pages = getattr(image_response, 'pages', None) or []
        image_ocr_markdown = pages[0].markdown if pages else ""

        # Skip the extraction model when OCR found little or no text
        if not image_ocr_markdown or len(image_ocr_markdown) < 50:
            logger.warning("OCR produced minimal or no text. Returning basic result.")
            return {
                "file_name": file_path.name,
                "topics": ["Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "raw_text": image_ocr_markdown if image_ocr_markdown else "No text could be extracted from the image."
                },
                "processing_note": "OCR produced minimal text content"
            }

        # Extract structured data using the appropriate model
        if use_vision:
            logger.info(f"Using vision model: {VISION_MODEL}")
            result = self._extract_structured_data_with_vision(base64_data_url, image_ocr_markdown, file_path.name, custom_prompt)
        else:
            logger.info(f"Using text-only model: {TEXT_MODEL}")
            result = self._extract_structured_data_text_only(image_ocr_markdown, file_path.name, custom_prompt)

        # Serialize once; the raw SDK response is not JSON serializable
        serialized_response = serialize_ocr_response(image_response)
        result['raw_response_data'] = serialized_response

        # Store a real bool: the previous expression leaked the SDK image list
        # itself into the (JSON-bound) result.
        has_images = bool(pages and getattr(pages[0], 'images', None))
        result['has_images'] = has_images
        if has_images:
            # JSON-friendly per-page image records for rendering
            result['pages_data'] = []
            serialized_pages = _field(serialized_response, 'pages', []) or []
            for page_idx, page in enumerate(serialized_pages):
                page_data = {
                    'page_number': page_idx + 1,
                    'markdown': _field(page, 'markdown', ''),
                    'images': []
                }
                for img_idx, img in enumerate(_field(page, 'images', []) or []):
                    img_base64 = _field(img, 'image_base64')
                    if img_base64:
                        img_id = _field(img, 'id')
                        page_data['images'].append({
                            'id': img_id if img_id else f"img_{page_idx}_{img_idx}",
                            'image_base64': img_base64
                        })
                result['pages_data'].append(page_data)

        logger.info("Image processing completed successfully")
        return result

    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        # Return basic result on error
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "error": str(e),
            "ocr_contents": {
                "error": f"Failed to process image: {str(e)}",
                "partial_text": "Image could not be processed."
            }
        }
def _extract_structured_data_with_vision(self, image_base64, ocr_markdown, filename, custom_prompt=None):
    """
    Extract structured data from an image using the vision model.

    Tries progressively simpler prompts (enhanced, then simplified, then
    minimal) and falls back to the text-only extractor if every vision
    attempt fails or processing errors out.

    Args:
        image_base64: Image data URL passed to ``ImageURLChunk``.
        ocr_markdown: OCR text for the image.
        filename: Name recorded in the result.
        custom_prompt: Optional extra user instructions. Now also forwarded
            to every text-only fallback (previously dropped there).

    Returns:
        A result dict; API failures fall back instead of raising.
    """
    logger = logging.getLogger("vision_processor")

    def _call_vision(prompt_text, timeout_ms):
        # One structured-output request with the image plus a text prompt
        return self.client.chat.parse(
            model=VISION_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": [
                        ImageURLChunk(image_url=image_base64),
                        TextChunk(text=prompt_text)
                    ],
                },
            ],
            response_format=StructuredOCRModel,
            temperature=0,
            timeout_ms=timeout_ms
        )

    try:
        # Fast path: skip the vision API for minimal OCR text (saves a call)
        if not ocr_markdown or len(ocr_markdown.strip()) < 100:
            logger.info("Minimal OCR text detected, skipping vision model processing")
            return {
                "file_name": filename,
                "topics": ["Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "raw_text": ocr_markdown if ocr_markdown else "No text could be extracted"
                }
            }

        # Fast path: no API access
        if self.test_mode or not self.api_key:
            logger.info("Test mode or no API key, using text-only processing")
            return self._extract_structured_data_text_only(ocr_markdown, filename, custom_prompt)

        doc_type = self._detect_document_type(custom_prompt, ocr_markdown)
        logger.info(f"Detected document type: {doc_type}")

        # Long texts are sampled: the opening (titles/metadata), a middle
        # excerpt and, for very long texts, the ending.
        if len(ocr_markdown) > 8000:
            first_part = ocr_markdown[:5000]
            # Never start the middle excerpt inside first_part; for 8000-12000
            # char texts the old start (len//2 - 1000) duplicated text.
            middle_start = max(len(first_part), len(ocr_markdown) // 2 - 1000)
            middle_part = ocr_markdown[middle_start:middle_start + 2000]
            if len(ocr_markdown) > 15000:
                end_part = ocr_markdown[-1000:]
                truncated_ocr = f"{first_part}\n...\n{middle_part}\n...\n{end_part}"
            else:
                truncated_ocr = f"{first_part}\n...\n{middle_part}"
            logger.info(f"Truncated OCR text from {len(ocr_markdown)} to {len(truncated_ocr)} chars")
        else:
            truncated_ocr = ocr_markdown

        enhanced_prompt = self._build_enhanced_prompt(doc_type, truncated_ocr, custom_prompt)

        start_time = time.time()
        try:
            # 60-120 seconds depending on text length
            timeout_ms = min(120000, max(60000, len(truncated_ocr) * 10))
            logger.info(f"Calling vision model with {timeout_ms}ms timeout and document type {doc_type}")
            chat_response = _call_vision(enhanced_prompt, timeout_ms)
            api_time = time.time() - start_time
            logger.info(f"Vision model completed in {api_time:.2f}s with document type: {doc_type}")
        except Exception as e:
            logger.warning(f"Enhanced prompt failed after {time.time() - start_time:.2f}s: {str(e)}")
            try:
                # Second attempt: shorter prompt with less context
                simplified_prompt = (
                    f"You are an expert in historical document analysis. "
                    f"Analyze this document image and the OCR text below. "
                    f"<BEGIN_OCR>\n{truncated_ocr[:4000]}\n<END_OCR>\n"
                    f"Identify the document type, main topics, languages used, and extract key information "
                    f"including names, dates, places, and events. Return a structured JSON response."
                )
                if custom_prompt:
                    simplified_prompt += f"\n\nAdditional instructions: {custom_prompt}"
                logger.info("Trying simplified prompt approach")
                chat_response = _call_vision(simplified_prompt, 60000)
                logger.info("Simplified prompt approach succeeded")
            except Exception as second_e:
                logger.warning(f"Simplified prompt failed: {str(second_e)}. Trying minimal prompt.")
                try:
                    # Third attempt: image-focused minimal prompt
                    minimal_prompt = (
                        "Analyze this historical document image. "
                        "Extract the document type, main topics, languages, and key information. "
                        "Provide your analysis in a structured JSON format."
                    )
                    logger.info("Trying minimal prompt with image-only focus")
                    chat_response = _call_vision(minimal_prompt, 45000)
                    logger.info("Minimal prompt approach succeeded")
                except Exception as third_e:
                    logger.warning(f"All vision model attempts failed, falling back to text-only model: {str(third_e)}")
                    return self._extract_structured_data_text_only(ocr_markdown, filename, custom_prompt)

        result = json.loads(chat_response.choices[0].message.parsed.json())

        # Ensure languages is a list of strings
        if 'languages' in result:
            result['languages'] = [str(lang) for lang in result.get('languages', [])]

        result['processing_info'] = {
            'method': 'vision_model',
            'document_type': doc_type,
            'ocr_text_length': len(ocr_markdown),
            'api_response_time': time.time() - start_time
        }

        if 'confidence_score' not in result:
            result['confidence_score'] = 0.92  # Vision model typically has higher confidence

    except Exception as e:
        logger.warning(f"Vision model processing failed, falling back to text-only model: {str(e)}")
        result = self._extract_structured_data_text_only(ocr_markdown, filename, custom_prompt)

    return result
# Document type detection cache, shared through the class attributes below.
# NOTE: a plain dict with no lock, so concurrent use is best-effort only.
_doc_type_cache = {}
_doc_type_cache_size = 256

@staticmethod
def _detect_document_type_cached(custom_prompt: Optional[str], ocr_text_sample: str) -> str:
    """
    Classify a document into one of the known document types.

    The custom prompt is checked first; the OCR sample is consulted only when
    the prompt yields no match. Terms match as whole words, with an optional
    trailing "s", so e.g. "form" no longer matches "information" and "press"
    no longer matches "expressed". Results are memoized in
    ``StructuredOCR._doc_type_cache``.

    Declared as a staticmethod because callers invoke it through an instance
    (``self._detect_document_type_cached(prompt, sample)``) and it uses no
    instance state.

    Args:
        custom_prompt: User-provided prompt, or None.
        ocr_text_sample: OCR text sample; the visible caller passes at most
            1000 characters.

    Returns:
        One of "handwritten", "letter", "legal", "recipe", "travel",
        "scientific", "newspaper", or "general".
    """
    import re

    cache = StructuredOCR._doc_type_cache
    # Key on the full inputs. A 50-char-prefix key made documents sharing an
    # opening line (e.g. a letterhead) reuse each other's result.
    cache_key = (str(custom_prompt) if custom_prompt else "", ocr_text_sample or "")
    cached = cache.get(cache_key)
    if cached is not None:
        return cached

    # Checked in insertion order; the first matching type wins
    doc_type_patterns = {
        "handwritten": ["handwritten", "handwriting", "cursive", "manuscript"],
        "letter": ["letter", "correspondence", "message", "dear sir", "dear madam", "sincerely", "yours truly"],
        "legal": ["form", "contract", "agreement", "legal", "certificate", "court", "attorney", "plaintiff", "defendant"],
        "recipe": ["recipe", "food", "ingredients", "directions", "tbsp", "tsp", "cup", "mix", "bake", "cooking"],
        "travel": ["travel", "expedition", "journey", "exploration", "voyage", "destination", "map"],
        "scientific": ["scientific", "experiment", "hypothesis", "research", "study", "analysis", "results", "procedure"],
        "newspaper": ["news", "newspaper", "article", "press", "headline", "column", "editor"],
    }

    def _first_match(text):
        # Return the first type whose terms appear as whole words in `text`
        lowered = text.lower()
        for detected_type, terms in doc_type_patterns.items():
            pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + r")s?\b"
            if re.search(pattern, lowered):
                return detected_type
        return None

    doc_type = _first_match(custom_prompt) if custom_prompt else None
    if doc_type is None and ocr_text_sample:
        doc_type = _first_match(ocr_text_sample)
    doc_type = doc_type or "general"

    if len(cache) >= StructuredOCR._doc_type_cache_size:
        # Evict up to 20 of the oldest insertions. Dicts keep insertion order
        # and hits do not refresh entries, so this is FIFO, not true LRU.
        for _ in range(20):
            try:
                cache.pop(next(iter(cache)))
            except (StopIteration, KeyError, RuntimeError):
                # Cache emptied or mutated concurrently; stop evicting
                break

    cache[cache_key] = doc_type
    return doc_type
def _detect_document_type(self, custom_prompt: Optional[str], ocr_text: str) -> str:
    """
    Classify the document from the user's prompt and a prefix of its OCR text.

    Args:
        custom_prompt: Optional user-supplied prompt, forwarded unchanged.
        ocr_text: Full OCR output; only its first 1000 characters are used.

    Returns:
        The type string chosen by ``_detect_document_type_cached``
        (e.g. "letter" or "general").
    """
    # A bounded prefix keeps detection cheap on very long documents
    sample = "" if not ocr_text else ocr_text[:1000]
    detector = self._detect_document_type_cached
    return detector(custom_prompt, sample)
def _build_enhanced_prompt(self, doc_type: str, ocr_text: str, custom_prompt: Optional[str]) -> str:
    """
    Assemble the extraction prompt for a given document type.

    The prompt concatenates, in order:
      1. the OCR text wrapped in <BEGIN_OCR>/<END_OCR> markers,
      2. a role and checklist section selected by ``doc_type`` (unknown types
         get the general historian section),
      3. the JSON output instructions,
      4. the user's custom prompt, when one is given.

    Args:
        doc_type: Detected document type.
        ocr_text: OCR-extracted (possibly truncated) text.
        custom_prompt: Optional user instructions appended at the end.

    Returns:
        The complete prompt string.
    """
    general_role = (
        "You are a historian specializing in historical document analysis. "
        "Analyze this document to extract:\n"
        "- Document type and purpose\n"
        "- Time period and historical context\n"
        "- Key topics, themes, and subjects\n"
        "- People, places, and events mentioned\n"
        "- Languages used and writing style\n"
        "- Historical significance and connections\n"
    )

    # Role/checklist text per specialised document type
    role_by_type = {
        "handwritten": (
            "You are an expert historian specializing in handwritten document transcription and analysis. "
            "The OCR system has attempted to capture the handwriting, but may have made errors with cursive script "
            "or unusual letter formations.\n\n"
            "Pay careful attention to:\n"
            "- Correcting OCR errors common in handwriting recognition\n"
            "- Preserving the original document structure\n"
            "- Identifying topics, language(s), and document type accurately\n"
            "- Detecting any names, dates, places, or events mentioned\n"
        ),
        "letter": (
            "You are an expert in historical correspondence analysis. "
            "Analyze this letter as a historian would, identifying:\n"
            "- Sender and recipient (if mentioned)\n"
            "- Date and location of writing (if present)\n"
            "- Key topics discussed\n"
            "- Historical context and significance\n"
            "- Sentiment and tone of the communication\n"
            "- Closing formulations and signature\n"
        ),
        "recipe": (
            "You are a culinary historian specializing in historical recipes. "
            "Analyze this recipe document to extract:\n"
            "- Recipe name/title\n"
            "- Complete list of ingredients with measurements\n"
            "- Preparation instructions in correct order\n"
            "- Cooking time and temperature if mentioned\n"
            "- Serving suggestions or yield information\n"
            "- Any cultural or historical context provided\n"
        ),
        "travel": (
            "You are a historian specializing in historical travel and exploration accounts. "
            "Analyze this document to extract:\n"
            "- Geographical locations mentioned\n"
            "- Names of explorers, ships, or expeditions\n"
            "- Dates and timelines\n"
            "- Descriptions of indigenous peoples, cultures, or local conditions\n"
            "- Natural features, weather, or navigational details\n"
            "- Historical significance of the journey described\n"
        ),
        "scientific": (
            "You are a historian of science specializing in historical scientific documents. "
            "Analyze this document to extract:\n"
            "- Scientific methodology described\n"
            "- Observations, measurements, or data presented\n"
            "- Scientific terminology of the period\n"
            "- Experimental apparatus or tools mentioned\n"
            "- Conclusions or hypotheses presented\n"
            "- Historical significance within scientific development\n"
        ),
        "newspaper": (
            "You are a media historian specializing in historical newspapers and publications. "
            "Analyze this document to extract:\n"
            "- Publication name and date if present\n"
            "- Headlines and article titles\n"
            "- Main news content with focus on events, people, and places\n"
            "- Advertisement content if present\n"
            "- Historical context and significance\n"
            "- Editorial perspective or bias if detectable\n"
        ),
        "legal": (
            "You are a legal historian specializing in historical legal documents. "
            "Analyze this document to extract:\n"
            "- Document type (contract, certificate, will, deed, etc.)\n"
            "- Parties involved and their roles\n"
            "- Key terms, conditions, or declarations\n"
            "- Dates, locations, and jurisdictions mentioned\n"
            "- Legal terminology of the period\n"
            "- Signatures, witnesses, or official markings\n"
        ),
    }

    ocr_block = f"This is a historical document's OCR text:\n<BEGIN_OCR>\n{ocr_text}\n<END_OCR>\n\n"
    output_block = (
        "Create a structured JSON response with the following fields:\n"
        "- file_name: The document's name\n"
        "- topics: An array of topics covered in the document\n"
        "- languages: An array of languages used in the document\n"
        "- ocr_contents: A dictionary with the document's contents, organized logically\n"
    )
    extra_block = (
        f"\n\nADDITIONAL CONTEXT AND INSTRUCTIONS:\n{custom_prompt}\n" if custom_prompt else ""
    )

    return "".join((ocr_block, role_by_type.get(doc_type, general_role), output_block, extra_block))
| def _extract_structured_data_text_only(self, ocr_markdown, filename, custom_prompt=None): | |
| """ | |
| Extract structured data using text-only model with detailed historical context prompting | |
| and improved error handling | |
| """ | |
| logger = logging.getLogger("text_processor") | |
| start_time = time.time() | |
| try: | |
| # Fast path: Skip for minimal OCR text | |
| if not ocr_markdown or len(ocr_markdown.strip()) < 50: | |
| logger.info("Minimal OCR text - returning basic result") | |
| return { | |
| "file_name": filename, | |
| "topics": ["Document"], | |
| "languages": ["English"], | |
| "ocr_contents": { | |
| "raw_text": ocr_markdown if ocr_markdown else "No text could be extracted" | |
| }, | |
| "processing_method": "minimal_text" | |
| } | |
| # Check for API key to avoid unnecessary processing | |
| if self.test_mode or not self.api_key: | |
| logger.info("Test mode or no API key - returning basic result") | |
| return { | |
| "file_name": filename, | |
| "topics": ["Document"], | |
| "languages": ["English"], | |
| "ocr_contents": { | |
| "raw_text": ocr_markdown[:10000] if ocr_markdown else "No text could be extracted", | |
| "note": "API key not provided - showing raw OCR text only" | |
| }, | |
| "processing_method": "test_mode" | |
| } | |
| # Detect document type and build enhanced prompt | |
| doc_type = self._detect_document_type(custom_prompt, ocr_markdown) | |
| logger.info(f"Detected document type: {doc_type}") | |
| # If OCR text is very large, truncate it to avoid API limits | |
| truncated_text = ocr_markdown | |
| if len(ocr_markdown) > 25000: | |
| # Keep first 15000 chars and last 5000 chars | |
| truncated_text = ocr_markdown[:15000] + "\n...[content truncated]...\n" + ocr_markdown[-5000:] | |
| logger.info(f"OCR text truncated from {len(ocr_markdown)} to {len(truncated_text)} chars") | |
| # Build the prompt with truncated text if needed | |
| enhanced_prompt = self._build_enhanced_prompt(doc_type, truncated_text, custom_prompt) | |
| # Use enhanced prompt with text-only model - with retry logic | |
| max_retries = 2 | |
| retry_delay = 1 | |
| for retry in range(max_retries): | |
| try: | |
| logger.info(f"Calling text model ({TEXT_MODEL})") | |
| api_start = time.time() | |
| # Set appropriate timeout based on text length | |
| timeout_ms = min(120000, max(30000, len(truncated_text) * 5)) # 30-120s based on length | |
| # Make API call with appropriate timeout | |
| chat_response = self.client.chat.parse( | |
| model=TEXT_MODEL, | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": enhanced_prompt | |
| }, | |
| ], | |
| response_format=StructuredOCRModel, | |
| temperature=0, | |
| timeout_ms=timeout_ms | |
| ) | |
| api_time = time.time() - api_start | |
| logger.info(f"Text model API call completed in {api_time:.2f}s") | |
| # Convert the response to a dictionary | |
| result = json.loads(chat_response.choices[0].message.parsed.json()) | |
| # Ensure languages is a list of strings, not Language enum objects | |
| if 'languages' in result: | |
| result['languages'] = [str(lang) for lang in result.get('languages', [])] | |
| # Add processing metadata | |
| result['processing_method'] = 'text_model' | |
| result['document_type'] = doc_type | |
| result['model_used'] = TEXT_MODEL | |
| result['processing_time'] = time.time() - start_time | |
| # Add raw text for reference if not already present | |
| if 'ocr_contents' in result and 'raw_text' not in result['ocr_contents']: | |
| # Add truncated raw text if very large | |
| if len(ocr_markdown) > 50000: | |
| result['ocr_contents']['raw_text'] = ocr_markdown[:50000] + "\n...[content truncated]..." | |
| else: | |
| result['ocr_contents']['raw_text'] = ocr_markdown | |
| return result | |
| except Exception as api_error: | |
| error_msg = str(api_error).lower() | |
| logger.warning(f"API error on attempt {retry+1}/{max_retries}: {str(api_error)}") | |
| # Check if retry would help | |
| if retry < max_retries - 1: | |
| # Rate limit errors - special handling with longer wait | |
| if any(term in error_msg for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]): | |
| # Check specifically for token exhaustion vs temporary rate limit | |
| if any(term in error_msg for term in ["quota", "credit", "subscription"]): | |
| logger.error("API quota or credit limit reached. No retry will help.") | |
| raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") | |
| # Longer backoff for rate limit errors | |
| wait_time = retry_delay * (2 ** retry) * 6.0 # 6x longer wait for rate limits | |
| logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...") | |
| time.sleep(wait_time) | |
| # Other transient errors | |
| elif any(term in error_msg for term in ["timeout", "connection", "500", "503", "504"]): | |
| # Wait before retrying | |
| wait_time = retry_delay * (2 ** retry) | |
| logger.info(f"Transient error, retrying in {wait_time}s") | |
| time.sleep(wait_time) | |
| else: | |
| # Non-retryable error | |
| raise | |
| else: | |
| # Last retry failed | |
| raise | |
| # This shouldn't be reached due to raise in the loop, but just in case | |
| raise Exception("All retries failed for text model") | |
| except Exception as e: | |
| logger.error(f"Text model failed: {str(e)}. Creating basic result.") | |
| # Create a basic result with available OCR text | |
| try: | |
| # Create a more informative fallback result | |
| result = { | |
| "file_name": filename, | |
| "topics": ["Document"], | |
| "languages": ["English"], | |
| "ocr_contents": { | |
| "raw_text": ocr_markdown[:50000] if ocr_markdown else "No text could be extracted", | |
| "error": f"AI processing failed: {str(e)}" | |
| }, | |
| "processing_method": "fallback", | |
| "processing_error": str(e), | |
| "processing_time": time.time() - start_time | |
| } | |
| # Try to extract some basic metadata even without AI | |
| if ocr_markdown: | |
| # Simple content analysis | |
| text_sample = ocr_markdown[:5000].lower() | |
| # Try to detect language | |
| if "dear" in text_sample and any(word in text_sample for word in ["sincerely", "regards", "truly"]): | |
| result["topics"].append("Letter") | |
| elif any(word in text_sample for word in ["recipe", "ingredients", "instructions", "cook", "bake"]): | |
| result["topics"].append("Recipe") | |
| elif any(word in text_sample for word in ["article", "report", "study", "analysis"]): | |
| result["topics"].append("Article") | |
| except Exception as inner_e: | |
| logger.error(f"Error creating basic result: {str(inner_e)}") | |
| result = { | |
| "file_name": str(filename) if filename else "unknown", | |
| "topics": ["Document"], | |
| "languages": ["English"], | |
| "ocr_contents": { | |
| "error": "Processing failed completely", | |
| "partial_text": ocr_markdown[:1000] if ocr_markdown else "Document could not be processed." | |
| } | |
| } | |
| return result | |
# Command-line entry point for ad-hoc testing:
#   python structured_ocr.py <file_path>
# Runs StructuredOCR.process_file on a single file and prints the result as
# indented JSON on stdout.
if __name__ == "__main__":
    # `sys` and `json` are already imported at module level, so no local
    # re-import is needed here.
    if len(sys.argv) < 2:
        # Send the usage error to stderr so stdout carries only the JSON result.
        print("Usage: python structured_ocr.py <file_path>", file=sys.stderr)
        sys.exit(1)
    file_path = sys.argv[1]
    processor = StructuredOCR()
    result = processor.process_file(file_path)
    print(json.dumps(result, indent=2))