# Import spaces module for ZeroGPU support - Must be first import
try:
    import spaces
    HAS_SPACES = True
except ImportError:
    HAS_SPACES = False

import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Any, Union, Set
import tempfile

# Force CPU-only mode for EasyOCR and other CUDA libraries
os.environ['CUDA_VISIBLE_DEVICES'] = ''
os.environ['USE_TORCH'] = '1'
os.environ['EASYOCR_GPU'] = 'False'

# Import the parser interface and registry
from src.parsers.parser_interface import DocumentParser
from src.parsers.parser_registry import ParserRegistry
from src.core.exceptions import DocumentProcessingError, ParserError
from src.core.config import config

# Check for Docling availability
try:
    from docling.document_converter import DocumentConverter
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import PdfPipelineOptions, EasyOcrOptions, TesseractOcrOptions
    from docling.document_converter import PdfFormatOption
    from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
    HAS_DOCLING = True
except ImportError:
    HAS_DOCLING = False
    logging.warning("Docling package not installed. Please install with 'pip install docling'")

# Check for Gemini availability
try:
    from google import genai
    HAS_GEMINI = True
except ImportError:
    HAS_GEMINI = False

# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class DoclingParser(DocumentParser):
    """
    Parser implementation using Docling for converting documents to Markdown.

    Supports advanced PDF understanding, OCR, and multiple document formats.
    """

    def __init__(self):
        super().__init__()  # Initialize the base class (including _cancellation_flag)
        self.converter = None
        self.gpu_converter = None
        # Don't initialize converters here to avoid CUDA issues.
        # They will be created on-demand in the parse methods.
        logger.info("Docling parser initialized (converters will be created on-demand)")

    def _create_converter_with_options(self, ocr_method: str, **kwargs) -> DocumentConverter:
        """Create a DocumentConverter with specific OCR options."""
        pipeline_options = PdfPipelineOptions()

        # Enable OCR by default
        pipeline_options.do_ocr = True

        # Configure OCR method
        if ocr_method == "docling_tesseract":
            pipeline_options.ocr_options = TesseractOcrOptions()
        elif ocr_method == "docling_easyocr":
            pipeline_options.ocr_options = EasyOcrOptions()
        else:  # Default to EasyOCR
            pipeline_options.ocr_options = EasyOcrOptions()

        # Configure advanced features
        pipeline_options.do_table_structure = kwargs.get('enable_tables', True)
        pipeline_options.do_code_enrichment = kwargs.get('enable_code_enrichment', False)
        pipeline_options.do_formula_enrichment = kwargs.get('enable_formula_enrichment', False)
        pipeline_options.do_picture_classification = kwargs.get('enable_picture_classification', False)
        pipeline_options.generate_picture_images = kwargs.get('generate_picture_images', False)

        # Create converter with options
        converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )
        return converter

    def parse(self, file_path: Union[str, Path], ocr_method: Optional[str] = None, **kwargs) -> str:
        """
        Parse a document and return its content as Markdown.

        Args:
            file_path: Path to the document
            ocr_method: OCR method to use ('docling_default', 'docling_tesseract', 'docling_easyocr')
            **kwargs: Additional options for Docling processing

        Returns:
            str: Markdown representation of the document
        """
        # Validate file first
        self.validate_file(file_path)

        # Check if Docling is available
        if not HAS_DOCLING:
            raise ParserError("Docling is not available. Please install with 'pip install docling'")

        # Check for cancellation before starting
        if self._check_cancellation():
            raise DocumentProcessingError("Conversion cancelled")

        try:
            # Try ZeroGPU first, fall back to CPU
            if HAS_SPACES:
                try:
                    logger.info("Attempting Docling processing with ZeroGPU")
                    # Filter kwargs to avoid pickle issues
                    safe_kwargs = {}
                    for key, value in kwargs.items():
                        if not key.startswith('_') and not hasattr(value, '__call__'):
                            try:
                                import pickle
                                pickle.dumps(value)
                                safe_kwargs[key] = value
                            except (TypeError, pickle.PickleError):
                                logger.debug(f"Skipping unpicklable kwarg: {key}")
                    result = self._process_with_gpu(str(file_path), ocr_method, **safe_kwargs)
                    return result
                except Exception as e:
                    if "pickle" in str(e).lower():
                        logger.warning(f"ZeroGPU pickle error: {str(e)}")
                    elif "cuda" in str(e).lower():
                        logger.warning(f"ZeroGPU CUDA error: {str(e)}")
                    else:
                        logger.warning(f"ZeroGPU processing failed: {str(e)}")
                    logger.info("Falling back to CPU processing")

            # Fallback to CPU processing
            result = self._process_with_cpu(str(file_path), ocr_method, **kwargs)
            return result
        except Exception as e:
            logger.error(f"Error converting file with Docling: {str(e)}")
            raise DocumentProcessingError(f"Docling conversion failed: {str(e)}")

    def _process_with_cpu(self, file_path: str, ocr_method: Optional[str] = None, **kwargs) -> str:
        """Process document with CPU-only Docling converter."""
        logger.info("Processing with CPU-only Docling converter")

        # Create CPU converter if it does not exist yet
        if self.converter is None:
            self.converter = self._create_cpu_converter(ocr_method, **kwargs)

        # Convert the document
        result = self.converter.convert(file_path)

        # Check for cancellation after processing
        if self._check_cancellation():
            raise DocumentProcessingError("Conversion cancelled")

        # Export to markdown
        return result.document.export_to_markdown()

    def _create_cpu_converter(self, ocr_method: Optional[str] = None, **kwargs) -> DocumentConverter:
        """Create a CPU-only DocumentConverter with proper OCR fallback."""
        # Configure CPU-only accelerator
        accelerator_options = AcceleratorOptions(
            num_threads=4,
            device=AcceleratorDevice.CPU
        )

        # Create pipeline options with CPU-only accelerator
        pipeline_options = PdfPipelineOptions()
        pipeline_options.accelerator_options = accelerator_options
        pipeline_options.do_ocr = True
        pipeline_options.do_table_structure = True
        pipeline_options.table_structure_options.do_cell_matching = True

        # Configure OCR method - use EasyOCR with CPU enforcement
        pipeline_options.ocr_options = EasyOcrOptions()
        logger.info("Using EasyOCR (CPU-only)")

        # Configure advanced features
        pipeline_options.do_table_structure = kwargs.get('enable_tables', True)
        pipeline_options.do_code_enrichment = kwargs.get('enable_code_enrichment', False)
        pipeline_options.do_formula_enrichment = kwargs.get('enable_formula_enrichment', False)
        pipeline_options.do_picture_classification = kwargs.get('enable_picture_classification', False)
        pipeline_options.generate_picture_images = kwargs.get('generate_picture_images', False)

        # Create converter with CPU-only configuration
        return DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=pipeline_options,
                )
            }
        )

    # Define the GPU-decorated function for ZeroGPU
    if HAS_SPACES:
        # Allocate GPU for up to 2 minutes
        @spaces.GPU(duration=120)
        def _process_with_gpu(self, file_path: str, ocr_method: Optional[str] = None, **kwargs) -> str:
