# Source: commit 2bf325f ("oneshots hallucinations") by Rahul-Samedavar
"""
Enhanced Text Extractor Module for FastAPI
Handles extracting text content from PDF files with improved performance,
table extraction, and proper CID font handling for scripts like Malayalam.
"""
import pdfplumber
import pymupdf # PyMuPDF (fitz)
import pandas as pd
from typing import Dict, List, Tuple, Optional, Union
import concurrent.futures
from pathlib import Path
import logging
import re
import unicodedata
class TextExtractor:
    """FastAPI-compatible text extractor with enhanced capabilities.

    Wraps two extraction backends (PyMuPDF and pdfplumber) with layered
    fallbacks so that CID-mapped fonts (e.g. Malayalam), image-only pages,
    and partially corrupted PDFs still yield usable text. Extraction
    results are memoized per file path in ``self.cache``.
    """

    def __init__(self, max_workers: int = 4, backend: str = "auto"):
        """
        Initialize the enhanced text extractor.

        Args:
            max_workers: Number of worker threads for parallel page processing.
            backend: Extraction backend ('pdfplumber', 'pymupdf', 'auto').
        """
        self.max_workers = max_workers
        self.backend = backend
        self.logger = logging.getLogger(__name__)
        # Maps str(pdf_path) -> extracted text. NOTE(review): unbounded cache;
        # consider bounding it if many distinct PDFs are processed long-term.
        self.cache = {}

    def extract_text_from_pdf(self, pdf_path: str, extract_tables: bool = True,
                              handle_cid: bool = True) -> str:
        """
        Extract text from a PDF file with tables in correct order.

        Args:
            pdf_path: Path to the PDF file (str or Path).
            extract_tables: Whether to extract and format tables inline.
            handle_cid: Whether to handle CID font mapping issues.

        Returns:
            str: Complete text content with tables in correct order.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            Exception: If text extraction fails.
        """
        # Normalize the cache key to str: previously the lookup used the raw
        # argument while the store happened after the Path() conversion, so
        # keys never matched and the cache was effectively dead.
        cache_key = str(pdf_path)
        cached_data = self.cache.get(cache_key)
        if cached_data:
            self.logger.info("Using cache: skipped PDF extraction")
            return cached_data
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        self.logger.info(f"📖 Extracting text from PDF: {pdf_path.name}")
        # Choose extraction method based on backend; 'auto' prefers PyMuPDF
        # when CID handling is requested because it exposes span-level detail.
        if self.backend == "pymupdf" or (self.backend == "auto" and handle_cid):
            text = self._extract_with_pymupdf(pdf_path, extract_tables, handle_cid)
        else:
            text = self._extract_with_pdfplumber(pdf_path, extract_tables)
        self.cache[cache_key] = text
        return text

    def _fallback_text_extraction(self, page: "pymupdf.Page") -> List[Dict]:
        """Fallback text extraction for problematic pages.

        Tries progressively simpler strategies and always returns at least
        one content block, so callers never receive an empty list.
        """
        try:
            # Method 1: plain text extraction.
            simple_text = page.get_text()
            if simple_text and simple_text.strip():
                self.logger.info("Using simple text extraction fallback")
                return [{
                    'type': 'text',
                    'content': simple_text,
                    'bbox': page.rect
                }]
        except Exception:
            pass
        try:
            # Method 2: alternate PyMuPDF extraction modes.
            text_options = ["text", "blocks", "words"]
            for option in text_options:
                try:
                    extracted = page.get_text(option)
                    if extracted:
                        if isinstance(extracted, list):
                            # 'blocks'/'words' return tuples whose 5th element
                            # is the text payload; join them into one string.
                            text_content = ""
                            for item in extracted:
                                if isinstance(item, tuple) and len(item) > 4:
                                    text_content += str(item[4]) + " "  # text part
                                elif isinstance(item, str):
                                    text_content += item + " "
                            if text_content.strip():
                                self.logger.info(f"Using {option} extraction fallback")
                                return [{
                                    'type': 'text',
                                    'content': text_content.strip(),
                                    'bbox': page.rect
                                }]
                        elif isinstance(extracted, str) and extracted.strip():
                            self.logger.info(f"Using {option} extraction fallback")
                            return [{
                                'type': 'text',
                                'content': extracted,
                                'bbox': page.rect
                            }]
                except Exception as e:
                    self.logger.debug(f"Failed {option} extraction: {e}")
                    continue
        except Exception:
            pass
        try:
            # Method 3: if the page holds only images it is probably a scan.
            image_list = page.get_images()
            if image_list:
                self.logger.warning("Page appears to contain images - might be scanned document")
                return [{
                    'type': 'text',
                    'content': f"[Page contains {len(image_list)} image(s) - possible scanned document. Consider OCR processing.]",
                    'bbox': page.rect
                }]
        except Exception:
            pass
        # Method 4: last resort - placeholder block with a warning.
        self.logger.warning("All extraction methods failed, returning empty content")
        return [{
            'type': 'text',
            'content': "[Unable to extract text from this page - may be image-based or corrupted]",
            'bbox': page.rect
        }]

    def _extract_with_pymupdf_safe(self, pdf_path: Path, extract_tables: bool,
                                   handle_cid: bool) -> str:
        """Build a per-page extraction closure with comprehensive error handling.

        NOTE(review): despite the return annotation inherited from the original
        API, this is a factory — it returns a callable that maps
        ``(page_index, page)`` to a dict of ordered content blocks.
        ``pdf_path`` is currently unused but kept for interface stability.
        """
        def extract_page_content_safe(page_data: "Tuple[int, pymupdf.Page]") -> Dict:
            page_num, page = page_data
            result = {
                'page_num': page_num + 1,
                'content_blocks': []
            }
            try:
                # Check if page is valid (may be None if access failed upstream).
                if not hasattr(page, 'rect'):
                    raise Exception("Invalid page object")
                # Try to get basic page info first.
                page_rect = page.rect
                if not page_rect or page_rect.is_empty:
                    raise Exception("Empty page")
                text_blocks = []
                table_blocks = []
                # Text extraction with fallbacks.
                if handle_cid:
                    try:
                        text_blocks = self._extract_text_blocks_with_proper_spacing(page)
                    except Exception as text_error:
                        self.logger.warning(f"Advanced text extraction failed on page {page_num + 1}: {text_error}")
                        text_blocks = self._fallback_text_extraction(page)
                else:
                    try:
                        text = page.get_text()
                        if text and text.strip():
                            text_blocks.append({
                                'type': 'text',
                                'content': text,
                                'bbox': page.rect
                            })
                        else:
                            text_blocks = self._fallback_text_extraction(page)
                    except Exception as simple_error:
                        self.logger.warning(f"Simple text extraction failed on page {page_num + 1}: {simple_error}")
                        text_blocks = self._fallback_text_extraction(page)
                # Table extraction with error handling.
                if extract_tables:
                    try:
                        tables = page.find_tables()
                        for i, table in enumerate(tables):
                            try:
                                table_data = table.extract()
                                # Keep only tables with at least one non-empty cell.
                                if table_data and any(any(cell for cell in row if cell) for row in table_data):
                                    formatted_table = self._format_table_as_text(table_data, page_num + 1, i + 1)
                                    table_blocks.append({
                                        'type': 'table',
                                        'content': formatted_table,
                                        'bbox': table.bbox
                                    })
                            except Exception as table_error:
                                self.logger.warning(f"Failed to extract table {i+1} on page {page_num + 1}: {table_error}")
                    except Exception as tables_error:
                        self.logger.warning(f"Table detection failed on page {page_num + 1}: {tables_error}")
                # Combine and sort blocks by vertical position so tables land
                # where they appear on the page.
                all_blocks = text_blocks + table_blocks
                if all_blocks:
                    try:
                        # Sort by y-coordinate (top to bottom); tolerate blocks
                        # with a missing/odd bbox by keeping the original order.
                        all_blocks.sort(key=lambda x: x.get('bbox', [0, 0, 0, 0])[1])
                    except Exception:
                        pass
                result['content_blocks'] = all_blocks if all_blocks else [{
                    'type': 'text',
                    'content': f"[No content extracted from page {page_num + 1}]",
                    'bbox': (0, 0, 0, 0)
                }]
            except Exception as e:
                self.logger.error(f"Critical error processing page {page_num + 1}: {e}")
                result['content_blocks'] = [{
                    'type': 'text',
                    'content': f"[Critical error on page {page_num + 1}: {str(e)}]",
                    'bbox': (0, 0, 0, 0)
                }]
            return result
        return extract_page_content_safe

    def _extract_with_pymupdf(self, pdf_path: Path, extract_tables: bool,
                              handle_cid: bool) -> str:
        """Extract using PyMuPDF with comprehensive error handling.

        Processes pages in parallel, falls back to sequential processing,
        and finally to pdfplumber if PyMuPDF fails entirely.
        """
        # Get the safe per-page extraction closure.
        extract_page_content_safe = self._extract_with_pymupdf_safe(pdf_path, extract_tables, handle_cid)
        try:
            doc = pymupdf.open(pdf_path)
            # Validate document before spawning workers.
            if not doc or doc.page_count == 0:
                raise Exception("Invalid or empty PDF document")
            self.logger.info(f"Processing {doc.page_count} pages")
            page_results = []
            try:
                # NOTE(review): sharing PyMuPDF page objects across threads is
                # presumably safe here because each worker touches only its own
                # page — confirm against PyMuPDF's thread-safety guidance.
                with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                    page_data = []
                    for i in range(doc.page_count):
                        try:
                            page = doc[i]
                            page_data.append((i, page))
                        except Exception as page_error:
                            self.logger.error(f"Failed to access page {i+1}: {page_error}")
                            # Dummy entry; the closure reports it as an error page.
                            page_data.append((i, None))
                    if page_data:
                        page_results = list(executor.map(extract_page_content_safe, page_data))
                    else:
                        raise Exception("No accessible pages found")
            except Exception as parallel_error:
                self.logger.warning(f"Parallel processing failed, trying sequential: {parallel_error}")
                # Fallback to sequential processing.
                for i in range(doc.page_count):
                    try:
                        page = doc[i]
                        result = extract_page_content_safe((i, page))
                        page_results.append(result)
                    except Exception as seq_error:
                        self.logger.error(f"Sequential processing failed for page {i+1}: {seq_error}")
                        page_results.append({
                            'page_num': i + 1,
                            'content_blocks': [{
                                'type': 'text',
                                'content': f"[Error: Could not process page {i+1}]",
                                'bbox': (0, 0, 0, 0)
                            }]
                        })
            # Compile final text preserving page and block order.
            full_text = ""
            successful_pages = 0
            for result in page_results:
                if not result or 'content_blocks' not in result:
                    continue
                page_content = f"\n--- Page {result['page_num']} ---\n"
                page_has_content = False
                for block in result['content_blocks']:
                    if not block or 'content' not in block:
                        continue
                    content = block['content']
                    if not content or not content.strip():
                        continue
                    if block.get('type') == 'text' and content.strip():
                        # Placeholder/error markers are appended but do not
                        # count the page as successfully extracted.
                        if not content.startswith('[Error') and not content.startswith('[No content') and not content.startswith('[Unable'):
                            page_has_content = True
                        page_content += content + "\n"
                    elif block.get('type') == 'table':
                        page_content += "\n" + content + "\n"
                        page_has_content = True
                if page_has_content:
                    successful_pages += 1
                full_text += page_content
            doc.close()
            # Final whitespace/linebreak cleanup.
            full_text = self._clean_final_text(full_text)
            self.logger.info(f"✅ Successfully processed {successful_pages}/{len(page_results)} pages, extracted {len(full_text)} characters")
            if successful_pages == 0:
                raise Exception("No content could be extracted from any page. This might be a scanned document requiring OCR.")
            return full_text
        except Exception as e:
            error_msg = f"Failed to extract text with PyMuPDF: {str(e)}"
            self.logger.error(error_msg)
            # Try pdfplumber as the final fallback backend.
            self.logger.info("Attempting pdfplumber fallback...")
            try:
                return self._extract_with_pdfplumber(pdf_path, extract_tables)
            except Exception as fallback_error:
                self.logger.error(f"Pdfplumber fallback also failed: {fallback_error}")
                raise Exception(f"{error_msg}. Fallback with pdfplumber also failed: {fallback_error}")

    def _extract_text_blocks_with_proper_spacing(self, page: "pymupdf.Page") -> List[Dict]:
        """Extract text blocks with proper word spacing and line handling.

        Uses PyMuPDF's "dict" mode to walk blocks/lines/spans, merging spans
        into lines, inserting spaces where horizontal gaps indicate word
        boundaries, and resolving CID artifacts span-by-span.
        """
        text_blocks = []
        try:
            # Detailed layout info: blocks -> lines -> spans with bboxes.
            text_dict = page.get_text("dict")
        except Exception as e:
            self.logger.warning(f"Failed to get text dict, falling back to simple extraction: {e}")
            try:
                simple_text = page.get_text()
                if simple_text.strip():
                    return [{
                        'type': 'text',
                        'content': simple_text,
                        'bbox': page.rect
                    }]
                else:
                    # No plain text either; run the full fallback ladder.
                    return self._fallback_text_extraction(page)
            except Exception as fallback_error:
                self.logger.error(f"All text extraction methods failed: {fallback_error}")
                return self._fallback_text_extraction(page)
        # Check if we got a valid text dictionary.
        if not text_dict or "blocks" not in text_dict:
            self.logger.warning("Invalid text dictionary, using fallback")
            return self._fallback_text_extraction(page)
        current_line_y = None
        current_line_text = ""
        for block in text_dict.get("blocks", []):
            if "lines" not in block:  # Skip non-text blocks (images, etc.)
                continue
            block_text = ""
            try:
                for line in block["lines"]:
                    line_bbox = line.get("bbox", [0, 0, 0, 0])
                    line_y = line_bbox[1] if len(line_bbox) > 1 else 0
                    # A y-shift larger than 3 units means a new visual line;
                    # flush the accumulated text before continuing.
                    if current_line_y is not None and abs(line_y - current_line_y) > 3:
                        if current_line_text.strip():
                            processed_line = self._process_line_with_proper_spacing(current_line_text)
                            block_text += processed_line + "\n"
                        current_line_text = ""
                    current_line_y = line_y
                    # Merge spans, adding spaces across significant gaps.
                    line_text = ""
                    prev_span_end = None
                    for span in line.get("spans", []):
                        span_text = span.get("text", "")
                        span_bbox = span.get("bbox", [0, 0, 0, 0])
                        if not span_text:  # Skip empty spans
                            continue
                        # Handle CID mapping issues (unmapped glyph codes).
                        if self._has_cid_issues(span_text):
                            span_text = self._resolve_cid_text_advanced(span_text, span)
                        # A horizontal gap > 5 units between spans is treated
                        # as a word boundary.
                        if prev_span_end is not None and len(span_bbox) > 2:
                            gap = span_bbox[0] - prev_span_end
                            if gap > 5:
                                line_text += " "
                        line_text += span_text
                        if len(span_bbox) > 2:
                            prev_span_end = span_bbox[2]  # Right edge of span
                    current_line_text += line_text + " "
            except Exception as block_error:
                self.logger.warning(f"Error processing text block: {block_error}")
                continue
            # Flush the last accumulated line of the block.
            if current_line_text.strip():
                processed_line = self._process_line_with_proper_spacing(current_line_text)
                block_text += processed_line + "\n"
                current_line_text = ""
            if block_text.strip():
                text_blocks.append({
                    'type': 'text',
                    'content': block_text,
                    'bbox': block.get("bbox", page.rect)
                })
        # If nothing was extracted via layout info, try the fallback ladder.
        if not text_blocks:
            return self._fallback_text_extraction(page)
        return text_blocks

    def _process_line_with_proper_spacing(self, line_text: str) -> str:
        """Normalize a line's whitespace while respecting Malayalam conjuncts.

        Collapses internal whitespace to single spaces, but omits the space
        between two words when they look like parts of one conjunct.
        """
        # Remove excessive spaces and newlines within the line.
        line_text = re.sub(r'\s+', ' ', line_text.strip())
        # Basic word-boundary handling for Malayalam and similar scripts;
        # more sophisticated rules may be needed for other complex scripts.
        processed = ""
        words = line_text.split()
        for i, word in enumerate(words):
            word = word.strip()
            if not word:
                continue
            processed += word
            # Add space between words, unless the pair forms a conjunct.
            if i < len(words) - 1:
                next_word = words[i + 1].strip()
                if next_word and not self._is_conjunct_continuation(word, next_word):
                    processed += " "
        return processed

    def _is_conjunct_continuation(self, current_word: str, next_word: str) -> bool:
        """Check if the next word continues a Malayalam conjunct.

        Returns True when both words contain Malayalam code points and the
        boundary carries a virama/chandrakkala ('്'), which joins consonants.
        """
        if not current_word or not next_word:
            return False
        # Malayalam Unicode block: U+0D00..U+0D7F.
        malayalam_range = range(0x0D00, 0x0D80)
        current_has_malayalam = any(ord(c) in malayalam_range for c in current_word)
        next_has_malayalam = any(ord(c) in malayalam_range for c in next_word)
        if current_has_malayalam and next_has_malayalam:
            # Virama at the joint (optionally followed by ZWJ) indicates a
            # conjunct split across tokens.
            if current_word.endswith(('്', '്‍')) or next_word.startswith(('്', '്‍')):
                return True
        return False

    def _has_cid_issues(self, text: str) -> bool:
        """Return True if text contains unmapped CID glyph artifacts."""
        return bool(re.search(r'\(cid:\d+\)|cid-\d+', text, re.IGNORECASE))

    def _resolve_cid_text_advanced(self, text: str, span: Dict) -> str:
        """Strip CID artifacts from text, keeping any readable remainder.

        Args:
            text: Span text possibly containing '(cid:N)' / 'cid-N' markers.
            span: The PyMuPDF span dict (currently unused; kept for future
                font-based recovery strategies).
        """
        if not self._has_cid_issues(text):
            return text
        processed = text
        # Remove both known CID marker patterns.
        processed = re.sub(r'\(cid:\d+\)', '', processed, flags=re.IGNORECASE)
        processed = re.sub(r'cid-\d+', '', processed, flags=re.IGNORECASE)
        # Collapse the whitespace the removals left behind.
        processed = re.sub(r'\s+', ' ', processed).strip()
        return processed

    def _format_table_as_text(self, table_data: List[List], page_num: int, table_num: int) -> str:
        """Format raw table rows as a banner-delimited text block.

        Returns "" for empty input; on formatting errors returns a bracketed
        placeholder instead of raising, so page assembly keeps going.
        """
        if not table_data:
            return ""
        try:
            # DataFrame gives us aligned-column rendering for free. The first
            # row is not assumed to be a header.
            df = pd.DataFrame(table_data)
            df = df.fillna('')
            df = df.astype(str)
            formatted = f"\n{'='*60}\n"
            formatted += f"TABLE {table_num} (Page {page_num})\n"
            formatted += f"{'='*60}\n"
            formatted += df.to_string(index=False, header=False, max_colwidth=30)
            formatted += f"\n{'='*60}\n"
            return formatted
        except Exception as e:
            self.logger.warning(f"Failed to format table: {e}")
            return f"\n[TABLE {table_num} - Page {page_num}: Formatting Error]\n"

    def _extract_with_pdfplumber(self, pdf_path: Path, extract_tables: bool) -> str:
        """Extract using pdfplumber with inline table handling.

        Tables are appended after each page's text (pdfplumber does not give
        us an easy interleaving by position here).
        """
        def extract_page_content(page_data: Tuple[int, object]) -> str:
            # Render one page to text; never raises — errors become markers.
            page_num, page = page_data
            page_content = f"\n--- Page {page_num + 1} ---\n"
            try:
                text = page.extract_text() or ""
                if extract_tables:
                    tables = page.extract_tables()
                    if tables:
                        # For simplicity, append tables at the end of page text.
                        page_content += text + "\n"
                        for i, table in enumerate(tables):
                            if table:
                                formatted_table = self._format_table_as_text(table, page_num + 1, i + 1)
                                page_content += formatted_table + "\n"
                    else:
                        page_content += text + "\n"
                else:
                    page_content += text + "\n"
            except Exception as e:
                self.logger.error(f"Error processing page {page_num + 1}: {e}")
                page_content += f"[Error extracting page {page_num + 1}]\n"
            return page_content
        try:
            with pdfplumber.open(pdf_path) as pdf:
                # Process pages in parallel; map() preserves page order.
                with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                    page_data = [(i, pdf.pages[i]) for i in range(len(pdf.pages))]
                    page_results = list(executor.map(extract_page_content, page_data))
                full_text = "".join(page_results)
                full_text = self._clean_final_text(full_text)
                self.logger.info(f"✅ Extracted {len(full_text)} characters")
                return full_text
        except Exception as e:
            raise Exception(f"Failed to extract text with pdfplumber: {str(e)}")

    def _clean_final_text(self, text: str) -> str:
        """Final whitespace cleanup of extracted text.

        Caps consecutive blank lines at one, collapses runs of spaces/tabs,
        and strips trailing whitespace from each line.
        """
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)  # Max 2 consecutive newlines
        text = re.sub(r'[ \t]+', ' ', text)            # Multiple spaces/tabs -> one space
        text = re.sub(r'[ \t]*\n[ \t]*', '\n', text)   # Clean spaces around newlines
        # Remove trailing whitespace from each line.
        lines = text.split('\n')
        cleaned_lines = [line.rstrip() for line in lines]
        text = '\n'.join(cleaned_lines)
        return text.strip()

    def validate_extracted_text(self, text: str, min_chars: int = 50) -> Tuple[bool, Dict]:
        """
        Validate extracted text with detailed feedback.

        Args:
            text: The extracted text to validate.
            min_chars: Minimum number of alphabetic characters required.

        Returns:
            Tuple of (is_valid, validation_details). The details dict reports
            character counts, residual CID artifacts, and the line-break
            ratio used to detect over-fragmented extraction.
        """
        details = {
            'total_chars': len(text) if text else 0,
            'alphabetic_chars': 0,
            'has_content': bool(text and text.strip()),
            'cid_issues': 0,
            'validation_passed': False,
            'line_breaks_ratio': 0
        }
        if not text or not text.strip():
            return False, details
        # Count alphabetic characters (Unicode-aware via str.isalpha).
        details['alphabetic_chars'] = sum(1 for char in text if char.isalpha())
        # Count residual CID issues, including '[?]' replacement markers.
        cid_matches = re.findall(r'\(cid:\d+\)|cid-\d+|\[\?\]', text, re.IGNORECASE)
        details['cid_issues'] = len(cid_matches)
        # Line-break ratio: excessive newlines indicate broken line merging.
        newline_count = text.count('\n')
        if details['total_chars'] > 0:
            details['line_breaks_ratio'] = newline_count / details['total_chars']
        # Valid = enough letters and fewer than 10% of chars are newlines.
        details['validation_passed'] = (
            details['alphabetic_chars'] >= min_chars and
            details['line_breaks_ratio'] < 0.1
        )
        return details['validation_passed'], details