# (Scraped page header: "Spaces: Sleeping" — not part of the module.)
import base64
import hashlib
import io
import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pdfplumber
import pymupdf
from PIL import Image
class PDFProcessor:
    """Processes PDFs to extract text, tables, and images.

    Extraction results are stored in a JSON cache file, keyed by the PDF's
    base name and validated against an MD5 digest of the file contents, so a
    PDF whose bytes have not changed is not parsed again.
    """

    def __init__(self, pdf_dir: str = "./pdfs", cache_file: str = ".pdf_cache.json"):
        """Remember the configuration, load the cache, and create ``pdf_dir``.

        Args:
            pdf_dir: Directory scanned by :meth:`process_all_pdfs` by default.
            cache_file: Path of the JSON file holding cached extraction results.
        """
        self.pdf_dir = pdf_dir
        self.cache_file = cache_file
        self.cache = self._load_cache()
        os.makedirs(pdf_dir, exist_ok=True)

    def _load_cache(self) -> Dict:
        """Load the processing cache, or return ``{}`` if it is missing or unusable.

        The cache is only an optimisation, so a corrupt or unreadable file is
        reported and ignored instead of aborting construction.
        """
        if not os.path.exists(self.cache_file):
            return {}
        try:
            with open(self.cache_file, "r", encoding="utf-8") as f:
                cache = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"⚠️ Ignoring unreadable cache file {self.cache_file}: {e}")
            return {}
        # Anything other than a JSON object cannot be used as the cache mapping.
        return cache if isinstance(cache, dict) else {}

    def _save_cache(self) -> None:
        """Write the processing cache to ``self.cache_file``.

        The data is written to a sibling temporary file first and then moved
        into place, so an interrupted write does not leave a truncated cache.
        """
        tmp_path = f"{self.cache_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.cache, f, indent=2)
        os.replace(tmp_path, self.cache_file)

    def _get_file_hash(self, filepath: str) -> str:
        """Return the MD5 hex digest of the file, used only for change detection."""
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            # Read in chunks so large PDFs are never held in memory at once.
            for chunk in iter(lambda: f.read(65536), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _images_from_doc(self, doc, page_num: int) -> List[Dict]:
        """Extract the images of one page from an already-open PyMuPDF document.

        Args:
            doc: Open PyMuPDF document; the caller owns and closes it.
            page_num: Zero-based page index.

        Returns:
            One dict per extracted image with keys ``type``, ``format``,
            ``base64``, ``page``, ``index`` and ``xref``. Images that fail to
            extract are reported and skipped.
        """
        images: List[Dict] = []
        try:
            if page_num >= len(doc):
                print(f"⚠️ Page {page_num} does not exist")
                return images

            image_list = doc[page_num].get_images()
            if not image_list:
                return images
            print(f"Found {len(image_list)} images on page {page_num}")

            for img_index, img_info in enumerate(image_list):
                xref = None
                try:
                    # Each entry is a tuple whose first element is the xref.
                    xref = img_info[0]
                    if not isinstance(xref, int):
                        print(f"⚠️ Invalid xref type: {type(xref).__name__}")
                        continue

                    img_data = doc.extract_image(xref)
                    if not img_data or "image" not in img_data:
                        print(f"⚠️ No image data at xref {xref}")
                        continue

                    images.append({
                        "type": "image",
                        "format": img_data.get("ext", "png"),
                        # Base64 text keeps the result JSON-serialisable for the cache.
                        "base64": base64.b64encode(img_data["image"]).decode(),
                        "page": page_num,
                        "index": img_index,
                        "xref": xref,
                    })
                    print(f"✅ Image {img_index + 1}/{len(image_list)}")
                except ValueError as e:
                    if "bad xref" in str(e).lower():
                        print(f"⚠️ Bad xref {xref}: {e}")
                    else:
                        print(f"⚠️ Error at xref {xref}: {e}")
                except Exception as e:
                    print(f"⚠️ Error extracting image {img_index}: {e}")
        except Exception as e:
            print(f"❌ Error in _extract_images_from_page: {e}")
        return images

    def _extract_images_from_page(self, pdf_path: str, page_num: int) -> List[Dict]:
        """Extract images from specific page using PyMuPDF.

        Opens ``pdf_path`` for this single call; the document is closed even
        when extraction fails. Errors are printed and yield an empty list.
        """
        try:
            with pymupdf.open(pdf_path) as doc:
                return self._images_from_doc(doc, page_num)
        except Exception as e:
            print(f"❌ Error in _extract_images_from_page: {e}")
            return []

    def _tables_from_plumber_page(self, page, page_num: int) -> List[Dict]:
        """Extract the tables of one already-loaded pdfplumber page.

        Args:
            page: pdfplumber page object.
            page_num: Zero-based page index, recorded in each result.

        Returns:
            One dict per table with keys ``type``, ``content`` (Markdown),
            ``page`` and ``index``. Errors are printed and end the extraction
            for this page, keeping any tables already collected.
        """
        tables: List[Dict] = []
        try:
            for table_idx, table in enumerate(page.extract_tables() or []):
                tables.append({
                    "type": "table",
                    "content": self._table_to_markdown(table),
                    "page": page_num,
                    "index": table_idx,
                })
        except Exception as e:
            print(f"Error extracting tables from page {page_num}: {e}")
        return tables

    def _extract_tables_from_page(self, pdf_path: str, page_num: int) -> List[Dict]:
        """Extract tables from specific page using pdfplumber.

        Opens ``pdf_path`` for this single call. Errors (including an invalid
        page index) are printed and yield an empty list.
        """
        try:
            with pdfplumber.open(pdf_path) as pdf:
                return self._tables_from_plumber_page(pdf.pages[page_num], page_num)
        except Exception as e:
            print(f"Error extracting tables from page {page_num}: {e}")
            return []

    def _table_to_markdown(self, table: List[List]) -> str:
        """Convert a table (list of rows of cells) to a Markdown table.

        The first row becomes the header. ``None`` cells render as empty,
        ``|`` is escaped and line breaks inside a cell become spaces so that
        each table row stays on one Markdown line. Short rows are padded to
        the widest row. Returns ``""`` for an empty table.
        """
        if not table:
            return ""

        rows = [list(row) if row else [] for row in table]
        width = max(len(row) for row in rows)
        if width == 0:
            return ""

        def render_cell(cell) -> str:
            if cell is None:
                return ""
            single_line = " ".join(str(cell).splitlines())
            return single_line.replace("|", "\\|")

        def render_row(row: List) -> str:
            padded = row + [None] * (width - len(row))
            return "| " + " | ".join(render_cell(cell) for cell in padded) + " |"

        lines = [render_row(rows[0]), "| " + " | ".join(["---"] * width) + " |"]
        lines.extend(render_row(row) for row in rows[1:])
        return "\n".join(lines) + "\n"

    def extract_pdf_content(self, pdf_path: str) -> Optional[Dict]:
        """Extract all content from PDF (text, tables, images).

        Uses the cache to avoid reprocessing a file whose hash is unchanged.
        Each backend opens the PDF once for the whole document instead of
        once per page.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            ``{"filename": ..., "pages": [...]}`` where every page dict has
            ``page_number`` (1-based), ``text``, ``tables`` and ``images``;
            ``None`` if the PDF could not be processed.

        Raises:
            OSError: If ``pdf_path`` cannot be read for hashing, or the cache
                file cannot be written.
        """
        pdf_name = os.path.basename(pdf_path)
        file_hash = self._get_file_hash(pdf_path)

        cached = self.cache.get(pdf_name)
        if isinstance(cached, dict) and cached.get("hash") == file_hash and "content" in cached:
            print(f"Using cached data for {pdf_name}")
            return cached["content"]

        print(f"Processing PDF: {pdf_name}")
        content = {"filename": pdf_name, "pages": []}

        # Image extraction is best-effort: if PyMuPDF cannot open the file the
        # pages are still returned, just without images.
        try:
            doc = pymupdf.open(pdf_path)
        except Exception as e:
            print(f"❌ Could not open {pdf_name} for image extraction: {e}")
            doc = None

        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages):
                    content["pages"].append({
                        "page_number": page_num + 1,
                        "text": page.extract_text() or "",
                        "tables": self._tables_from_plumber_page(page, page_num),
                        "images": self._images_from_doc(doc, page_num) if doc is not None else [],
                    })
        except Exception as e:
            print(f"Error processing {pdf_path}: {e}")
            return None
        finally:
            if doc is not None:
                doc.close()

        self.cache[pdf_name] = {"hash": file_hash, "content": content}
        self._save_cache()
        return content

    def process_all_pdfs(self, pdf_dir: Optional[str] = None) -> List[Dict]:
        """Process all ``*.pdf`` files in a directory.

        Args:
            pdf_dir: Directory to scan; defaults to ``self.pdf_dir``.

        Returns:
            Extracted content of every PDF that could be processed, in sorted
            path order so repeated runs produce the same ordering.
        """
        if pdf_dir is None:
            pdf_dir = self.pdf_dir

        all_content: List[Dict] = []
        pdf_files = sorted(Path(pdf_dir).glob("*.pdf"))
        if not pdf_files:
            print(f"No PDF files found in {pdf_dir}")
            return all_content

        for pdf_file in pdf_files:
            content = self.extract_pdf_content(str(pdf_file))
            if content is not None:
                all_content.append(content)
        return all_content
