# Rs-backend-ml / src / Parse_resume.py
# (Hugging Face upload metadata: Harshilforworks, "Upload 35 files", commit 6518a94 verified)
"""
Module: parse_documents.py
Functionality: Hybrid PDF parsing using both PyMuPDF and pdfplumber for optimal table and text extraction.
Combines the strengths of both libraries while maintaining document flow order.
"""
import os
import re
import time
import numpy as np
from typing import List, Dict, Optional, Tuple, Any
import fitz  # PyMuPDF for general text and positioning

# Try to import pdfplumber for enhanced table detection.
# The module works without it (PyMuPDF-only fallback), so failure is non-fatal.
try:
    import pdfplumber
    PDFPLUMBER_AVAILABLE = True
except ImportError:
    # Keep the name bound so later references don't raise NameError.
    pdfplumber = None
    PDFPLUMBER_AVAILABLE = False
    print("⚠️ pdfplumber not available. Install with: pip install pdfplumber for enhanced table detection")

# Try to import pandas for legacy compatibility (flag only; pandas is not used below).
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
def _is_valid_table(cleaned_data: List[List[str]]) -> bool:
"""
Validate if detected table is actually a table structure or just formatted text.
Args:
cleaned_data: List of rows with cells
Returns:
True if it's a valid table, False otherwise
"""
if len(cleaned_data) < 3: # Need at least 3 rows (header + 2 data rows)
return False
# Check if rows have consistent column structure
row_lengths = [len(row) for row in cleaned_data]
if len(set(row_lengths)) > 2: # Too much variation in column count
return False
# Check if it has proper tabular data (not just a paragraph split into lines)
max_cols = max(row_lengths)
if max_cols < 2: # Single column is likely just text
return False
# Check for table-like characteristics
# 1. Multiple rows should have multiple non-empty cells
multi_cell_rows = 0
for row in cleaned_data:
non_empty_cells = sum(1 for cell in row if cell and cell.strip())
if non_empty_cells >= 2:
multi_cell_rows += 1
# If less than 50% of rows have multiple cells, it's probably not a table
if multi_cell_rows / len(cleaned_data) < 0.5:
return False
# 2. Check for list-like patterns (single long text in first column, others empty)
list_pattern_count = 0
for row in cleaned_data:
if len(row) >= 2:
first_cell = row[0].strip() if row[0] else ""
other_cells = [cell.strip() for cell in row[1:] if cell]
# If first cell is very long and others are mostly empty, it's likely a list
if len(first_cell) > 50 and len(other_cells) <= 1:
list_pattern_count += 1
# If more than 60% rows follow list pattern, reject as table
if list_pattern_count / len(cleaned_data) > 0.6:
return False
# 3. Check for paragraph-like content (very long text cells)
long_text_cells = 0
total_cells = 0
for row in cleaned_data:
for cell in row:
if cell and cell.strip():
total_cells += 1
if len(cell.strip()) > 100: # Very long text
long_text_cells += 1
# If too many cells have very long text, it's probably paragraphs
if total_cells > 0 and long_text_cells / total_cells > 0.7:
return False
return True
def extract_tables_with_pdfplumber(pdf_path: str) -> Dict[int, List[Dict]]:
    """
    Extract tables using pdfplumber for superior table detection.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        Dictionary mapping 0-based page numbers to lists of table dicts,
        each carrying a markdown 'content' rendering, 'bbox'/'y_position'
        for layout ordering, row/col counts, and the 'raw_data' cell grid.
        Empty when pdfplumber is missing or extraction fails.
    """
    page_tables = {}
    # If pdfplumber isn't available, return empty tables (PyMuPDF-only fallback will be used)
    if not PDFPLUMBER_AVAILABLE:
        print("ℹ️ pdfplumber not available: skipping pdfplumber table extraction (fallback to PyMuPDF)")
        return page_tables
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                tables = []
                # Line-based detection with stricter edge/word thresholds to
                # reduce false positives on loosely aligned text.
                detected_tables = page.find_tables(table_settings={
                    "vertical_strategy": "lines",
                    "horizontal_strategy": "lines",
                    "snap_tolerance": 3,
                    "join_tolerance": 3,
                    "edge_min_length": 10,  # Increased from 3 to 10 for better line detection
                    "min_words_vertical": 4,  # Increased from 2 to 4
                    "min_words_horizontal": 2  # Increased from 1 to 2
                })
                for table in detected_tables:
                    try:
                        # Extract table data
                        table_data = table.extract()
                        if table_data and len(table_data) >= 2:  # At least header + 1 row
                            # Clean and process table data
                            cleaned_data = []
                            for row in table_data:
                                if row and any(cell and str(cell).strip() for cell in row if cell is not None):
                                    cleaned_row = []
                                    for cell in row:
                                        cell_str = str(cell).strip() if cell is not None else ""
                                        # Collapse whitespace, then strip characters outside
                                        # the basic-Latin / Latin-extended ranges (PDF artifacts).
                                        cell_str = re.sub(r'\s+', ' ', cell_str)
                                        cell_str = re.sub(r'[^\x20-\x7E\u00A0-\u024F\u1E00-\u1EFF]', '', cell_str)
                                        cleaned_row.append(cell_str)
                                    if any(cell for cell in cleaned_row):  # Only add non-empty rows
                                        cleaned_data.append(cleaned_row)
                            if len(cleaned_data) >= 2:  # Ensure we have meaningful data
                                # Reject detections that are really just formatted text
                                if not _is_valid_table(cleaned_data):
                                    continue
                                # Normalize column count: pad short rows, truncate long ones
                                max_cols = max(len(row) for row in cleaned_data)
                                normalized_data = []
                                for row in cleaned_data:
                                    while len(row) < max_cols:
                                        row.append("")
                                    normalized_data.append(row[:max_cols])
                                # Get table bbox for positioning (y used for document-order merge)
                                bbox = table.bbox if hasattr(table, 'bbox') else [0, 0, 0, 0]
                                # Create markdown table
                                table_markdown = _create_clean_markdown_table(normalized_data)
                                tables.append({
                                    'content': table_markdown,
                                    'type': 'table',
                                    'bbox': bbox,
                                    'y_position': bbox[1] if bbox else 0,
                                    'rows': len(normalized_data),
                                    'cols': max_cols,
                                    'raw_data': normalized_data
                                })
                    except Exception as e:
                        # Best-effort: skip one malformed table, keep processing the page
                        print(f"⚠️ Table processing error on page {page_num + 1}: {e}")
                        continue
                if tables:
                    page_tables[page_num] = tables
    except Exception as e:
        print(f"⚠️ pdfplumber table extraction error: {e}")
    return page_tables
