# docstrange/processors/cloud_processor.py
"""Cloud processor for Nanonets API integration with API key pool rotation and local fallback."""
import os
import requests
import json
import logging
from typing import Dict, Any, Optional
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError
logger = logging.getLogger(__name__)
# Default reset time for rate-limited keys (1 hour)
DEFAULT_RATE_LIMIT_RESET = 3600
class CloudConversionResult(ConversionResult):
"""Enhanced ConversionResult for cloud mode with lazy API calls, key rotation, and local fallback."""
def __init__(self, file_path: str, cloud_processor: 'CloudProcessor', metadata: Optional[Dict[str, Any]] = None,
api_key_pool=None, local_fallback_processor=None):
# Initialize with empty content - we'll make API calls on demand
super().__init__("", metadata)
self.file_path = file_path
self.cloud_processor = cloud_processor
self.api_key_pool = api_key_pool
self.local_fallback_processor = local_fallback_processor # GPU processor or None
self._cached_outputs = {} # Cache API responses by output type
self._used_fallback = False # Track if we fell back to local processing
def _get_cloud_output(self, output_type: str, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> str:
"""Get output from cloud API for specific type, with caching, key rotation, and local fallback."""
# Validate output type
valid_output_types = ["markdown", "flat-json", "html", "csv", "specified-fields", "specified-json"]
if output_type not in valid_output_types:
logger.warning(f"Invalid output type '{output_type}' for cloud API. Using 'markdown'.")
output_type = "markdown"
# Create cache key based on output type and parameters
cache_key = output_type
if specified_fields:
cache_key += f"_fields_{','.join(specified_fields)}"
if json_schema:
cache_key += f"_schema_{hash(str(json_schema))}"
if cache_key in self._cached_outputs:
return self._cached_outputs[cache_key]
# If we already fell back to local, skip cloud
if self._used_fallback:
return self._convert_locally(output_type)
# Try cloud API with key rotation
last_error = None
keys_tried = set()
while True:
# Get next available key from pool
current_key = None
if self.api_key_pool:
current_key = self.api_key_pool.get_next_key()
# Also try the processor's own key if set
if not current_key and self.cloud_processor.api_key:
current_key = self.cloud_processor.api_key
if not current_key:
logger.info("No API keys available, falling back to local processing")
return self._convert_locally(output_type)
# Don't try the same key twice in one cycle
if current_key in keys_tried:
logger.info("All API keys rate limited, falling back to local processing")
return self._convert_locally(output_type)
keys_tried.add(current_key)
try:
                # Prepare headers (current_key is guaranteed non-empty here)
                headers = {'Authorization': f'Bearer {current_key}'}
# Prepare file for upload
with open(self.file_path, 'rb') as file:
files = {
'file': (os.path.basename(self.file_path), file, self.cloud_processor._get_content_type(self.file_path))
}
data = {
'output_type': output_type
}
# Add model_type if specified
if self.cloud_processor.model_type:
data['model_type'] = self.cloud_processor.model_type
# Add field extraction parameters
if output_type == "specified-fields" and specified_fields:
data['specified_fields'] = ','.join(specified_fields)
elif output_type == "specified-json" and json_schema:
data['json_schema'] = json.dumps(json_schema)
                    log_prefix = f"API key {current_key[:8]}..."
logger.info(f"Making cloud API call ({log_prefix}) for {output_type} on {self.file_path}")
# Make API request
response = requests.post(
self.cloud_processor.api_url,
headers=headers,
files=files,
data=data,
timeout=300
)
# Handle rate limiting (429) - mark key as limited and try next
if response.status_code == 429:
# Mark this key as rate limited in the pool
if self.api_key_pool:
self.api_key_pool.mark_key_rate_limited(current_key, DEFAULT_RATE_LIMIT_RESET)
# Also mark the processor's key if it matches
if self.cloud_processor.api_key == current_key:
logger.warning(f"Processor API key rate limited, will try pool keys")
logger.warning(f"API key {current_key[:8]}... rate limited, trying next key...")
last_error = f"Rate limited (429)"
continue
response.raise_for_status()
result_data = response.json()
# Extract content from response
content = self.cloud_processor._extract_content_from_response(result_data)
# Cache the result
self._cached_outputs[cache_key] = content
return content
except requests.exceptions.HTTPError as e:
                if e.response is not None and e.response.status_code == 429:
if self.api_key_pool:
self.api_key_pool.mark_key_rate_limited(current_key, DEFAULT_RATE_LIMIT_RESET)
logger.warning(f"API key {current_key[:8]}... rate limited (HTTPError), trying next key...")
last_error = str(e)
continue
else:
logger.error(f"Cloud API HTTP error: {e}")
last_error = str(e)
break
except Exception as e:
logger.error(f"Cloud API call failed: {e}")
last_error = str(e)
break
# All keys exhausted, fall back to local processing
logger.warning(f"All API keys rate limited or failed. Falling back to local Docling processing.")
self._used_fallback = True
return self._convert_locally(output_type)
def _convert_locally(self, output_type: str) -> str:
"""Fallback to local Docling/GPU conversion methods."""
self._used_fallback = True
# Try the local fallback processor (GPU processor with Docling models)
        if self.local_fallback_processor:
            try:
                logger.info(f"Using local Docling processor for fallback on {self.file_path}")
                local_result = self.local_fallback_processor.process(self.file_path)
                if output_type == "html":
                    content = local_result.extract_html()
                elif output_type == "flat-json":
                    content = json.dumps(local_result.extract_data(), indent=2)
                elif output_type == "csv":
                    content = local_result.extract_csv(include_all_tables=True)
                else:
                    content = local_result.extract_markdown()
                # Cache the fallback result so repeated extract calls do not
                # re-run the local pipeline on the same file
                self._cached_outputs[output_type] = content
                return content
except Exception as e:
logger.error(f"Local Docling fallback also failed: {e}")
# Last resort: use parent class methods
if output_type == "html":
return super().extract_html()
elif output_type == "flat-json":
return json.dumps(super().extract_data(), indent=2)
elif output_type == "csv":
return super().extract_csv(include_all_tables=True)
else:
return self.content
def extract_markdown(self) -> str:
"""Export as markdown."""
return self._get_cloud_output("markdown")
def extract_html(self) -> str:
"""Export as HTML."""
return self._get_cloud_output("html")
def extract_data(self, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> Dict[str, Any]:
"""Export as structured JSON with optional field extraction.
Args:
specified_fields: Optional list of specific fields to extract
json_schema: Optional JSON schema defining fields and types to extract
Returns:
Structured JSON with extracted data
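        Example (field names and the schema shape are illustrative only; the
        dict is sent to the API verbatim as JSON):
            result.extract_data(specified_fields=["invoice_number", "total"])
            result.extract_data(json_schema={"invoice_number": "string"})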
"""
try:
if specified_fields:
# Request specified fields extraction
content = self._get_cloud_output("specified-fields", specified_fields=specified_fields)
extracted_data = json.loads(content)
return {
"extracted_fields": extracted_data,
"format": "specified_fields"
}
elif json_schema:
# Request JSON schema extraction
content = self._get_cloud_output("specified-json", json_schema=json_schema)
extracted_data = json.loads(content)
return {
"structured_data": extracted_data,
"format": "structured_json"
}
else:
# Standard JSON extraction
                content = self._get_cloud_output("flat-json")
                parsed_content = json.loads(content)
return {
"document": parsed_content,
"format": "cloud_flat_json"
}
except Exception as e:
logger.error(f"Failed to parse JSON content: {e}")
return {
"document": {"raw_content": content if 'content' in locals() else ""},
"format": "json_parse_error",
"error": str(e)
}
def extract_text(self) -> str:
"""Export as plain text."""
        # The API exposes no dedicated plain-text output type, so return
        # markdown, which reads acceptably as plain text.
try:
return self._get_cloud_output("markdown")
except Exception as e:
logger.error(f"Failed to get text output: {e}")
return ""
def extract_csv(self, table_index: int = 0, include_all_tables: bool = False) -> str:
"""Export tables as CSV format.
Args:
table_index: Which table to export (0-based index). Default is 0 (first table).
include_all_tables: If True, export all tables with separators. Default is False.
Returns:
CSV formatted string of the table(s)
Raises:
ValueError: If no tables are found or table_index is out of range
"""
return self._get_cloud_output("csv")
class CloudProcessor(BaseProcessor):
"""Processor for cloud-based document conversion using Nanonets API with API key pool rotation."""
    def __init__(self, api_key: Optional[str] = None, output_type: Optional[str] = None, model_type: Optional[str] = None,
specified_fields: Optional[list] = None, json_schema: Optional[dict] = None,
api_key_pool=None, local_fallback_processor=None, **kwargs):
"""Initialize the cloud processor.
Args:
api_key: API key for cloud processing (optional - uses rate-limited free tier without key)
output_type: Output type for cloud processing (markdown, flat-json, html, csv, specified-fields, specified-json)
model_type: Model type for cloud processing (gemini, openapi, nanonets)
specified_fields: List of fields to extract (for specified-fields output type)
json_schema: JSON schema defining fields and types to extract (for specified-json output type)
api_key_pool: ApiKeyPool instance for key rotation
local_fallback_processor: Local processor (GPU/Docling) for fallback when all keys exhausted
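        Example (sketch; the ``ApiKeyPool`` and GPU processor instances come
        from the surrounding package and are assumed here):
            pool = ApiKeyPool(["key-aaaa", "key-bbbb"])  # hypothetical keys
            processor = CloudProcessor(api_key_pool=pool,
                                       local_fallback_processor=gpu_processor)
            result = processor.process("invoice.pdf")  # no API call yet
            markdown = result.extract_markdown()  # request happens here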
"""
super().__init__(**kwargs)
self.api_key = api_key
self.output_type = output_type
self.model_type = model_type
self.specified_fields = specified_fields
self.json_schema = json_schema
self.api_key_pool = api_key_pool
self.local_fallback_processor = local_fallback_processor
self.api_url = "https://extraction-api.nanonets.com/extract"
# Don't validate output_type during initialization - it will be validated during processing
# This prevents warnings during DocumentExtractor initialization
def can_process(self, file_path: str) -> bool:
"""Check if the processor can handle the file."""
# Cloud processor supports most common document formats
# API key is optional - without it, uses rate-limited free tier
supported_extensions = {
'.pdf', '.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt',
'.txt', '.html', '.htm', '.png', '.jpg', '.jpeg', '.gif',
'.bmp', '.tiff', '.tif'
}
_, ext = os.path.splitext(file_path.lower())
return ext in supported_extensions
def process(self, file_path: str) -> CloudConversionResult:
"""Create a lazy CloudConversionResult that will make API calls on demand with key rotation.
Args:
file_path: Path to the file to process
Returns:
CloudConversionResult that makes API calls when output methods are called
Raises:
ConversionError: If file doesn't exist
"""
if not os.path.exists(file_path):
raise ConversionError(f"File not found: {file_path}")
# Create metadata without making any API calls
metadata = {
'source_file': file_path,
'processing_mode': 'cloud',
'api_provider': 'nanonets',
'file_size': os.path.getsize(file_path),
'model_type': self.model_type,
'has_api_key': bool(self.api_key),
'key_rotation': True,
'local_fallback': self.local_fallback_processor is not None
}
        if self.api_key:
            logger.info(f"Created cloud extractor for {file_path} with API key (pool rotation enabled)")
        else:
            logger.info(f"Created cloud extractor for {file_path} without API key; will use pool keys and local fallback")
# Return lazy result with key pool and local fallback
return CloudConversionResult(
file_path=file_path,
cloud_processor=self,
metadata=metadata,
api_key_pool=self.api_key_pool,
local_fallback_processor=self.local_fallback_processor
)
def _extract_content_from_response(self, response_data: Dict[str, Any]) -> str:
"""Extract content from API response."""
try:
            # The API normally returns content in the 'content' field
if 'content' in response_data:
return response_data['content']
# Fallback: return whole response as JSON if no content field
logger.warning("No 'content' field found in API response, returning full response")
return json.dumps(response_data, indent=2)
except Exception as e:
logger.error(f"Failed to extract content from API response: {e}")
return json.dumps(response_data, indent=2)
def _get_content_type(self, file_path: str) -> str:
"""Get content type for file upload."""
_, ext = os.path.splitext(file_path.lower())
content_types = {
'.pdf': 'application/pdf',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.doc': 'application/msword',
'.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'.xls': 'application/vnd.ms-excel',
'.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
'.ppt': 'application/vnd.ms-powerpoint',
'.txt': 'text/plain',
'.html': 'text/html',
'.htm': 'text/html',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.gif': 'image/gif',
'.bmp': 'image/bmp',
'.tiff': 'image/tiff',
'.tif': 'image/tiff'
}
return content_types.get(ext, 'application/octet-stream')
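

if __name__ == "__main__":
    # Minimal smoke test (sketch): pushes one file through the cloud
    # processor with no key pool or local fallback configured.
    # "sample.pdf" is a placeholder path, and the NANONETS_API_KEY
    # environment variable name is an assumption, not a package convention.
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else "sample.pdf"
    processor = CloudProcessor(api_key=os.environ.get("NANONETS_API_KEY"))
    if processor.can_process(path):
        print(processor.process(path).extract_markdown())
    else:
        print(f"Unsupported file type: {path}")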