""" Enhanced Text Extractor Module for FastAPI Handles extracting text content from PDF files with improved performance, table extraction, and proper CID font handling for scripts like Malayalam. """ import pdfplumber import pymupdf # PyMuPDF (fitz) import pandas as pd from typing import Dict, List, Tuple, Optional, Union import concurrent.futures from pathlib import Path import logging import re import unicodedata class TextExtractor: """FastAPI-compatible text extractor with enhanced capabilities.""" def __init__(self, max_workers: int = 4, backend: str = "auto"): """ Initialize the enhanced text extractor. Args: max_workers: Number of worker threads for parallel processing backend: Extraction backend ('pdfplumber', 'pymupdf', 'auto') """ self.max_workers = max_workers self.backend = backend self.logger = logging.getLogger(__name__) self.cache = {} def extract_text_from_pdf(self, pdf_path: str, extract_tables: bool = True, handle_cid: bool = True) -> str: """ Extract text from PDF file with tables in correct order. Args: pdf_path: Path to the PDF file extract_tables: Whether to extract and format tables inline handle_cid: Whether to handle CID font mapping issues Returns: str: Complete text content with tables in correct order Raises: Exception: If text extraction fails """ cached_data = self.cache.get(pdf_path) if cached_data: print("Using Cache: Skipped pdf extraction") return cached_data pdf_path = Path(pdf_path) if not pdf_path.exists(): raise FileNotFoundError(f"PDF file not found: {pdf_path}") self.logger.info(f"๐Ÿ“– Extracting text from PDF: {pdf_path.name}") # Choose extraction method based on backend if self.backend == "pymupdf" or (self.backend == "auto" and handle_cid): text = self._extract_with_pymupdf(pdf_path, extract_tables, handle_cid) else: text = self._extract_with_pdfplumber(pdf_path, extract_tables) self.cache[pdf_path] = text return text def _fallback_text_extraction(self, page: pymupdf.Page) -> List[Dict]: """Fallback text extraction for problematic pages.""" try: # Method 1: Try simple text extraction simple_text = page.get_text() if simple_text and simple_text.strip(): self.logger.info("Using simple text extraction fallback") return [{ 'type': 'text', 'content': simple_text, 'bbox': page.rect }] except: pass try: # Method 2: Try text extraction with different options text_options = ["text", "blocks", "words"] for option in text_options: try: extracted = page.get_text(option) if extracted: if isinstance(extracted, list): # For blocks/words, join them text_content = "" for item in extracted: if isinstance(item, tuple) and len(item) > 4: text_content += str(item[4]) + " " # text part elif isinstance(item, str): text_content += item + " " if text_content.strip(): self.logger.info(f"Using {option} extraction fallback") return [{ 'type': 'text', 'content': text_content.strip(), 'bbox': page.rect }] elif isinstance(extracted, str) and extracted.strip(): self.logger.info(f"Using {option} extraction fallback") return [{ 'type': 'text', 'content': extracted, 'bbox': page.rect }] except Exception as e: self.logger.debug(f"Failed {option} extraction: {e}") continue except: pass try: # Method 3: Check if page has images (might be scanned document) image_list = page.get_images() if image_list: self.logger.warning("Page appears to contain images - might be scanned document") return [{ 'type': 'text', 'content': f"[Page contains {len(image_list)} image(s) - possible scanned document. Consider OCR processing.]", 'bbox': page.rect }] except: pass # Method 4: Last resort - return empty with warning self.logger.warning("All extraction methods failed, returning empty content") return [{ 'type': 'text', 'content': "[Unable to extract text from this page - may be image-based or corrupted]", 'bbox': page.rect }] def _extract_with_pymupdf_safe(self, pdf_path: Path, extract_tables: bool, handle_cid: bool) -> str: """Safe PyMuPDF extraction with comprehensive error handling.""" def extract_page_content_safe(page_data: Tuple[int, pymupdf.Page]) -> Dict: page_num, page = page_data result = { 'page_num': page_num + 1, 'content_blocks': [] } try: # Check if page is valid if not hasattr(page, 'rect'): raise Exception("Invalid page object") # Try to get basic page info first page_rect = page.rect if not page_rect or page_rect.is_empty: raise Exception("Empty page") # Extract text with error handling text_blocks = [] table_blocks = [] # Text extraction with fallbacks if handle_cid: try: text_blocks = self._extract_text_blocks_with_proper_spacing(page) except Exception as text_error: self.logger.warning(f"Advanced text extraction failed on page {page_num + 1}: {text_error}") text_blocks = self._fallback_text_extraction(page) else: try: text = page.get_text() if text and text.strip(): text_blocks.append({ 'type': 'text', 'content': text, 'bbox': page.rect }) else: text_blocks = self._fallback_text_extraction(page) except Exception as simple_error: self.logger.warning(f"Simple text extraction failed on page {page_num + 1}: {simple_error}") text_blocks = self._fallback_text_extraction(page) # Table extraction with error handling if extract_tables: try: tables = page.find_tables() for i, table in enumerate(tables): try: table_data = table.extract() if table_data and any(any(cell for cell in row if cell) for row in table_data): formatted_table = self._format_table_as_text(table_data, page_num + 1, i + 1) table_blocks.append({ 'type': 'table', 'content': formatted_table, 'bbox': table.bbox }) except Exception as table_error: self.logger.warning(f"Failed to extract table {i+1} on page {page_num + 1}: {table_error}") except Exception as tables_error: self.logger.warning(f"Table detection failed on page {page_num + 1}: {tables_error}") # Combine and sort blocks by position all_blocks = text_blocks + table_blocks if all_blocks: # Sort by y-coordinate (top to bottom), handle missing bbox gracefully try: all_blocks.sort(key=lambda x: x.get('bbox', [0, 0, 0, 0])[1]) except: # If sorting fails, keep original order pass result['content_blocks'] = all_blocks if all_blocks else [{ 'type': 'text', 'content': f"[No content extracted from page {page_num + 1}]", 'bbox': (0, 0, 0, 0) }] except Exception as e: self.logger.error(f"Critical error processing page {page_num + 1}: {e}") result['content_blocks'] = [{ 'type': 'text', 'content': f"[Critical error on page {page_num + 1}: {str(e)}]", 'bbox': (0, 0, 0, 0) }] return result return extract_page_content_safe def _extract_with_pymupdf(self, pdf_path: Path, extract_tables: bool, handle_cid: bool) -> str: """Extract using PyMuPDF with comprehensive error handling.""" # Get the safe extraction function extract_page_content_safe = self._extract_with_pymupdf_safe(pdf_path, extract_tables, handle_cid) try: doc = pymupdf.open(pdf_path) # Validate document if not doc or doc.page_count == 0: raise Exception("Invalid or empty PDF document") self.logger.info(f"Processing {doc.page_count} pages") # Process pages in parallel with error handling page_results = [] try: with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: # Prepare page data with error handling page_data = [] for i in range(doc.page_count): try: page = doc[i] page_data.append((i, page)) except Exception as page_error: self.logger.error(f"Failed to access page {i+1}: {page_error}") # Create dummy page data for error handling page_data.append((i, None)) # Process pages if page_data: page_results = list(executor.map(extract_page_content_safe, page_data)) else: raise Exception("No accessible pages found") except Exception as parallel_error: self.logger.warning(f"Parallel processing failed, trying sequential: {parallel_error}") # Fallback to sequential processing for i in range(doc.page_count): try: page = doc[i] result = extract_page_content_safe((i, page)) page_results.append(result) except Exception as seq_error: self.logger.error(f"Sequential processing failed for page {i+1}: {seq_error}") page_results.append({ 'page_num': i + 1, 'content_blocks': [{ 'type': 'text', 'content': f"[Error: Could not process page {i+1}]", 'bbox': (0, 0, 0, 0) }] }) # Compile final text with proper order full_text = "" successful_pages = 0 for result in page_results: if not result or 'content_blocks' not in result: continue page_content = f"\n--- Page {result['page_num']} ---\n" page_has_content = False # Add content blocks in order for block in result['content_blocks']: if not block or 'content' not in block: continue content = block['content'] if not content or not content.strip(): continue if block.get('type') == 'text' and content.strip(): if not content.startswith('[Error') and not content.startswith('[No content') and not content.startswith('[Unable'): page_has_content = True page_content += content + "\n" elif block.get('type') == 'table': page_content += "\n" + content + "\n" page_has_content = True if page_has_content: successful_pages += 1 full_text += page_content doc.close() # Final cleanup full_text = self._clean_final_text(full_text) self.logger.info(f"โœ… Successfully processed {successful_pages}/{len(page_results)} pages, extracted {len(full_text)} characters") if successful_pages == 0: raise Exception("No content could be extracted from any page. This might be a scanned document requiring OCR.") return full_text except Exception as e: error_msg = f"Failed to extract text with PyMuPDF: {str(e)}" self.logger.error(error_msg) # Try pdfplumber as final fallback self.logger.info("Attempting pdfplumber fallback...") try: return self._extract_with_pdfplumber(pdf_path, extract_tables) except Exception as fallback_error: self.logger.error(f"Pdfplumber fallback also failed: {fallback_error}") raise Exception(f"{error_msg}. Fallback with pdfplumber also failed: {fallback_error}") def _extract_text_blocks_with_proper_spacing(self, page: pymupdf.Page) -> List[Dict]: """Extract text blocks with proper word spacing and line handling.""" text_blocks = [] try: # First, try to get text as dictionary with detailed formatting info text_dict = page.get_text("dict") except Exception as e: self.logger.warning(f"Failed to get text dict, falling back to simple extraction: {e}") # Fallback to simple text extraction try: simple_text = page.get_text() if simple_text.strip(): return [{ 'type': 'text', 'content': simple_text, 'bbox': page.rect }] else: # If no text, try OCR-like extraction return self._fallback_text_extraction(page) except Exception as fallback_error: self.logger.error(f"All text extraction methods failed: {fallback_error}") return self._fallback_text_extraction(page) # Check if we got valid text dictionary if not text_dict or "blocks" not in text_dict: self.logger.warning("Invalid text dictionary, using fallback") return self._fallback_text_extraction(page) current_line_y = None current_line_text = "" for block in text_dict.get("blocks", []): if "lines" not in block: # Skip non-text blocks (images, etc.) continue block_text = "" try: for line in block["lines"]: line_bbox = line.get("bbox", [0, 0, 0, 0]) line_y = line_bbox[1] if len(line_bbox) > 1 else 0 # Check if this is a new line (significant y-coordinate change) if current_line_y is not None and abs(line_y - current_line_y) > 3: # Process the completed line if current_line_text.strip(): processed_line = self._process_line_with_proper_spacing(current_line_text) block_text += processed_line + "\n" current_line_text = "" current_line_y = line_y # Extract text from spans with proper spacing line_text = "" prev_span_end = None for span in line.get("spans", []): span_text = span.get("text", "") span_bbox = span.get("bbox", [0, 0, 0, 0]) if not span_text: # Skip empty spans continue # Handle CID mapping issues if self._has_cid_issues(span_text): span_text = self._resolve_cid_text_advanced(span_text, span) # Add spacing between spans if there's a gap if prev_span_end is not None and len(span_bbox) > 2: gap = span_bbox[0] - prev_span_end if gap > 5: # Significant gap, add space line_text += " " line_text += span_text if len(span_bbox) > 2: prev_span_end = span_bbox[2] # Right edge of span current_line_text += line_text + " " except Exception as block_error: self.logger.warning(f"Error processing text block: {block_error}") continue # Process the last line of the block if current_line_text.strip(): processed_line = self._process_line_with_proper_spacing(current_line_text) block_text += processed_line + "\n" current_line_text = "" if block_text.strip(): text_blocks.append({ 'type': 'text', 'content': block_text, 'bbox': block.get("bbox", page.rect) }) # If no text blocks were extracted, try fallback if not text_blocks: return self._fallback_text_extraction(page) return text_blocks def _process_line_with_proper_spacing(self, line_text: str) -> str: """Process a line to ensure proper word spacing for Malayalam/complex scripts.""" # Remove excessive spaces and newlines within the line line_text = re.sub(r'\s+', ' ', line_text.strip()) # For Malayalam and similar scripts, ensure proper word boundaries # This is a basic implementation - you might need more sophisticated rules processed = "" words = line_text.split() for i, word in enumerate(words): # Clean individual words word = word.strip() if not word: continue # Add the word processed += word # Add space between words, but be careful with Malayalam conjuncts if i < len(words) - 1: next_word = words[i + 1].strip() if next_word and not self._is_conjunct_continuation(word, next_word): processed += " " return processed def _is_conjunct_continuation(self, current_word: str, next_word: str) -> bool: """Check if the next word is a continuation of a Malayalam conjunct.""" # Basic check for Malayalam conjuncts and joiners if not current_word or not next_word: return False # Check for Malayalam zero-width joiner or similar cases malayalam_range = range(0x0D00, 0x0D80) # If both words contain Malayalam characters and current word ends with certain characters current_has_malayalam = any(ord(c) in malayalam_range for c in current_word) next_has_malayalam = any(ord(c) in malayalam_range for c in next_word) if current_has_malayalam and next_has_malayalam: # Check for specific Malayalam joining patterns if current_word.endswith(('เต', 'เตโ€')) or next_word.startswith(('เต', 'เตโ€')): return True return False def _has_cid_issues(self, text: str) -> bool: """Check if text has CID mapping issues.""" return bool(re.search(r'\(cid:\d+\)|cid-\d+', text, re.IGNORECASE)) def _resolve_cid_text_advanced(self, text: str, span: Dict) -> str: """Advanced CID resolution with better character recovery.""" if not self._has_cid_issues(text): return text # Try to extract readable parts processed = text # Handle different CID patterns processed = re.sub(r'\(cid:\d+\)', '', processed, flags=re.IGNORECASE) processed = re.sub(r'cid-\d+', '', processed, flags=re.IGNORECASE) # Clean up multiple spaces processed = re.sub(r'\s+', ' ', processed).strip() return processed def _format_table_as_text(self, table_data: List[List], page_num: int, table_num: int) -> str: """Format table data as readable text.""" if not table_data: return "" # Convert to DataFrame for better formatting try: # Handle cases where first row might not be headers df = pd.DataFrame(table_data) # Clean the data df = df.fillna('') df = df.astype(str) # Format as text formatted = f"\n{'='*60}\n" formatted += f"TABLE {table_num} (Page {page_num})\n" formatted += f"{'='*60}\n" # Use pandas to_string for clean formatting formatted += df.to_string(index=False, header=False, max_colwidth=30) formatted += f"\n{'='*60}\n" return formatted except Exception as e: self.logger.warning(f"Failed to format table: {e}") return f"\n[TABLE {table_num} - Page {page_num}: Formatting Error]\n" def _extract_with_pdfplumber(self, pdf_path: Path, extract_tables: bool) -> str: """Extract using pdfplumber with inline table handling.""" def extract_page_content(page_data: Tuple[int, object]) -> str: page_num, page = page_data page_content = f"\n--- Page {page_num + 1} ---\n" try: # Extract main text text = page.extract_text() or "" if extract_tables: # Get table locations to insert them in correct positions tables = page.extract_tables() if tables: # For simplicity, append tables at the end of page text page_content += text + "\n" for i, table in enumerate(tables): if table: formatted_table = self._format_table_as_text(table, page_num + 1, i + 1) page_content += formatted_table + "\n" else: page_content += text + "\n" else: page_content += text + "\n" except Exception as e: self.logger.error(f"Error processing page {page_num + 1}: {e}") page_content += f"[Error extracting page {page_num + 1}]\n" return page_content try: with pdfplumber.open(pdf_path) as pdf: # Process pages in parallel with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: page_data = [(i, pdf.pages[i]) for i in range(len(pdf.pages))] page_results = list(executor.map(extract_page_content, page_data)) # Combine all pages full_text = "".join(page_results) full_text = self._clean_final_text(full_text) self.logger.info(f"โœ… Extracted {len(full_text)} characters") return full_text except Exception as e: raise Exception(f"Failed to extract text with pdfplumber: {str(e)}") def _clean_final_text(self, text: str) -> str: """Final cleanup of extracted text.""" # Remove excessive line breaks but preserve paragraph structure text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text) # Max 2 consecutive newlines text = re.sub(r'[ \t]+', ' ', text) # Multiple spaces/tabs to single space text = re.sub(r'[ \t]*\n[ \t]*', '\n', text) # Clean spaces around newlines # Remove trailing whitespace from each line lines = text.split('\n') cleaned_lines = [line.rstrip() for line in lines] text = '\n'.join(cleaned_lines) return text.strip() def validate_extracted_text(self, text: str, min_chars: int = 50) -> Tuple[bool, Dict]: """ Validate extracted text with detailed feedback. Args: text: The extracted text to validate min_chars: Minimum number of alphabetic characters required Returns: Tuple of (is_valid, validation_details) """ details = { 'total_chars': len(text) if text else 0, 'alphabetic_chars': 0, 'has_content': bool(text and text.strip()), 'cid_issues': 0, 'validation_passed': False, 'line_breaks_ratio': 0 } if not text or not text.strip(): return False, details # Count different character types details['alphabetic_chars'] = sum(1 for char in text if char.isalpha()) # Count CID issues cid_matches = re.findall(r'\(cid:\d+\)|cid-\d+|\[\?\]', text, re.IGNORECASE) details['cid_issues'] = len(cid_matches) # Check line break ratio (to detect excessive line breaking) newline_count = text.count('\n') if details['total_chars'] > 0: details['line_breaks_ratio'] = newline_count / details['total_chars'] # Validation logic details['validation_passed'] = ( details['alphabetic_chars'] >= min_chars and details['line_breaks_ratio'] < 0.1 # Less than 10% line breaks ) return details['validation_passed'], details