def _create_clean_markdown_table(data: List[List[str]]) -> str:
"""Create a clean, well-formatted markdown table without truncating content."""
if not data or len(data) < 2:
return ""
# Use first row as headers, rest as data
headers = data[0]
rows = data[1:]
# Clean headers and ensure they have meaningful names
clean_headers = []
for i, header in enumerate(headers):
header_str = str(header).strip() if header else ""
if not header_str:
header_str = f"Col_{i+1}"
clean_headers.append(header_str)
# Create formatted table WITHOUT width restrictions
table_lines = []
# Header row
header_cells = [str(header).strip() for header in clean_headers]
table_lines.append("| " + " | ".join(header_cells) + " |")
# Separator row
separator_cells = ["-" * max(3, len(str(header))) for header in clean_headers]
table_lines.append("| " + " | ".join(separator_cells) + " |")
# Data rows - NO TRUNCATION, preserve all content
for row in rows:
data_cells = []
for cell in row:
cell_str = str(cell).strip() if cell else ""
# Clean whitespace only, preserve all content
cell_str = re.sub(r'\s+', ' ', cell_str)
data_cells.append(cell_str)
table_lines.append("| " + " | ".join(data_cells) + " |")
return "\n".join(table_lines)
def extract_text_with_pymupdf(pdf_path: str, page_tables: Dict[int, List[Dict]]) -> Dict[int, List[Dict]]:
    """
    Extract text blocks using PyMuPDF while avoiding table content areas.

    Args:
        pdf_path: Path to the PDF file.
        page_tables: Tables per page (from pdfplumber), used to drop text
            blocks that overlap table regions so table content is not
            duplicated in the text stream.

    Returns:
        Dictionary mapping 0-based page numbers to lists of text-block dicts
        ('content', 'type' in {"text", "heading", "subheading"}, bbox and
        y-position for ordering, plus font-size/bold metadata).
    """
    page_text_blocks = {}
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            page = doc[page_num]
            text_blocks = []
            # Get tables for this page to avoid overlaps
            tables_on_page = page_tables.get(page_num, [])
            table_bboxes = [table['bbox'] for table in tables_on_page]
            # Extract text blocks with positioning
            blocks = page.get_text("dict")
            for block_num, block in enumerate(blocks.get("blocks", [])):
                if "lines" in block:  # Text block (image blocks carry no "lines")
                    block_text = ""
                    font_sizes = []
                    is_bold = False
                    for line in block["lines"]:
                        line_text = ""
                        for span in line.get("spans", []):
                            text = span.get("text", "").strip()
                            if text:
                                line_text += text + " "
                                # Collect font information
                                # NOTE(review): reconstructed as applying only to
                                # non-empty spans — confirm against original intent.
                                font_size = span.get("size", 12)
                                font_flags = span.get("flags", 0)
                                font_sizes.append(font_size)
                                # Check if bold (bit 4 of flags)
                                if font_flags & (1 << 4):
                                    is_bold = True
                        if line_text.strip():
                            block_text += line_text.strip() + " "
                    if block_text.strip():
                        bbox = block.get("bbox", [0, 0, 0, 0])
                        # Check if this text block overlaps with any table
                        overlaps_with_table = any(
                            _bbox_overlap(bbox, table_bbox, 0.5)
                            for table_bbox in table_bboxes
                        )
                        if not overlaps_with_table:
                            avg_font_size = np.mean(font_sizes) if font_sizes else 12
                            # Determine content type based on formatting:
                            # large bold -> heading, bold -> subheading, else plain text.
                            content_type = "text"
                            if is_bold and avg_font_size > 14:
                                content_type = "heading"
                            elif is_bold:
                                content_type = "subheading"
                            text_blocks.append({
                                'content': block_text.strip(),
                                'type': content_type,
                                'bbox': bbox,
                                'y_position': bbox[1],
                                'font_size': avg_font_size,
                                'is_bold': is_bold,
                                'block_number': block_num
                            })
            if text_blocks:
                page_text_blocks[page_num] = text_blocks
        doc.close()
    except Exception as e:
        print(f"⚠️ PyMuPDF text extraction error: {e}")
    return page_text_blocks
