"""Cloud processor for Nanonets API integration with API key pool rotation and local fallback."""
import os
import requests
import json
import logging
import time
from typing import Dict, Any, Optional, List
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError
# Module-level logger, named after this module so handlers can filter on it.
logger = logging.getLogger(__name__)
# Default reset time for rate-limited keys (1 hour, expressed in seconds).
# Passed to the key pool's mark_key_rate_limited() whenever a key gets HTTP 429.
DEFAULT_RATE_LIMIT_RESET = 3600
class CloudConversionResult(ConversionResult):
    """Lazy conversion result for cloud mode with key rotation and local fallback.

    Nothing is sent to the API when the object is created. Each ``extract_*``
    method asks ``_get_cloud_output`` for one output type; successful API
    responses are cached per output type and parameters. Keys come from
    ``api_key_pool`` first and from ``cloud_processor.api_key`` second. A key
    that receives HTTP 429 is reported to the pool and the next key is tried.
    Once the result falls back to local conversion it stays in local mode.
    """

    # Output types accepted for the cloud API; anything else is coerced to "markdown".
    _VALID_OUTPUT_TYPES = ("markdown", "flat-json", "html", "csv", "specified-fields", "specified-json")

    def __init__(self, file_path: str, cloud_processor: 'CloudProcessor', metadata: Optional[Dict[str, Any]] = None,
                 api_key_pool=None, local_fallback_processor=None):
        """Store the collaborators; no API call is made here.

        Args:
            file_path: Path of the document to upload when output is requested.
            cloud_processor: Processor providing the API URL, model type, own API key,
                content-type lookup and response parsing.
            metadata: Optional metadata handed to the parent result.
            api_key_pool: Optional pool exposing ``get_next_key()`` and
                ``mark_key_rate_limited(key, seconds)``.
            local_fallback_processor: Optional local processor exposing ``process(path)``.
        """
        # Initialize with empty content - API calls are made on demand.
        super().__init__("", metadata)
        self.file_path = file_path
        self.cloud_processor = cloud_processor
        self.api_key_pool = api_key_pool
        self.local_fallback_processor = local_fallback_processor  # GPU processor or None
        self._cached_outputs = {}  # API responses keyed by output type + parameters
        self._used_fallback = False  # True once local processing has been used

    @staticmethod
    def _build_cache_key(output_type: str, specified_fields: Optional[list], json_schema: Optional[dict]) -> str:
        """Return the cache key for one output type and its extraction parameters."""
        cache_key = output_type
        if specified_fields:
            cache_key += f"_fields_{','.join(specified_fields)}"
        if json_schema:
            cache_key += f"_schema_{hash(str(json_schema))}"
        return cache_key

    def _next_api_key(self):
        """Return the next key from the pool, else the processor's own key (may be falsy)."""
        pool_key = self.api_key_pool.get_next_key() if self.api_key_pool else None
        return pool_key or self.cloud_processor.api_key

    def _mark_rate_limited(self, api_key) -> None:
        """Tell the pool (if any) that ``api_key`` was rate limited."""
        if self.api_key_pool:
            self.api_key_pool.mark_key_rate_limited(api_key, DEFAULT_RATE_LIMIT_RESET)

    def _post_to_cloud(self, api_key, output_type: str, specified_fields: Optional[list],
                       json_schema: Optional[dict]):
        """Upload the file with ``api_key`` and return the raw HTTP response."""
        headers = {'Authorization': f'Bearer {api_key}'}
        with open(self.file_path, 'rb') as file:
            files = {
                'file': (os.path.basename(self.file_path), file,
                         self.cloud_processor._get_content_type(self.file_path))
            }
            data = {'output_type': output_type}
            # Add model_type if specified
            if self.cloud_processor.model_type:
                data['model_type'] = self.cloud_processor.model_type
            # Field extraction parameters are only sent for the matching output type
            if output_type == "specified-fields" and specified_fields:
                data['specified_fields'] = ','.join(specified_fields)
            elif output_type == "specified-json" and json_schema:
                data['json_schema'] = json.dumps(json_schema)

            logger.info(f"Making cloud API call (API key {api_key[:8]}...) for {output_type} on {self.file_path}")
            return requests.post(
                self.cloud_processor.api_url,
                headers=headers,
                files=files,
                data=data,
                timeout=300
            )

    def _get_cloud_output(self, output_type: str, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> str:
        """Get output from the cloud API, with caching, key rotation, and local fallback.

        Args:
            output_type: Requested output type; unknown values are replaced by "markdown".
            specified_fields: Field names, sent only for "specified-fields".
            json_schema: Schema, sent only for "specified-json".

        Returns:
            The content extracted from the API response, or the local fallback
            output when no key is available, every key is rate limited, or the
            request fails.
        """
        if output_type not in self._VALID_OUTPUT_TYPES:
            logger.warning(f"Invalid output type '{output_type}' for cloud API. Using 'markdown'.")
            output_type = "markdown"

        cache_key = self._build_cache_key(output_type, specified_fields, json_schema)
        if cache_key in self._cached_outputs:
            return self._cached_outputs[cache_key]

        # If we already fell back to local, skip cloud
        if self._used_fallback:
            return self._convert_locally(output_type)

        keys_tried = set()
        while True:
            current_key = self._next_api_key()
            if not current_key:
                logger.info("No API keys available, falling back to local processing")
                return self._convert_locally(output_type)

            # Seeing a key again means the rotation has come full circle.
            if current_key in keys_tried:
                logger.info("All API keys rate limited, falling back to local processing")
                return self._convert_locally(output_type)
            keys_tried.add(current_key)

            try:
                response = self._post_to_cloud(current_key, output_type, specified_fields, json_schema)

                # Handle rate limiting (429) - mark key as limited and try next
                if response.status_code == 429:
                    self._mark_rate_limited(current_key)
                    if self.cloud_processor.api_key == current_key:
                        logger.warning("Processor API key rate limited, will try pool keys")
                    logger.warning(f"API key {current_key[:8]}... rate limited, trying next key...")
                    continue

                response.raise_for_status()
                content = self.cloud_processor._extract_content_from_response(response.json())
                self._cached_outputs[cache_key] = content
                return content

            except requests.exceptions.HTTPError as e:
                # Decide on the actual status code, not on a substring of the message.
                status_code = getattr(e.response, "status_code", None)
                if status_code == 429:
                    self._mark_rate_limited(current_key)
                    logger.warning(f"API key {current_key[:8]}... rate limited (HTTPError), trying next key...")
                    continue
                logger.error(f"Cloud API HTTP error: {e}")
                break
            except Exception as e:
                # Best-effort boundary: any other failure ends the cloud attempt.
                logger.error(f"Cloud API call failed: {e}")
                break

        # A non-rate-limit failure occurred; fall back to local processing
        logger.warning("All API keys rate limited or failed. Falling back to local Docling processing.")
        return self._convert_locally(output_type)

    def _convert_locally(self, output_type: str) -> str:
        """Produce ``output_type`` without the cloud API.

        Marks the result as being in fallback mode, then tries
        ``local_fallback_processor``. If there is none, or it raises, the parent
        class implementation is used as a last resort. Output types without a
        dedicated branch (including the field/schema types) yield markdown from
        the local processor, or ``self.content`` in the last-resort path.
        """
        self._used_fallback = True

        if self.local_fallback_processor:
            try:
                logger.info(f"Using local Docling processor for fallback on {self.file_path}")
                local_result = self.local_fallback_processor.process(self.file_path)
                if output_type == "html":
                    return local_result.extract_html()
                if output_type == "flat-json":
                    return json.dumps(local_result.extract_data(), indent=2)
                if output_type == "csv":
                    return local_result.extract_csv(include_all_tables=True)
                return local_result.extract_markdown()
            except Exception as e:
                logger.error(f"Local Docling fallback also failed: {e}")

        # Last resort: parent class methods.
        # NOTE(review): these presumably operate on the empty content passed to
        # the parent in __init__ - confirm against ConversionResult.
        if output_type == "html":
            return super().extract_html()
        if output_type == "flat-json":
            return json.dumps(super().extract_data(), indent=2)
        if output_type == "csv":
            return super().extract_csv(include_all_tables=True)
        return self.content

    def extract_markdown(self) -> str:
        """Export as markdown."""
        return self._get_cloud_output("markdown")

    def extract_html(self) -> str:
        """Export as HTML."""
        return self._get_cloud_output("html")

    def extract_data(self, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> Dict[str, Any]:
        """Export as structured JSON with optional field extraction.

        Args:
            specified_fields: Optional list of specific fields to extract.
                Takes precedence over ``json_schema`` when both are given.
            json_schema: Optional JSON schema defining fields and types to extract.

        Returns:
            A dict whose ``format`` entry names the payload: ``specified_fields``,
            ``structured_json`` or ``cloud_flat_json``. On any failure the dict has
            ``format == "json_parse_error"``, the error text under ``error`` and
            whatever raw content was fetched under ``document["raw_content"]``.
        """
        # Single variable for every branch so the error path can always report it.
        raw_content = ""
        try:
            if specified_fields:
                raw_content = self._get_cloud_output("specified-fields", specified_fields=specified_fields)
                return {
                    "extracted_fields": json.loads(raw_content),
                    "format": "specified_fields"
                }
            if json_schema:
                raw_content = self._get_cloud_output("specified-json", json_schema=json_schema)
                return {
                    "structured_data": json.loads(raw_content),
                    "format": "structured_json"
                }
            # Standard JSON extraction
            raw_content = self._get_cloud_output("flat-json")
            return {
                "document": json.loads(raw_content),
                "format": "cloud_flat_json"
            }
        except Exception as e:
            logger.error(f"Failed to parse JSON content: {e}")
            return {
                "document": {"raw_content": raw_content},
                "format": "json_parse_error",
                "error": str(e)
            }

    def extract_text(self) -> str:
        """Export as text.

        Returns the markdown output unchanged (no markdown-to-text conversion
        is applied), or an empty string if fetching it raises.
        """
        try:
            return self._get_cloud_output("markdown")
        except Exception as e:
            logger.error(f"Failed to get text output: {e}")
            return ""

    def extract_csv(self, table_index: int = 0, include_all_tables: bool = False) -> str:
        """Export tables as CSV format.

        Args:
            table_index: Accepted for interface compatibility; not forwarded to the cloud API.
            include_all_tables: Accepted for interface compatibility; not forwarded to the cloud API.

        Returns:
            The "csv" output from the cloud API, or from the local fallback.
        """
        return self._get_cloud_output("csv")
class CloudProcessor(BaseProcessor):
    """Cloud document processor for the Nanonets extraction API.

    The processor itself performs no network I/O: ``process`` only builds a
    lazy ``CloudConversionResult`` that carries the key pool and the local
    fallback processor.
    """

    # File extensions (lower-case, with dot) this processor accepts.
    _SUPPORTED_EXTENSIONS = frozenset({
        '.pdf', '.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt',
        '.txt', '.html', '.htm', '.png', '.jpg', '.jpeg', '.gif',
        '.bmp', '.tiff', '.tif',
    })

    # Upload content type per file extension.
    _CONTENT_TYPES = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.doc': 'application/msword',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.xls': 'application/vnd.ms-excel',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        '.ppt': 'application/vnd.ms-powerpoint',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.htm': 'text/html',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.bmp': 'image/bmp',
        '.tiff': 'image/tiff',
        '.tif': 'image/tiff',
    }

    def __init__(self, api_key: Optional[str] = None, output_type: str = None, model_type: Optional[str] = None,
                 specified_fields: Optional[list] = None, json_schema: Optional[dict] = None,
                 api_key_pool=None, local_fallback_processor=None, **kwargs):
        """Set up the cloud processor.

        Args:
            api_key: The processor's own API key (optional).
            output_type: Output type for cloud processing (markdown, flat-json, html, csv,
                specified-fields, specified-json). Stored as given; not validated here.
            model_type: Model type for cloud processing (gemini, openapi, nanonets).
            specified_fields: Fields to extract (for the specified-fields output type).
            json_schema: Schema of fields and types to extract (for the specified-json output type).
            api_key_pool: ApiKeyPool instance used for key rotation.
            local_fallback_processor: Local processor (GPU/Docling) used once all keys are exhausted.
            **kwargs: Forwarded to the base processor.
        """
        super().__init__(**kwargs)
        # Credentials and the fallback chain
        self.api_key = api_key
        # Extraction settings; output_type is deliberately left unvalidated so
        # that constructing the processor never emits warnings.
        self.output_type = output_type
        self.model_type = model_type
        self.specified_fields = specified_fields
        self.json_schema = json_schema
        self.api_key_pool = api_key_pool
        self.local_fallback_processor = local_fallback_processor
        self.api_url = "https://extraction-api.nanonets.com/extract"

    def can_process(self, file_path: str) -> bool:
        """Return True when the file extension is one the cloud API path accepts."""
        extension = os.path.splitext(file_path.lower())[1]
        return extension in self._SUPPORTED_EXTENSIONS

    def process(self, file_path: str) -> CloudConversionResult:
        """Build a lazy result for ``file_path``; API calls happen when output is requested.

        Args:
            file_path: Path to the file to process

        Returns:
            CloudConversionResult wired to this processor, its key pool and its local fallback

        Raises:
            ConversionError: If the path does not exist
        """
        if not os.path.exists(file_path):
            raise ConversionError(f"File not found: {file_path}")

        has_fallback = self.local_fallback_processor is not None
        # Metadata is assembled from local information only - no API call.
        result_metadata = {
            'source_file': file_path,
            'processing_mode': 'cloud',
            'api_provider': 'nanonets',
            'file_size': os.path.getsize(file_path),
            'model_type': self.model_type,
            'has_api_key': bool(self.api_key),
            'key_rotation': True,
            'local_fallback': has_fallback
        }

        message = (
            f"Created cloud extractor for {file_path} with API key pool rotation"
            if self.api_key
            else f"Created cloud extractor for {file_path} without API key - will use pool + local fallback"
        )
        logger.info(message)

        return CloudConversionResult(
            file_path=file_path,
            cloud_processor=self,
            metadata=result_metadata,
            api_key_pool=self.api_key_pool,
            local_fallback_processor=self.local_fallback_processor
        )

    def _extract_content_from_response(self, response_data: Dict[str, Any]) -> str:
        """Return the 'content' field of the response, or the whole response as JSON text."""
        try:
            if 'content' not in response_data:
                # No content field: hand back the full payload so nothing is lost.
                logger.warning("No 'content' field found in API response, returning full response")
                return json.dumps(response_data, indent=2)
            return response_data['content']
        except Exception as e:
            logger.error(f"Failed to extract content from API response: {e}")
            return json.dumps(response_data, indent=2)

    def _get_content_type(self, file_path: str) -> str:
        """Return the upload content type for the file, defaulting to a generic binary type."""
        extension = os.path.splitext(file_path.lower())[1]
        return self._CONTENT_TYPES.get(extension, 'application/octet-stream')