# src/pdf_parser.py
import os
import json
from pathlib import Path
from typing import Dict, List, Tuple
import pdfplumber
import fitz # PyMuPDF
from PIL import Image
import io
class PDFParser:
    """Parse PDF documents and extract text, tables, and images.

    Results are cached as JSON under ``extraction_dir`` and re-processing of
    unchanged files is skipped. Change detection uses a cheap signature
    (file size + mtime), not a content hash, so edits that preserve both
    will not trigger re-processing.
    """

    def __init__(self, extraction_dir: str = "./pdf_extractions"):
        # Directory holding per-PDF caches, extracted images, and the
        # processing-state file; created eagerly so later writes can't fail
        # on a missing directory.
        self.extraction_dir = extraction_dir
        self.state_file = os.path.join(extraction_dir, "processing_state.json")
        os.makedirs(extraction_dir, exist_ok=True)
        # Maps pdf_path -> "size_mtime" signature of the last processed version.
        self.processed_files = self._load_processing_state()

    def _load_processing_state(self) -> Dict:
        """Load state of already processed files to avoid re-processing.

        Returns an empty dict when the state file is absent or unreadable —
        a corrupt state is non-fatal and simply means everything gets
        re-processed.
        """
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                print(f"Warning: Could not load processing state: {e}")
        return {}

    def _save_processing_state(self):
        """Persist ``self.processed_files`` to disk (best effort)."""
        try:
            with open(self.state_file, 'w') as f:
                json.dump(self.processed_files, f, indent=2)
        except Exception as e:
            print(f"Warning: Could not save processing state: {e}")

    def _get_file_hash(self, pdf_path: str) -> str:
        """Return a cheap change signature for the file: "size_mtime".

        Returns the sentinel string "unknown" when the file cannot be
        stat'ed (missing file, permission error).
        """
        try:
            stat = os.stat(pdf_path)
            return f"{stat.st_size}_{stat.st_mtime}"
        except Exception as e:
            print(f"Error getting file hash: {e}")
            return "unknown"

    def extract_text_with_pdfplumber(self, pdf_path: str, max_chars: int = 1000000) -> str:
        """Extract text from PDF using pdfplumber (handles complex layouts).

        Args:
            pdf_path: Path to the PDF file.
            max_chars: Soft cap on total extracted characters; extraction
                stops at the first page boundary after the cap is reached.

        Returns:
            Page texts joined with "--- Page N ---" separators; empty string
            if the PDF cannot be opened.
        """
        # FIX: accumulate in a list and join once instead of quadratic +=.
        parts: List[str] = []
        char_count = 0
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages, 1):
                    if char_count >= max_chars:
                        print(f"Text extraction reached maximum chars limit ({max_chars})")
                        break
                    try:
                        page_text = page.extract_text()
                        if page_text:
                            # Limit per-page text to avoid token explosion
                            page_text = page_text[:50000]
                            parts.append(f"\n--- Page {page_num} ---\n{page_text}")
                            char_count += len(page_text)
                    except Exception as e:
                        # One bad page should not abort the whole document.
                        print(f"Error extracting text from page {page_num}: {e}")
                        continue
        except Exception as e:
            print(f"Error opening PDF with pdfplumber: {e}")
        return "".join(parts)

    def extract_tables_from_pdf(self, pdf_path: str, max_tables: int = 50) -> List[Tuple[int, str]]:
        """Extract tables from PDF and return as formatted text.

        Args:
            pdf_path: Path to the PDF file.
            max_tables: Hard cap on the number of tables returned.

        Returns:
            List of ``(page_number, table_text)`` tuples; table rows are
            pipe-separated and truncated at 1000 characters each.
        """
        tables: List[Tuple[int, str]] = []
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages, 1):
                    if len(tables) >= max_tables:
                        print(f"Table extraction reached maximum tables limit ({max_tables})")
                        break
                    try:
                        page_tables = page.extract_tables()
                        if not page_tables:
                            continue
                        for table in page_tables:
                            # FIX: enforce the cap inside the page too — a
                            # single table-heavy page used to overshoot it.
                            if len(tables) >= max_tables:
                                break
                            lines = [f"TABLE on page {page_num}:"]
                            for row in table:
                                row_str = " | ".join(str(cell) if cell else "" for cell in row)
                                # Limit row length
                                if len(row_str) > 1000:
                                    row_str = row_str[:1000] + "..."
                                lines.append(row_str)
                            tables.append((page_num, "\n".join(lines) + "\n"))
                    except Exception as e:
                        print(f"Error extracting tables from page {page_num}: {e}")
                        continue
        except Exception as e:
            print(f"Error opening PDF for table extraction: {e}")
        return tables

    def extract_images_from_pdf(self, pdf_path: str, output_dir: str = None, max_images: int = 100) -> List[Tuple[int, str]]:
        """Extract embedded images from a PDF using PyMuPDF.

        Args:
            pdf_path: Path to the PDF file.
            output_dir: Where image files are written; defaults to
                ``<extraction_dir>/images``.
            max_images: Hard cap on the number of images extracted.

        Returns:
            List of ``(page_number, image_file_path)`` tuples (1-based pages).
        """
        if output_dir is None:
            output_dir = os.path.join(self.extraction_dir, "images")
        os.makedirs(output_dir, exist_ok=True)
        images: List[Tuple[int, str]] = []
        try:
            pdf_name = Path(pdf_path).stem
            pdf_file = fitz.open(pdf_path)
            # FIX: guarantee the document handle is released even if the
            # page loop raises something the per-page handler misses.
            try:
                for page_num in range(len(pdf_file)):
                    if len(images) >= max_images:
                        print(f"Image extraction reached maximum images limit ({max_images})")
                        break
                    try:
                        page = pdf_file[page_num]
                        for image_idx, img_info in enumerate(page.get_images()):
                            if len(images) >= max_images:
                                break
                            try:
                                # get_images() yields tuples:
                                # (xref, smask, width, height, ...) — the
                                # integer xref is the first element.
                                xref = img_info[0]
                                base_image = pdf_file.extract_image(xref)
                                if base_image and "image" in base_image:
                                    image_bytes = base_image["image"]
                                    image_ext = base_image["ext"]
                                    image_name = f"{pdf_name}_page{page_num+1}_img{image_idx}.{image_ext}"
                                    image_path = os.path.join(output_dir, image_name)
                                    with open(image_path, "wb") as f:
                                        f.write(image_bytes)
                                    images.append((page_num + 1, image_path))
                            except Exception as e:
                                print(f"Error extracting image {image_idx} from page {page_num}: {e}")
                                continue
                    except Exception as e:
                        print(f"Error processing page {page_num}: {e}")
                        continue
            finally:
                pdf_file.close()
        except Exception as e:
            print(f"Error opening PDF for image extraction: {e}")
        return images

    def process_pdf(self, pdf_path: str) -> Dict:
        """Process an entire PDF and extract all content.

        Returns a dict with keys: ``pdf_path``, ``filename``, ``text``,
        ``tables``, ``images`` — on both the fresh and the cached path.
        """
        file_hash = self._get_file_hash(pdf_path)
        # Check if already processed (same path, same size+mtime signature).
        if self.processed_files.get(pdf_path) == file_hash:
            print(f"File {pdf_path} already processed. Loading cached results.")
            return self._load_cached_results(pdf_path)
        print(f"Processing PDF: {pdf_path}")
        result = {
            "pdf_path": pdf_path,
            "filename": Path(pdf_path).name,
            "text": self.extract_text_with_pdfplumber(pdf_path, max_chars=1000000),
            "tables": self.extract_tables_from_pdf(pdf_path, max_tables=50),
            "images": self.extract_images_from_pdf(pdf_path, max_images=100)
        }
        # Save results to cache, then record the signature so the next run
        # with an unchanged file takes the cached path.
        self._save_cached_results(pdf_path, result)
        self.processed_files[pdf_path] = file_hash
        self._save_processing_state()
        return result

    def _save_cached_results(self, pdf_path: str, result: Dict):
        """Save extraction results to a JSON file (best effort).

        Image file paths are not cached — only a count — so a cached reload
        cannot recover them. NOTE: tuples in ``tables`` round-trip through
        JSON as lists.
        """
        safe_name = Path(pdf_path).stem
        cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json")
        cache_data = {
            "pdf_path": result["pdf_path"],
            "filename": result["filename"],
            "text": result["text"],
            "tables": result["tables"],
            "image_count": len(result["images"])
        }
        try:
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Warning: Could not save cache: {e}")

    def _load_cached_results(self, pdf_path: str) -> Dict:
        """Load cached extraction results.

        Always returns a dict with the full ``process_pdf`` schema; since
        image paths are not cached, ``images`` is restored as an empty list.
        """
        safe_name = Path(pdf_path).stem
        cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json")
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cached = json.load(f)
        except Exception as e:
            print(f"Error loading cache: {e}")
            cached = {}
        # FIX: the cache stores "image_count" instead of "images", and the
        # old error fallback lacked "pdf_path"/"filename" — normalize so the
        # cached path returns the same keys as a fresh process_pdf() call.
        cached.setdefault("pdf_path", pdf_path)
        cached.setdefault("filename", Path(pdf_path).name)
        cached.setdefault("text", "")
        cached.setdefault("tables", [])
        cached.setdefault("images", [])
        return cached

    def process_pdf_directory(self, pdf_dir: str) -> List[Dict]:
        """Process all ``*.pdf`` files in a directory.

        Individual file failures are logged and skipped; returns the list of
        successful per-file result dicts.
        """
        results = []
        pdf_files = list(Path(pdf_dir).glob("*.pdf"))
        if not pdf_files:
            print(f"No PDF files found in {pdf_dir}")
            return results
        print(f"Found {len(pdf_files)} PDF files to process")
        for idx, pdf_file in enumerate(pdf_files, 1):
            try:
                print(f"Processing {idx}/{len(pdf_files)}: {pdf_file.name}")
                results.append(self.process_pdf(str(pdf_file)))
            except Exception as e:
                print(f"Error processing {pdf_file}: {e}")
                continue
        print(f"Completed processing {len(results)} PDFs")
        return results