def _bbox_overlap(bbox1: List[float], bbox2: List[float], overlap_threshold: float = 0.5) -> bool:
"""Check if two bounding boxes overlap significantly."""
if not bbox1 or not bbox2 or len(bbox1) < 4 or len(bbox2) < 4:
return False
# Calculate intersection
x_overlap = max(0, min(bbox1[2], bbox2[2]) - max(bbox1[0], bbox2[0]))
y_overlap = max(0, min(bbox1[3], bbox2[3]) - max(bbox1[1], bbox2[1]))
if x_overlap <= 0 or y_overlap <= 0:
return False
intersection_area = x_overlap * y_overlap
# Calculate areas
area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
if area1 <= 0 or area2 <= 0:
return False
# Check if intersection is significant relative to the smaller box
smaller_area = min(area1, area2)
overlap_ratio = intersection_area / smaller_area
return overlap_ratio >= overlap_threshold
def merge_page_content(page_num: int, text_blocks: List[Dict], tables: List[Dict]) -> List[Dict]:
    """
    Interleave a page's text blocks and tables into reading order.

    Args:
        page_num: Zero-based page index; stored as 1-based under 'page'.
        text_blocks: Text-block dicts for this page.
        tables: Table dicts for this page.

    Returns:
        All content elements for the page, sorted top-to-bottom by y-position.
    """
    text_items = [
        {
            'content': blk['content'],
            'type': blk['type'],
            'y_position': blk['y_position'],
            'page': page_num + 1,
            'bbox': blk['bbox'],
            'font_size': blk.get('font_size', 12),
            'is_bold': blk.get('is_bold', False),
        }
        for blk in text_blocks
    ]
    table_items = [
        {
            'content': tbl['content'],
            'type': 'table',
            'y_position': tbl['y_position'],
            'page': page_num + 1,
            'bbox': tbl['bbox'],
            'rows': tbl['rows'],
            'cols': tbl['cols'],
        }
        for tbl in tables
    ]
    # Stable sort keeps text-before-table order for elements at equal height.
    return sorted(text_items + table_items, key=lambda item: item['y_position'])
