| """Main extractor class for handling document conversion.""" |
|
|
| import os |
| import logging |
| from typing import List, Optional |
|
|
| from .processors import ( |
| PDFProcessor, |
| DOCXProcessor, |
| TXTProcessor, |
| ExcelProcessor, |
| URLProcessor, |
| HTMLProcessor, |
| PPTXProcessor, |
| ImageProcessor, |
| CloudProcessor, |
| GPUProcessor, |
| ) |
| from .result import ConversionResult |
| from .exceptions import ConversionError, UnsupportedFormatError, FileNotFoundError |
| from .utils.gpu_utils import should_use_gpu_processor |
| from .services.api_key_pool import ApiKeyPool |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
|
|
class DocumentExtractor:
    """Main class for converting documents to LLM-ready formats.

    An instance runs in one of two modes, fixed at construction time:

    * cloud mode (default): ``self.processors`` holds a single
      ``CloudProcessor`` backed by the shared ``ApiKeyPool``; when a GPU is
      detected a local ``GPUProcessor`` is handed to it as a fallback.
    * local mode (``gpu=True``): ``self.processors`` holds the local
      format-specific processors plus a ``GPUProcessor``.
    """

    # File extensions that are routed to GPUProcessor when one is registered.
    _GPU_SUPPORTED_FORMATS = frozenset(
        {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif', '.pdf'}
    )

    def __init__(
        self,
        preserve_layout: bool = True,
        include_images: bool = True,
        ocr_enabled: bool = True,
        api_key: Optional[str] = None,
        api_keys: Optional[List[str]] = None,
        model: Optional[str] = None,
        gpu: bool = False
    ):
        """Initialize the file extractor.

        Args:
            preserve_layout: Whether to preserve document layout
            include_images: Whether to include images in output
            ocr_enabled: Whether to enable OCR for image and PDF processing.
                ``None`` is treated as ``True``.
            api_key: Single API key for cloud processing. Prefer 'docstrange login' for 10k docs/month
            api_keys: List of API keys for automatic rotation when one hits rate limit
            model: Model to use for cloud processing (gemini, openapi) - only for cloud mode
            gpu: Force local GPU processing (disables cloud mode, requires GPU)

        Raises:
            RuntimeError: If ``gpu`` is requested but no GPU is detected.

        Note:
            - Cloud mode is default unless gpu is specified
            - Multiple api_keys enable automatic rotation on rate limit
            - Without login/API key, limited calls per day
            - For 10k docs/month, run 'docstrange login' (recommended) or use API keys
        """
        self.preserve_layout = preserve_layout
        self.include_images = include_images
        self.api_key = api_key
        self.api_keys_list = api_keys or []
        self.model = model
        self.gpu = gpu

        # Cloud processing is the default; forcing GPU switches to local mode.
        self.cloud_mode = not self.gpu

        # Probe the hardware once and reuse the answer for both the
        # forced-GPU validation and the optional local fallback below.
        gpu_available = should_use_gpu_processor()
        if self.gpu and not gpu_available:
            raise RuntimeError(
                "GPU preference specified but no GPU is available. "
                "Please ensure CUDA is installed and a compatible GPU is present."
            )

        # Normalise an explicit None to the default so every processor
        # receives a real boolean.
        self.ocr_enabled = True if ocr_enabled is None else ocr_enabled

        self.api_key_pool = ApiKeyPool.get_instance()
        self._populate_api_key_pool()

        self.local_gpu_processor = self._create_local_gpu_fallback(gpu_available)

        self.processors = []
        if self.cloud_mode:
            cloud_processor = CloudProcessor(
                api_key=self.api_key,
                model_type=self.model,
                preserve_layout=self.preserve_layout,
                include_images=self.include_images,
                api_key_pool=self.api_key_pool,
                local_fallback_processor=self.local_gpu_processor
            )
            self.processors.append(cloud_processor)

            pool_stats = self.api_key_pool.get_pool_stats()
            if pool_stats["available"] > 0:
                logger.info(
                    "Cloud processing enabled with %s API key(s) in pool",
                    pool_stats['available'],
                )
            else:
                logger.info("Cloud processing enabled without API keys - will use local fallback when needed")
        else:
            logger.info("Local processing mode enabled")
            self._setup_local_processors()

    def _populate_api_key_pool(self):
        """Register constructor, environment and cached keys with the pool."""
        if self.api_key:
            self.api_key_pool.add_key(self.api_key, source="constructor")
        for key in self.api_keys_list:
            self.api_key_pool.add_key(key, source="constructor_list")

        # Environment and cached credentials are only consulted in cloud mode
        # when no single api_key was passed explicitly.
        # NOTE(review): the source this was rebuilt from had lost its
        # indentation; all three lookups below are assumed to sit under this
        # condition - confirm against the upstream file.
        if not self.cloud_mode or self.api_key:
            return

        # Comma-separated list of keys; blank entries are skipped.
        for key in os.environ.get('NANONETS_API_KEYS', '').split(','):
            key = key.strip()
            if key:
                self.api_key_pool.add_key(key, source="env")

        # Single-key variable, stripped the same way as the list entries.
        single_key = os.environ.get('NANONETS_API_KEY', '').strip()
        if single_key:
            self.api_key_pool.add_key(single_key, source="env_single")

        # Last resort: credentials cached by a previous login.
        if not self.api_key_pool.has_available_keys():
            try:
                from .services.auth_service import get_authenticated_token
                cached_token = get_authenticated_token(force_reauth=False)
                if cached_token:
                    self.api_key_pool.add_key(cached_token, source="cached_credentials")
                    logger.info("Added cached authentication credentials to API key pool")
            except ImportError:
                logger.debug("Authentication service not available")
            except Exception as e:
                # Best effort: a broken credential cache must not prevent
                # construction.
                logger.warning("Could not retrieve cached credentials: %s", e)

    def _create_local_gpu_fallback(self, gpu_available: bool):
        """Build the local GPUProcessor, or return None if unavailable."""
        if not gpu_available:
            return None
        try:
            processor = GPUProcessor(
                preserve_layout=self.preserve_layout,
                include_images=self.include_images,
                ocr_enabled=self.ocr_enabled
            )
        except Exception as e:
            # Best effort: the extractor still works without the fallback.
            logger.warning("Could not initialize local GPU processor: %s", e)
            return None
        logger.info("Local GPU processor available for fallback")
        return processor

    def authenticate(self, force_reauth: bool = False) -> bool:
        """
        Perform browser-based authentication and update API key.

        On success the token is stored in ``self.api_key``, added to the API
        key pool, and assigned to every registered processor that exposes an
        ``api_key`` attribute.

        Args:
            force_reauth: Force re-authentication even if cached credentials exist

        Returns:
            True if authentication successful, False otherwise
        """
        try:
            from .services.auth_service import get_authenticated_token

            token = get_authenticated_token(force_reauth=force_reauth)
            if not token:
                return False

            self.api_key = token
            self.api_key_pool.add_key(token, source="authenticated")
            for processor in self.processors:
                if hasattr(processor, 'api_key'):
                    processor.api_key = token
                    logger.info("Updated processor with new authentication token")
            return True

        except ImportError:
            logger.error("Authentication service not available")
            return False
        except Exception as e:
            # Any failure is reported as False rather than propagated.
            logger.error("Authentication failed: %s", e)
            return False

    def _setup_local_processors(self):
        """Setup local processors based on GPU preferences."""
        common = {
            "preserve_layout": self.preserve_layout,
            "include_images": self.include_images,
        }
        with_ocr = {**common, "ocr_enabled": self.ocr_enabled}

        # Order matters: _get_processor returns the first processor whose
        # can_process() accepts the file.
        local_processors = [
            PDFProcessor(**with_ocr),
            DOCXProcessor(**common),
            TXTProcessor(**common),
            ExcelProcessor(**common),
            HTMLProcessor(**common),
            PPTXProcessor(**common),
            ImageProcessor(**with_ocr),
            URLProcessor(**common),
        ]

        if self.gpu:
            logger.info("GPU preference specified - adding GPU processor with Nanonets OCR")
            # Reuse the processor __init__ already built (same settings)
            # instead of constructing a duplicate.
            gpu_processor = getattr(self, "local_gpu_processor", None)
            if gpu_processor is None:
                gpu_processor = GPUProcessor(**with_ocr)
            local_processors.append(gpu_processor)

        self.processors.extend(local_processors)

    def extract(self, file_path: str) -> "ConversionResult":
        """Convert a file to internal format.

        Args:
            file_path: Path to the file to extract

        Returns:
            ConversionResult containing the processed content

        Raises:
            FileNotFoundError: If the file doesn't exist (the name is
                imported from this package's ``exceptions`` module)
            UnsupportedFormatError: If the format is not supported
            ConversionError: If conversion fails
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        processor = self._get_processor(file_path)
        if not processor:
            raise UnsupportedFormatError(f"No processor found for file: {file_path}")

        logger.info("Using processor %s for %s", processor.__class__.__name__, file_path)
        return processor.process(file_path)

    def convert_with_output_type(self, file_path: str, output_type: str) -> "ConversionResult":
        """Convert a file with specific output type for cloud processing.

        In cloud mode a dedicated CloudProcessor configured with
        ``output_type`` is created for the call. If it cannot handle the
        file, or the extractor is in local mode, this falls back to
        :meth:`extract` and ``output_type`` is not used.

        Args:
            file_path: Path to the file to extract
            output_type: Desired output type (markdown, flat-json, html)

        Returns:
            ConversionResult containing the processed content

        Raises:
            FileNotFoundError: If the file doesn't exist
            UnsupportedFormatError: If the format is not supported
            ConversionError: If conversion fails
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        if self.cloud_mode:
            cloud_processor = CloudProcessor(
                api_key=self.api_key,
                output_type=output_type,
                model_type=self.model,
                preserve_layout=self.preserve_layout,
                include_images=self.include_images,
                api_key_pool=self.api_key_pool,
                local_fallback_processor=self.local_gpu_processor
            )
            if cloud_processor.can_process(file_path):
                logger.info(
                    "Using cloud processor with output_type=%s for %s",
                    output_type,
                    file_path,
                )
                return cloud_processor.process(file_path)

        return self.extract(file_path)

    def extract_url(self, url: str) -> "ConversionResult":
        """Convert a URL to internal format.

        Args:
            url: URL to extract

        Returns:
            ConversionResult containing the processed content

        Raises:
            ConversionError: If the extractor is in cloud mode, no URL
                processor is registered, or conversion fails
        """
        if self.cloud_mode:
            raise ConversionError("URL conversion is not supported in cloud mode. Use local mode for URL processing.")

        url_processor = next(
            (p for p in self.processors if isinstance(p, URLProcessor)),
            None,
        )
        if url_processor is None:
            raise ConversionError("URL processor not available")

        logger.info("Converting URL: %s", url)
        return url_processor.process(url)

    def extract_text(self, text: str) -> "ConversionResult":
        """Convert plain text to internal format.

        Args:
            text: Plain text to extract

        Returns:
            ConversionResult wrapping ``text`` unchanged, with metadata
            describing it as text content

        Raises:
            ConversionError: If the extractor is in cloud mode
        """
        if self.cloud_mode:
            raise ConversionError("Text conversion is not supported in cloud mode. Use local mode for text processing.")

        metadata = {
            "content_type": "text",
            "processor": "TextConverter",
            "preserve_layout": self.preserve_layout
        }
        return ConversionResult(text, metadata)

    def is_cloud_enabled(self) -> bool:
        """Check if cloud processing is enabled and configured.

        Returns:
            True if the extractor is in cloud mode and has either an
            explicit API key or at least one available key in the pool
        """
        return self.cloud_mode and (bool(self.api_key) or self.api_key_pool.has_available_keys())

    def get_processing_mode(self) -> str:
        """Get the current processing mode.

        Returns:
            String describing the current processing mode
        """
        pool_stats = self.api_key_pool.get_pool_stats()
        if self.cloud_mode and pool_stats["available"] > 0:
            return f"cloud ({pool_stats['available']} key(s))"
        if self.cloud_mode and self.local_gpu_processor:
            return "cloud (local fallback ready)"
        if self.gpu:
            return "gpu_forced"
        # Only reachable in cloud mode with no keys and no local fallback.
        # NOTE(review): "gpu_auto" is then reported when a GPU is detected but
        # the fallback processor could not be built - confirm this is intended.
        if should_use_gpu_processor():
            return "gpu_auto"
        return "cloud"

    def get_api_key_pool_stats(self) -> dict:
        """Get API key pool statistics.

        Returns:
            Dictionary with pool statistics
        """
        return self.api_key_pool.get_pool_stats()

    def _get_processor(self, file_path: str):
        """Get the appropriate processor for the file.

        A registered GPUProcessor is preferred for GPU-supported extensions
        when GPU use was requested or a GPU is detected. Otherwise the first
        non-GPU processor whose ``can_process`` accepts the file is used.

        Args:
            file_path: Path to the file

        Returns:
            Processor that can handle the file, or None if none found
        """
        _, ext = os.path.splitext(file_path.lower())

        # The hardware probe is skipped when the extension cannot go to the
        # GPU or when GPU use was already requested explicitly.
        if ext in self._GPU_SUPPORTED_FORMATS and (self.gpu or should_use_gpu_processor()):
            for processor in self.processors:
                if isinstance(processor, GPUProcessor):
                    if self.gpu:
                        logger.info(
                            "Using GPU processor with Nanonets OCR for %s (GPU preference specified)",
                            file_path,
                        )
                    else:
                        logger.info(
                            "Using GPU processor with Nanonets OCR for %s (GPU available and format supported)",
                            file_path,
                        )
                    return processor

        for processor in self.processors:
            if processor.can_process(file_path):
                # The GPU processor is only ever chosen by the branch above.
                if isinstance(processor, GPUProcessor):
                    continue
                logger.info("Using %s for %s", processor.__class__.__name__, file_path)
                return processor
        return None

    @staticmethod
    def _formats_for(processor) -> List[str]:
        """Return the formats advertised for one processor instance."""
        # The check order is kept as an if-chain so the first matching class
        # wins, should any processor classes inherit from one another.
        if isinstance(processor, PDFProcessor):
            return ['.pdf']
        if isinstance(processor, DOCXProcessor):
            return ['.docx', '.doc']
        if isinstance(processor, TXTProcessor):
            return ['.txt', '.text']
        if isinstance(processor, ExcelProcessor):
            return ['.xlsx', '.xls', '.csv']
        if isinstance(processor, HTMLProcessor):
            return ['.html', '.htm']
        if isinstance(processor, PPTXProcessor):
            return ['.ppt', '.pptx']
        if isinstance(processor, ImageProcessor):
            return ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif']
        if isinstance(processor, URLProcessor):
            return ['URLs']
        if isinstance(processor, CloudProcessor):
            # NOTE(review): the cloud processor contributes nothing, so the
            # result is empty in cloud mode - confirm this is intended.
            return []
        if isinstance(processor, GPUProcessor):
            return ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif', '.pdf']
        return []

    def get_supported_formats(self) -> List[str]:
        """Get list of supported file formats.

        Returns:
            Sorted, de-duplicated list of supported file extensions (plus
            the literal entry 'URLs' when a URL processor is registered)
        """
        formats = set()
        for processor in self.processors:
            if hasattr(processor, 'can_process'):
                formats.update(self._formats_for(processor))
        # Sorted so the result is stable between calls and runs.
        return sorted(formats)
|
|