| """Process document with GPU-accelerated Docling converter. | |
| IMPORTANT: All model loading and CUDA operations must happen inside this method. | |
| """ | |
| logger.info("Processing with ZeroGPU allocation") | |
| # Configure GPU accelerator | |
| accelerator_options = AcceleratorOptions( | |
| num_threads=4, | |
| device=AcceleratorDevice.CUDA | |
| ) | |
| # Create pipeline options with GPU accelerator | |
| pipeline_options = PdfPipelineOptions() | |
| pipeline_options.accelerator_options = accelerator_options | |
| pipeline_options.do_ocr = True | |
| pipeline_options.do_table_structure = True | |
| pipeline_options.table_structure_options.do_cell_matching = True | |
| # Configure OCR method - use EasyOCR | |
| pipeline_options.ocr_options = EasyOcrOptions() | |
| # Configure advanced features | |
| pipeline_options.do_table_structure = kwargs.get('enable_tables', True) | |
| pipeline_options.do_code_enrichment = kwargs.get('enable_code_enrichment', False) | |
| pipeline_options.do_formula_enrichment = kwargs.get('enable_formula_enrichment', False) | |
| pipeline_options.do_picture_classification = kwargs.get('enable_picture_classification', False) | |
| pipeline_options.generate_picture_images = kwargs.get('generate_picture_images', False) | |
| # Create converter with GPU configuration inside the decorated function | |
| converter = DocumentConverter( | |
| format_options={ | |
| InputFormat.PDF: PdfFormatOption( | |
| pipeline_options=pipeline_options, | |
| ) | |
| } | |
| ) | |
| # Convert the document | |
| result = converter.convert(file_path) | |
| # Export to markdown | |
| markdown_content = result.document.export_to_markdown() | |
| # Clean up to free memory | |
| del converter | |
| import gc | |
| gc.collect() | |
| return markdown_content | |
    else:
        # Define a dummy method if spaces is not available
        def _process_with_gpu(self, file_path: str, ocr_method: Optional[str] = None, **kwargs) -> str:
            # This should never be called if HAS_SPACES is False
            return self._process_with_cpu(file_path, ocr_method, **kwargs)

    @classmethod
    def get_name(cls) -> str:
        return "Docling"

    @classmethod
    def get_supported_file_types(cls) -> Set[str]:
        """Return a set of supported file extensions."""
        return {
            # PDF files
            ".pdf",
            # Image files
            ".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".webp",
            # Office documents
            ".docx", ".xlsx", ".pptx",
            # Web and markup
            ".html", ".xhtml", ".md",
            # Other formats
            ".csv"
        }

    @classmethod
    def is_available(cls) -> bool:
        """Check if this parser is available."""
        return HAS_DOCLING

    @classmethod
    def get_supported_ocr_methods(cls) -> List[Dict[str, Any]]:
        """Return list of supported OCR methods."""
        return [
            {
                "id": "docling_default",
                "name": "EasyOCR",
                "default_params": {
                    "enable_tables": True,
                    "enable_code_enrichment": False,
                    "enable_formula_enrichment": False,
                    "enable_picture_classification": False,
                    "generate_picture_images": False
                }
            }
        ]

    @classmethod
    def get_description(cls) -> str:
        return "Docling parser with advanced PDF understanding, table structure recognition, and multiple OCR engines"

    def _validate_batch_files(self, file_paths: List[Path]) -> None:
        """Validate batch of files (size, count, type) for multi-document processing."""
        if len(file_paths) == 0:
            raise DocumentProcessingError("No files provided for processing")
        if len(file_paths) > 5:
            raise DocumentProcessingError("Maximum 5 files allowed for batch processing")

        total_size = 0
        for fp in file_paths:
            if not fp.exists():
                raise DocumentProcessingError(f"File not found: {fp}")
            size = fp.stat().st_size
            if size > 10 * 1024 * 1024:  # 10 MB
                raise DocumentProcessingError(f"Individual file size exceeds 10MB: {fp.name}")
            total_size += size
        if total_size > 20 * 1024 * 1024:
            raise DocumentProcessingError(f"Combined file size ({total_size / (1024*1024):.1f}MB) exceeds 20MB limit")

    def _create_batch_prompt(self, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        """Create a natural-language prompt for Gemini post-processing."""
        names = original_filenames if original_filenames else [p.name for p in file_paths]
        file_list = "\n".join(f"- {n}" for n in names)
        base = f"I will provide you with {len(file_paths)} documents:\n{file_list}\n\n"
        if processing_type == "combined":
            return base + "Merge the content into a single coherent markdown document, preserving structure."
        if processing_type == "individual":
            return base + "Convert each document to markdown under its own heading."
        if processing_type == "summary":
            return base + "Create an EXECUTIVE SUMMARY followed by detailed markdown conversions per document."
        if processing_type == "comparison":
            return base + "Provide a comparison table of the documents, individual summaries, and cross-document insights."
        # default fallback
        return base

    def _format_batch_output(self, response_text: str, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        names = original_filenames if original_filenames else [p.name for p in file_paths]
        header = (
            f"<!-- Multi-Document Processing Results -->\n"
            f"<!-- Processing Type: {processing_type} -->\n"
            f"<!-- Files Processed: {len(file_paths)} -->\n"
            f"<!-- File Names: {', '.join(names)} -->\n\n"
        )
        # Ensure response_text is a string to avoid TypeError when it is None
        safe_resp = "" if response_text is None else str(response_text)
        return header + safe_resp

    def _convert_batch_with_docling(self, paths: List[Path], ocr_method: Optional[str], **kwargs) -> List[str]:
        """Run Docling conversion on a list of Paths and return markdown list."""
        if self._check_cancellation():
            raise DocumentProcessingError("Conversion cancelled")

        # Create CPU converter for batch processing (GPU not supported for batch yet)
        converter = self._create_cpu_converter(ocr_method, **kwargs)

        # Convert all docs
        from docling.datamodel.base_models import ConversionStatus

        markdown_results: List[str] = []
        conv_results = converter.convert_all([str(p) for p in paths], raises_on_error=False)
        for idx, conv_res in enumerate(conv_results):
            if self._check_cancellation():
                raise DocumentProcessingError("Conversion cancelled")
            if conv_res.status in (ConversionStatus.SUCCESS, ConversionStatus.PARTIAL_SUCCESS):
                markdown_results.append(conv_res.document.export_to_markdown())
            else:
                raise DocumentProcessingError(f"Docling failed to convert {paths[idx].name}")
        return markdown_results

    def parse_multiple(
        self,
        file_paths: List[Union[str, Path]],
        processing_type: str = "combined",
        original_filenames: Optional[List[str]] = None,
        ocr_method: Optional[str] = None,
        output_format: str = "markdown",
        **kwargs,
    ) -> str:
        """Multi-document processing using Docling + optional Gemini summarisation/comparison."""
        if not HAS_DOCLING:
            raise ParserError("Docling package not installed")

        paths = [Path(p) for p in file_paths]
        self._validate_batch_files(paths)

        # Run Docling conversion
        markdown_list = self._convert_batch_with_docling(paths, ocr_method, **kwargs)

        # LOCAL composition for combined/individual
        if processing_type in ("combined", "individual"):
            if processing_type == "individual":
                names = original_filenames if original_filenames else [p.name for p in paths]
                # enumerate starts at 1, so use i directly for the document number
                sections = [f"# Document {i}: {n}\n\n{md}" for i, (n, md) in enumerate(zip(names, markdown_list), 1)]
                combined = "\n\n---\n\n".join(sections)
            else:
                combined = "\n\n---\n\n".join(markdown_list)
            return self._format_batch_output(combined, paths, processing_type, original_filenames)

        # SUMMARY / COMPARISON → Gemini 2.5 Flash
        if not HAS_GEMINI or not config.api.google_api_key:
            raise DocumentProcessingError("Gemini API not available for summary/comparison post-processing")

        prompt = self._create_batch_prompt(paths, processing_type, original_filenames)
        combined_md = "\n\n---\n\n".join(markdown_list)

        try:
            client = genai.Client(api_key=config.api.google_api_key)
            response = client.models.generate_content(
                model=config.model.gemini_model,
                contents=[prompt + "\n\n" + combined_md],
                config={
                    "temperature": config.model.temperature,
                    "top_p": 0.95,
                    "top_k": 40,
                    "max_output_tokens": config.model.max_tokens,
                },
            )

            # Debug logging for response structure
            logger.debug(f"Gemini response type: {type(response)}")
            logger.debug(f"Gemini response attributes: {dir(response)}")

            # Try different ways to extract text from response
            final_text = None
            if hasattr(response, "text") and response.text:
                final_text = response.text
            elif hasattr(response, "candidates") and response.candidates:
                # Try to get text from first candidate
                candidate = response.candidates[0]
                if hasattr(candidate, "content") and candidate.content:
                    if hasattr(candidate.content, "parts") and candidate.content.parts:
                        final_text = candidate.content.parts[0].text
                    elif hasattr(candidate.content, "text"):
                        final_text = candidate.content.text
                elif hasattr(candidate, "text"):
                    final_text = candidate.text
            elif hasattr(response, "content") and response.content:
                final_text = str(response.content)

            if not final_text:
                logger.error(f"No text found in Gemini response. Response: {response}")
                raise DocumentProcessingError("Gemini post-processing returned no text")
        except Exception as e:
            logger.error(f"Gemini API error: {str(e)}")
            raise DocumentProcessingError(f"Gemini post-processing failed: {str(e)}")

        return self._format_batch_output(final_text, paths, processing_type, original_filenames)


# Register the parser with the registry if available
if HAS_DOCLING:
    ParserRegistry.register(DoclingParser)
    logger.info("Docling parser registered successfully")
else:
    logger.warning("Could not register Docling parser: Package not installed")