# NOTE(review): removed a non-Python extraction-artifact header (file-size
# line, checksum, and a dumped run of line numbers) that would not parse.
import os
import json
from pathlib import Path
from typing import Dict, List, Tuple
import pdfplumber
import fitz # PyMuPDF
from PIL import Image
import io
class PDFParser:
    """Parse PDF documents and extract text, tables, and images.

    Extraction results are cached as JSON under ``extraction_dir``, and a
    lightweight processing state (a size+mtime fingerprint per file) is kept
    so unchanged PDFs are not re-processed on subsequent runs.

    Text/table extraction uses ``pdfplumber``; image extraction uses
    ``fitz`` (PyMuPDF). All failures are reported via ``print`` and degrade
    to partial/empty results rather than raising, matching the module's
    best-effort style.
    """

    def __init__(self, extraction_dir: str = "./pdf_extractions"):
        """Create the parser and ensure the extraction/cache directory exists.

        Args:
            extraction_dir: Directory for cached results, processing state,
                and extracted images.
        """
        self.extraction_dir = extraction_dir
        self.state_file = os.path.join(extraction_dir, "processing_state.json")
        os.makedirs(extraction_dir, exist_ok=True)
        self.processed_files = self._load_processing_state()

    def _load_processing_state(self) -> Dict:
        """Load the map of already-processed files (path -> fingerprint).

        Returns an empty dict when the state file is missing or unreadable,
        so a corrupt state file only costs a re-process, never a crash.
        """
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (OSError, json.JSONDecodeError) as e:
                print(f"Warning: Could not load processing state: {e}")
        return {}

    def _save_processing_state(self):
        """Persist ``self.processed_files`` to disk (best-effort)."""
        try:
            with open(self.state_file, 'w', encoding='utf-8') as f:
                json.dump(self.processed_files, f, indent=2)
        except OSError as e:
            print(f"Warning: Could not save processing state: {e}")

    def _get_file_hash(self, pdf_path: str) -> str:
        """Return a cheap change-detection fingerprint: ``"<size>_<mtime>"``.

        Not a content hash — just good enough to notice edits without
        reading the whole file. Returns ``"unknown"`` if the file cannot
        be stat'ed.
        """
        try:
            stat = os.stat(pdf_path)
            return f"{stat.st_size}_{stat.st_mtime}"
        except OSError as e:
            print(f"Error getting file hash: {e}")
            return "unknown"

    def extract_text_with_pdfplumber(self, pdf_path: str, max_chars: int = 1000000) -> str:
        """Extract text from a PDF using pdfplumber (handles complex layouts).

        Args:
            pdf_path: Path to the PDF file.
            max_chars: Stop extracting once this many characters are collected
                (checked per page, so the total may slightly exceed it).

        Returns:
            Concatenated page text with ``--- Page N ---`` separators; an
            empty string if the PDF cannot be opened.
        """
        text = ""
        char_count = 0
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages, 1):
                    if char_count >= max_chars:
                        print(f"Text extraction reached maximum chars limit ({max_chars})")
                        break
                    try:
                        page_text = page.extract_text()
                        if page_text:
                            # Limit per-page text to avoid token explosion
                            page_text = page_text[:50000]
                            text += f"\n--- Page {page_num} ---\n{page_text}"
                            char_count += len(page_text)
                    except Exception as e:
                        # pdfplumber can raise a variety of parse errors;
                        # skip the page and keep going.
                        print(f"Error extracting text from page {page_num}: {e}")
                        continue
        except Exception as e:
            print(f"Error opening PDF with pdfplumber: {e}")
        return text

    def extract_tables_from_pdf(self, pdf_path: str, max_tables: int = 50) -> List[Tuple[int, str]]:
        """Extract tables from a PDF as (page_number, formatted_text) pairs.

        Args:
            pdf_path: Path to the PDF file.
            max_tables: Hard cap on the total number of tables returned.

        Returns:
            A list of ``(page_number, table_text)`` tuples; rows are joined
            with `` | `` and truncated to 1000 characters each.
        """
        tables = []
        table_count = 0
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages, 1):
                    if table_count >= max_tables:
                        print(f"Table extraction reached maximum tables limit ({max_tables})")
                        break
                    try:
                        page_tables = page.extract_tables()
                        if not page_tables:
                            continue
                        for table in page_tables:
                            # FIXED: enforce the cap inside the per-page loop
                            # too, so a table-heavy page cannot overshoot it.
                            if table_count >= max_tables:
                                break
                            table_text = f"TABLE on page {page_num}:\n"
                            for row in table:
                                # FIXED: test against None, not truthiness —
                                # falsy-but-real cells (0, False) must render.
                                row_str = " | ".join(
                                    "" if cell is None else str(cell) for cell in row
                                )
                                # Limit row length
                                if len(row_str) > 1000:
                                    row_str = row_str[:1000] + "..."
                                table_text += row_str + "\n"
                            tables.append((page_num, table_text))
                            table_count += 1
                    except Exception as e:
                        print(f"Error extracting tables from page {page_num}: {e}")
                        continue
        except Exception as e:
            print(f"Error opening PDF for table extraction: {e}")
        return tables

    def extract_images_from_pdf(self, pdf_path: str, output_dir: str = None, max_images: int = 100) -> List[Tuple[int, str]]:
        """Extract embedded images from a PDF using PyMuPDF.

        ``page.get_images()`` returns tuples ``(xref, smask, width, ...)``;
        the xref (an int) is what ``Document.extract_image`` needs.

        Args:
            pdf_path: Path to the PDF file.
            output_dir: Where image files are written; defaults to
                ``<extraction_dir>/images``.
            max_images: Hard cap on the total number of images saved.

        Returns:
            A list of ``(page_number, saved_image_path)`` tuples.
        """
        if output_dir is None:
            output_dir = os.path.join(self.extraction_dir, "images")
        os.makedirs(output_dir, exist_ok=True)
        images = []
        image_count = 0
        try:
            pdf_name = Path(pdf_path).stem
            pdf_file = fitz.open(pdf_path)
            # FIXED: close the document even when a page blows up mid-loop;
            # the original leaked the handle on any exception below.
            try:
                for page_num in range(len(pdf_file)):
                    if image_count >= max_images:
                        print(f"Image extraction reached maximum images limit ({max_images})")
                        break
                    try:
                        page = pdf_file[page_num]
                        for image_idx, img_info in enumerate(page.get_images()):
                            if image_count >= max_images:
                                break
                            try:
                                xref = img_info[0]  # xref is the first tuple element
                                base_image = pdf_file.extract_image(xref)
                                if base_image and "image" in base_image:
                                    image_bytes = base_image["image"]
                                    image_ext = base_image["ext"]
                                    image_name = f"{pdf_name}_page{page_num+1}_img{image_idx}.{image_ext}"
                                    image_path = os.path.join(output_dir, image_name)
                                    with open(image_path, "wb") as f:
                                        f.write(image_bytes)
                                    images.append((page_num + 1, image_path))
                                    image_count += 1
                            except Exception as e:
                                print(f"Error extracting image {image_idx} from page {page_num}: {e}")
                                continue
                    except Exception as e:
                        print(f"Error processing page {page_num}: {e}")
                        continue
            finally:
                pdf_file.close()
        except Exception as e:
            print(f"Error opening PDF for image extraction: {e}")
        return images

    def process_pdf(self, pdf_path: str) -> Dict:
        """Process an entire PDF, returning text, tables, and image paths.

        Skips work when the file's fingerprint matches a previous run and a
        cached result is available.

        Returns:
            A dict with keys ``pdf_path``, ``filename``, ``text``,
            ``tables``, ``images`` (cache hits also carry ``image_count``;
            see ``_load_cached_results`` for cache-shape caveats).
        """
        file_hash = self._get_file_hash(pdf_path)
        # Check if already processed
        if self.processed_files.get(pdf_path) == file_hash:
            print(f"File {pdf_path} already processed. Loading cached results.")
            return self._load_cached_results(pdf_path)
        print(f"Processing PDF: {pdf_path}")
        result = {
            "pdf_path": pdf_path,
            "filename": Path(pdf_path).name,
            "text": self.extract_text_with_pdfplumber(pdf_path, max_chars=1000000),
            "tables": self.extract_tables_from_pdf(pdf_path, max_tables=50),
            "images": self.extract_images_from_pdf(pdf_path, max_images=100)
        }
        # Save results to cache
        self._save_cached_results(pdf_path, result)
        # Update processing state
        self.processed_files[pdf_path] = file_hash
        self._save_processing_state()
        return result

    def _save_cached_results(self, pdf_path: str, result: Dict):
        """Save extraction results to a JSON file (best-effort).

        Image files are not re-serialized; only their count is stored.
        """
        safe_name = Path(pdf_path).stem
        cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json")
        # Don't save image paths in cache, just metadata
        cache_data = {
            "pdf_path": result["pdf_path"],
            "filename": result["filename"],
            "text": result["text"],
            "tables": result["tables"],
            "image_count": len(result["images"])
        }
        try:
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
        except OSError as e:
            print(f"Warning: Could not save cache: {e}")

    def _load_cached_results(self, pdf_path: str) -> Dict:
        """Load cached extraction results for *pdf_path*.

        FIXED: the cached payload stores only ``image_count`` (and the
        original's error fallback also dropped ``pdf_path``/``filename``),
        so cache hits returned a different shape than ``process_pdf``.
        The dict returned here always carries ``pdf_path``, ``filename``,
        ``text``, ``tables``, and ``images`` — image paths are not cached,
        so ``images`` is always ``[]`` on a cache hit. Note that the JSON
        round-trip turns ``(page, text)`` table tuples into 2-element lists.
        """
        safe_name = Path(pdf_path).stem
        cache_file = os.path.join(self.extraction_dir, f"{safe_name}_cache.json")
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cached = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            print(f"Error loading cache: {e}")
            cached = {
                "pdf_path": pdf_path,
                "filename": Path(pdf_path).name,
                "text": "",
                "tables": [],
                "image_count": 0,
            }
        cached.setdefault("images", [])
        return cached

    def process_pdf_directory(self, pdf_dir: str) -> List[Dict]:
        """Process all ``*.pdf`` files in a directory.

        Files that fail are reported and skipped; the returned list holds
        one result dict (see ``process_pdf``) per successfully parsed file.
        """
        results = []
        # Sort for a deterministic processing order across filesystems.
        pdf_files = sorted(Path(pdf_dir).glob("*.pdf"))
        if not pdf_files:
            print(f"No PDF files found in {pdf_dir}")
            return results
        print(f"Found {len(pdf_files)} PDF files to process")
        for idx, pdf_file in enumerate(pdf_files, 1):
            try:
                print(f"Processing {idx}/{len(pdf_files)}: {pdf_file.name}")
                result = self.process_pdf(str(pdf_file))
                results.append(result)
            except Exception as e:
                print(f"Error processing {pdf_file}: {e}")
                continue
        print(f"Completed processing {len(results)} PDFs")
        return results