def clean_text_content(text: str) -> str:
    """
    Normalize whitespace and strip PDF extraction artifacts from text.

    Args:
        text: Raw extracted text (may be empty or None-like).

    Returns:
        Cleaned, stripped text; "" for falsy input.
    """
    if not text:
        return ""
    substitutions = (
        (r'[ \t]+', ' '),                 # runs of spaces/tabs -> one space
        (r'\n\s*\n\s*\n+', '\n\n'),       # 3+ line breaks -> one blank line
        # Drop characters outside basic-Latin / Latin-extended ranges,
        # keeping newlines, carriage returns and tabs.
        (r'[^\x20-\x7E\u00A0-\u024F\u1E00-\u1EFF\n\r\t]', ''),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    # Remove any leftover Unicode replacement characters, then trim.
    return text.replace('�', '').strip()
def parse_document_hybrid(pdf_path: str, save_parsed_text: bool = False) -> dict:
    """
    Hybrid PDF parsing using both PyMuPDF and pdfplumber for optimal results.

    Args:
        pdf_path: Path to the PDF file.
        save_parsed_text: When True, also write the result to a text file
            via save_parsed_content_to_file().

    Returns:
        Dictionary with parsed 'content' (tables and text in document order),
        page counts, timing, and element-count 'metadata'. On any failure a
        well-formed empty result is returned with the error recorded under
        metadata['error'] — this function never raises to callers.
    """
    try:
        doc_name = os.path.basename(pdf_path)
        print(f"🚀 Starting hybrid parsing for {doc_name}...")
        print(f"📊 Using pdfplumber for tables + PyMuPDF for text")
        start_time = time.time()
        # Step 1: Extract tables using pdfplumber (superior table detection)
        print("🔍 Extracting tables with pdfplumber...")
        page_tables = extract_tables_with_pdfplumber(pdf_path)
        total_tables = sum(len(tables) for tables in page_tables.values())
        print(f"✅ Found {total_tables} tables across {len(page_tables)} pages")
        # Step 2: Extract text using PyMuPDF (avoiding table areas)
        print("📝 Extracting text with PyMuPDF...")
        page_text_blocks = extract_text_with_pymupdf(pdf_path, page_tables)
        total_text_blocks = sum(len(blocks) for blocks in page_text_blocks.values())
        print(f"✅ Found {total_text_blocks} text blocks")
        # Step 3: Merge content by page and position
        print("🔄 Merging content in document order...")
        all_content = []
        # Get total page count
        with fitz.open(pdf_path) as doc:
            total_pages = len(doc)
        for page_num in range(total_pages):
            text_blocks = page_text_blocks.get(page_num, [])
            tables = page_tables.get(page_num, [])
            page_content = merge_page_content(page_num, text_blocks, tables)
            all_content.extend(page_content)
        # Step 4: Generate final output text, element by element
        final_text = ""
        for item in all_content:
            content = item['content'].strip()
            if item['type'] == 'table':
                # Format tables with clear separation
                final_text += "\n" + "="*60 + "\n"
                final_text += "TABLE:\n"
                final_text += "="*60 + "\n\n"
                final_text += content + "\n\n"
                final_text += "="*60 + "\n\n"
            elif item['type'] == 'heading':
                # Format headings with an underline capped at 60 chars
                final_text += "\n" + content + "\n"
                final_text += "-" * min(len(content), 60) + "\n\n"
            else:
                # Regular text (and subheadings) get whitespace/artifact cleanup
                cleaned_content = clean_text_content(content)
                if cleaned_content:
                    final_text += cleaned_content + "\n\n"
        processing_time = time.time() - start_time
        result = {
            'document_name': doc_name,
            'content': final_text.strip(),
            'total_pages': total_pages,
            'parsing_method': 'hybrid_pymupdf_pdfplumber',
            'processing_time': processing_time,
            'metadata': {
                'total_elements': len(all_content),
                'text_elements': total_text_blocks,
                'table_elements': total_tables,
                'pages_processed': total_pages,
                'characters_extracted': len(final_text)
            }
        }
        print(f"✅ Hybrid parsing complete!")
        print(f"📊 Results: {total_tables} tables, {total_text_blocks} text blocks")
        print(f"📄 Total: {len(final_text):,} characters in {processing_time:.2f}s")
        # Save parsed content if requested
        if save_parsed_text:
            save_parsed_content_to_file(result, pdf_path)
        return result
    except Exception as e:
        # Swallow everything: callers get a structured empty result instead of an exception.
        print(f"❌ Hybrid parsing error: {e}")
        return {
            'document_name': os.path.basename(pdf_path),
            'content': "",
            'total_pages': 0,
            'parsing_method': 'hybrid_error',
            'processing_time': 0,
            'metadata': {
                'total_elements': 0,
                'text_elements': 0,
                'table_elements': 0,
                'pages_processed': 0,
                'characters_extracted': 0,
                'error': str(e)
            }
        }
def save_parsed_content_to_file(result: dict, original_pdf_path: str) -> str:
    """
    Save the hybrid parsing result to a single clean output file.

    Writes to ./output/<pdf-basename>_parsed.txt (directory created if
    needed) with a small metadata header followed by the full content.

    Args:
        result: Parsing result dictionary (shape produced by parse_document_hybrid).
        original_pdf_path: Path to original PDF file (used only for naming).

    Returns:
        Path to the saved output file, or "" if saving failed.
    """
    try:
        # Create output directory
        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)
        # Create filename from the PDF's base name
        base_name = os.path.splitext(os.path.basename(original_pdf_path))[0]
        output_file = os.path.join(output_dir, f"{base_name}_parsed.txt")
        with open(output_file, 'w', encoding='utf-8') as f:
            # Write header
            f.write(f"HYBRID PDF PARSING OUTPUT\n")
            f.write(f"="*50 + "\n")
            f.write(f"Document: {result['document_name']}\n")
            f.write(f"Method: {result['parsing_method']}\n")
            f.write(f"Pages: {result['total_pages']}\n")
            f.write(f"Tables: {result['metadata']['table_elements']}\n")
            f.write(f"Text Blocks: {result['metadata']['text_elements']}\n")
            f.write(f"Processing Time: {result['processing_time']:.2f}s\n")
            f.write(f"="*50 + "\n\n")
            # Write content
            f.write(result['content'])
        print(f"💾 Output saved to: {output_file}")
        return output_file
    except Exception as e:
        print(f"⚠️ Error saving hybrid output: {e}")
        return ""
