Spaces:
Sleeping
Sleeping
| import base64 | |
| import os | |
| from typing import Optional, Tuple, List, Dict, Any | |
| from mistralai import Mistral | |
| from src.utils.text_explainer import TextExplainer | |
class PDFTextExtractor:
    """PDF text extraction using Mistral AI OCR.

    The extractor sends a base64-encoded PDF to the Mistral OCR endpoint and
    pulls page markdown and embedded images out of the response.  A chain of
    fallback strategies handles responses that do not expose a ``pages`` list.
    """

    # Attribute names probed (for diagnostics only) on the OCR response.
    _COMMON_ATTRS = (
        'text', 'content', 'result', 'data', 'output',
        'extracted_text', 'ocr_text', 'choices', 'message',
    )

    # Keys probed, in order, on mapping-like OCR responses.
    _DICT_TEXT_KEYS = (
        'text', 'content', 'result', 'extracted_text', 'ocr_text', 'output',
    )

    def __init__(self):
        """Initialize the PDF text extractor with Mistral AI client.

        Raises:
            ValueError: If the ``MISTRAL_API_KEY`` environment variable is
                missing or empty.
        """
        self.api_key = os.environ.get("MISTRAL_API_KEY")
        if not self.api_key:
            raise ValueError("MISTRAL_API_KEY environment variable is required")
        self.client = Mistral(api_key=self.api_key)
        self.text_explainer = TextExplainer()

    def encode_pdf(self, pdf_path: str) -> Optional[str]:
        """Encode the PDF file to base64.

        Args:
            pdf_path: Path to the PDF file

        Returns:
            Base64 encoded string or None if error
        """
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except FileNotFoundError:
            print(f"Error: The file {pdf_path} was not found.")
            return None
        except Exception as e:
            # Deliberate best-effort: any other failure (permissions, bad
            # path type, ...) is reported and mapped to None for the caller.
            print(f"Error encoding PDF: {e}")
            return None

    def extract_text_from_pdf(self, pdf_file) -> Tuple[str, str, List[Dict[str, Any]]]:
        """Extract text and images from uploaded PDF using Mistral AI OCR.

        Args:
            pdf_file: Gradio file object (anything with a ``name`` attribute
                holding the path) or the path itself

        Returns:
            Tuple of (extracted_text, status_message, images_data).  Each
            entry of ``images_data`` is a dict with the keys ``page``,
            ``image_id``, ``top_left_x``, ``top_left_y``, ``bottom_right_x``,
            ``bottom_right_y`` and ``base64``.  On failure the text is empty,
            the status describes the error and the image list is empty.
        """
        if pdf_file is None:
            return "", "Please upload a PDF file.", []

        try:
            # Get the file path from Gradio file object
            pdf_path = pdf_file.name if hasattr(pdf_file, 'name') else pdf_file

            base64_pdf = self.encode_pdf(pdf_path)
            if base64_pdf is None:
                return "", "Failed to encode PDF file.", []

            print("📄 Processing PDF with Mistral OCR...")
            ocr_response = self.client.ocr.process(
                model="mistral-ocr-latest",
                document={
                    "type": "document_url",
                    "document_url": f"data:application/pdf;base64,{base64_pdf}",
                },
                include_image_base64=True,
            )

            self._log_response_structure(ocr_response)

            extracted_text = ""
            extraction_method = "none"

            # Strategy 1: Mistral OCR specific - pages with markdown and images
            page_texts, extracted_images, pages_recognized = self._collect_from_pages(ocr_response)
            if page_texts:
                extracted_text = "\n\n".join(page_texts)
                extraction_method = f"pages_markdown_{len(page_texts)}_pages"

            # Look for images directly on the response if the pages had none
            if not extracted_images:
                extracted_images = self._collect_top_level_images(ocr_response)

            if not extracted_text:
                # Once a page structure has been recognised, an empty result
                # means the document has no text; the guess-work strategies
                # would otherwise return the model name or the repr of the
                # whole response as "text".
                extracted_text, extraction_method = self._fallback_text(
                    ocr_response, allow_heuristics=not pages_recognized
                )

            print(f"🎯 Extraction method used: {extraction_method}")
            print(f"📝 Extracted text length: {len(extracted_text)} characters")
            print(f"🖼️ Extracted images: {len(extracted_images)}")

            if extracted_text:
                status = f"✅ Successfully extracted text from PDF ({len(extracted_text)} characters)"
                if extracted_images:
                    status += f" and {len(extracted_images)} image(s)"
            else:
                extracted_text = "No text could be extracted from this PDF."
                status = "⚠️ OCR completed but no text was found in response."
                if extracted_images:
                    status = f"✅ Successfully extracted {len(extracted_images)} image(s) from PDF, but no text was found."
                print("❌ No extractable text found in OCR response")

            return extracted_text, status, extracted_images
        except Exception as e:
            # Top-level boundary for the UI: report instead of raising.
            error_msg = f"Error processing PDF: {str(e)}"
            print(error_msg)
            return "", f"❌ {error_msg}", []

    def generate_explanations(self, extracted_text: str) -> str:
        """Generate explanations for the extracted text sections.

        Args:
            extracted_text: The extracted text from PDF

        Returns:
            Formatted explanations for all sections, or a short message when
            there is nothing to explain or an error occurred
        """
        try:
            if not extracted_text or not extracted_text.strip():
                return "No text available to explain."
            # Placeholder text produced by extract_text_from_pdf on empty OCR
            if extracted_text.startswith("No text could be extracted"):
                return "Cannot generate explanations - no text was extracted from the PDF."

            print("🤖 Generating explanations for extracted text...")
            explained_sections = self.text_explainer.explain_all_sections(extracted_text)
            if not explained_sections:
                return "No sections found to explain in the extracted text."
            return self.text_explainer.format_explanations_for_display(explained_sections)
        except Exception as e:
            error_msg = f"Error generating explanations: {str(e)}"
            print(error_msg)
            return f"❌ {error_msg}"

    def _log_response_structure(self, ocr_response) -> None:
        """Print a truncated diagnostic summary of the OCR response."""
        print("🔍 Analyzing OCR Response Structure...")
        print(f" Type: {type(ocr_response)}")
        print(f" String representation: {str(ocr_response)[:500]}...")

        if hasattr(ocr_response, '__dict__'):
            print(f" Object attributes: {list(ocr_response.__dict__.keys())}")
            for key, value in ocr_response.__dict__.items():
                print(f" {key}: {type(value)} = {str(value)[:100]}...")

        for attr in self._COMMON_ATTRS:
            if hasattr(ocr_response, attr):
                value = getattr(ocr_response, attr)
                print(f" Has '{attr}': {type(value)} = {str(value)[:100]}...")

        try:
            if hasattr(ocr_response, '__iter__') and not isinstance(ocr_response, str):
                # Materialise once: counting and then re-iterating would show
                # nothing for a one-shot iterator.
                items = list(ocr_response)
                print(f" Iterable with {len(items)} items")
                for i, item in enumerate(items[:3]):  # Show first 3 items
                    print(f" Item {i}: {type(item)} = {str(item)[:100]}...")
        except Exception as e:
            print(f" Error checking iteration: {e}")

    @staticmethod
    def _image_record(img, page_index: int, image_id: str) -> Dict[str, Any]:
        """Build the image dict returned to callers from an OCR image object."""
        return {
            'page': page_index,
            'image_id': image_id,
            'top_left_x': getattr(img, 'top_left_x', 0),
            'top_left_y': getattr(img, 'top_left_y', 0),
            'bottom_right_x': getattr(img, 'bottom_right_x', 0),
            'bottom_right_y': getattr(img, 'bottom_right_y', 0),
            'base64': getattr(img, 'image_base64', ''),
        }

    def _collect_from_pages(self, ocr_response) -> Tuple[List[str], List[Dict[str, Any]], bool]:
        """Collect page markdown and images from a ``pages`` list response.

        Returns:
            Tuple of (page_texts, images, pages_recognized) where
            ``pages_recognized`` is True when the response carried a
            non-empty ``pages`` list.
        """
        page_texts: List[str] = []
        images: List[Dict[str, Any]] = []

        pages = getattr(ocr_response, 'pages', None)
        if not (isinstance(pages, list) and pages):
            return page_texts, images, False

        for i, page in enumerate(pages):
            markdown = getattr(page, 'markdown', None)
            if markdown:
                page_texts.append(markdown)
                print(f"✅ Found text in page {i} markdown: {len(markdown)} characters")

            for j, img in enumerate(getattr(page, 'images', None) or []):
                # NOTE(review): page images get a synthetic id while top-level
                # images keep ``img.id``; kept as-is so callers see the same ids.
                record = self._image_record(img, i, f"img-{i}-{j}")
                images.append(record)
                print(f"✅ Found image in page {i}, image {j}: coordinates ({record['top_left_x']}, {record['top_left_y']}) to ({record['bottom_right_x']}, {record['bottom_right_y']})")

        return page_texts, images, True

    def _collect_top_level_images(self, ocr_response) -> List[Dict[str, Any]]:
        """Collect images exposed directly on the response (page index 0)."""
        images: List[Dict[str, Any]] = []
        for j, img in enumerate(getattr(ocr_response, 'images', None) or []):
            record = self._image_record(img, 0, getattr(img, 'id', f"img-{j}"))
            images.append(record)
            print(f"✅ Found image {j}: coordinates ({record['top_left_x']}, {record['top_left_y']}) to ({record['bottom_right_x']}, {record['bottom_right_y']})")
        return images

    @staticmethod
    def _fallback_text(ocr_response, allow_heuristics: bool = True) -> Tuple[str, str]:
        """Try the non-page strategies for locating text in the response.

        Args:
            ocr_response: The raw OCR response object or mapping
            allow_heuristics: When False, skip the strategies that guess
                (scanning ``__dict__`` and stringifying the whole response)

        Returns:
            Tuple of (text, extraction_method); text is empty if nothing
            was found
        """
        text = ""
        method = "none"

        # Strategy 2: Direct text attribute
        if hasattr(ocr_response, 'text') and ocr_response.text:
            text = str(ocr_response.text)
            method = "direct_text_attribute"
        # Strategy 3: Content attribute
        elif hasattr(ocr_response, 'content') and ocr_response.content:
            content = ocr_response.content
            if isinstance(content, str):
                text = content
                method = "content_attribute_string"
            elif hasattr(content, 'text'):
                text = str(content.text)
                method = "content_text_attribute"
            else:
                text = str(content)
                method = "content_attribute_converted"
        # Strategy 4: Result attribute.  Requires a truthy value like the
        # other strategies, so ``result=None`` is not reported as "None".
        elif hasattr(ocr_response, 'result') and ocr_response.result:
            result = ocr_response.result
            if isinstance(result, str):
                text = result
                method = "result_string"
            elif hasattr(result, 'text'):
                text = str(result.text)
                method = "result_text_attribute"
            elif isinstance(result, dict) and 'text' in result:
                text = str(result['text'])
                method = "result_dict_text"
            else:
                text = str(result)
                method = "result_converted"
        # Strategy 5: Choices attribute (chat-completion style response)
        elif hasattr(ocr_response, 'choices') and ocr_response.choices:
            choices = ocr_response.choices
            if isinstance(choices, list) and len(choices) > 0:
                choice = choices[0]
                if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                    text = str(choice.message.content)
                    method = "choices_message_content"
                elif hasattr(choice, 'text'):
                    text = str(choice.text)
                    method = "choices_text"
                else:
                    text = str(choice)
                    method = "choices_converted"
        # Strategy 6: Dict-like access (dicts always provide ``get``)
        elif hasattr(ocr_response, 'get'):
            for key in PDFTextExtractor._DICT_TEXT_KEYS:
                value = ocr_response.get(key)
                if value:
                    text = str(value)
                    method = f"dict_key_{key}"
                    break
        # Strategy 7: Inspect all attributes for string-like content
        elif allow_heuristics and hasattr(ocr_response, '__dict__'):
            for key, value in ocr_response.__dict__.items():
                if isinstance(value, str) and len(value) > 20:  # Likely text content
                    text = value
                    method = f"attribute_{key}"
                    break
                elif hasattr(value, 'text') and isinstance(value.text, str):
                    text = str(value.text)
                    method = f"nested_text_in_{key}"
                    break

        # Strategy 8: Convert entire response to string if it seems to contain text
        if not text and allow_heuristics:
            response_str = str(ocr_response)
            if len(response_str) > 50 and not response_str.startswith('<'):  # Not an object reference
                text = response_str
                method = "full_response_string"

        return text, method