"""Cloud processor for Nanonets API integration with API key pool rotation and local fallback."""

import json
import logging
import os
import time
from typing import Dict, Any, Optional, List

import requests

from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError

logger = logging.getLogger(__name__)

# Default reset time for rate-limited keys (1 hour)
DEFAULT_RATE_LIMIT_RESET = 3600


class CloudConversionResult(ConversionResult):
    """Enhanced ConversionResult for cloud mode with lazy API calls, key rotation, and local fallback.

    No request is made at construction time.  Each ``extract_*`` method calls
    ``_get_cloud_output``, which uploads the file on demand, caches successful
    cloud responses per output type, and switches to local conversion once the
    cloud path has been given up on.
    """

    # Output types accepted by the cloud API; anything else is coerced to "markdown".
    _VALID_OUTPUT_TYPES = ("markdown", "flat-json", "html", "csv", "specified-fields", "specified-json")

    def __init__(self, file_path: str, cloud_processor: 'CloudProcessor', metadata: Optional[Dict[str, Any]] = None,
                 api_key_pool=None, local_fallback_processor=None):
        """Store the collaborators needed to perform conversions later.

        Args:
            file_path: Path of the document to upload when output is requested.
            cloud_processor: Processor supplying the API URL, model type, own API key
                and the content-type / response-parsing helpers.
            metadata: Optional metadata passed through to ``ConversionResult``.
            api_key_pool: Optional pool object; ``get_next_key()`` and
                ``mark_key_rate_limited(key, seconds)`` are called on it.
            local_fallback_processor: Optional local processor whose ``process(file_path)``
                is used when the cloud path is unavailable.
        """
        # Initialize with empty content - we'll make API calls on demand
        super().__init__("", metadata)
        self.file_path = file_path
        self.cloud_processor = cloud_processor
        self.api_key_pool = api_key_pool
        self.local_fallback_processor = local_fallback_processor  # GPU processor or None
        self._cached_outputs = {}  # Cache API responses by output type
        self._used_fallback = False  # Track if we fell back to local processing

    def _next_api_key(self) -> Optional[str]:
        """Return the next key to try: the pool's next key, else the processor's own key, else None."""
        current_key = None
        if self.api_key_pool:
            current_key = self.api_key_pool.get_next_key()
        # Also try the processor's own key if set
        if not current_key and self.cloud_processor.api_key:
            current_key = self.cloud_processor.api_key
        return current_key

    def _build_form_data(self, output_type: str, specified_fields: Optional[list],
                         json_schema: Optional[dict]) -> Dict[str, Any]:
        """Build the multipart form fields sent alongside the uploaded file."""
        data = {'output_type': output_type}

        # Add model_type if specified
        if self.cloud_processor.model_type:
            data['model_type'] = self.cloud_processor.model_type

        # Add field extraction parameters
        if output_type == "specified-fields" and specified_fields:
            data['specified_fields'] = ','.join(specified_fields)
        elif output_type == "specified-json" and json_schema:
            data['json_schema'] = json.dumps(json_schema)
        return data

    def _mark_rate_limited(self, key: str) -> None:
        """Tell the pool (when there is one) that ``key`` hit a rate limit."""
        if self.api_key_pool:
            self.api_key_pool.mark_key_rate_limited(key, DEFAULT_RATE_LIMIT_RESET)

    def _get_cloud_output(self, output_type: str, specified_fields: Optional[list] = None,
                          json_schema: Optional[dict] = None) -> str:
        """Get output from cloud API for specific type, with caching, key rotation, and local fallback.

        Args:
            output_type: Requested output type; unknown values are replaced by "markdown".
            specified_fields: Field names sent for the "specified-fields" output type.
            json_schema: Schema sent (JSON-encoded) for the "specified-json" output type.

        Returns:
            The content extracted from the API response, or the local fallback output.
        """
        # Validate output type
        if output_type not in self._VALID_OUTPUT_TYPES:
            logger.warning(f"Invalid output type '{output_type}' for cloud API. Using 'markdown'.")
            output_type = "markdown"

        # Create cache key based on output type and parameters
        cache_key = output_type
        if specified_fields:
            cache_key += f"_fields_{','.join(specified_fields)}"
        if json_schema:
            cache_key += f"_schema_{hash(str(json_schema))}"

        if cache_key in self._cached_outputs:
            return self._cached_outputs[cache_key]

        # If we already fell back to local, skip cloud
        if self._used_fallback:
            return self._convert_locally(output_type)

        # Try cloud API with key rotation
        keys_tried = set()
        while True:
            current_key = self._next_api_key()

            if not current_key:
                logger.info("No API keys available, falling back to local processing")
                return self._convert_locally(output_type)

            # Don't try the same key twice in one cycle
            if current_key in keys_tried:
                logger.info("All API keys rate limited, falling back to local processing")
                return self._convert_locally(output_type)
            keys_tried.add(current_key)

            try:
                # current_key is always set here (guarded above), so auth is always sent.
                headers = {'Authorization': f'Bearer {current_key}'}

                # Prepare file for upload; the handle only needs to stay open for the POST.
                with open(self.file_path, 'rb') as file:
                    files = {
                        'file': (os.path.basename(self.file_path), file,
                                 self.cloud_processor._get_content_type(self.file_path))
                    }
                    data = self._build_form_data(output_type, specified_fields, json_schema)

                    log_prefix = f"API key {current_key[:8]}..."
                    logger.info(f"Making cloud API call ({log_prefix}) for {output_type} on {self.file_path}")

                    # Make API request
                    response = requests.post(
                        self.cloud_processor.api_url,
                        headers=headers,
                        files=files,
                        data=data,
                        timeout=300
                    )

                # Handle rate limiting (429) - mark key as limited and try next
                if response.status_code == 429:
                    self._mark_rate_limited(current_key)
                    # Also mark the processor's key if it matches
                    if self.cloud_processor.api_key == current_key:
                        logger.warning("Processor API key rate limited, will try pool keys")
                    logger.warning(f"API key {current_key[:8]}... rate limited, trying next key...")
                    continue

                response.raise_for_status()
                result_data = response.json()

                # Extract content from response
                content = self.cloud_processor._extract_content_from_response(result_data)

                # Cache the result
                self._cached_outputs[cache_key] = content
                return content

            except requests.exceptions.HTTPError as e:
                # Decide on the real status code rather than a substring of the message.
                # (Response objects are falsy for error statuses, hence the `is not None`.)
                status_code = e.response.status_code if e.response is not None else None
                if status_code == 429:
                    self._mark_rate_limited(current_key)
                    logger.warning(f"API key {current_key[:8]}... rate limited (HTTPError), trying next key...")
                    continue
                logger.error(f"Cloud API HTTP error: {e}")
                break
            except Exception as e:
                # Any other failure (I/O, network, bad JSON) ends the cloud attempt.
                logger.error(f"Cloud API call failed: {e}")
                break

        # All keys exhausted, fall back to local processing
        logger.warning("All API keys rate limited or failed. Falling back to local Docling processing.")
        self._used_fallback = True
        return self._convert_locally(output_type)

    def _convert_locally(self, output_type: str) -> str:
        """Fallback to local Docling/GPU conversion methods.

        Marks this result as fallen back, then tries the local fallback processor;
        if there is none, or it raises, the parent class's extractors are used on
        this object's own (empty-initialised) content.
        """
        self._used_fallback = True

        # Try the local fallback processor (GPU processor with Docling models)
        if self.local_fallback_processor:
            try:
                logger.info(f"Using local Docling processor for fallback on {self.file_path}")
                local_result = self.local_fallback_processor.process(self.file_path)

                if output_type == "html":
                    return local_result.extract_html()
                elif output_type == "flat-json":
                    return json.dumps(local_result.extract_data(), indent=2)
                elif output_type == "csv":
                    return local_result.extract_csv(include_all_tables=True)
                else:
                    return local_result.extract_markdown()
            except Exception as e:
                logger.error(f"Local Docling fallback also failed: {e}")

        # Last resort: use parent class methods
        if output_type == "html":
            return super().extract_html()
        elif output_type == "flat-json":
            return json.dumps(super().extract_data(), indent=2)
        elif output_type == "csv":
            return super().extract_csv(include_all_tables=True)
        else:
            return self.content

    def extract_markdown(self) -> str:
        """Export as markdown."""
        return self._get_cloud_output("markdown")

    def extract_html(self) -> str:
        """Export as HTML."""
        return self._get_cloud_output("html")

    def extract_data(self, specified_fields: Optional[list] = None,
                     json_schema: Optional[dict] = None) -> Dict[str, Any]:
        """Export as structured JSON with optional field extraction.

        Args:
            specified_fields: Optional list of specific fields to extract
            json_schema: Optional JSON schema defining fields and types to extract

        Returns:
            Structured JSON with extracted data.  On any failure a dict with
            ``"format": "json_parse_error"`` is returned instead of raising; its
            ``raw_content`` holds whatever text was obtained before the failure.
        """
        # Single variable for every branch so the error path can always report the raw payload.
        content = ""
        try:
            if specified_fields:
                # Request specified fields extraction
                content = self._get_cloud_output("specified-fields", specified_fields=specified_fields)
                return {
                    "extracted_fields": json.loads(content),
                    "format": "specified_fields"
                }
            if json_schema:
                # Request JSON schema extraction
                content = self._get_cloud_output("specified-json", json_schema=json_schema)
                return {
                    "structured_data": json.loads(content),
                    "format": "structured_json"
                }
            # Standard JSON extraction
            content = self._get_cloud_output("flat-json")
            return {
                "document": json.loads(content),
                "format": "cloud_flat_json"
            }
        except Exception as e:
            logger.error(f"Failed to parse JSON content: {e}")
            return {
                "document": {"raw_content": content},
                "format": "json_parse_error",
                "error": str(e)
            }

    def extract_text(self) -> str:
        """Export as plain text.

        Returns the markdown output as-is; an empty string if obtaining it raises.
        """
        try:
            return self._get_cloud_output("markdown")
        except Exception as e:
            logger.error(f"Failed to get text output: {e}")
            return ""

    def extract_csv(self, table_index: int = 0, include_all_tables: bool = False) -> str:
        """Export tables as CSV format.

        Args:
            table_index: Accepted for interface compatibility; not forwarded to the cloud request.
            include_all_tables: Accepted for interface compatibility; not forwarded to the cloud request.

        Returns:
            Whatever the "csv" output type yields (cloud response or local fallback).
        """
        return self._get_cloud_output("csv")