# Legacy function names for backward compatibility
def parse_document_enhanced_tables(pdf_path: str, save_parsed_text: bool = False) -> dict:
    """Deprecated alias kept for older callers; delegates to parse_document_hybrid()."""
    return parse_document_hybrid(pdf_path, save_parsed_text=save_parsed_text)
def parse_document_with_tables(pdf_path: str, save_parsed_text: bool = False) -> dict:
    """Deprecated alias kept for older callers; delegates to parse_document_hybrid()."""
    return parse_document_hybrid(pdf_path, save_parsed_text=save_parsed_text)
def detect_table_structures(page, min_rows=2, min_cols=2) -> List[Dict]:
    """
    Legacy function for backward compatibility - uses PyMuPDF fallback method.

    Args:
        page: A PyMuPDF page object; detection runs only when it exposes
            find_tables (older PyMuPDF versions do not).
        min_rows: Minimum row count for a detection to count as a table.
        min_cols: Minimum column count for a detection to count as a table.

    Returns:
        List of table dicts in the same shape produced by
        extract_tables_with_pdfplumber; empty on failure.
    """
    tables = []
    try:
        # Use PyMuPDF's find_tables method with enhanced settings
        if hasattr(page, 'find_tables'):
            found_tables = page.find_tables(
                vertical_strategy="lines_strict",
                horizontal_strategy="lines_strict",
                snap_tolerance=3.0,
                join_tolerance=3.0,
                edge_min_length=3.0,
                min_words_vertical=3,
                min_words_horizontal=1
            )
            for table in found_tables:
                try:
                    # Extract table data
                    table_data = table.extract()
                    bbox = table.bbox if hasattr(table, 'bbox') else [0, 0, 0, 0]
                    if table_data and len(table_data) >= min_rows:
                        # Filter out rows with no non-empty cells
                        filtered_data = []
                        for row in table_data:
                            if row and any(cell and str(cell).strip() for cell in row):
                                cleaned_row = []
                                for cell in row:
                                    cell_str = str(cell).strip() if cell else ""
                                    # Collapse whitespace and strip non-Latin artifacts
                                    cell_str = re.sub(r'\s+', ' ', cell_str)
                                    cell_str = re.sub(r'[^\x20-\x7E\u00A0-\u024F\u1E00-\u1EFF]', '', cell_str)
                                    cleaned_row.append(cell_str)
                                filtered_data.append(cleaned_row)
                        if len(filtered_data) >= min_rows and len(filtered_data[0]) >= min_cols:
                            # Create table with proper headers
                            try:
                                # Ensure all rows have the same number of columns
                                max_cols = max(len(row) for row in filtered_data)
                                normalized_data = []
                                for row in filtered_data:
                                    while len(row) < max_cols:
                                        row.append("")
                                    normalized_data.append(row[:max_cols])
                                if len(normalized_data) >= min_rows and max_cols >= min_cols:
                                    # Create markdown table
                                    table_markdown = _create_clean_markdown_table(normalized_data)
                                    tables.append({
                                        'content': table_markdown,
                                        'type': 'table',
                                        'rows': len(normalized_data),
                                        'cols': max_cols,
                                        'bbox': bbox,
                                        'y_position': bbox[1] if bbox else 0,
                                        'raw_data': normalized_data
                                    })
                            except Exception as e:
                                print(f"⚠️ Table processing error: {e}")
                                continue
                except Exception as e:
                    # Skip this table, continue with the rest of the page
                    print(f"⚠️ Table extraction error: {e}")
                    continue
    except Exception as e:
        print(f"⚠️ Table detection error: {e}")
    return tables
if __name__ == "__main__":
    # Smoke-test the hybrid parser against a sample document.
    pdf_path = "docs/policy.pdf"
    if not os.path.exists(pdf_path):
        print(f"❌ PDF file not found: {pdf_path}")
    else:
        result = parse_document_hybrid(pdf_path, save_parsed_text=True)
        print(f"✅ Parsing complete: {result['metadata']['characters_extracted']:,} characters")