pdf_explainer / src /processors /pdf_text_extractor.py
spagestic's picture
fix: update import path for TextExplainer to reflect new structure
68c0007
import base64
import os
from typing import Optional, Tuple, List, Dict, Any
from mistralai import Mistral
from src.utils.text_explainer import TextExplainer
class PDFTextExtractor:
    """Extract text and images from PDF files using the Mistral AI OCR API.

    The extractor base64-encodes a PDF, submits it to the OCR endpoint as a
    ``data:`` URL, and pulls per-page markdown plus image metadata out of the
    response. If the response has no page markdown, a series of heuristic
    fallbacks is tried so that unexpected response shapes still yield text.
    """

    # OCR model name sent with every request.
    OCR_MODEL = "mistral-ocr-latest"

    # Placeholder text returned when OCR yields nothing. generate_explanations()
    # recognises it by prefix so it does not try to "explain" the placeholder.
    _NO_TEXT_PREFIX = "No text could be extracted"
    NO_TEXT_MESSAGE = _NO_TEXT_PREFIX + " from this PDF."

    # Attribute names reported by the diagnostic dump when present.
    _DEBUG_ATTRS = (
        'text', 'content', 'result', 'data', 'output',
        'extracted_text', 'ocr_text', 'choices', 'message',
    )

    # Keys probed, in order, on mapping-like responses.
    _DICT_TEXT_KEYS = (
        'text', 'content', 'result', 'extracted_text', 'ocr_text', 'output',
    )

    def __init__(self):
        """Initialize the PDF text extractor with a Mistral AI client.

        Raises:
            ValueError: If the MISTRAL_API_KEY environment variable is unset
                or empty.
        """
        self.api_key = os.environ.get("MISTRAL_API_KEY")
        if not self.api_key:
            raise ValueError("MISTRAL_API_KEY environment variable is required")
        self.client = Mistral(api_key=self.api_key)
        self.text_explainer = TextExplainer()

    def encode_pdf(self, pdf_path: str) -> Optional[str]:
        """Encode the PDF file to base64.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            The base64-encoded file content as a UTF-8 string, or None if the
            file could not be read (the error is printed).
        """
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except FileNotFoundError:
            print(f"Error: The file {pdf_path} was not found.")
            return None
        except Exception as e:
            # Deliberate best-effort boundary: callers rely on None, not an
            # exception, for any read/encode failure.
            print(f"Error encoding PDF: {e}")
            return None

    def extract_text_from_pdf(self, pdf_file) -> Tuple[str, str, List[Dict[str, Any]]]:
        """Extract text and images from an uploaded PDF using Mistral AI OCR.

        Args:
            pdf_file: Either an object exposing the file path as ``.name``
                (e.g. an uploaded-file object) or the path itself.

        Returns:
            Tuple of (extracted_text, status_message, images_data). Each entry
            of images_data is a dict with the keys 'page', 'image_id',
            'top_left_x', 'top_left_y', 'bottom_right_x', 'bottom_right_y'
            and 'base64'. On failure the text is empty, the status describes
            the error, and the image list is empty; this method never raises.
        """
        if pdf_file is None:
            return "", "Please upload a PDF file.", []

        try:
            # Accept both file-like upload objects and plain path strings.
            pdf_path = pdf_file.name if hasattr(pdf_file, 'name') else pdf_file

            base64_pdf = self.encode_pdf(pdf_path)
            if base64_pdf is None:
                return "", "Failed to encode PDF file.", []

            print("🔄 Processing PDF with Mistral OCR...")
            ocr_response = self.client.ocr.process(
                model=self.OCR_MODEL,
                document={
                    "type": "document_url",
                    "document_url": f"data:application/pdf;base64,{base64_pdf}",
                },
                include_image_base64=True,
            )

            self._debug_dump_response(ocr_response)

            # Strategy 1: per-page markdown and images (the expected shape).
            extracted_text, extraction_method, extracted_images = (
                self._extract_from_pages(ocr_response)
            )

            # Images attached directly to the response, if pages had none.
            if not extracted_images:
                extracted_images = self._extract_top_level_images(ocr_response)

            # Strategies 2-8: heuristics for unexpected response shapes.
            if not extracted_text:
                extracted_text, extraction_method = (
                    self._extract_text_fallback(ocr_response)
                )

            print(f"🎯 Extraction method used: {extraction_method}")
            print(f"📝 Extracted text length: {len(extracted_text)} characters")
            print(f"🖼️ Extracted images: {len(extracted_images)}")

            if extracted_text:
                status = f"✅ Successfully extracted text from PDF ({len(extracted_text)} characters)"
                if extracted_images:
                    status += f" and {len(extracted_images)} image(s)"
            else:
                extracted_text = self.NO_TEXT_MESSAGE
                status = "⚠️ OCR completed but no text was found in response."
                if extracted_images:
                    status = f"✅ Successfully extracted {len(extracted_images)} image(s) from PDF, but no text was found."
                print("❌ No extractable text found in OCR response")

            return extracted_text, status, extracted_images

        except Exception as e:
            # Top-level boundary: report the failure through the status string
            # instead of raising into the caller.
            error_msg = f"Error processing PDF: {str(e)}"
            print(error_msg)
            return "", f"❌ {error_msg}", []

    def generate_explanations(self, extracted_text: str) -> str:
        """Generate explanations for the extracted text sections.

        Args:
            extracted_text: The extracted text from the PDF.

        Returns:
            Formatted explanations for all sections, or a human-readable
            message when there is nothing to explain or an error occurred.
        """
        try:
            if not extracted_text or extracted_text.strip() == "":
                return "No text available to explain."

            # Do not try to explain the "nothing extracted" placeholder.
            if extracted_text.startswith(self._NO_TEXT_PREFIX):
                return "Cannot generate explanations - no text was extracted from the PDF."

            print("🤖 Generating explanations for extracted text...")
            explained_sections = self.text_explainer.explain_all_sections(extracted_text)
            if not explained_sections:
                return "No sections found to explain in the extracted text."

            return self.text_explainer.format_explanations_for_display(explained_sections)

        except Exception as e:
            error_msg = f"Error generating explanations: {str(e)}"
            print(error_msg)
            return f"❌ {error_msg}"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _debug_dump_response(self, ocr_response) -> None:
        """Print a best-effort structural summary of the OCR response.

        Purely diagnostic: any error raised while inspecting the response is
        reported and swallowed so it can never abort the extraction itself.
        """
        try:
            print("🔍 Analyzing OCR Response Structure...")
            print(f" Type: {type(ocr_response)}")
            print(f" String representation: {str(ocr_response)[:500]}...")

            if hasattr(ocr_response, '__dict__'):
                print(f" Object attributes: {list(ocr_response.__dict__.keys())}")
                for key, value in ocr_response.__dict__.items():
                    print(f" {key}: {type(value)} = {str(value)[:100]}...")

            for attr in self._DEBUG_ATTRS:
                if hasattr(ocr_response, attr):
                    value = getattr(ocr_response, attr)
                    print(f" Has '{attr}': {type(value)} = {str(value)[:100]}...")

            try:
                if hasattr(ocr_response, '__iter__') and not isinstance(ocr_response, str):
                    # Materialise once; count and preview come from the same pass.
                    items = list(ocr_response)
                    print(f" Iterable with {len(items)} items")
                    for i, item in enumerate(items[:3]):  # Show first 3 items
                        print(f" Item {i}: {type(item)} = {str(item)[:100]}...")
            except Exception as e:
                print(f" Error checking iteration: {e}")
        except Exception as e:
            print(f" Error inspecting OCR response: {e}")

    @staticmethod
    def _as_text(value) -> str:
        """Convert a value to text, mapping None to '' instead of 'None'."""
        return "" if value is None else str(value)

    @staticmethod
    def _image_record(img, page: int, image_id) -> Dict[str, Any]:
        """Build the image metadata dict returned to callers for one image."""
        return {
            'page': page,
            'image_id': image_id,
            'top_left_x': getattr(img, 'top_left_x', 0),
            'top_left_y': getattr(img, 'top_left_y', 0),
            'bottom_right_x': getattr(img, 'bottom_right_x', 0),
            'bottom_right_y': getattr(img, 'bottom_right_y', 0),
            'base64': getattr(img, 'image_base64', ''),
        }

    @staticmethod
    def _format_box(record: Dict[str, Any]) -> str:
        """Format an image record's bounding box for log output."""
        return (
            f"({record['top_left_x']}, {record['top_left_y']}) to "
            f"({record['bottom_right_x']}, {record['bottom_right_y']})"
        )

    def _extract_from_pages(self, ocr_response) -> Tuple[str, str, List[Dict[str, Any]]]:
        """Collect markdown text and images from ``ocr_response.pages``.

        Returns:
            (text, method, images). Text is the page markdown joined by blank
            lines; text is '' and method is 'none' when no page has markdown.
        """
        pages = getattr(ocr_response, 'pages', None)
        if not pages or not isinstance(pages, list):
            return "", "none", []

        page_texts: List[str] = []
        images: List[Dict[str, Any]] = []
        for i, page in enumerate(pages):
            markdown = getattr(page, 'markdown', None)
            if markdown:
                page_texts.append(markdown)
                print(f"✅ Found text in page {i} markdown: {len(markdown)} characters")

            for j, img in enumerate(getattr(page, 'images', None) or []):
                record = self._image_record(img, page=i, image_id=f"img-{i}-{j}")
                images.append(record)
                print(f"✅ Found image in page {i}, image {j}: coordinates {self._format_box(record)}")

        if page_texts:
            return "\n\n".join(page_texts), f"pages_markdown_{len(page_texts)}_pages", images
        return "", "none", images

    def _extract_top_level_images(self, ocr_response) -> List[Dict[str, Any]]:
        """Collect images attached directly to the response (not to a page)."""
        images: List[Dict[str, Any]] = []
        for j, img in enumerate(getattr(ocr_response, 'images', None) or []):
            record = self._image_record(img, page=0, image_id=getattr(img, 'id', f"img-{j}"))
            images.append(record)
            print(f"✅ Found image {j}: coordinates {self._format_box(record)}")
        return images

    def _extract_text_fallback(self, ocr_response) -> Tuple[str, str]:
        """Return (text, method) from the first fallback strategy yielding text.

        Strategies are tried in priority order. One that matches but produces
        empty text no longer blocks the later ones. Returns ('', 'none') when
        nothing is found.
        """
        for text, method in self._fallback_candidates(ocr_response):
            if text:
                return text, method
        return "", "none"

    def _fallback_candidates(self, ocr_response):
        """Yield (text, method) candidates for responses without page markdown.

        Missing or None values are yielded as '' (never the string 'None') so
        the caller can skip them.
        """
        # Strategy 2: direct text attribute.
        text = getattr(ocr_response, 'text', None)
        if text:
            yield str(text), "direct_text_attribute"

        # Strategy 3: content attribute.
        content = getattr(ocr_response, 'content', None)
        if content:
            if isinstance(content, str):
                yield content, "content_attribute_string"
            elif hasattr(content, 'text'):
                yield self._as_text(content.text), "content_text_attribute"
            else:
                yield str(content), "content_attribute_converted"

        # Strategy 4: result attribute. It must be truthy: a present-but-None
        # result must not become the literal text "None".
        result = getattr(ocr_response, 'result', None)
        if result:
            if isinstance(result, str):
                yield result, "result_string"
            elif hasattr(result, 'text'):
                yield self._as_text(result.text), "result_text_attribute"
            elif isinstance(result, dict) and 'text' in result:
                yield self._as_text(result['text']), "result_dict_text"
            else:
                yield str(result), "result_converted"

        # Strategy 5: chat-completion style choices list.
        choices = getattr(ocr_response, 'choices', None)
        if isinstance(choices, list) and choices:
            choice = choices[0]
            if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                yield self._as_text(choice.message.content), "choices_message_content"
            elif hasattr(choice, 'text'):
                yield self._as_text(choice.text), "choices_text"
            else:
                yield str(choice), "choices_converted"

        # Strategy 6: mapping-like access via .get().
        getter = getattr(ocr_response, 'get', None)
        if callable(getter):
            for key in self._DICT_TEXT_KEYS:
                value = getter(key)
                if value:
                    yield str(value), f"dict_key_{key}"
                    break

        # Strategy 7: scan instance attributes for string-like content.
        if hasattr(ocr_response, '__dict__'):
            for key, value in ocr_response.__dict__.items():
                if isinstance(value, str) and len(value) > 20:  # Likely text content
                    yield value, f"attribute_{key}"
                    break
                if hasattr(value, 'text') and isinstance(value.text, str):
                    yield value.text, f"nested_text_in_{key}"
                    break

        # Strategy 8: the whole response as a string, unless it looks like a
        # bare object reference such as "<Foo object at 0x...>".
        response_str = str(ocr_response)
        if len(response_str) > 50 and not response_str.startswith('<'):
            yield response_str, "full_response_string"