# Page-viewer residue (not part of the program), preserved as comments:
# Spaces: Sleeping
# File size: 14,531 Bytes
# Revisions: 4ce49c4 68c0007 4ce49c4
# (line-number gutter 1-288 omitted)
import base64
import os
from typing import Optional, Tuple, List, Dict, Any
from mistralai import Mistral
from src.utils.text_explainer import TextExplainer
class PDFTextExtractor:
    """PDF text extraction using Mistral AI OCR.

    A PDF is base64-encoded, sent to ``client.ocr.process`` with the
    ``mistral-ocr-latest`` model, and the per-page markdown and image
    entries of the response are collected. The extracted text can then be
    handed to the ``TextExplainer`` held by the instance.
    """

    # Placeholder returned in the text slot when OCR produced no text.
    # generate_explanations() recognises it by the prefix below.
    _NO_TEXT_MESSAGE = "No text could be extracted from this PDF."
    _NO_TEXT_PREFIX = "No text could be extracted"

    # Attribute names probed (for logging only) on an OCR response.
    _COMMON_ATTRS = (
        'text', 'content', 'result', 'data', 'output',
        'extracted_text', 'ocr_text', 'choices', 'message',
    )

    # Keys probed, in order, on dict-like responses.
    _DICT_TEXT_KEYS = (
        'text', 'content', 'result', 'extracted_text', 'ocr_text', 'output',
    )

    def __init__(self):
        """Initialize the PDF text extractor with Mistral AI client.

        Raises:
            ValueError: If the MISTRAL_API_KEY environment variable is
                missing or empty.
        """
        self.api_key = os.environ.get("MISTRAL_API_KEY")
        if not self.api_key:
            raise ValueError("MISTRAL_API_KEY environment variable is required")
        self.client = Mistral(api_key=self.api_key)
        self.text_explainer = TextExplainer()

    def encode_pdf(self, pdf_path: str) -> Optional[str]:
        """
        Encode the PDF file to base64.

        Args:
            pdf_path: Path to the PDF file

        Returns:
            Base64 encoded string or None if error
        """
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except FileNotFoundError:
            print(f"Error: The file {pdf_path} was not found.")
            return None
        except Exception as e:
            # Deliberately broad: callers rely on None (not an exception)
            # to signal any encoding failure.
            print(f"Error encoding PDF: {e}")
            return None

    def extract_text_from_pdf(self, pdf_file) -> Tuple[str, str, List[Dict[str, Any]]]:
        """
        Extract text and images from uploaded PDF using Mistral AI OCR.

        Args:
            pdf_file: Gradio file object (anything with a ``name``
                attribute holding the path) or the path itself

        Returns:
            Tuple of (extracted_text, status_message, images_data).
            On failure the text is "" and the image list is empty; when OCR
            succeeds but yields no text, the text slot holds a fixed
            "No text could be extracted..." placeholder.
        """
        if pdf_file is None:
            return "", "Please upload a PDF file.", []

        try:
            # Get the file path from Gradio file object
            pdf_path = pdf_file.name if hasattr(pdf_file, 'name') else pdf_file

            base64_pdf = self.encode_pdf(pdf_path)
            if base64_pdf is None:
                return "", "Failed to encode PDF file.", []

            print("📄 Processing PDF with Mistral OCR...")
            ocr_response = self.client.ocr.process(
                model="mistral-ocr-latest",
                document={
                    "type": "document_url",
                    "document_url": f"data:application/pdf;base64,{base64_pdf}",
                },
                include_image_base64=True,
            )

            self._log_response_structure(ocr_response)

            # Strategy 1: Mistral OCR specific - pages with markdown and images
            page_texts, extracted_images, pages_recognized = (
                self._collect_from_pages(ocr_response)
            )
            extracted_text = ""
            extraction_method = "none"
            if page_texts:
                extracted_text = "\n\n".join(page_texts)
                extraction_method = f"pages_markdown_{len(page_texts)}_pages"

            # Try a top-level images attribute if the pages carried none
            if not extracted_images:
                extracted_images = self._collect_top_level_images(ocr_response)

            if not extracted_text:
                # Once the page structure was recognised, an empty result
                # means "no text" (e.g. an image-only PDF). Guess-based
                # fallbacks would then return the model name or the raw
                # response repr as if it were document text.
                extracted_text, extraction_method = self._fallback_text(
                    ocr_response, allow_heuristics=not pages_recognized
                )

            print(f"🎯 Extraction method used: {extraction_method}")
            print(f"📝 Extracted text length: {len(extracted_text)} characters")
            print(f"🖼️ Extracted images: {len(extracted_images)}")

            if extracted_text:
                status = (
                    "✅ Successfully extracted text from PDF "
                    f"({len(extracted_text)} characters)"
                )
                if extracted_images:
                    status += f" and {len(extracted_images)} image(s)"
            else:
                extracted_text = self._NO_TEXT_MESSAGE
                status = "⚠️ OCR completed but no text was found in response."
                if extracted_images:
                    status = (
                        f"✅ Successfully extracted {len(extracted_images)} "
                        "image(s) from PDF, but no text was found."
                    )
                print("❌ No extractable text found in OCR response")

            return extracted_text, status, extracted_images

        except Exception as e:
            # Boundary handler: the caller expects a status tuple, never an
            # exception, whatever goes wrong in the OCR call or parsing.
            error_msg = f"Error processing PDF: {str(e)}"
            print(error_msg)
            return "", f"❌ {error_msg}", []

    def generate_explanations(self, extracted_text: str) -> str:
        """
        Generate explanations for the extracted text sections.

        Args:
            extracted_text: The extracted text from PDF

        Returns:
            Formatted explanations for all sections, or a short message
            when there is nothing to explain or an error occurred
        """
        try:
            if not extracted_text or extracted_text.strip() == "":
                return "No text available to explain."

            if extracted_text.startswith(self._NO_TEXT_PREFIX):
                return "Cannot generate explanations - no text was extracted from the PDF."

            print("🤖 Generating explanations for extracted text...")
            explained_sections = self.text_explainer.explain_all_sections(extracted_text)
            if not explained_sections:
                return "No sections found to explain in the extracted text."

            return self.text_explainer.format_explanations_for_display(explained_sections)

        except Exception as e:
            error_msg = f"Error generating explanations: {str(e)}"
            print(error_msg)
            return f"❌ {error_msg}"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _log_response_structure(self, ocr_response) -> None:
        """Print a best-effort description of the OCR response's shape.

        Purely diagnostic: any error raised while inspecting the response is
        reported and swallowed so it can never abort the extraction.
        """
        try:
            print("🔍 Analyzing OCR Response Structure...")
            print(f" Type: {type(ocr_response)}")
            print(f" String representation: {str(ocr_response)[:500]}...")

            if hasattr(ocr_response, '__dict__'):
                print(f" Object attributes: {list(ocr_response.__dict__.keys())}")
                for key, value in ocr_response.__dict__.items():
                    print(f" {key}: {type(value)} = {str(value)[:100]}...")

            for attr in self._COMMON_ATTRS:
                if hasattr(ocr_response, attr):
                    value = getattr(ocr_response, attr)
                    print(f" Has '{attr}': {type(value)} = {str(value)[:100]}...")

            if hasattr(ocr_response, '__iter__') and not isinstance(ocr_response, str):
                # Materialise once so the count and the preview share a
                # single pass over the iterable.
                items = list(ocr_response)
                print(f" Iterable with {len(items)} items")
                for i, item in enumerate(items[:3]):  # Show first 3 items
                    print(f" Item {i}: {type(item)} = {str(item)[:100]}...")
        except Exception as e:
            print(f" Error inspecting OCR response: {e}")

    @staticmethod
    def _image_record(img, page_index: int, image_id) -> Dict[str, Any]:
        """Build the image dict returned to callers from one response image."""
        return {
            'page': page_index,
            'image_id': image_id,
            'top_left_x': getattr(img, 'top_left_x', 0),
            'top_left_y': getattr(img, 'top_left_y', 0),
            'bottom_right_x': getattr(img, 'bottom_right_x', 0),
            'bottom_right_y': getattr(img, 'bottom_right_y', 0),
            'base64': getattr(img, 'image_base64', ''),
        }

    @staticmethod
    def _format_coordinates(record: Dict[str, Any]) -> str:
        """Render an image record's bounding box for log output."""
        return (
            f"coordinates ({record['top_left_x']}, {record['top_left_y']}) "
            f"to ({record['bottom_right_x']}, {record['bottom_right_y']})"
        )

    def _collect_from_pages(
        self, ocr_response
    ) -> Tuple[List[str], List[Dict[str, Any]], bool]:
        """Collect markdown and images from ``ocr_response.pages``.

        Returns:
            (page_texts, images, recognized) where ``recognized`` is True
            when the response exposed a non-empty list of pages.
        """
        pages = getattr(ocr_response, 'pages', None)
        if not (isinstance(pages, list) and pages):
            return [], [], False

        page_texts: List[str] = []
        images: List[Dict[str, Any]] = []
        for i, page in enumerate(pages):
            markdown = getattr(page, 'markdown', None)
            if markdown:
                page_texts.append(markdown)
                print(f"✅ Found text in page {i} markdown: {len(markdown)} characters")

            for j, img in enumerate(getattr(page, 'images', None) or []):
                record = self._image_record(img, i, f"img-{i}-{j}")
                images.append(record)
                print(
                    f"✅ Found image in page {i}, image {j}: "
                    f"{self._format_coordinates(record)}"
                )
        return page_texts, images, True

    def _collect_top_level_images(self, ocr_response) -> List[Dict[str, Any]]:
        """Collect images from a top-level ``images`` attribute, if any."""
        images: List[Dict[str, Any]] = []
        for j, img in enumerate(getattr(ocr_response, 'images', None) or []):
            record = self._image_record(img, 0, getattr(img, 'id', f"img-{j}"))
            images.append(record)
            print(f"✅ Found image {j}: {self._format_coordinates(record)}")
        return images

    @staticmethod
    def _as_text(value) -> str:
        """Convert a value to text, mapping None to '' instead of 'None'."""
        return "" if value is None else str(value)

    def _fallback_text(self, ocr_response, allow_heuristics: bool) -> Tuple[str, str]:
        """Return (text, method) from the first fallback that yields text.

        Returns ("", "none") when no fallback strategy produces any text.
        """
        for text, method in self._fallback_candidates(ocr_response, allow_heuristics):
            if text:
                return text, method
        return "", "none"

    def _fallback_candidates(self, ocr_response, allow_heuristics: bool):
        """Yield (text, method) candidates from the fallback strategies.

        Strategies 2-6 read specific, named attributes or keys. Strategies
        7-8 are guesses (any long string attribute, then the response's own
        string form) and are only produced when ``allow_heuristics`` is True.
        """
        # Strategy 2: Direct text attribute
        text = getattr(ocr_response, 'text', None)
        if text:
            yield str(text), "direct_text_attribute"

        # Strategy 3: Content attribute
        content = getattr(ocr_response, 'content', None)
        if content:
            if isinstance(content, str):
                yield content, "content_attribute_string"
            elif hasattr(content, 'text'):
                yield self._as_text(content.text), "content_text_attribute"
            else:
                yield str(content), "content_attribute_converted"

        # Strategy 4: Result attribute. Requires a truthy value so that
        # result=None is not turned into the literal text "None".
        result = getattr(ocr_response, 'result', None)
        if result:
            if isinstance(result, str):
                yield result, "result_string"
            elif hasattr(result, 'text'):
                yield self._as_text(result.text), "result_text_attribute"
            elif isinstance(result, dict) and 'text' in result:
                yield self._as_text(result['text']), "result_dict_text"
            else:
                yield str(result), "result_converted"

        # Strategy 5: Choices attribute (chat-completion style response)
        choices = getattr(ocr_response, 'choices', None)
        if isinstance(choices, list) and choices:
            choice = choices[0]
            if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                yield self._as_text(choice.message.content), "choices_message_content"
            elif hasattr(choice, 'text'):
                yield self._as_text(choice.text), "choices_text"
            else:
                yield str(choice), "choices_converted"

        # Strategy 6: Dict-like access
        getter = getattr(ocr_response, 'get', None)
        if callable(getter):
            for key in self._DICT_TEXT_KEYS:
                value = getter(key)
                if value:
                    yield str(value), f"dict_key_{key}"
                    break

        if not allow_heuristics:
            return

        # Strategy 7: Inspect all attributes for string-like content
        for key, value in getattr(ocr_response, '__dict__', {}).items():
            if isinstance(value, str) and len(value) > 20:  # Likely text content
                yield value, f"attribute_{key}"
                break
            if hasattr(value, 'text') and isinstance(value.text, str):
                yield value.text, f"nested_text_in_{key}"
                break

        # Strategy 8: Whole response as a string if it looks like text
        response_str = str(ocr_response)
        if len(response_str) > 50 and not response_str.startswith('<'):  # Not an object reference
            yield response_str, "full_response_string"
|