File size: 14,531 Bytes
4ce49c4
 
 
 
68c0007
4ce49c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import base64
import os
from typing import Optional, Tuple, List, Dict, Any
from mistralai import Mistral
from src.utils.text_explainer import TextExplainer

class PDFTextExtractor:
    """Extract text and embedded images from PDFs using Mistral AI OCR.

    The extractor sends the PDF (base64-encoded, as a data URL) to the
    ``mistral-ocr-latest`` model and then pulls text and image metadata out
    of the response. Construction requires the ``MISTRAL_API_KEY``
    environment variable.
    """

    # Attribute names probed on the OCR response purely for debug output.
    _DEBUG_PROBE_ATTRS = (
        'text', 'content', 'result', 'data', 'output',
        'extracted_text', 'ocr_text', 'choices', 'message',
    )

    # Keys tried, in order, when the response supports dict-style ``.get()``.
    _DICT_TEXT_KEYS = ('text', 'content', 'result', 'extracted_text', 'ocr_text', 'output')

    # Text returned when nothing could be extracted. ``generate_explanations``
    # recognises this value by its "No text could be extracted" prefix.
    _NO_TEXT_PLACEHOLDER = "No text could be extracted from this PDF."

    def __init__(self):
        """Initialize the PDF text extractor with Mistral AI client.

        Raises:
            ValueError: If the MISTRAL_API_KEY environment variable is unset
                or empty.
        """
        self.api_key = os.environ.get("MISTRAL_API_KEY")
        if not self.api_key:
            raise ValueError("MISTRAL_API_KEY environment variable is required")
        self.client = Mistral(api_key=self.api_key)
        self.text_explainer = TextExplainer()

    def encode_pdf(self, pdf_path: str) -> Optional[str]:
        """
        Encode the PDF file to base64.

        Args:
            pdf_path: Path to the PDF file

        Returns:
            Base64 encoded string or None if error
        """
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except FileNotFoundError:
            print(f"Error: The file {pdf_path} was not found.")
            return None
        except Exception as e:
            # Best-effort by design: any other failure (permissions, bad path
            # type, ...) is reported and signalled to the caller as None.
            print(f"Error encoding PDF: {e}")
            return None

    def extract_text_from_pdf(self, pdf_file) -> Tuple[str, str, List[Dict[str, Any]]]:
        """
        Extract text and images from uploaded PDF using Mistral AI OCR.

        Args:
            pdf_file: Gradio file object (anything with a ``name`` attribute
                holding the path), or the path itself. ``None`` means no
                file was uploaded.

        Returns:
            Tuple of (extracted_text, status_message, images_data).
            ``images_data`` is a list of dicts with the keys ``page``,
            ``image_id``, ``top_left_x``, ``top_left_y``, ``bottom_right_x``,
            ``bottom_right_y`` and ``base64``. On any error the text is empty,
            the status describes the failure, and the image list is empty.
        """
        if pdf_file is None:
            return "", "Please upload a PDF file.", []

        try:
            # Get the file path from Gradio file object
            pdf_path = pdf_file.name if hasattr(pdf_file, 'name') else pdf_file

            base64_pdf = self.encode_pdf(pdf_path)
            if base64_pdf is None:
                return "", "Failed to encode PDF file.", []

            print("🔄 Processing PDF with Mistral OCR...")
            ocr_response = self.client.ocr.process(
                model="mistral-ocr-latest",
                document={
                    "type": "document_url",
                    "document_url": f"data:application/pdf;base64,{base64_pdf}"
                },
                include_image_base64=True
            )

            self._debug_dump_response(ocr_response)

            # Strategy 1: per-page markdown and images.
            extracted_text, extraction_method, extracted_images = (
                self._collect_from_pages(ocr_response)
            )

            # Images may also hang directly off the response object.
            if not extracted_images:
                extracted_images = self._collect_top_level_images(ocr_response)

            # Strategies 2-8: alternative response shapes.
            if not extracted_text:
                extracted_text, extraction_method = self._fallback_text(ocr_response)

            print(f"🎯 Extraction method used: {extraction_method}")
            print(f"📝 Extracted text length: {len(extracted_text)} characters")
            print(f"🖼️ Extracted images: {len(extracted_images)}")

            if extracted_text:
                status = f"✅ Successfully extracted text from PDF ({len(extracted_text)} characters)"
                if extracted_images:
                    status += f" and {len(extracted_images)} image(s)"
            else:
                extracted_text = self._NO_TEXT_PLACEHOLDER
                status = "⚠️ OCR completed but no text was found in response."
                if extracted_images:
                    status = f"✅ Successfully extracted {len(extracted_images)} image(s) from PDF, but no text was found."
                print("❌ No extractable text found in OCR response")

            return extracted_text, status, extracted_images

        except Exception as e:
            # Top-level boundary: callers always get the 3-tuple, never an
            # exception, so the failure is reported through the status string.
            error_msg = f"Error processing PDF: {str(e)}"
            print(error_msg)
            return "", f"❌ {error_msg}", []

    def _debug_dump_response(self, ocr_response) -> None:
        """Print a best-effort structural summary of the OCR response.

        This is diagnostics only, so failures while inspecting the response
        are reported and swallowed rather than aborting the extraction.
        """
        try:
            print("🔍 Analyzing OCR Response Structure...")
            print(f"  Type: {type(ocr_response)}")
            print(f"  String representation: {str(ocr_response)[:500]}...")

            if hasattr(ocr_response, '__dict__'):
                print(f"  Object attributes: {list(ocr_response.__dict__.keys())}")
                for key, value in ocr_response.__dict__.items():
                    print(f"    {key}: {type(value)} = {str(value)[:100]}...")

            for attr in self._DEBUG_PROBE_ATTRS:
                if hasattr(ocr_response, attr):
                    value = getattr(ocr_response, attr)
                    print(f"  Has '{attr}': {type(value)} = {str(value)[:100]}...")
        except Exception as e:
            print(f"  Error inspecting response: {e}")

        try:
            if hasattr(ocr_response, '__iter__') and not isinstance(ocr_response, str):
                # Materialise once: counting and then re-iterating would show
                # no items at all for a one-shot iterator.
                items = list(ocr_response)
                print(f"  Iterable with {len(items)} items")
                for i, item in enumerate(items[:3]):  # Show first 3 items
                    print(f"    Item {i}: {type(item)} = {str(item)[:100]}...")
        except Exception as e:
            print(f"  Error checking iteration: {e}")

    @staticmethod
    def _image_record(img, page: int, image_id: Any) -> Dict[str, Any]:
        """Build the image dict returned to callers; missing attributes get defaults."""
        return {
            'page': page,
            'image_id': image_id,
            'top_left_x': getattr(img, 'top_left_x', 0),
            'top_left_y': getattr(img, 'top_left_y', 0),
            'bottom_right_x': getattr(img, 'bottom_right_x', 0),
            'bottom_right_y': getattr(img, 'bottom_right_y', 0),
            'base64': getattr(img, 'image_base64', ''),
        }

    @staticmethod
    def _format_box(record: Dict[str, Any]) -> str:
        """Render an image record's bounding box for log output."""
        return (
            f"({record['top_left_x']}, {record['top_left_y']}) to "
            f"({record['bottom_right_x']}, {record['bottom_right_y']})"
        )

    def _collect_from_pages(self, ocr_response) -> Tuple[str, str, List[Dict[str, Any]]]:
        """Strategy 1: gather markdown text and images from ``response.pages``.

        Returns:
            (text, extraction_method, images). Text is the page markdown
            joined by blank lines; it is "" with method "none" when no page
            carries markdown or ``pages`` is not a non-empty list.
        """
        pages = getattr(ocr_response, 'pages', None)
        if not isinstance(pages, list) or not pages:
            return "", "none", []

        page_texts = []
        images: List[Dict[str, Any]] = []

        for i, page in enumerate(pages):
            markdown = getattr(page, 'markdown', None)
            if markdown:
                page_texts.append(markdown)
                print(f"✅ Found text in page {i} markdown: {len(markdown)} characters")

            for j, img in enumerate(getattr(page, 'images', None) or []):
                # Ids are synthesised from page/image position.
                record = self._image_record(img, page=i, image_id=f"img-{i}-{j}")
                images.append(record)
                print(f"✅ Found image in page {i}, image {j}: coordinates {self._format_box(record)}")

        if page_texts:
            return "\n\n".join(page_texts), f"pages_markdown_{len(page_texts)}_pages", images
        return "", "none", images

    def _collect_top_level_images(self, ocr_response) -> List[Dict[str, Any]]:
        """Gather images attached directly to the response (all reported as page 0)."""
        images: List[Dict[str, Any]] = []
        for j, img in enumerate(getattr(ocr_response, 'images', None) or []):
            record = self._image_record(img, page=0, image_id=getattr(img, 'id', f"img-{j}"))
            images.append(record)
            print(f"✅ Found image {j}: coordinates {self._format_box(record)}")
        return images

    def _fallback_text(self, ocr_response) -> Tuple[str, str]:
        """Strategies 2-8: try alternative response shapes for text.

        Only the first strategy whose precondition matches is attempted
        (if/elif chain); stringifying the whole response is the last resort
        whenever that attempt yields nothing.

        Returns:
            (text, extraction_method); ("", "none") when nothing is found.
        """
        text = ""
        method = "none"

        # Strategy 2: Direct text attribute
        if hasattr(ocr_response, 'text') and ocr_response.text:
            text = str(ocr_response.text)
            method = "direct_text_attribute"

        # Strategy 3: Content attribute
        elif hasattr(ocr_response, 'content') and ocr_response.content:
            content = ocr_response.content
            if isinstance(content, str):
                text = content
                method = "content_attribute_string"
            elif hasattr(content, 'text'):
                text = str(content.text)
                method = "content_text_attribute"
            else:
                text = str(content)
                method = "content_attribute_converted"

        # Strategy 4: Result attribute. Must be truthy: a ``result`` of None
        # would otherwise be stringified into the literal text "None".
        elif hasattr(ocr_response, 'result') and ocr_response.result:
            result = ocr_response.result
            if isinstance(result, str):
                text = result
                method = "result_string"
            elif hasattr(result, 'text'):
                text = str(result.text)
                method = "result_text_attribute"
            elif isinstance(result, dict) and 'text' in result:
                text = str(result['text'])
                method = "result_dict_text"
            else:
                text = str(result)
                method = "result_converted"

        # Strategy 5: Choices attribute (chat-completion style response)
        elif hasattr(ocr_response, 'choices') and ocr_response.choices:
            choices = ocr_response.choices
            if isinstance(choices, list) and choices:
                choice = choices[0]
                if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                    text = str(choice.message.content)
                    method = "choices_message_content"
                elif hasattr(choice, 'text'):
                    text = str(choice.text)
                    method = "choices_text"
                else:
                    text = str(choice)
                    method = "choices_converted"

        # Strategy 6: Dict-like access (a plain dict also has ``get``)
        elif hasattr(ocr_response, 'get'):
            for key in self._DICT_TEXT_KEYS:
                value = ocr_response.get(key)
                if value:
                    text = str(value)
                    method = f"dict_key_{key}"
                    break

        # Strategy 7: Inspect all attributes for string-like content
        elif hasattr(ocr_response, '__dict__'):
            for key, value in ocr_response.__dict__.items():
                if isinstance(value, str) and len(value) > 20:  # Likely text content
                    text = value
                    method = f"attribute_{key}"
                    break
                elif hasattr(value, 'text') and isinstance(value.text, str):
                    text = str(value.text)
                    method = f"nested_text_in_{key}"
                    break

        # Strategy 8: Convert entire response to string if it seems to contain text
        if not text:
            response_str = str(ocr_response)
            if len(response_str) > 50 and not response_str.startswith('<'):  # Not an object reference
                text = response_str
                method = "full_response_string"

        return text, method

    def generate_explanations(self, extracted_text: str) -> str:
        """
        Generate explanations for the extracted text sections.

        Args:
            extracted_text: The extracted text from PDF

        Returns:
            Formatted explanations for all sections, or a short message when
            there is nothing to explain or an error occurs.
        """
        try:
            if not extracted_text or not extracted_text.strip():
                return "No text available to explain."

            # Matches the placeholder text extract_text_from_pdf returns when
            # the OCR response contained no text.
            if extracted_text.startswith("No text could be extracted"):
                return "Cannot generate explanations - no text was extracted from the PDF."

            print("🤖 Generating explanations for extracted text...")
            explained_sections = self.text_explainer.explain_all_sections(extracted_text)

            if not explained_sections:
                return "No sections found to explain in the extracted text."

            return self.text_explainer.format_explanations_for_display(explained_sections)

        except Exception as e:
            error_msg = f"Error generating explanations: {str(e)}"
            print(error_msg)
            return f"❌ {error_msg}"