Spaces:
Runtime error
Runtime error
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any, Union | |
| import os | |
| import json | |
| import tempfile | |
| import base64 | |
| from PIL import Image | |
| import io | |
| from src.parsers.parser_interface import DocumentParser | |
| from src.parsers.parser_registry import ParserRegistry | |
| from src.core.config import config | |
# Import the Google Gemini API client. The dependency is optional: when it is
# missing we record that fact instead of failing at import time, and the parser
# raises a descriptive ImportError only when it is actually used.
try:
    from google import genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False

# Load the API key from the environment once, at import time.
api_key = os.getenv("GOOGLE_API_KEY")

# Warn early (but do not fail) when the key is missing or empty, so that the
# module can still be imported in environments that never call Gemini.
if not api_key:
    print("Warning: GOOGLE_API_KEY environment variable not found. Gemini Flash parser may not work.")
class GeminiFlashParser(DocumentParser):
    """Parser that uses Google's Gemini Flash 2.0 to convert documents to markdown.

    The model actually called is whatever ``config.model.gemini_model`` names;
    temperature and the output-token cap are read from ``config.model`` as well.
    Both ``parse`` and ``parse_multiple`` raise when the client library or the
    API key is missing, but report every later failure (file access,
    validation, API errors) as a markdown "# Error" document instead of
    raising.
    """

    # Sampling parameters that are fixed here rather than read from config.
    _TOP_P = 0.95
    _TOP_K = 40

    # Limits enforced by _validate_batch_files.
    _MAX_BATCH_FILES = 5
    _MAX_FILE_BYTES = 10 * 1024 * 1024   # 10MB per file
    _MAX_BATCH_BYTES = 20 * 1024 * 1024  # 20MB for the whole batch

    _SUPPORTED_TYPES_HINT = (
        "Gemini supports: PDF, TXT, HTML, CSS, MD, CSV, XML, RTF, JS, PY, and image files."
    )

    # Extension -> MIME type. The Office entries are kept so that such files
    # are recognised and rejected with a specific message during batch
    # validation. The CSS/XML/RTF/JS/PY entries make the lookup agree with the
    # formats that the validation messages advertise as supported.
    _MIME_TYPES = {
        ".pdf": "application/pdf",
        ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ".doc": "application/msword",
        ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        ".ppt": "application/vnd.ms-powerpoint",
        ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        ".xls": "application/vnd.ms-excel",
        ".txt": "text/plain",
        ".md": "text/markdown",
        ".html": "text/html",
        ".htm": "text/html",
        ".css": "text/css",
        ".csv": "text/csv",
        ".xml": "text/xml",
        ".rtf": "text/rtf",
        ".js": "text/javascript",
        ".py": "text/x-python",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".bmp": "image/bmp",
        ".tiff": "image/tiff",
        ".tif": "image/tiff",
    }

    # MIME types that are recognised but rejected for batch processing.
    _OFFICE_MIME_TYPES = frozenset({
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "application/vnd.ms-powerpoint",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel",
    })

    # Prompt sent with a single document.
    _SINGLE_DOCUMENT_PROMPT = (
        "\n"
        "Convert this document to markdown format.\n"
        "Preserve the structure, headings, lists, tables, and formatting as much as possible.\n"
        "For images, include a brief description in markdown image syntax.\n"
        "Return only the markdown content, no other text.\n"
    )

    # Instructions appended to the file listing, keyed by processing type.
    _BATCH_INSTRUCTIONS = {
        "combined": (
            "Please convert all documents to a single, cohesive markdown document.\n"
            "Merge the content logically, remove duplicate information, and create a unified structure with clear headings.\n"
            "Preserve important formatting, tables, lists, and structure from all documents.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return only the combined markdown content, no other text."
        ),
        "individual": (
            "Please convert each document to markdown format and present them as separate sections.\n"
            "For each document, create a clear section header with the document name.\n"
            "Preserve the structure, headings, lists, tables, and formatting within each section.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return the content in this format:\n"
            "# Document 1: [filename]\n"
            "[converted content]\n"
            "# Document 2: [filename]\n"
            "[converted content]\n"
            "Return only the markdown content, no other text."
        ),
        "summary": (
            "Please create a comprehensive analysis with two parts:\n"
            "1. EXECUTIVE SUMMARY: A concise overview summarizing the key points from all documents\n"
            "2. DETAILED SECTIONS: Individual converted sections for each document\n"
            "Structure the output as:\n"
            "# Executive Summary\n"
            "[Brief summary of key findings and themes across all documents]\n"
            "# Detailed Analysis\n"
            "## Document 1: [filename]\n"
            "[converted content]\n"
            "## Document 2: [filename]\n"
            "[converted content]\n"
            "Preserve formatting, tables, lists, and structure throughout.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return only the markdown content, no other text."
        ),
        "comparison": (
            "Please create a comparative analysis of these documents:\n"
            "1. Create a comparison table highlighting key differences and similarities\n"
            "2. Provide individual document summaries\n"
            "3. Include a section on cross-document insights\n"
            "Structure the output as:\n"
            "# Document Comparison Analysis\n"
            "## Comparison Table\n"
            "| Aspect | Document 1 | Document 2 | Document 3 | ... |\n"
            "|--------|------------|------------|------------|-----|\n"
            "| [Key aspects found across documents] | | | | |\n"
            "## Individual Document Summaries\n"
            "### Document 1: [filename]\n"
            "[Key points and content summary]\n"
            "### Document 2: [filename]\n"
            "[Key points and content summary]\n"
            "## Cross-Document Insights\n"
            "[Analysis of patterns, contradictions, or complementary information across documents]\n"
            "Preserve important formatting and structure.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return only the markdown content, no other text."
        ),
    }

    @classmethod
    def get_name(cls) -> str:
        """Return the display name of this parser."""
        return "Gemini Flash"

    @classmethod
    def get_supported_ocr_methods(cls) -> List[Dict[str, Any]]:
        """Return the OCR method descriptors; only the "none" method is offered."""
        return [
            {
                "id": "none",
                "name": "None",
                "default_params": {},
            }
        ]

    @classmethod
    def get_description(cls) -> str:
        """Return a one-line human-readable description of this parser."""
        return "Gemini Flash 2.0 parser for converting documents and images to markdown"

    def parse(self, file_path: Union[str, Path], ocr_method: Optional[str] = None, **kwargs) -> str:
        """Parse a document using Gemini Flash 2.0.

        Args:
            file_path: Path of the document or image to convert.
            ocr_method: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The markdown produced by the model, or a markdown "# Error"
            document describing the failure.

        Raises:
            ImportError: If the google-genai client is not installed.
            ValueError: If GOOGLE_API_KEY is not set.
        """
        self._ensure_available()

        try:
            contents = [
                self._SINGLE_DOCUMENT_PROMPT,
                self._file_part(Path(file_path)),
            ]
            return self._generate_markdown(contents)
        except Exception as e:
            # Deliberate best-effort boundary: failures are reported as
            # markdown so the caller always receives displayable text.
            error_message = f"Error parsing document with Gemini Flash: {str(e)}"
            print(error_message)
            return f"# Error\n\n{error_message}\n\nPlease check your API key and try again."

    def parse_multiple(self, file_paths: List[Union[str, Path]], processing_type: str = "combined", original_filenames: Optional[List[str]] = None, **kwargs) -> str:
        """Parse multiple documents using Gemini Flash 2.0.

        Args:
            file_paths: Paths of the documents to send in a single request.
            processing_type: One of "combined", "individual", "summary" or
                "comparison"; any other value is treated as "combined" when
                building the prompt.
            original_filenames: Optional display names used in the prompt and
                in the output header instead of the names taken from
                ``file_paths``.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The model output prefixed with a metadata comment header, the
            string "Conversion cancelled." if cancellation was requested, or a
            markdown "# Error" document describing the failure.

        Raises:
            ImportError: If the google-genai client is not installed.
            ValueError: If GOOGLE_API_KEY is not set.
        """
        self._ensure_available()

        try:
            path_objects = [Path(fp) for fp in file_paths]
            self._validate_batch_files(path_objects)

            # _check_cancellation is not defined in this class; presumably it
            # is provided by DocumentParser -- confirm against the base class.
            if self._check_cancellation():
                return "Conversion cancelled."

            contents = self._create_batch_contents(path_objects, processing_type, original_filenames)

            # Check again right before the (slow, billable) API call.
            if self._check_cancellation():
                return "Conversion cancelled."

            response_text = self._generate_markdown(contents)
            return self._format_batch_output(response_text, path_objects, processing_type, original_filenames)
        except Exception as e:
            # Same best-effort boundary as parse(): report, do not raise.
            error_message = f"Error parsing multiple documents with Gemini Flash: {str(e)}"
            print(error_message)
            return f"# Error\n\n{error_message}\n\nPlease check your API key and try again."

    @staticmethod
    def _ensure_available() -> None:
        """Raise if the Gemini client library or the API key is missing."""
        if not GEMINI_AVAILABLE:
            raise ImportError(
                "The Google Gemini API client is not installed. "
                "Please install it with 'pip install google-genai'."
            )
        # Uses the key loaded at module import time.
        if not api_key:
            raise ValueError(
                "GOOGLE_API_KEY environment variable is not set. "
                "Please set it to your Gemini API key."
            )

    def _generate_markdown(self, contents: List[Any]) -> str:
        """Send ``contents`` to the configured Gemini model and return its text.

        Raises:
            ValueError: If the response carries no text, so that callers never
                receive ``None`` where a string is expected.
        """
        client = genai.Client(api_key=api_key)
        response = client.models.generate_content(
            model=config.model.gemini_model,
            contents=contents,
            config={
                "temperature": config.model.temperature,
                "top_p": self._TOP_P,
                "top_k": self._TOP_K,
                "max_output_tokens": config.model.max_tokens,
            },
        )
        markdown_text = response.text
        if markdown_text is None:
            raise ValueError("Gemini returned a response without any text content")
        return markdown_text

    def _file_part(self, file_path: Path) -> Any:
        """Read ``file_path`` and wrap its bytes as a Gemini content part."""
        return genai.types.Part.from_bytes(
            data=file_path.read_bytes(),
            mime_type=self._get_mime_type(file_path.suffix),
        )

    @staticmethod
    def _display_names(file_paths: List[Path], original_filenames: Optional[List[str]]) -> List[str]:
        """Return the original filenames if provided, else the path names."""
        if original_filenames:
            return original_filenames
        return [fp.name for fp in file_paths]

    def _validate_batch_files(self, file_paths: List[Path]) -> None:
        """Validate batch of files for multi-document processing.

        Checks, in order: file count, existence and per-file size of each
        file, combined size, and finally the file type of each file.

        Raises:
            ValueError: On the first check that fails.
        """
        if not file_paths:
            raise ValueError("No files provided for processing")
        if len(file_paths) > self._MAX_BATCH_FILES:
            raise ValueError(f"Maximum {self._MAX_BATCH_FILES} files allowed for batch processing")

        megabyte = 1024 * 1024
        total_size = 0
        for file_path in file_paths:
            if not file_path.exists():
                raise ValueError(f"File not found: {file_path}")
            file_size = file_path.stat().st_size
            total_size += file_size
            if file_size > self._MAX_FILE_BYTES:
                raise ValueError(
                    f"Individual file size exceeds {self._MAX_FILE_BYTES // megabyte}MB: {file_path.name}"
                )

        if total_size > self._MAX_BATCH_BYTES:
            raise ValueError(
                f"Combined file size ({total_size / megabyte:.1f}MB) "
                f"exceeds {self._MAX_BATCH_BYTES // megabyte}MB limit"
            )

        for file_path in file_paths:
            mime_type = self._get_mime_type(file_path.suffix)
            # Unknown extension: nothing sensible to tell the model.
            if mime_type == "application/octet-stream":
                raise ValueError(f"Unsupported file type: {file_path.name}. {self._SUPPORTED_TYPES_HINT}")
            # Known extension, but an Office format that is rejected here.
            if mime_type in self._OFFICE_MIME_TYPES:
                raise ValueError(f"File type not supported by Gemini: {file_path.name}. {self._SUPPORTED_TYPES_HINT}")

    def _create_batch_contents(self, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> List[Any]:
        """Create contents list for batch API call: the prompt, then one part per file."""
        prompt = self._create_batch_prompt(file_paths, processing_type, original_filenames)
        return [prompt] + [self._file_part(file_path) for file_path in file_paths]

    def _create_batch_prompt(self, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        """Create appropriate prompt for batch processing.

        The prompt lists the document names (original filenames when given,
        otherwise the names from ``file_paths``) followed by the instructions
        for ``processing_type``. Unknown processing types fall back to the
        "combined" instructions while keeping the same file listing.
        """
        file_names = self._display_names(file_paths, original_filenames)
        file_list = "\n".join(f"- {name}" for name in file_names)
        base_prompt = (
            f"I will provide you with {len(file_paths)} documents to process:\n"
            f"{file_list}\n"
        )
        instructions = self._BATCH_INSTRUCTIONS.get(
            processing_type, self._BATCH_INSTRUCTIONS["combined"]
        )
        return base_prompt + instructions

    def _format_batch_output(self, response_text: str, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        """Format the batch processing output.

        Prepends HTML comments recording the processing type, the number of
        files and their display names to ``response_text``.
        """
        file_names = self._display_names(file_paths, original_filenames)
        header = (
            "<!-- Multi-Document Processing Results -->\n"
            f"<!-- Processing Type: {processing_type} -->\n"
            f"<!-- Files Processed: {len(file_paths)} -->\n"
            f"<!-- File Names: {', '.join(file_names)} -->\n"
        )
        return header + response_text

    def _get_mime_type(self, file_extension: str) -> str:
        """Get the MIME type for a file extension.

        Args:
            file_extension: Extension including the leading dot (e.g. ".pdf");
                matched case-insensitively.

        Returns:
            The mapped MIME type, or "application/octet-stream" when the
            extension is not known.
        """
        return self._MIME_TYPES.get(file_extension.lower(), "application/octet-stream")
# Register the parser with the registry only when its client library is
# importable; otherwise say why it is absent from the list of parsers.
if GEMINI_AVAILABLE:
    ParserRegistry.register(GeminiFlashParser)
else:
    print("Gemini Flash parser not registered: google-genai package not installed")