Spaces:
Runtime error
Runtime error
| """ | |
| Document processing service layer. | |
| """ | |
| import tempfile | |
| import logging | |
| import time | |
| import os | |
| import threading | |
| from pathlib import Path | |
| from typing import Optional, Tuple, Any, List | |
| from src.core.config import config | |
| from src.core.exceptions import ( | |
| DocumentProcessingError, | |
| FileSizeLimitError, | |
| UnsupportedFileTypeError, | |
| ConversionError | |
| ) | |
| from src.core.parser_factory import ParserFactory | |
| from src.core.latex_to_markdown_converter import convert_latex_to_markdown | |
| class DocumentService: | |
| """Service for handling document processing operations.""" | |
| def __init__(self): | |
| self._conversion_in_progress = False | |
| self._cancellation_flag: Optional[threading.Event] = None | |
| def set_cancellation_flag(self, flag: threading.Event) -> None: | |
| """Set the cancellation flag for this service.""" | |
| self._cancellation_flag = flag | |
| def is_conversion_in_progress(self) -> bool: | |
| """Check if conversion is currently in progress.""" | |
| return self._conversion_in_progress | |
| def _check_cancellation(self) -> bool: | |
| """Check if cancellation has been requested.""" | |
| if self._cancellation_flag and self._cancellation_flag.is_set(): | |
| logging.info("Cancellation detected in document service") | |
| return True | |
| return False | |
| def _safe_delete_file(self, file_path: Optional[str]) -> None: | |
| """Safely delete a file with error handling.""" | |
| if file_path and os.path.exists(file_path): | |
| try: | |
| os.unlink(file_path) | |
| except Exception as e: | |
| logging.error(f"Error cleaning up temp file {file_path}: {e}") | |
| def _validate_file(self, file_path: str) -> None: | |
| """Validate file size and type.""" | |
| if not os.path.exists(file_path): | |
| raise DocumentProcessingError(f"File not found: {file_path}") | |
| # Check file size | |
| file_size = os.path.getsize(file_path) | |
| if file_size > config.app.max_file_size: | |
| raise FileSizeLimitError( | |
| f"File size ({file_size} bytes) exceeds maximum allowed size " | |
| f"({config.app.max_file_size} bytes)" | |
| ) | |
| # Check file extension | |
| file_ext = Path(file_path).suffix.lower() | |
| if file_ext not in config.app.allowed_extensions: | |
| raise UnsupportedFileTypeError( | |
| f"File type '{file_ext}' is not supported. " | |
| f"Allowed types: {', '.join(config.app.allowed_extensions)}" | |
| ) | |
| def _create_temp_file(self, original_path: str) -> str: | |
| """Create a temporary file with English filename.""" | |
| original_ext = Path(original_path).suffix | |
| with tempfile.NamedTemporaryFile(suffix=original_ext, delete=False) as temp_file: | |
| temp_path = temp_file.name | |
| # Copy content in chunks with cancellation checks | |
| with open(original_path, 'rb') as original: | |
| chunk_size = 1024 * 1024 # 1MB chunks | |
| while True: | |
| if self._check_cancellation(): | |
| self._safe_delete_file(temp_path) | |
| raise ConversionError("Conversion cancelled during file copy") | |
| chunk = original.read(chunk_size) | |
| if not chunk: | |
| break | |
| temp_file.write(chunk) | |
| return temp_path | |
| def _process_latex_content(self, content: str, parser_name: str, ocr_method_name: str) -> str: | |
| """Process LaTeX content - for GOT-OCR, return raw LaTeX without conversion.""" | |
| # For GOT-OCR, skip LLM conversion and return raw LaTeX | |
| if parser_name == "GOT-OCR (jpg,png only)": | |
| logging.info("GOT-OCR detected: returning raw LaTeX output (no LLM conversion)") | |
| return content | |
| # For other parsers with LaTeX content, process as before | |
| if (content and | |
| ("\\begin" in content or "\\end" in content or "$" in content) and | |
| config.api.google_api_key): | |
| logging.info("Converting LaTeX output to Markdown using Gemini API") | |
| start_convert = time.time() | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled before LaTeX conversion") | |
| try: | |
| from src.core.latex_to_markdown_converter import convert_latex_to_markdown | |
| markdown_content = convert_latex_to_markdown(content) | |
| if markdown_content: | |
| logging.info(f"LaTeX conversion completed in {time.time() - start_convert:.2f} seconds") | |
| return markdown_content | |
| else: | |
| logging.warning("LaTeX to Markdown conversion failed, using raw LaTeX output") | |
| except Exception as e: | |
| logging.error(f"Error converting LaTeX to Markdown: {str(e)}") | |
| # Continue with original content on error | |
| return content | |
| def _create_output_file(self, content: str, output_format: str, original_file_path: Optional[str] = None, parser_name: Optional[str] = None) -> str: | |
| """Create output file with proper extension and preserved filename.""" | |
| # Determine file extension based on parser and format | |
| if parser_name == "GOT-OCR (jpg,png only)": | |
| # For GOT-OCR, use .tex extension | |
| ext = ".tex" | |
| else: | |
| # For other parsers, use format-based extensions | |
| format_extensions = { | |
| "markdown": ".md", | |
| "json": ".json", | |
| "text": ".txt", | |
| "document tags": ".doctags" | |
| } | |
| ext = format_extensions.get(output_format.lower(), ".txt") | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled before output file creation") | |
| # Create output filename based on original filename if provided | |
| if original_file_path: | |
| original_name = Path(original_file_path).stem # Get filename without extension | |
| # Clean the filename to be filesystem-safe while preserving spaces and common characters | |
| clean_name = "".join(c for c in original_name if c.isalnum() or c in (' ', '-', '_', '.', '(', ')')).strip() | |
| # Replace multiple spaces with single spaces | |
| clean_name = ' '.join(clean_name.split()) | |
| if not clean_name: # Fallback if cleaning removes everything | |
| clean_name = "converted_document" | |
| # Create output file in temp directory with proper name | |
| temp_dir = tempfile.gettempdir() | |
| output_filename = f"{clean_name}{ext}" | |
| tmp_path = os.path.join(temp_dir, output_filename) | |
| # Handle filename conflicts by adding a number suffix | |
| counter = 1 | |
| base_path = tmp_path | |
| while os.path.exists(tmp_path): | |
| name_part = f"{clean_name}_{counter}" | |
| tmp_path = os.path.join(temp_dir, f"{name_part}{ext}") | |
| counter += 1 | |
| else: | |
| # Fallback to random temporary file | |
| with tempfile.NamedTemporaryFile(mode="w", suffix=ext, delete=False, encoding="utf-8") as tmp: | |
| tmp_path = tmp.name | |
| # Write content to file | |
| try: | |
| with open(tmp_path, "w", encoding="utf-8") as f: | |
| # Write in chunks with cancellation checks | |
| chunk_size = 10000 # characters | |
| for i in range(0, len(content), chunk_size): | |
| if self._check_cancellation(): | |
| self._safe_delete_file(tmp_path) | |
| raise ConversionError("Conversion cancelled during output file writing") | |
| f.write(content[i:i+chunk_size]) | |
| except Exception as e: | |
| self._safe_delete_file(tmp_path) | |
| raise ConversionError(f"Failed to write output file: {str(e)}") | |
| return tmp_path | |
| def convert_document( | |
| self, | |
| file_path: str, | |
| parser_name: str, | |
| ocr_method_name: str, | |
| output_format: str | |
| ) -> Tuple[str, Optional[str]]: | |
| """ | |
| Convert a document using the specified parser and OCR method. | |
| Args: | |
| file_path: Path to the input file | |
| parser_name: Name of the parser to use | |
| ocr_method_name: Name of the OCR method to use | |
| output_format: Output format (Markdown, JSON, Text, Document Tags) | |
| Returns: | |
| Tuple of (content, output_file_path) | |
| Raises: | |
| DocumentProcessingError: For general processing errors | |
| FileSizeLimitError: When file is too large | |
| UnsupportedFileTypeError: For unsupported file types | |
| ConversionError: When conversion fails or is cancelled | |
| """ | |
| if not file_path: | |
| raise DocumentProcessingError("No file provided") | |
| self._conversion_in_progress = True | |
| temp_input = None | |
| output_path = None | |
| try: | |
| # Validate input file | |
| self._validate_file(file_path) | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled") | |
| # Create temporary file with English name | |
| temp_input = self._create_temp_file(file_path) | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled") | |
| # Process document using parser factory | |
| start_time = time.time() | |
| content = ParserFactory.parse_document( | |
| file_path=temp_input, | |
| parser_name=parser_name, | |
| ocr_method_name=ocr_method_name, | |
| output_format=output_format.lower(), | |
| cancellation_flag=self._cancellation_flag | |
| ) | |
| if content == "Conversion cancelled.": | |
| raise ConversionError("Conversion cancelled by parser") | |
| duration = time.time() - start_time | |
| logging.info(f"Document processed in {duration:.2f} seconds") | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled") | |
| # Process LaTeX content if needed | |
| content = self._process_latex_content(content, parser_name, ocr_method_name) | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled") | |
| # Create output file | |
| output_path = self._create_output_file(content, output_format, file_path, parser_name) | |
| return content, output_path | |
| except (DocumentProcessingError, FileSizeLimitError, UnsupportedFileTypeError, ConversionError): | |
| # Re-raise our custom exceptions | |
| self._safe_delete_file(temp_input) | |
| self._safe_delete_file(output_path) | |
| raise | |
| except Exception as e: | |
| # Wrap unexpected exceptions | |
| self._safe_delete_file(temp_input) | |
| self._safe_delete_file(output_path) | |
| raise DocumentProcessingError(f"Unexpected error during conversion: {str(e)}") | |
| finally: | |
| # Clean up temp input file | |
| self._safe_delete_file(temp_input) | |
| # Clean up output file if cancelled | |
| if self._check_cancellation() and output_path: | |
| self._safe_delete_file(output_path) | |
| self._conversion_in_progress = False | |
| def convert_documents( | |
| self, | |
| file_paths: List[str], | |
| parser_name: str, | |
| ocr_method_name: str, | |
| output_format: str, | |
| processing_type: str = "combined" | |
| ) -> Tuple[str, Optional[str]]: | |
| """ | |
| Unified method to convert single or multiple documents. | |
| Args: | |
| file_paths: List of paths to input files (can be single file) | |
| parser_name: Name of the parser to use | |
| ocr_method_name: Name of the OCR method to use | |
| output_format: Output format (Markdown, JSON, Text, Document Tags) | |
| processing_type: Type of multi-document processing (combined, individual, summary, comparison) | |
| Returns: | |
| Tuple of (content, output_file_path) | |
| Raises: | |
| DocumentProcessingError: For general processing errors | |
| FileSizeLimitError: When file(s) are too large | |
| UnsupportedFileTypeError: For unsupported file types | |
| ConversionError: When conversion fails or is cancelled | |
| """ | |
| if not file_paths: | |
| raise DocumentProcessingError("No files provided") | |
| # Route to appropriate processing method | |
| if len(file_paths) == 1: | |
| # Single file processing - use existing method | |
| return self.convert_document( | |
| file_paths[0], parser_name, ocr_method_name, output_format | |
| ) | |
| else: | |
| # Multi-file processing - use new batch method | |
| return self._convert_multiple_documents( | |
| file_paths, parser_name, ocr_method_name, output_format, processing_type | |
| ) | |
| def _convert_multiple_documents( | |
| self, | |
| file_paths: List[str], | |
| parser_name: str, | |
| ocr_method_name: str, | |
| output_format: str, | |
| processing_type: str | |
| ) -> Tuple[str, Optional[str]]: | |
| """ | |
| Convert multiple documents using batch processing. | |
| Args: | |
| file_paths: List of paths to input files | |
| parser_name: Name of the parser to use | |
| ocr_method_name: Name of the OCR method to use | |
| output_format: Output format (Markdown, JSON, Text, Document Tags) | |
| processing_type: Type of multi-document processing | |
| Returns: | |
| Tuple of (content, output_file_path) | |
| """ | |
| self._conversion_in_progress = True | |
| temp_inputs = [] | |
| output_path = None | |
| self._original_file_paths = file_paths # Store original paths for filename reference | |
| try: | |
| # Validate all files first | |
| for file_path in file_paths: | |
| self._validate_file(file_path) | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled") | |
| # Create temporary files with English names | |
| for original_path in file_paths: | |
| temp_path = self._create_temp_file(original_path) | |
| temp_inputs.append(temp_path) | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled during file preparation") | |
| # Process documents using parser factory with multi-document support | |
| start_time = time.time() | |
| content = self._process_multiple_with_parser( | |
| temp_inputs, parser_name, ocr_method_name, output_format, processing_type | |
| ) | |
| if content == "Conversion cancelled.": | |
| raise ConversionError("Conversion cancelled by parser") | |
| duration = time.time() - start_time | |
| logging.info(f"Multiple documents processed in {duration:.2f} seconds") | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled") | |
| # Create output file with batch naming | |
| output_path = self._create_batch_output_file( | |
| content, output_format, file_paths, processing_type | |
| ) | |
| return content, output_path | |
| except (DocumentProcessingError, FileSizeLimitError, UnsupportedFileTypeError, ConversionError): | |
| # Re-raise our custom exceptions | |
| for temp_path in temp_inputs: | |
| self._safe_delete_file(temp_path) | |
| self._safe_delete_file(output_path) | |
| raise | |
| except Exception as e: | |
| # Wrap unexpected exceptions | |
| for temp_path in temp_inputs: | |
| self._safe_delete_file(temp_path) | |
| self._safe_delete_file(output_path) | |
| raise DocumentProcessingError(f"Unexpected error during batch conversion: {str(e)}") | |
| finally: | |
| # Clean up temp input files | |
| for temp_path in temp_inputs: | |
| self._safe_delete_file(temp_path) | |
| # Clean up output file if cancelled | |
| if self._check_cancellation() and output_path: | |
| self._safe_delete_file(output_path) | |
| self._conversion_in_progress = False | |
| def _process_multiple_with_parser( | |
| self, | |
| temp_file_paths: List[str], | |
| parser_name: str, | |
| ocr_method_name: str, | |
| output_format: str, | |
| processing_type: str | |
| ) -> str: | |
| """Process multiple documents using the parser factory.""" | |
| try: | |
| # Get parser instance | |
| from src.parsers.parser_registry import ParserRegistry | |
| parser_class = ParserRegistry.get_parser_class(parser_name) | |
| if not parser_class: | |
| raise DocumentProcessingError(f"Parser '{parser_name}' not found") | |
| parser_instance = parser_class() | |
| parser_instance.set_cancellation_flag(self._cancellation_flag) | |
| # Check if parser supports multi-document processing | |
| if hasattr(parser_instance, 'parse_multiple'): | |
| # Use multi-document parsing with original filenames for reference | |
| return parser_instance.parse_multiple( | |
| file_paths=temp_file_paths, | |
| processing_type=processing_type, | |
| ocr_method=ocr_method_name, | |
| output_format=output_format.lower(), | |
| original_filenames=[Path(fp).name for fp in self._original_file_paths] | |
| ) | |
| else: | |
| # Fallback: process individually and combine | |
| results = [] | |
| for i, file_path in enumerate(temp_file_paths): | |
| if self._check_cancellation(): | |
| return "Conversion cancelled." | |
| result = parser_instance.parse( | |
| file_path=file_path, | |
| ocr_method=ocr_method_name | |
| ) | |
| # Add section header for individual results using original filename | |
| original_filename = Path(self._original_file_paths[i]).name | |
| results.append(f"# Document {i+1}: {original_filename}\n\n{result}") | |
| return "\n\n---\n\n".join(results) | |
| except Exception as e: | |
| raise DocumentProcessingError(f"Error processing multiple documents: {str(e)}") | |
| def _create_batch_output_file( | |
| self, | |
| content: str, | |
| output_format: str, | |
| original_file_paths: List[str], | |
| processing_type: str | |
| ) -> str: | |
| """Create output file for batch processing with descriptive naming.""" | |
| # Determine file extension | |
| format_extensions = { | |
| "markdown": ".md", | |
| "json": ".json", | |
| "text": ".txt", | |
| "document tags": ".doctags" | |
| } | |
| ext = format_extensions.get(output_format.lower(), ".txt") | |
| if self._check_cancellation(): | |
| raise ConversionError("Conversion cancelled before output file creation") | |
| # Create descriptive filename for batch processing | |
| file_count = len(original_file_paths) | |
| timestamp = time.strftime("%Y%m%d_%H%M%S") | |
| if processing_type == "combined": | |
| filename = f"Combined_{file_count}_Documents_{timestamp}{ext}" | |
| elif processing_type == "individual": | |
| filename = f"Individual_Sections_{file_count}_Files_{timestamp}{ext}" | |
| elif processing_type == "summary": | |
| filename = f"Summary_Analysis_{file_count}_Files_{timestamp}{ext}" | |
| elif processing_type == "comparison": | |
| filename = f"Comparison_Analysis_{file_count}_Files_{timestamp}{ext}" | |
| else: | |
| filename = f"Batch_Processing_{file_count}_Files_{timestamp}{ext}" | |
| # Create output file in temp directory | |
| temp_dir = tempfile.gettempdir() | |
| tmp_path = os.path.join(temp_dir, filename) | |
| # Handle filename conflicts | |
| counter = 1 | |
| base_path = tmp_path | |
| while os.path.exists(tmp_path): | |
| name_part = filename.replace(ext, f"_{counter}{ext}") | |
| tmp_path = os.path.join(temp_dir, name_part) | |
| counter += 1 | |
| # Write content to file | |
| try: | |
| with open(tmp_path, "w", encoding="utf-8") as f: | |
| # Write in chunks with cancellation checks | |
| chunk_size = 10000 # characters | |
| for i in range(0, len(content), chunk_size): | |
| if self._check_cancellation(): | |
| self._safe_delete_file(tmp_path) | |
| raise ConversionError("Conversion cancelled during output file writing") | |
| f.write(content[i:i+chunk_size]) | |
| except Exception as e: | |
| self._safe_delete_file(tmp_path) | |
| raise ConversionError(f"Failed to write batch output file: {str(e)}") | |
| return tmp_path |