def prepare_documents_for_embedding(pdf_content: Dict) -> List[Tuple[str, Dict]]:
    """Prepare extracted PDF content for embedding.

    Args:
        pdf_content: Mapping with ``filename`` and ``pages``; each page may
            carry ``page_number``, ``text``, ``tables`` and ``images``.
            ``None`` or an empty mapping is accepted and yields no documents.

    Returns:
        List of ``(text, metadata)`` tuples in page order: the page text (if
        non-empty), then one entry per table, then one placeholder entry per
        image whose metadata carries the base64 payload and format.
    """
    if not pdf_content:
        return []

    filename = pdf_content.get("filename")
    documents: List[Tuple[str, Dict]] = []

    for page in pdf_content.get("pages") or []:
        page_num = page.get("page_number")

        # Page text, skipped when the page has none.
        text = page.get("text")
        if text:
            documents.append((
                text,
                {"type": "text", "page": page_num, "filename": filename},
            ))

        # Tables, prefixed with their page so the embedded text is self-describing.
        for table in page.get("tables") or []:
            table_md = table.get("content", "")
            documents.append((
                f"Table on page {page_num}:\n{table_md}",
                {"type": "table", "page": page_num, "filename": filename},
            ))

        # Images get placeholder text only; the image payload travels in the metadata.
        for image in page.get("images") or []:
            documents.append((
                f"Image on page {page_num}",
                {
                    "type": "image",
                    "page": page_num,
                    "filename": filename,
                    "image_base64": image.get("base64"),
                    "image_format": image.get("format"),
                },
            ))

    return documents