import os
import json
from pathlib import Path
from typing import Dict, List, Tuple

import pdfplumber
import fitz  # PyMuPDF
from PIL import Image
import io


class PDFParser:
    """Parse PDF documents and extract text, tables, and images.

    Extraction results are cached as JSON files inside ``extraction_dir``;
    a size+mtime fingerprint per PDF (see :meth:`_get_file_hash`) lets
    :meth:`process_pdf` skip files that have not changed since the last run.
    """

    def __init__(self, extraction_dir: str = "./pdf_extractions"):
        """Create the parser and load any previous processing state.

        Args:
            extraction_dir: Directory for cache files, the processing-state
                JSON, and extracted images. Created if it does not exist.
        """
        self.extraction_dir = extraction_dir
        self.state_file = os.path.join(extraction_dir, "processing_state.json")
        os.makedirs(extraction_dir, exist_ok=True)
        self.processed_files = self._load_processing_state()

    def _load_processing_state(self) -> Dict:
        """Load state of already processed files to avoid re-processing.

        Returns:
            Mapping of pdf_path -> fingerprint string; empty dict if the
            state file is missing or unreadable (best-effort by design).
        """
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                # Corrupt/unreadable state is not fatal: fall back to
                # re-processing everything rather than crashing.
                print(f"Warning: Could not load processing state: {e}")
                return {}
        return {}

    def _save_processing_state(self):
        """Save processing state to disk (best-effort; failures only warn)."""
        try:
            with open(self.state_file, 'w', encoding='utf-8') as f:
                json.dump(self.processed_files, f, indent=2)
        except Exception as e:
            print(f"Warning: Could not save processing state: {e}")

    def _get_file_hash(self, pdf_path: str) -> str:
        """Generate a simple fingerprint for the file (size + mtime).

        Not a content hash: it only detects changes that alter the file's
        size or modification time, which is sufficient for cache invalidation.

        Returns:
            ``"<size>_<mtime>"`` or ``"unknown"`` if the file cannot be stat'd.
        """
        try:
            stat = os.stat(pdf_path)
            return f"{stat.st_size}_{stat.st_mtime}"
        except Exception as e:
            print(f"Error getting file hash: {e}")
            return "unknown"

    def extract_text_with_pdfplumber(self, pdf_path: str, max_chars: int = 1000000) -> str:
        """Extract text from PDF using pdfplumber (handles complex layouts).

        Args:
            pdf_path: Path to the PDF file.
            max_chars: Soft cap on total extracted characters; extraction
                stops at the first page boundary after the cap is reached.

        Returns:
            Concatenated text with ``--- Page N ---`` separators; empty
            string if the PDF cannot be opened.
        """
        text = ""
        char_count = 0
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages, 1):
                    if char_count >= max_chars:
                        print(f"Text extraction reached maximum chars limit ({max_chars})")
                        break
                    try:
                        page_text = page.extract_text()
                        if page_text:
                            # Limit per-page text to avoid token explosion
                            page_text = page_text[:50000]
                            text += f"\n--- Page {page_num} ---\n{page_text}"
                            char_count += len(page_text)
                    except Exception as e:
                        # A single bad page should not abort the whole document.
                        print(f"Error extracting text from page {page_num}: {e}")
                        continue
        except Exception as e:
            print(f"Error opening PDF with pdfplumber: {e}")
        return text

    def extract_tables_from_pdf(self, pdf_path: str, max_tables: int = 50) -> List[Tuple[int, str]]:
        """Extract tables from PDF and return as formatted text.

        Args:
            pdf_path: Path to the PDF file.
            max_tables: Maximum number of tables to extract across all pages.

        Returns:
            List of ``(page_number, formatted_table_text)`` tuples; empty
            list if the PDF cannot be opened.
        """
        tables = []
        table_count = 0
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages, 1):
                    if table_count >= max_tables:
                        print(f"Table extraction reached maximum tables limit ({max_tables})")
                        break
                    try:
                        page_tables = page.extract_tables()
                        if page_tables:
                            for table_idx, table in enumerate(page_tables):
                                # Convert table to pipe-delimited text format
                                table_text = f"TABLE on page {page_num}:\n"
                                for row in table:
                                    # FIX: test "is not None", not truthiness,
                                    # so falsy cells (0, False) are kept.
                                    row_str = " | ".join(
                                        [str(cell) if cell is not None else "" for cell in row]
                                    )
                                    # Limit row length
                                    if len(row_str) > 1000:
                                        row_str = row_str[:1000] + "..."
                                    table_text += row_str + "\n"
                                tables.append((page_num, table_text))
                                table_count += 1
                    except Exception as e:
                        print(f"Error extracting tables from page {page_num}: {e}")
                        continue
        except Exception as e:
            print(f"Error opening PDF for table extraction: {e}")
        return tables

    def extract_images_from_pdf(self, pdf_path: str, output_dir: str = None,
                                max_images: int = 100) -> List[Tuple[int, str]]:
        """Extract images from PDF using PyMuPDF.

        Args:
            pdf_path: Path to the PDF file.
            output_dir: Directory for extracted image files; defaults to
                ``<extraction_dir>/images``. Created if missing.
            max_images: Maximum number of images to extract overall.

        Returns:
            List of ``(page_number, image_file_path)`` tuples; empty list
            if the PDF cannot be opened.
        """
        if output_dir is None:
            output_dir = os.path.join(self.extraction_dir, "images")
        os.makedirs(output_dir, exist_ok=True)
        images = []
        image_count = 0
        try:
            pdf_name = Path(pdf_path).stem
            pdf_file = fitz.open(pdf_path)
            # FIX: close the document even when an unexpected error escapes
            # the page loop (previously leaked the fitz document handle).
            try:
                for page_num in range(len(pdf_file)):
                    if image_count >= max_images:
                        print(f"Image extraction reached maximum images limit ({max_images})")
                        break
                    try:
                        page = pdf_file[page_num]
                        pix_list = page.get_images()
                        for image_idx, img_info in enumerate(pix_list):
                            if image_count >= max_images:
                                break
                            try:
                                # get_images() returns tuples:
                                # (xref, smask, width, height, ...) — take
                                # the xref (an int) from position 0.
                                xref = img_info[0]
                                # Extract the raw embedded image by xref.
                                base_image = pdf_file.extract_image(xref)
                                if base_image and "image" in base_image:
                                    image_bytes = base_image["image"]
                                    image_ext = base_image["ext"]
                                    image_name = (
                                        f"{pdf_name}_page{page_num+1}_img{image_idx}.{image_ext}"
                                    )
                                    image_path = os.path.join(output_dir, image_name)
                                    with open(image_path, "wb") as f:
                                        f.write(image_bytes)
                                    images.append((page_num + 1, image_path))
                                    image_count += 1
                            except TypeError as e:
                                # Handle comparison errors with tuple
                                print(f"Error with image data type on page {page_num}, image {image_idx}: {e}")
                                continue
                            except Exception as e:
                                print(f"Error extracting image {image_idx} from page {page_num}: {e}")
                                continue
                    except Exception as e:
                        print(f"Error processing page {page_num}: {e}")
                        continue
            finally:
                pdf_file.close()
        except Exception as e:
            print(f"Error opening PDF for image extraction: {e}")
        return images

    def process_pdf(self, pdf_path: str) -> Dict:
        """Process entire PDF and extract all content.

        Skips extraction and returns cached results when the file's
        size+mtime fingerprint matches the recorded processing state.

        Returns:
            Dict with keys ``pdf_path``, ``filename``, ``text``, ``tables``,
            and ``images`` (or the cached equivalent, which stores
            ``image_count`` instead of image paths).
        """
        file_hash = self._get_file_hash(pdf_path)
        # Check if already processed
        if pdf_path in self.processed_files and self.processed_files[pdf_path] == file_hash:
            print(f"File {pdf_path} already processed. Loading cached results.")
            return self._load_cached_results(pdf_path)
        print(f"Processing PDF: {pdf_path}")
        result = {
            "pdf_path": pdf_path,
            "filename": Path(pdf_path).name,
            "text": self.extract_text_with_pdfplumber(pdf_path, max_chars=1000000),
            "tables": self.extract_tables_from_pdf(pdf_path, max_tables=50),
            "images": self.extract_images_from_pdf(pdf_path, max_images=100),
        }
        # Save results to cache
        self._save_cached_results(pdf_path, result)
        # Update processing state
        self.processed_files[pdf_path] = file_hash
        self._save_processing_state()
        return result

    def _save_cached_results(self, pdf_path: str, result: Dict):
        """Save extraction results to a JSON file (best-effort).

        Image bytes/paths are not cached — only the count — to keep cache
        files small.
        """
        safe_name = Path(pdf_path).stem
        cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json")
        # Don't save image paths in cache, just metadata
        cache_data = {
            "pdf_path": result["pdf_path"],
            "filename": result["filename"],
            "text": result["text"],
            "tables": result["tables"],
            "image_count": len(result["images"]),
        }
        try:
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Warning: Could not save cache: {e}")

    def _load_cached_results(self, pdf_path: str) -> Dict:
        """Load cached extraction results.

        Returns:
            The cached dict, or a result-shaped empty dict (including
            ``pdf_path``/``filename``) if the cache cannot be read.
        """
        safe_name = Path(pdf_path).stem
        cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json")
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading cache: {e}")
            # FIX: keep the fallback shape consistent with process_pdf()'s
            # result so callers can rely on these keys being present.
            return {
                "pdf_path": pdf_path,
                "filename": Path(pdf_path).name,
                "text": "",
                "tables": [],
                "images": [],
            }

    def process_pdf_directory(self, pdf_dir: str) -> List[Dict]:
        """Process all PDFs in a directory (non-recursive ``*.pdf`` glob).

        Returns:
            List of per-file result dicts; files that raise are skipped
            with a printed error.
        """
        results = []
        pdf_files = list(Path(pdf_dir).glob("*.pdf"))
        if not pdf_files:
            print(f"No PDF files found in {pdf_dir}")
            return results
        print(f"Found {len(pdf_files)} PDF files to process")
        for idx, pdf_file in enumerate(pdf_files, 1):
            try:
                print(f"Processing {idx}/{len(pdf_files)}: {pdf_file.name}")
                result = self.process_pdf(str(pdf_file))
                results.append(result)
            except Exception as e:
                print(f"Error processing {pdf_file}: {e}")
                continue
        print(f"Completed processing {len(results)} PDFs")
        return results