|
|
import os |
|
|
import json |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Tuple |
|
|
import pdfplumber |
|
|
import fitz |
|
|
from PIL import Image |
|
|
import io |
|
|
|
|
|
|
|
|
class PDFParser:
    """Parse PDF documents and extract text, tables, and images.

    Extraction results are cached under ``extraction_dir`` so unchanged
    files are not re-processed across runs.
    """

    def __init__(self, extraction_dir: str = "./pdf_extractions"):
        """Create the extraction directory and restore prior processing state.

        Args:
            extraction_dir: Directory used for cache files, the processing
                state file, and extracted images.
        """
        self.extraction_dir = extraction_dir
        # Make sure the target directory exists before anything is written.
        os.makedirs(extraction_dir, exist_ok=True)
        # State file maps a PDF path to its last-seen size/mtime hash.
        self.state_file = os.path.join(extraction_dir, "processing_state.json")
        self.processed_files = self._load_processing_state()
|
|
|
|
|
def _load_processing_state(self) -> Dict: |
|
|
"""Load state of already processed files to avoid re-processing.""" |
|
|
if os.path.exists(self.state_file): |
|
|
try: |
|
|
with open(self.state_file, 'r') as f: |
|
|
return json.load(f) |
|
|
except Exception as e: |
|
|
print(f"Warning: Could not load processing state: {e}") |
|
|
return {} |
|
|
return {} |
|
|
|
|
|
def _save_processing_state(self): |
|
|
"""Save processing state to disk.""" |
|
|
try: |
|
|
with open(self.state_file, 'w') as f: |
|
|
json.dump(self.processed_files, f, indent=2) |
|
|
except Exception as e: |
|
|
print(f"Warning: Could not save processing state: {e}") |
|
|
|
|
|
def _get_file_hash(self, pdf_path: str) -> str: |
|
|
"""Generate a simple hash for the file (file size + modification time).""" |
|
|
try: |
|
|
stat = os.stat(pdf_path) |
|
|
return f"{stat.st_size}_{stat.st_mtime}" |
|
|
except Exception as e: |
|
|
print(f"Error getting file hash: {e}") |
|
|
return "unknown" |
|
|
|
|
|
def extract_text_with_pdfplumber(self, pdf_path: str, max_chars: int = 1000000) -> str: |
|
|
"""Extract text from PDF using pdfplumber (handles complex layouts).""" |
|
|
text = "" |
|
|
char_count = 0 |
|
|
try: |
|
|
with pdfplumber.open(pdf_path) as pdf: |
|
|
for page_num, page in enumerate(pdf.pages, 1): |
|
|
if char_count >= max_chars: |
|
|
print(f"Text extraction reached maximum chars limit ({max_chars})") |
|
|
break |
|
|
|
|
|
try: |
|
|
page_text = page.extract_text() |
|
|
if page_text: |
|
|
|
|
|
page_text = page_text[:50000] |
|
|
text += f"\n--- Page {page_num} ---\n{page_text}" |
|
|
char_count += len(page_text) |
|
|
except Exception as e: |
|
|
print(f"Error extracting text from page {page_num}: {e}") |
|
|
continue |
|
|
except Exception as e: |
|
|
print(f"Error opening PDF with pdfplumber: {e}") |
|
|
|
|
|
return text |
|
|
|
|
|
def extract_tables_from_pdf(self, pdf_path: str, max_tables: int = 50) -> List[Tuple[int, str]]: |
|
|
"""Extract tables from PDF and return as formatted text.""" |
|
|
tables = [] |
|
|
table_count = 0 |
|
|
try: |
|
|
with pdfplumber.open(pdf_path) as pdf: |
|
|
for page_num, page in enumerate(pdf.pages, 1): |
|
|
if table_count >= max_tables: |
|
|
print(f"Table extraction reached maximum tables limit ({max_tables})") |
|
|
break |
|
|
|
|
|
try: |
|
|
page_tables = page.extract_tables() |
|
|
if page_tables: |
|
|
for table_idx, table in enumerate(page_tables): |
|
|
|
|
|
table_text = f"TABLE on page {page_num}:\n" |
|
|
for row in table: |
|
|
row_str = " | ".join([str(cell) if cell else "" for cell in row]) |
|
|
|
|
|
if len(row_str) > 1000: |
|
|
row_str = row_str[:1000] + "..." |
|
|
table_text += row_str + "\n" |
|
|
|
|
|
tables.append((page_num, table_text)) |
|
|
table_count += 1 |
|
|
except Exception as e: |
|
|
print(f"Error extracting tables from page {page_num}: {e}") |
|
|
continue |
|
|
except Exception as e: |
|
|
print(f"Error opening PDF for table extraction: {e}") |
|
|
|
|
|
return tables |
|
|
|
|
|
def extract_images_from_pdf(self, pdf_path: str, output_dir: str = None, max_images: int = 100) -> List[Tuple[int, str]]: |
|
|
""" |
|
|
Extract images from PDF using PyMuPDF. |
|
|
FIXED: Properly handles xref tuples from get_images() |
|
|
""" |
|
|
if output_dir is None: |
|
|
output_dir = os.path.join(self.extraction_dir, "images") |
|
|
|
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
images = [] |
|
|
image_count = 0 |
|
|
|
|
|
try: |
|
|
pdf_name = Path(pdf_path).stem |
|
|
pdf_file = fitz.open(pdf_path) |
|
|
|
|
|
for page_num in range(len(pdf_file)): |
|
|
if image_count >= max_images: |
|
|
print(f"Image extraction reached maximum images limit ({max_images})") |
|
|
break |
|
|
|
|
|
try: |
|
|
page = pdf_file[page_num] |
|
|
pix_list = page.get_images() |
|
|
|
|
|
for image_idx, img_info in enumerate(pix_list): |
|
|
if image_count >= max_images: |
|
|
break |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
xref = img_info[0] |
|
|
|
|
|
|
|
|
base_image = pdf_file.extract_image(xref) |
|
|
|
|
|
if base_image and "image" in base_image: |
|
|
image_bytes = base_image["image"] |
|
|
image_ext = base_image["ext"] |
|
|
|
|
|
image_name = f"{pdf_name}_page{page_num+1}_img{image_idx}.{image_ext}" |
|
|
image_path = os.path.join(output_dir, image_name) |
|
|
|
|
|
with open(image_path, "wb") as f: |
|
|
f.write(image_bytes) |
|
|
|
|
|
images.append((page_num + 1, image_path)) |
|
|
image_count += 1 |
|
|
|
|
|
except TypeError as e: |
|
|
|
|
|
print(f"Error with image data type on page {page_num}, image {image_idx}: {e}") |
|
|
continue |
|
|
except Exception as e: |
|
|
print(f"Error extracting image {image_idx} from page {page_num}: {e}") |
|
|
continue |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error processing page {page_num}: {e}") |
|
|
continue |
|
|
|
|
|
pdf_file.close() |
|
|
except Exception as e: |
|
|
print(f"Error opening PDF for image extraction: {e}") |
|
|
|
|
|
return images |
|
|
|
|
|
def process_pdf(self, pdf_path: str) -> Dict: |
|
|
"""Process entire PDF and extract all content.""" |
|
|
file_hash = self._get_file_hash(pdf_path) |
|
|
|
|
|
|
|
|
if pdf_path in self.processed_files and self.processed_files[pdf_path] == file_hash: |
|
|
print(f"File {pdf_path} already processed. Loading cached results.") |
|
|
return self._load_cached_results(pdf_path) |
|
|
|
|
|
print(f"Processing PDF: {pdf_path}") |
|
|
|
|
|
result = { |
|
|
"pdf_path": pdf_path, |
|
|
"filename": Path(pdf_path).name, |
|
|
"text": self.extract_text_with_pdfplumber(pdf_path, max_chars=1000000), |
|
|
"tables": self.extract_tables_from_pdf(pdf_path, max_tables=50), |
|
|
"images": self.extract_images_from_pdf(pdf_path, max_images=100) |
|
|
} |
|
|
|
|
|
|
|
|
self._save_cached_results(pdf_path, result) |
|
|
|
|
|
|
|
|
self.processed_files[pdf_path] = file_hash |
|
|
self._save_processing_state() |
|
|
|
|
|
return result |
|
|
|
|
|
def _save_cached_results(self, pdf_path: str, result: Dict): |
|
|
"""Save extraction results to a JSON file.""" |
|
|
safe_name = Path(pdf_path).stem |
|
|
cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json") |
|
|
|
|
|
|
|
|
cache_data = { |
|
|
"pdf_path": result["pdf_path"], |
|
|
"filename": result["filename"], |
|
|
"text": result["text"], |
|
|
"tables": result["tables"], |
|
|
"image_count": len(result["images"]) |
|
|
} |
|
|
|
|
|
try: |
|
|
with open(cache_file, 'w', encoding='utf-8') as f: |
|
|
json.dump(cache_data, f, ensure_ascii=False, indent=2) |
|
|
except Exception as e: |
|
|
print(f"Warning: Could not save cache: {e}") |
|
|
|
|
|
def _load_cached_results(self, pdf_path: str) -> Dict: |
|
|
"""Load cached extraction results.""" |
|
|
safe_name = Path(pdf_path).stem |
|
|
cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json") |
|
|
|
|
|
try: |
|
|
with open(cache_file, 'r', encoding='utf-8') as f: |
|
|
return json.load(f) |
|
|
except Exception as e: |
|
|
print(f"Error loading cache: {e}") |
|
|
return {"text": "", "tables": [], "images": []} |
|
|
|
|
|
def process_pdf_directory(self, pdf_dir: str) -> List[Dict]: |
|
|
"""Process all PDFs in a directory.""" |
|
|
results = [] |
|
|
pdf_files = list(Path(pdf_dir).glob("*.pdf")) |
|
|
|
|
|
if not pdf_files: |
|
|
print(f"No PDF files found in {pdf_dir}") |
|
|
return results |
|
|
|
|
|
print(f"Found {len(pdf_files)} PDF files to process") |
|
|
|
|
|
for idx, pdf_file in enumerate(pdf_files, 1): |
|
|
try: |
|
|
print(f"Processing {idx}/{len(pdf_files)}: {pdf_file.name}") |
|
|
result = self.process_pdf(str(pdf_file)) |
|
|
results.append(result) |
|
|
except Exception as e: |
|
|
print(f"Error processing {pdf_file}: {e}") |
|
|
continue |
|
|
|
|
|
print(f"Completed processing {len(results)} PDFs") |
|
|
return results |