class CloudProcessor(BaseProcessor):
    """Processor for cloud-based document conversion using Nanonets API with API key pool rotation."""

    # Supported file extensions mapped to the content type sent with the upload.
    # Single source of truth for both can_process() and _get_content_type().
    _CONTENT_TYPES = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.doc': 'application/msword',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.xls': 'application/vnd.ms-excel',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        '.ppt': 'application/vnd.ms-powerpoint',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.htm': 'text/html',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.bmp': 'image/bmp',
        '.tiff': 'image/tiff',
        '.tif': 'image/tiff',
    }

    def __init__(self, api_key: Optional[str] = None, output_type: Optional[str] = None,
                 model_type: Optional[str] = None, specified_fields: Optional[list] = None,
                 json_schema: Optional[dict] = None, api_key_pool=None,
                 local_fallback_processor=None, **kwargs):
        """Initialize the cloud processor.

        Args:
            api_key: API key for cloud processing (optional - uses rate-limited free tier without key)
            output_type: Output type for cloud processing (markdown, flat-json, html, csv, specified-fields, specified-json)
            model_type: Model type for cloud processing (gemini, openapi, nanonets)
            specified_fields: List of fields to extract (for specified-fields output type)
            json_schema: JSON schema defining fields and types to extract (for specified-json output type)
            api_key_pool: ApiKeyPool instance for key rotation
            local_fallback_processor: Local processor (GPU/Docling) for fallback when all keys exhausted
        """
        super().__init__(**kwargs)
        self.api_key = api_key
        self.output_type = output_type
        self.model_type = model_type
        self.specified_fields = specified_fields
        self.json_schema = json_schema
        self.api_key_pool = api_key_pool
        self.local_fallback_processor = local_fallback_processor
        self.api_url = "https://extraction-api.nanonets.com/extract"

        # Don't validate output_type during initialization - it will be validated during processing
        # This prevents warnings during DocumentExtractor initialization

    def can_process(self, file_path: str) -> bool:
        """Check if the processor can handle the file (case-insensitive extension match)."""
        # Cloud processor supports most common document formats
        # API key is optional - without it, uses rate-limited free tier
        _, ext = os.path.splitext(file_path.lower())
        return ext in self._CONTENT_TYPES

    def process(self, file_path: str) -> CloudConversionResult:
        """Create a lazy CloudConversionResult that will make API calls on demand with key rotation.

        Args:
            file_path: Path to the file to process

        Returns:
            CloudConversionResult that makes API calls when output methods are called

        Raises:
            ConversionError: If file doesn't exist
        """
        if not os.path.exists(file_path):
            raise ConversionError(f"File not found: {file_path}")

        # Create metadata without making any API calls
        metadata = {
            'source_file': file_path,
            'processing_mode': 'cloud',
            'api_provider': 'nanonets',
            'file_size': os.path.getsize(file_path),
            'model_type': self.model_type,
            'has_api_key': bool(self.api_key),
            'key_rotation': True,
            'local_fallback': self.local_fallback_processor is not None
        }

        if self.api_key:
            logger.info(f"Created cloud extractor for {file_path} with API key pool rotation")
        else:
            logger.info(f"Created cloud extractor for {file_path} without API key - will use pool + local fallback")

        # Return lazy result with key pool and local fallback
        return CloudConversionResult(
            file_path=file_path,
            cloud_processor=self,
            metadata=metadata,
            api_key_pool=self.api_key_pool,
            local_fallback_processor=self.local_fallback_processor
        )

    def _extract_content_from_response(self, response_data: Dict[str, Any]) -> str:
        """Extract content from API response.

        Returns the value under 'content' when present; otherwise the whole
        response serialised as indented JSON.
        """
        try:
            # API always returns content in the 'content' field
            if 'content' in response_data:
                return response_data['content']

            # Fallback: return whole response as JSON if no content field
            logger.warning("No 'content' field found in API response, returning full response")
            return json.dumps(response_data, indent=2)

        except Exception as e:
            logger.error(f"Failed to extract content from API response: {e}")
            return json.dumps(response_data, indent=2)

    def _get_content_type(self, file_path: str) -> str:
        """Get content type for file upload ('application/octet-stream' for unknown extensions)."""
        _, ext = os.path.splitext(file_path.lower())
        return self._CONTENT_TYPES.get(ext, 'application/octet-stream')